mostafa/xk6-kafka

How to come up with the limit for the reader.consume method?

DurgaManasaMaruboyina opened this issue · 9 comments

Hello team,

We are using xk6-kafka to run performance tests on services that communicate with each other through Kafka.

We have two services whose performance we want to measure. The flow looks like this:

  • We produce a message to Topic1
  • Service A consumes the message from Topic1 and produces 2 messages to Topic2 (Service A does some replication, which is why it doubles the message count)
  • Service B consumes the messages from Topic2 and produces a message to Topic3 for each of them

We would like to measure the performance of the entire flow, from the point where Service A picks up a message to the point where Service B finishes processing.

To do that, we are trying to check that we consume the correct number of messages for the number of messages we produce. We tried this example, https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_json.js, and are trying to work out the limit value to pass to the reader.consume method (https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_json.js#L119).

For example, we ran with the default of 1 VU, produced 100 messages, and got 200 messages back from Service B. We were able to verify that in the script mentioned above.

We would like to ensure that we consume only the messages that we produced.

Questions:

  1. Is there any way to come up with the correct limit for the reader.consume method when we run with x VUs?
  2. Can we rely on iteration_duration and say that the flow described above took that much time?

The problem is that when we run with more VUs, we are unable to come up with the limit for reader.consume.

writer.produce({messages: messagesToProduce}); // produces 100 messages
let limit = messagesToProduce.length * 2
try {
    let consumedMessages = reader.consume({limit: limit});
    console.log("consumed count: " + consumedMessages.length)
    check(consumedMessages, {
        "200 messages are received": (messages) => messages.length === limit,
    });
    check(consumedMessages[0], {
        "Topic equals to test-topic": (msg) => msg["topic"] === "test-topic",
    });
} catch (error) {
    console.error("<<=================== ERROR: " + error);
}

Hey @DurgaManasaMaruboyina,

> We would like to ensure that we consume only the messages that we produced.

This is not possible with the current consumer implementation, as the behavior of reader.consume is not deterministic.

Q1:

No. As I mentioned above, the behavior is non-deterministic (especially when mixed with the VU/iterations).
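A possible workaround, not something the extension provides: instead of trying to predict the limit, a script could tag every message it produces and then discard anything it consumes that carries someone else's tag. The sketch below is plain JavaScript with hypothetical helper names (makeCorrelationKey, filterOwnMessages). It assumes that the services copy the message key unchanged to the output topic and that the consumed keys have already been deserialized to strings. It does not make reader.consume deterministic; it only lets a VU recognize its own messages in whatever batch it receives.

```javascript
// Hypothetical helpers; the names are illustrative and not part of xk6-kafka.

// Build a key that identifies one message produced by one VU in one iteration.
function makeCorrelationKey(runId, vuId, iteration, index) {
  return [runId, vuId, iteration, index].join(":");
}

// Keep only the consumed messages whose key starts with this VU's prefix.
// Assumption: the services under test propagate the key to the output topic.
function filterOwnMessages(consumed, runId, vuId) {
  const prefix = runId + ":" + vuId + ":";
  return consumed.filter(
    (msg) => typeof msg.key === "string" && msg.key.startsWith(prefix)
  );
}

// Plain objects standing in for deserialized messages.
const consumed = [
  { key: "run-42:1:0:0", value: "a" },
  { key: "run-42:2:0:0", value: "b" },
  { key: "run-42:1:0:1", value: "c" },
  { key: "other-run:1:0:0", value: "d" },
];
const mine = filterOwnMessages(consumed, "run-42", 1);
console.log(mine.length); // 2
```

The trailing separator in the prefix keeps VU 1 from matching keys that belong to VU 10.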

Q2:

Since you cannot set an exact limit on the number of messages to read, the iteration_duration might be skewed. However, you can use Redis (using xk6-redis) or something similar to record timestamps and then read and compare them afterward.
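The "record timestamps and compare them afterward" idea can be sketched in plain JavaScript as follows. The helper name computeLatencies and the two maps are assumptions made for illustration: producedAt would hold the time each message ID was written to the input topic, and consumedAt the time the same ID was seen on the output topic, however those values were stored and read back. Because Service A duplicates messages, a real script would also have to decide which of the copies counts as the arrival; this sketch simply assumes one arrival time per ID.

```javascript
// Hypothetical helper: pair produce and consume timestamps by message ID.
// Both maps go from message ID to a timestamp in milliseconds.
function computeLatencies(producedAt, consumedAt) {
  const latencies = [];
  const missing = [];
  for (const [id, sentMs] of producedAt) {
    if (consumedAt.has(id)) {
      latencies.push(consumedAt.get(id) - sentMs);
    } else {
      missing.push(id); // produced but never observed on the output topic
    }
  }
  return { latencies, missing };
}

const producedAt = new Map([["m1", 1000], ["m2", 1010], ["m3", 1020]]);
const consumedAt = new Map([["m1", 1250], ["m2", 1190]]);
const result = computeLatencies(producedAt, consumedAt);
console.log(result.latencies); // [ 250, 180 ]
console.log(result.missing);   // [ 'm3' ]
```

Computing latency this way does not depend on how many messages a single reader.consume call happens to return, which is why it is not skewed in the way iteration_duration can be.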

Hello @mostafa,

Thank you so much for your quick response.

How can we use this script only to generate load on our services, that is, just produce messages to Topic1? We might follow some other approach to get the metrics.

We tried using only the writer to produce messages, with the --vus 5 --duration 1m options and no consumer, and the script started giving an error like this:
ERRO[0038] thresholds on metrics 'kafka_writer_error_count' have been crossed

Can we use an xk6-kafka script to only produce messages, with the --vus and --duration options?

@DurgaManasaMaruboyina

That is definitely possible. In the tests, all the scripts are executed against a running instance of Kafka and Schema Registry with a duration of 2s. If you're hitting thresholds on errors, then your script is probably not working in the first place. Also, there is an example in the README that I usually run after each release (that contains a feature change), and it runs with 50 VUs for 1m: k6 run --vus 50 --duration 60s scripts/test_json.js.

Here is the script we are trying to run, using only the writer to produce messages and no reader. It gives this error: ERRO[0038] thresholds on metrics 'kafka_writer_error_count' have been crossed. Could you let us know what we are missing here?

import {
    CODEC_SNAPPY,
    Connection,
    SCHEMA_TYPE_JSON,
    SchemaRegistry,
    Writer,
    SCHEMA_TYPE_STRING
} from "k6/x/kafka";
import {SharedArray} from 'k6/data'

const data = new SharedArray('users', function () {
    return JSON.parse(open('../resources/test_message.json')); // f must be an array[]
});

const inputMessage = data[0]
const brokers = ["localhost:9092"];
const inputTopic = "input-topic";

const writer = new Writer({
    brokers: brokers,
    topic: inputTopic,
    autoCreateTopic: true,
    compression: CODEC_SNAPPY,
});
const connection = new Connection({
    address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();

if (__VU == 0) {
    connection.createTopic({
        topic: inputTopic,
        configEntries: [
            {
                configName: "compression.type",
                configValue: CODEC_SNAPPY,
            },
        ],
    });
}
export default function () {
    const messagesToProduce = [];
    for (let index = 0; index < 100; index++) {
        messagesToProduce.push(
            {
                key: schemaRegistry.serialize({
                    data: "test-id",
                    schemaType: SCHEMA_TYPE_STRING,
                }),
                value: schemaRegistry.serialize({
                    data: inputMessage,
                    schemaType: SCHEMA_TYPE_JSON,
                }),
                time: new Date(),
            });
    }
    writer.produce({messages: messagesToProduce});
}

export function teardown(data) {
    if (__VU == 0) {
        connection.deleteTopic(inputTopic);
    }
    writer.close();
    connection.close();
}

export const options = {
    thresholds: {
        // Base thresholds to see if the writer or reader is working
        kafka_writer_error_count: ["count == 0"],
        kafka_reader_error_count: ["count == 0"],
    },
    // discardResponseBodies: true,
    // scenarios: {
    //     contacts: {
    //         executor: 'ramping-vus',
    //         startVUs: 0,
    //         stages: [
    //             {duration: '20s', target: 5},
    //             // {duration: '10s', target: 5},
    //             {duration: '30s', target: 0},
    //         ],
    //         gracefulRampDown: '0s',
    //     },
    // },
};

@DurgaManasaMaruboyina

The only thing I can recommend is to avoid using global variables (__VU == 0, as discouraged by the k6 docs) to create the topic and to use the execution context variables instead. Also, I've observed before that the topic might be created only after the initial messages are produced and sent, which causes errors on Kafka because you're trying to send to a non-existent topic. This might be what is happening to you. Try setting connectLogger: true in the Writer config to see what is going wrong in your script and causing the thresholds to be crossed.
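One way to avoid producing to a topic that does not exist yet is to check for it explicitly before the first write. The sketch below is a hypothetical helper (ensureTopic is not part of xk6-kafka) that relies only on the listTopics() and createTopic({ topic }) methods that the example scripts call on a Connection. A small in-memory stand-in is used here so the snippet runs outside k6; in a real script the helper would presumably be called once, for example from k6's setup() function, which is an assumption rather than something confirmed in this thread.

```javascript
// Hypothetical helper: create the topic only if it does not exist yet.
// `connection` is anything exposing listTopics() and createTopic({ topic }).
function ensureTopic(connection, topic) {
  if (connection.listTopics().includes(topic)) {
    return false; // already there, nothing to do
  }
  connection.createTopic({ topic: topic });
  return true;
}

// In-memory stand-in for a Connection, for illustration only.
function makeFakeConnection(existing) {
  const topics = new Set(existing);
  return {
    listTopics: () => Array.from(topics),
    createTopic: (config) => {
      topics.add(config.topic);
    },
  };
}

const conn = makeFakeConnection(["other-topic"]);
console.log(ensureTopic(conn, "input-topic")); // true
console.log(ensureTopic(conn, "input-topic")); // false
```

Topic creation on a real broker may still take a moment to propagate, so a script might additionally poll listTopics() until the topic shows up before producing.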

Hi @mostafa

  1. What is the equivalent parameter to __VU? Is it ${exec.vu.idInInstance}?
  2. We set connectLogger: true, and with it we were able to find the cause and fix it.

Our use case is that we write to one topic and consume from another. Between these topics, multiple microservices are involved. We are trying to get the latency and throughput for the entire flow, which starts at service 1 and ends with service 2 producing the message to the end topic.

We were going through one of your blog posts on load testing applications that use Kafka, and we would like to know whether the metrics that k6 gives are metrics of the microservice that consumes from one topic and produces to another.

Referring to this blog

Can we rely on those metrics and say that's the performance of the services which are involved in between multiple topics?

We are asking because of a statement in point no. 8, which says: "Note that the intention is not to test your MSK cluster, rather your microservices that are connected to your MSK cluster."

Could you let us know your thoughts here?

Hey @DurgaManasaMaruboyina,

  1. You can ask that on https://community.grafana.com if you haven't found the answer in the docs yet.
  2. Glad to hear!

> We were going through one of your blog posts on load testing applications that use Kafka, and we would like to know whether the metrics that k6 gives are metrics of the microservice that consumes from one topic and produces to another.

As much as k6 and xk6-kafka can simulate load, end-to-end testing is not necessarily what this extension is designed for. You must also have observability (o11y) added to your platform to see everything in between while testing. The metrics are merely message production and consumption times and similar metrics exposed by the kafka-go library, which are subsequently collected by xk6-kafka.

> Can we rely on those metrics and say that's the performance of the services which are involved in between multiple topics?

Based on my answer above, you can rely on these metrics and your custom metrics to ensure your system is behaving as expected under load.
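As an illustration of what a custom metric for the flow could look like, the sketch below summarizes a list of end-to-end latencies into percentiles and a throughput figure in plain JavaScript. The helper names (percentile, throughput) and the sample numbers are hypothetical, and the nearest-rank percentile is just one common definition, not necessarily the one k6 uses for its own summaries.

```javascript
// Hypothetical helper: nearest-rank percentile of a list of latencies (ms).
function percentile(samples, p) {
  if (samples.length === 0) return NaN;
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank, 1) - 1];
}

// Messages per second over the observed window (timestamps in ms).
function throughput(count, firstMs, lastMs) {
  const seconds = (lastMs - firstMs) / 1000;
  return seconds > 0 ? count / seconds : NaN;
}

const latencies = [120, 80, 200, 150, 95, 310, 110, 130, 90, 175];
console.log(percentile(latencies, 50)); // 120
console.log(percentile(latencies, 95)); // 310
console.log(throughput(200, 0, 4000));  // 50
```

Numbers like these describe the services between the topics only if the latencies were measured from produce time to the matching consume time; the built-in reader and writer metrics describe the k6 client itself.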

> Could you let us know your thoughts here?

Kafka itself is performant enough, but this doesn't necessarily mean that your code (consumers and producers) is as performant, hence my sentence.

Thank you @mostafa for the clarification.

@DurgaManasaMaruboyina I hope your question is answered and your issue is resolved. I'll close this ticket. Feel free to reopen it if the issue still exists.