Queue for multi-thread consumers. How to organize consumers for multi-thread access?
polyakovmyu opened this issue · 7 comments
Hello!
Assume that we have 1 pool of values for multiple threads and we want to get fresh value every time we ask for it.
Example of it is VTS which I used with LoadRunner. HTTP Simple Table Server has the same idea.
When I launch 2 threads I get exception: KafkaConsumer is not safe for multi-threaded access.
How should I configure consumers to get fresh values with multiple threads and is it possible?
Hello,
This plugin is designed in such a way to support the multi threaded functionality.
Is it possible for you to attach a sample test plan, I will have a look at it and come back at the earliest?
I have made simple test plan with all used plugins, modified elements were marked with >, except kafka elements.
jmeter version 5.6.3
java version 17
Kafka_playground.zip
Thanks for addressing this issue.
I have validated it, it is throwing error while running the consumer with multiple threads
2024-02-21 16:33:09,684 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Thread-2-Read'.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2495) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2479) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.getConsumerRecords(KafkaConsumerSampler.java:90) ~[di-kafkameter-1.2.jar:?]
at com.di.jmeter.kafka.sampler.KafkaConsumerSampler.sample(KafkaConsumerSampler.java:73) ~[di-kafkameter-1.2.jar:?]
at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.6.2]
at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.6.2]
at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) [ApacheJMeter_core.jar:5.6.2]
at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) [ApacheJMeter_core.jar:5.6.2]
at java.lang.Thread.run(Thread.java:829) [?:?]
I will release a patch soon to fix this.
Keep supporting by hitting a star and open up issues, if you have bug
Hey @polyakovmyu
After careful read about the consumer group, I came to know that, The Kafka consumer is not thread-safe, and multi-threaded access must be properly synchronized, which can be complex.
Typically, a single-threaded model is used where each consumer in a consumer group is mapped to a partition of the topic.
If you want to use multiple threads, you should ensure that each thread is consuming from a different partition to avoid conflicts and potential data inconsistencies
https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/
https://stackoverflow.com/questions/44587416/kafka-single-consumer-group-in-multiple-instances
So, you should try reading each thread to partition instead of reading from a global factor on where the broker decides
I will look for other options to make it multi threaded - until then, will keep this issue open. If you come across any implementation idea, pls lmk
My solution is simple table server plugin - enough for my tasks.
It would be a problem in case you want to load kafka itself.
Reading is not always a bottleneck its the insert. Btw, will update here when I implement a logic
I have a couple of options in mind but it won't be a replica of the real time scenario
bump