
Batch processing with Spring Batch and Apache Kafka — Quick Start


anu priya
@anu-priya

In this blog I will briefly explain how to get started with Spring Batch and Apache Kafka using a single broker. The only assumption I make is that your system has no pre-existing Kafka or ZooKeeper installation.

Spring Boot + Spring Batch + Apache Kafka

Step 1: Download and Run Apache Kafka.

To download and install Kafka, please refer to the official guide https://kafka.apache.org/quickstart.

Once you download Kafka, you can issue the command below to start ZooKeeper, which is used by Kafka to store metadata.

zookeeper-server-start.bat .\config\zookeeper.properties

Next, we need to start the Kafka broker locally by issuing the command below.

kafka-server-start.bat .\config\server.properties

Now, by default, the Kafka server starts on localhost:9092.

Step 2: Create a Spring Boot Application.

Create a simple Spring Boot application with Spring Initializr (https://start.spring.io/) and add the dependencies listed below.

  1. Spring for Apache Kafka
  2. Spring Batch
  3. Lombok
  4. H2 Database
  5. Gson

Step 3: Create a Producer to Post Messages

Here, a scheduled job is created to post customer messages to the Kafka topic customer_topic every 10 seconds.

@Scheduled(fixedRate = 10000)
 public void produce() {
     for (int i = 1; i < 3; i++) {
         String id = UUID.randomUUID().toString();
         String customer = new Customer(id, Math.random() > 0.5 ? "Manoj" : "Kumar").toString();
         System.out.println("Produced :: " + customer);
         this.kafkaTemplate.send("customer_topic", id, customer);
     }
 }
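
The Customer class itself is not shown in the post. A minimal sketch of what it could look like, assuming a simple POJO that uses Lombok for the constructor and getters and Gson (both are in the dependency list) to render itself as JSON in toString():

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Getter;

// Assumed model class; the original is not included in the post
@Getter
@AllArgsConstructor
public class Customer {

    private String id;
    private String name;

    @Override
    public String toString() {
        // Serialize to JSON so the producer sends a readable payload
        return new Gson().toJson(this);
    }
}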

Step 4: Create a Batch Consumer to Consume the Produced Messages

To create the batch consumer, we have to configure three things: a Job, an ItemReader, and an ItemWriter. All three are defined as beans in a batch configuration class, sketched below.
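
As a minimal sketch of that configuration class (the class name and field wiring are my own assumptions, not taken from the original post), assuming the Spring Batch 4.x factory style used in the snippets and Spring Boot's KafkaProperties for the consumer settings:

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;

// Hosts the job(), kafkaItemReader() and start() beans shown below (assumed structure)
@Configuration
@EnableBatchProcessing
public class BatchConsumerConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;   // builds the Job

    @Autowired
    private StepBuilderFactory stepBuilderFactory; // builds the Step

    @Autowired
    private KafkaProperties properties;            // Kafka consumer settings from application.properties

    // job(), kafkaItemReader() and start() bean definitions go here
}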

Create Spring Batch Job

@Bean
 Job job() {
     return jobBuilderFactory.get("job")
                             .incrementer(new RunIdIncrementer())
                             .start(start())
                             .build();
 }

Create ItemReader

Note: Since our data source here is Kafka, a KafkaItemReader is used.

@Bean
 KafkaItemReader<String, String> kafkaItemReader() {
     Properties props = new Properties();
     props.putAll(this.properties.buildConsumerProperties());
     return new KafkaItemReaderBuilder<String, String>()
            .partitions(0)
            .consumerProperties(props)
            .name("customer-reader")
            .saveState(true)
            .topic("customer_topic")   // must match the topic the producer writes to
            .build();
 }
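
Here buildConsumerProperties() copies the consumer settings (bootstrap servers, group id, key/value deserializers) that Spring Boot reads from application.properties. If you would rather set them in code, a hedged alternative using the standard Kafka client ConsumerConfig keys (the group id below is just a placeholder) looks like this:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Alternative: build the consumer properties by hand for the local single-broker setup
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "customer-batch-group");   // placeholder group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());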

Create the ItemWriter and the Step for the Job

@Bean
 Step start() {
     ItemWriter<String> writer = new ItemWriter<String>() {
         @Override
         public void write(List<? extends String> items) throws Exception {
             items.forEach(item -> System.out.println("Consumed Message " + item));
         }
     };
     return stepBuilderFactory.get("start")
                              .<String, String>chunk(10)   // process 10 messages per chunk
                              .reader(kafkaItemReader())
                              .writer(writer)
                              .build();
 }
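
By default, Spring Boot launches the configured job once at startup. Since the producer keeps publishing every 10 seconds, one option (a sketch under the assumption that you want the consumer to re-run periodically; the class and parameter names are my own) is to re-launch the job from a scheduler with fresh JobParameters on each run:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical scheduler that re-launches the batch job so newly produced messages are consumed
@Component
public class JobScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    @Scheduled(fixedRate = 30000)
    public void runJob() throws Exception {
        // A unique parameter makes every run a new job instance
        JobParameters params = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}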

The complete producer and consumer Spring Boot application code is available at the link below.

Spring Kafka Batch

Pre-requisites

Batch Processing, Kafka, Spring Boot, Spring

Happy Coding  ♥
