Azure/azure-event-hubs-for-kafka

Broken metadata values encoding

slavirok opened this issue · 13 comments

Description

We have been using MirrorMaker to copy data from EventHub to Kafka for a while. Everything has worked well except for one thing.

When consuming messages from Kafka, we noticed that header values come through with a weird encoding. Please see the screenshot below.

[Image: Screen Shot 2019-07-10 at 16 53 33]

P.S. I skipped the checklist because I don't think it would bring any value.

Thanks for the report @slavirok, I'll look into it.

Can you paste your MirrorMaker configs? I'll set up a repro. I wonder if this may be IoT Hub sending in a specific encoding.

Yup - if you have an EH producer adding headers to AMQP messages (e.g. IoT Hub enriching messages), the message headers will be AMQP encoded when read by a Kafka consumer. EH is encoding-agnostic, we just pass around bytes. I'll still do a repro on my own, but you should try doing an AMQP decoding on the headers.
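
To make the byte layout concrete, here is a minimal Python sketch of how an AMQP 1.0 string is encoded according to the types spec: a constructor byte, a length, then the UTF-8 payload. `amqp_encode_str` is a hypothetical helper written for illustration, not part of any Event Hubs or Kafka SDK, and it assumes the producer set a plain string property.

```python
def amqp_encode_str(text: str) -> bytes:
    """Encode text as an AMQP 1.0 string (str8-utf8 or str32-utf8)."""
    payload = text.encode("utf-8")
    if len(payload) <= 0xFF:
        # str8-utf8: constructor 0xa1, then a 1-byte length
        return bytes([0xA1, len(payload)]) + payload
    # str32-utf8: constructor 0xb1, then a 4-byte big-endian length
    return bytes([0xB1]) + len(payload).to_bytes(4, "big") + payload


raw = amqp_encode_str("Hello")
print(raw.hex(" "))  # a1 05 48 65 6c 6c 6f
```

A Kafka consumer that treats those bytes as a plain UTF-8 string would see two extra leading bytes, which would match the "weird encoding" symptom.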

> Can you paste your MirrorMaker configs? I'll set up a repro. I wonder if this may be IoT Hub sending in a specific encoding.

Producer:

max.in.flight.requests.per.connection=1
security.protocol=ssl
ssl.keystore.type=PKCS12
ssl.truststore.type=JKS
client.id=${KAFKA_CLIENT_ID}
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVER}
ssl.keystore.location=${KAFKA_SSL_KEYSTORE_LOCATION}
ssl.truststore.location=${KAFKA_SSL_TRUSTSTORE_LOCATION}
ssl.keystore.password=${KAFKA_SSL_PASSWORD}
ssl.key.password=${KAFKA_SSL_PASSWORD}
ssl.truststore.password=${KAFKA_SSL_PASSWORD}

Consumer:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${DOLLAR}ConnectionString" password="${EVENTHUB_CONNECTION_STRING}";
group.id=${EVENTHUB_CLIENT_ID}
client.id=${EVENTHUB_CLIENT_ID}
exclude.internal.topics=true
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
auto.offset.reset=earliest

@arerlend, thanks for the reply.

> Yup - if you have an EH producer adding headers to AMQP messages (e.g. IoT Hub enriching messages), the message headers will be AMQP encoded when read by a Kafka consumer. EH is encoding-agnostic, we just pass around bytes. I'll still do a repro on my own, but you should try doing an AMQP decoding on the headers.

By any chance, do you have an example of what AMQP decoding of the headers looks like?

Sorry about taking a while to respond, that is a good question...

Here's the spec - http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-types-v1.0.xml

But I don't know if that's the most helpful answer....

The .NET AMQP library (https://www.nuget.org/packages/Microsoft.Azure.Amqp/) supports it. Here is one example.

using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            byte[] bytes = { 0xa1, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f };
            Console.WriteLine(Decode(bytes));
        }

        static object Decode(byte[] bytes)
        {
            return AmqpEncoding.DecodeObject(new ByteBuffer(bytes, 0, bytes.Length));
        }
    }
}

If you use Java, take a look at the proton-j package (https://mvnrepository.com/artifact/org.apache.qpid/proton-j). Using the DecoderImpl class you should be able to read an object from a buffer.

I see, thanks Xin.

Link to docs for proton-j decoder class - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html

Thanks for your help, @arerlend, @xinchen10.

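In case someone is interested, here is Java code that decodes Kafka header values using the proton-j library.

  import java.nio.ByteBuffer;
  import org.apache.qpid.proton.codec.AMQPDefinedTypes;
  import org.apache.qpid.proton.codec.DecoderImpl;
  import org.apache.qpid.proton.codec.EncoderImpl;

  DecoderImpl decoder = new DecoderImpl();
  EncoderImpl encoder = new EncoderImpl(decoder);
  AMQPDefinedTypes.registerAllTypes(decoder, encoder);

  decoder.setByteBuffer(ByteBuffer.wrap(header.value()));
  Object obj = decoder.readObject();
  // Guard against an AMQP null value, which decodes to a Java null
  String result = obj == null ? null : obj.toString();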

Does anyone know of a way to decode these values in a Python UDF? We have a use case where we can't run the scala example.

> Does anyone know of a way to decode these values in a Python UDF? We have a use case where we can't run the scala example.

Have you looked into the pamqp package? https://github.com/gmr/pamqp
I myself decided to go for the Scala impl, but if you can't go that way, look into the above package.
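
One caveat: as far as I can tell, pamqp implements AMQP 0-9-1 (the RabbitMQ protocol), whose field encoding differs from the AMQP 1.0 type system used here, so it may not parse these header bytes as-is. If no AMQP 1.0 library is available, a standard-library-only decoder for the common primitive types is fairly small. The sketch below is an assumption-laden illustration: `decode_amqp_primitive` is a hypothetical name, the type codes are taken from the AMQP 1.0 types spec, and it deliberately rejects anything it does not cover (lists, maps, described types, timestamps, and so on).

```python
import struct

# Fixed-width primitives: constructor byte -> struct format (network byte order).
_FIXED = {
    0x50: ">B", 0x60: ">H", 0x70: ">I", 0x80: ">Q",  # ubyte, ushort, uint, ulong
    0x51: ">b", 0x61: ">h", 0x71: ">i", 0x81: ">q",  # byte, short, int, long
    0x52: ">B", 0x53: ">B",                          # smalluint, smallulong
    0x54: ">b", 0x55: ">b",                          # smallint, smalllong
    0x72: ">f", 0x82: ">d",                          # float, double
}

# Constructors that carry no payload at all.
_EMPTY = {0x40: None, 0x41: True, 0x42: False, 0x43: 0, 0x44: 0}

# Variable-width primitives: constructor -> (length-field width, text encoding).
_VARIABLE = {
    0xA0: (1, None), 0xB0: (4, None),        # binary
    0xA1: (1, "utf-8"), 0xB1: (4, "utf-8"),  # string
    0xA3: (1, "ascii"), 0xB3: (4, "ascii"),  # symbol
}


def decode_amqp_primitive(buf):
    """Decode one AMQP 1.0 primitive value; raise ValueError if unsupported."""
    if buf is None:
        return None
    data = bytes(buf)
    if not data:
        raise ValueError("empty AMQP value")
    code, body = data[0], data[1:]
    if code in _EMPTY:
        return _EMPTY[code]
    if code == 0x56:  # boolean followed by one octet: 0x00 or 0x01
        if len(body) < 1:
            raise ValueError("truncated boolean")
        return body[0] == 0x01
    if code in _FIXED:
        fmt = _FIXED[code]
        size = struct.calcsize(fmt)
        if len(body) < size:
            raise ValueError("truncated fixed-width value")
        return struct.unpack(fmt, body[:size])[0]
    if code in _VARIABLE:
        width, encoding = _VARIABLE[code]
        if len(body) < width:
            raise ValueError("truncated length field")
        length = int.from_bytes(body[:width], "big")
        payload = body[width:width + length]
        if len(payload) != length:
            raise ValueError("payload shorter than declared length")
        return payload.decode(encoding) if encoding else payload
    raise ValueError(f"unsupported AMQP constructor 0x{code:02x}")
```

Inside a PySpark UDF, the function could be applied to each header value; raising on unknown type codes is a design choice that surfaces unexpected data instead of silently returning garbage.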

If someone happens to want to decode AMQP-encoded strings in Spark (e.g. IoT Hub headers), I found that this worked (as long as you know that the values are strings, since all it does is ditch the typing information in the first bytes):

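from pyspark.sql import functions as F

def decode_amqp_str(val_col):
  """AMQP adds some bytes of type metadata. We ditch them and parse the rest as a string."""
  # Skips 2 bytes (type code + 1-byte length), so this only fits strings of up to 255 bytes.
  # 2147483647: https://stackoverflow.com/questions/57867088/pyspark-substr-without-length
  return F.substring(val_col, 3, 2147483647).cast("string")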
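
The fixed offset works because short AMQP strings carry exactly two bytes of metadata. The plain-Python sketch below mimics the same slicing outside Spark to show where it stops working. `strip_fixed_prefix` is a hypothetical stand-in for the `substring` call, and the sample values are built by hand from the layouts in the AMQP 1.0 types spec.

```python
def strip_fixed_prefix(val: bytes) -> str:
    """Plain-Python equivalent of substring(val, 3, ...): drop the first 2 bytes."""
    return val[2:].decode("utf-8", errors="replace")


# str8-utf8: 0xa1, 1-byte length, payload (payloads of up to 255 bytes)
short_value = bytes([0xA1, 5]) + b"Hello"

# str32-utf8: 0xb1, 4-byte big-endian length, payload (longer payloads)
long_value = bytes([0xB1]) + (300).to_bytes(4, "big") + b"x" * 300

print(strip_fixed_prefix(short_value))      # Hello
print(len(strip_fixed_prefix(long_value)))  # 303: three length bytes leak through
```

So the shortcut seems safe only when every header value is a string of at most 255 bytes; longer values would need an offset of 5 bytes instead.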

Here are some slightly more robust ones

from pyspark.sql.functions import udf

# Note: the match statements below require Python 3.10 or newer.

@udf("string")
def string_from_amqp(val: bytes):
    # See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string
    if val is None:
        return None
    match val[0]:
        # An AMQP null value
        case 0x40:
            return None
        # An AMQP string of up to 2^8 - 1 octets of UTF-8 (no byte order mark):
        # type code, 1-byte length, then the payload
        case 0xa1:
            return val[2:].decode('UTF-8', 'strict')
        # An AMQP string of up to 2^32 - 1 octets of UTF-8 (no byte order mark):
        # type code, 4-byte length, then the payload
        case 0xb1:
            return val[5:].decode('UTF-8', 'strict')

    #TODO: maybe this should fail here instead
    return val.hex()

@udf("boolean")
def bool_from_amqp(val: bytes):
    # See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean
    if val is None:
        return None
    match val[0]:
        # Boolean with the value in the following octet (0x00 or 0x01)
        case 0x56:
            return val[1] == 0x01
        # The boolean value true
        case 0x41:
            return True
        # The boolean value false
        case 0x42:
            return False

    #TODO: maybe this should fail here instead
    return None