
Zeebe Worker with Kotlin coroutines

Primary language: Kotlin. License: Apache-2.0.

Community extension badge: Incubating. Compatible with: Camunda Platform 8.

Kotlin-coworker

This project aims to provide a neat Kotlin Coroutines API for the Zeebe Gateway. Right now, it offers only a worker coroutine API.

Motivation

I created it to get as much performance as possible out of the Kotlin Coroutines stack. If you replace blocking calls with coroutine suspension, a worker can process many jobs in parallel instead of one at a time (the default in Zeebe Client Java).

You can run the performance comparison test yourself, but on my machine the numbers are as follows:

For Zeebe Client it took 41.186149961s to process 40
...
For Coworker it took 1.522231647s to process 40
So, Zeebe Client Java duration / Coworker duration = 27.056427346106805

So the same worker with a delay, written on the coroutine stack, completes the process instances in about 27 times less time.
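
To make the mechanism behind these numbers concrete, here is a minimal sketch that uses only kotlinx.coroutines and no Zeebe at all. The handler functions and the 50 ms wait are hypothetical stand-ins for a job handler that waits on I/O. This is not the project's benchmark, only an illustration of why suspended waits can overlap on one thread while blocking waits cannot.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical handlers: both wait 50 ms to imitate an I/O call.
fun blockingHandler() = Thread.sleep(50)     // holds the thread while waiting
suspend fun suspendingHandler() = delay(50)  // releases the thread while waiting

// Handles the jobs one after another on the calling thread.
fun sequentialBlockingMillis(jobs: Int): Long = measureTimeMillis {
    repeat(jobs) { blockingHandler() }
}

// Starts one coroutine per job on a single thread; the waits overlap.
fun concurrentSuspendingMillis(jobs: Int): Long = measureTimeMillis {
    runBlocking {
        repeat(jobs) { launch { suspendingHandler() } }
    }
}

fun main() {
    println("blocking:   ${sequentialBlockingMillis(20)} ms")
    println("suspending: ${concurrentSuspendingMillis(20)} ms")
}
```

The blocking variant grows roughly linearly with the number of jobs, while the suspending variant stays close to the duration of a single wait.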

Usage

  • Add the dependency
<dependency>
    <groupId>org.camunda.community.extension.kotlin.coworker</groupId>
    <artifactId>coworker-core</artifactId>
    <version>x.y.z</version>
</dependency>
  • Obtain a ZeebeClient instance, for example, ZeebeClient.newClient()
  • Use the extension function to obtain a Cozeebe instance: zeebeClient.toCozeebe()
  • Create a new Coworker instance:
val coworker = cozeebe.newCoWorker(jobType, object : JobHandler {
    override suspend fun handle(client: JobClient, job: ActivatedJob) {
        val variables = job.variablesAsMap
        val aVar = variables["a"] as Int
        val bVar = variables["b"] as Int
        variables["c"] = aVar + bVar

        client.newCompleteCommand(job).variables(variables).send().await()
    }
})
  • Open it, like Zeebe's Java Worker: coworker.open()

Features

  • Coroutine native implementation (you can use suspend functions inside JobHandler methods)
  • Easily combined with existing Zeebe Client Java libraries.
  • Because it uses coroutines, Coworker can activate more jobs that contain otherwise blocking logic (database queries, HTTP REST calls, etc.) than a classic Zeebe Java worker, provided that logic uses coroutines (that is, a non-blocking API). You can see the results for yourself in the benchmark module.
  • Spring Boot Starter

Spring Boot Starter

It requires:

  • Spring Boot 2.7.+ (it should work with Spring Boot 3.0.x, but this hasn't been tested properly).
  • JDK 11

First, add the dependency:

<dependency>
    <groupId>org.camunda.community.extension.kotlin.coworker</groupId>
    <artifactId>coworker-spring-boot-starter</artifactId>
    <version>x.y.z</version>
</dependency>

Then define a Zeebe worker with coroutines like this:

@Coworker(type = "test")
suspend fun testWorker(jobClient: JobClient, job: ActivatedJob) {
  someService.callSomeSuspendMethod(job.variables)
  jobClient.newCompleteCommand(job.key).send().await()
}

Note:

  1. The method should be a suspend function.
  2. The method should be annotated with @Coworker.
  3. The method should not call thread-blocking functions. Use Kotlin's .await() instead of .join(), as in the example above.
  4. The starter does not yet have all the features of Spring Zeebe, but some of them will likely be ported eventually. Create an issue or PR for the feature that you need :)
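
As a minimal sketch of what .await() does, the example below uses plain CompletableFuture values as hypothetical stand-ins for the result of a command's send(). It assumes that the .await() used in the examples is kotlinx.coroutines.future.await, which is defined for any CompletionStage. With that function, the coroutine suspends until the future completes, and a failure is rethrown as the original exception, so ordinary try/catch works.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the future returned by a command's send().
fun okCommand(): CompletableFuture<String> =
    CompletableFuture.supplyAsync { "completed" }

fun failingCommand(): CompletableFuture<String> =
    CompletableFuture.failedFuture(IllegalStateException("gateway unavailable"))

// Suspends (instead of blocking the thread) until the future is done.
suspend fun sendAndDescribe(send: () -> CompletableFuture<String>): String =
    try {
        send().await()
    } catch (e: IllegalStateException) {
        // await() rethrows the original cause of the failed future
        "failed: ${e.message}"
    }

fun main() = runBlocking {
    println(sendAndDescribe(::okCommand))
    println(sendAndDescribe(::failingCommand))
}
```

By contrast, CompletableFuture.join() keeps the calling thread occupied until the result arrives, which is exactly what a coroutine-based worker tries to avoid.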

Override coroutine context for each coworker execution

Sometimes you need to provide some data in the coroutine context (an MDC map, for example) based on the job. To do so, override additionalCoroutineContextProvider from JobCoworkerBuilder, like this:

client.toCozeebe().newCoWorker(jobType) { client, job ->
            // your worker logic
            client.newCompleteCommand(job).send().await()
        }
            // override additionalCoroutineContextProvider
            .also { it.additionalCoroutineContextProvider = JobCoroutineContextProvider { testCoroutineContext } }
            // open worker
            .open().use {
                // logic to keep the worker running
            }

If you are using the Spring Boot Starter, you just need to create a bean of type JobCoroutineContextProvider in your Spring context, like this:

    @Bean
    fun loggingJobCoroutineContextProvider(): JobCoroutineContextProvider {
        return JobCoroutineContextProvider {
          MDCContext()
        }
    }
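
To show what "additional coroutine context" means in practice, here is a minimal sketch that does not use the library. FakeJob, ContextProvider and runHandler are hypothetical names introduced only for this illustration, and the sketch is not how Coworker is implemented internally. It builds a context element from the job and makes it visible to the handler via withContext. MDCContext (from kotlinx-coroutines-slf4j) is a context element of the same kind as the CoroutineName used here.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for an activated job.
data class FakeJob(val key: Long, val type: String)

// Hypothetical stand-in for a per-job context provider.
fun interface ContextProvider {
    fun provide(job: FakeJob): CoroutineContext
}

// Runs the handler with the provider's context merged into the current one.
suspend fun <T> runHandler(
    job: FakeJob,
    provider: ContextProvider,
    handler: suspend (FakeJob) -> T,
): T = withContext(provider.provide(job)) { handler(job) }

// Returns the coroutine name that the handler observes while it runs.
fun nameSeenByHandler(job: FakeJob): String? = runBlocking {
    val provider = ContextProvider { CoroutineName("job-${it.type}-${it.key}") }
    runHandler(job, provider) { currentCoroutineContext()[CoroutineName]?.name }
}

fun main() {
    println(nameSeenByHandler(FakeJob(key = 42L, type = "test")))
}
```

The sketch uses currentCoroutineContext() rather than the coroutineContext property because, inside a runBlocking block, the property resolves to the outer scope's context instead of the one installed by withContext.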

Custom error handling

Sometimes, you want to override the default error handling mechanism. To do so, you need to customize your worker like this:

        client.toCozeebe().newCoWorker(jobType) { jobClient: JobClient, job: ActivatedJob ->
            // worker's logic
        }
            .also {
                // override job error handler
                it.jobErrorHandler = JobErrorHandler { e, activatedJob, jobClient ->
                    if (e is IgnorableException) {
                        jobClient.newCompleteCommand(activatedJob).variables(mapOf("ignored" to true)).send().await()
                    } else {
                        jobClient.newFailCommand(activatedJob).retries(activatedJob.retries - 1).send().await()
                    }
                }
            }
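
One way to keep such a handler easy to test is to separate the decision from the Zeebe commands. The sketch below is only an illustration under that assumption: Decision, Complete, Fail and decide are hypothetical names, IgnorableException is redefined here so that the block compiles on its own, and flooring the retries at zero is an extra safeguard that the snippet above does not have.

```kotlin
// Hypothetical marker exception, mirroring IgnorableException in the snippet above.
class IgnorableException(message: String) : RuntimeException(message)

// Hypothetical description of what the error handler should do with the job.
sealed interface Decision
data class Complete(val variables: Map<String, Any>) : Decision
data class Fail(val retries: Int, val errorMessage: String) : Decision

// Pure function: no Zeebe client is needed to unit test the branching.
fun decide(e: Throwable, currentRetries: Int): Decision =
    if (e is IgnorableException) {
        Complete(mapOf("ignored" to true))
    } else {
        // Never go below zero retries.
        Fail(maxOf(currentRetries - 1, 0), e.message ?: e.javaClass.simpleName)
    }

fun main() {
    println(decide(IgnorableException("skip me"), currentRetries = 3))
    println(decide(IllegalStateException("boom"), currentRetries = 1))
}
```

Inside a JobErrorHandler, the result of decide can then be mapped onto newCompleteCommand or newFailCommand as in the snippet above.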

Error handling in Spring Boot

If you are using the Spring Boot Starter, you need to define a JobErrorHandler bean in your context:

        @Bean
        open fun customErrorHandler(): JobErrorHandler {
            val defaultErrorHandler = DefaultSpringZeebeErrorHandler()
            return JobErrorHandler { e, activatedJob, jobClient ->
                logger.error(e) { "Got error: ${e.message}, on job: $activatedJob" }
                defaultErrorHandler.handleError(e, activatedJob, jobClient)
            }
        }

Warning: It is highly recommended to wrap your error handling logic around DefaultSpringZeebeErrorHandler, as shown above. More info in: #54

Override annotation parameters via configuration properties

This works basically the same way as in the Spring Zeebe project. For example, you can override values of a @Coworker annotation whose type is foo like this:

zeebe.client.worker.override.foo.enabled=false

Note: you can't use SpEL or property placeholders in this value. The value must have the same type that the corresponding @Coworker annotation parameter resolves to. The exception is Duration: provide it as a Long value in milliseconds.
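
For a Duration-typed parameter, the millisecond value can be computed with java.time. This is a minimal sketch; overrideValue is a hypothetical helper, and the name of the parameter it would be used for is deliberately left out because it is not shown here.

```kotlin
import java.time.Duration

// Converts a Duration into the Long-in-milliseconds text expected by an override property.
fun overrideValue(duration: Duration): String = duration.toMillis().toString()

fun main() {
    println(overrideValue(Duration.ofSeconds(30)))
    println(overrideValue(Duration.parse("PT5M")))
}
```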

Annotation parameters

If you want to redefine org.camunda.community.extension.coworker.spring.annotation.Coworker parameters, you should use SpEL to define the annotation values. See each property's JavaDoc for the type that the expression should resolve to. You may also use property placeholders (${}) in the annotation parameters to replace them with configuration properties if needed.

As an example, you may refer to the test.
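
One Kotlin-specific detail is worth a sketch: inside a Kotlin string literal, ${...} is a string template, so the dollar sign has to be escaped for the placeholder to reach Spring unchanged. The block below uses DemoCoworker, a hypothetical stand-in annotation, so that it compiles without the starter, and app.worker.type is a made-up property name.

```kotlin
// Hypothetical stand-in for the starter's @Coworker annotation.
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class DemoCoworker(val type: String)

class Workers {
    // "\$" keeps the dollar sign literal, so the annotation carries ${app.worker.type}
    // instead of Kotlin trying to interpolate it at compile time.
    @DemoCoworker(type = "\${app.worker.type}")
    fun placeholderWorker() {
    }
}

// Reads the annotation value back via reflection to show what is actually stored.
fun declaredType(): String =
    Workers::class.java
        .getDeclaredMethod("placeholderWorker")
        .getAnnotation(DemoCoworker::class.java)
        .type

fun main() {
    println(declaredType())
}
```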

Metrics

If you want to observe your coworkers, there is a port of some metrics from the Spring Zeebe project in the Coworker's Spring Boot Starter:

  1. camunda.job.invocations
    1. It supports the following tags:
      1. action - what happens to a job
        1. activated - The job was activated and started to process an item
        2. failed - The processing failed with some exception
      2. type - job's type
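
Here is a minimal sketch of reading such a metric. It assumes that camunda.job.invocations is exposed as a Micrometer counter, as in Spring Zeebe, which is not stated above. The registry contents are simulated by hand, and invocationCount and simulatedRegistry are hypothetical helpers.

```kotlin
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.simple.SimpleMeterRegistry

const val JOB_INVOCATIONS = "camunda.job.invocations"

// Looks up the counter for one action/type pair; returns 0.0 if it was never recorded.
fun invocationCount(registry: MeterRegistry, action: String, type: String): Double =
    registry.find(JOB_INVOCATIONS)
        .tags("action", action, "type", type)
        .counter()
        ?.count() ?: 0.0

// Simulated increments standing in for what the starter would record (assumption).
fun simulatedRegistry(): MeterRegistry {
    val registry = SimpleMeterRegistry()
    registry.counter(JOB_INVOCATIONS, "action", "activated", "type", "test").increment()
    registry.counter(JOB_INVOCATIONS, "action", "activated", "type", "test").increment()
    registry.counter(JOB_INVOCATIONS, "action", "failed", "type", "test").increment()
    return registry
}

fun main() {
    val registry = simulatedRegistry()
    println("activated: ${invocationCount(registry, "activated", "test")}")
    println("failed:    ${invocationCount(registry, "failed", "test")}")
}
```

In a Spring Boot application, the MeterRegistry would typically be injected rather than created by hand.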

Missing Features

  • Coroutines native JobClient