/zeebe-client-csharp-accelerator

C# Zeebe Job Workers made easy - boostrapped via a .NET HostedService and added to DI

Primary LanguageC#Apache License 2.0Apache-2.0

BUILD ANALYZE Compatible with: Camunda Platform 8

Bootstrap Accelerator for the C# Zeebe client

This project is an extension of the C# Zeebe client project. Zeebe Workers are automatically recognized and bootstrapped via a .Net HostedService.

Read the Zeebe documentation for more information about the Zeebe project.

The basic idea and implementation for this came from https://github.com/camunda-community-hub/zeebe-client-csharp-bootstrap. We loved the idea, but had in some parts our own preferences for defaults, behaviour and separation of concerns. So this is our version of a good Bootstrap Extension for the C# Zeebe Client. Credits for the base work still belong to https://github.com/arjangeertsema.

Requirements

How to use

The Zeebe C# client bootstrap extension is available via nuget (https://www.nuget.org/packages/zb-client-accelerator/).

Recommendation: a complete sample project using this extension can be found in examples.

Quick start

All classes which implement IZeebeWorker, IAsyncZeebeWorker, IZeebeWorkerWithResult or IAsyncZeebeWorkerWithResult are automatically added to the service collection and autowired to Zeebe when you register this bootstrap project with the IServiceCollection.BootstrapZeebe() extension method.

More power is provided by using global::Zeebe.Client.Accelerator.Extensions; which provides you with further extensions for IHost, IZeebeClient etc. in order to deploy processes or create one time message receivers.

Bootstrap Zeebe

The BootstrapZeebe method has two parameters:

  1. ZeebeBootstrapOptions via configuration, action delegate or both.
  2. An array with assemblies which will be scanned for job handlers.
ConfigureServices((hostContext, services) => {
    services.BootstrapZeebe(
        hostContext.Configuration.GetSection("ZeebeConfiguration"),
        this.GetType().Assembly
    );
})

Example Web Application:

// Start building my WebApplication
var builder = WebApplication.CreateBuilder(args);

// Bootstrap Zeebe Integration
builder.Services.BootstrapZeebe(
    builder.Configuration.GetSection("ZeebeConfiguration"),
    typeof(Program).Assembly);

The configuration will e.g. look as follows:

{
  "ZeebeConfiguration": {
    "Client": {
      "GatewayAddress": "127.0.0.1:26500"
    },
    "Worker": {
      "MaxJobsActive": 5,
      "TimeoutInMilliseconds": 500,
      "PollIntervalInMilliseconds": 50,
      "PollingTimeoutInMilliseconds": 1000,
      "RetryTimeoutInMilliseconds": 1000
    }
  },
}

Deploy Processes

If we want to deploy some processes right before the final startup of our application we create a deployment using the extension for IHost or IServiceProvider as follows:

var app = builder.Build();
...
// Deploy all process resources
app.CreateZeebeDeployment()
    .UsingDirectory("Resources")
    .AddResource("insurance_application.bpmn")
    .AddResource("document_request.bpmn")
    .AddResource("risk_check.dmn")
    .Deploy();

// Now run the application
app.Run();

Zeebe Workers

A Zeebe Worker is an implementation of IZeebeWorker, IAsyncZeebeWorker, IZeebeWorkerWithResult or IAsyncZeebeWorkerWithResult. Zeebe Workers are automatically added to the DI container, therefore you can use dependency injection inside. The default worker configuration can be overwritten with AbstractWorkerAttribute implementations, see attributes for more information.

[JobType("doSomeWork")]
public class SomeWorker : IAsyncZeebeWorker
{
    private readonly MyApiService _myApiService;

    public SimpleJobHandler(MyApiService myApiService)
    {
        _myApiService = myApiService;
    }

    /// <summary>
    /// Handles the job "doSomeWork".
    /// </summary>
    /// <param name="job">the Zeebe job</param>
    /// <param name="cancellationToken">cancellation token</param>
    public async Task HandleJob(ZeebeJob job, CancellationToken cancellationToken)
    {  
        // execute business service etc.
        await _myApiService.DoSomethingAsync(cancellationToken);
    }
}

Of course you are able to access process variables and return a result. E.g.:

[JobType("doAwesomeWork")]
public class AwesomeWorker : IAsyncZeebeWorker<SimpleJobPayload, SimpleResponse>
{
    ...

    public async Task<SimpleResponse> HandleJob(ZeebeJob<SimpleJobPayload> job, CancellationToken cancellationToken)
    {  
        // get variables as declared (SimpleJobPayload)
        var variables = job.getVariables();

        // execute business service etc.
        var result = await _myApiService.DoSomethingAsync(variables.CustomerNo, cancellationToken);
        return new SimpleResponse(result);
    }

    class SimpleJobPayload
    {
        public string CustomerNo { get; set; }
    }
}

The above code will fetch exactly the variables defined as attributes in SimpleJobPaylad from the process.

And there are more options, including the option to access custom headers configured in the process model:

[JobType("doComplexWork")]
public class ComplexWorker : IAsyncZeebeWorker
{
    ...

    public async Task HandleJob(ZeebeJob job, CancellationToken cancellationToken)
    {  
        // get all variables (and deserialize to a given type)
        ProcessVariables variables = job.getVariables<ProcessVariables>();
        // get custom headers (and deserialize to a given type)
        MyCustomHeaders headers = job.getCustomHeaders<MyCustomHeaders>();

        // execute business service etc.
        await _myApiService.DoSomethingComplex(variables.Customer, headers.SomeConfiguration, cancellationToken);
        ...
    }

    class ProcessVariables
    {
        public string? BusinessKey { get; set; }

        public CustomerData Customer { get; set; }

        public string? AccountName { get; set; }

        ...
    }

    class MyCustomHeaders
    {
        public string SomeConfiguration { get; set; }
    }
}

The following table gives you an overview of the available options:

Interface Description Fetched Variables
IAsyncZeebeWorker Asynchronous worker without specific input and no response Default is to fetch all process variables. Use FetchVariables attribute for restictions.
IAsyncZeebeWorker<TInput> Asynchronous worker with specific input and no response Fetches exactly the variables defined as attributes in TInput.
IAsyncZeebeWorker<TInput, TResponse> Asynchronous worker with specific input and specific response Fetches exactly the variables defined as attributes in TInput.
IAsyncZeebeWorkerWithResult<TResponse> Asynchronous worker without specific input but a specific response Default is to fetch all process variables. Use FetchVariables attribute for restrictions.
IZeebeWorker Synchronous worker without specific input and no response Default is to fetch all process variables. Use FetchVariables attribute for restictions.
IZeebeWorker<TInput> Synchronous worker with specific input and no response Fetches exactly the variables defined as attributes in TInput.
IZeebeWorker<TInput, TResponse> Synchronous worker with specific input and specific response Fetches exactly the variables defined as attributes in TInput.
IZeebeWorkerWithResult<TResponse> Synchronous worker without specific input but a specific response Default is to fetch all process variables. Use FetchVariables attribute for restrictions.

If you like to explicitely restrict the variables fetched from Zeebe, you have the following additional option:

[JobType("doComplexWork")]
[FetchVariables("businessKey", "applicantName")]
public class SimpleWorker : IAsyncZeebeWorker
{
   ...
}

In case you do not want to fetch any variables at all from Zeebe, use [FetchVariables(none: true)]:

[JobType("doSimpleWork")]
[FetchVariables(none: true)]
class SimpleWorker : IZeebeWorker
{
   ...
}

A handled job has three outcomes:

  1. The job has been handled without exceptions: this will automaticly result in a JobCompletedCommand beeing send to the broker. The optional TResponse is automaticly serialized and added to the JobCompletedCommand.
  2. A BpmnErrorException has been thrown while handling the job: this will automaticly result in a ThrowErrorCommand beeing send to the broker triggering Error Boundary Events in the process.
  3. Any other unexpected exception will automatically result in a FailCommand beeing send to the broker including message details and reducing the number of retries;

Dynamic message receiver

See Example for synchronous responses from processes for a description of the scenario.

You can create a one time job handler for receiving a message for a dynamic job type "received_" + number as follows:

try
{
    string jsonContent = _zeebeClient.ReceiveMessage("received_" + number, TimeSpan.FromSeconds(5), "someVariable1", "someVariable2");
    ...
} catch (MessageTimeoutException)
{
    // nothing received
    ...
}

Of course it is possible to use a typed response, which will automatically fetch and deserialize all variables defined as attributes in the given type:

MyVariables typedContent = _zeebeClient.ReceiveMessage<MyVariables>("received_" + number, TimeSpan.FromSeconds(3));

Simply waiting without receiving any variables:

bool messageReceived = _zeebeClient.ReceiveMessage("received_" + number, TimeSpan.FromSeconds(3));

The one time job handler will be destroyed after ReceiveMessage returns.

Hints

  1. By default the workers are added to de DI container with a Transient service lifetime. This can be overriden by adding the ServiceLifetimeAttribute to the worker, see attributes for more information.
  2. By default the ZeebeVariablesSerializer is registered as the implementation for IZeebeVariablesSerializer which uses System.Text.Json.JsonSerializer. Serialization / Deserialization always uses CamelCase as naming policy!
  3. The default job type of a worker is the class name of the worker. This can be overriden by adding the JobTypeAttribute to the worker, e.g. [JobType("myJobName")].

How to build

Run dotnet build Zeebe.Client.Accelerator.sln

How to test

Run dotnet test Zeebe.Client.Accelerator.sln