/KafkaExchanger

Kafka broker message processing service generator to simplify communication in a microservices environment. Can be either statefull (on any storage) or stateless.

Primary LanguageC#MIT LicenseMIT

KafkaExchanger

Nuget Downloads Stars License

Kafka broker message processing service generator to simplify communication in a microservices environment. Can be either statefull (on any storage) or stateless.

Usage with Protobuff key/value:

syntax = "proto3";
option csharp_namespace = "protobuff";
package protobuffKeys;

message SimpleKey
{
    int32 Id = 1;
}
syntax = "proto3";
option csharp_namespace = "protobuff";
package protobuffValues;

message SimpleValue
{
    int32 Id = 1;
    Priority Priority = 2;
    string Message = 3;
}

enum Priority
{
    Priority_UNSPECIFIED = 0;
    Priority_WHITE = 1;
    Priority_YELLOW = 2;
    Priority_RED = 3;
}

Declare partial classes with attributes:

[RequestAwaiter(useLogger: false),
  Input(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//input0
  Output(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue))//output0
]
public partial class TestProtobuffAwaiter
{

}

Pass configs to Start methods. It's all what you need to do.

using var producerPool = new ProducerPoolProtoProto(3, "localhost:9194, localhost:9294, localhost:9394");
using var awaitService = new TestProtobuffAwaiter();
awaitService.Start(configKafka, producerPool);

using var response = await awaitService.Produce(
  new protobuff.SimpleKey() { Id = 459  },
  new protobuff.SimpleValue() { Id = 459, Priority = protobuff.Priority.Unspecified, Message = "Hello world!" }
  );
//response.Input0Message is TestProtobuffAwaiter.Input0Message
//where response.Input0Message.Key is protobuff.SimpleKey
//and response.Input0Message.Value is protobuff.SimpleValue
            

You can have many input or output topics(two in the example):

[RequestAwaiter(useLogger: false),
  Input(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//input0
  Input(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//input1

  Output(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//output0
  Output(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue))//output1
]
public partial class TestProtobuffAwaiter
{

}
using var producerPool = new ProducerPoolProtoProto(3, "localhost:9194, localhost:9294, localhost:9394");
using var awaitService = new TestProtobuffAwaiter();
awaitService.Start(configKafka, producerPool, producerPool);

using var response = await awaitService.Produce(
  //to output0
  new protobuff.SimpleKey() { Id = 459  },
  new protobuff.SimpleValue() { Id = 459, Priority = protobuff.Priority.Unspecified, Message = "Hello world!" },

  //to output1
  new protobuff.SimpleKey() { Id = 123  },
  new protobuff.SimpleValue() { Id = 123, Priority = protobuff.Priority.Unspecified, Message = "Hello world! 2" }
  );

//response.Input0Message is TestProtobuffAwaiter.Input0Message
//response.Input1Message is TestProtobuffAwaiter.Input1Message