mostafa/xk6-kafka

Endless waiting for the consumer when trying to read all sent messages in the topic.

bloodgang94 opened this issue · 4 comments

Hello! Thanks for your work.
I have a question.
I am using your example https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_consumer_group.js
and ran into a problem: if you try to read all 300 messages, the consumer waits indefinitely. This happens in roughly 3 out of 5 attempts.
Full code

/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka with 300 JSON messages per iteration.
*/

import { check } from "k6";
// import * as kafka from "k6/x/kafka";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  CODEC_SNAPPY,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka"; // import kafka extension

// Prints module-level constants
// console.log(kafka);

// Broker list, topic name and consumer-group name shared by the clients below.
const brokers = ["localhost:9092"];
const topic = "test";
const groupID = "my-group";

// Producer bound to `topic`, configured with the Snappy codec.
const writer = new Writer({ brokers, topic, compression: CODEC_SNAPPY });

// Consumer-group reader: member of `groupID`, subscribed to `topic`.
const reader = new Reader({ brokers, groupID, groupTopics: [topic] });

// Connection to the first broker in the list.
const connection = new Connection({ address: brokers[0] });

// Used to serialize and deserialize the JSON keys and values.
const schemaRegistry = new SchemaRegistry();

// Create the topic only when `__VU` is 0 (presumably the k6 init pass, so that
// per-VU evaluations of this script do not repeat the call; confirm against
// the k6 execution-context docs).
if (__VU == 0) {
  // Topic-level setting: store data with the Snappy codec.
  const compressionEntry = {
    configName: "compression.type",
    configValue: CODEC_SNAPPY,
  };

  connection.createTopic({
    topic,
    numPartitions: 3,
    replicationFactor: 1,
    configEntries: [compressionEntry],
  });
}

// Baseline thresholds: both the writer and the reader error counters must
// stay at zero for the run to count as working.
const thresholds = {
  kafka_writer_error_count: ["count == 0"],
  kafka_reader_error_count: ["count == 0"],
};

export const options = { thresholds };

/**
 * k6 default function, executed once per VU iteration.
 *
 * Produces 300 identical JSON key/value messages through the shared writer,
 * then asks the shared reader for the same number of messages and checks the
 * received count and the content of the first received message.
 */
export default function () {
  const rounds = 100;
  const partitionCount = 3;
  const expectedCount = rounds * partitionCount;

  // Every message carries the same JSON payload, so serialize the key and the
  // value once instead of once per message.
  const payload = { key: "value" };
  const serializedKey = schemaRegistry.serialize({
    data: payload,
    schemaType: SCHEMA_TYPE_JSON,
  });
  const serializedValue = schemaRegistry.serialize({
    data: payload,
    schemaType: SCHEMA_TYPE_JSON,
  });

  const outgoing = [];
  for (let round = 0; round < rounds; round++) {
    for (let partition = 0; partition < partitionCount; partition++) {
      outgoing.push({
        // The data type of the key is JSON
        key: serializedKey,
        // The data type of the value is JSON
        value: serializedValue,
        // NOTE(review): this property is spelled `parition`, exactly as in the
        // original script. Confirm whether `partition` was intended and
        // whether the writer honors a per-message partition before renaming.
        parition: partition,
      });
    }
  }

  writer.produce({ messages: outgoing });

  // Ask for as many messages as were just produced.
  const received = reader.consume({ limit: expectedCount });

  check(received, {
    [`${expectedCount} messages are received`]: (msgs) =>
      msgs.length === expectedCount,
  });

  // The per-message checks below dereference the first message; skip them
  // when nothing was returned instead of failing with a TypeError.
  if (received.length === 0) {
    return;
  }

  check(received[0], {
    [`Topic equals to ${topic}`]: (msg) => msg["topic"] === topic,
    "Key contains key/value and is JSON": (msg) =>
      schemaRegistry.deserialize({
        data: msg.key,
        schemaType: SCHEMA_TYPE_JSON,
      }).key === "value",
    "Value contains key/value and is JSON": (msg) => {
      // Deserialize once and reuse the result for both conditions.
      const value = schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      });
      return typeof value === "object" && value.key === "value";
    },
  });
}


/**
 * k6 teardown hook: deletes the test topic (only when `__VU` is 0) and then
 * closes the writer, the reader and the connection.
 *
 * @param {*} data - Unused.
 */
export function teardown(data) {
  try {
    if (__VU === 0) {
      // Delete the topic
      connection.deleteTopic(topic);
    }
  } finally {
    // Close the clients even if deleting the topic throws, so they are not
    // left open; the original error still propagates.
    writer.close();
    reader.close();
    connection.close();
  }
}

logs

time="2023-03-13T09:02:59+03:00" level=info msg="entering loop for consumer group, my-group\n"
time="2023-03-13T09:02:59+03:00" level=info msg="entering loop for consumer group, my-group\n"
time="2023-03-13T09:03:04+03:00" level=info msg="joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41 in generation 10"tions
time="2023-03-13T09:03:04+03:00" level=info msg="selected as leader for group, my-group\n" VU
time="2023-03-13T09:03:04+03:00" level=info msg="joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="joinGroup succeeded for response, my-group.  generationID=10, memberID=k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f"
time="2023-03-13T09:03:04+03:00" level=info msg="Joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="using 'range' balancer to assign group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="found member: k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41/[]byte(nil)"
time="2023-03-13T09:03:04+03:00" level=info msg="found member: k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f/[]byte(nil)"
time="2023-03-13T09:03:04+03:00" level=info msg="found topic/partition: test/0"
time="2023-03-13T09:03:04+03:00" level=info msg="assigned member/topic/partitions k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41/test/[0]"
time="2023-03-13T09:03:04+03:00" level=info msg="joinGroup succeeded for response, my-group.  generationID=10, memberID=k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41"
time="2023-03-13T09:03:04+03:00" level=info msg="Joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41 in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="Syncing 2 assignments for generation 10 as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41"
time="2023-03-13T09:03:04+03:00" level=info msg="received empty assignments for group, my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f for generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="sync group finished for group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="sync group finished for group, my-group"

running (00m05.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
default   [--------------------------------------time="2023-03-13T09:03:04+03:00" level=info msg="started heartbeat for group, my-group [3s]"
time="2023-03-13T09:03:04+03:00" level=info msg="subscribed to topics and partitions: map[]"
] 1 VUs  00m05.0s/10m0s  0/1 iters, 1 per VUtime="2023-03-13T09:03:04+03:00" level=info msg="started heartbeat for group, my-group [3s]"

time="2023-03-13T09:03:04+03:00" level=info msg="initializing kafka reader for partition 0 of test starting at offset first offset"
time="2023-03-13T09:03:04+03:00" level=info msg="subscribed to topics and partitions: map[{topic:test partition:0}:-2]"

image
I have tried the rebalanceTimeout and sessionTimeout settings but without success.

Could you suggest what I am doing wrong?

@bloodgang94

I ran your exact script (with no modifications) for a minute with 50 VUs and this is the result:

View results
./k6 run --vus 50 --duration 60s test-kafka.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: /mnt/c/Users/Mostafa Moradian/Desktop/test-kafka.js
     output: engine

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)

ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."

     ✓ 10 messages is received
     ✓ Topic equals to xk6_kafka_consumer_group_topic
     ✓ Key contains key/value and is JSON
     ✓ Value contains key/value and is JSON

     █ teardown

     checks.........................: 100.00% ✓ 1764        ✗ 0
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=137.67ms min=122.3ms med=131.32ms max=941ms    p(90)=147.73ms p(95)=155.48ms
     iterations.....................: 441     4.857311/s
     kafka_reader_dial_count........: 1       0.011014/s
     kafka_reader_dial_seconds......: avg=4.13µs   min=0s      med=0s       max=1.82ms   p(90)=0s       p(95)=0s
   ✓ kafka_reader_error_count.......: 0       0/s
     kafka_reader_fetch_bytes.......: 7.5 MB  83 kB/s
     kafka_reader_fetch_bytes_max...: 1000000 min=1000000   max=1000000
     kafka_reader_fetch_bytes_min...: 1       min=1         max=1
     kafka_reader_fetch_size........: 124995  1376.733793/s
     kafka_reader_fetch_wait_max....: 10s     min=10s       max=10s
     kafka_reader_fetches_count.....: 16      0.176229/s
     kafka_reader_lag...............: 7400    min=6500      max=14600
     kafka_reader_message_bytes.....: 4.0 MB  44 kB/s
     kafka_reader_message_count.....: 132401  1458.30578/s
     kafka_reader_offset............: 132400  min=400       max=132400
     kafka_reader_queue_capacity....: 100     min=100       max=100
     kafka_reader_queue_length......: 100     min=99        max=100
     kafka_reader_read_seconds......: avg=127.24ms min=0s      med=0s       max=4.01s    p(90)=0s       p(95)=0s
     kafka_reader_rebalance_count...: 1       0.011014/s
     kafka_reader_timeouts_count....: 0       0/s
     kafka_reader_wait_seconds......: avg=32.96µs  min=0s      med=0s       max=1.54ms   p(90)=0s       p(95)=0s
     kafka_writer_acks_required.....: 0       min=0         max=0
     kafka_writer_async.............: 0.00%   ✓ 0           ✗ 490
     kafka_writer_attempts_max......: 0       min=0         max=0
     kafka_writer_batch_bytes.......: 26 kB   281 B/s
     kafka_writer_batch_max.........: 1       min=1         max=1
     kafka_writer_batch_size........: 490     5.397012/s
     kafka_writer_batch_timeout.....: 0s      min=0s        max=0s
   ✓ kafka_writer_error_count.......: 0       0/s
     kafka_writer_message_bytes.....: 7.6 MB  84 kB/s
     kafka_writer_message_count.....: 147000  1619.103705/s
     kafka_writer_read_timeout......: 0s      min=0s        max=0s
     kafka_writer_retries_count.....: 0       0/s
     kafka_writer_wait_seconds......: avg=0s       min=0s      med=0s       max=0s       p(90)=0s       p(95)=0s
     kafka_writer_write_count.......: 147000  1619.103705/s
     kafka_writer_write_seconds.....: avg=30.37µs  min=12.5µs  med=23.17µs  max=163.86µs p(90)=50.43µs  p(95)=78.4µs
     kafka_writer_write_timeout.....: 0s      min=0s        max=0s
     vus............................: 0       min=0         max=50
     vus_max........................: 50      min=50        max=50


running (1m30.8s), 00/50 VUs, 441 complete and 49 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

I am aware of the reader errors, as you can see in the logs. The writer produced 147k messages and the reader consumed 132k. The number of errors and your limit of 300 correlate with the difference between the messages produced and consumed: 49 interrupted iterations × 300 messages = 14,700, which is close to the gap of 147,000 − 132,401 = 14,599. I tweaked the reader config:

import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  CODEC_SNAPPY,
  SCHEMA_TYPE_JSON,
  SECOND,
} from "k6/x/kafka"

// Tuned reader: queueCapacity 300, readBatchTimeout 20 s, maxBytes 10 MB and
// readLagInterval 10 s.
const reader = new Reader({
  brokers,
  groupID,
  groupTopics: [topic],
  queueCapacity: 300,
  readBatchTimeout: 20 * SECOND,
  maxBytes: 10 * 1000 * 1000, // 10 MB
  readLagInterval: 10 * SECOND,
});

And this is the best result I can get:

View results
 ./k6 run --vus 50 --duration 60s test-kafka.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: /mnt/c/Users/Mostafa Moradian/Desktop/test-kafka.js
     output: engine

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)

ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."
ERRO[0090] Unable to read messages.                      error="Unable to read messages."

     ✓ 10 messages is received
     ✓ Topic equals to xk6_kafka_consumer_group_topic
     ✓ Key contains key/value and is JSON
     ✓ Value contains key/value and is JSON

     █ teardown

     checks.........................: 100.00%  ✓ 1808        ✗ 0
     data_received..................: 0 B      0 B/s
     data_sent......................: 0 B      0 B/s
     iteration_duration.............: avg=134.43ms min=122.3ms med=128.82ms max=998.19ms p(90)=138.76ms p(95)=145.67ms
     iterations.....................: 452      4.971897/s
     kafka_reader_dial_count........: 1        0.011/s
     kafka_reader_dial_seconds......: avg=2.98µs   min=0s      med=0s       max=1.35ms   p(90)=0s       p(95)=0s
   ✓ kafka_reader_error_count.......: 0        0/s
     kafka_reader_fetch_bytes.......: 7.8 MB   86 kB/s
     kafka_reader_fetch_bytes_max...: 10000000 min=10000000  max=10000000
     kafka_reader_fetch_bytes_min...: 1        min=1         max=1
     kafka_reader_fetch_size........: 130200   1432.170354/s
     kafka_reader_fetch_wait_max....: 10s      min=10s       max=10s
     kafka_reader_fetches_count.....: 10       0.109998/s
     kafka_reader_lag...............: 8700     min=0         max=14400
     kafka_reader_message_bytes.....: 4.1 MB   45 kB/s
     kafka_reader_message_count.....: 135901   1494.880056/s
     kafka_reader_offset............: 135900   min=600       max=135900
     kafka_reader_queue_capacity....: 300      min=300       max=300
     kafka_reader_queue_length......: 300      min=300       max=300
     kafka_reader_read_seconds......: avg=125.21ms min=0s      med=0s       max=6.52s    p(90)=0s       p(95)=0s
     kafka_reader_rebalance_count...: 1        0.011/s
     kafka_reader_timeouts_count....: 0        0/s
     kafka_reader_wait_seconds......: avg=17.22µs  min=0s      med=0s       max=1.14ms   p(90)=0s       p(95)=0s
     kafka_writer_acks_required.....: 0        min=0         max=0
     kafka_writer_async.............: 0.00%    ✓ 0           ✗ 501
     kafka_writer_attempts_max......: 0        min=0         max=0
     kafka_writer_batch_bytes.......: 26 kB    287 B/s
     kafka_writer_batch_max.........: 1        min=1         max=1
     kafka_writer_batch_size........: 501      5.510886/s
     kafka_writer_batch_timeout.....: 0s       min=0s        max=0s
   ✓ kafka_writer_error_count.......: 0        0/s
     kafka_writer_message_bytes.....: 7.8 MB   86 kB/s
     kafka_writer_message_count.....: 150300   1653.265778/s
     kafka_writer_read_timeout......: 0s       min=0s        max=0s
     kafka_writer_retries_count.....: 0        0/s
     kafka_writer_wait_seconds......: avg=0s       min=0s      med=0s       max=0s       p(90)=0s       p(95)=0s
     kafka_writer_write_count.......: 150300   1653.265778/s
     kafka_writer_write_seconds.....: avg=34.71µs  min=11.38µs med=21.61µs  max=406.52µs p(90)=45.03µs  p(95)=130.23µs
     kafka_writer_write_timeout.....: 0s       min=0s        max=0s
     vus............................: 0        min=0         max=50
     vus_max........................: 50       min=50        max=50


running (1m30.9s), 00/50 VUs, 452 complete and 49 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

I'll close this due to inactivity. Feel free to re-open it if you still have the issue.