- Topics - Channels for streaming data
- Producers - Applications that send data to topics
- Consumers - Applications that read data from topics
- Partitions - Divisions of topics for parallel processing
docker pull apache/kafka
docker pull confluentinc/cp-kafka
docker pull zookeeper
# Run zookeeper
docker run -p 2181:2181 zookeeper
# Run Kafka
docker run -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
import { Kafka } from "kafkajs";
export const kafka = new Kafka({
brokers: ["192.168.56.1:9092"],
clientId: "basics",
});
async function init() {
const admin = kafka.admin();
await admin.connect();
await admin.createTopics({
topics: [
{
topic: "rider-updates",
numPartitions: 2,
},
],
});
await admin.disconnect();
}
async function init() {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: "rider-updates",
messages: [
{
key: "rider1",
partition: 0,
value: JSON.stringify({
rider_id: 1,
location: {
lat: 18.558908,
lng: 73.791271,
},
}),
},
],
});
await producer.disconnect();
}
const group = "group-1";
async function init() {
const consumer = kafka.consumer({ groupId: group });
await consumer.connect();
await consumer.subscribe({
topics: ["rider-updates"],
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
console.log(
`[GROUP: ${group}]:[TOPIC:${topic}]: Partition: {${partition}} Rider Update Received: ${message.value.toString()}`
);
},
});
// await consumer.disconnect();
}