Kafka Java API Example


Kafka Java API Example
Let us create an application for publishing and consuming messages using a Java client.
Steps to follow
Step 1 - Open a new terminal (CTRL + ALT + T) and change the directory to /usr/local/kafka
 
$ cd /usr/local/kafka
Step 2 - Start the zookeeper
 
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
Step 3 - Open a new terminal (CTRL + ALT + T) and change the directory to /usr/local/kafka
 
$ cd /usr/local/kafka
Step 4 - Start the kafka server
 
$ ./bin/kafka-server-start.sh config/server.properties
Step 5 - Open a new terminal (CTRL + ALT + T) and just type 'jps' (without quotes). It shows all the java processes that are running.
 
Here Kafka is the kafka server and QuorumPeerMain is zookeeper process.
KafkaProducer.java
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {
	private static Producer producer;
	private static final String topic = "mytopic";

	public void initialize() {
		Properties producerProps = new Properties();
		producerProps.put("metadata.broker.list", "localhost:9092");
		producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
		producerProps.put("request.required.acks", "1");
		ProducerConfig producerConfig = new ProducerConfig(producerProps);
		producer = new Producer(producerConfig);
	}

	public void publishMesssage() throws Exception{            
             BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));               
         while (true){
             System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): ");
           String msg = null;
           msg = reader.readLine(); // Read message from console
           //Define topic name and message
           KeyedMessage keyedMsg =
                        new KeyedMessage(topic, msg);
           producer.send(keyedMsg); // This publishes message on given topic
           if("Y".equals(msg)){ break; }
           System.out.println("--> Message [" + msg + "] sent. Check message on Consumer's program console");
         }
         return;
       }

	public static void main(String[] args) throws Exception {
		KafkaProducer kafkaProducer = new KafkaProducer();
		// Initialize producer
		kafkaProducer.initialize();
		// Publish message
		kafkaProducer.publishMesssage();
		// Close the producer
		producer.close();
	}
}
KafkaConsumer.java
 
import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
	private ConsumerConnector consumerConnector = null;
	private final String topic = "mytopic";

	public void initialize() {
		Properties props = new Properties();
		props.put("zookeeper.connect", "localhost:2181");
		props.put("group.id", "testgroup");
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "300");
		props.put("auto.commit.interval.ms", "1000");
		ConsumerConfig conConfig = new ConsumerConfig(props);
		consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
	}

	public void consume() {
		// Key = topic name, Value = No. of threads for topic
		Map topicCount = new HashMap();
		topicCount.put(topic, new Integer(1));

		// ConsumerConnector creates the message stream for each topic
		Map>> consumerStreams = consumerConnector
				.createMessageStreams(topicCount);

		// Get Kafka stream for topic 'mytopic'
		List> kStreamList = consumerStreams
				.get(topic);
		// Iterate stream using ConsumerIterator
		for (final KafkaStream kStreams : kStreamList) {
			ConsumerIterator consumerIte = kStreams.iterator();

			while (consumerIte.hasNext())
				System.out.println("Message consumed from topic [" + topic
						+ "] : " + new String(consumerIte.next().message()));
		}
		// Shutdown the consumer connector
		if (consumerConnector != null)
			consumerConnector.shutdown();
	}

	public static void main(String[] args) throws InterruptedException {
		KafkaConsumer kafkaConsumer = new KafkaConsumer();
		// Configure Kafka consumer
		kafkaConsumer.initialize();
		// Start consumption
		kafkaConsumer.consume();
	}
}
Step 6 - Open a new terminal (CTRL + ALT + T) and compile both the programs.
 
$ javac -cp "/usr/local/kafka/lib/*" *.java
Step 7 - Open a new terminal (CTRL + ALT + T) and run KafkaProducer 
 
$ java KafkaProducer
Step 8 - Open a new terminal (CTRL + ALT + T) and run KafkaConsumer 
 
$ java KafkaConsumer
Step 9 - Stop zookeeper and kafka server. Just press 'CTRL + D' in the terminals of zookeeper and kafka server.
 
 


Have any Question or Comment?

Leave a Reply

Your email address will not be published. Required fields are marked *