
Kafka Producer API – 0.8.2 – Insights (Part 1)

Recently, I have been involved in writing a fault-tolerant wrapper over the Kafka Producer to minimize message loss in case of cluster failures or network failures between the producer and the Kafka brokers.

Overview

So, let us first look, from 100 feet above (not 10,000), at what the Kafka Producer API provides in its endeavour to strike a balance between performance and reliability. By performance, I mean the ability to minimize the maximum blocking time of the producer's send method; by reliability, the ability to minimize the probability of a message being lost without any notification to the sender.

Creating a Kafka producer object:

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(configs);

A producer is instantiated by providing a set of key-value pairs as its configuration. For the available settings, please refer to the "New Producer Configs" section of the Kafka documentation.

Much of the producer's behaviour, whether it leans toward reliability or toward performance, depends on these configurations.
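As a minimal sketch of that trade-off, assuming only plain java.util collections, the hypothetical helper below builds two configuration maps that differ in acks, the number of acknowledgements the producer waits for before it considers a send complete. The class and method names are made up for illustration, and the values are not tuning recommendations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds producer configuration maps that lean either
// toward reliability or toward performance. Only java.util is used, so the
// resulting Map can be passed to new KafkaProducer<byte[], byte[]>(map).
final class ProducerConfigs {

    private static final String BYTES =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    private ProducerConfigs() {}

    // Settings shared by both presets.
    private static Map<String, Object> base(String bootstrapServers) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("key.serializer", BYTES);
        configs.put("value.serializer", BYTES);
        return configs;
    }

    // Leans toward reliability: -1 makes the leader wait until all in-sync
    // replicas have acknowledged the record.
    static Map<String, Object> reliable(String bootstrapServers) {
        Map<String, Object> configs = base(bootstrapServers);
        configs.put("acks", "-1");
        return configs;
    }

    // Leans toward performance: 0 means no acknowledgement is awaited at all.
    static Map<String, Object> fast(String bootstrapServers) {
        Map<String, Object> configs = base(bootstrapServers);
        configs.put("acks", "0");
        return configs;
    }
}
```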

Sending a message using the producer:

Future<RecordMetadata> future = producer.send(record, callback);

The code snippet shown above asynchronously sends a record to a topic and invokes the provided callback when the send has been acknowledged.

Unlike older versions, in the 0.8.2 Kafka clients API the producer's send method is no longer synchronous by nature (although the sender can be programmed to behave synchronously).

In asynchronous mode, the send method returns without waiting for the broker's acknowledgement.

To make a send synchronous, we can block on the Future object returned by send:

producer.send(record, null).get();
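The two styles can be tried out without a cluster. The sketch below is a hypothetical stand-in, FakeSender, written with java.util.concurrent; it is not the Kafka API. Like KafkaProducer.send, its send method returns a Future right away and invokes a callback when the (simulated) acknowledgement arrives, and calling get() on that Future is what makes the send synchronous.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a producer (NOT the Kafka API): send() hands the
// work to a background thread and returns a Future immediately.
final class FakeSender implements AutoCloseable {

    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private long nextOffset = 0; // only touched by the single io thread

    // Returns at once; the callback runs on the background thread when the
    // simulated acknowledgement arrives, then the Future is completed.
    Future<Long> send(String record, BiConsumer<Long, Exception> callback) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        io.execute(() -> {
            long offset = nextOffset++;
            if (callback != null) {
                callback.accept(offset, null);
            }
            result.complete(offset);
        });
        return result;
    }

    @Override
    public void close() {
        io.shutdown();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        try (FakeSender sender = new FakeSender()) {
            // Asynchronous: fire and let the callback report the outcome.
            sender.send("async message", (ackOffset, error) ->
                    System.out.println("acknowledged at offset " + ackOffset));
            // Synchronous: block on the Future, like producer.send(record, null).get().
            long offset = sender.send("sync message", null).get();
            System.out.println("sync send completed at offset " + offset);
        }
    }
}
```

A single background thread plays the role of the producer's I/O thread here; the real producer also generally runs its callbacks on its I/O thread, so callbacks should stay short and non-blocking.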

Insights

Now, we will try to gain insights into the Kafka Producer by analysing its behaviour in a sample program.

Let's use the following example to send messages to a Kafka cluster:

    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class KafkaSender {

        Map<String, Object> configs;
        KafkaProducer<byte[], byte[]> producer;
        // Sequence number that identifies each test message.
        AtomicInteger reqSeq = new AtomicInteger(0);

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Map<String, Object> configs = new HashMap<String, Object>();
            // Only one broker is listed; the producer discovers the rest of the cluster from it.
            configs.put("bootstrap.servers", "localhost:9092");
            configs.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            configs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(configs);
            KafkaSender kafkaSender = new KafkaSender(configs, producer);
            System.out.println("Sending message every 1 sec, kill the program to terminate:");
            while (true) {
                kafkaSender.sendMessage();
                Thread.sleep(1000);
            }
        }

        public KafkaSender(Map<String, Object> configs, KafkaProducer<byte[], byte[]> producer) {
            this.configs = configs;
            this.producer = producer;
        }

        private void sendMessage() throws InterruptedException, ExecutionException {
            final String messageString = "Test Message No. " + reqSeq.incrementAndGet() + " at " + new Date();
            byte[] message = messageString.getBytes(StandardCharsets.UTF_8);
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("topic1", message);
            // Asynchronous send: returns without waiting for the broker; the callback reports the outcome.
            producer.send(record, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // A null exception means the send completed successfully.
                    if (exception == null) {
                        System.out.println("Message Delivered Successfully: " + messageString);
                    } else {
                        System.out.println("Message Could not be delivered : " + messageString + ". Cause: "
                                + exception.getMessage());
                    }
                }
            });
            System.out.println("Message Sent:" + messageString);
        }
    }

The above program sends a message (identified by a sequence number and a timestamp) to a Kafka cluster every second and reports each acknowledgement or failure.

Now, start a cluster on localhost with three brokers on ports 9092, 9093 and 9094, and create a topic "topic1". After starting the cluster, run the above program.

Sending message every 1 sec, kill the program to terminate:

Message Sent:Test Message No. 1 at Sat Jul 18 18:31:02 IST 2015

Message Delivered Successfully: Test Message No. 1 at Sat Jul 18 18:31:02 IST 2015

Message Sent:Test Message No. 2 at Sat Jul 18 18:31:03 IST 2015

Message Delivered Successfully: Test Message No. 2 at Sat Jul 18 18:31:03 IST 2015

Message Sent:Test Message No. 3 at Sat Jul 18 18:31:04 IST 2015

Message Delivered Successfully: Test Message No. 3 at Sat Jul 18 18:31:04 IST 2015

Message Sent:Test Message No. 4 at Sat Jul 18 18:31:05 IST 2015

Message Delivered Successfully: Test Message No. 4 at Sat Jul 18 18:31:05 IST 2015

Message Sent:Test Message No. 5 at Sat Jul 18 18:31:06 IST 2015

...

Every second, it sends a message and receives the delivery report via the callback.

In the producer configs, only one broker is mentioned in the bootstrap.servers property:

configs.put("bootstrap.servers", "localhost:9092");

Insight 1:

The Kafka producer needs only one of the brokers to discover the whole cluster. The one mentioned here must be up and running for the producer to discover the cluster and for the send to work. So, for fault tolerance, bootstrap.servers can be a comma-separated list of brokers, of which only one needs to be reachable to identify the cluster.
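Building that list for the three local brokers used in this post can be done with a small helper. BootstrapServers is hypothetical; only the comma-separated host:port format comes from the bootstrap.servers setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that builds a bootstrap.servers value such as
// "localhost:9092,localhost:9093,localhost:9094". Listing several brokers
// lets the producer discover the cluster even if one of them is down.
final class BootstrapServers {

    private BootstrapServers() {}

    static String forHost(String host, int... ports) {
        if (ports.length == 0) {
            throw new IllegalArgumentException("at least one broker port is required");
        }
        List<String> brokers = new ArrayList<>();
        for (int port : ports) {
            brokers.add(host + ":" + port);
        }
        return String.join(",", brokers);
    }

    public static void main(String[] args) {
        // Intended use: configs.put("bootstrap.servers", BootstrapServers.forHost("localhost", 9092, 9093, 9094));
        System.out.println(forHost("localhost", 9092, 9093, 9094));
    }
}
```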


What happens if the cluster becomes unreachable:

Now, while the above program is running, let us kill the Kafka cluster by killing all the processes related to it, viz. all the Kafka brokers and the single ZooKeeper instance. Our program now stops receiving callbacks altogether:

Sending message every 1 sec, kill the program to terminate:

Message Sent:Test Message No. 1 at Sat Jul 18 22:41:30 IST 2015

Message Delivered Successfully: Test Message No. 1 at Sat Jul 18 22:41:30 IST 2015

Message Sent:Test Message No. 2 at Sat Jul 18 22:41:31 IST 2015

Message Delivered Successfully: Test Message No. 2 at Sat Jul 18 22:41:31 IST 2015

Message Sent:Test Message No. 3 at Sat Jul 18 22:41:32 IST 2015

Message Delivered Successfully: Test Message No. 3 at Sat Jul 18 22:41:32 IST 2015

Message Sent:Test Message No. 4 at Sat Jul 18 22:41:33 IST 2015

Message Delivered Successfully: Test Message No. 4 at Sat Jul 18 22:41:33 IST 2015

Message Sent:Test Message No. 5 at Sat Jul 18 22:41:34 IST 2015

Message Sent:Test Message No. 6 at Sat Jul 18 22:41:35 IST 2015

Message Sent:Test Message No. 7 at Sat Jul 18 22:41:36 IST 2015

Message Sent:Test Message No. 8 at Sat Jul 18 22:41:37 IST 2015

Message Sent:Test Message No. 9 at Sat Jul 18 22:41:38 IST 2015

Shouldn't it report a failure (an exception) in the callbacks? No, it shouldn't, because the 'acks' property of the producer config is not set explicitly and therefore defaults to 1, and also because of the buffer the producer uses for queueing up messages before sending them to the brokers. We will come back to this 'buffer' in detail in the next episode.

Insight 2:

If acks is left unset or set to 1 (the default), no callbacks are received (not even failures) when the cluster becomes unreachable and the producer cannot forward the data to the brokers. All the messages keep accumulating in the buffer until the buffer is full.
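The following toy model, which is not Kafka's implementation, pictures what Insight 2 describes: sends land in a bounded in-memory buffer, nothing leaves it while no broker is reachable (so no callback of either kind can fire), and everything is drained at once when connectivity returns. ToyAccumulator, its capacity counted in records (the real producer bounds its buffer in bytes via buffer.memory) and its choice to simply reject a record when full are assumptions for illustration; how the real producer reacts to a full buffer is a separate configuration question.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (NOT Kafka's internals): a bounded buffer that holds records
// while no broker is reachable and releases them all once connected.
final class ToyAccumulator {

    private final int capacity; // counted in records, for simplicity
    private final Deque<String> buffer = new ArrayDeque<>();
    private boolean connected;

    ToyAccumulator(int capacity, boolean connected) {
        this.capacity = capacity;
        this.connected = connected;
    }

    // Queues a record; in this toy, returns false once the buffer is full.
    boolean send(String record) {
        if (buffer.size() >= capacity) {
            return false;
        }
        buffer.addLast(record);
        return true;
    }

    void setConnected(boolean connected) {
        this.connected = connected;
    }

    // Called by a "sender" loop: while disconnected nothing leaves the
    // buffer, so no success or failure can be reported for these records.
    List<String> drain() {
        List<String> sent = new ArrayList<>();
        while (connected && !buffer.isEmpty()) {
            sent.add(buffer.pollFirst());
        }
        return sent;
    }

    int buffered() {
        return buffer.size();
    }
}
```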


Now, without killing our program, let us restart the Kafka cluster by bringing up ZooKeeper and then the brokers. Let's see what happens now (continuing the above output):

Message Sent:Test Message No. 7 at Sat Jul 18 22:41:36 IST 2015

Message Sent:Test Message No. 8 at Sat Jul 18 22:41:37 IST 2015

Message Sent:Test Message No. 9 at Sat Jul 18 22:41:38 IST 2015

Message Sent:Test Message No. 10 at Sat Jul 18 22:41:39 IST 2015

Message Could not be delivered : Test Message No. 6 at Sat Jul 18 22:41:35 IST 2015. Cause: This server does not host this topic-partition.

Message Could not be delivered : Test Message No. 8 at Sat Jul 18 22:41:37 IST 2015. Cause: This server does not host this topic-partition.

Message Could not be delivered : Test Message No. 10 at Sat Jul 18 22:41:39 IST 2015. Cause: This server does not host this topic-partition.

Message Could not be delivered : Test Message No. 5 at Sat Jul 18 22:41:34 IST 2015. Cause: This server does not host this topic-partition.

Message Could not be delivered : Test Message No. 7 at Sat Jul 18 22:41:36 IST 2015. Cause: This server does not host this topic-partition.

Message Could not be delivered : Test Message No. 9 at Sat Jul 18 22:41:38 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 11 at Sat Jul 18 22:41:40 IST 2015

Message Could not be delivered : Test Message No. 11 at Sat Jul 18 22:41:40 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 12 at Sat Jul 18 22:41:41 IST 2015

Message Could not be delivered : Test Message No. 12 at Sat Jul 18 22:41:41 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 13 at Sat Jul 18 22:41:42 IST 2015

Message Could not be delivered : Test Message No. 13 at Sat Jul 18 22:41:42 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 14 at Sat Jul 18 22:41:43 IST 2015

Message Could not be delivered : Test Message No. 14 at Sat Jul 18 22:41:43 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 15 at Sat Jul 18 22:41:44 IST 2015

Message Could not be delivered : Test Message No. 15 at Sat Jul 18 22:41:44 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 16 at Sat Jul 18 22:41:45 IST 2015

Message Could not be delivered : Test Message No. 16 at Sat Jul 18 22:41:45 IST 2015. Cause: This server does not host this topic-partition.

Message Sent:Test Message No. 17 at Sat Jul 18 22:41:46 IST 2015

Message Delivered Successfully: Test Message No. 17 at Sat Jul 18 22:41:46 IST 2015

Message Sent:Test Message No. 18 at Sat Jul 18 22:41:47 IST 2015

Message Delivered Successfully: Test Message No. 18 at Sat Jul 18 22:41:47 IST 2015

Message Sent:Test Message No. 19 at Sat Jul 18 22:41:48 IST 2015

Message Delivered Successfully: Test Message No. 19 at Sat Jul 18 22:41:48 IST 2015

Message Sent:Test Message No. 20 at Sat Jul 18 22:41:49 IST 2015

Message Delivered Successfully: Test Message No. 20 at Sat Jul 18 22:41:49 IST 2015

Here we see that as soon as the cluster is up, we start receiving callbacks for the messages. This is because the producer, which had been polling for a connection to the brokers, gets the connection and tries to send the messages present in its buffer. However, the broker responds with the error 'This server does not host this topic-partition.' This happens because the buffered messages are sent as soon as a connection is available, but before the brokers are fully up.

Once the cluster is completely up again, the producer refreshes its metadata and the messages are delivered successfully.

Insight 3:

All the messages already accumulated in the producer's buffer are pushed at once as soon as connectivity with the cluster is restored. Only then (with the acks property of the producer config set to 1) are the callbacks fired, with or without an exception depending on what we mean by the cluster being unreachable (processes killed, network down, etc.).
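Since failures like the ones above are only visible through the callbacks, a fault-tolerant wrapper has to collect them there. The sketch below is one possible approach, not the wrapper described in this post: a hypothetical FailedMessageCollector whose onCompletion mirrors the (metadata, exception) shape of Callback.onCompletion, keeping failed messages in a thread-safe queue so they can be re-sent later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: remembers messages whose callback reported an
// exception so that the application can send them again later.
final class FailedMessageCollector {

    private final ConcurrentLinkedQueue<String> failed = new ConcurrentLinkedQueue<>();

    // Same decision as Callback.onCompletion in KafkaSender: a non-null
    // exception means the send failed. May be called from the I/O thread.
    void onCompletion(String message, Exception exception) {
        if (exception != null) {
            failed.add(message);
        }
    }

    // Removes and returns everything collected so far, e.g. to re-send it
    // once the cluster is healthy again.
    List<String> drainForResend() {
        List<String> batch = new ArrayList<>();
        String message;
        while ((message = failed.poll()) != null) {
            batch.add(message);
        }
        return batch;
    }
}
```

In KafkaSender, the anonymous Callback could forward each outcome with collector.onCompletion(messageString, exception). A concurrent queue is used because callbacks run on the producer's I/O thread, while re-sending would happen on the application's own thread.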


Now, let's set the 'acks' property of the producer config to 0. With this setting, the producer does not wait for any acknowledgment from the server at all: the record is immediately added to the socket buffer and considered sent. To see it in action, add the following line when setting up the producer configuration:

configs.put("acks", "0");

Now, bring the cluster up again and run the updated program. After 3-4 messages are delivered successfully, bring the cluster down by killing all the brokers. After a few seconds, bring the cluster up again. (You can write scripts to automate starting and stopping the cluster.) You should see output similar to the following:

Message Sent:Test Message No. 1 at Wed Jul 29 09:00:32 IST 2015

Message Delivered Successfully: Test Message No. 1 at Wed Jul 29 22:00:32 IST 2015

Message Sent:Test Message No. 2 at Wed Jul 29 09:00:34 IST 2015

Message Delivered Successfully: Test Message No. 2 at Wed Jul 29 22:00:34 IST 2015

Message Sent:Test Message No. 3 at Wed Jul 29 09:00:35 IST 2015

Message Delivered Successfully: Test Message No. 3 at Wed Jul 29 22:00:35 IST 2015

Message Sent:Test Message No. 4 at Wed Jul 29 09:00:36 IST 2015

Message Delivered Successfully: Test Message No. 4 at Wed Jul 29 22:00:36 IST 2015

Message Sent:Test Message No. 5 at Wed Jul 29 09:00:37 IST 2015

Message Delivered Successfully: Test Message No. 5 at Wed Jul 29 22:00:37 IST 2015

Message Sent:Test Message No. 6 at Wed Jul 29 09:00:38 IST 2015

Message Sent:Test Message No. 7 at Wed Jul 29 09:00:39 IST 2015

Message Sent:Test Message No. 8 at Wed Jul 29 09:00:40 IST 2015

Message Sent:Test Message No. 9 at Wed Jul 29 09:00:41 IST 2015

Message Sent:Test Message No. 10 at Wed Jul 29 09:00:42 IST 2015

Message Delivered Successfully: Test Message No. 6 at Wed Jul 29 22:00:38 IST 2015

Message Delivered Successfully: Test Message No. 7 at Wed Jul 29 22:00:39 IST 2015

Message Delivered Successfully: Test Message No. 8 at Wed Jul 29 22:00:40 IST 2015

Message Delivered Successfully: Test Message No. 9 at Wed Jul 29 22:00:41 IST 2015

Message Delivered Successfully: Test Message No. 10 at Wed Jul 29 22:00:42 IST 2015

Message Sent:Test Message No. 11 at Wed Jul 29 09:00:43 IST 2015

Message Delivered Successfully: Test Message No. 11 at Wed Jul 29 22:00:43 IST 2015

Message Sent:Test Message No. 12 at Wed Jul 29 09:00:44 IST 2015

Message Delivered Successfully: Test Message No. 12 at Wed Jul 29 22:00:44 IST 2015

Message Sent:Test Message No. 13 at Wed Jul 29 09:00:45 IST 2015

Message Delivered Successfully: Test Message No. 13 at Wed Jul 29 22:00:45 IST 2015

It looks similar to the previous output, but now all the messages appear to be delivered successfully, although the delivery reports (callbacks) are received only after the cluster is up again. But wait: did changing the acks value to 0 increase reliability? No; in fact, reliability decreased. This can be seen by running a consumer on that cluster and checking which messages it receives: some messages are missing on the consumer side even though their callbacks completed without any exception.
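That consumer-side check can be automated by comparing two lists of message strings: those whose callbacks completed without an exception, and those a consumer actually read from topic1. DeliveryAudit and its regular expression are hypothetical and assume the "Test Message No. N at ..." format produced by KafkaSender; how the two lists are captured is left open.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical audit helper: finds sequence numbers that callbacks reported
// as delivered but that never showed up on the consumer side.
final class DeliveryAudit {

    // Captures N from "Test Message No. N at Sat Jul 18 ...".
    private static final Pattern SEQ = Pattern.compile("Test Message No\\. (\\d+)");

    private DeliveryAudit() {}

    static SortedSet<Integer> sequenceNumbers(Collection<String> messages) {
        SortedSet<Integer> seqs = new TreeSet<>();
        for (String message : messages) {
            Matcher m = SEQ.matcher(message);
            if (m.find()) {
                seqs.add(Integer.parseInt(m.group(1)));
            }
        }
        return seqs;
    }

    // Acknowledged by a callback without exception, yet absent at the consumer.
    static SortedSet<Integer> lost(Collection<String> acknowledged, Collection<String> consumed) {
        SortedSet<Integer> missing = sequenceNumbers(acknowledged);
        missing.removeAll(sequenceNumbers(consumed));
        return missing;
    }
}
```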

Insight 4:

If acks is set to 0, the producer does not wait for any acknowledgment from the server at all. The record is added to the buffer, and its callback is invoked as soon as the connection with the broker is established and the record is sent. Since no acknowledgement comes back from the servers, the callbacks arrive without any exception, even for messages the cluster never stored.



Conclusion

Every property in the producer configuration has a specific meaning, and the behaviour of the Kafka producer becomes quite interesting when one or more values are changed. Understanding these behaviours can help you avoid surprises in production and handle failure scenarios such as the ones above. I will present the rest of the insights regarding other configuration parameters in the next episode.
