Tutorial: Use the Apache Kafka Producer and Consumer APIs
Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight.
The Kafka Producer API allows applications to send streams of data to the Kafka cluster. The Kafka Consumer API allows applications to read streams of data from the cluster.
In this tutorial, you learn how to:
- Prerequisites
- Understand the code
- Build and deploy the application
- Run the application on the cluster
For more information on the APIs, see Apache documentation on the Producer API and Consumer API.
Prerequisites
- Apache Kafka on HDInsight cluster. To learn how to create the cluster, see Start with Apache Kafka on HDInsight.
- Java Developer Kit (JDK) version 8 or an equivalent, such as OpenJDK.
- Apache Maven properly installed according to Apache. Maven is a project build system for Java projects.
- An SSH client like Putty. For more information, see Connect to HDInsight (Apache Hadoop) using SSH.
Understand the code
The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Producer-Consumer
subdirectory. If you're using Enterprise Security Package (ESP) enabled Kafka cluster, you should use the application version located in the DomainJoined-Producer-Consumer
subdirectory.
The application consists primarily of four files:
pom.xml
: This file defines the project dependencies, Java version, and packaging methods.Producer.java
: This file sends random sentences to Kafka using the producer API.Consumer.java
: This file uses the consumer API to read data from Kafka and emit it to STDOUT.AdminClientWrapper.java
: This file uses the admin API to create, describe, and delete Kafka topics.Run.java
: The command-line interface used to run the producer and consumer code.
Pom.xml
The important things to understand in the pom.xml
file are:
Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the
kafka-clients
package. The following XML code defines this dependency:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
The
${kafka.version}
entry is declared in the<properties>..</properties>
section ofpom.xml
, and is configured to the Kafka version of the HDInsight cluster.Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:
maven-compiler-plugin
: Used to set the Java version used by the project to 8. This is the version of Java used by HDInsight 3.6.maven-shade-plugin
: Used to generate an uber jar that contains this application as well as any dependencies. It is also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.
Producer.java
The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. The following code snippet is from the Producer.java file from the GitHub repository and shows how to set the producer properties. For Enterprise Security Enabled clusters an additional property must be added "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. The following code snippet from the Consumer.java file sets the consumer properties. For Enterprise Security Enabled clusters an additional property must be added "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
In this code, the consumer is configured to read from the start of the topic (auto.offset.reset
is set to earliest
.)
Run.java
The Run.java file provides a command-line interface that runs either the producer or consumer code. You must provide the Kafka broker host information as a parameter. You can optionally include a group ID value, which is used by the consumer process. If you create multiple consumer instances using the same group ID, they'll load balance reading from the topic.
Build and deploy the example
Use pre-built JAR files
Download the jars from the Kafka Get Started Azure sample. If your cluster is Enterprise Security Package (ESP) enabled, use kafka-producer-consumer-esp.jar. Use the command below to copy the jars to your cluster.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Build the JAR files from code
If you would like to skip this step, prebuilt jars can be downloaded from the Prebuilt-Jars
subdirectory. Download the kafka-producer-consumer.jar. If your cluster is Enterprise Security Package (ESP) enabled, use kafka-producer-consumer-esp.jar. Execute step 3 to copy the jar to your HDInsight cluster.
Download and extract the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Set your current directory to the location of the
hdinsight-kafka-java-get-started\Producer-Consumer
directory. If you are using Enterprise Security Package (ESP) enabled Kafka cluster, you should set the location toDomainJoined-Producer-Consumer
subdirectory. Use the following command to build the application:mvn clean package
This command creates a directory named
target
, that contains a file namedkafka-producer-consumer-1.0-SNAPSHOT.jar
. For ESP clusters the file will bekafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Replace
sshuser
with the SSH user for your cluster, and replaceCLUSTERNAME
with the name of your cluster. Enter the following command to copy thekafka-producer-consumer-1.0-SNAPSHOT.jar
file to your HDInsight cluster. When prompted enter the password for the SSH user.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Run the example
Replace
sshuser
with the SSH user for your cluster, and replaceCLUSTERNAME
with the name of your cluster. Open an SSH connection to the cluster, by entering the following command. If prompted, enter the password for the SSH user account.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
To get the Kafka broker hosts, substitute the values for
<clustername>
and<password>
in the following command and execute it. Use the same casing for<clustername>
as shown in the Azure portal. Replace<password>
with the cluster login password, then execute:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Note
This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.
Create Kafka topic,
myTest
, by entering the following command:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
To run the producer and write data to the topic, use the following command:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Once the producer has finished, use the following command to read from the topic:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
The records read, along with a count of records, is displayed.
Use Ctrl + C to exit the consumer.
Multiple consumers
Kafka consumers use a consumer group when reading records. Using the same group with multiple consumers results in load balanced reads from a topic. Each consumer in the group receives a portion of the records.
The consumer application accepts a parameter that is used as the group ID. For example, the following command starts a consumer using a group ID of myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Use Ctrl + C to exit the consumer.
To see this process in action, use the following command:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
This command uses tmux
to split the terminal into two columns. A consumer is started in each column, with the same group ID value. Once the consumers finish reading, notice that each read only a portion of the records. Use Ctrl + C twice to exit tmux
.
Consumption by clients within the same group is handled through the partitions for the topic. In this code sample, the test
topic created earlier has eight partitions. If you start eight consumers, each consumer reads records from a single partition for the topic.
Important
There cannot be more consumer instances in a consumer group than partitions. In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. Or you can have multiple consumer groups, each with no more than eight consumers.
Records stored in Kafka are stored in the order they're received within a partition. To achieve in-ordered delivery for records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-ordered delivery for records within the topic, create a consumer group with only one consumer instance.
Common Issues faced
Topic creation fails If your cluster is Enterprise Security Pack enabled, use the pre-built JAR files for producer and consumer. The ESP jar can be built from the code in the
DomainJoined-Producer-Consumer
subdirectory. The producer and consumer properties have an additional propertyCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
for ESP enabled clusters.Failure in ESP enabled clusters: If produce and consume operations fail and you are using an ESP enabled cluster, check that the user
kafka
is present in all Ranger policies. If it is not present, add it to all Ranger policies.
Clean up resources
To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.
To remove the resource group using the Azure portal:
- In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
- Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
- Select Delete resource group, and then confirm.
Next steps
In this document, you learned how to use the Apache Kafka Producer and Consumer API with Kafka on HDInsight. Use the following to learn more about working with Kafka:
Feedback
https://aka.ms/ContentUserFeedback.
Coming soon: Throughout 2024 we will be phasing out GitHub Issues as the feedback mechanism for content and replacing it with a new feedback system. For more information see:Submit and view feedback for