mostafa/xk6-kafka

Error writing messages: broker appears to be expecting TLS

dnaby opened this issue · 4 comments

dnaby commented
import { check } from "k6";
import { Writer, Reader, SchemaRegistry, SCHEMA_TYPE_JSON, TLS_1_3 } from "k6/x/kafka";

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: false,
  minVersion: TLS_1_3,

  clientCertPem: "/Users/mac/go/bin/script_kafka/super-user/user.crt",
  clientKeyPem: "/Users/mac/go/bin/script_kafka/super-user/user.key",
  serverCaPem: "/Users/mac/go/bin/script_kafka/super-user/ca.crt"
};

const writer = new Writer({
  brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
  topic: "xk6-kafka-test-topic",
  tlsConfig: tlsConfig,
});

const reader = new Reader({
  brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
  topic: "xk6-kafka-test-topic",
  tlsConfig: tlsConfig,
});

const schemaRegistry = new SchemaRegistry();

export const options = {
  thresholds: {
    kafka_writer_error_count: ["count == 0"],
    kafka_reader_error_count: ["count == 0"],
  },
};

export default function () {
  for (let index = 0; index < 100; index++) {
    let messages = [
      {
        // The data type of the key is JSON
        key: schemaRegistry.serialize({
          data: {
            correlationId: "test-id-abc-" + index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        // The data type of the value is JSON
        value: schemaRegistry.serialize({
          data: {
            name: "xk6-kafka",
            version: "0.9.0",
            author: "Mouhamadou Naby DIA",
            description:
              "k6 extension to load test Apache Kafka",
            index: index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        headers: {
          key: "value",
        },
        offset: index,
        partition: 0,
        time: new Date(), // Will be converted to timestamp automatically
      },
      {
        key: schemaRegistry.serialize({
          data: {
            correlationId: "test-id-def-" + index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        value: schemaRegistry.serialize({
          data: {
            name: "xk6-kafka",
            version: "0.9.0",
            author: "Mouhamadou Naby DIA",
            description:
              "k6 extension to load test Apache Kafka",
            index: index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        headers: {
          key: "value",
        },
      },
    ];

    writer.produce({ messages: messages });
  }

  // Read 10 messages only
  let messages = reader.consume({ limit: 10 });

  check(messages, {
    "10 messages are received": (messages) => messages.length == 10,
  });

  check(messages[0], {
    "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == "xk6-kafka-test-topic",
    "Key contains key/value and is JSON": (msg) =>
      schemaRegistry
        .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
        .correlationId.startsWith("test-id-"),
    "Value contains key/value and is JSON": (msg) =>
      typeof schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      }) == "object" &&
      schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      }).name == "xk6-kafka",
    "Header equals {'key': 'value'}": (msg) =>
      "key" in msg.headers &&
      String.fromCharCode(...msg.headers["key"]) == "value",
    "Time is past": (msg) => new Date(msg["time"]) < new Date(),
    "Partition is zero": (msg) => msg["partition"] == 0,
    "Offset is gte zero": (msg) => msg["offset"] >= 0,
    "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
  });
}

export function teardown(data) {
  writer.close();
  reader.close();
}

When trying to run the below xk6-kafka script, I got the following error:

❯ ./k6 run script_kafka/topic.js                                                                                                                                                           ─╯

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script_kafka/topic.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS  error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at file:///Users/mac/go/bin/script_kafka/topic.js:89:19(104)  executor=per-vu-iterations scenario=default source=stacktrace

     █ teardown

     data_received......................: 0 B   0 B/s
     data_sent..........................: 0 B   0 B/s
     iteration_duration.................: avg=1.5s min=3.58µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
     iterations.........................: 1     0.331717/s
   ✓ kafka_reader_error_count...........: 0     0/s
     kafka_writer_acks_required.........: 0     min=0      max=0 
     kafka_writer_async.................: 0.00% ✓ 0        ✗ 1   
     kafka_writer_attempts_max..........: 0     min=0      max=0 
     kafka_writer_batch_bytes...........: 0 B   0 B/s
     kafka_writer_batch_max.............: 1     min=1      max=1 
     kafka_writer_batch_queue_seconds...: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_size............: 0     0/s
     kafka_writer_batch_timeout.........: 0s    min=0s     max=0s
   ✓ kafka_writer_error_count...........: 0     0/s
     kafka_writer_message_bytes.........: 0 B   0 B/s
     kafka_writer_message_count.........: 0     0/s
     kafka_writer_read_timeout..........: 0s    min=0s     max=0s
     kafka_writer_retries_count.........: 0     0/s
     kafka_writer_wait_seconds..........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_count...........: 0     0/s
     kafka_writer_write_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_timeout.........: 0s    min=0s     max=0s
     vus................................: 1     min=1      max=1 
     vus_max............................: 1     min=1      max=1 


running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m03.0s/10m0s  1/1 iters, 1 per VU

I setup my Kafka cluster on a kubernetes cluster using strimzi operator and I have enabled mTLS(mutual TLS).

Hey @dnaby,

Are your certificates self-signed? Have you tried setting insecureSkipTlsVerify to true to see if it works? Or even downgrading the TLS version?

Update:
I released a new version (v0.22.0) just now. Try that as well to see if it is fixed.

dnaby commented

Hey @mostafa,

I am using certificates coming from strimzi. They offer an automated certificate management system.

I have tried all these but it's not working. Also the cluster on which I installed kafka is on premise. And lately there has been added a security that block remote ssh connection to the nodes(if the access has not been granted). So I am waiting to get the authorization in order to see if the problem is not coming from there.

@dnaby Can you connect the logger to both instances of Writer (in WriterConfig.connectLogger) and Reader (in ReaderConfig.connectLogger) to see what's happening behind the scenes?

dnaby commented
❯ ./k6 run script_kafka/kafka.js                                                                                                                                                           ─╯

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script_kafka/kafka.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] {"This":{"addr":[{},{},{}],"topic":"xk6_kafka_json_topic","balancer":{},"max_attempts":0,"write_backoff_min":0,"write_backoff_max":0,"batch_size":1,"batch_bytes":0,"batch_timeout":0,"read_timeout":0,"write_timeout":0,"required_acks":0,"async":false,"compression":0,"logger":{"out":{},"hooks":{},"formatter":{"force_colors":false,"disable_colors":false,"force_quote":false,"disable_quote":false,"environment_override_colors":false,"disable_timestamp":false,"full_timestamp":false,"timestamp_format":"","disable_sorting":false,"disable_level_truncation":false,"pad_level_text":false,"quote_empty_fields":false,"field_map":{}},"report_caller":false,"level":4,"buffer_pool":null},"error_logger":null,"transport":{"dial_timeout":0,"idle_timeout":0,"metadata_ttl":0,"metadata_topics":[],"client_id":"","tls":null,"s_a_s_l":null,"resolver":null,"context":null},"allow_auto_topic_creation":false}}  source=console
INFO[0000] {"This":{}}                                   source=console
ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS  error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at file:///Users/mac/go/bin/script_kafka/kafka.js:92:19(114)  executor=per-vu-iterations scenario=default source=stacktrace

     █ teardown

     data_received......................: 0 B         0 B/s
     data_sent..........................: 0 B         0 B/s
     iteration_duration.................: avg=1.5s min=15.2µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
     iterations.........................: 1           0.33155/s
   ✓ kafka_reader_error_count...........: 0           0/s
     kafka_writer_acks_required.........: 0           min=0           max=0          
     kafka_writer_async.................: 0.00%       ✓ 0             ✗ 1            
     kafka_writer_attempts_max..........: 10          min=10          max=10         
     kafka_writer_batch_bytes...........: 0 B         0 B/s
     kafka_writer_batch_max.............: 1           min=1           max=1          
     kafka_writer_batch_queue_seconds...: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_size............: 0           0/s
     kafka_writer_batch_timeout.........: 277h46m40s  min=277h46m40s  max=277h46m40s 
   ✓ kafka_writer_error_count...........: 0           0/s
     kafka_writer_message_bytes.........: 0 B         0 B/s
     kafka_writer_message_count.........: 0           0/s
     kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
     kafka_writer_retries_count.........: 0           0/s
     kafka_writer_wait_seconds..........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_count...........: 0           0/s
     kafka_writer_write_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
     vus................................: 1           min=1           max=1          
     vus_max............................: 1           min=1           max=1          


running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m03.0s/10m0s  1/1 iters, 1 per VU

Here is the output when I set the connectLogger to true and I log the writer and reader.