Copyright (c) 2016 Blizzard Entertainment.
https://github.com/blizzard/node-rdkafka
The node-rdkafka
library is a high-performance NodeJS client for Apache Kafka that wraps the native librdkafka library. All the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers should be encapsulated in the library.
This library currently uses librdkafka
version 0.9.5
.
To view the reference docs for the current version, go here
For guidelines on contributing please see CONTRIBUTING.md
Play nice; Play fair.
- Apache Kafka >=0.9
- Node.js >=4
- Linux/Mac (Sorry Windows :()
This project includes two types of unit tests in this project:
- end-to-end integration tests
- unit tests
You can run both types of tests by using Makefile
. Doing so calls mocha
in your locally installed node_modules
directory.
- Before you run the tests, be sure to init and update the submodules:
git submodule init
git submodule update
- To run the unit tests, you can run
make lint
ormake test
. - To run the integration tests, you must have a running Kafka installation available. By default, the test tries to connect to
localhost:9092
; however, you can supply theKAFKA_HOST
environment variable to override this default behavior.
You can install the node-rdkafka
module like any other module:
npm install node-rdkafka
To use the module, you must require
it.
var Kafka = require('node-rdkafka');
You can pass many configuration options to librdkafka
. A full list can be found in librdkafka
's Configuration.md
Configuration keys that have the suffix _cb
are designated as callbacks. Some
of these keys are informational and you can choose to opt-in (for example, dr_cb
). Others are callbacks designed to
return a value, such as partitioner_cb
.
Not all of these options are supported. The library will throw an error if the value you send in is invalid.
The library currently supports the following callbacks:
partitioner_cb
dr_cb
ordr_msg_cb
event_cb
librdkafka
supports using SASL for authentication and node-rdkafka
has it turned on by default. If you would like
disable sasl
support, export WITH_SASL=0
before you run npm install
. (You can also specify it when using node-gyp
, node-gyp --WITH_SASL=0 rebuild
)
This means you are required to have libsasl2
on the machine before you build it.
A Producer
sends messages to Kafka. The Producer
constructor takes a configuration object, as shown in the following example:
var producer = new Kafka.Producer({
'metadata.broker.list': 'kafka-host1:9092,kafka-host2:9092'
});
A Producer
requires only metadata.broker.list
(the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the Configuration.md file described previously.
The following example illustrates a list with several librdkafka
options set.
var producer = new Kafka.Producer({
'client.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
'dr_cb': true
});
You can easily use the Producer
as a writable stream immediately after creation (as shown in the following example):
// Our producer with its Kafka brokers
// This call returns a new writable stream to our topic 'topic-name'
var stream = Kafka.Producer.createWriteStream({
'metadata.broker.list': 'kafka-host1:9092,kafka-host2:9092'
}, {}, {
topic: 'topic-name'
});
// Writes a message to the stream
var queuedSuccess = stream.write(new Buffer('Awesome message'));
if (queuedSuccess) {
console.log('We queued our message!');
} else {
// Note that this only tells us if the stream's queue is full,
// it does NOT tell us if the message got to Kafka! See below...
console.log('Too many messages in our queue already');
}
stream.on('error', function (err) {
// Here's where we'll know if something went wrong sending to Kafka
console.error('Error in our kafka stream');
console.error(err);
})
The Standard API is more performant, particularly when handling high volumes of messages. However, it requires more manual setup to use. The following example illustrates its use:
var producer = new Kafka.Producer({
'metadata.broker.list': 'localhost:9092',
'dr_cb': true
});
// Connect to the broker manually
producer.connect();
// Wait for the ready event before proceeding
producer.on('ready', function() {
try {
producer.produce(
// Topic to send the message to
'topic',
// optionally we can manually specify a partition for the message
// this defaults to -1 - which will use librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages)
null,
// Message to send. Must be a buffer
new Buffer('Awesome message'),
// for keyed messages, we also specify the key - note that this field is optional
'Stormwind',
// you can send a timestamp here. If your broker version supports it,
// it will get added. Otherwise, we default to 0
Date.now(),
// you can send an opaque token here, which gets passed along
// to your delivery reports
);
} catch (err) {
console.error('A problem occurred when sending our message');
console.error(err);
}
});
// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
console.error('Error from producer');
console.error(err);
})
To see the configuration options available to you, see the Configuration section.
Method | Description |
---|---|
producer.connect() |
Connects to the broker. The connect() method emits the ready event when it connects successfully or an error when it does not. |
producer.disconnect() |
Disconnects from the broker. The disconnect() method emits the disconnected event when it has disconnected or error if something went wrong. |
producer.poll() |
Polls the producer for delivery reports or other events to be transmitted via the emitter. In order to get the events in librdkafka 's queue to emit, you must call this regularly. |
producer.setPollInterval(interval) |
Polls the producer on this interval, handling disconnections and reconnection. Set it to 0 to turn it off. |
producer.produce(topic, partition, msg, key, timestamp, opaque) |
Sends a message. The produce() method throws when produce would return an error. Ordinarily, this is just if the queue is full. |
producer.flush(timeout, callback) |
Flush the librdkafka internal queue, sending all messages. Default timeout is 500ms |
Some configuration properties that end in _cb
indicate that an event should be generated for that option. You can either:
- provide a value of
true
and react to the event - provide a callback function directly
The following example illustrates an event:
var producer = new Kafka.Producer({
'client.id': 'my-client', // Specifies an identifier to use to help trace activity in Kafka
'metadata.broker.list': 'localhost:9092', // Connect to a Kafka instance on localhost
'dr_cb': true // Specifies that we want a delivery-report event to be generated
});
// Poll for events every 100 ms
producer.setPollInterval(100);
producer.on('delivery-report', function(err, report) {
// Report of delivery statistics here:
//
console.log(report);
});
The following table describes types of events.
Event | Description |
---|---|
error |
Error reporting is handled through this pipeline. Most errors will have a value for code , message , and origin . origin will be local or kafka to determine where the error happened. |
disconnected |
The disconnected event is emitted when the broker has disconnected. This event is emitted only when .disconnect is called. The wrapper will always try to reconnect otherwise. |
ready |
The ready event is emitted when the Producer is ready to send messages. |
event |
The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
event.log |
The event.log event is emitted when logging events come in (if you opted into logging via the event_cb option). You will need to set a value for debug if you want to send information. |
event.stats |
The event.stats event is emitted when librdkafka reports stats (if you opted in). |
event.error |
The event.error event is emitted when librdkafka reports an error |
event.throttle |
The event.throttle event emitted when librdkafka reports throttling. |
delivery-report |
The delivery-report event is emitted when a delivery report has been found via polling. To use this event, you must set request.required.acks to 1 or -1 in topic configuration and dr_cb (or dr_msg_db if you want the report to contain the message payload) to true in the Producer constructor options. |
To read messages from Kafka, you use a KafkaConsumer
. You instantiate a KafkaConsumer
object as follows:
var consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
}, {});
The first parameter is the global config, while the second parameter is the topic config that gets applied to all subscribed topics. To view a list of all supported configuration properties, see the Configuration.md file described previously. Look for the C
and *
keys.
The group.id
and metadata.broker.list
properties are required for a consumer.
Rebalancing is managed internally by librdkafka
by default. If you would like to override this functionality, you may provide your own logic as a rebalance callback.
var consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': function(err, assignment) {
if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
this.assign(assignment);
} else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
this.unassign();
} else {
// We had a real error
console.error(err);
}
}
})
this
is bound to the KafkaConsumer
you have created. By specifying a rebalance_cb
you can also listen to the rebalance
event as an emitted event. This event is not emitted when using the internal librdkafka
rebalancer.
When you commit in node-rdkafka
, the standard way is to queue the commit request up with the next librdkafka
request to the broker. When doing this, there isn't a way to know the result of the commit. Luckily there is another callback you can listen to to get this information
var consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'offset_commit_cb': function(err, topicPartitions) {
if (err) {
// There was an error committing
console.error(err);
} else {
// Commit went through. Let's log the topic partitions
console.log(topicPartitions);
}
}
})
this
is bound to the KafkaConsumer
you have created. By specifying an offset_commit_cb
you can also listen to the offset.commit
event as an emitted event. It also has an error parameter and a list of topic partitions. This is not emitted unless opted in.
Messages that are returned by the KafkaConsumer
have the following structure.
{
value: new Buffer('hi'), // message contents as a Buffer
size: 2, // size of the message, in bytes
topic: 'librdtesting-01', // topic the message comes from
offset: 1337, // offset the message was read from
partition: 1, // partition the message was on
key: 'someKey' // key of the message if present
}
The stream API is the easiest way to consume messages. The following example illustrates the use of the stream API:
// Read from the librdtesting-01 topic... note that this creates a new stream on each call!
var stream = KafkaConsumer.createReadStream(globalConfig, topicConfig, {
topics: ['librdtesting-01']
});
stream.on('data', function(message) {
console.log('Got message');
console.log(message.value.toString());
});
You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages:
- Flowing mode. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects the consumer has issued the
unsubscribe
ordisconnect
method. - Non-flowing mode. This mode reads a single message from Kafka at a time manually.
The following example illustrates flowing mode:
// Flowing mode
consumer.connect();
consumer
.on('ready', function() {
consumer.subscribe(['librdtesting-01']);
// Consume from the librdtesting-01 topic. This is what determines
// the mode we are running in. By not specifying a callback (or specifying
// only a callback) we get messages as soon as they are available.
consumer.consume();
})
.on('data', function(data) {
// Output the actual message contents
console.log(data.message.toString());
});
The following example illustrates non-flowing mode:
// Non-flowing mode
consumer.connect();
consumer
.on('ready', function() {
// Subscribe to the librdtesting-01 topic
// This makes subsequent consumes read from that topic.
consumer.subscribe(['librdtesting-01']);
// Read one message every 1000 seconds
setInterval(function() {
consumer.consume(1);
}, 1000);
})
.on('data', function(data) {
console.log('Message found! Contents below.');
console.log(data.message.toString());
});
The following table lists important methods for this API.
Method | Description |
---|---|
consumer.connect() |
Connects to the broker. The connect() emits the event ready when it has successfully connected, or an error when it has not. |
consumer.disconnect() |
Disconnects from the broker. The disconnect() method emits disconnected when it has disconnected or error if something went wrong. |
consumer.subscribe(topics) |
Subscribes to an array of topics. |
consumer.unsubscribe() |
Unsubscribes from the currently subscribed topics. You cannot subscribe to different topics without calling the unsubscribe() method first. |
consumer.consume(cb) |
Gets messages from the existing subscription as quickly as possible. This method keeps a background thread running to do the work. If cb is specified, invokes cb(err, message) . |
consumer.consume(number, cb) |
Gets number of messages from the existing subscription. If cb is specified, invokes cb(err, message) . |
consumer.commit() |
Commits all locally stored offsets |
consumer.commit(topicPartition) |
Commits offsets specified by the topic partition |
consumer.commitMessage(message) |
Commits the offsets specified by the message |
The following table lists events for this API.
Event | Description |
---|---|
error |
Error reporting is handled through this pipeline. Most errors will have a code , message , and origin value. The origin value will be local or remote to determine where the error happened. |
disconnected |
The disconnected event is emitted when the broker disconnects. This event is only emitted when .disconnect is called. The wrapper will always try to reconnect otherwise. |
ready |
The ready event is emitted when the Producer is ready to send messages. |
event |
The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
event.log |
The event.log event is emitted when logging events occur (if you opted in for logging via the event_cb option).You will need to set a value for debug if you want information to send. |
event.stats |
The event.stats event is emitted when librdkafka reports stats (if you opted in). |
event.throttle |
The event.throttle event is emitted when librdkafka reports throttling. |
Both Kafka.Producer
and Kafka.KafkaConsumer
include a getMetadata
method to retrieve metadata from Kafka.
Getting metadata on any connection returns the following data structure:
{
orig_broker_id: 1,
orig_broker_name: "broker_name",
brokers: [
{
id: 1,
host: 'localhost',
port: 40
}
],
topics: [
{
name: 'awesome-topic',
partitions: [
{
id: 1,
leader: 20,
replicas: [1, 2],
isrs: [1, 2]
}
]
}
]
}
The following example illustrates how to use the getMetadata
method.
When fetching metadata for a specific topic, if a topic reference does not exist, one is created using the default config.
Please see the documentation on Client.getMetadata
if you want to set configuration parameters, e.g. acks
, on a topic to produce messages to.
var opts = {
topic: 'librdtesting-01',
timeout: 10000
};
producer.getMetadata(opts, function(err, metadata) {
if (err) {
console.error('Error getting metadata');
console.error(err);
} else {
console.log('Got metadata');
console.log(metadata);
}
});