/camunda-zeebe-kafka-exporter

Fork of project to export events from Zeebe to Kafka

Primary language: Java. License: Apache License 2.0.

Zeebe Kafka Exporter

Compatible with: Camunda Platform 8

An easy-to-use exporter which exports Zeebe records to a configured Kafka topic.

For more information about the exporters please read the Exporter documentation.

Supported Zeebe versions

Versions 1.x and 2.x are compatible with the following Zeebe versions:

  • 0.23.x
  • 0.24.x
  • 0.25.x
  • 0.26.x

Version 3.x is compatible with the following Zeebe versions:

  • 1.0

Backwards compatibility

As there is currently only a single maintainer, only the latest major version will be maintained and supported.

At the moment, the only guarantees for backwards compatibility are:

  • the exporter's configuration
  • the serde module

Quick start

The quickest way to get started is:

  1. Download the latest release (zeebe-kafka-exporter-*-jar-with-dependencies.jar).
  2. Copy it somewhere accessible to the Zeebe broker.
  3. Add the following to your Zeebe broker's application.yaml:
zeebe:
  broker:
    exporters:
      kafka:
        className: io.zeebe.exporters.kafka.KafkaExporter
        jarPath: /path/to/zeebe-kafka-exporter-3.0.0-jar-with-dependencies.jar
        args:
          producer:
            # update this to point to your Kafka brokers
            servers: "kafka-1:9092,kafka-2:9092"
  4. Update the configuration's list of servers to point to your Kafka instances.

The next time you start your Zeebe cluster, all event-type records will be exported to their respective Kafka topics. See exporter/exporter.yml for a fully commented configuration file example.

NOTE: before Zeebe 1.0.1, a bug in Zeebe prevented this exporter from being loaded in an isolated way. See this issue for more. If you are on an affected version, you must place this exporter in the lib/ folder of your Zeebe broker directory - by default that would be /usr/local/zeebe/lib. It will then be automatically available on the classpath, and you can omit the jarPath configuration option. This is not recommended, however; if you're on Zeebe 1.0.1 or greater, you should use the method described above.

Usage

The exporter is set up to stream records from Zeebe to Kafka as they are processed by the exporter stream processor. While this is done asynchronously, it makes use of Kafka transactions to minimize issues with out-of-order processing. As such, your consumers should use read_committed isolation level.

Records are serialized to Kafka using plain JSON. Keys are the JSON representation of io.zeebe.exporters.kafka.serde.RecordId, and values are serialized using the standard Zeebe io.zeebe.protocol.record.Record#toJson() method. The io.zeebe.exporters:zeebe-kafka-exporter-serde module provides easy-to-use Deserializer implementations in Java for use in your consumers.

The configuration file is a good starting point to learn more about how the exporter works.

Kafka configuration

You may need to configure your Kafka instance(s) for the exporter. It's recommended that you provision the expected topics beforehand, so that you can configure them properly. You can read more about partitioning below.

Transactions

NOTE: I'm still planning on adding an alternative method that doesn't require transactions, as we can't make use of exactly-once semantics anyway due to Zeebe's at-least once semantics. However, it needs to be fault tolerant as well, so it may take a bit more time.

Additionally, the exporter makes use of transactions, and it's recommended that you configure transactions accordingly for your brokers. You can find a description of the relevant settings in the official Kafka documentation. The important settings are:

  • transaction.state.log.min.isr: the minimum number of in-sync replicas for the transaction topic. By default it's 2, but if you're running a single node cluster (e.g. for demo or testing purposes), make sure to lower it to 1.
  • transaction.state.log.replication.factor: by default 3, such that transactions can handle one failure. Again, if running a single node for demo/testing purposes, lower this to 1.
  • transaction.state.log.segment.bytes: you can leave it as default, but it can be even smaller to more aggressively compact, considering that the exporter flushes fairly often.
  • transactional.id.expiration.ms: configure this with respect to the exporter's flush interval, and how much load your cluster will see. By default, the exporter flushes every second - however, on a low load cluster, there may not be anything to flush at times. It's recommended to set this low if you have constant load - say, one hour - but keep it to the default if your load is irregular.
  • transaction.max.timeout.ms: the same guidance applies here - configure it with respect to the exporter's flush interval and how much load your cluster will see.

Partitioning

As ordering in Zeebe is critical to understanding the flow of events, it's important that it be preserved in Kafka as well. To achieve this, the exporter implements its own Partitioner.

It does so by taking the Zeebe partition ID (which starts at 1), and applying a modulo against the number of Kafka partitions for the given topic, e.g. zeebePartitionId % kafkaPartitionsCount.

One downside is that if you have more Kafka partitions than Zeebe partitions, some of your partitions will be unused: partition 0, and any partition whose number is greater than the count of Zeebe partitions. As such, it's completely useless to add more Kafka partitions than Zeebe partitions in most cases.

For example, if you have 3 Zeebe partitions, and 2 Kafka partitions:

  • RecordId{partitionId=1, position=1} => Kafka partition 1
  • RecordId{partitionId=2, position=1} => Kafka partition 0
  • RecordId{partitionId=3, position=1} => Kafka partition 1
  • RecordId{partitionId=3, position=2} => Kafka partition 1
  • RecordId{partitionId=2, position=2} => Kafka partition 0

With more Kafka partitions, for example, 4 Kafka partitions, and 3 Zeebe partitions:

  • RecordId{partitionId=1, position=1} => Kafka partition 1
  • RecordId{partitionId=2, position=1} => Kafka partition 2
  • RecordId{partitionId=3, position=1} => Kafka partition 3
  • RecordId{partitionId=3, position=2} => Kafka partition 3
  • RecordId{partitionId=2, position=2} => Kafka partition 2
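
The mapping above can be reproduced with a few lines of Java. This is only a minimal sketch of the modulo rule described in this section, not the exporter's actual Partitioner implementation; the class and method names (PartitionMappingSketch, kafkaPartitionFor) are hypothetical.

```java
/** Hypothetical illustration of the modulo mapping; not the exporter's actual Partitioner. */
final class PartitionMappingSketch {

  private PartitionMappingSketch() {}

  /**
   * Maps a Zeebe partition ID (which starts at 1) to a Kafka partition index (which starts at 0)
   * using zeebePartitionId % kafkaPartitionsCount.
   */
  static int kafkaPartitionFor(final int zeebePartitionId, final int kafkaPartitionsCount) {
    if (zeebePartitionId < 1) {
      throw new IllegalArgumentException("Zeebe partition IDs start at 1");
    }
    if (kafkaPartitionsCount < 1) {
      throw new IllegalArgumentException("A topic has at least one partition");
    }
    return zeebePartitionId % kafkaPartitionsCount;
  }

  public static void main(final String[] args) {
    final int zeebePartitionsCount = 3;
    // Same two scenarios as in the examples above: 2 and 4 Kafka partitions
    for (final int kafkaPartitionsCount : new int[] {2, 4}) {
      System.out.println("Kafka partitions: " + kafkaPartitionsCount);
      for (int id = 1; id <= zeebePartitionsCount; id++) {
        System.out.println(
            "  Zeebe partition " + id + " => Kafka partition "
                + kafkaPartitionFor(id, kafkaPartitionsCount));
      }
    }
  }
}
```

Because the result depends only on the Zeebe partition ID, all records of one Zeebe partition land on the same Kafka partition, which is what preserves their relative order.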

Advanced configuration

You can configure the producer for more advanced use cases by using the zeebe.broker.exporters.kafka.args.producer.config configuration property, which lets you arbitrarily configure your Kafka producer the same way you normally would. This property is parsed as a standard Java properties file. For example, say you wanted to connect to a secured Kafka instance, you could define the producer config as:

config: |
  security.protocol=SSL
  ssl.truststore.location=/truststore.jks
  ssl.truststore.password=test1234

You can also pass this configuration via an environment variable. If your exporter ID is kafka, for example, you could set the following environment variable:

export ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_PRODUCER_CONFIG="security.protocol=SSL\nssl.truststore.location=/truststore.jks\nssl.truststore.password=test1234"
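
To get a feel for what "parsed as a standard Java properties file" means, here is a minimal sketch which reads a multi-line config string with java.util.Properties. It is an assumption that the exporter does something equivalent internally; the class and method names (ProducerConfigParsingSketch, parse) are hypothetical and only serve the illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical illustration; assumes the config string is read via java.util.Properties. */
final class ProducerConfigParsingSketch {

  private ProducerConfigParsingSketch() {}

  /** Parses a multi-line "key=value" string the way a .properties file would be parsed. */
  static Properties parse(final String rawConfig) {
    final Properties properties = new Properties();
    try (StringReader reader = new StringReader(rawConfig)) {
      properties.load(reader);
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
    return properties;
  }

  public static void main(final String[] args) {
    // One property per line, as in the YAML block scalar shown above
    final String rawConfig =
        String.join(
            "\n",
            "security.protocol=SSL",
            "ssl.truststore.location=/truststore.jks",
            "ssl.truststore.password=test1234");

    final Properties parsed = parse(rawConfig);
    parsed.forEach((key, value) -> System.out.println(key + " -> " + value));
  }
}
```

Each line of the string becomes one producer property, so anything you could normally put in a Kafka producer properties file can be expressed this way.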

Examples

The zeebe-kafka-exporter-qa module shows how to start a Docker container, inject the exporter, configure it, and consume the exported records.

For a more normal deployment, you can look at the docker-compose.yml file, which will start a Zeebe broker with the exporter configured via exporter.yml, a Zookeeper node, a Kafka node, and a consumer node which simply prints out everything sent to Kafka on any topic starting with zeebe.

Consuming Zeebe records

As mentioned, Zeebe records are serialized using JSON. The key is the JSON representation of the Java class RecordId, and the value is serialized using the Zeebe io.zeebe.protocol.record.Record#toJson() method.

If you want to consume records via the Java client, you can make use of the deserializers provided by the io.zeebe.exporters:zeebe-kafka-exporter-serde module.

An example of a consumer reading from all zeebe-* prefixed topics:

package com.acme;

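import io.camunda.zeebe.protocol.record.Record;
// Serde classes from zeebe-kafka-exporter-serde; the deserializers are assumed to share RecordId's package
import io.zeebe.exporters.kafka.serde.RecordDeserializer;
import io.zeebe.exporters.kafka.serde.RecordId;
import io.zeebe.exporters.kafka.serde.RecordIdDeserializer;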
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MyClass {

  private static final Logger LOGGER = LoggerFactory.getLogger(MyClass.class);
  private static final Pattern SUBSCRIPTION_PATTERN = Pattern.compile("^zeebe-.*$");

  public static void main(final String[] args) {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
    config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 5_000);
    config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    final Consumer<RecordId, Record<?>> consumer =
      new KafkaConsumer<>(config, new RecordIdDeserializer(), new RecordDeserializer());
    consumer.subscribe(SUBSCRIPTION_PATTERN);

    while (true) {
      final ConsumerRecords<RecordId, Record<?>> consumed = consumer.poll(Duration.ofSeconds(1));
      for (final ConsumerRecord<RecordId, Record<?>> record : consumed) {
        LOGGER.info(
          "================[{}] {}-{} ================",
          record.topic(),
          record.key().getPartitionId(),
          record.key().getPosition());
        LOGGER.info("{}", record.value().getValue());
      }
    }
  }
}

Docker

The docker-compose.yml found in the root of the project is a good example of how you can deploy Zeebe, Kafka, and connect them via the exporter.

To run it, first build the correct exporter artifact which docker-compose can find. From the root of the project, run:

mvn install -DskipTests -Dexporter.finalName=zeebe-kafka-exporter

It's important here to note that we set the artifact's final name - this allows us to use a fixed name in the docker-compose.yml in order to mount the file to the Zeebe container.

Then start the services - they can safely be started in parallel.

docker-compose up -d

If you wish to stop these containers, remember that some of them create volumes; unless you plan on reusing those, make sure to bring everything down using docker-compose down -v.

The services started are the following:

  • zeebe: with the gateway port (26500) opened
  • kafka: with the standard 9092 port opened for internal communication, and port 29092 for external
  • consumer: a simple kafkacat image which will print out every record published on any topic starting with zeebe
  • zookeeper: required to start Kafka

Once everything is up and running, use your Zeebe cluster as you normally would. For example, given a workflow at ~/workflow.bpmn, you could deploy it as:

zbctl --insecure deploy ~/workflow.bpmn

After this, you can see the messages being consumed by the consumer running:

docker logs -f consumer

You may see some initial error logs from the consumer - this happens while the Kafka broker isn't fully up, but it should stop once kafkacat can connect to it.

The first time a record of a certain kind (e.g. deployment, job, workflow, etc.) is published, it will create a new topic for it. The consumer is refreshing the list of topics every second, which means that for that first message there may be a bit of delay.

Reference

The exporter uses a Kafka producer to push records out to different topics based on the incoming record value type (e.g. deployment, etc.).

The producer makes use of transactions to simplify

The producer is configured to be an idempotent producer which will retry a record "forever"; there is a delivery timeout configured, but the timeout is set to ~25 days, which for most use cases should be enough to fix any recoverable errors. In the case of unrecoverable errors, unfortunately a restart is pretty much the only solution at the moment, although community contributions are very welcome to fix this.

The main reason records are retried forever is that Zeebe processes records sequentially, and to ensure we've exported a record, we can't update Zeebe and say record 2 has been exported if we can't guarantee that previous records have also been exported (or in Kafka terms, acknowledged).

To take advantage of the asynchronous API and minimize blocking operations, the exporter keeps a queue of in-flight record futures (configurable) and will export records until that queue is full; once full, it will block until the first element (i.e. the oldest sent record) has been acknowledged by Kafka, at which point it will then send the next record and resume operation.

At the same time, a background job is scheduled every second to flush the queue of any completed records. So in a best case scenario the queue always has some space and the exporter never blocks.
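
The queueing behaviour described above can be sketched roughly as follows. This is a simplified, single-threaded illustration and not the exporter's actual code: the class InFlightQueueSketch, its methods, and the use of CompletableFuture<Long> (completing with the record's position) are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical, single-threaded sketch of a bounded queue of in-flight record futures. */
final class InFlightQueueSketch {
  private final Deque<CompletableFuture<Long>> inFlight = new ArrayDeque<>();
  private final int maxInFlight;
  private long lastAcknowledgedPosition = -1;

  InFlightQueueSketch(final int maxInFlight) {
    this.maxInFlight = maxInFlight;
  }

  /** Tracks a sent record; if the queue is full, first blocks on the oldest sent record. */
  void track(final CompletableFuture<Long> sent, final long timeoutMs) {
    if (inFlight.size() >= maxInFlight) {
      try {
        lastAcknowledgedPosition = inFlight.peekFirst().get(timeoutMs, TimeUnit.MILLISECONDS);
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for acknowledgement", e);
      } catch (final ExecutionException | TimeoutException e) {
        throw new IllegalStateException("Oldest in-flight record was not acknowledged", e);
      }
      inFlight.pollFirst();
    }
    inFlight.addLast(sent);
  }

  /**
   * Periodic flush: removes acknowledged records from the head of the queue only, so a position
   * is never reported as exported before all earlier records are. A failed future makes join()
   * throw, so nothing after it is acknowledged.
   */
  void flushCompleted() {
    while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
      lastAcknowledgedPosition = inFlight.pollFirst().join();
    }
  }

  long lastAcknowledgedPosition() {
    return lastAcknowledgedPosition;
  }

  int size() {
    return inFlight.size();
  }

  public static void main(final String[] args) {
    final InFlightQueueSketch queue = new InFlightQueueSketch(2);
    final CompletableFuture<Long> first = new CompletableFuture<>();
    final CompletableFuture<Long> second = new CompletableFuture<>();
    queue.track(first, 100);
    queue.track(second, 100);

    second.complete(2L); // acknowledged out of order
    queue.flushCompleted();
    System.out.println("last acknowledged: " + queue.lastAcknowledgedPosition());

    first.complete(1L); // now both can be acknowledged, in order
    queue.flushCompleted();
    System.out.println("last acknowledged: " + queue.lastAcknowledgedPosition());
  }
}
```

In this sketch, the first print shows -1, since record 2 cannot be acknowledged before record 1, and the second shows 2 once both futures are complete. A real implementation would additionally need to be safe against concurrent access from the background flush job.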

Configuration

A sample configuration file is included in the project under exporter.yml.

zeebe:
  broker:
    exporters:
      kafka:
        className: io.zeebe.exporters.kafka.KafkaExporter
        # Update this path to the location of the JAR
        # Note that this must be visible to the broker process
        jarPath: /path/to/zeebe-kafka-exporter-3.0.0-jar-with-dependencies.jar
        args:
          # Controls the number of records to buffer in a single record batch before forcing a flush. Note
          # that a flush may occur before anyway due to periodic flushing. This setting should help you
          # estimate a soft upper bound to the memory consumption of the exporter. If you assume a worst
          # case scenario where every record is the size of your zeebe.broker.network.maxMessageSize, then
          # the memory required by the exporter would be at least:
          #   (maxBatchSize * zeebe.broker.network.maxMessageSize * 2)
          #
          # We multiply by 2 as the records are buffered twice - once in the exporter itself, and once
          # in the producer's network buffers (but serialized at that point). There's some additional
          # memory overhead used by the producer as well for compression/encryption/etc., so you have to
          # add a bit, but that one is not proportional to the number of records and is more or less
          # constant.
          #
          # Once the batch has reached this size, a flush is automatically triggered. Too small a number
          # here would cause many flushes, which is not good for performance, but would mean you will see
          # your records faster/sooner.
          #
          # Default is 100
          maxBatchSize: 100
          # The maximum time to block when the batch is full. If the batch is full, and a new
          # record comes in, the exporter will block until there is space in the batch, or until
          # maxBlockingTimeoutMs milliseconds elapse.
          maxBlockingTimeoutMs: 1000
          # How often should pending batches be flushed to the Kafka broker. Too low a value will
          # cause more load on the broker, but means your records will be visible faster.
          flushIntervalMs: 1000

          # Producer specific configuration
          producer:
            # The list of initial Kafka broker contact points. The format should be the same
            # one as the ProducerConfig expects, i.e. "host:port"
            # Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
            # For example:
            # servers: "kafka:9092,localhost:29092"
            servers: ""
            # Controls how long the producer will wait for a request to be acknowledged by
            # the Kafka broker before retrying it
            # Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
            requestTimeoutMs: 5000
            # Grace period when shutting down the producer in milliseconds
            closeTimeoutMs: 5000
            # Producer client identifier
            clientId: zeebe

            # Any setting under the following section will be passed verbatim to
            # ProducerConfig; you can use this to configure authentication, compression,
            # etc. Note that you can overwrite some important settings, so avoid changing
            # idempotency, delivery timeout, and retries, unless you know what you're doing
            config: |
              linger.ms=5
              buffer.memory=8388608
              batch.size=32768
              max.block.ms=5000

          # Controls which records are pushed to Kafka and to which topic
          # Each entry is a sub-map which can contain two entries:
          #     type => string
          #     topic => string
          #
          # Topic is the topic to which the record with the given value type
          # should be sent to, e.g. for a deployment record below we would
          # send the record to "zeebe-deployment" topic.
          #
          # Type is a comma separated string of accepted record types, allowing you to filter if you
          # want nothing (""), commands ("command"), events ("event"), or rejections ("rejection"),
          # or a combination of the three, e.g. "command,event".
          #
          # To omit certain records entirely, set type to an empty string. For example,
          # records:
          #   deployment: { type: "" }
          records:
            # If a record value type is omitted in your configuration file,
            # it will fall back to whatever is configured in the defaults
            defaults: { type: "event", topic: zeebe }
            # For records with a value of type DEPLOYMENT
            deployment: { topic: zeebe-deployment }
            # For records with a value of type DEPLOYMENT_DISTRIBUTION
            deploymentDistribution: { topic: zeebe-deployment-distribution }
            # For records with a value of type ERROR
            error: { topic: zeebe-error }
            # For records with a value of type INCIDENT
            incident: { topic: zeebe-incident }
            # For records with a value of type JOB_BATCH
            jobBatch: { topic: zeebe-job-batch }
            # For records with a value of type JOB
            job: { topic: zeebe-job }
            # For records with a value of type MESSAGE
            message: { topic: zeebe-message }
            # For records with a value of type MESSAGE_SUBSCRIPTION
            messageSubscription: { topic: zeebe-message-subscription }
            # For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
            messageStartEventSubscription: { topic: zeebe-message-subscription-start-event }
            # For records with a value of type PROCESS
            process: { topic: zeebe-process }
            # For records with a value of type PROCESS_EVENT
            processEvent: { topic: zeebe-process-event }
            # For records with a value of type PROCESS_INSTANCE
            processInstance: { topic: zeebe-process-instance }
            # For records with a value of type PROCESS_INSTANCE_RESULT
            processInstanceResult: { topic: zeebe-process-instance-result }
            # For records with a value of type PROCESS_MESSAGE_SUBSCRIPTION
            processMessageSubscription: { topic: zeebe-process-message-subscription }
            # For records with a value of type TIMER
            timer: { topic: zeebe-timer }
            # For records with a value of type VARIABLE
            variable: { topic: zeebe-variable }
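
As a worked example of the memory estimate given in the maxBatchSize comment above, the following sketch computes (maxBatchSize * zeebe.broker.network.maxMessageSize * 2). The 4 MiB message size used here is only an assumed example value, so check your own broker configuration; the class and method names are hypothetical.

```java
/** Hypothetical helper illustrating the soft upper bound from the maxBatchSize comment. */
final class ExporterMemoryEstimateSketch {

  private ExporterMemoryEstimateSketch() {}

  /**
   * Returns maxBatchSize * maxMessageSizeBytes * 2, i.e. the worst case where every record is as
   * large as the maximum message size and is buffered twice (exporter + producer buffers).
   */
  static long softUpperBoundBytes(final int maxBatchSize, final long maxMessageSizeBytes) {
    return Math.multiplyExact(Math.multiplyExact((long) maxBatchSize, maxMessageSizeBytes), 2L);
  }

  public static void main(final String[] args) {
    final int maxBatchSize = 100; // the default from the sample configuration
    final long maxMessageSizeBytes = 4L * 1024 * 1024; // assumed example value: 4 MiB

    final long bytes = softUpperBoundBytes(maxBatchSize, maxMessageSizeBytes);
    System.out.println(bytes + " bytes (" + bytes / (1024 * 1024) + " MiB)");
  }
}
```

With these inputs the estimate comes out at 838860800 bytes, i.e. 800 MiB, before the roughly constant producer overhead mentioned in the comment is added.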

Contributing

Contributions are more than welcome! Please make sure to read and adhere to the Code of Conduct. Additionally, in order to have your contributions accepted, you will need to sign the Contributor License Agreement.

Build from source

Prerequisites

In order to build from source, you will need to install Maven 3.6+. You can find more about it on the Maven homepage.

You will also need a JDK targeting Java 8+. We recommend installing any flavour of OpenJDK such as AdoptOpenJDK.

Finally, you will need to install Docker on your local machine, as integration tests rely heavily on Testcontainers.

Building

With all requirements ready, you can now simply clone the repository, and from its root, run the following command:

mvn clean install

This will build the project and run all tests locally.

Should you wish to build without running the tests, you can run:

mvn clean package -DskipTests

Backwards compatibility

Zeebe Kafka Exporter uses a Semantic Versioning scheme for its versions, and revapi to enforce backwards compatibility according to its specification.

Additionally, we also use apiguardian to specify backwards compatibility guarantees on a more granular level. As such, any APIs marked as EXPERIMENTAL will not be checked.

If you wish to incubate a new feature, or if you're unsure about a new API type/method, please use the EXPERIMENTAL status for it. This will give us flexibility to test out new features and change them easily if we realize they need to be adapted.

Report issues or contact developers

Work on Zeebe Kafka Exporter is done entirely through the GitHub repository. If you want to report a bug or request a new feature, feel free to open a new issue on GitHub.

Create a Pull Request

To work on an issue, follow the following steps:

  1. Check that a GitHub issue exists for the task you want to work on. If one does not, create one.
  2. Check out the main branch and pull the latest changes.
    git checkout main
    git pull
    
  3. Create a new branch with the naming scheme issueId-description.
    git checkout -b 123-my-new-feature
    
  4. Follow the Google Java Format and Zeebe Code Style while coding.
  5. Implement the required changes on your branch, and make sure to build and test your changes locally before opening a pull request for review.
  6. If you want to make use of the CI facilities before your feature is ready for review, feel free to open a draft PR.
  7. Once you think you have finished the issue, please prepare the branch for review. In general, the commits should be squashed into meaningful commits with a helpful message. This means cleanup/fix etc. commits should be squashed into the related commit.
  8. Finally, be sure to check on the CI results and fix any reported errors.

Commit Message Guidelines

Commit messages use the Conventional Commits format, with a slight twist. See the Zeebe commit guidelines for more details.

Contributor License Agreement

You will be asked to sign our Contributor License Agreement when you open a Pull Request. We are not asking you to assign copyright to us, but to give us the right to distribute your code without restriction. We ask this of all contributors in order to assure our users of the origin and continuing existence of the code. You only need to sign the CLA once.

Note that this is a general requirement of any Camunda Community Hub project.