Schema Registry serialized message causes SchemaRegistryException on AWS
Closed this issue · 1 comments
horuktaras commented
We're currently using AWS Glue Schema Registry for storing our schemas. However, due to the limitations of the current library, we were unable to connect directly to Glue. As a workaround, I hard-coded the schema directly into the code. This allowed me to successfully serialize the messages and send them to Kafka. Unfortunately, the next day, the DevOps team notified us of errors related to the serialization of these messages.
KafkaMessageDTO
Class where we generate messages and transform them to AVRO:
import RandomUtils from "../../_utils/RandomUtils.js";
import {SchemaRegistry, SCHEMA_TYPE_AVRO, SCHEMA_TYPE_STRING} from "k6/x/kafka"
export class KafkaMessageDTO {
    /**
     * Builds Kafka messages and serializes them to AVRO using a hard-coded
     * schema (workaround until the library can talk to AWS Glue directly).
     *
     * NOTE(review): serializing through a local SchemaRegistry makes the
     * payload carry the Confluent wire-format header (magic byte 0x00 plus a
     * 4-byte schema ID of 0 — visible as `\x00\x00\x00\x00\x00` in the error
     * dump). The AWS Glue deserializer does not recognize that encoding,
     * which is presumably why consumers fail with
     * "no secondary deserializer provided to handle unrecognized data
     * encoding" — confirm against the Glue wire format.
     */
    constructor() {
        // TODO: configure SchemaRegistry to connect to AWS Glue when it becomes possible
        this.schemaRegistry = new SchemaRegistry();
        this.geoDataAVROSchema = {
            "namespace": "com.namespace.example",
            "type": "record",
            "name": "Data",
            "fields": [
                {
                    "name": "id",
                    "type": "string"
                },
                {
                    "name": "type",
                    "type": {
                        "type": "enum",
                        "name": "Type",
                        "symbols": [
                            "123",
                            "321"
                        ]
                    }
                },
                {
                    "name": "source",
                    "type": {
                        "type": "enum",
                        "name": "Source",
                        "symbols": [
                            "321",
                            "123"
                        ]
                    }
                }
                // ...other fields omitted (was `{...other fields}`, which is not valid JS)
            ]
        };
    }

    /**
     * Builds one message: a STRING-serialized key and an AVRO-serialized value.
     *
     * @param {string} id - record id, also used as the message key
     * @returns {Array<{key: *, value: *}>} single-element message array for Writer.produce
     * @throws rethrows any serialization error after logging it
     */
    createGeoDataMessage(id) {
        const random = new RandomUtils();
        const eventTypes = ["321", "123"];
        const eventSource = "123"; // Fixed event source
        const generatedData = {
            id: id,
            type: eventTypes[random.getRandomInt(0, eventTypes.length - 1)],
            source: eventSource,
            // ...other fields omitted (was `{...other fields}`, which is not valid JS)
        };
        console.log("JSON message to send to Kafka: " + JSON.stringify(generatedData));
        try {
            const serializedMessage = [
                {
                    key: this.schemaRegistry.serialize({
                        // BUG FIX: was `vehicleId`, which is not defined in this
                        // scope — the method parameter is `id`.
                        data: id,
                        schemaType: SCHEMA_TYPE_STRING,
                    }),
                    value: this.schemaRegistry.serialize({
                        data: generatedData,
                        schema: {schema: JSON.stringify(this.geoDataAVROSchema)},
                        schemaType: SCHEMA_TYPE_AVRO,
                    }),
                }
            ];
            console.log("Serialized to AVRO message to send to Kafka: " + JSON.stringify(serializedMessage));
            return serializedMessage;
        } catch (error) {
            console.error("Serialization error:", error);
            throw error;
        }
    }
}
// BUG FIX: was `export default GeoDataMessage;`, but no such binding exists —
// the class declared above is KafkaMessageDTO.
export default KafkaMessageDTO;
Test.js
Class where we send messages:
const kafka = new KafkaClient("data.vtg.normalized.gps-event");
// BUG FIX: was `new GeoDataMessage()`, but the declared/exported class is
// KafkaMessageDTO. `vehicleId` is assumed to be defined earlier in the real
// test script — not visible in this snippet.
const msg = new KafkaMessageDTO().createGeoDataMessage(vehicleId);
kafka.produceMessage(msg);
KafkaClient
class with connections
import {Connection, Reader, SASL_AWS_IAM, TLS_1_2, Writer} from "k6/x/kafka"
export class KafkaClient {
    /**
     * Wraps xk6-kafka Writer/Reader/Connection for one topic on an MSK
     * cluster, authenticating with SASL AWS IAM over TLS.
     *
     * @param {string} topic - Kafka topic to read from / write to
     */
    constructor(topic) {
        const brokers = [
            "b-1.....c7.kafka.us-east-1.amazonaws.com:9098",
            "b-3.....c7.kafka.us-east-1.amazonaws.com:9098",
            "b-2.....c7.kafka.us-east-1.amazonaws.com:9098"
        ];
        const saslConfig = {
            algorithm: SASL_AWS_IAM
        };
        const tlsConfig = {
            enableTls: true,
            // SECURITY: disables broker certificate verification. Acceptable
            // for a load test inside a trusted VPC, but do not copy this
            // setting into production clients.
            insecureSkipTlsVerify: true,
            minVersion: TLS_1_2,
        };
        // Reader starts at the beginning of partition 0.
        const offset = 0;
        const partition = 0;
        this.writer = new Writer({
            brokers: brokers,
            topic: topic,
            sasl: saslConfig,
            tls: tlsConfig
        });
        this.reader = new Reader({
            brokers: brokers,
            topic: topic,
            partition: partition,
            offset: offset,
            sasl: saslConfig,
            tls: tlsConfig
        });
        this.connection = new Connection({
            address: brokers[0],
            sasl: saslConfig,
            tls: tlsConfig,
        });
    }

    /**
     * Produces the given message array to the configured topic.
     *
     * @param {Array<{key: *, value: *}>} message - messages as built by KafkaMessageDTO
     * @throws rethrows any producing error after logging it
     */
    produceMessage(message) {
        try {
            this.writer.produce({messages: message});
        } catch (error) {
            console.error("Producing error:", error);
            throw error;
        }
    }

    /**
     * Closes the writer, reader, and connection. Call this from the k6
     * teardown() hook so broker connections are not leaked between test runs.
     */
    close() {
        this.writer.close();
        this.reader.close();
        this.connection.close();
    }
}
export default KafkaClient;
Errors we are receiving:
Unable to deserialize message using <aws_schema_registry.serde.KafkaDeserializer object at 0x7f5e0a383710>: b'\x00\x00\x00\x00\x00\x12937397190\x00\x00\x00(2024-08-21T15:50:42Z\x02(2024-08-21T15:50:42Z(2024-08-21T15:50:42Z\x02\x0e1165551\x10M7738371
20603\x9a\x99sB\x02\x00\x00\x00\x00\x02\x88\xf7
={\xd3\x99\xa0\xc8U\xc0b\x97\xf2k\xb5
_\xc0\x02\x02\x01\x02\x00\x02\x01\x02\x00\x02\x00'
or
aws_schema_registry.exception.SchemaRegistryException: no secondary deserializer provided to handle unrecognized data encoding
Do you have any idea why this is happening?
mostafa commented
@horuktaras I explained it in detail here.