The GitHub version of this project is no longer maintained for some platforms; the latest maintained version is available on the Unreal Marketplace.
EasyKafka is a Kafka/Redpanda client subsystem for Unreal Engine. It supports producing and consuming records through Blueprints and C++.
- Windows x86_64
- Hololens 2 (Windows ARM64)
- Linux x86_64
- Linux ARM64
Link the plugin modules to your project through <YourModule>.Build.cs:
CppStandard = CppStandardVersion.Cpp17;//avoid using boost
if(Target.Platform == UnrealTargetPlatform.HoloLens || Target.Platform == UnrealTargetPlatform.Win64)
bUseRTTI = true;
PrivateDependencyModuleNames.AddRange( new string[]
{
"EasyKafka",
"KafkaLib",
"KafkaConsumer",
"KafkaProducer",
"KafkaAdmin"
});
PAY ATTENTION TO THE BLOCKING METHODS.
Create Consumer with default configuration:
#include "EasyKafkaSubsystem.h"
TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka();
EasyKafka->GetConsumer()->CreateConsumer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, (int)EKafkaLogLevel::ERR);
Create Consumer with configuration:
#include "EasyKafkaSubsystem.h"
TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka();
TMap<EKafkaConsumerConfig, FString> KafkaConfiguration =
{
{EKafkaConsumerConfig::CLIENT_ID,"34235"},
{EKafkaConsumerConfig::SOCKET_TIMEOUT_MS,"10000"}
};
EasyKafka->GetConsumer()->CreateConsumer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, KafkaConfiguration, (int)EKafkaLogLevel::ERR);
Consume messages:
EasyKafka->GetConsumer()->OnNewMessage().AddLambda([](const TArray<FConsumerRecord>& Messages)
{
for (FConsumerRecord Message : Messages)
{
UE_LOG(LogTemp, Display, TEXT("New Message %s \n"), *Message.Value);//process messages
}
});
EasyKafka->GetConsumer()->Subscribe(
{
"topic",
"topic1",
"topic2"
});
EasyKafka->GetConsumer()->StartConsuming();
ATTENTION: IF YOU HAVE DISABLED AUTOCOMMIT, MAKE SURE TO COMMIT FROM THE CONSUMER RUNNABLE THREAD BEFORE PROCESSING RECORDS.
PAY ATTENTION TO THE BLOCKING METHODS.
Create Producer with default configuration:
#include "EasyKafkaSubsystem.h"
TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka();
EasyKafka->GetProducer()->CreateProducer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, (int)EKafkaLogLevel::ERR);
Create Producer with configuration:
#include "EasyKafkaSubsystem.h"
TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka();
TMap<EKafkaProducerConfig, FString> KafkaConfiguration =
{
{EKafkaProducerConfig::MESSAGE_TIMEOUT_MS,"5000"},
{EKafkaProducerConfig::REQUEST_TIMEOUT_MS,"5000"}
};
EasyKafka->GetProducer()->CreateProducer(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, KafkaConfiguration, (int)EKafkaLogLevel::ERR);
Callback invoked when a record is produced or fails to be produced:
EasyKafka->GetProducer()->OnProduce().AddLambda([](const FProducerCallback& Callback)
{
if (Callback.bError)
{
UE_LOG(LogTemp, Error, TEXT("Error producing recordId: %d \nError Message: %s\n"), Callback.RecordMetadata.RecordId, *Callback.ErrorMessage);
}
else
{
UE_LOG(LogTemp, Display, TEXT("RecordId: %d produced.\n"), Callback.RecordMetadata.RecordId);
}
});
Produce a record asynchronously:
EasyKafka->GetProducer()->ProduceRecord('<TOPIC>', '<RECORD_VALUE>');
/*
More control over your record
Such as headers,Id...
*/
FProducerRecord record;
record.Key = "key";
record.Topic = "topic";
record.Value = "value";
record.Id = 2312;//Unique id used to identify this record in the OnProduce callback
record.Headers = FRecordHeader(
{
{"KeyOne","ValueOne"},
{"KeyTwo","ValueTwo"}
});
EasyKafka->GetProducer()->ProduceRecord(record);
ALL THE METHODS ARE BLOCKING; ASYNC VERSIONS ARE TO BE ADDED.
Create Admin with default configuration:
#include "EasyKafkaSubsystem.h"
TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka();
EasyKafka->GetAdmin()->CreateAdmin(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, (int)EKafkaLogLevel::ERR);
Create Admin with configuration:
#include "EasyKafkaSubsystem.h"
TSharedPtr<FEasyKafkaModule> EasyKafka = GEngine->GetEngineSubsystem<UEasyKafkaSubsystem>()->GetEasyKafka();
TMap<EKafkaAdminConfig, FString> KafkaConfiguration =
{
{EKafkaAdminConfig::SOCKET_TIMEOUT_MS,"10000"}
};
EasyKafka->GetAdmin()->CreateAdmin(`<BOOTSTRAP_SERVERS_COMMA_SEPARATED>`, `<USERNAME>`, `<TOKEN/PASSWORD>`, KafkaConfiguration, (int)EKafkaLogLevel::ERR);
Simple Admin request example:
const TArray<FString> TopicsToDelete = { "Topic1Name", "Topic2Name" };
FAdminRequestResult Result = EasyKafka->GetAdmin()->DeleteTopics(TopicsToDelete);
if (Result.bError)
{
UE_LOG(LogTemp, Error, TEXT("Error deleting topics: %s\n"), *Result.ErrorMessage);
}
Give us a ⭐️!