In this blog I will be briefly explaining on how to get started with Spring batch with Apache Kafka with a single broker. The initial assumption that I have made is that your system has no pre-existing Kafka or Zookeeper installation.
Spring Boot + Spring batch + Apache kafka
Step 1: Download and Run Apache Kafka .
To download and install Kafka, please refer the official guide https://kafka.apache.org/quickstart .
Once you download Kafka, you can issue a command to start @gavindya/what-is-zookeeper-db8dfc30fc9b” rel=”noopener”>ZooKeeper which is used by Kafka to store metadata.
zookeeper-server-start.bat .\config\zookeeper.properties
Next, we need to start the Kafka cluster locally by issuing the below command.
kafka-server-start.bat .\config\server.properties
Now, by default, the Kafka server starts on localhost:9092
Step 2: Create Spring Boot Application.
Create Simple Spring boot application with spring boot initializer https://start.spring.io/.
And also with dependencies listed below.
- kafka
- batch
- lombok
- h2 database
- gson
Step 2: Create Producer to Post Message
Here cron job created to post customer message to kafka topic customer_topic
@Scheduled(fixedRate = 10000)
public void produce() {
for(int i = 1; i < 3; i++) {
String id = UUID.randomUUID().toString();
String customer = new Customer(id, Math.random() > 0.5 ? “Manoj”:”Kumar”).toString();
System.out.println(“Produced :: “ + customer);
this.kafkaTemplate.send(“customer_topic”,id, customer);
}
Step 2: Create batch consumer to consume produced message
For creating batch consumer we have to configure 3 things (Job, ItemReader, ItemWriter)
Create Spring Batch Job
@Bean
Job job() {
return jobBuilderFactory.get(“job”)
.incrementer(new RunIdIncrementer())
.start(start())
.build();
}
Create ItemReader
Note: Here our data source is Kafka therefore KafkaItemReader is used
@Bean
KafkaItemReader<String, String> kafkaItemReader() {
Properties props = new Properties();
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<String, String>()
.partitions(0).consumerProperties(props)
.name(“customer-reader”).saveState(true)
.topic(“customer-topic”).build();
}
Create ItemWriter and Step for Job
@Bean
Step start() {
ItemWriter writer = new ItemWriter<String>() {
@Override
public void write(List<? extends String> items)
throws Exception {
items.forEach(item ->
System.out.println(“Consumed Message “ + item));
}
};
return stepBuilderFactory.get(“job”)
.chunk(0)
.reader(kafkaItemReader())
.writer(writer)
.build();
}
The above producer and consumer spring boot application code is available in below link
Pre-requisite
Batch Processing , kafka, Spring Boot, Spring
Happy Coding ♥
Published: | Last Updated: | Views: 17