ReactorSleuth.tracedMono does not work with parallel processing
Kashif-S opened this issue · 1 comment
Describe the bug
The span & corresponding scope that wrap a Mono in ReactorSleuth.tracedMono seem to get prematurely terminated when using an operator that forces a parallel scheduler, such as flatMap or delayElements. When this occurs, I've observed that the wrong trace & span ids get placed in the MDC, and new spans get created under the wrong parent span.
The Spring Sleuth instrumentation for Reactor Kafka currently wraps all KafkaRecords within a ReactorSleuth.tracedMono. As far as I am aware, due to this issue, there is no way to handle these KafkaRecords concurrently while still maintaining accurate distributed trace information.
I have observed this issue with spring-cloud-sleuth-instrumentation-3.1.9 and spring-cloud-sleuth-otel-1.1.4.
Samples
In both examples, a unique trace & span id is expected to be present in the MDC and on all logs for a given element. Both examples show that this is not the case, regardless of the value of spring.sleuth.reactor.instrumentation-type.
FlatMap
Test case:
@Test
@DisplayName("Wrap elements in ReactorSleuth.tracedMono & apply flatMap")
void tracedMonoTestFlatMap() {
    Hooks.enableAutomaticContextPropagation();
    Flux.range(0, 10)
        // Simulates what Spring Sleuth does to wrap reactor kafka records
        .flatMap(num -> ReactorSleuth.tracedMono(tracer, tracer.nextSpan(), () -> Mono.just(num)))
        .doOnNext(num -> log.info("Before flatmap: {}", num))
        .flatMap(num -> Mono.just(num).delayElement(Duration.ofMillis(1000)), 3, 3)
        .doOnNext(num -> log.info("After flatmap: {}", num))
        .blockLast();
}
Sample logs:
17:20:42.560 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [Test worker] INFO - Before flatmap: 0
17:20:42.582 trace_id=e293783dbaffc5820edd48d35442407f span_id=5e7a8039aed68482 [Test worker] INFO - Before flatmap: 1
17:20:42.583 trace_id=d64ce54f80ad0842f1767900c0bee0cc span_id=e9780f445d48f275 [Test worker] INFO - Before flatmap: 2
17:20:43.585 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-1] INFO - After flatmap: 0
17:20:43.585 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-1] INFO - After flatmap: 1
17:20:43.585 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-1] INFO - After flatmap: 2
17:20:43.585 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-1] INFO - Before flatmap: 4
17:20:43.585 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-1] INFO - Before flatmap: 5
17:20:43.585 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-1] INFO - Before flatmap: 6
17:20:44.591 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-5] INFO - After flatmap: 5
17:20:44.591 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-5] INFO - Before flatmap: 3
17:20:44.592 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-5] INFO - After flatmap: 6
17:20:44.592 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-5] INFO - Before flatmap: 7
17:20:44.593 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-5] INFO - After flatmap: 4
17:20:44.593 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-5] INFO - Before flatmap: 8
17:20:45.596 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-8] INFO - After flatmap: 7
17:20:45.597 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-8] INFO - After flatmap: 3
17:20:45.597 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-8] INFO - After flatmap: 8
17:20:45.597 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-8] INFO - Before flatmap: 9
17:20:46.612 trace_id=5dca7636b4b2e86e095cb1333dcbd678 span_id=da332bb7e0efff32 [parallel-2] INFO - After flatmap: 9
delayElements
Test case:
@Test
@DisplayName("Wrap elements in ReactorSleuth.tracedMono & apply delayElements")
void tracedMonoTestDelay() {
    Hooks.enableAutomaticContextPropagation();
    Flux.range(0, 10)
        // Simulates what Spring Sleuth does to wrap reactor kafka records
        .flatMap(num -> ReactorSleuth.tracedMono(tracer, tracer.nextSpan(), () -> Mono.just(num)))
        .doOnNext(num -> log.info("Before delay: {}", num))
        .delayElements(Duration.ofMillis(1000))
        .doOnNext(num -> log.info("After delay: {}", num))
        .blockLast();
}
Sample logs:
17:21:48.012 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [Test worker] INFO - Before delay: 0
17:21:49.052 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-1] INFO - After delay: 0
17:21:49.052 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-1] INFO - Before delay: 4
17:21:50.063 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-2] INFO - After delay: 4
17:21:50.063 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-2] INFO - Before delay: 1
17:21:51.071 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-3] INFO - After delay: 1
17:21:51.071 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-3] INFO - Before delay: 2
17:21:52.080 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-4] INFO - After delay: 2
17:21:52.080 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-4] INFO - Before delay: 3
17:21:53.085 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-5] INFO - After delay: 3
17:21:53.085 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-5] INFO - Before delay: 5
17:21:54.088 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-6] INFO - After delay: 5
17:21:54.089 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-6] INFO - Before delay: 6
17:21:55.092 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-7] INFO - After delay: 6
17:21:55.092 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-7] INFO - Before delay: 7
17:21:56.105 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-8] INFO - After delay: 7
17:21:56.106 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-8] INFO - Before delay: 8
17:21:57.107 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-1] INFO - After delay: 8
17:21:57.107 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-1] INFO - Before delay: 9
17:21:58.112 trace_id=4b3a6780228de0034e4d216bc71f7dff span_id=c3a833fb10c89f09 [parallel-2] INFO - After delay: 9
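For completeness, the pattern that automatic context propagation needs to apply on every such hop is capture-at-submission, restore-in-task. A stdlib-only sketch of that pattern (the captured helper is hypothetical, not a Sleuth or Reactor API):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextCaptureDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the ThreadLocal value at submission time and restores it
    // inside the task, mirroring what per-element context propagation
    // would have to do on each scheduler hop.
    static <T> Callable<T> captured(Callable<T> task) {
        String snapshot = TRACE_ID.get();
        return () -> {
            TRACE_ID.set(snapshot);
            try {
                return task.call();
            } finally {
                TRACE_ID.remove(); // don't leak state into the pool thread
            }
        };
    }

    static String valueSeenWithCapture() throws Exception {
        TRACE_ID.set("trace-123");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(captured(() -> TRACE_ID.get())).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("other thread sees: " + valueSeenWithCapture()); // "trace-123"
    }
}
```

If this capture/restore step happens once per subscription rather than once per element, every element that hops threads inherits the context captured first, which would be consistent with the single repeated trace_id in the logs above.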
We encourage you to migrate to Micrometer Tracing and use the new context propagation mechanism, which fixes a lot of the issues that Sleuth had.