/package-kafka

Primary LanguageJavaApache License 2.0Apache-2.0

Ballerina Kafka Connector

Ballerina Kafka Connector is used to connect Ballerina with Kafka Brokers. With this Kafka Connector, Ballerina can act as Kafka Consumers and Kafka Producers.

Compatibility

Ballerina Version Kafka Version
0.970.0 1.0.0
1.1.0

Steps to configure,

  1. Extract ballerina-kafka-connector-<version>.zip and copy containing jars in to <BRE_HOME>/bre/lib/ `

Ballerina as a Kafka Consumer

Following is a simple service (kafkaService) which is subscribed to topic 'test-topic' on remote Kafka broker cluster. In this example, offsets are manually committed inside the resource by setting property autoCommit = false at service level annotation.

import wso2/kafka;
import ballerina/io;

endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    topics: ["test-kafka-topic"],
    pollingInterval: 1000
};

service<kafka:Consumer> kafkaService bind consumer {
    onMessage (kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        foreach record in records {
            blob serializedMsg = record.value;
             string msg = serializedMsg.toString("UTF-8");
             io:println("Topic: " + record.topic + " Received Message: " + msg);
        }
    }
}

Ballerina as a Kafka Producer

Following example demonstrates a way to publish a message to a specified topic. A Kafka record is created from serialized string, and then it is published to topic 'test-topic' partition '1' in remote Kafka broker cluster.

import wso2/kafka;

endpoint kafka:SimpleProducer producer {
    bootstrapServers: "localhost:9092, localhost:9093",
    clientID:"basic-producer",
    acks:"all",
    noRetries:3
};

function main (string... args) {
    string msg = "Hello World, Ballerina";
    blob serializedMsg = msg.toBlob("UTF-8");
    kafkaProducer->send(serializedMsg, "test-topic");

}

For more Kafka Connector Ballerina configurations please refer to the samples directory.