siddhi-io/siddhi-map-avro

Avro Map not compatible with Kafka I/O due to retention of schema registry header

jheinnic opened this issue · 2 comments

Description:
The Avro Map extension is often referenced in conjunction with Siddhi's extension for Kafka. Unfortunately, none of the available examples for using these together seem to work!
The culprit appears to be that Confluent's Avro support for Kafka interacts with its Schema Registry to assign a schema ID to each unique schema it encounters, and prefixes every Avro-serialized message it stores in Kafka with a 5-byte header carrying that schema ID. Those five bytes are not stripped away before this mapper is invoked, and the extra 5-byte header throws the binary decoding off kilter.

It is possible to reproduce the same incorrect decoding outside Siddhi: take a raw message from an Avro-enabled topic, decode it with the 5-byte header retained, and you get the identical wrong result; remove those five out-of-band header bytes and the message decodes correctly.
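
To make the framing concrete, here is a minimal sketch, in plain Java with only the stock Apache Avro API, of what a consumer has to do before handing Confluent-framed bytes to an ordinary Avro decoder. The class and method names are mine, not anything in siddhi-map-avro; the assumed layout is Confluent's documented wire format of a 0x00 magic byte, a 4-byte big-endian schema ID, and then the Avro binary body.

import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class ConfluentFramedAvroDecoder {

    // Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro binary body.
    private static final int HEADER_SIZE = 5;

    public static GenericRecord decode(byte[] kafkaValue, Schema readerSchema) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(kafkaValue);
        byte magic = buffer.get();
        if (magic != 0x00) {
            throw new IllegalArgumentException("Not a Confluent-framed Avro message");
        }
        int schemaId = buffer.getInt();   // the Schema Registry ID; unused in this sketch

        // Strip the 5-byte header so the plain Avro decoder sees only the serialized record.
        byte[] avroBody = Arrays.copyOfRange(kafkaValue, HEADER_SIZE, kafkaValue.length);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBody, null);
        return reader.read(null, decoder);
    }
}

Skipping the copyOfRange step and decoding the full byte array is exactly what produces the garbage values shown in the log output in the reproduction steps below.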

Affected Product Version:
2.0.6

OS, DB, other environment details and versions:
macOS Mojave (10.14.4), running Kafka 2.3.1's Docker images on Docker Desktop Community 2.1.0.4

Steps to reproduce:

  1. Save this Avro schema to sample.avsc:
{"namespace":"sample.avro","type":"record","name":"avro","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}
  2. Save an example document in JSON to sample.json:
{"name":"exnobhbyaitkfiayoyipianxshgkofnmpkef","amount":0.9629494268310558}
  3. Launch Docker containers with Kafka exposed on port 9092 and the Schema Registry on 8081. The following docker-compose.yml works for that purpose:
version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
    environment:
     - ZOOKEEPER_CLIENT_PORT=2181
     - ZOOKEEPER_SERVER_ID=1
     - ZOOKEEPER_TICK_TIME=2000
     - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_KAFKA_HOST_NAME=kafka
     - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
     - KAFKA_BROKER_ID=1
     - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:19092,LISTENER_HOST://0.0.0.0:9092
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:19092,LISTENER_HOST://localhost:9092
     - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,LISTENER_HOST:PLAINTEXT
     - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    ports:
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:19092
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
     - SCHEMA_REGISTRY_DEBUG=false
    links:
     - zookeeper
     - kafka
  4. Load our message into a topic:
cat sample.json | kafka-avro-console-producer --topic "kafka_result_topic" --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --property value.schema="$(cat sample.avsc)"
  5. Create and run the following Siddhi app:
@App:name('KafkaSample')
@sink(type='log')
define stream logStream(name string, amount double);

@source(type='kafka', bootstrap.servers='localhost:9092', topic.list='kafka_result_topic', group.id='test', threading.option='single.threaded', is.binary.message='true', @map(type='avro', schema.def='{"namespace":"sample.avro","type":"record","name":"avro","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}'))
define stream LowProductionAlertStream(name string, amount double);

from LowProductionAlertStream
select *
insert into logStream;
  6. Observe the incorrect decoding in Siddhi's output log:
     [java] [io.siddhi.core.stream.output.sink.LogSink] : KafkaSample : logStream : Event{timestamp=1573734234725, data=[, 1.410940917531979E224], isExpired=false}
  7. Extract the raw message from Kafka:
 kafka-console-consumer --topic kafka_result_topic --offset 0 --partition 0 --max-messages 1 --bootstrap-server localhost:9092 > onemsg.dat
  8. Confirm that retaining Confluent's 5-byte Schema Registry header yields the same incorrect result:
    avro-tools fragtojson --schema-file sample.avsc onemsg.dat
    Observe that the name is similarly absent and the amount is still 1.410940917531979E224, not the expected 0.9629494268310558
  9. Remove the five-byte header and try again:
old_sz=$(wc -c < onemsg.dat)
new_sz=$(echo "$old_sz - 5" | bc)
tail -c "$new_sz" onemsg.dat > trunc.dat
avro-tools fragtojson --schema-file sample.avsc trunc.dat

Observe that this time the 5-byte-shorter message deserializes back to our original input!

Alternatively, consider providing an alternate map extension modeled on https://github.com/AbsaOSS/ABRiS, an Avro bridge for Apache Spark that supports encoding and decoding with either a user-provided schema, a Confluent Schema Registry, or both. The first of these three cases is what Siddhi's existing mapper covers; the other two are the cases it does not.

The difference between ABRiS's two schema-registry cases, one with a user-provided schema and one without, boils down to whether the registry supplies the source-of-truth schema for encoding/decoding or merely verifies compatibility with a user-provided source-of-truth schema. In both cases, ABRiS ensures the correct addition/removal of Confluent's 5-byte header, with its content kept consistent with Schema Registry semantics.
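
For comparison, here is roughly what the registry-driven path looks like when Confluent's own deserializer is handed the same bytes; just a sketch, with the topic name and registry URL taken from the reproduction steps above rather than from anything this extension exposes today. The deserializer consumes the 5-byte header itself, fetches the writer schema from the registry by ID, and decodes the rest.

import java.util.Collections;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;

public class RegistryAwareDecode {

    public static GenericRecord decode(byte[] kafkaValue) {
        // Point the deserializer at the Schema Registry started by the docker-compose above.
        Map<String, Object> config =
                Collections.singletonMap("schema.registry.url", "http://localhost:8081");

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, false);   // false => configuring a value (not key) deserializer

        // The deserializer reads the magic byte and 4-byte schema ID itself, fetches the
        // writer schema from the registry, and decodes the remaining Avro body.
        GenericRecord record =
                (GenericRecord) deserializer.deserialize("kafka_result_topic", kafkaValue);
        deserializer.close();
        return record;
    }
}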

Hold on a minute. Maybe I missed it before, but this extension already accepts a schema registry argument. It seems to want a hard-coded reference to the expected schema ID rather than using a topic name, so this issue may boil down to user error on my part: it looks as though I simply failed to set the right options for my use case. I expect to revisit this scenario soon and will post an amendment once I have. Until then, I'll leave this half-embarrassed comment to help anyone who looks at the issue in the meantime, and will return to either clarify or close it.
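
For anyone landing here before I get back to it, the distinction I'm describing, a schema looked up by a hard-coded ID versus one looked up from the topic's subject, maps onto Confluent's client API roughly as follows. This is only a sketch: the schema ID of 1 and the "-value" subject suffix (the default TopicNameStrategy) are illustrative assumptions, not values this extension documents.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;

public class RegistryLookupSketch {

    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // What a hard-coded schema-ID style of configuration amounts to:
        Schema byId = client.getById(1);   // hypothetical ID; use whatever the registry assigned

        // What a topic-name-driven lookup amounts to (default TopicNameStrategy appends "-value"):
        SchemaMetadata latest = client.getLatestSchemaMetadata("kafka_result_topic-value");
        Schema bySubject = new Schema.Parser().parse(latest.getSchema());

        // True when the topic's latest registered schema is the one behind that ID.
        System.out.println(byId.equals(bySubject));
    }
}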