Create and list Kafka topics in Java

Updated On - August 15, 2020  |  By Mahesh Mogal

We have seen how to create, list, and manage topics using the Kafka console. The console is good for practice and for testing your code, but in production we need an API to interact with Apache Kafka. So, we will explore how to use the Java and Python APIs with Apache Kafka. In this article, we will learn to create and list Kafka topics in Java.

Create Kafka topics in Java

We can use the 'kafka.zk.AdminZkClient' class to create topics and change their configuration from Java code.

First, we need to connect to the ZooKeeper server and create a KafkaZkClient session.

import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.utils.Time;

String zookeeperHost = "127.0.0.1:2181"; // ZooKeeper connection string (host:port)
boolean isSecure = false; // set to true only if ZooKeeper ACLs are enabled
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost, isSecure, sessionTimeoutMs,
        connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);

Once our session is established, we can start creating our topic. We can pass the topic configuration while creating a new topic.

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1; // you should have replication factor less than or equal to number of nodes in Kafka cluster
Properties topicConfig = new Properties(); // you can pass topic configurations while creating topic

Once the topic configuration is ready, we can create the topic with the following call.

// RackAwareMode is imported from kafka.admin.RackAwareMode
adminZkClient.createTopic(topicName1, partitions, replication, topicConfig, RackAwareMode.Disabled$.MODULE$);

After running this code, a topic named 'myTopic' will be created with three partitions and a replication factor of one.
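
The replication constraint mentioned in the code comment above can be checked before the topic is created. The following is a minimal sketch that uses only the JDK. 'TopicSettings', 'validate', 'config', and 'brokerCount' are hypothetical names introduced for illustration and are not part of the Kafka API, and the sketch assumes the caller already knows how many brokers the cluster has.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): sanity-checks topic settings
// before they are handed to adminZkClient.createTopic(...).
final class TopicSettings {

    // Rejects settings that a cluster with brokerCount brokers cannot satisfy.
    static void validate(int partitions, int replication, int brokerCount) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be at least 1");
        }
        if (replication < 1 || replication > brokerCount) {
            throw new IllegalArgumentException(
                    "replication factor must be between 1 and " + brokerCount);
        }
    }

    // Builds topic-level overrides, storing every value as a string.
    static Properties config(String cleanupPolicy, long retentionMs) {
        Properties props = new Properties();
        props.setProperty("cleanup.policy", cleanupPolicy);
        props.setProperty("retention.ms", Long.toString(retentionMs));
        return props;
    }

    public static void main(String[] args) {
        // The values used in this article, assuming a single-broker cluster.
        validate(3, 1, 1);
        Properties topicConfig = config("delete", 604_800_000L); // 7 days in milliseconds
        System.out.println(topicConfig.getProperty("retention.ms"));
    }
}
```

The resulting Properties object could then be passed as the 'topicConfig' argument of 'createTopic'.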

Listing topic configuration

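We can read a topic's configuration with the following call. Because 'getAllTopicConfigs()' returns a Scala Map, 'get()' returns a Scala Option that wraps the topic's Properties.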

adminZkClient.getAllTopicConfigs().get(topicName1);

Checking topic existence

Before creating a topic, we should check whether it already exists in the Kafka cluster. We can use the following method to check that.

zkClient.topicExists(topicName1);
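
The check-then-create pattern described above can be sketched as follows. So that the example runs without a ZooKeeper server, 'TopicAdmin' is a hypothetical interface standing in for the two calls used in this article ('zkClient.topicExists' and 'adminZkClient.createTopic'), and 'InMemoryTopicAdmin' is a fake used only for demonstration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical abstraction over the two calls used in this article:
// zkClient.topicExists(name) and adminZkClient.createTopic(name, ...).
interface TopicAdmin {
    boolean topicExists(String name);

    void createTopic(String name, int partitions, int replication);
}

// In-memory stand-in so the sketch runs without ZooKeeper.
final class InMemoryTopicAdmin implements TopicAdmin {
    private final Set<String> topics = new HashSet<>();

    @Override
    public boolean topicExists(String name) {
        return topics.contains(name);
    }

    @Override
    public void createTopic(String name, int partitions, int replication) {
        if (!topics.add(name)) {
            throw new IllegalStateException("Topic already exists: " + name);
        }
    }
}

final class TopicCreator {
    // Creates the topic only when it is missing; returns true if it was created.
    static boolean createIfAbsent(TopicAdmin admin, String name, int partitions, int replication) {
        if (admin.topicExists(name)) {
            return false;
        }
        admin.createTopic(name, partitions, replication);
        return true;
    }

    public static void main(String[] args) {
        TopicAdmin admin = new InMemoryTopicAdmin();
        System.out.println(createIfAbsent(admin, "myTopic", 3, 1)); // true
        System.out.println(createIfAbsent(admin, "myTopic", 3, 1)); // false
    }
}
```

The check and the create are two separate calls, so another client could still create the topic in between. Code that runs against a real cluster would also need to handle the error raised when the topic already exists.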

Changing topic configuration

We can modify a topic's configuration using the following method.

topicConfig.put("cleanup.policy","delete");
adminZkClient.changeTopicConfig(topicName1,topicConfig);
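
The sketch below assumes that 'changeTopicConfig' replaces the topic's existing overrides with the Properties it is given, which is how the method is documented in the Kafka source, but it is worth verifying against your Kafka version. Under that assumption, we would merge the current overrides with the new ones before making the call. 'TopicConfigMerge' and 'merge' are hypothetical names and are not part of the Kafka API.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): combines existing overrides with new ones.
final class TopicConfigMerge {

    // Returns a new Properties holding the current overrides plus the changes.
    // On a key clash the value from changes wins. Neither input is modified.
    static Properties merge(Properties current, Properties changes) {
        Properties merged = new Properties();
        merged.putAll(current);
        merged.putAll(changes);
        return merged;
    }

    public static void main(String[] args) {
        Properties current = new Properties();
        current.setProperty("retention.ms", "86400000");
        current.setProperty("cleanup.policy", "compact");

        Properties changes = new Properties();
        changes.setProperty("cleanup.policy", "delete");

        Properties merged = merge(current, changes);
        System.out.println(merged.getProperty("cleanup.policy")); // delete
        System.out.println(merged.getProperty("retention.ms"));   // 86400000
    }
}
```

The merged object, rather than only the changed keys, would then be passed to 'changeTopicConfig'.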

Listing topics

We can list all topics in the cluster with the 'getAllTopicsInCluster' method of the KafkaZkClient we created earlier.

Seq<String> allTopic = zkClient.getAllTopicsInCluster(); // scala.collection.Seq
System.out.println("Cluster has " + allTopic.length() + " topics");
System.out.println(allTopic);

This prints all the topics in the Kafka cluster.
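
The returned list may also contain Kafka's internal topics, such as '__consumer_offsets'. Below is a minimal sketch of filtering them out, assuming the names have already been copied from the Scala Seq into a Java List<String>. 'TopicNames' and the double-underscore prefix rule are illustrative choices, not a Kafka API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Kafka): hides internal topics from a topic list.
final class TopicNames {

    // Keeps the names that do not start with "__" and returns them sorted.
    static List<String> userTopics(List<String> allTopics) {
        return allTopics.stream()
                .filter(name -> !name.startsWith("__"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("orders", "__consumer_offsets", "myTopic");
        System.out.println(userTopics(all)); // [myTopic, orders]
    }
}
```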

Deleting Kafka topic

We do not often need to delete Kafka topics, but when we do, we can use the following method. It marks the topic for deletion; the brokers remove it only if 'delete.topic.enable' is set to true.

adminZkClient.deleteTopic(topicName1);

I hope you have learned how we can manage topics in the Kafka server using Java. You can find the detailed code in the Git repository.

In the next article, we will see how to write a Kafka producer using Java. Till then, happy learning.


5 comments on “Create and list Kafka topics in Java”

  1. If the ZooKeeper nodes are locked, this code throws the error below:
    Exception in thread "main" kafka.admin.AdminOperationException: org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /brokers/topics/myTopic1
    at kafka.zk.AdminZkClient.writeTopicPartitionAssignment(AdminZkClient.scala:162)
    at kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:102)
    at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:56)
    Any methods available to unlock/lock the zookeeper?

  2. Hi Mahesh,

    Have you tried the API for creating topics on brokers configured with SASL_SSL authentication? If so, can you please share the code?

    Thanks
    Sujay
