ExpediaGroup/pitchfork

Kafka messages are malformed

vineethpp opened this issue · 6 comments

Hi,
I was trying to use Pitchfork as a listener and forward the spans to Kafka, but when I tail the Kafka topic the messages look like the output below. I'm not sure whether they are encoded for Haystack.

016dd9ad540015a153755872f034a24d016dd9ad540015a1"
tracingapp*get0�҅���8�"J

http.methodGETJ
http.path /serviceBJ
span.kindclient

016dd9ad540015a153755872f034a24d016dd9ad540015a1"
sample*get0�����8�J

http.methodGETJ
http.path /serviceBJ)
mvc.controller.classTracingControllerJ+
mvc.controller.methodcallAnotherServiceJ!
1557936287610Jer
span.kindserver

I expected the spans to look like this:

[{"tags":{"http.method":"GET","http.path":"/serviceB"},"timestamp":1557911263087114,"parentId":"c2da33ad4e851c5d","id":"f8489acb3d718ffb","name":"get","localEndpoint":{"ipv4":"127.0.0.1","serviceName":"tracingapp"},"traceId":"c2da33ad4e851c5d"}]

Also, let me know if I'm doing something wrong or have misunderstood something.

Hi @vineethpp!
Haystack expects the messages to be encoded with protobuf, so they won't be human-readable.
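The odd characters in the tailed output are protobuf's binary wire format printed as text. As a rough illustration (a hand-rolled sketch, not Haystack's code; the class and method names are hypothetical, and the field numbers are inferred from the dump, so treat them as assumptions), each string field is written as a key byte `(fieldNumber << 3) | wireType`, a varint length, and the raw bytes. For field 4 the key is 0x22, which prints as `"`, and a length of 10 prints as a line break, which matches `016dd9ad540015a1"` followed by `tracingapp` on the next line. A length-delimited field 9 gives the key 0x4A, the `J` that appears before each tag.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ProtobufWireDemo {
    // Protobuf wire type 2 = length-delimited (strings, bytes, embedded messages)
    static final int LENGTH_DELIMITED = 2;

    // Writes an unsigned varint: 7 bits per byte, high bit set on every byte except the last
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Encodes one string field: key = (fieldNumber << 3) | wireType, then the length, then UTF-8 bytes
    static void writeStringField(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, ((long) fieldNumber << 3) | LENGTH_DELIMITED);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Field number 4 is an assumption, chosen to match the '"' seen before "tracingapp"
        writeStringField(out, 4, "tracingapp");
        byte[] bytes = out.toByteArray();

        // Prints "key=0x22 length=10": 0x22 is '"' and 10 is the newline character
        System.out.printf("key=0x%02X length=%d%n", bytes[0], bytes[1]);

        // Printed as text, the same bytes show up as '"', a line break, and "tracingapp"
        System.out.println(new String(bytes, StandardCharsets.ISO_8859_1));

        // Field 9 with wire type 2 gives key byte 0x4A, which prints as 'J'
        System.out.println((char) ((9 << 3) | LENGTH_DELIMITED));
    }
}
```

This is also why decoding requires the matching `.proto` schema: the bytes carry field numbers, not field names.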

Have you integrated it with Haystack yet, and are you seeing problems on that side?

PS: feel free to jump into https://gitter.im/expedia-haystack/Lobby if you think it's easier to discuss this there.

Thank you. I was trying to ingest the spans into Zipkin's Elasticsearch storage using the Zipkin collector. That failed, which makes sense given the encoding. I'd like some direction on whether I can extend Pitchfork to write Zipkin-compatible span messages to Kafka.

I'm not sure if I understood the question. You want to use Pitchfork to accept spans in Zipkin format (HTTP) and forward this data to a Kafka topic also using the Zipkin format?

Yes, exactly. If you can give me some pointers on where to do this in the code, I can give it a try, and if it works I can push it here as well. I appreciate your support and help. Thanks!

Sure. I think we'll need to discuss further whether it makes sense to add this feature to Pitchfork, but for now you can fork the project and use it as a starting point.

I had a look at the code, and I think what you want is almost a copy of HaystackKafkaSpanForwarder, which already forwards spans to a Kafka topic. Instead of serializing the input span into the Haystack format, you can serialize it back into the Zipkin format.

Something along these lines:

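 import java.util.List;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import zipkin2.codec.SpanBytesEncoder;

 // logger, topic and producer are assumed to be fields copied from HaystackKafkaSpanForwarder
 @Override
 public void process(zipkin2.Span input) {
     logger.debug("operation=process, span={}", input);

     // Zipkin's Kafka collector reads JSON messages as span lists, so wrap the single span in a list
     var bytes = SpanBytesEncoder.JSON_V2.encodeList(List.of(input));
     final ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, input.id(), bytes);

     producer.send(record);
 }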
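Once the fork is running, one quick way to confirm which encoding ends up on the topic is to look at the first byte of each record value. The helper below is a hypothetical debugging sketch (the name `describePayload` and its return strings are made up). A Zipkin JSON v2 list starts with `[`. A protobuf message whose first field is a length-delimited field 1 starts with 0x0A, which is assumed, not confirmed, to be how the Haystack spans in this thread begin.

```java
import java.nio.charset.StandardCharsets;

public class PayloadSniffer {
    // Hypothetical helper: guesses a Kafka record value's encoding from its first byte only
    static String describePayload(byte[] value) {
        if (value == null || value.length == 0) {
            return "empty";
        }
        if (value[0] == '[') {
            // SpanBytesEncoder.JSON_V2.encodeList(...) produces a JSON array
            return "json-list";
        }
        if (value[0] == 0x0A) {
            // Key byte for field 1 with wire type 2 (length-delimited); a heuristic, not proof of protobuf
            return "protobuf?";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        byte[] zipkin = "[{\"traceId\":\"c2da33ad4e851c5d\"}]".getBytes(StandardCharsets.UTF_8);
        byte[] haystack = {0x0A, 0x02, 'a', 'b'};
        System.out.println(describePayload(zipkin));   // json-list
        System.out.println(describePayload(haystack)); // protobuf?
    }
}
```

A first-byte check is deliberately crude; it only helps tell the two formats in this thread apart and does not validate the payload.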

Thank you, @worldtiki! I will try this out in my fork.
Thanks a lot for your help.