mostafa/xk6-kafka

Send messages to a kafka topic at a constant rate

ramananaa opened this issue · 5 comments

Relatively new to k6 kafka, I created a script to produce messages to a Kafka topic with SASL authentication. I can successfully send a single message, but now I want to use the script to send messages at a constant rate (~500/second). How do I define this scenario? I looked at the k6 documentation and tried the "constant-arrival-rate" executor, but that doesn't seem to work.

How can I set up a scenario where I can either send messages to Kafka at a constant rate (500/s for 3 mins) or send a fixed number of messages (1 million messages)?

Hey,

Execution scenarios don't work with xk6-kafka. So, you should consider testing how your system handles this volume of messages using the number of VUs, iterations and for-loops.
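For the fixed-count case (1 million messages), the arithmetic behind the "VUs and iterations" suggestion can be sketched as follows. This is only a rough sketch: it assumes every iteration produces one batch of `batchSize` messages, and the helper name `iterationsFor`, the batch size of 100 and the 50 VUs are illustrative values, not something from xk6-kafka.

```javascript
// Sketch: how many iterations are needed to send a fixed number of messages
// when every iteration produces one batch of `batchSize` messages.
// `iterationsFor` is a hypothetical helper, not part of k6 or xk6-kafka.
function iterationsFor(totalMessages, batchSize) {
  if (batchSize <= 0) {
    throw new RangeError("batchSize must be positive");
  }
  // Round up so the target is always reached (the last batch may overshoot).
  return Math.ceil(totalMessages / batchSize);
}

const plan = {
  vus: 50, // assumed value; tune it for your brokers
  iterations: iterationsFor(1000000, 100),
};
```

With these assumed numbers the run would be started with something like `k6 run --vus 50 --iterations 10000 script.js`; when both flags are given, the iterations are shared among the VUs rather than executed per VU.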

Hi @mostafa ,
Can you please elaborate a little on how I can incorporate VUs and iterations? Should I surround writer.produce({ messages: messages }); in a for loop to achieve the fixed amount of messages?

my test script:

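import { Counter, Rate, Trend } from "k6/metrics";
import { Writer, SchemaRegistry, SCHEMA_TYPE_JSON, TLS_1_2 } from "k6/x/kafka";

// Custom metrics; the Trend and Rate names match the thresholds below.
// The Counter name is an assumption, since the original script does not show it.
const produceResponseTime = new Trend("ResponseTime_01_Produce");
const produceErrorRate = new Rate("ErrorRate_01_Produce");
const produceCounter = new Counter("Counter_01_Produce");

export const options = {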

   scenarios: {
        produceScenario: {
          executor: 'constant-arrival-rate',
          exec: 'produce',
          rate: 1,
          timeUnit: '1s',
          preAllocatedVUs: 2,
          duration: '10s',
        },
    },

    thresholds: {
        'ResponseTime_01_Produce': ['p(95)<100'],
        'ErrorRate_01_Produce': ['rate<=0.1'],
    },
};

const brokers = ["localhost:9092"];
const writeTopic = "my.test.topic";

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: false,
  minVersion: TLS_1_2,

  clientCertPem: "client.pem",
  clientKeyPem: "client-key.pem",
  serverCaPem: "ca.pem",
};

const writer = new Writer({ brokers: brokers, topic: writeTopic, tls: tlsConfig });
const schemaRegistry = new SchemaRegistry();

export function produce() {
   
    let startDate, endDate;
    
    let messages = [
        {
            // The data type of the value is JSON
            value: schemaRegistry.serialize({
                data: {
                    id: "1f0288a9-21a6-4ad6-b0a8-1514eb1a5c82",
                    timestamp: new Date().toISOString(),
                    body: "test body",
                    metadata: { }
                },
                schemaType: SCHEMA_TYPE_JSON,
            })
        }];
 
    try {
        startDate = new Date();
        writer.produce({ messages: messages });
        endDate = new Date();
        produceResponseTime.add(endDate - startDate);
        produceCounter.add(1);
        // Record a success so the error rate is computed over all attempts.
        produceErrorRate.add(0);
    } catch (error) {
        produceErrorRate.add(1);
    }
  
}

export function teardown(data) {
    writer.close();
}
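On the for-loop question: one possible reading is to build a batch of messages in a loop and hand the whole batch to a single `produce` call, instead of calling `produce` once per message. The sketch below assumes that reading. `produceBatch` and `makeMessage` are hypothetical names, and the `writer` argument is anything exposing `produce({ messages })`, such as the `Writer` in the script above.

```javascript
// Sketch: send `batchSize` messages with a single produce() call.
// writer: any object with a produce({ messages }) method; in a k6 script
//         this would be the xk6-kafka Writer.
// makeMessage: hypothetical callback returning one message object for index i.
function produceBatch(writer, batchSize, makeMessage) {
  const messages = [];
  for (let i = 0; i < batchSize; i++) {
    messages.push(makeMessage(i));
  }
  writer.produce({ messages: messages });
  // Return the number of messages handed to the writer, e.g. for a Counter.
  return messages.length;
}
```

Inside `produce()` this could replace the single-message array, for example `produceCounter.add(produceBatch(writer, 100, buildMessage))`, where `buildMessage` is your own function that wraps the `schemaRegistry.serialize(...)` call.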

As stated here, using VUs and iterations on the command line translates to the shared-iterations executor, which is what I originally tested this extension with and documented in the README. You can see the scenario k6 derives from the command-line flags in its output (here, VUs with a duration):

scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
        * default: 50 looping VUs for 1m0s (gracefulStop: 30s)

So, you should experiment to find the combination of parameters that yields the result you want, including VUs, duration, and for-loops/iterations in your code.
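One way to approximate a target rate with looping VUs is to pace each iteration: work out how long one iteration should take for the chosen number of VUs and batch size, then sleep for whatever time is left after the produce call. The sketch below is only an approximation under those assumptions. `pacingSleepMs` is a hypothetical helper, and the achieved rate still depends on broker latency.

```javascript
// Sketch: how long a VU should pause after producing one batch so that all
// VUs together approach `targetRate` messages per second.
// elapsedMs is the time the produce call itself took.
function pacingSleepMs(targetRate, vus, batchSize, elapsedMs) {
  // Time budget for one iteration of one VU, in milliseconds.
  const intervalMs = (vus * batchSize * 1000) / targetRate;
  // Never return a negative pause if the produce call overran its budget.
  return Math.max(0, intervalMs - elapsedMs);
}
```

In a k6 script this could be called at the end of each iteration as `sleep(pacingSleepMs(500, 10, 50, endDate - startDate) / 1000)`, with `sleep` imported from `"k6"` (it takes seconds). With 10 VUs and batches of 50, each iteration gets a budget of one second to reach roughly 500 messages per second.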

@mostafa
https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_sasl_auth.js#L20 is the only example I could find. Can I have more examples of which options are supported?

@zibul444 These are all the options supported by k6: https://k6.io/docs/using-k6/k6-options/