# Priority Kafka Client

Kafka client that allows records to be produced to and consumed from Kafka at configured priority levels.
## Releases

| Release | Date | Description |
|---|---|---|
| Version 1.0.0 | Oct 01 2018 | First release |
## Changelog

Changelog can be viewed in [CHANGELOG.md](CHANGELOG.md).
## Getting Started

This client provides abstractions that implement Kafka's `Producer<K, V>` and `Consumer<K, V>` with priority support. It is a general-purpose prioritized Kafka queueing implementation with capacity burst. In this context, priority is a positive integer `N` with priority levels `0 < 1 < ... < N-1`, i.e. level `N-1` has the highest priority.
### PriorityKafkaProducer

| Config | Mandatory | Description |
|---|---|---|
| `max.priority` | Yes | Defines max priority |

The rest of the configs are identical to those of `KafkaProducer`.
`PriorityKafkaProducer` takes in an additional priority-level argument, `Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record)`, which indicates that the record should be produced at that priority level. `Future<RecordMetadata> send(ProducerRecord<K, V> record)` defaults to producing the record at the lowest priority level, 0. Every logical topic `XYZ` at priority level `0 <= i < N` is backed by the Kafka topic `XYZ-i`.
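As a minimal sketch (these helper names are illustrative, not part of the client's API), the two send variants can be thought of as routing a record onto a per-priority backing topic, with the priority-less variant defaulting to level 0:

```java
// Sketch of the topic routing implied by the two send(...) variants.
// PriorityTopicRouting and its methods are hypothetical helper names.
public class PriorityTopicRouting {

    // Kafka topic backing priority level `priority` of logical topic `topic`,
    // i.e. logical topic XYZ at priority i is backed by XYZ-i
    public static String backingTopic(String topic, int priority) {
        if (priority < 0) {
            throw new IllegalArgumentException("priority must be >= 0");
        }
        return topic + "-" + priority;
    }

    // send(record) without a priority defaults to the lowest priority level, 0
    public static String defaultBackingTopic(String topic) {
        return backingTopic(topic, 0);
    }
}
```

So a record sent via `send(2, record)` for logical topic `XYZ` lands on `XYZ-2`, while `send(record)` lands on `XYZ-0`.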
### CapacityBurstPriorityKafkaConsumer

| Config | Mandatory | Description |
|---|---|---|
| `max.priority` | Yes | Defines max priority |
| `group.id` | Yes | Defines logical group ID for the consumer |
| `max.poll.records` | Yes | Defines max records to be polled across priority levels |
| `max.poll.history.window.size` | No | Window length for tracking historical counts of records obtained from `poll()`. Defaults to 6 |
| `min.poll.window.maxout.threshold` | No | Threshold on how many historical counts of records obtained from `poll()` maxed out w.r.t. the `max.poll.records` config. Defaults to 4 |

The rest of the configs are identical to those of `KafkaConsumer`.
The `CapacityBurstPriorityKafkaConsumer` implementation maintains a `KafkaConsumer` for every priority level `0 <= i < N`. For every logical topic `XYZ` and logical group ID `ABC`, the priority level `i` consumer binds to Kafka topic `XYZ-i` with group ID `ABC-i`. This works in tandem with `PriorityKafkaProducer`.
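The per-priority bindings can be sketched as follows (`PriorityBindings` is a hypothetical illustration, not the client's internals):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the consumer's per-priority bindings: logical topic XYZ and
// logical group ID ABC at priority level i bind to Kafka topic XYZ-i
// with group ID ABC-i. Names here are illustrative only.
public class PriorityBindings {

    // Returns priority level -> { backing topic, backing group ID }
    public static Map<Integer, String[]> bindingsFor(String topic, String groupId, int maxPriority) {
        Map<Integer, String[]> bindings = new LinkedHashMap<>();
        for (int i = 0; i < maxPriority; i++) {
            bindings.put(i, new String[] { topic + "-" + i, groupId + "-" + i });
        }
        return bindings;
    }
}
```

For `max.priority = 3`, topic `XYZ` and group `ABC`, this yields the pairs (`XYZ-0`, `ABC-0`), (`XYZ-1`, `ABC-1`) and (`XYZ-2`, `ABC-2`).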
The `max.poll.records` property is split across the priority topic consumers based on `maxPollRecordsDistributor`, which defaults to `ExpMaxPollRecordsDistributor`. The rest of the `KafkaConsumer` configs are passed as-is to each of the priority topic consumers. Care has to be taken when defining `max.partition.fetch.bytes`, `fetch.max.bytes` and `max.poll.interval.ms`, as these values will be used as-is across all the priority topic consumers.
The consumer works on the idea of distributing the `max.poll.records` property across the priority topic consumers as their reserved capacity. Records are fetched sequentially from all priority level topic consumers, each configured with its distributed `max.poll.records` value. The distribution must reserve higher capacity, and hence a higher processing rate, for higher priorities.
**Caution 1** - If we have skewed partitions in priority level topics, e.g. 10K records in a priority 2 partition, 100 records in a priority 1 partition and 10 records in a priority 0 partition, all assigned to different consumer threads, then the implementation will not synchronize across such consumers to regulate capacity and hence will fail to honour priority. So producers must ensure there are no skewed partitions (e.g. by using round-robin partitioning - this "may" imply there are no message-ordering assumptions, and the consumer may choose to process records in parallel by separating the fetching and processing concerns).
**Caution 2** - If we have empty partitions in priority level topics, e.g. no pending records in the assigned priority 2 and 1 partitions but 10K records in the priority 0 partition, all assigned to the same consumer thread, then we want the priority 0 topic partition consumer to burst its capacity up to `max.poll.records` and not restrict itself to its reserved capacity based on `maxPollRecordsDistributor`, else the overall capacity will be under-utilized.
This implementation addresses the cautions explained above. Every consumer object has individual priority level topic consumers, with each priority level consumer having reserved capacity based on `maxPollRecordsDistributor`. Each priority level topic consumer will try to burst into the other priority level topic consumers' capacities in the group, provided all of the below are true:

- It is eligible to burst - in the last `max.poll.history.window.size` attempts of `poll()`, it has at least `min.poll.window.maxout.threshold` times received a number of records equal to its assigned `max.poll.records`, which was distributed based on `maxPollRecordsDistributor`. This is an indication that the partition has more incoming records to be processed.
- No higher priority level is eligible to burst - there is no higher priority level topic consumer that is eligible to burst based on the above logic. Basically, give way to higher priorities.
If the above are true, then the priority level topic consumer bursts into all other priority level topic consumers' capacities. The amount of burst per priority level topic consumer is equal to that consumer's least unused capacity in the last `max.poll.history.window.size` attempts of `poll()`.
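The two burst rules above can be sketched as small pure functions. This is a simplified reconstruction from the description, not the client's actual code; `window` is the FIFO history of record counts from recent `poll()` calls and `reserved` is the consumer's distributed share of `max.poll.records`:

```java
// Sketch of the burst rules described above. BurstRules is an
// illustrative reconstruction, not the client's implementation.
public class BurstRules {

    // Eligible to burst if at least `threshold` of the last window entries
    // hit the reserved capacity (i.e. poll() maxed out that many times).
    public static boolean eligibleToBurst(int[] window, int reserved, int threshold) {
        int maxedOut = 0;
        for (int polled : window) {
            if (polled >= reserved) {
                maxedOut++;
            }
        }
        return maxedOut >= threshold;
    }

    // Burst amount contributed by one other priority level consumer:
    // its least unused capacity across the poll window.
    public static int leastUnusedCapacity(int[] window, int reserved) {
        int least = Integer.MAX_VALUE;
        for (int polled : window) {
            least = Math.min(least, reserved - polled);
        }
        return Math.max(least, 0);
    }
}
```

For instance, a level with reserved capacity 14 and window `[10, 10, 7, 10, 9, 0]` never maxed out, so it is not eligible to burst, and its least unused capacity of 4 is available to be stolen by an eligible level.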
For example, say we have:

```
max.priority = 3;
max.poll.records = 50;
maxPollRecordsDistributor = ExpMaxPollRecordsDistributor.instance();
max.poll.history.window.size = 6;
min.poll.window.maxout.threshold = 4;
```

The capacity distribution of the `max.poll.records` property is `{2=29, 1=14, 0=7}`.
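One way to reproduce the `{2=29, 1=14, 0=7}` split is an exponential distribution where each level reserves roughly twice the share of the level below it, assigned highest priority first with ceiling rounding. This is a plausible reconstruction; the actual `ExpMaxPollRecordsDistributor` may differ in detail:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative exponential split of max.poll.records across priority levels;
// a reconstruction, not the client's ExpMaxPollRecordsDistributor itself.
public class ExpDistribution {

    public static Map<Integer, Integer> distribute(int maxPollRecords, int maxPriority) {
        Map<Integer, Integer> shares = new LinkedHashMap<>();
        int remaining = maxPollRecords;
        for (int i = maxPriority - 1; i >= 0; i--) {
            // share_i = ceil(remaining * 2^i / (2^0 + 2^1 + ... + 2^i)),
            // where the denominator is the geometric sum 2^(i+1) - 1
            long num = remaining * (1L << i);
            long den = (1L << (i + 1)) - 1;
            int share = (int) ((num + den - 1) / den);
            shares.put(i, share);
            remaining -= share;
        }
        return shares;
    }
}
```

With `maxPollRecords = 50` and `maxPriority = 3`: level 2 gets ceil(50 * 4/7) = 29, level 1 gets ceil(21 * 2/3) = 14, and level 0 gets the remaining 7.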
Case 1: FIFO view of the poll window is

```
2 - [29, 29, 29, 29, 29, 29];
1 - [14, 14, 14, 14, 14, 14];
0 - [7, 7, 7, 7, 7, 7];
```

In this case, every priority level topic consumer retains its capacity and does not burst, as every one of them is eligible to burst but none is ready to give away reserved capacity.
Case 2: FIFO view of the poll window is

```
2 - [29, 29, 29, 29, 29, 29];
1 - [14, 14, 14, 11, 10, 9];
0 - [7, 7, 7, 7, 7, 7];
```

In this case, priority level 2 is eligible to burst, but the least unused capacity of levels 1 and 0 across the window is 0 (both maxed out at least once), so every priority level topic consumer effectively retains its reserved capacity.
Case 3: FIFO view of the poll window is

```
2 - [29, 29, 29, 29, 29, 29];
1 - [10, 10, 7, 10, 9, 0];
0 - [7, 7, 7, 7, 7, 7];
```

In this case, the priority level 2 topic consumer bursts into the priority level 1 topic consumer's capacity and steals (14 - 10 = 4), increasing its capacity, i.e. its effective `max.poll.records`, to (29 + 4 = 33).
Case 4: FIFO view of the poll window is

```
2 - [20, 25, 25, 20, 15, 10];
1 - [10, 10, 7, 10, 9, 0];
0 - [7, 7, 7, 7, 7, 7];
```

In this case, the priority level 0 topic consumer bursts into the priority level 2 and 1 topic consumers' capacities and steals (29 - 25 = 4) and (14 - 10 = 4), increasing its capacity, i.e. its effective `max.poll.records`, to (7 + 8 = 15).
## Client in Action

- Kafka broker version - 0.10.1.1
- `max.priority` - 3
- `max.poll.records` - 50
- Records - 100000 per priority level (300000 overall)
- Consumer processing - 1 sec sleep for every record

Code for the experiments can be found in Example.java.
### Experiment 1 - Without prioritization

Producer - Produce 100000 records for each of the 3 priority levels to a single topic with 15 partitions using `KafkaProducer`.

Consumer - Maintain 5 consumer threads to fetch messages using `KafkaConsumer`. Each consumer thread uses a fixed thread pool of size 55 to process records in parallel and `commitSync` once processing is done.

Here we observe that all records are processed at the same rate, without any notion of priority.
### Experiment 2 - With prioritization

Producer - Produce 100000 records for each of the 3 priority levels to 3 topics with 5 partitions each using `PriorityKafkaProducer`.

Consumer - Maintain 5 consumer threads to fetch messages using `CapacityBurstPriorityKafkaConsumer`. Each consumer thread uses a fixed thread pool of size 55 to process records in parallel and `commitSync` once processing is done.

Here we observe that priority level 2 records are processed at a faster rate than priority levels 1 and 0. Once priority level 2 records are exhausted, priority level 1 takes over priority level 2's capacity and bursts its processing rate. Similarly, once priority level 1 records drain, priority level 0 takes over the capacities of priority levels 2 and 1 and bursts its processing rate.
### Experiment 3 - With prioritization

This is an extension of Experiment 2. Here we observe that initially priority level 2 records are processed at a faster rate than priority levels 1 and 0. Once priority level 2 records are exhausted, priority level 1 takes over priority level 2's capacity and bursts its processing rate. But once priority level 2 records start pouring in again, priority level 1 releases the burst capacity and every consumer falls back to its reserved capacity.
## Contribution, Bugs and Feedback

For bugs, questions and discussions, please use GitHub Issues. Please follow the contribution guidelines when submitting pull requests.
## License

Copyright 2018 Flipkart Internet Pvt. Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.