mostafa/xk6-kafka

Query: Avro with and without Schema Registry

Closed this issue · 9 comments

Hi @mostafa, I have a question.

You have provided two Avro examples in the scripts section: one without Schema Registry and one with Schema Registry.

Suppose I have a Schema Registry. Is it possible to publish an Avro message to a topic without connecting to the Schema Registry, by explicitly defining the schema and data in the script (as you show in the no-Schema-Registry example)? Or does that approach work only when there is no Schema Registry at all and we just have a schema format to follow?

If I have a Schema Registry with the schema already defined there, which example can I use for my script? In the "with Schema Registry" example, you create the schema after explicitly defining it in the script.

I am asking because I have a scenario where the schema is in the Schema Registry and I have to post messages to a topic. I was able to do it using the getSchema method, but is there another way where I don't have to call getSchema and load JKS files in the script, and can simply define the schema explicitly in the script and post the message?

Hey @Shabda1988,

You always have access to the schemaRegistry.getSchema method.

Yeah @mostafa, I was able to post using the getSchema method, but is it possible to post the message without it? I mean without referring to the schema in the Schema Registry, and instead defining the schema in the script. [Condition: the schema is already in the Schema Registry.]

@Shabda1988
Can you share a snippet and add comments in the code?

Hi @mostafa,

Below is the code snippet:

import {
  TLS_1_2,
  SASL_SCRAM_SHA512,
  Writer,
  Reader,
  SchemaRegistry,
  SCHEMA_TYPE_AVRO,
} from "k6/x/kafka";

const topic = "topic-v1";

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: true,
  minVersion: TLS_1_2
};

const saslConfig = {
  username: "demousername",
  password: "demopassword",
  algorithm: SASL_SCRAM_SHA512,
};

const writer = new Writer({
  brokers: ["broker:435"],
  topic: topic,
  sasl: saslConfig,
  tls: tlsConfig,
});

// No registry URL is configured here; the schema is defined inline below.
const schemaRegistry = new SchemaRegistry();
  
const valueSchema = JSON.stringify({
  "type": "record",
  "namespace": "com.d",
  "name": "Plan",
  "doc": "The  Plan",
  "fields": [
    {
      "name": "createdTime",
      "doc": "When",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    },
    {
      "name": "lastModifiedTime",
      "doc": "Whe",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    }
  ]
});

// Main function

export default function () {
  let messages = [
    {
      value: schemaRegistry.serialize({
        data: {
          "createdTime": { "long.timestamp-millis": 1718025502872 },
          "lastModifiedTime": { "long.timestamp-millis": 1718025502872 },
        },
        schema: { schema: valueSchema },
        schemaType: SCHEMA_TYPE_AVRO,
      }),
    },
  ];
  writer.produce({ messages: messages });
}

I am able to post the message to the Kafka topic, but the consumer service does not consume it, and when I inspect the message on the topic it looks weird and is not readable.
When I use the getSchema method, however, a proper message gets posted and the consumer service consumes it.

I used the format you provided in the no-Schema-Registry example.

@Shabda1988 The getSchema gets the schema and caches it for reuse, AFAIK. So, it'd be safe to use it in your script.

@mostafa OK, but is it possible to do it the other way, without using the getSchema method, as I showed above?

Why does the above scenario cause an issue? Is it because I already have the schema in the Schema Registry, and explicitly putting the schema in the script causes a problem when the message is posted?

@Shabda1988

In theory, there should be no difference between using a schema in your script and using the one in Schema Registry. In practice, however, the Schema Registry assigns a Schema ID to each schema it stores. When the schema is read (get) from the Schema Registry, the magic byte along with the Schema ID is prepended to the encoded message upon producing and sending the message to Kafka, as shown below:

[Image: Avro-encoded message]

In contrast, when using a schema defined in the script, there is no Schema ID (the schema is not stored anywhere, so it has no ID), and the encoded message is sent without that prefix. This is why the messages appear garbled when consumed elsewhere: you have to either 1) provide the same ID-less schema to the consumer on the other end, or 2) use getSchema on both ends.
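To make the prefix concrete, here is a minimal sketch of the Confluent wire format that Schema Registry-aware consumers expect: one magic byte (0x00), then the Schema ID as a 4-byte big-endian integer, then the Avro binary payload. The function names `frameWithSchemaId` and `readHeader` are hypothetical and are not part of xk6-kafka; the sketch only illustrates the byte layout and is not how the extension is implemented.

```javascript
// Confluent wire format: [magic byte 0x00][4-byte big-endian schema ID][Avro payload]
const MAGIC_BYTE = 0;

// Hypothetical helper: prepend the 5-byte header to an already Avro-encoded payload.
function frameWithSchemaId(schemaId, avroPayload) {
  const framed = new Uint8Array(5 + avroPayload.length);
  const view = new DataView(framed.buffer);
  view.setUint8(0, MAGIC_BYTE);
  view.setUint32(1, schemaId, false); // false = big-endian
  framed.set(avroPayload, 5);
  return framed;
}

// Hypothetical helper: read the header back.
// Returns null when the bytes are too short or do not start with the magic byte.
// This is only a heuristic: a raw Avro payload can also begin with 0x00.
function readHeader(bytes) {
  if (bytes.length < 5 || bytes[0] !== MAGIC_BYTE) {
    return null;
  }
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  return { schemaId: view.getUint32(1, false), payload: bytes.subarray(5) };
}
```

A consumer that expects this header and receives a message without it will typically treat the first five payload bytes as the header, which is consistent with the unreadable messages described above.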

Another possible (untested) solution is to figure out the Schema ID from Schema Registry and set it on the schema when (de/)serializing, something like this:

export default function() {
    let messages = [{
        value: schemaRegistry.serialize({
            data: {
                ...
            },
            schema: {
                id: 1, // Pass the Schema ID from Schema Registry here
                schema: valueSchema
            },
            schemaType: SCHEMA_TYPE_AVRO,
        }),
    }];
    writer.produce({
        messages: messages
    });
}

Note that I don't guarantee that it works.
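If hardcoding the ID is undesirable, one possible approach (an assumption, not something tested in this thread) is to look it up through the Schema Registry REST API: `GET /subjects/<subject>/versions/latest` returns a JSON body that includes an `id` field. The sketch below only builds the URL and parses the response. The helper names, the registry URL, and the subject `topic-v1-value` (the default topic-name strategy applied to the topic above) are all hypothetical, and the registry may require authentication or TLS settings of its own.

```javascript
// Hypothetical helper: build the REST URL for the latest version of a subject.
function latestSchemaUrl(registryUrl, subject) {
  const base = registryUrl.replace(/\/+$/, ""); // drop trailing slashes
  return `${base}/subjects/${encodeURIComponent(subject)}/versions/latest`;
}

// Hypothetical helper: extract the numeric schema ID from the JSON response body.
function schemaIdFromResponse(body) {
  const parsed = JSON.parse(body);
  if (!Number.isInteger(parsed.id)) {
    throw new Error("Response has no integer 'id' field");
  }
  return parsed.id;
}

// In a k6 script this might be wired up roughly as follows (untested):
//   import http from "k6/http";
//   const res = http.get(latestSchemaUrl("https://registry.example:8081", "topic-v1-value"));
//   const schemaId = schemaIdFromResponse(res.body);
//   // then: schema: { id: schemaId, schema: valueSchema }
```

Doing the lookup once, for example in k6's `setup()` function, would avoid an extra HTTP request on every iteration.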

@mostafa Awesome, it worked by using the Schema ID :) Thanks for the insight above on how things work.

schema: {
    id: 1, // Pass the Schema ID from Schema Registry here
    schema: valueSchema
},

@Shabda1988 Good, glad I could help.