Send messages to a kafka topic at a constant rate
ramananaa opened this issue · 5 comments
Relatively new to k6 kafka, I created a script to produce messages to a kafka topic with sasl authentication. I'm able to successfully send single message but now want to use the script to send requests at a constant rate (~500/second). How do I define this scenario? I was looking at k6 documentations and tried using "constant-arrival-rate" scenario but that doesn't seem to be working.
How can I setup a scenario where I can either send messages to kafka at a constant rate (500/s for 3 mins) or send a fixed number of messages (1 million messages)?
Hey,
The execution scenarios doesn't work with xk6-kafka. So, you should consider testing how your system can handle this amount of message using the number of VUs, iterations and for-loops.
Hi @mostafa ,
Can you please elaborate a little on how I can incorporate VUs and iterations? Should I surround writer.produce({ messages: messages });
in a for loop to achieve the fixed amount of messages?
my test script:
export const options = {
scenarios: {
produceScenario: {
executor: 'constant-arrival-rate',
exec: 'produce',
rate: 1,
timeUnit: '1s',
preAllocatedVUs: 2,
duration: '10s',
},
},
thresholds: {
'ResponseTime_01_Produce': ['p(95)<100'],
'ErrorRate_01_Produce': ['rate<=0.1'],
},
};
const brokers = ["localhost:9092"];
const writeTopic = "my.test.topic";
const tlsConfig = {
enableTls: true,
insecureSkipTlsVerify: false,
minVersion: TLS_1_2,
clientCertPem: "client.pem",
clientKeyPem: "client-key.pem",
serverCaPem: "ca.pem",
};
const writer = new Writer({ brokers: brokers, topic: writeTopic, tls: tlsConfig });
const schemaRegistry = new SchemaRegistry();
export function produce() {
let startDate, endDate;
let messages = [
{
// The data type of the value is JSON
value: schemaRegistry.serialize({
data: {
id: "1f0288a9-21a6-4ad6-b0a8-1514eb1a5c82",
timestamp: new Date().toISOString(),
body: "test body",
metadata: { }
},
schemaType: SCHEMA_TYPE_JSON,
})
}];
try{
startDate = new Date();
writer.produce({ messages: messages });
endDate = new Date();
produceResponseTime.add(endDate-startDate);
produceCounter.add(1);
} catch(error) {
produceErrorRate.add(1);
}
}
export function teardown(data) {
writer.close();
}
}
As stated here, using VUs and iterations on the commandline would translate to shared-iterations executor, which is what I originally tested with this extension and documented in the README, which you can also see from the k6 output:
scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
* default: 50 looping VUs for 1m0s (gracefulStop: 30s)
So, you should test to find the correct combination of parameters that yields your result, including using VUs, duration, for-loops/iteration in your code.
@mostafa
https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_sasl_auth.js#L20 - the only example, can I have more examples of what options are supported?
@zibul444 These are all the options supported by k6: https://k6.io/docs/using-k6/k6-options/