Moleculer service for persistent event queue with Kafka
$ npm install imicros-events --save
Requires a running Kafka broker; see the docker-compose example below.
const { ServiceBroker } = require("moleculer");
const { Publisher } = require("imicros-events");
// Create a broker and register the Publisher service under the name "publisher".
const broker = new ServiceBroker({ logger: console });
broker.createService(Publisher, {
  name: "publisher",
  settings: {
    brokers: ["localhost:9092"], // list of Kafka brokers
    topic: "events", // must already exist (can be created with Admin)
  },
});

/**
 * Starts the broker, emits one sample event through the publisher service
 * and shuts the broker down again.
 *
 * @returns {Promise<void>} resolves once the broker has been stopped
 */
const run = async () => {
  await broker.start();
  try {
    await broker.call("publisher.emit", {
      event: "my.first.event",
      payload: { msg: "somthing useful" },
    });
  } finally {
    // stop the broker even when the call fails, so the process can exit
    await broker.stop();
  }
};

// Never leave the promise floating: report the failure and signal it via the exit code.
run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Calls the publisher service for each emitted event (broadcast events are not included).
const { ServiceBroker } = require("moleculer");
const { EventsMiddleware } = require("imicros-events");
// Middleware instance that hands emitted events to the service named "publisher".
const eventsMiddleware = EventsMiddleware({ service: "publisher" });

// Broker with console logging and the events middleware installed.
let broker = new ServiceBroker({
  logger: console,
  middlewares: [eventsMiddleware],
});
Calls the defined action for each received message.
const { ServiceBroker } = require("moleculer");
const { Subscriber } = require("imicros-events");
// Handler service: provides the action that is called for each received message.
const Handler = {
  name: "handler",
  actions: {
    eachEvent: {
      // parameter schema of a single received message
      params: {
        offset: { type: "string" },
        event: { type: "string" },
        payload: { type: "any" },
        version: { type: "string" },
        uid: { type: "string" },
        timestamp: { type: "number" },
      },
      /**
       * Logs the incoming call together with its meta data and parameters.
       *
       * @param {object} ctx - call context; `ctx.meta` and `ctx.params` are logged
       * @returns {boolean} always `true`
       */
      handler(ctx) {
        const { meta, params } = ctx;
        // do something useful...
        this.logger.info(`${this.name} called`, { service: this.name, meta, params });
        // as long as no error is thrown, the message will be acknowledged for this group
        return true;
      },
    },
  },
};
// Subscriber service: calls the configured handler action for each received message.
const Sub = {
  name: "subscriber", // was misspelled "subsriber"
  mixins: [Subscriber],
  dependencies: ["handler"],
  settings: {
    brokers: ["localhost:9092"], // list of Kafka brokers
    topic: "events", // must already exist (can be created with Admin)
    groupId: "subscriber", // group id for consuming messages
    fromBeginning: false,
    handler: "handler.eachEvent", // action to call for event handling (signature see service Handler)
  },
};
// Broker hosting both the handler and the subscriber service.
const broker = new ServiceBroker({
  logger: console,
});

/**
 * Registers the handler and subscriber services and starts the broker.
 *
 * @returns {Promise<void>} resolves once the broker has started
 */
const run = async () => {
  // the schema object is named `Handler`; `handler` is not defined
  broker.createService(Handler);
  broker.createService(Sub);
  await broker.start();
};

// `await` is not available at the top level of a CommonJS script, so run inside
// an async function and report a failed start instead of leaving it unhandled.
run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
// Excerpt: builds a JSON string containing the event name, the serialized payload,
// the serialized meta data, the version constant, the result of uuid() and the
// current time in milliseconds (Date.now()).
// NOTE(review): `this`, `ctx`, `meta`, `Constants` and `uuid` come from the enclosing
// code, which is not shown here.
JSON.stringify({
event: ctx.params.event,
payload: await this.serializer.serialize(ctx.params.payload),
meta: await this.serializer.serialize(meta),
version: Constants.VERSION,
uid: uuid(),
timestamp: Date.now()
})
# docker-compose example: one Zookeeper container and one Kafka broker container.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    links:
      - zookeeper
    environment:
      # The advertised address below is hard-coded; replace 192.168.2.124 with an
      # address of your Docker host that the clients can reach (or use one of the
      # commented variable-based alternatives).
      # KAFKA_ADVERTISED_HOST_NAME: ${HOST_IP}
      # KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_KAFKA_HOST}
      KAFKA_ADVERTISED_HOST_NAME: 192.168.2.124
      # KAFKA_ADVERTISED_PORT: ${DOCKER_KAFKA_PORT}
      KAFKA_ADVERTISED_PORT: 9092
      # KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${HOST_IP}:${DOCKER_KAFKA_PORT}
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.2.124:9092
      # creates the topic "events" used by the examples (format name:partitions:replicas)
      KAFKA_CREATE_TOPICS: "events:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka
networks:
  # external network: it must already exist before the stack is started
  kafka:
    external: true