The Ballerina Kafka Connector connects Ballerina to Kafka brokers. With this connector, Ballerina can act as both a Kafka consumer and a Kafka producer.
Ballerina Version | Kafka Version |
---|---|
0.970.0 | 1.0.0 |
1.1.0 |
Steps to configure:
- Extract `ballerina-kafka-connector-<version>.zip` and copy the JAR files it contains into `<BRE_HOME>/bre/lib/`.
The following is a simple service (`kafkaService`) that is subscribed to the topic 'test-topic' on a remote Kafka broker cluster. Offsets can be committed manually inside the resource by setting the property `autoCommit = false` in the service-level configuration.
```ballerina
import wso2/kafka;
import ballerina/io;

// Consumer endpoint: polls the broker once per second for new records.
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    topics: ["test-topic"],
    pollingInterval: 1000
};

service<kafka:Consumer> kafkaService bind consumer {
    onMessage (kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // Each poll dispatches a batch of records; handle them one by one.
        foreach record in records {
            blob serializedMsg = record.value;
            // Decode the raw payload as a UTF-8 string.
            string msg = serializedMsg.toString("UTF-8");
            io:println("Topic: " + record.topic + " Received Message: " + msg);
        }
    }
}
```
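As a rough sketch of manual offset committing, the variant below turns off auto-commit on the endpoint and commits once the whole batch has been processed. It assumes that the endpoint configuration accepts an `autoCommit` field and that `kafka:ConsumerAction` exposes a `commit()` function; neither appears in the example above, so check both against the connector version in use. The names `manualCommitConsumer` and `manualCommitService` are hypothetical.

```ballerina
import wso2/kafka;
import ballerina/io;

endpoint kafka:SimpleConsumer manualCommitConsumer {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    topics: ["test-topic"],
    pollingInterval: 1000,
    // Assumption: this field disables automatic offset commits.
    autoCommit: false
};

service<kafka:Consumer> manualCommitService bind manualCommitConsumer {
    onMessage (kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        foreach record in records {
            blob serializedMsg = record.value;
            io:println("Received Message: " + serializedMsg.toString("UTF-8"));
        }
        // Assumption: commit() marks the records returned by this poll as consumed.
        consumerAction.commit();
    }
}
```

Committing after the loop means a failure partway through the batch leaves the offsets uncommitted, so those records can be delivered again on the next poll.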
The following example demonstrates how to publish a message to a specified topic. A Kafka record is created from a serialized string and then published to the topic 'test-topic' on the remote Kafka broker cluster.
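```ballerina
import wso2/kafka;

// Producer endpoint: waits for all replicas to acknowledge and retries up to 3 times.
endpoint kafka:SimpleProducer producer {
    bootstrapServers: "localhost:9092, localhost:9093",
    clientID: "basic-producer",
    acks: "all",
    noRetries: 3
};

function main (string... args) {
    string msg = "Hello World, Ballerina";
    // Serialize the message as UTF-8 bytes before sending.
    blob serializedMsg = msg.toBlob("UTF-8");
    // Send through the endpoint declared above.
    producer->send(serializedMsg, "test-topic");
}
```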
For more Kafka connector configurations, refer to the samples directory.