AboutContactENPT

Using Spring Batch to create an ETL for financial information

February 14, 2020

I've started to play with Spring Batch and ETLs and this is what I've got (so far)….

The year of 2019 has been very intense for me because I've had the opportunity to learn a lot about Web development using Java Spring and DevOps culture in general. However, there is a saying: "The more you know, the more you realize you don't know". So here I was looking a way to deepen even further my knowledge about the Spring ecosystem, when I've got curious on Spring Batch project.

For those who don't know Spring Batch is a subproject from Spring framework aimed for batch processing. This project contains many resources in its toolset that allows a systematic, deterministic and trackable execution of batch jobs. An interesting example of application is the use Spring Batch to process a data streaming such as a spreadsheet or a database. And this is the very first example I'm going to show here launching my first article for real in my blog =D

While searching for a nice use case to understand a little more of Spring Batch and since last that last year I was part of a group of study in data science, a university extension program from UNILA, which you can check more about it here link, I've come to know the concept of ETL.

ELT (TL;DR)

A ETL is an acronym which means Extract Transform and Load. The idea itself is very simple and consists in operations that involve big amounts of data. These operations are made basically in three steps: extraction, transformation and loading (or storage). Complete definition on Wikipedia.

Its use is very common for transferring one, or even many data from a storage dataset into another one. And between the origin and destination it is possible to make transformations in the data such as aggregation, formatting and other things.

Description of the problem I've hunted

With this concept in mind, I needed a dataset to begin with. Since I really enjoy learning more about financial products, such as fixed income, stocks and real state investment trusts, I've searched something involving these types of data. And then I've discovered that CVM (Brazilian equivalent of American's SEC or UK's FCA) has an open data portal where it is possible to download a huge amount of spreadsheets related to investment funds such as daily report, register data and many others.

From the data sets available through CVM, there is a set called "Daily report" (informe diário), which according to CVM it has financial statements such as:

  • Market Cap
  • Net worth
  • Single share value
  • Fundraising during the day
  • Reclaims paid during the day
  • Number of shareholders

I've downloaded a spreadsheet of a random day and boom! A CSV with more than 200,000 lines of structured data. Perfect to roll up my sleeves and starting to play.

Every row in the CSV file represents information about an investment fund in a given day. From the dozens of columns available in the spreadsheet, I've peeked some to use on my ETL solution such as:

  1. CNPJ of the funds' issuing company (CNPJ is a Brazilian equivalent of American EIN or UK's Employer Number)
  2. Referenced date
  3. Total Value
  4. Quota(share) Value
  5. Net Worth
  6. Fundraising during the day(total of deposits)
  7. Reclaims paid during the day(total of withdraws)
  8. Number of shareholders (quote owners)

For the first step I thought in doing only a simple ETL, which means load the information from de CSV and store them in a MySQL database. But in order to do at least one thing in the transformation step, I've just simply removed CPNJ field formatting by stripping dots and slashes and saving only digits. In Brazil a CNPJ is often represented like XX.XXX.XXX/0001-XX, so I've just converted to XXXXXXXX0001XX.

The napkin sketch of my solution were something like that:

ETL Schema

The idea is, read the CSV file, performing an ETL using Spring Batch, save information in database table using Spring data and finally use Spring Web to publish the information in JSON format through an API.

The data schema of the database entity I've created into the database is the following:

DailyInform

Now that the general idea and architecture of the solution is shown, let's write a bit of code!

Step 1, making my "Hello World" with Spring Batch

In order to use Spring Batch, first you need to insert its dependencies into your project. You can begin by creating a project in Spring Initializr by picking the following projects:

  • Spring boot starter data jpa: (to connect to the database)
  • Spring boot starter web: (to create the web layer responsible to expose data via an API REST using JSON).
  • mySql-connector-java: For this tutorial I've chosen MySQL but any other relational database will do.
  • spring-boot-starter-batch: The shining star of the solution, it contains the necessary classes to use Spring Batch.

SpringInitializr

Once the Zip file is downloaded, just open it in your favorite IDE and start coding!

First it is necessary to create the entity DailyInform, that will be the final form where the converted information from the spreadsheet will be stored as table in the database.

import javax.persistence.*; import javax.validation.constraints.NotNull; import java.io.Serializable; import java.math.BigDecimal; import java.time.LocalDate; @Entity @Table(indexes = { @Index(columnList = "cnpj", name = "cnpj_hidx"), @Index(columnList = "referenceDate", name = "reference_date_hidx")}, uniqueConstraints = @UniqueConstraint(columnNames = { "cnpj", "referenceDate" })) public class DailyInform implements Serializable { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; @NotNull @CNPJ @Column(nullable = false) private String cnpj; @NotNull @Column(nullable = false) private LocalDate referenceDate; @NotNull @Column(nullable = false) private BigDecimal totalValue; @NotNull @Column(nullable = false) private BigDecimal quotaValue; @NotNull @Column(nullable = false) private BigDecimal netWorth; @NotNull @Column(nullable = false) private BigDecimal totalDeposits; @NotNull @Column(nullable = false) private BigDecimal totalWithdrawals; @NotNull @Column(nullable = false) private Long numberOfQuotaHolders; public DailyInform() {}; public DailyInform(String cnpj, LocalDate referenceDate, BigDecimal totalValue, BigDecimal quotaValue, BigDecimal netWorth, BigDecimal totalDeposits, BigDecimal totalWithdrawals, Long numberOfQuotaHolders) { this.cnpj = cnpj; this.referenceDate = referenceDate; this.totalValue = totalValue; this.quotaValue = quotaValue; this.netWorth = netWorth; this.totalDeposits = totalDeposits; this.totalWithdrawals = totalWithdrawals; this.numberOfQuotaHolders = numberOfQuotaHolders; } @Override public String toString() { return "DailyInform{" + "cnpj='" + cnpj + '\'' + ", referenceDate=" + referenceDate + ", quotaValue=" + quotaValue + '}'; } // .... Getters/Setters }

Notice that this is an elementary JPA entity just to show the concepts. I've also added some indexes to improve query performance.

The batch processing in Spring is executed through Jobs. A job is made of Steps that define successive tasks of reading, transformation and writing (for this tutorial I've made a Job with a single Step). The Step basically has three elements:

  1. A reader: Defines what and how is going to be read. In the step it is defined from where that data will come (in this case a spreadsheet), and which columns will be digested.
  2. A processor: Defines a transformation operation the read data will have to go through until is ready to be written. It receives an entry object and returns another object.
  3. A writer: It defines where the just processed data will be written, in this tutorial we will use a Repository bean to write information into the database.

To use a Job in SpringBatch it is necessary to configure it first. The snippet below is an example of a @Configuration bean (the complete code you'll find in my GitHub):

@Configuration @EnableBatchProcessing public class SpringBatchConfiguration { @Bean public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, ItemReader<DailyInform> itemReader, ItemProcessor<DailyInform, DailyInform> itemProcessor, ItemWriter<DailyInform> itemWriter) { Step step = stepBuilderFactory.get("ETL-file-load") .<DailyInform, DailyInform>chunk(1000) .reader(itemReader) .processor(itemProcessor) .writer(itemWriter) .build(); return jobBuilderFactory.get("ETL-Load") .incrementer(new RunIdIncrementer()) .start(step) .build(); } // more beans will come }

As it can be seen on line 25, it is necessary to use an annotation @EnableBatchProcessing to habilitate Spring Batch in the project. The Job is exposed to the application through a bean, being managed by the Spring Container. The method job receive five parameters: jobBuilderFactory, stepBuilderFactory, itemReader, itemProcessor and itemWriter. The factories are injected by Spring itself, and it's up to us to define other parameters through beans in the application. Once defined, Spring can inject them into the method and create the Job. Notice that reader, processor and writer types are strongly typed according to the modeled entity DailyInform. It is perfectly possible to model a flux where reading is a type, converted to another type during processing and later written in the write stage. But in order to keep things simple in this tutorial, I will stick with the idea to keep the same type during the whole cycle.

Thanks to dependency injection, Spring is able to assemble the Job using a reader, processor and writer that will also be application beans. I'm going to explain how they are created and the details of each one of them in the sessions below.

Step 2 - Creating the reading bean

We can create a reading bean from class FlatFileItemReader<T>, a class that indirectly implements the interface ItemReader<T>. The easiness of using this class comes from its design specially made for reading files row by row (perfect for this CSV example). Its configuration is relatively simple, however it demands the creation of another auxiliary bean. An implementation example follows:

@Bean public FlatFileItemReader<DailyInform> fileItemReader(@Value("${input}") Resource resource) { FlatFileItemReader<DailyInform> fileItemReader = new FlatFileItemReader<>(); fileItemReader.setResource(resource); fileItemReader.setEncoding("ISO-8859-3"); fileItemReader.setName("CSV-Reader"); fileItemReader.setLinesToSkip(1); fileItemReader.setLineMapper(lineMapper()); return fileItemReader; }

In whitch:

Input: Resource (a reference to the .csv file), here I've set up the path in an environment variable for convenience.

Output: An object from type FlatFileItemReader<DailyInform> where I define:

  • The resource to be read.
  • The file encoding, in this case I've discovered that the spreadsheet is not in UTF-8, therefore I needed to inform the correct encoding so there were no misreading parsing for special characters from Portuguese such as à, ó and ã for example.
  • Name of this bean: Probably used by Spring for an internal index of this reader.
  • Jumping lines: I've set up the value "1" so it skips the header and goes straight to the data.
  • The LineMapper: The auxiliary bean need to configure the reading policies, its creation will be detailed below.

Assembling the LineMapper

The LineMapper is the bean that basically defines the reading logic such as: What character is the column delimiter, the column names and also how to convert to an object each row read. Its construction goes as follows:

@Bean public LineMapper<DailyInform> lineMapper() { DefaultLineMapper<DailyInform> defaultLineMapper = new DefaultLineMapper<>(); DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(";"); lineTokenizer.setNames("CNPJ_FUNDO", "DT_COMPTC", "VL_TOTAL", "VL_QUOTA", "VL_PATRIM_LIQ", "CAPTC_DIA", "RESG_DIA", "NR_COTST"); lineTokenizer.setStrict(false); DailyInformFieldSetMapper dailyInformFieldSetMapper = new DailyInformFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(dailyInformFieldSetMapper); return defaultLineMapper; }

The returned object by this method is an instance of class DefaultLineMaper, which implements the interface LineMapper, where is configured:

  • The delimiter: in the case of the downloaded CSV file from CVM, the character ; has been used to split the data for each column.
  • Define the tokens (or names) of each column that will be read, as well how many will be processed. Later these names are going to be used by the bean responsible to insert the information of each line in a POJO DailyInform.
  • Set strict reading: In case of true, every line needs to have the exact number of columns, if this option is false SpringBatch accepts reading lines which may not have the same number of columns as defined by the tokens. When this happens, the missing values will be filled with empty, if there are more columns than tokens they will simply be ignored.
  • The fieldsetmapper: Bean which defines how the data read from rows will be converted into a POJO. In this case I've created a class called DailyInformFieldSetMapper responsible for this job.

The DailyInformFieldSetMapper (because there is no such thing as magic)

This class implements the interface FieldSetMapper<T>, that by its contract, demands the implementation of the method <T> mapFieldSet(FieldSet fieldSet), this method receives as a parameter a line read by LineMapper and it's responsible to for setting the attributes from class DailyInform with each token data. This is the very heart of data conversion from a CSV line into a Java object (that's why I've said before that there's no such thing as magic xD). The class has only the method mapFieldSet and stands as below:

public class DailyInformFieldSetMapper implements FieldSetMapper<DailyInform> { @Override public DailyInform mapFieldSet(FieldSet fieldSet) throws BindException { final DailyInform dailyInform = new DailyInform(); dailyInform.setCnpj(fieldSet.readString("CNPJ_FUNDO")); dailyInform.setReferenceDate(fieldSet.readDate("DT_COMPTC") .toInstant().atZone(ZoneId.systemDefault()).toLocalDate()); dailyInform.setTotalValue(fieldSet.readBigDecimal("VL_TOTAL")); dailyInform.setQuotaValue(fieldSet.readBigDecimal("VL_QUOTA")); dailyInform.setNetWorth(fieldSet.readBigDecimal("VL_PATRIM_LIQ")); dailyInform.setTotalDeposits(fieldSet.readBigDecimal("CAPTC_DIA")); dailyInform.setTotalWithdrawals(fieldSet.readBigDecimal("RESG_DIA")); dailyInform.setNumberOfQuotaHolders(fieldSet.readLong("NR_COTST")); return dailyInform; } }

With those steps done, everything is set and configured for the reading step, now there are only two remaining steps: transformation and writing. Fortunately they are simpler as I'll show below.

Step 3 - Configuring the Processor

The class DailyInformProcessor is responsible for transforming operations upon the object DailyInform. In this stage is possible to perform all kinds of operations such as apply calculations, formatting, data aggregation and even convert the output to another completely different object. In the case of this tutorial, just as an example I've used this step to format the CNPJ value from companies by removing dots and slashes. For that I borrowed a help from a library called Alura Stella, which contain many helping functions to manipulate Brazilian documents. Besides, I haven't modified the type of the output object (DailyInform comes in, DailyInform comes out). The class has a very simple implementation and stands as follows:

@Component public class DailyInformProcessor implements ItemProcessor<DailyInform, DailyInform> { @Autowired private CNPJFormatter cnpjFormatter; @Override public DailyInform process(DailyInform dailyInform) throws Exception { dailyInform.setCnpj(cnpjFormatter.unformat(dailyInform.getCnpj())); return dailyInform; } }

Notice that it's necessary to implement the methods process from interface ItemProcessor, very self-explanatory.

Lastly, there's only the writing stage to build.

Step 4 - Configuring Writing

First, let's create a repository for the entity DailyInform which is going to be used by Writer to writer data into the Database.

@Repository public interface DailyInformRepository extends JpaRepository<DailyInform, Long> { List<DailyInform> findDistinctByCnpj(String cnpj); }

It's a simple repository used by Spring Data without any hidden mystery. I've just added a method to find daily inform via CNPJ because I've created an endpoint in the web layer that shows results by company.

Created the repository, now it is just to create the Writer, which basically is a Spring component that implements the interface ItemWeriter:

@Component public class DBWriter implements ItemWriter<DailyInform> { private final DailyInformRepository dailyInformRepository; @Autowired public DBWriter(DailyInformRepository dailyInformRepository) { this.dailyInformRepository = dailyInformRepository; } @Override public void write(List<? extends DailyInform> list) { dailyInformRepository.saveAll(list); } }

The only method that the interface demands implementation is write. This method gets a list with data (the chunks that were transformed from the previous step), which in turn I send to the database using DailyInformRepository, by injecting the latter into the class by the attribute dailyInformRepository. The JPA repositories have a method called saveAll which accepts a Java collection that is going to be saved in the database. By performing these steps the information is persisted, and the data finally get to its final destination =D

Bonus - Web Layer

Once the data has been persisted into the database, it is fairly simple to create a web layer to expose the information in JSON format. If you're up to follow a very traditional architecture and want to follow a N-layers style, you can create a Service and a Controller. The snippets below show how to do that:

@Service public class FundsService { private final DailyInformRepository dailyInformRepository; @Autowired public FundsService(DailyInformRepository dailyInformRepository) { this.dailyInformRepository = dailyInformRepository; } @Transactional(readOnly = true) public List<DailyInform> getDailyInformByCNPJ(String cnpj) { return dailyInformRepository.findDistinctByCnpj(cnpj); } }

And the controller:

@RestController @RequestMapping("/api/daily-informs") public class FundsController { private final FundsService fundsService; @Autowired public FundsController(FundsService fundsService) { this.fundsService = fundsService; } @GetMapping("/{cnpj}") public ResponseEntity<List<DailyInform>> all(@PathVariable("cnpj") String cnpj) { return ResponseEntity.of(Optional.of(fundsService.getDailyInformByCNPJ(cnpj))); } }

About these two components I have the following points:

  1. Notice that I inject the repository into the service and the service into the controller. Following the architecture guidelines of N-layers, exposing the data layer only to the service layer.
  2. In FundsService I've inserted an annotation @Transaction with its parameter readonly set to true. This way I tell Spring to open a transactional context in the method that won't perform writing operations into the database, achieving a transactional behavior with a performance gain.
  3. In the controller I've decided to use CNPJ as a PathParam, but there are other ways this can be done (through a non-nullable query parameter for example).
  4. Lastly I've injected all components through class constructors, I prefer doing this way because I can declare my dependencies as final, so the class attributes are immutable.

Firing in up!

With all said and done now it's time to run this batch. To run the job as soon as Spring starts I've implemented a interface called CommandLineRunner in the main class which runs the configured job. The result is as follows:

@SpringBootApplication public class BatchProcessingApplication implements CommandLineRunner { @Bean public CNPJFormatter cnpjFormatter() { return new CNPJFormatter(); } @Autowired private JobLauncher jobLauncher; @Autowired private Job job; public static void main(String[] args) { SpringApplication.run(BatchProcessingApplication.class, args); } @Override public void run(String... args) throws Exception { jobLauncher.run(job, new JobParameters()); } }

The job bean is detected automatically by Spring, and it comes from the Job created in the configuration file, besides, it is necessary a JobLaucher which is nothing more that a bean provided by Spring you can use it to launch your job. We do this by inserting the attribute job autowired inside the method run from jobLauncher. The parameter JobParameters is basically a Wrapper with a Map<String, Object> which allows you to send parameters to the job going to be launched (it's not going to be used at this tutorial, so we can leave it as an empty instance).

With this done, the application is ready to be run (by running the main method). Check that your database is running and it's reachable for your application.

In my machine it took about 90 minutes to process 200k lines of data.

Results Cmd

This is how the database looks like:

Results DB

This is the result by calling the endpoint I've created to bring results by CNPJ. Cool isn't it ? XD

Results JSON

Conclusion

Wow if you've got so far congratulations! Hehehe this tutorial got bigger than I thought, but is fine to start this blog with the right foot. It's obvious that there is much to improve, but now I have "a north" and I expect to write 2 articles here every month.

Regarding the application, well it worked, but I believe that there is way too many improvement points. Some stuff I've already discovered how to improve and can write about it in another article. An example is what to do if there's a line with irregular information in the spreadsheet? The way it stands now the whole job stops running. Since this is not desirable, it is necessary to define a tolerance policy and error registration. Another thing that is also possible to do is running the execution in parallel, how to make more efficient? Is there additional database configs that can be made? Stay tuned for more content coming up.

I hope you've enjoyed my first article and I ask you to share with your collegues. Feel free to comment, the more feedback received the better!

Application GitHub

References

For more reference pleas go the following sites. I've made this tutorial based on them.

Guide

This guide also is very good and comes from Spring itself, I recommend for everyone who wants that to learn further.

Cheers;

© 2022 Lucas de Castro Oliveira