Krackle - A Low Overhead Kafka Client
While the standard Java Kafka client is easy to use, it does tend to have a high level of overhead. A lot of objects are created, only to be garbage collected very quickly, often within milliseconds on a heavily loaded producer.
Krackle is a Kafka 0.8 client designed to minimize the number of objects created, and therefore to reduce garbage collection overhead. In my tests this has reduced the CPU usage by 50% under heavy load.
In order to achieve these performance improvements, some compromises had to be made. In particular, the producer requires an instance per topic and key, and the consumer requires an instance per partition. This means that it may not be useful in the general case.
Example use case: You have thousands of applications running, and you are using Kafka to centralize their logs. Since each application knows up front which topic it will log to and the key it will use (a unique app instance id), the Krackle Producer can be used effectively. The CPU and memory savings on each instance are small, but across a large number of instances they add up to substantial savings.
Basic Usage
Producer:
Properties props = ... // load some configuration properties
ProducerConfiguration conf = new ProducerConfiguration(props);
Producer producer = new Producer(conf, "clientId", "topic", "key");
// Reuse a single byte array to hold your data, and pass the data by reference.
byte[] buffer = new byte[1024];
while ( fillBuffer() ) { // Get some data in your buffer
producer.send(buffer, offset, length); // offset and length delimit the data that fillBuffer() placed in the buffer
}
producer.close();
Consumer:
Properties props = ... // load some configuration properties
ConsumerConfiguration conf = new ConsumerConfiguration(props);
int partition = 0; // one Consumer instance is needed per partition
Consumer consumer = new Consumer(conf, "clientId", "topic", partition);
// Reuse a single byte array to hold each message you retrieve.
byte[] buffer = new byte[1024];
int bytesRead;
while ( true ) {
bytesRead = consumer.getMessage(buffer, 0, buffer.length);
if (bytesRead != -1) {
// the first bytesRead bytes of the buffer are the message.
}
}
consumer.close();
Producer Configuration
Configuration is done via properties. Many of these are the same as the standard Java client.
property | default | description |
---|---|---|
metadata.broker.list | (required) | A comma separated list of seed brokers to connect to in order to get metadata about the cluster. |
queue.buffering.max.ms | 5000 | Maximum time to buffer data. For example, a setting of 100 will try to batch together 100 ms of messages to send at once. This improves throughput but adds message delivery latency due to the buffering. |
request.required.acks | 1 | This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are 0 (never wait for an acknowledgement), 1 (wait for the leader to acknowledge) and -1 (wait for all in-sync replicas to acknowledge). |
request.timeout.ms | 10000 | The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client. |
message.send.max.retries | 3 | The number of times the producer will automatically retry a failed send request. Note that setting a non-zero value here can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost. |
retry.backoff.ms | 100 | Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. |
topic.metadata.refresh.interval.ms | 600 * 1000 | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available, ...). It will also poll regularly (default: every 10 minutes, i.e. 600000 ms). If you set this to a negative value, metadata will only be refreshed on failure. If you set this to zero, the metadata will be refreshed after every message sent (not recommended). Important note: the refresh happens only AFTER a message is sent, so if the producer never sends a message the metadata is never refreshed. |
message.buffer.size | 1024*1024 | The size of each buffer that is used to store raw messages before they are sent. Since a full buffer is sent at once, don't make this too big. |
num.buffers | 2 | The number of buffers to use. At any given time, there is up to one buffer being filled with new data, up to one buffer having its data sent to the broker, and any number of buffers waiting to be filled and/or sent. The amount of data that can be queued at any given time is therefore limited to message.buffer.size * num.buffers, although in practice buffers will rarely be 100% full. |
send.buffer.size | message.buffer.size + 200 | Size of the byte buffer used to store the final (with headers and compression applied) data to be sent to the broker. |
compression.codec | none | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". |
gzip.compression.level | java.util.zip.Deflater.DEFAULT_COMPRESSION | If compression.codec is set to gzip, then this allows configuration of the compression level. |
queue.enqueue.timeout.ms | -1 | The amount of time to block before dropping messages when all buffers are full. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. |
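The property names above can be set on a plain java.util.Properties object before constructing the ProducerConfiguration. A minimal sketch, assuming the ProducerConfiguration and Producer classes from the Basic Usage example; broker hostnames and values are illustrative only, not recommendations:

```java
import java.util.Properties;

Properties props = new Properties();
props.setProperty("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder hosts
props.setProperty("queue.buffering.max.ms", "100");                     // batch up to 100 ms of messages
props.setProperty("request.required.acks", "1");                        // wait for the leader to acknowledge
props.setProperty("compression.codec", "snappy");                       // "none", "gzip" or "snappy"
props.setProperty("message.buffer.size", String.valueOf(1024 * 1024));
props.setProperty("num.buffers", "4");                                  // up to ~4 MiB can be queued

ProducerConfiguration conf = new ProducerConfiguration(props);
Producer producer = new Producer(conf, "clientId", "topic", "key");
```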
Consumer Configuration
Configuration is done via properties. Many of these are the same as the standard Java client.
property | default | description |
---|---|---|
metadata.broker.list | (required) | A list of seed brokers to connect to in order to get information about the Kafka broker cluster. |
fetch.message.max.bytes | 1024 * 1024 | The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows, or else it is possible for the producer to send messages larger than the consumer can fetch. |
fetch.wait.max.ms | 100 | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes |
fetch.min.bytes | 1 | The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. |
socket.receive.buffer.bytes | 64 * 1024 | The socket receive buffer for network requests |
auto.offset.reset | largest | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: "smallest" automatically resets to the smallest available offset, "largest" automatically resets to the largest available offset, and anything else throws an exception to the consumer. |
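As with the producer, these properties can be set on a java.util.Properties object before constructing the ConsumerConfiguration. A minimal sketch, assuming the ConsumerConfiguration class from the Basic Usage example; hostnames and values are illustrative only:

```java
import java.util.Properties;

Properties props = new Properties();
props.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");    // placeholder hosts
props.setProperty("fetch.message.max.bytes", String.valueOf(1024 * 1024)); // per-partition fetch size
props.setProperty("fetch.wait.max.ms", "500");                             // wait up to 500 ms for fetch.min.bytes
props.setProperty("fetch.min.bytes", "1");
props.setProperty("auto.offset.reset", "smallest");                        // start from the earliest available offset

ConsumerConfiguration conf = new ConsumerConfiguration(props);
```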
Contributing
To contribute code to this repository you must be signed up as an official contributor.
Disclaimer
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.