ts-reactivekafka

Typesafe & reactive Kafka Subjects made with Typescript, KafkaJS and RxJS.

Quickstart

npm i ts-reactivekafka

single consumer

import rxkafka from "ts-reactivekafka";
import { tap } from "rxjs/operators";

(async () => {
	const kafkaConfig = {
		kafkaHost: process.env.KAFKA_HOST || "url:9092",
		serviceId: "test",
		logAllEvents: false,
		ssl: true,
		sasl: {
			mechanism: "plain",
			username: process.env.KAFKA_USER || "xxuser",
			password: process.env.KAFKA_PASS || "xxapitoken",
		},
	};

	// singleton is instantiated
	rxkafka({
		...kafkaConfig,
		consumerConfig: {
			topics: ["topicA", "topicB", "topicC"],
			consumerId: "testConsumer",
		},
		producerConfig: {
			topics: ["target"],
		},
	});

	// NOTE: subsequent invocations of rxkafka return the original singleton
	const kafka = rxkafka();

	kafka.consumer
		.getSubject()
		.pipe(
			tap((event) => console.log(event.message)) // rxjs operators
		)
		.subscribe(kafka.producer.getSubject());
})();

consumer group with dynamic import

import { merge } from "rxjs";

(async () => {
	const kafkaConfig = {
		kafkaHost: process.env.KAFKA_HOST || "url:9092",
		serviceId: "test",
		logAllEvents: false,
		ssl: true,
		sasl: {
			mechanism: "plain",
			username: process.env.KAFKA_USER || "xxuser",
			password: process.env.KAFKA_PASS || "xxapitoken",
		},
	};

	const kafkaProducerSingleton = (await import("ts-reactivekafka"))({
		...kafkaConfig,
		producerConfig: {
			topics: ["target"],
		},
	});

	const kafkaGroupConsumer1 = (await import("ts-reactivekafka"))({
		...kafkaConfig,
		consumerConfig: {
			topics: ["topicA", "topicB", "topicC"],
			consumerGroupId: "consumerGroupX", // NOTE: consumerGroupId instead of consumerId
		},
	});

	const kafkaGroupConsumer2 = (await import("ts-reactivekafka"))({
		...kafkaConfig,
		consumerConfig: {
			topics: ["topicA", "topicB", "topicC"],
			consumerGroupId: "consumerGroupX",
		},
	});

	const kafkaGroupConsumer3 = (await import("ts-reactivekafka"))({
		...kafkaConfig,
		consumerConfig: {
			topics: ["topicA", "topicB", "topicC"],
			consumerGroupId: "consumerGroupX",
		},
	});

	const consumerGroup = merge(
		kafkaGroupConsumer1.consumer.getSubject(),
		kafkaGroupConsumer2.consumer.getSubject(),
		kafkaGroupConsumer3.consumer.getSubject()
	);

	consumerGroup.subscribe(kafkaProducerSingleton.producer.getSubject());
})();

The Code

The code follows the Singleton pattern: given a general Kafka config, an optional consumer instance and an optional producer instance are made available, depending on the respective consumer and producer configurations. A consumer or producer is only instantiated when its respective config is well-defined.

The only public methods for both instances are:

  • getSubject() which returns the RxJS Subject (a multicast observable)
  • disconnect() which disconnects the Kafka consumer/producer and completes the subject
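
The lifecycle implied by these two methods can be illustrated with a minimal sketch. FakeConsumer below is a hypothetical stand-in, not the library's implementation: it opens no Kafka connection and only assumes that disconnect() completes the subject, as stated above. The event shape is trimmed for brevity.

```typescript
import { Subject } from "rxjs";

type KafkaEvent = { topic: string; message: { value: any } };

// Hypothetical stand-in that mirrors the two public methods described above.
// It is NOT the library's implementation; no Kafka connection is involved.
class FakeConsumer {
	private readonly subject = new Subject<KafkaEvent>();

	getSubject(): Subject<KafkaEvent> {
		return this.subject;
	}

	disconnect(): void {
		// the real method also disconnects the Kafka client
		this.subject.complete();
	}
}

const consumer = new FakeConsumer();
const log: string[] = [];

consumer.getSubject().subscribe({
	next: (event) => log.push(`next:${event.topic}`),
	complete: () => log.push("complete"),
});

consumer.getSubject().next({ topic: "topicA", message: { value: "hello" } });
consumer.disconnect();
// a completed RxJS subject ignores further emissions
consumer.getSubject().next({ topic: "topicA", message: { value: "late" } });
```

Subscribers therefore receive a complete notification on disconnect, which makes it a natural place to release downstream resources.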

The subjects are typed as follows:

type Event = {
	key?: string | null
	value: any
	partition?: number
	headers?: IHeaders
	timestamp?: string
}

Subject<{topic: string, message: Event}>

Since TypeScript has no built-in JSON type yet, value is typed as any. Otherwise, value could have been typed as JSON | string.
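
Until such a type exists, consumers who want stricter typing can define a recursive JSON type themselves. The following is only a sketch: the names Json, StrictEvent and parseValue are hypothetical and not exported by ts-reactivekafka. Because this Json type already includes string, value: Json covers the JSON | string case.

```typescript
// A recursive JSON type; `Json` is a hypothetical name, not part of ts-reactivekafka.
type Json =
	| string
	| number
	| boolean
	| null
	| Json[]
	| { [key: string]: Json };

// Hypothetical stricter variant of the Event type (headers omitted for brevity).
type StrictEvent = {
	key?: string | null;
	value: Json;
	partition?: number;
	timestamp?: string;
};

// Decode a raw payload: valid JSON is parsed, anything else is kept as the raw string.
function parseValue(raw: string): Json {
	try {
		return JSON.parse(raw) as Json;
	} catch {
		return raw;
	}
}

const event: StrictEvent = { key: "k1", value: parseValue('{"id": 1, "tags": ["a", "b"]}') };
const plain: StrictEvent = { value: parseValue("not json") };
```

The cast in parseValue is safe in the sense that JSON.parse can only produce values of this shape, but it does not validate any application-specific schema.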

Reactive Operators

RxJS offers a vast range of operators with which developers can build fast and flexible functional reactive programming constructs. A good overview of the operators can be found here:
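
As a small illustration of such operators, the sketch below filters and transforms events of the shape described in this README. It makes some assumptions: the event type is trimmed, the amount field is invented for the example, and a fixed array replaces kafka.consumer.getSubject() so that the snippet runs without a broker.

```typescript
import { from } from "rxjs";
import { filter, map } from "rxjs/operators";

// Trimmed shape of the items emitted by the consumer subject.
type KafkaEvent = {
	topic: string;
	message: { key?: string | null; value: any };
};

// Stand-in for kafka.consumer.getSubject(): a fixed list instead of a live topic.
const sample: KafkaEvent[] = [
	{ topic: "topicA", message: { key: "1", value: { amount: 5 } } },
	{ topic: "topicB", message: { key: "2", value: { amount: 50 } } },
	{ topic: "topicA", message: { key: "3", value: { amount: 500 } } },
];

const forwarded: KafkaEvent[] = [];

from(sample)
	.pipe(
		// keep only events that arrived on topicA
		filter((event) => event.topic === "topicA"),
		// double the (hypothetical) amount field, leaving the rest of the event untouched
		map((event) => ({
			...event,
			message: { ...event.message, value: { amount: event.message.value.amount * 2 } },
		}))
	)
	// in a real service this could be .subscribe(kafka.producer.getSubject())
	.subscribe((event) => forwarded.push(event));
```

The same pipe() chain applies unchanged to a live consumer subject, since a Subject is also an Observable.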

Optimized module thanks to Vercel ncc

Vercel's ncc compiles a Node.js module, together with all its dependencies, into a single file. For npm package developers this brings the following benefits:

  • Publish minimal packages to npm
  • Only ship relevant app code to serverless environments
  • Don't waste time configuring bundlers
  • Generally faster bootup time and less I/O overhead
  • Compiled language-like experience (e.g. Go)

Sponsored by Charp

This package is used in production and maintained by the developers of Charp.