kafka-opentelemetry


Instrumenting Kafka clients with OpenTelemetry

There are two ways to get tracing information out of an application that uses the Kafka clients:

  • instrumenting the application by enabling tracing on the Kafka clients;
  • using an external agent running alongside the application to add tracing;

Instrumenting the Kafka clients based application

Instrumenting the application means enabling tracing in the Kafka clients. First, you need to add the dependency on the Kafka clients instrumentation library.

<dependency>
    <groupId>io.opentelemetry.instrumentation</groupId>
    <artifactId>opentelemetry-kafka-clients-2.6</artifactId>
</dependency>

Depending on the exporter you want to use for exporting tracing information, you also have to add the corresponding dependency. For example, in order to use the Jaeger exporter, the dependency is the following one.

<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-jaeger</artifactId>
</dependency>
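
The dependency snippets above omit the <version> element, so they assume the versions are managed elsewhere in the build, for example by importing the OpenTelemetry BOMs in dependencyManagement. A minimal sketch (the BOM artifacts and the placeholder versions are assumptions to adapt to your setup):

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-bom</artifactId>
            <version><!-- OpenTelemetry SDK version --></version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry.instrumentation</groupId>
            <artifactId>opentelemetry-instrumentation-bom-alpha</artifactId>
            <version><!-- instrumentation version --></version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>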

Setting up the OpenTelemetry instance

In order to enable tracing on the Kafka clients, you need to create an OpenTelemetry instance and register it globally. This can be done in two different ways:

  • using the SDK extension for environment-based autoconfiguration;
  • using SDK builders for programmatic configuration;

SDK extension: autoconfiguration

It is possible to configure a global OpenTelemetry instance by using environment variables thanks to the SDK extension for autoconfiguration, enabled with the following dependency.

<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>

The main environment variables to be set are the following:

  • OTEL_SERVICE_NAME: specifies the logical service name;
  • OTEL_TRACES_EXPORTER: the list of exporters to be used for tracing. For example, when using jaeger you also need the corresponding exporter dependency in the application;
  • OTEL_METRICS_EXPORTER: the list of exporters to be used for metrics. If you don't use metrics, it has to be set to none;

Instead of using the above environment variables, you can also set the corresponding system properties, either programmatically or on the command line. They are otel.service.name, otel.traces.exporter and otel.metrics.exporter.
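
As a sketch, assuming the Jaeger exporter and a service named my-kafka-service, the environment could be prepared as follows before starting the application:

export OTEL_SERVICE_NAME=my-kafka-service
export OTEL_TRACES_EXPORTER=jaeger
export OTEL_METRICS_EXPORTER=none

The equivalent system properties would be passed on the command line as -Dotel.service.name=my-kafka-service, -Dotel.traces.exporter=jaeger and -Dotel.metrics.exporter=none.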

SDK builders: programmatic configuration

Instead of relying on autoconfiguration, you can build your own OpenTelemetry instance by using the SDK builders programmatically. The OpenTelemetry SDK dependency is needed in order to have such SDK builder classes available.

<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-sdk</artifactId>
</dependency>

The following code snippet sets the main resource attributes, like the service name, then it configures the Jaeger exporter. Finally, it creates the OpenTelemetry instance and registers it globally so that it can be used by the Kafka clients.

// describe the service through resource attributes, e.g. its logical name
Resource resource = Resource.getDefault()
        .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "my-kafka-service")));

// tracer provider exporting spans in batches through the Jaeger gRPC exporter
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
        .addSpanProcessor(BatchSpanProcessor.builder(JaegerGrpcSpanExporter.builder().build()).build())
        .setSampler(Sampler.alwaysOn())
        .setResource(resource)
        .build();

// build the OpenTelemetry instance and register it globally
OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
        .setTracerProvider(sdkTracerProvider)
        .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
        .buildAndRegisterGlobal();
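
With no further configuration, the Jaeger gRPC exporter sends spans to a collector on localhost. If your collector runs elsewhere, the endpoint can be set on the exporter builder; a minimal sketch, with the collector host and port as assumptions:

JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder()
        .setEndpoint("http://my-jaeger-collector:14250") // assumed collector address
        .build();

Such an exporter can then replace the default one passed to the span processor above.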

Using interceptors

The Kafka clients API provides a way to "intercept" messages before they are sent to the brokers, as well as messages received from the brokers before being passed to the application. The OpenTelemetry instrumented Kafka library provides two interceptors that can be configured to add tracing information automatically. The interceptor class has to be set in the properties used to create the Kafka client.

Use the TracingProducerInterceptor for the producer in order to create a "send" span automatically, each time a message is sent.

props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
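
For context, a minimal producer configuration could look like the following sketch; the broker address, topic name and serializers are assumptions for illustration, and imports are omitted as in the other snippets.

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// register the tracing interceptor so that every send creates a "send" span
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));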

Use the TracingConsumerInterceptor for the consumer in order to create a "receive" span automatically, each time a message is received.

props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
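
Similarly, a minimal consumer configuration could be the following sketch; the broker address, group id, topic name and deserializers are assumptions for illustration.

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// register the tracing interceptor so that a "receive" span is created for consumed messages
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));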

For a Streams API based application, you have to set the interceptors for the underlying producer and consumer.

props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
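
A sketch of how these prefixed properties fit into a complete Streams configuration follows; the application id, broker address and topics are assumptions, and the topology is a simple pass-through.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // assumed application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic"); // assumed topics
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();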

Wrapping clients

The other way is to wrap the Kafka client with a tracing-enabled Kafka client.

Assuming you have a Producer<K, V> producer instance, you can wrap it in the following way.

KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
Producer<String, String> tracingProducer = telemetry.wrap(producer);

Then use the tracingProducer as usual for sending messages to the Kafka cluster.

Assuming you have a Consumer<K, V> consumer instance, you can wrap it in the following way.

KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
Consumer<String, String> tracingConsumer = telemetry.wrap(consumer);

Then use the tracingConsumer as usual for receiving messages from the Kafka cluster.

For a Streams API based application, you have to wrap the underlying producer and consumer. This can be done by implementing the KafkaClientSupplier interface, which returns the producer and consumer instances used by the Streams API. Alternatively, to avoid code duplication, you can extend the Kafka provided default implementation DefaultKafkaClientSupplier and wrap the producer and consumer it returns to add the telemetry logic.

private static class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
    @Override
    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
        // wrap the producer used internally by the Streams API with tracing
        KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
        return telemetry.wrap(super.getProducer(config));
    }

    @Override
    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
        // wrap the consumer used internally by the Streams API with tracing
        KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
        return telemetry.wrap(super.getConsumer(config));
    }

    @Override
    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
        // restore and global consumers are wrapped in the same way
        return this.getConsumer(config);
    }

    @Override
    public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
        return this.getConsumer(config);
    }
}
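
The supplier can then be passed when creating the KafkaStreams instance; a minimal sketch, assuming the topology and properties are already built:

KafkaStreams streams = new KafkaStreams(topology, props, new TracingKafkaClientSupplier());
streams.start();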

Using agent

Another way is to add tracing to your application with no changes or additions to your application code; you also don't need to add any dependencies on OpenTelemetry specific libraries. This is possible by using the OpenTelemetry agent you can download from here. The agent has to run alongside your application in order to inject the logic for tracing messages sent to and received from a Kafka cluster.

Run the producer application in the following way.

java -javaagent:path/to/opentelemetry-javaagent.jar \
      -Dotel.service.name=my-kafka-service \
      -Dotel.traces.exporter=jaeger \
      -Dotel.metrics.exporter=none \
      -jar kafka-producer-agent/target/kafka-producer-agent-1.0-SNAPSHOT-jar-with-dependencies.jar

Run the consumer application similarly.

java -javaagent:path/to/opentelemetry-javaagent.jar \
      -Dotel.service.name=my-kafka-service \
      -Dotel.traces.exporter=jaeger \
      -Dotel.metrics.exporter=none \
      -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true \
      -jar kafka-consumer-agent/target/kafka-consumer-agent-1.0-SNAPSHOT-jar-with-dependencies.jar

As usual, the three main system properties are set to specify the logical service name, the exporter to be used (jaeger in this example) and to disable the metrics exporter.

The same approach can be used for running the Streams API based application.