Consumer waits indefinitely when trying to read all sent messages in the topic
bloodgang94 opened this issue · 4 comments
Hello! Thanks for your work.
I have a question.
I am using your example https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_consumer_group.js
and ran into a problem: if I try to consume all 300 messages, the consumer waits indefinitely. This happens in roughly 3 out of 5 attempts.
Full code
/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka with 300 JSON messages per iteration.
*/
import { check } from "k6";
// import * as kafka from "k6/x/kafka";
import {
    Writer,
    Reader,
    Connection,
    SchemaRegistry,
    CODEC_SNAPPY,
    SCHEMA_TYPE_JSON,
} from "k6/x/kafka"; // import kafka extension

// Prints module-level constants
// console.log(kafka);

const brokers = ["localhost:9092"];
const topic = "test";
const groupID = "my-group";

const writer = new Writer({
    brokers: brokers,
    topic: topic,
    compression: CODEC_SNAPPY,
});
const reader = new Reader({
    brokers: brokers,
    groupID: groupID,
    groupTopics: [topic],
});
const connection = new Connection({
    address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();

if (__VU == 0) {
    connection.createTopic({
        topic: topic,
        numPartitions: 3,
        replicationFactor: 1,
        configEntries: [
            {
                configName: "compression.type",
                configValue: CODEC_SNAPPY,
            },
        ],
    });
}

export const options = {
    thresholds: {
        // Base thresholds to see if the writer or reader is working
        kafka_writer_error_count: ["count == 0"],
        kafka_reader_error_count: ["count == 0"],
    },
};

export default function () {
    let messages = [];
    for (let i = 0; i < 100; i++) {
        for (let partition = 0; partition < 3; partition++) {
            messages.push({
                // The data type of the key is JSON
                key: schemaRegistry.serialize({
                    data: {
                        key: "value",
                    },
                    schemaType: SCHEMA_TYPE_JSON,
                }),
                // The data type of the value is JSON
                value: schemaRegistry.serialize({
                    data: {
                        key: "value",
                    },
                    schemaType: SCHEMA_TYPE_JSON,
                }),
                partition: partition,
            });
        }
    }
    writer.produce({ messages: messages });

    // Read all 300 messages produced in this iteration
    messages = reader.consume({ limit: 300 });
    check(messages, {
        "10 messages is received": (messages) => messages.length == 300,
    });
    check(messages[0], {
        "Topic equals to xk6_kafka_consumer_group_topic": (msg) =>
            msg["topic"] == topic,
        "Key contains key/value and is JSON": (msg) =>
            schemaRegistry.deserialize({
                data: msg.key,
                schemaType: SCHEMA_TYPE_JSON,
            }).key == "value",
        "Value contains key/value and is JSON": (msg) =>
            typeof schemaRegistry.deserialize({
                data: msg.value,
                schemaType: SCHEMA_TYPE_JSON,
            }) == "object" &&
            schemaRegistry.deserialize({
                data: msg.value,
                schemaType: SCHEMA_TYPE_JSON,
            }).key == "value",
    });
}

export function teardown(data) {
    if (__VU == 0) {
        // Delete the topic
        connection.deleteTopic(topic);
    }
    writer.close();
    reader.close();
    connection.close();
}
Logs:
time="2023-03-13T09:02:59+03:00" level=info msg="entering loop for consumer group, my-group\n"
time="2023-03-13T09:02:59+03:00" level=info msg="entering loop for consumer group, my-group\n"
time="2023-03-13T09:03:04+03:00" level=info msg="joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41 in generation 10"tions
time="2023-03-13T09:03:04+03:00" level=info msg="selected as leader for group, my-group\n" VU
time="2023-03-13T09:03:04+03:00" level=info msg="joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="joinGroup succeeded for response, my-group. generationID=10, memberID=k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f"
time="2023-03-13T09:03:04+03:00" level=info msg="Joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="using 'range' balancer to assign group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="found member: k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41/[]byte(nil)"
time="2023-03-13T09:03:04+03:00" level=info msg="found member: k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f/[]byte(nil)"
time="2023-03-13T09:03:04+03:00" level=info msg="found topic/partition: test/0"
time="2023-03-13T09:03:04+03:00" level=info msg="assigned member/topic/partitions k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41/test/[0]"
time="2023-03-13T09:03:04+03:00" level=info msg="joinGroup succeeded for response, my-group. generationID=10, memberID=k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41"
time="2023-03-13T09:03:04+03:00" level=info msg="Joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41 in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="Syncing 2 assignments for generation 10 as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41"
time="2023-03-13T09:03:04+03:00" level=info msg="received empty assignments for group, my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f for generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="sync group finished for group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="sync group finished for group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="started heartbeat for group, my-group [3s]"
time="2023-03-13T09:03:04+03:00" level=info msg="subscribed to topics and partitions: map[]"
time="2023-03-13T09:03:04+03:00" level=info msg="started heartbeat for group, my-group [3s]"
time="2023-03-13T09:03:04+03:00" level=info msg="initializing kafka reader for partition 0 of test starting at offset first offset"
time="2023-03-13T09:03:04+03:00" level=info msg="subscribed to topics and partitions: map[{topic:test partition:0}:-2]"
I have tried the rebalanceTimeout and sessionTimeout settings but without success.
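For illustration, this is roughly the kind of Reader config I experimented with; the timeout values below are only examples, not the exact numbers I tried:

import { Reader, SECOND } from "k6/x/kafka";

// Sketch only: brokers, groupID and topic are the same constants as in the
// script above; the timeout values are illustrative.
const reader = new Reader({
    brokers: brokers,
    groupID: groupID,
    groupTopics: [topic],
    sessionTimeout: 30 * SECOND, // consumer group session timeout
    rebalanceTimeout: 60 * SECOND, // time allowed for a group rebalance
});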
Could you suggest what I am doing wrong?
I ran your exact script (with no modifications) for a minute with 50 VUs and this is the result:
./k6 run --vus 50 --duration 60s test-kafka.js
execution: local
script: /mnt/c/Users/Mostafa Moradian/Desktop/test-kafka.js
output: engine
scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
* default: 50 looping VUs for 1m0s (gracefulStop: 30s)
ERRO[0090] Unable to read messages. error="Unable to read messages."
(the line above is repeated 49 times)
✓ 10 messages is received
✓ Topic equals to xk6_kafka_consumer_group_topic
✓ Key contains key/value and is JSON
✓ Value contains key/value and is JSON
█ teardown
checks.........................: 100.00% ✓ 1764 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=137.67ms min=122.3ms med=131.32ms max=941ms p(90)=147.73ms p(95)=155.48ms
iterations.....................: 441 4.857311/s
kafka_reader_dial_count........: 1 0.011014/s
kafka_reader_dial_seconds......: avg=4.13µs min=0s med=0s max=1.82ms p(90)=0s p(95)=0s
✓ kafka_reader_error_count.......: 0 0/s
kafka_reader_fetch_bytes.......: 7.5 MB 83 kB/s
kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000
kafka_reader_fetch_bytes_min...: 1 min=1 max=1
kafka_reader_fetch_size........: 124995 1376.733793/s
kafka_reader_fetch_wait_max....: 10s min=10s max=10s
kafka_reader_fetches_count.....: 16 0.176229/s
kafka_reader_lag...............: 7400 min=6500 max=14600
kafka_reader_message_bytes.....: 4.0 MB 44 kB/s
kafka_reader_message_count.....: 132401 1458.30578/s
kafka_reader_offset............: 132400 min=400 max=132400
kafka_reader_queue_capacity....: 100 min=100 max=100
kafka_reader_queue_length......: 100 min=99 max=100
kafka_reader_read_seconds......: avg=127.24ms min=0s med=0s max=4.01s p(90)=0s p(95)=0s
kafka_reader_rebalance_count...: 1 0.011014/s
kafka_reader_timeouts_count....: 0 0/s
kafka_reader_wait_seconds......: avg=32.96µs min=0s med=0s max=1.54ms p(90)=0s p(95)=0s
kafka_writer_acks_required.....: 0 min=0 max=0
kafka_writer_async.............: 0.00% ✓ 0 ✗ 490
kafka_writer_attempts_max......: 0 min=0 max=0
kafka_writer_batch_bytes.......: 26 kB 281 B/s
kafka_writer_batch_max.........: 1 min=1 max=1
kafka_writer_batch_size........: 490 5.397012/s
kafka_writer_batch_timeout.....: 0s min=0s max=0s
✓ kafka_writer_error_count.......: 0 0/s
kafka_writer_message_bytes.....: 7.6 MB 84 kB/s
kafka_writer_message_count.....: 147000 1619.103705/s
kafka_writer_read_timeout......: 0s min=0s max=0s
kafka_writer_retries_count.....: 0 0/s
kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count.......: 147000 1619.103705/s
kafka_writer_write_seconds.....: avg=30.37µs min=12.5µs med=23.17µs max=163.86µs p(90)=50.43µs p(95)=78.4µs
kafka_writer_write_timeout.....: 0s min=0s max=0s
vus............................: 0 min=0 max=50
vus_max........................: 50 min=50 max=50
running (1m30.8s), 00/50 VUs, 441 complete and 49 interrupted iterations
default ✓ [======================================] 50 VUs 1m0s
I am aware of the reader errors, as you can see in the logs. The writer produced 147k messages and the reader consumed 132k. The number of errors multiplied by your limit of 300 roughly accounts for the difference between messages produced and consumed (49 × 300 ≈ 14,700 ≈ 147,000 − 132,401). I tweaked the reader config:
import {
    Writer,
    Reader,
    Connection,
    SchemaRegistry,
    CODEC_SNAPPY,
    SCHEMA_TYPE_JSON,
    SECOND,
} from "k6/x/kafka";

const reader = new Reader({
    brokers: brokers,
    groupID: groupID,
    groupTopics: [topic],
    queueCapacity: 300,
    readBatchTimeout: 20 * SECOND,
    maxBytes: 10000000, // 10 MB
    readLagInterval: 10 * SECOND,
});
And this is the best result I can get:
./k6 run --vus 50 --duration 60s test-kafka.js
execution: local
script: /mnt/c/Users/Mostafa Moradian/Desktop/test-kafka.js
output: engine
scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
* default: 50 looping VUs for 1m0s (gracefulStop: 30s)
ERRO[0090] Unable to read messages. error="Unable to read messages."
(the line above is repeated 49 times)
✓ 10 messages is received
✓ Topic equals to xk6_kafka_consumer_group_topic
✓ Key contains key/value and is JSON
✓ Value contains key/value and is JSON
█ teardown
checks.........................: 100.00% ✓ 1808 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=134.43ms min=122.3ms med=128.82ms max=998.19ms p(90)=138.76ms p(95)=145.67ms
iterations.....................: 452 4.971897/s
kafka_reader_dial_count........: 1 0.011/s
kafka_reader_dial_seconds......: avg=2.98µs min=0s med=0s max=1.35ms p(90)=0s p(95)=0s
✓ kafka_reader_error_count.......: 0 0/s
kafka_reader_fetch_bytes.......: 7.8 MB 86 kB/s
kafka_reader_fetch_bytes_max...: 10000000 min=10000000 max=10000000
kafka_reader_fetch_bytes_min...: 1 min=1 max=1
kafka_reader_fetch_size........: 130200 1432.170354/s
kafka_reader_fetch_wait_max....: 10s min=10s max=10s
kafka_reader_fetches_count.....: 10 0.109998/s
kafka_reader_lag...............: 8700 min=0 max=14400
kafka_reader_message_bytes.....: 4.1 MB 45 kB/s
kafka_reader_message_count.....: 135901 1494.880056/s
kafka_reader_offset............: 135900 min=600 max=135900
kafka_reader_queue_capacity....: 300 min=300 max=300
kafka_reader_queue_length......: 300 min=300 max=300
kafka_reader_read_seconds......: avg=125.21ms min=0s med=0s max=6.52s p(90)=0s p(95)=0s
kafka_reader_rebalance_count...: 1 0.011/s
kafka_reader_timeouts_count....: 0 0/s
kafka_reader_wait_seconds......: avg=17.22µs min=0s med=0s max=1.14ms p(90)=0s p(95)=0s
kafka_writer_acks_required.....: 0 min=0 max=0
kafka_writer_async.............: 0.00% ✓ 0 ✗ 501
kafka_writer_attempts_max......: 0 min=0 max=0
kafka_writer_batch_bytes.......: 26 kB 287 B/s
kafka_writer_batch_max.........: 1 min=1 max=1
kafka_writer_batch_size........: 501 5.510886/s
kafka_writer_batch_timeout.....: 0s min=0s max=0s
✓ kafka_writer_error_count.......: 0 0/s
kafka_writer_message_bytes.....: 7.8 MB 86 kB/s
kafka_writer_message_count.....: 150300 1653.265778/s
kafka_writer_read_timeout......: 0s min=0s max=0s
kafka_writer_retries_count.....: 0 0/s
kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count.......: 150300 1653.265778/s
kafka_writer_write_seconds.....: avg=34.71µs min=11.38µs med=21.61µs max=406.52µs p(90)=45.03µs p(95)=130.23µs
kafka_writer_write_timeout.....: 0s min=0s max=0s
vus............................: 0 min=0 max=50
vus_max........................: 50 min=50 max=50
running (1m30.9s), 00/50 VUs, 452 complete and 49 interrupted iterations
default ✓ [======================================] 50 VUs 1m0s
I'll close this due to inactivity. Feel free to re-open it if you still have the issue.