Kafka Producer API – 0.8.2 – Insights (Part 2)

Continuing our discussion from Kafka Producer API Insights (Part 1), let's see the impact of other configuration properties on the behaviour of the Producer. We will use the same program with which we demonstrated the working of the Kafka Producer in the previous episode. Start the cluster and run the program: you should see output that prints a "sent" line for each message, followed by its delivery acknowledgement. Now bring down the cluster.

What happens if the cluster never comes up?

We see the familiar output: only "sent" messages, without any "delivered" notifications.
As we saw in Insight 2 of the previous episode, messages that cannot be delivered because the cluster is unavailable keep accumulating in the buffer. But until when? Is there a limit?

Yes, until the buffer gets full.

Insight 5:

The number of messages that can accumulate in the buffer of the Kafka Producer is limited by the property buffer.memory. It specifies the maximum memory that messages can occupy while they remain undelivered. By default this limit is 32 MB.

The default limit of 32 MB is far too large to be filled by the few bytes of data our program sends each second. So we set it to a lower value (100 KB) so that it fills up in a reasonable time.

So add:

configs.put("buffer.memory", "102400"); // 100 KB

Also, we will increase the frequency of messages from one message per second to one message every 20 milliseconds (50 times faster).
So, edit the sleep statement to look like this:

Thread.sleep(20);

And one last thing: we fix the size of each message by left-padding the message number with zeros.
Change the message string definition to:

final String messageString = "Test Message No. " +  String.format("%011d", reqSeq.incrementAndGet()) + " at " + new Date();

So each message is now a fixed 60 bytes.
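For reference, here is a minimal sketch of the whole modified sender, reconstructed from the description of the program in Part 1. The class name, topic name and bootstrap address are assumptions, not the original source:

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaSender {
    public static void main(String[] args) throws InterruptedException {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092"); // assumption: local single-broker cluster
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("buffer.memory", "102400"); // 100 KB instead of the 32 MB default

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
        final AtomicLong reqSeq = new AtomicLong(0);

        while (true) {
            // Zero-padding the sequence number keeps every message at a fixed 60 bytes.
            final String messageString = "Test Message No. "
                    + String.format("%011d", reqSeq.incrementAndGet()) + " at " + new Date();
            producer.send(new ProducerRecord<String, String>("test-topic", messageString),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e == null) {
                                System.out.println("Message Delivered Successfully: " + messageString);
                            } else {
                                System.out.println("Message Could not be delivered : " + messageString
                                        + ". Cause: " + e.getMessage());
                            }
                        }
                    });
            System.out.println("Message Sent:" + messageString);
            Thread.sleep(20); // one message every 20 ms, i.e. 50 per second
        }
    }
}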

Let us now start the cluster, run our program, and then kill the cluster.

Sending message every 1 sec, kill the program to terminate:

Message Sent:Test Message No. 00000000001 at Fri Aug 28 14:40:20 IST 2015

Message Sent:Test Message No. 00000000002 at Fri Aug 28 14:40:21 IST 2015

Message Delivered Successfully: Test Message No. 00000000001 at Fri Aug 28 14:40:20 IST 2015

Message Delivered Successfully: Test Message No. 00000000002 at Fri Aug 28 14:40:21 IST 2015

Message Sent:Test Message No. 00000000003 at Fri Aug 28 14:40:21 IST 2015

Message Delivered Successfully: Test Message No. 00000000003 at Fri Aug 28 14:40:21 IST 2015

Message Sent:Test Message No. 00000000004 at Fri Aug 28 14:40:21 IST 2015

Message Delivered Successfully: Test Message No. 00000000004 at Fri Aug 28 14:40:21 IST 2015

Message Sent:Test Message No. 00000000005 at Fri Aug 28 14:40:21 IST 2015

...

Message Sent:Test Message No. 00000000148 at Fri Aug 28 14:40:23 IST 2015

Message Delivered Successfully: Test Message No. 00000000148 at Fri Aug 28 14:40:23 IST 2015

Message Sent:Test Message No. 00000000149 at Fri Aug 28 14:40:24 IST 2015

Message Could not be delivered : Test Message No. 00000000149 at Fri Aug 28 14:40:24 IST 2015. Cause: The server disconnected before a response was received.

Message Sent:Test Message No. 00000000150 at Fri Aug 28 14:40:24 IST 2015

Message Sent:Test Message No. 00000000151 at Fri Aug 28 14:40:24 IST 2015

Message Sent:Test Message No. 00000000152 at Fri Aug 28 14:40:24 IST 2015

Message Sent:Test Message No. 00000000153 at Fri Aug 28 14:40:24 IST 2015

...

Message Sent:Test Message No. 00000001285 at Fri Aug 28 14:40:47 IST 2015

Message Sent:Test Message No. 00000001286 at Fri Aug 28 14:40:47 IST 2015

Message Sent:Test Message No. 00000001287 at Fri Aug 28 14:40:47 IST 2015

Message Sent:Test Message No. 00000001288 at Fri Aug 28 14:40:47 IST 2015

Message Sent:Test Message No. 00000001289 at Fri Aug 28 14:40:47 IST 2015

In the above execution, the cluster was killed after message number 148 was delivered successfully, so the following messages started accumulating in the buffer. But after some time, the program hung. Why? Because the buffer was full, and no more messages could be stored until some got delivered to the cluster.

Let us count the size of the messages stored in the buffer. Each message carries a fixed-size metadata of 26 bytes, so each occupies 60 + 26 = 86 bytes. Messages 150 through 1289 accumulated in the buffer: (1289 - 149) × 86 = 1140 × 86 = 98040 bytes ≈ 95.74 KB. But since we set the buffer limit to 100 KB, why did it fill up at only 1140 messages (95.74 KB)? We will come back to this discrepancy in a bit.

For now, let's focus on the fact that the Producer blocks until some space is freed in the buffer. But in some scenarios, such as writing a fault-tolerant Kafka Producer, we want to fail instead of wait.

What if we want to fail rather than wait indefinitely when the buffer is full?

Insight 6:

When the buffer becomes full, i.e. it cannot accommodate any more messages, the send method blocks for each new message until a message already present in the buffer gets delivered and releases space. Setting the property block.on.buffer.full to false makes the send method fail instead of blocking indefinitely.

Let’s set this property to false:
configs.put("block.on.buffer.full", "false");

Now it will throw an exception like:

Message Sent:Test Message No. 00000001215 at Fri Aug 28 20:21:46 IST 2015
Message Sent:Test Message No. 00000001216 at Fri Aug 28 20:21:46 IST 2015
Exception in thread "main" org.apache.kafka.clients.producer.BufferExhaustedException: You have exhausted the 102400 bytes of memory you configured for the client and the client is configured to error rather than block when memory is exhausted.
    at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:124)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:157)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:338)
    at com.pubmatic.matrix.ingestion.api.metrics.publisher.KafkaSender.sendMessage(KafkaSender.java:45)
    at com.pubmatic.matrix.ingestion.api.metrics.publisher.KafkaSender.main(KafkaSender.java:31)

BufferExhaustedException is thrown when the buffer gets full.
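For a fault-tolerant sender, this exception can simply be caught. A minimal sketch, reusing the producer from the sketch above; the topic name and the fallback action are assumptions:

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// With block.on.buffer.full=false, send() throws instead of blocking,
// so a fault-tolerant sender can divert the message elsewhere.
static boolean trySend(KafkaProducer<String, String> producer, String messageString) {
    try {
        producer.send(new ProducerRecord<String, String>("test-topic", messageString));
        return true;
    } catch (BufferExhaustedException e) {
        // Buffer full: the cluster has likely been unreachable for a while.
        System.err.println("Kafka buffer exhausted, failing fast: " + e.getMessage());
        return false; // caller could fall back to disk, another cluster, etc. (hypothetical)
    }
}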

Let’s get back to the discrepancy we saw where the buffer size is set to 100 KB but it blocks/fails at 95 KB.

Here comes the concept of batching.

Insight 7:

The messages are not sent to the cluster individually; instead, they are grouped into batches of a specific maximum size, and these batches are sent to the cluster. Batches are created per topic-partition. Each batch is allocated a fixed amount of memory, defined by the batch.size configuration, out of the total buffer memory. Thus the messages accumulate in the buffer as multiple batches, with each batch containing multiple messages.

For each message, there is a fixed-size metadata of 26 bytes attached to it. So the effective size of each message, in our case, is 26 + 60 = 86 bytes. The default batch size is 16384 bytes, so a batch can hold a maximum of 16384 / 86 = 190 messages. The maximum number of batches that can be allocated from the available buffer memory is 100 KB / 16384 = 102400 / 16384 = 6. So the buffer memory fills up at 190 × 6 = 1140 messages, which total 95.74 KB, as seen earlier. It is the fragmentation of the messages within the buffer that leaves some wasted space at the end of each batch (depending upon the message size).
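Spelled out as a quick calculation, using the values configured in this example:

// Worked arithmetic behind Insight 7, using the values from this example.
int messageSize  = 60 + 26;   // 60-byte message + 26 bytes of metadata = 86 bytes
int batchSize    = 16384;     // batch.size default
int bufferMemory = 102400;    // buffer.memory, set to 100 KB above

int messagesPerBatch = batchSize / messageSize;            // 16384 / 86 = 190
int batchesInBuffer  = bufferMemory / batchSize;           // 102400 / 16384 = 6 (integer division)
int messagesInBuffer = messagesPerBatch * batchesInBuffer; // 190 * 6 = 1140
int bytesUsed        = messagesInBuffer * messageSize;     // 1140 * 86 = 98040 bytes ≈ 95.74 KB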

[Figure: Kafka message buffer, divided into fixed-size batches of messages]

Coming back to our fault-tolerant implementation of the Kafka producer: we would want to know about the failure of the cluster as soon as possible. So far, in the above implementation, we learn about the failure (through BufferExhaustedException) only after it has eaten up 1140 messages. These 1140 messages are prone to being lost if the cluster never comes up. To minimize this risk, we would want to keep the buffer as small as possible. But,

Is there a side effect of keeping a small buffer?

Let's find out, again using the same example. This time we will set the buffer size to 32768 bytes, i.e. 16384 × 2, so that the buffer can hold exactly two batches, each with a capacity of 190 messages.

configs.put("buffer.memory", "32768"); // size equals to two batches

Also, to simulate a real-time system where messages can arrive as fast as 500 messages per second, decrease the sleep time to 2 ms:

Thread.sleep(2);

In a real system, messages can arrive concurrently, but for the sake of simplicity we will keep the example single-threaded.

Now, since we have a very small buffer that can accommodate 2 batches, i.e. 380 messages, it can run out of space if the delivery reports for these 380 messages are not received before new messages arrive. So we should expect a failure once the number of messages with pending acknowledgements goes beyond 380.
Let’s run the program:

Sending message every 1 sec, kill the program to terminate:

Message Sent:Test Message No. 00000000001 at Wed Sep 02 23:12:34 IST 2015

Message Sent:Test Message No. 00000000002 at Wed Sep 02 23:12:34 IST 2015

Message Sent:Test Message No. 00000000003 at Wed Sep 02 23:12:34 IST 2015

Exception in thread "main" org.apache.kafka.clients.producer.BufferExhaustedException: You have exhausted the 32768 bytes of memory you configured for the client and the client is configured to error rather than block when memory is exhausted.
    at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:124)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:157)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:338)
    at com.pubmatic.matrix.ingestion.api.metrics.publisher.KafkaSender.sendMessage(KafkaSender.java:45)
    at com.pubmatic.matrix.ingestion.api.metrics.publisher.KafkaSender.main(KafkaSender.java:31)

It failed after just three undelivered messages?! How is that possible? Let's run it again. This time it failed after 2 messages. It may fail after more messages on your machine, depending upon its configuration and speed. However, it will never fail after just one message.

Let’s see why.

Insight 8:

The producer tries to batch together records that arrive in quick succession. That means, under load, when records arrive faster than they can be sent out, they are put together into a single batched request. Otherwise, each batch may contain just a single message. The memory for a batch remains allocated until the acknowledgements of all the messages it contains have been received.

So, in our example, a message arrives every 2 ms. When the first message comes in, a batch of size 16384 bytes is allocated from the buffer of total capacity 32768 bytes, leaving another 16384 bytes. By the time the second message comes in, the first batch may already have been sent to the server, so a new batch is allocated for the second message. This is what happened in the second case above. It is also possible that the second message arrives just in time to be batched together with the first, or that the second and third get batched together. In either case, the acknowledgements are not swift enough to free the memory allocated to the sent batches. This is what happened in the first case above.

If we remove the sleep altogether, it is possible that more messages get batched together, delaying the failure. On my machine it went up to around 27 messages.
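A sketch of that no-sleep experiment, reusing the producer and reqSeq names from the earlier sketch (the exact count will vary from machine to machine):

// No Thread.sleep(): messages arrive as fast as the loop can run, so
// several get batched together before the two-batch buffer is exhausted.
long sent = 0;
try {
    while (true) {
        final String messageString = "Test Message No. "
                + String.format("%011d", reqSeq.incrementAndGet()) + " at " + new Date();
        producer.send(new ProducerRecord<String, String>("test-topic", messageString));
        sent++;
    }
} catch (BufferExhaustedException e) {
    System.out.println("Buffer exhausted after " + sent + " messages"); // ~27 on the author's machine
}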

Conclusion:

Kafka Producer's buffering and batching techniques make the system quite performant under load by minimizing network round trips, with each trip carrying multiple records. The producer configurations give you numerous possibilities to shift the tradeoff between performance and fault-tolerance in either direction, depending upon your scenario.
