reactor/reactor-kafka

Reactor Core + reactor-core-micrometer + context-propagation + tap, still no trace ID seen

patpatpat123 opened this issue · 16 comments

Hello Reactor Kafka team,

I would like to report a small issue:

I have this very simple consumer based on Reactor Kafka:

package org.example;

import io.micrometer.observation.ObservationRegistry;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.time.Duration;

@Service
public class MyConsumer implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);

    @Autowired
    private ReceiverOptions<String, String> receiverOptions;

    @Autowired
    private ObservationRegistry observationRegistry;


    @Override
    public void run(String... args) {
        myConsumer().subscribe();
    }

    public Flux<String> myConsumer() {
        return KafkaReceiver
                .create(receiverOptions)
                .receive()
                .concatMap(oneMessage -> consume(oneMessage), 500)
                .name("greeting.call")		//1
                .tag("latency", "low")	//2
                .tap(Micrometer.observation(observationRegistry));
    }

    private Mono<String> consume(ConsumerRecord<String, String> oneMessage) {
        LOGGER.info(String.format("<= Hope for traceID", Thread.currentThread().getName(), oneMessage));
        String transformedStringCPUIntensiveNonButNonBLocking = oneMessage.value().toUpperCase();
        return Mono.delay(Duration.ofSeconds(5L)).thenReturn(transformedStringCPUIntensiveNonButNonBLocking);
    }

}

Please note that the code does use tap() with Micrometer.observation().

The pom:

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>reactkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>reactkafka</name>
    <description>Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>17</java.version>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core-micrometer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>context-propagation</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Please note, in the pom you do see: Reactor Core + reactor-core-micrometer + context-propagation

However, there is still no trace ID in the square brackets.

2023-02-07 18:48:57 23748 INFO  --- [reactive-kafka-rector-thread-1] [kafka-consumer,,] org.example.MyConsumer : <= Hope for traceID

I am expecting to see something like this.

2023-02-07 18:48:57 23748 INFO  --- [reactive-kafka-rector-thread-1] [kafka-consumer,SOMETRACEIDHERE,SOMETRACEIDHERE] org.example.MyConsumer : <= Hope for traceID

Again, there are the correct dependencies, and tap.

Could you please help on this issue?

Thank you

It would be great to understand whether the tap() operator has any impact on other operators such as your concatMap(), but I'd suggest looking into explicit propagation in addition to tap() for an Observation:

.concatMap(oneMessage ->
	Mono.deferContextual(contextView -> {
		try (var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView)) {
			return consume(oneMessage);
		}
	}), 500)
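
As a rough illustration of what the scope in the snippet above does, here is a minimal sketch in plain Java, without Reactor or the context-propagation library. The Scope class and the TRACE_ID ThreadLocal are hypothetical stand-ins, not the library's own types. The value is visible only while the try block is open, and the previous value is restored on close.

```java
class ThreadLocalScopeSketch {

    // Hypothetical stand-in for the ThreadLocal that backs the MDC trace ID.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical scope: sets a value when opened, restores the previous one when closed.
    static final class Scope implements AutoCloseable {
        private final String previous;

        Scope(String value) {
            this.previous = TRACE_ID.get();
            TRACE_ID.set(value);
        }

        @Override
        public void close() {
            if (previous == null) {
                TRACE_ID.remove();
            } else {
                TRACE_ID.set(previous);
            }
        }
    }

    // Returns what is visible inside the scope and what is visible after it has closed.
    static String[] insideAndAfter(String traceId) {
        String inside;
        try (Scope scope = new Scope(traceId)) {
            inside = TRACE_ID.get();
        }
        return new String[] { inside, TRACE_ID.get() };
    }

    public static void main(String[] args) {
        String[] seen = insideAndAfter("abc123");
        System.out.println("inside=" + seen[0] + ", after=" + seen[1]); // prints inside=abc123, after=null
    }
}
```

Read this way, only code that runs synchronously inside the try block, such as the LOGGER.info() call at the start of consume(), sees the restored value.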

Thank you Artem for your comment.

I gave it a try with your sample, and indeed, I am able to see traceId within the square brackets.
However, the traceId stays exactly the same for every message received, which makes any sort of tracing impossible.

May I ask if I am missing something?

Would it be possible to have a behavior similar to Spring Web with Sleuth, where each separate HTTP request results in a different traceId in the square brackets?

The behavior for your current configuration is expected.
You call .tap(Micrometer.observation(observationRegistry)), which starts a single Observation for the whole subscriber.
If you need a trace per event in the Flux, then you need to start one manually.
Perhaps in your consume() method.
In that case tap() might be redundant for you, since you are not interested in the subscriber's traceId.

Another way is to move your tap() down to your Mono:

    private Mono<String> consume(ConsumerRecord<String, String> oneMessage) {
        LOGGER.info(String.format("<= Hope for traceID", Thread.currentThread().getName(), oneMessage));
        String transformedStringCPUIntensiveNonButNonBLocking = oneMessage.value().toUpperCase();
        return Mono.delay(Duration.ofSeconds(5L)).thenReturn(transformedStringCPUIntensiveNonButNonBLocking)
                               .tap(Micrometer.observation(observationRegistry));
    }

Then you don't need it for the top-level subscriber and you will need to bring your simple concatMap() back.

Many thanks @artembilan for this suggestion.

I followed your advice and moved the tap down to the Mono.

Unfortunately, all trace IDs are now lost (before, there was at least one traceId shared by everything):

--- [reactive-kafka-rector-thread-1] [kafka-consumer,,] org.example.MyConsumer

It seems the Spring Boot 3 Observability announced at the latest SpringOne is quite difficult to use with the reactive API!

I will keep the trials and errors (mostly errors) going

It has always been difficult for Reactive Streams, since MDC is based on a ThreadLocal and we have to ensure somehow that the trace is set there from whichever thread is involved in the reactive stream processing.
So, since you'd like to have a new trace on your LOGGER.info() in that consume() method, there is no way to do it automatically unless you start a new Observation in scope in that method.
See more info in Observation docs: https://micrometer.io/docs/observation

Observation.createNotStarted("my observation", registry)
        .observe(() -> LOGGER.info(String.format("<= Hope for traceID", Thread.currentThread().getName(), oneMessage)));
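
To make the ThreadLocal point concrete, the following plain-Java sketch (no Reactor or Micrometer involved) shows a value set on one thread being invisible on another thread unless it is captured and restored explicitly. TRACE_ID is a hypothetical stand-in for the MDC entry that holds the trace ID, and the class and method names are made up for illustration.

```java
class ThreadLocalHopSketch {

    // Hypothetical stand-in for the MDC entry that holds the trace ID.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Reads TRACE_ID on a fresh thread. If 'captured' is non-null, it is restored
    // on that thread first, which is what explicit propagation has to do.
    static String readOnOtherThread(String captured) {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> {
            if (captured != null) {
                TRACE_ID.set(captured);
            }
            seen[0] = TRACE_ID.get();
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        TRACE_ID.set("abc123");
        // Nothing is carried over to the other thread by default.
        System.out.println("without propagation: " + readOnOtherThread(null));
        // Capturing the value on the calling thread and restoring it on the worker makes it visible.
        System.out.println("with propagation: " + readOnOtherThread(TRACE_ID.get()));
    }
}
```

In a reactive pipeline the thread can change between operators, so the capture-and-restore step has to happen wherever the trace ID is needed.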

Thank you @artembilan for your continued advice on this.

I gave it a try, and unfortunately, this yields a strange behavior.

Let's say I have a producer which generates the message, and the message from the producer has traceId A.

This code in the consumer will print a traceId B, completely different from the one sent by the producer. There is no way to correlate the two.

Well, but that's a different story. It sounds like you would also like to have trace propagation from the Apache Kafka producer to the consumer, and therefore correlate a producer record with a consumer one.
We did such an integration on the Spring for Apache Kafka side: https://docs.spring.io/spring-kafka/reference/html/#observation.
I'm not sure what the plain Apache Kafka client offers for what you are asking.
If you already have that info in the ConsumerRecord, you can of course restore a parent trace using a ReceiverContext and a Getter over ConsumerRecord.headers().
Something similar to what we have in the KafkaRecordReceiverContext.
At the same time, you can borrow the Observation creation idea from KafkaMessageListenerContainer:

			Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
					this.containerProperties.getObservationConvention(),
					DefaultKafkaListenerObservationConvention.INSTANCE,
					() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
					this.observationRegistry);
			return observation.observe(() -> {

Of course, that is only going to work if your producer side propagates that info into the ProducerRecord.
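
To illustrate the header-based idea in isolation, here is a small plain-Java sketch. It assumes the producer writes a W3C-style traceparent header (version-traceId-spanId-flags). The Map stands in for the record headers, and the class and method names are hypothetical, not part of Kafka, Micrometer, or Spring for Apache Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class HeaderTracePropagationSketch {

    // Assumed header name, following the W3C Trace Context convention.
    static final String HEADER = "traceparent";

    // Producer side: writes the current trace ID and span ID into the record headers.
    static void inject(Map<String, byte[]> headers, String traceId, String spanId) {
        String value = "00-" + traceId + "-" + spanId + "-01";
        headers.put(HEADER, value.getBytes(StandardCharsets.UTF_8));
    }

    // Consumer side: returns the producer's trace ID, or null if the header is absent or malformed.
    static String extractTraceId(Map<String, byte[]> headers) {
        byte[] raw = headers.get(HEADER);
        if (raw == null) {
            return null;
        }
        String[] parts = new String(raw, StandardCharsets.UTF_8).split("-");
        return parts.length == 4 ? parts[1] : null;
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        inject(headers, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7");
        // The consumer can now continue the same trace instead of starting an unrelated one.
        System.out.println("consumer continues trace " + extractTraceId(headers));
    }
}
```

Without the header on the producer side, the consumer has nothing to extract and can only start a new, unrelated trace.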

Many thanks @artembilan,

For Spring Kafka (not Reactor Kafka), having correct traces between producer and consumer is quite easy with the help of enableObservation.

Would it be possible to have a similar mechanism in Reactor Kafka, please?

A world where:
reactor kafka producer -> reactor kafka consumer
normal kafka producer -> reactor kafka consumer
reactor kafka producer -> normal kafka consumer

For the above three cases, seeing the traces in the MDC (the square brackets) and having them correlate correctly between producer and consumer would be amazing.

See this PR: #323.

I guess we can go a similar way for an ObservationRegistry.
Or we can just inject an ObservationRegistry into the respective options and instrument KafkaConsumer.poll() and KafkaProducer.send() with the respective ReceiverContext and SenderContext, to be able to propagate traces in a similar way as we do in Spring for Apache Kafka.
I'm not sure about automatic MDC in every single reactive operator. Probably something like Hooks.onEachOperator(), but for that I need to consult with Reactor experts.

Understood @artembilan.

Should I forward this to a "Reactor expert" (which I am not) by opening a ticket in a particular repo?

Basically, I am happy to help with anything you would like.

I think having the traceId for Reactor Kafka would be a huge gain for this project.

No, you don't need to do anything else: this ticket is enough.
I'll come back when I have more info.

Thank you for your help!

OK. I think we will go with the general Reactor approach: https://projectreactor.io/docs/core/release/reference/#_operators_that_transparently_restore_a_snapshot_handle_and_tap.
So, if you use handle() or tap(), then you'll get the MDC populated from the Observation that is stored in the Reactor context and restored into a ThreadLocal.
If that is not the case, then what we discussed above about Mono.deferContextual() is the way to go in other operators where you'd like to restore a ThreadLocal from the Reactor context.

That still leaves this issue open as one about automatic Observation propagation from Consumer to Producer.
And that, I guess, would be similar to what we have so far with org.springframework.web.filter.reactive.ServerHttpObservationFilter:

	private Publisher<Void> filter(ServerWebExchange exchange, ServerRequestObservationContext observationContext, Mono<Void> call) {
		Observation observation = ServerHttpObservationDocumentation.HTTP_REACTIVE_SERVER_REQUESTS.observation(this.observationConvention,
				DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, this.observationRegistry);
		observation.start();
		return call.doOnEach(signal -> {
					Throwable throwable = signal.getThrowable();
					if (throwable != null) {
						if (DISCONNECTED_CLIENT_EXCEPTIONS.contains(throwable.getClass().getSimpleName())) {
							observationContext.setConnectionAborted(true);
						}
						observationContext.setError(throwable);
					}
					onTerminalSignal(observation, exchange);
				})
				.doOnCancel(() -> {
					observationContext.setConnectionAborted(true);
					observation.stop();
				})
				.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
	}

But here it would apply to each received ConsumerRecord, so that the trace info can be taken from the record headers, as we do in the KafkaRecordReceiverContext.

Ok! Looks like there has been something released for MDC in Reactor recently: https://twitter.com/mgrzejszczak/status/1625592303929679899?s=46&t=yLiUFTaEZDsXWsxpciUNiw 😅

But, yeah, we still need to provide an observability propagation in this project…

It would be great if we could have this in Reactor Kafka. Crossing all my fingers.

Hi guys,

Any more updates on this?

Thanks