Broken metadata values encoding
slavirok opened this issue · 13 comments
Description
We have been using MirrorMaker to copy data from EventHub to Kafka for a while, and everything has worked well except for one thing.
When consuming messages from Kafka, we noticed that the header values have a strange encoding. Please see the screenshot below.
P.S. I skipped the checklist because I don't think it would bring any value.
Can you paste your MirrorMaker configs? I'll set up a repro. I wonder if this may be IoT Hub sending in a specific encoding.
Yup - if you have an EH producer adding headers to AMQP messages (e.g. IoT Hub enriching messages), the message headers will be AMQP encoded when read by a Kafka consumer. EH is encoding-agnostic, we just pass around bytes. I'll still do a repro on my own, but you should try doing an AMQP decoding on the headers.
> Can you paste your MirrorMaker configs? I'll set up a repro. I wonder if this may be IoT Hub sending in a specific encoding.
Producer:
```
max.in.flight.requests.per.connection=1
security.protocol=ssl
ssl.keystore.type=PKCS12
ssl.truststore.type=JKS
client.id=${KAFKA_CLIENT_ID}
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVER}
ssl.keystore.location=${KAFKA_SSL_KEYSTORE_LOCATION}
ssl.truststore.location=${KAFKA_SSL_TRUSTSTORE_LOCATION}
ssl.keystore.password=${KAFKA_SSL_PASSWORD}
ssl.key.password=${KAFKA_SSL_PASSWORD}
ssl.truststore.password=${KAFKA_SSL_PASSWORD}
```
Consumer:
```
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${DOLLAR}ConnectionString" password="${EVENTHUB_CONNECTION_STRING}";
group.id=${EVENTHUB_CLIENT_ID}
client.id=${EVENTHUB_CLIENT_ID}
exclude.internal.topics=true
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
auto.offset.reset=earliest
```
@arerlend, thanks for the reply.
> Yup - if you have an EH producer adding headers to AMQP messages (e.g. IoT Hub enriching messages), the message headers will be AMQP encoded when read by a Kafka consumer. EH is encoding-agnostic, we just pass around bytes. I'll still do a repro on my own, but you should try doing an AMQP decoding on the headers.

By any chance, do you have an example of what AMQP decoding of the headers looks like?
Sorry for taking a while to respond; that is a good question...
Here's the spec: http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-types-v1.0.xml
But I don't know if that's the most helpful answer...
The .Net AMQP library (https://www.nuget.org/packages/Microsoft.Azure.Amqp/) supports it. This is one example.
```csharp
using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            // 0xa1 = AMQP str8-utf8 type code, 0x05 = length, then "Hello"
            byte[] bytes = { 0xa1, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f };
            Console.WriteLine(Decode(bytes));
        }

        static object Decode(byte[] bytes)
        {
            return AmqpEncoding.DecodeObject(new ByteBuffer(bytes, 0, bytes.Length));
        }
    }
}
```
If you use Java, take a look at the proton-j package (https://mvnrepository.com/artifact/org.apache.qpid/proton-j). Using the DecoderImpl class you should be able to read an object from a buffer.
I see, thanks Xin.
Link to docs for proton-j decoder class - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html
Thanks for your help, @arerlend, @xinchen10.
In case someone is interested, here is Java code to decode Kafka header values using the proton-j library:
```java
import java.nio.ByteBuffer;

import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;

DecoderImpl decoder = new DecoderImpl();
EncoderImpl encoder = new EncoderImpl(decoder);
AMQPDefinedTypes.registerAllTypes(decoder, encoder);

decoder.setByteBuffer(ByteBuffer.wrap(header.value()));
Object obj = decoder.readObject();
String result = obj.toString();
```
Does anyone know of a way to decode these values in a Python UDF? We have a use case where we can't run the scala example.
Have you looked into the pamqp package? https://github.com/gmr/pamqp
I myself decided to go for the Scala impl, but if you can't go that way, look into the above package.
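If pulling in a dependency is not an option, the string encodings from the AMQP 1.0 type spec are simple enough to hand-decode in plain Python. A sketch that only handles null and string-typed values (type codes 0x40, 0xa1, 0xb1) and leaves everything else to the caller:

```python
import struct

def decode_amqp_string(val: bytes):
    """Hand-decode an AMQP 1.0 null or string value (a sketch, not a full decoder)."""
    if not val:
        return None
    code = val[0]
    if code == 0x40:      # null
        return None
    if code == 0xa1:      # str8-utf8: 1-byte length prefix
        length = val[1]
        return val[2:2 + length].decode("utf-8")
    if code == 0xb1:      # str32-utf8: 4-byte big-endian length prefix
        (length,) = struct.unpack(">I", val[1:5])
        return val[5:5 + length].decode("utf-8")
    raise ValueError(f"unsupported AMQP type code: {code:#x}")

print(decode_amqp_string(b"\xa1\x05Hello"))  # Hello
```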
If someone happens to want to decode AMQP-encoded strings in Spark (e.g. IoT Hub headers), I found that this works (as long as you know the values are strings, since all it does is ditch the type information in the first bytes):
```python
from pyspark.sql import functions as F

def decode_amqp_str(val_col):
    """AMQP adds some bytes of type metadata. We ditch them and parse the rest as a string."""
    # 2147483647: https://stackoverflow.com/questions/57867088/pyspark-substr-without-length
    return F.substring(val_col, 3, 2147483647).cast("string")
```
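One caveat worth noting: Spark's `substring` is 1-indexed, so starting at position 3 skips exactly the two prefix bytes of the str8 (0xa1) form. A str32 (0xb1) value carries a four-byte length, i.e. a five-byte prefix, so this shortcut would leave garbage at the front of long strings. The byte layouts in plain Python, no Spark needed:

```python
# str8 (0xa1): type code + 1-byte length = 2-byte prefix,
# which is what substring(col, 3, ...) skips.
str8 = bytes([0xa1, 0x05]) + b"Hello"
print(str8[2:].decode("utf-8"))   # Hello

# str32 (0xb1): type code + 4-byte big-endian length = 5-byte prefix,
# so the same 2-byte skip would leave length bytes in the result.
str32 = bytes([0xb1, 0x00, 0x00, 0x00, 0x05]) + b"Hello"
print(str32[5:].decode("utf-8"))  # Hello
```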
Here are some slightly more robust ones:
```python
from pyspark.sql.functions import udf

@udf("string")
def string_from_amqp(val: bytes):
    # See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string
    if val is None:
        return None
    match val[0]:
        # An AMQP null value
        case 0x40:
            return None
        # An AMQP string up to 2^8 - 1 octets of UTF-8 Unicode (1-byte length prefix)
        case 0xa1:
            return val[2:].decode('UTF-8', 'strict')
        # An AMQP string up to 2^32 - 1 octets of UTF-8 Unicode (4-byte length prefix)
        case 0xb1:
            return val[5:].decode('UTF-8', 'strict')
    # TODO: maybe this should fail here instead
    return val.hex()

@udf("boolean")
def bool_from_amqp(val: bytes):
    # See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean
    if val is None:
        return None
    match val[0]:
        # A boolean encoded as 0x56 plus a payload byte (0x01 = true)
        case 0x56:
            return val[1] == 0x1
        # true and false each encoded as a single type byte
        case 0x41:
            return True
        case 0x42:
            return False
    # TODO: maybe this should fail here instead
    return None
```