mostafa/xk6-kafka

Multiple Kafka producers generated for each thread in high-concurrency test


Description:
I am running a test with over 5000 VUs, and a new Kafka connection is opened per VU when the Kafka producer is initialized. This results in a large number of open connections and is degrading the performance of my Kafka server.

Problem: A separate Kafka producer (and connection) is created for each VU, resulting in a high number of open connections.

Attempted Solutions:

  1. Tried initializing Kafka producers outside the default function.
  2. Investigated using the setup() stage, but setup() does not accept objects that contain functions, such as a Kafka producer.

Is there a way to share producers between VUs? Thanks in advance!

Test script:

import { KafkaMessageProducer } from "../kafka/KafkaMessageProducer.js";

export const options = {
  discardResponseBodies: true,
  scenarios: {
    my_first_scenario: {
      executor: "per-vu-iterations",
      vus: 5000,
      iterations: 20,
      maxDuration: "60m",
    },
  },
};

const broker = __ENV.KAFKA_BROKER_URL || "kafka:9092";
const topic = __ENV.KAFKA_TOPIC || "test_topic";

const kafkaProducer = new KafkaMessageProducer([broker], topic);

export default function () {
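  // createTestMessage() is a helper assumed to be defined elsewhere in the test suite.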
  const message = createTestMessage();
  const messages = [
    {
      keyData: {},
      valueData: message,
      headers: {},
    },
  ];
  kafkaProducer.produceMessages(messages);
}

export function teardown(data) {
  kafkaProducer.close();
}

KafkaMessageProducer:

import { Writer, SchemaRegistry, CODEC_SNAPPY, SCHEMA_TYPE_JSON } from "k6/x/kafka";

export class KafkaMessageProducer {
  constructor(brokers, topic) {
    this.writer = new Writer({
      brokers: brokers,
      topic: topic,
      autoCreateTopic: true,
      compression: CODEC_SNAPPY,
    });
    this.schemaRegistry = new SchemaRegistry();
  }

  produceMessages(messages) {
    for (let index = 0; index < messages.length; index++) {
      const message = messages[index];
      this.writer.produce({
        messages: [
          {
            key: this.schemaRegistry.serialize({
              data: message.keyData,
              schemaType: SCHEMA_TYPE_JSON,
            }),
            value: this.schemaRegistry.serialize({
              data: message.valueData,
              schemaType: SCHEMA_TYPE_JSON,
            }),
            headers: message.headers,
            offset: index,
            partition: 0,
            time: new Date(),
          },
        ],
      });
    }
  }

  close() {
    this.writer.close();
  }
}

Hey @rrubiostuart,

Based on the test lifecycle docs, the init code runs once per VU, which is why 5000 connections to Kafka are opened. The setup() and teardown() functions run only once per test, so I recommend creating the connection in setup() and closing it in teardown(). What error do you see when you say you "encountered issues with accepting objects with functions like a Kafka producer"?
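
For reference, a minimal sketch of the lifecycle (the stage names come from the k6 docs; the returned field is purely illustrative):

// Init code (this top-level scope) runs once per VU.

export function setup() {
  // Runs once per test run; only what it returns reaches the other stages.
  return { startedAt: new Date().toISOString() };
}

export default function (data) {
  // Runs once per iteration, in every VU.
}

export function teardown(data) {
  // Runs once, after all iterations finish.
}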

Hi @mostafa, this is the error we get if we return the kafkaProducer from setup():
level=error msg="error marshaling setup() data to JSON: json: unsupported type: func(goja.FunctionCall) goja.Value"

Reading the documentation, I see that setup() comes with a restriction: "You can pass only data (i.e. JSON) between setup and the other stages. You cannot pass functions." So how do you propose creating the connection in setup()? Thanks in advance!
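
For reference, this is the shape of what we tried (a sketch; the broker and topic values are placeholders):

import { KafkaMessageProducer } from "../kafka/KafkaMessageProducer.js";

export function setup() {
  // k6 marshals the return value to JSON before handing it to the other
  // stages, and the producer's methods are functions, hence the error above.
  return new KafkaMessageProducer(["kafka:9092"], "test_topic");
}

export default function (producer) {
  // Never reached: the run aborts while marshaling setup() data.
  producer.produceMessages([{ keyData: {}, valueData: {}, headers: {} }]);
}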

@rrubiostuart

I am afraid that's not possible then. And since VUs don't share context (they are independent of each other, AFAIK), a single connection cannot be created once and shared across VUs. You can lean on iterations, though: all iterations of a VU share that VU's connection.
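
If the goal is just to cap the number of open connections, one option (a sketch, untested at your scale) is to trade VUs for iterations in your scenario, since every iteration of a VU reuses that VU's single Writer:

export const options = {
  discardResponseBodies: true,
  scenarios: {
    my_first_scenario: {
      executor: "per-vu-iterations",
      vus: 500,        // 10x fewer VUs => 10x fewer Kafka connections
      iterations: 200, // 10x more iterations per VU keeps the total message count
      maxDuration: "60m",
    },
  },
};

The trade-off is lower effective concurrency, so this only helps if per-VU throughput is acceptable for your test.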

If you want connection pooling (as far as I understood), it should (and possibly can) be handled in the Go code of the extension, but I don't have time to invest in this. However, I'd be very happy to see contributions.