Kafka Producer and Consumer in Python
So far we have covered the basics of Apache Kafka and created a Producer and Consumer in Java. In this tutorial, we are going to build a Kafka Producer and Consumer in Python. Along the way, we will learn how to set up configurations and how to use the group and offset concepts in Kafka.
Set up
We need Python installed on our machine for this tutorial, and we need access to Apache Kafka running locally or on a server. You can check how to install Apache Kafka on Windows. Apart from this, we need the kafka-python library to run our code. To install it, run the following command:
pip install kafka-python
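Note that the package is installed as kafka-python but imported as kafka. A quick way to confirm the install worked (and that you are not shadowing it with a local file named kafka.py) is to print the library version:

import kafka

# If this raises ImportError or has no __version__, the wrong package is installed.
print(kafka.__version__)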
Kafka Producer
Let us start by creating our own Kafka Producer. We have to import KafkaProducer from the kafka library. We also need to give the Producer the broker list of our Kafka server so that it can connect to it, and a topic name to which we want to publish messages. That is the minimal configuration we need to create a Producer.
from kafka import KafkaProducer

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
We can start sending messages to this topic using the following code.
# send() returns a future; get() blocks until the broker acknowledges the record.
ack = producer.send(topicName, b'Hello World!!!!!!!!')
metadata = ack.get()
print(metadata.topic)
print(metadata.partition)
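Calling get() on the returned future blocks until the broker responds. If you would rather not block, the future also accepts callbacks. A minimal sketch, continuing with the producer and topicName defined above (the callback names on_send_success and on_send_error are our own; any callables with these signatures work):

def on_send_success(record_metadata):
    # Called once the broker acknowledges the record.
    print('delivered to %s[%d] at offset %d' % (
        record_metadata.topic, record_metadata.partition, record_metadata.offset))

def on_send_error(exc):
    # Called if the send ultimately fails after retries.
    print('delivery failed:', exc)

producer.send(topicName, b'Hello World!!!!!!!!') \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)

# send() only queues the record; flush() blocks until everything buffered is sent.
producer.flush()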
The above code sends a message to the topic named 'myTopic' on the Kafka server. But what if that topic does not exist yet? Provided the broker allows automatic topic creation (it does by default), Kafka creates a new topic with this name and publishes the messages to it. Convenient, isn't it? But you should remember to check for spelling mistakes in topic names, since a typo silently becomes a brand-new topic.
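If you would rather not rely on auto-creation, kafka-python also ships an admin client that can create topics explicitly. A minimal sketch, assuming a single local broker where one partition and replication factor 1 are enough:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # One partition and replication factor 1 are fine for a local, single-broker setup.
    admin.create_topics([NewTopic(name='myTopic', num_partitions=1, replication_factor=1)])
except TopicAlreadyExistsError:
    pass  # the topic is already there, nothing to do
finally:
    admin.close()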
If you want to set more properties for your Producer or change its serialization format, you can use the following lines of code.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    retries=5,
    value_serializer=lambda m: json.dumps(m).encode('ascii'))
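With that value_serializer in place, values can be plain Python objects and are turned into JSON bytes before being sent. For example (the payload below is just an illustration):

# The lambda above serializes this dict to a JSON byte string before it goes on the wire.
producer.send(topicName, {'user': 'alice', 'action': 'signup'})
producer.flush()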
Kafka Consumer
Now that we are finished with the Producer, let us start building the Consumer in Python and see if that is equally easy. After importing KafkaConsumer, we need to provide the bootstrap server and a topic name to establish a connection with the Kafka server.
from kafka import KafkaConsumer
import sys

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'
consumer = KafkaConsumer(
    topicName,
    group_id='group1',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest')
As we can see, we need to specify which group the consumer belongs to. We also need to specify the offset from which this consumer should start reading messages from the topic. Here we have set auto_offset_reset to 'earliest', which means that when no committed offset exists for the group, the consumer starts reading from the beginning of the topic.
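To see what the group does in practice: a consumer started with a different group_id keeps its own committed offsets and therefore re-reads the topic independently, while consumers sharing a group_id split the topic's partitions between them. A small sketch (the name 'group2' is just an example):

# This consumer belongs to a different group, so it tracks its own offsets and
# re-reads 'myTopic' from the beginning, independently of 'group1'.
other_consumer = KafkaConsumer(
    topicName,
    group_id='group2',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest')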
Once the consumer is created, we can start reading messages from the topic. Along with each message, we get additional information such as the partition the message belongs to, its offset in that partition, and its key.
try:
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
except KeyboardInterrupt:
    sys.exit()
This prints each record in the format topic:partition:offset: key=<key> value=<value>.
That is it. We have created our first Kafka Consumer in Python. We can see that this consumer reads messages from the topic and prints them to the console.
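One thing to be aware of: iterating over the consumer blocks forever while it waits for new records, and offsets are auto-committed in the background. If you want the loop to stop on its own and commit offsets only after a record has been processed, you can combine poll() with a timeout and manual commits. A sketch under those assumptions (the stop-after-a-few-empty-polls rule is our own choice, not part of the API):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'myTopic',
    group_id='group1',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False)  # we commit offsets ourselves below

empty_polls = 0
while empty_polls < 5:
    # poll() returns {TopicPartition: [records]}, or an empty dict if nothing
    # arrived within the timeout.
    batch = consumer.poll(timeout_ms=1000)
    if not batch:
        empty_polls += 1
        continue
    empty_polls = 0
    for tp, records in batch.items():
        for record in records:
            print("%s:%d:%d: key=%s value=%s" % (record.topic, record.partition,
                                                 record.offset, record.key,
                                                 record.value))
    # Commit only after the whole batch has been handled.
    consumer.commit()

consumer.close()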
Conclusion
We have learned how to create a Kafka Producer and Consumer in Python. In the next article, we will work through a practical use case in which we read live streaming data from Twitter. Until then, keep learning.