mostafa/xk6-kafka

Schema Registry serialized message causes SchemaRegistryException on AWS

Closed this issue · 1 comment

We're currently using AWS Glue Schema Registry to store our schemas. However, the library does not yet support connecting directly to Glue, so as a workaround I hard-coded the schema into the test code. This let me serialize the messages and send them to Kafka successfully. Unfortunately, the next day the DevOps team notified us of errors when deserializing these messages.

KafkaMessageDTO class, where we generate messages and serialize them to Avro:

import RandomUtils from "../../_utils/RandomUtils.js";
import {SchemaRegistry, SCHEMA_TYPE_AVRO, SCHEMA_TYPE_STRING} from "k6/x/kafka"

export class KafkaMessageDTO {
    constructor() {
        // TODO: configure SchemaRegistry to connect to AWS Glue once supported
        this.schemaRegistry = new SchemaRegistry();
        this.geoDataAVROSchema =
            {
                "namespace": "com.namespace.example",
                "type": "record",
                "name": "Data",
                "fields": [
                    {
                        "name": "id",
                        "type": "string"
                    },
                    {
                        "name": "type",
                        "type": {
                            "type": "enum",
                            "name": "Type",
                            "symbols": [
                                "123",
                                "321"
                            ]
                        }
                    },
                    {
                        "name": "source",
                        "type": {
                            "type": "enum",
                            "name": "Source",
                            "symbols": [
                                "321",
                                "123"
                            ]
                        }
                    },
                    // ...other fields
                ]
            }
    }


    createGeoDataMessage(id) {
        const random = new RandomUtils();
        const eventTypes = ["321", "123"];
        const eventSource = "123"; // Fixed event source

        let generatedData = {
            id: id,
            type: eventTypes[random.getRandomInt(0, eventTypes.length - 1)],
            source: eventSource,
            // ...other fields
        };

        console.log("JSON message to send to Kafka: " + JSON.stringify(generatedData))

        try {
            let serializedMessage = [
                {
                    key: this.schemaRegistry.serialize({
                        data: id,
                        schemaType: SCHEMA_TYPE_STRING,
                    }),
                    value: this.schemaRegistry.serialize({
                        data: generatedData,
                        schema: {schema: JSON.stringify(this.geoDataAVROSchema)},
                        schemaType: SCHEMA_TYPE_AVRO,
                    }),
                }
            ];
            console.log("Serialized to AVRO message to send to Kafka: " + JSON.stringify(serializedMessage))
            return serializedMessage;

        } catch (error) {
            console.error("Serialization error:", error);
            throw error;
        }
    }
}

export default KafkaMessageDTO;
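
As a local sanity check (a sketch, assuming deserialize mirrors serialize as in xk6-kafka's published examples), the serialized value can be decoded back with the same hard-coded schema inside createGeoDataMessage before returning. Note this only proves the Avro body is valid, not that the on-wire framing matches what the consumer expects:

        // Round-trip the value through the same hard-coded schema.
        const decoded = this.schemaRegistry.deserialize({
            data: serializedMessage[0].value,
            schema: {schema: JSON.stringify(this.geoDataAVROSchema)},
            schemaType: SCHEMA_TYPE_AVRO,
        });
        console.log("Round-trip decoded: " + JSON.stringify(decoded));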

Test.js script, where we send the messages:

import KafkaClient from "./KafkaClient.js"; // import paths assumed; adjust to your layout
import KafkaMessageDTO from "./KafkaMessageDTO.js";

const kafka = new KafkaClient("data.vtg.normalized.gps-event");
let msg = new KafkaMessageDTO().createGeoDataMessage(vehicleId);
kafka.produceMessage(msg);

KafkaClient class with the connection setup:

import {Connection, Reader, SASL_AWS_IAM, TLS_1_2, Writer} from "k6/x/kafka"

export class KafkaClient {

    constructor(topic) {
        const brokers = [
            "b-1.....c7.kafka.us-east-1.amazonaws.com:9098",
            "b-3.....c7.kafka.us-east-1.amazonaws.com:9098",
            "b-2.....c7.kafka.us-east-1.amazonaws.com:9098"
        ]
        const saslConfig = {
            algorithm: SASL_AWS_IAM
        }

        const tlsConfig = {
            enableTls: true,
            insecureSkipTlsVerify: true,
            minVersion: TLS_1_2,
        }

        const offset = 0
        const partition = 0
        const numPartitions = 1
        const replicationFactor = 1

        this.writer = new Writer({
            brokers: brokers,
            topic: topic,
            sasl: saslConfig,
            tls: tlsConfig
        })
        this.reader = new Reader({
            brokers: brokers,
            topic: topic,
            partition: partition,
            offset: offset,
            sasl: saslConfig,
            tls: tlsConfig
        })
        this.connection = new Connection({
            address: brokers[0],
            sasl: saslConfig,
            tls: tlsConfig,
        })
    }

    produceMessage(message) {

        try {
            this.writer.produce({messages: message});
        } catch (error) {
            console.error("Producing error:", error);
            throw error;
        }
    }
}

export default KafkaClient;
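
One lifecycle note: the writer, reader, and connection should be closed when the test run finishes. A minimal sketch for Test.js, using k6's standard teardown hook and the close() methods xk6-kafka exposes on these objects:

    // Close the client's resources once the test run ends.
    export function teardown() {
        kafka.writer.close();
        kafka.reader.close();
        kafka.connection.close();
    }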

Errors we are receiving:

Unable to deserialize message using <aws_schema_registry.serde.KafkaDeserializer object at 0x7f5e0a383710>: b'\x00\x00\x00\x00\x00\x12937397190\x00\x00\x00(2024-08-21T15:50:42Z\x02(2024-08-21T15:50:42Z(2024-08-21T15:50:42Z\x02\x0e1165551\x10M7738371
20603\x9a\x99sB\x02\x00\x00\x00\x00\x02\x88\xf7
={\xd3\x99\xa0\xc8U\xc0b\x97\xf2k\xb5

_\xc0\x02\x02\x01\x02\x00\x02\x01\x02\x00\x02\x00'

or

aws_schema_registry.exception.SchemaRegistryException: no secondary deserializer provided to handle unrecognized data encoding
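
For context, a hedged reading of those errors: the payload above starts with 0x00 followed by four schema-ID bytes, which looks like the Confluent wire format that xk6-kafka's SchemaRegistry serializer emits, while records encoded by AWS Glue start with a different header (a header-version byte of 3, a compression byte, and a 16-byte schema version UUID). The aws_schema_registry deserializer raises exactly this exception when a record's leading bytes don't match its framing. A hypothetical helper to classify a payload by its first byte (describeFraming and the array-like byte access are our assumptions, not an xk6-kafka or Glue API):

    // Classify a serialized payload by its leading byte.
    function describeFraming(bytes) {
        if (bytes[0] === 0x00) {
            // Confluent wire format: magic 0x00 + 4-byte schema ID + Avro body.
            return "confluent";
        }
        if (bytes[0] === 0x03) {
            // AWS Glue framing: header version 3 + compression byte
            // + 16-byte schema version UUID + Avro body.
            return "glue";
        }
        return "unknown";
    }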

Do you have any idea why this is happening?

@horuktaras I explained it in detail here.