A Low Overhead Kafka Client

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

Krackle - A Low Overhead Kafka Client

While the standard Java Kafka client is easy to use, it does tend to have a high level of overhead. A lot of objects are created, only to be garbage collected very quickly, often within milliseconds on a heavily loaded producer.

Krackle is a Kafka 0.8 client designed to minimize the number of objects created, and therefore to reduce garbage collection overhead. In my tests this has reduced the CPU usage by 50% under heavy load.

In order to achieve these performance improvements, some compromises had to be made. In particular, the producer requires an instance for each topic and key, and the consumer requires an instance per partition. This means that it may not be useful in the general case.

Example use case: You have thousands of applications running, and you are using Kafka to centralize the logs. Since each application will know upfront what topic it will log to, and the key it will use (a unique app instance id), we can use the Krackle Producer effectively. This means small savings in CPU and memory usage on each instance, but that can add up over a large number of instances to provide large savings.

Basic Usage

// Producer
Properties props = ... // load some configuration properties
ProducerConfiguration conf = new ProducerConfiguration(props);
Producer producer = new Producer(conf, "clientId", "topic", "key");

// Use a byte buffer to store your data, and pass the data by referencing that.
byte[] buffer = new byte[1024];
// offset and length describe where the current message sits within the buffer.
while ( fillBuffer() ) { // Get some data in your buffer
    producer.send(buffer, offset, length);
}
producer.close();

// Consumer
Properties props = ... // load some configuration properties
ConsumerConfiguration conf = new ConsumerConfiguration(props);
Consumer consumer = new Consumer(conf, "clientId", "topic", "key");

// Use a byte buffer to store the message you retrieve.
byte[] buffer = new byte[1024];
int bytesRead;
while ( running ) { // running is your own shutdown flag
    bytesRead = consumer.getMessage(buffer, 0, buffer.length);
    if (bytesRead != -1) {
        // the first bytesRead bytes of the buffer are the message.
    }
}
consumer.close();
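
The producer loop above relies on a fillBuffer() helper that this README does not define. Below is a minimal sketch of one way to write such a helper so that no new objects are created per message, assuming the messages are ASCII log lines. The class name, the CharSequence parameter and the int return value are hypothetical (the README's own fillBuffer() returns a boolean), and nothing here calls the Krackle API.

```java
import java.nio.charset.StandardCharsets;

public class BufferReuseSketch {
    // One buffer allocated up front and reused for every message.
    static final byte[] BUFFER = new byte[1024];

    /**
     * Copies an ASCII log line into the shared buffer without allocating.
     * Returns the number of bytes written, or -1 if the line does not fit.
     */
    static int fillBuffer(CharSequence line) {
        int length = line.length();
        if (length > BUFFER.length) {
            return -1;
        }
        for (int i = 0; i < length; i++) {
            char c = line.charAt(i);
            // Non-ASCII characters become '?' so each char maps to one byte.
            BUFFER[i] = (byte) (c < 128 ? c : '?');
        }
        return length;
    }

    public static void main(String[] args) {
        String[] lines = {"app started", "request handled", "app stopped"};
        for (String line : lines) {
            int length = fillBuffer(line);
            if (length >= 0) {
                // With Krackle, producer.send(BUFFER, 0, length) would go here.
                // The String below is created only to show the buffer contents.
                System.out.println(new String(BUFFER, 0, length, StandardCharsets.US_ASCII));
            }
        }
    }
}
```

The same buffer is overwritten on every call, so the only per-message cost is the copy itself; the offset passed to send is always 0 in this sketch.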

Producer Configuration

Configuration is done via properties. Many of these are the same as in the standard Java client.

| property | default | description |
| --- | --- | --- |
| metadata.broker.list | (required) | A comma-separated list of seed brokers to connect to in order to get metadata about the cluster. |
| queue.buffering.max.ms | 5000 | Maximum time to buffer data. For example, a setting of 100 will try to batch together 100 ms of messages to send at once. This will improve throughput but adds message delivery latency due to the buffering. |
| request.required.acks | 1 | This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are:<br>• 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).<br>• 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).<br>• -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability: no messages will be lost as long as at least one in-sync replica remains. |
| request.timeout.ms | 10000 | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. |
| message.send.max.retries | 3 | This property will cause the producer to automatically retry a failed send request. It specifies the number of retries when such failures occur. Note that setting a non-zero value here can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost. |
| retry.backoff.ms | 100 | Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. |
| topic.metadata.refresh.interval.ms | 600 * 1000 | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10 minutes, i.e. 600000 ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happens only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed. |
| message.buffer.size | 1024 * 1024 | The size of each buffer that is used to store raw messages before they are sent. Since a full buffer is sent at once, don't make this too big. |
| num.buffers | 2 | The number of buffers to use. At any given time, there is up to one buffer being filled with new data, up to one buffer having its data sent to the broker, and any number of buffers waiting to be filled and/or sent.<br><br>Essentially, the limit on the amount of data that can be queued at any given time is message.buffer.size * num.buffers. In reality, though, you won't get buffers to 100% full each time. |
| send.buffer.size | message.buffer.size + 200 | Size of the byte buffer used to store the final (with headers and compression applied) data to be sent to the broker. |
| compression.codec | none | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". |
| gzip.compression.level | java.util.zip.Deflater.DEFAULT_COMPRESSION | If compression.codec is set to gzip, then this allows configuration of the compression level.<br>• -1: default compression level<br>• 0: no compression<br>• 1-9: 1=fastest compression ... 9=best compression |
| queue.enqueue.timeout.ms | -1 | The amount of time to block before dropping messages when all buffers are full. If set to 0, events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1, the producer will block indefinitely and never willingly drop a send. |
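
As an illustration of the producer settings in the table above, here is a minimal sketch that builds a Properties object and works out the queueing limit message.buffer.size * num.buffers. The class and method names and the broker addresses are hypothetical, and the sketch assumes numeric values are supplied as plain integer strings; only the property keys and defaults come from the table.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    /** Builds producer properties using only keys documented in the table. */
    static Properties producerProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokers);   // required
        props.setProperty("request.required.acks", "1");      // wait for the leader only
        props.setProperty("queue.buffering.max.ms", "100");   // shorter batching window than the 5000 default
        props.setProperty("compression.codec", "gzip");
        props.setProperty("gzip.compression.level", "1");     // 1 = fastest compression
        props.setProperty("message.buffer.size", String.valueOf(256 * 1024));
        props.setProperty("num.buffers", "4");
        return props;
    }

    /** Upper bound on queued bytes: message.buffer.size * num.buffers. */
    static long maxQueuedBytes(Properties props) {
        // Fall back to the documented defaults when a key is absent.
        long bufferSize = Long.parseLong(
                props.getProperty("message.buffer.size", String.valueOf(1024 * 1024)));
        long numBuffers = Long.parseLong(props.getProperty("num.buffers", "2"));
        return bufferSize * numBuffers;
    }

    public static void main(String[] args) {
        // Hypothetical broker addresses, for illustration only.
        Properties props = producerProperties("broker1:9092,broker2:9092");
        System.out.println("max queued bytes: " + maxQueuedBytes(props));
    }
}
```

With the documented defaults (two buffers of 1024 * 1024 bytes) the bound is 2 MiB per producer instance, which matters because every topic and key pair needs its own producer.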

Consumer Configuration

Configuration is done via properties. Many of these are the same as in the standard Java client.

| property | default | description |
| --- | --- | --- |
| metadata.broker.list | (required) | A list of seed brokers to connect to in order to get information about the Kafka broker cluster. |
| fetch.message.max.bytes | 1024 * 1024 | The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows, or else it is possible for the producer to send messages larger than the consumer can fetch. |
| fetch.wait.max.ms | 100 | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes. |
| fetch.min.bytes | 1 | The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request. |
| socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests. |
| auto.offset.reset | largest | What to do when there is no initial offset in ZooKeeper or if an offset is out of range:<br>• smallest: automatically reset the offset to the smallest offset<br>• largest: automatically reset the offset to the largest offset<br>• anything else: throw exception to the consumer |
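
The consumer settings in the table above can be assembled the same way. This is a minimal sketch: the class and method names and the broker address are hypothetical, values are assumed to be plain integer strings, and the size check is a rough one that ignores any per-message protocol overhead.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    /** Builds consumer properties using only keys documented in the table. */
    static Properties consumerProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokers);  // required
        props.setProperty("fetch.message.max.bytes", String.valueOf(1024 * 1024));
        props.setProperty("fetch.min.bytes", "1");
        props.setProperty("fetch.wait.max.ms", "100");
        props.setProperty("auto.offset.reset", "smallest");  // reset to the smallest offset
        return props;
    }

    /** Rough check that a message of the given size fits in one fetch. */
    static boolean canFetch(Properties props, int messageBytes) {
        // Fall back to the documented default when the key is absent.
        int fetchMax = Integer.parseInt(
                props.getProperty("fetch.message.max.bytes", String.valueOf(1024 * 1024)));
        return messageBytes <= fetchMax;
    }

    public static void main(String[] args) {
        // Hypothetical broker address, for illustration only.
        Properties props = consumerProperties("broker1:9092");
        System.out.println("2 MiB message fetchable: " + canFetch(props, 2 * 1024 * 1024));
    }
}
```

If the brokers accept messages larger than fetch.message.max.bytes, raising that value is the fix, at the cost of more memory per consumer instance (one instance per partition).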

Contributing

To contribute code to this repository you must be signed up as an official contributor.

Disclaimer

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.