Kafka Consumer in Java
In this tutorial, we are going to learn how to build a simple Kafka consumer in Java. We will look at the properties we need to set while creating a consumer, and at how to handle topic offsets so we can read messages from the beginning of the topic or only the latest messages.
Prerequisites
To build a Kafka consumer, we need one or more topics present on the Kafka server. That topic should already have some messages published to it, or some Kafka producer should be publishing messages to it while our consumer reads them. You can learn how to create a topic in Kafka here and how to write a Kafka producer here.
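If you do not have a producer handy, the following condensed sketch (assuming a broker on localhost:9092 and a topic named myTopic, the same values the consumer below uses) publishes a couple of String messages. The producer article linked above covers this in detail.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleKafkaProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // keys and values are both Strings, matching the deserializers the consumer below will use
        producer.send(new ProducerRecord<>("myTopic", "key1", "hello Kafka"));
        producer.send(new ProducerRecord<>("myTopic", "key2", "another message"));

        producer.close(); // flushes any buffered messages before exiting
    }
}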
Kafka Consumer in Java
Let us now see how to write a Kafka consumer. The following code shows the essential imports and the properties we need to set while creating the consumer.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {

        String bootstrapServer = "localhost:9092";
        String keyDeserializer = StringDeserializer.class.getName();
        String valueDeserializer = StringDeserializer.class.getName();
        String groupID = "consumerGroup";
        String offsetReset = "earliest";
        String topicName = "myTopic";

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    }
}
We need to pass the bootstrap server details so that the consumer can connect to the Kafka server. In this example, we are reading from a topic whose keys and values are in String format, so we use a StringDeserializer for both. We also need to set a group id for the consumer. If a consumer group has more than one consumer, they can read messages from the topic in parallel. You can learn more about Kafka consumers here.
We also need to tell Kafka from which point we want to read messages from the topic, which is what the auto offset reset property is for. Setting it to earliest means the consumer starts reading from the beginning of the topic when no committed offset exists yet for its group; otherwise it resumes from the last committed offset.
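If you want to force a replay from the beginning even when committed offsets exist, one option (a sketch, not part of this tutorial's code) is to subscribe with a rebalance listener and seek to the start of each partition once it is assigned. Using the kafkaConsumer we create in the next section, that looks like this:

// additional imports needed for this sketch:
// import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
// import org.apache.kafka.common.TopicPartition;
// import java.util.Collection;

kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do when partitions are taken away
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // rewind each newly assigned partition to its first offset
        kafkaConsumer.seekToBeginning(partitions);
    }
});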
Reading messages
We need to create a KafkaConsumer instance and subscribe it to the topic. We can use the following code, which goes inside the main method after the properties are set, to keep reading messages from the consumer.
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList(topicName));

while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    System.out.println(consumerRecords.count());
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        System.out.println("Partition: " + consumerRecord.partition()
                + ", Key: " + consumerRecord.key()
                + ", Value: " + consumerRecord.value());
    }
}
Here we are reading from the topic and displaying the partition, key and value of each message. We are using the poll method of the Kafka consumer, which waits up to 1000 milliseconds if there are no messages in the queue to read. If messages are available, it returns immediately with them. We can start another consumer with the same group id, and the consumers will read messages from different partitions of the topic in parallel. You can get all this code at the git repository.
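The while(true) loop above never exits, so the consumer is never closed. A common way to shut down cleanly (a sketch, not part of the original code) is to call wakeup() on the consumer from a JVM shutdown hook, catch the resulting WakeupException in the poll loop, and close the consumer in a finally block:

// additional import needed for this sketch:
// import org.apache.kafka.common.errors.WakeupException;

final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    kafkaConsumer.wakeup();   // makes the blocked poll() throw WakeupException
    try {
        mainThread.join();    // wait for the poll loop to finish cleanly
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println("Partition: " + consumerRecord.partition()
                    + ", Key: " + consumerRecord.key()
                    + ", Value: " + consumerRecord.value());
        }
    }
} catch (WakeupException e) {
    // expected on shutdown; nothing to do
} finally {
    kafkaConsumer.close();    // commits offsets (with auto-commit) and leaves the group
}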
Conclusion
We have learned how to build a Kafka consumer and read messages from a topic using Java. In the last few articles, we have seen how to create a topic, build a producer, send messages to that topic and read those messages from a consumer. In the future, we will cover more Kafka use cases. Till then, happy learning!!!