Error writing messages: broker appears to be expecting TLS
dnaby opened this issue · 4 comments
import { check } from "k6";
import { Writer, Reader, SchemaRegistry, SCHEMA_TYPE_JSON, TLS_1_3 } from "k6/x/kafka";
const tlsConfig = {
enableTls: true,
insecureSkipTlsVerify: false,
minVersion: TLS_1_3,
clientCertPem: "/Users/mac/go/bin/script_kafka/super-user/user.crt",
clientKeyPem: "/Users/mac/go/bin/script_kafka/super-user/user.key",
serverCaPem: "/Users/mac/go/bin/script_kafka/super-user/ca.crt"
};
const writer = new Writer({
brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
topic: "xk6-kafka-test-topic",
tlsConfig: tlsConfig,
});
const reader = new Reader({
brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
topic: "xk6-kafka-test-topic",
tlsConfig: tlsConfig,
});
const schemaRegistry = new SchemaRegistry();
export const options = {
thresholds: {
kafka_writer_error_count: ["count == 0"],
kafka_reader_error_count: ["count == 0"],
},
};
export default function () {
for (let index = 0; index < 100; index++) {
let messages = [
{
// The data type of the key is JSON
key: schemaRegistry.serialize({
data: {
correlationId: "test-id-abc-" + index,
},
schemaType: SCHEMA_TYPE_JSON,
}),
// The data type of the value is JSON
value: schemaRegistry.serialize({
data: {
name: "xk6-kafka",
version: "0.9.0",
author: "Mouhamadou Naby DIA",
description:
"k6 extension to load test Apache Kafka",
index: index,
},
schemaType: SCHEMA_TYPE_JSON,
}),
headers: {
key: "value",
},
offset: index,
partition: 0,
time: new Date(), // Will be converted to timestamp automatically
},
{
key: schemaRegistry.serialize({
data: {
correlationId: "test-id-def-" + index,
},
schemaType: SCHEMA_TYPE_JSON,
}),
value: schemaRegistry.serialize({
data: {
name: "xk6-kafka",
version: "0.9.0",
author: "Mouhamadou Naby DIA",
description:
"k6 extension to load test Apache Kafka",
index: index,
},
schemaType: SCHEMA_TYPE_JSON,
}),
headers: {
key: "value",
},
},
];
writer.produce({ messages: messages });
}
// Read 10 messages only
let messages = reader.consume({ limit: 10 });
check(messages, {
"10 messages are received": (messages) => messages.length == 10,
});
check(messages[0], {
"Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == "xk6-kafka-test-topic",
"Key contains key/value and is JSON": (msg) =>
schemaRegistry
.deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
.correlationId.startsWith("test-id-"),
"Value contains key/value and is JSON": (msg) =>
typeof schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}) == "object" &&
schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}).name == "xk6-kafka",
"Header equals {'key': 'value'}": (msg) =>
"key" in msg.headers &&
String.fromCharCode(...msg.headers["key"]) == "value",
"Time is past": (msg) => new Date(msg["time"]) < new Date(),
"Partition is zero": (msg) => msg["partition"] == 0,
"Offset is gte zero": (msg) => msg["offset"] >= 0,
"High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
});
}
export function teardown(data) {
writer.close();
reader.close();
}
When trying to run the below xk6-kafka script, I got the following error:
❯ ./k6 run script_kafka/topic.js ─╯
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: script_kafka/topic.js
output: -
scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
* default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)
ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
at file:///Users/mac/go/bin/script_kafka/topic.js:89:19(104) executor=per-vu-iterations scenario=default source=stacktrace
█ teardown
data_received......................: 0 B 0 B/s
data_sent..........................: 0 B 0 B/s
iteration_duration.................: avg=1.5s min=3.58µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
iterations.........................: 1 0.331717/s
✓ kafka_reader_error_count...........: 0 0/s
kafka_writer_acks_required.........: 0 min=0 max=0
kafka_writer_async.................: 0.00% ✓ 0 ✗ 1
kafka_writer_attempts_max..........: 0 min=0 max=0
kafka_writer_batch_bytes...........: 0 B 0 B/s
kafka_writer_batch_max.............: 1 min=1 max=1
kafka_writer_batch_queue_seconds...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_batch_seconds.........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_batch_size............: 0 0/s
kafka_writer_batch_timeout.........: 0s min=0s max=0s
✓ kafka_writer_error_count...........: 0 0/s
kafka_writer_message_bytes.........: 0 B 0 B/s
kafka_writer_message_count.........: 0 0/s
kafka_writer_read_timeout..........: 0s min=0s max=0s
kafka_writer_retries_count.........: 0 0/s
kafka_writer_wait_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count...........: 0 0/s
kafka_writer_write_seconds.........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_timeout.........: 0s min=0s max=0s
vus................................: 1 min=1 max=1
vus_max............................: 1 min=1 max=1
running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs 00m03.0s/10m0s 1/1 iters, 1 per VU
I setup my Kafka cluster on a kubernetes cluster using strimzi operator and I have enabled mTLS(mutual TLS).
Hey @mostafa,
I am using certificates coming from strimzi. They offer an automated certificate management system.
I have tried all these but it's not working. Also the cluster on which I installed kafka is on premise. And lately there has been added a security that block remote ssh connection to the nodes(if the access has not been granted). So I am waiting to get the authorization in order to see if the problem is not coming from there.
@dnaby Can you connect the logger to both instances of Writer (in WriterConfig.connectLogger) and Reader (in ReaderConfig.connectLogger) to see what's happening behind the scenes?
❯ ./k6 run script_kafka/kafka.js ─╯
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: script_kafka/kafka.js
output: -
scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
* default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)
INFO[0000] {"This":{"addr":[{},{},{}],"topic":"xk6_kafka_json_topic","balancer":{},"max_attempts":0,"write_backoff_min":0,"write_backoff_max":0,"batch_size":1,"batch_bytes":0,"batch_timeout":0,"read_timeout":0,"write_timeout":0,"required_acks":0,"async":false,"compression":0,"logger":{"out":{},"hooks":{},"formatter":{"force_colors":false,"disable_colors":false,"force_quote":false,"disable_quote":false,"environment_override_colors":false,"disable_timestamp":false,"full_timestamp":false,"timestamp_format":"","disable_sorting":false,"disable_level_truncation":false,"pad_level_text":false,"quote_empty_fields":false,"field_map":{}},"report_caller":false,"level":4,"buffer_pool":null},"error_logger":null,"transport":{"dial_timeout":0,"idle_timeout":0,"metadata_ttl":0,"metadata_topics":[],"client_id":"","tls":null,"s_a_s_l":null,"resolver":null,"context":null},"allow_auto_topic_creation":false}} source=console
INFO[0000] {"This":{}} source=console
ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
at file:///Users/mac/go/bin/script_kafka/kafka.js:92:19(114) executor=per-vu-iterations scenario=default source=stacktrace
█ teardown
data_received......................: 0 B 0 B/s
data_sent..........................: 0 B 0 B/s
iteration_duration.................: avg=1.5s min=15.2µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
iterations.........................: 1 0.33155/s
✓ kafka_reader_error_count...........: 0 0/s
kafka_writer_acks_required.........: 0 min=0 max=0
kafka_writer_async.................: 0.00% ✓ 0 ✗ 1
kafka_writer_attempts_max..........: 10 min=10 max=10
kafka_writer_batch_bytes...........: 0 B 0 B/s
kafka_writer_batch_max.............: 1 min=1 max=1
kafka_writer_batch_queue_seconds...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_batch_seconds.........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_batch_size............: 0 0/s
kafka_writer_batch_timeout.........: 277h46m40s min=277h46m40s max=277h46m40s
✓ kafka_writer_error_count...........: 0 0/s
kafka_writer_message_bytes.........: 0 B 0 B/s
kafka_writer_message_count.........: 0 0/s
kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
kafka_writer_retries_count.........: 0 0/s
kafka_writer_wait_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count...........: 0 0/s
kafka_writer_write_seconds.........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
vus................................: 1 min=1 max=1
vus_max............................: 1 min=1 max=1
running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs 00m03.0s/10m0s 1/1 iters, 1 per VU
Here is the output when I set the connectLogger
to true
and I log the writer and reader.