spring-cloud/spring-cloud-sleuth

Span is not closed & exported when returning Mono.empty in Spring Cloud Function

cmergenthaler opened this issue · 6 comments

Describe the bug
We are facing problems when using sleuth auto-instrumentation for kafka messaging. Everything works fine as long as we do not return an empty Mono in our reactive function. We even see invalid parent span IDs warnings in jaeger.
Any ideas or suggestions?

Sample

@Bean
public Function<Flux<Message<KafkaMessage>>, Mono<Void>> messageProcessor(Tracer tracer) {
    return data -> data
            .flatMap(inbound -> {
                log.info("Auto-created span");
                Span child = tracer.nextSpan().start();
                log.info("Manual inner span");
                child.end();
                return Mono.empty();
            }).then();
}

missingspan

olegz commented

Out of curiosity, why are you returning an empty Mono? I mean we may never accounted for this case, but just curious.
Also, keep in mind we are very limited with what we can do on reactive function, since we have absolutely no control of the stream or what user does with it. The function acts only as initialisation function and invoked once at the startup to connect user stream (e.g., Flux). Once connected it's as if s-c-stream didn't even exist.
This is fundamentally different than an imperative function that is a classic message handler fully controlled by the framework.

Thanks for your reply @olegz
We do have some business logic where we do not want to publish a message on some cases (but also don't error because it is expected), so we have solved this by returning an empty Mono in the past. If you have suggestions/examples how to solve this in another way, we'd be happy for some input.
Now that we introduce tracing with sleuth in our services, the cases that return an empty Mono won't be traced properly (because of an unclosed span). I do understand that scf is limited on accessing the function provided by the user, but any other solution/workaround would also be fine for us.

olegz commented

If you want to publish message optionally and you can consider moving to imperative approach you can do a combination of Consumer and StreamBridge

@Bean
public Consumer messageProcessor(StreamBridge bridge, Tracer tracer) {
   return message -> {
       // do whatever with message
       // use tracer etc. . . 
       if (i want to send a message) {
          bridge.send(bindingName, message);
       }
   };
}

This way you have full control and in case of any errors we would do what we promise with retries etc. . .

Let us know
Cheers

Moving to imperative is not really an option as most of our code is in reactive already. Is there any way using reactive to achieve this?

olegz commented

Have you tried

public Consumer<Flux<Message<KafkaMessage>>> messageProcessor(Tracer tracer) {
. . .
}

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_reactive

I think we are fine when building it that way. Therefore I will close the issue for now.
Thanks @olegz