PHP-rdkafka is a thin librdkafka binding providing a working PHP 5 / PHP 7 Kafka 0.8 / 0.9 client.
It supports the consumer, producer, and metadata APIs.
The API resembles librdkafka's as closely as possible.
- Installation
- Examples
- Usage
- RdKafka\Consumer
- RdKafka\Producer
- RdKafka
- RdKafka\Conf
- RdKafka\TopicConf
- RdKafka\Topic
- RdKafka\ConsumerTopic
- RdKafka\ProducerTopic
- RdKafka\Message
- RdKafka\Queue
- RdKafka\Exception
- RdKafka\Metadata
- RdKafka\Metadata\Topic
- RdKafka\Metadata\Broker
- RdKafka\Metadata\Partition
- RdKafka\Metadata\Collection
- Functions
- Constants
- Credits
- License
php-rdkafka depends on the stable version of librdkafka
php-rdkafka is compatible with PHP 5 (master branch, PECL release) and has an experimental PHP 7 branch.
For PHP version 7, installation from source should be preferred.
sudo pecl install channel://
brew tap homebrew/dupes
brew tap homebrew/versions
brew tap homebrew/homebrew-php
brew install homebrew/php/php70-rdkafka
php70-rdkafka is the rdkafka package for PHP 7.0. Replace 70 by 53, 54, 55, or 56 for PHP version 5.3, 5.4, 5.5, or 5.6, respectively.
For PHP version 7, make sure to use the php7 branch.
git clone
cd php-rdkafka
# for php7 only:
# git checkout php7
# standard PHP extension build steps:
phpize
./configure
make
sudo make install
# Add to your php.ini:
echo|sudo tee -a /path/to/php.ini
See examples
For producing, we first need to create a producer, and to add brokers (Kafka servers) to it:
$rk = new RdKafka\Producer();
$rk->addBrokers("127.0.0.1:9092"); // example address; replace with your own broker list
Next, we create a topic instance from the producer:
$topic = $rk->newTopic("test");
From there, we can produce as many messages as we want, using the produce method:
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
The second argument is the message flags, which currently must always be 0.
The message payload can be anything.
For consuming, we first need to create a consumer, and to add brokers (Kafka servers) to it:
$rk = new RdKafka\Consumer();
$rk->addBrokers("127.0.0.1:9092"); // example address; replace with your own broker list
Next, create a topic instance by calling the newTopic() method, and start consuming on partition 0:
$topic = $rk->newTopic("test");
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Special
// values such as RD_KAFKA_OFFSET_BEGINNING or RD_KAFKA_OFFSET_STORED are accepted.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
Next, retrieve the consumed messages:
while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if ($msg === null) {
        // consume() returns NULL on timeout; try again.
        continue;
    }
    if ($msg->err) {
        echo $msg->errstr(), "\n";
    } else {
        echo $msg->payload, "\n";
    }
}
Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:
Creating the queue:
$queue = $rk->newQueue();
Adding topic+partitions to the queue:
$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
Next, retrieve the consumed messages from the queue:
while (true) {
    // The only argument is the timeout.
    $msg = $queue->consume(1000);
    if ($msg === null) {
        continue; // NULL means the timeout expired
    }
    if ($msg->err) {
        echo $msg->errstr(), "\n";
    } else {
        echo $msg->payload, "\n";
    }
}
librdkafka can store offsets in a local file, or on the broker. The default is a local file, and as soon as you start using RD_KAFKA_OFFSET_STORED as the consuming offset, rdkafka starts to store the offset.
By default, the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting a configuration property.
Other interesting properties include auto.commit.enable.
$topicConf = new RdKafka\TopicConf();
$topicConf->set("", 1e3);
$topicConf->set("", 60e3);
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the queued.max.messages.kbytes parameter on your consumers.
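To get a feel for the numbers, the worst-case buffering can be estimated by multiplying the per-partition limit by the number of consumed partitions. The sketch below is plain arithmetic, not a php-rdkafka API: `maxBufferedKbytes()` is a hypothetical helper, and treating 1GB as 1000000 kbytes is an assumption.

```php
<?php
// Hypothetical helper: worst-case local buffering, assuming every consumed
// partition fills its queue up to the queued.max.messages.kbytes limit.
function maxBufferedKbytes(int $partitions, int $queuedMaxKbytes): int
{
    return $partitions * $queuedMaxKbytes;
}

// 8 partitions at roughly 1GB each (1000000 kbytes is an assumed figure).
echo maxBufferedKbytes(8, 1000000), "\n"; // prints 8000000

// The same 8 partitions with the limit lowered to 10240 kbytes.
echo maxBufferedKbytes(8, 10240), "\n"; // prints 81920
```

The real memory footprint depends on message flow and is usually lower; this is only an upper bound under the stated assumption.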
Each consumer and producer instance will fetch topics metadata at an interval defined by a configuration parameter. Depending on your librdkafka version, that parameter defaults to 10 seconds or 600 seconds.
librdkafka fetches the metadata for all topics of the cluster by default. Setting topic.metadata.refresh.sparse to the string "true" makes sure that librdkafka fetches only the topics it uses.
Setting topic.metadata.refresh.sparse to "true", and the metadata refresh interval to 600 seconds (plus some jitter), can reduce the bandwidth a lot, depending on the number of consumers and topics.
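As a rough illustration of why the interval matters, the number of metadata requests can be estimated from the number of instances and the refresh interval. This sketch assumes each instance sends exactly one metadata request per interval and ignores jitter; `metadataRequestsPerHour()` is a hypothetical helper, not part of php-rdkafka.

```php
<?php
// Hypothetical helper: estimated metadata requests per hour, assuming one
// request per instance per refresh interval (jitter is ignored).
function metadataRequestsPerHour(int $instances, int $intervalSeconds): int
{
    return intdiv($instances * 3600, $intervalSeconds);
}

// 100 consumer/producer instances refreshing every 10 seconds.
echo metadataRequestsPerHour(100, 10), "\n"; // prints 36000

// The same instances refreshing every 600 seconds.
echo metadataRequestsPerHour(100, 600), "\n"; // prints 600
```

With sparse refresh enabled, each of those requests would also carry fewer topics, so the saving in bytes may be larger than the request count suggests.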
The internal.termination.signal setting allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.
When enabling this, you have to mask the signal like this:
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);
$producer = new RdKafka\Producer(RdKafka\Conf $conf = null);
Creates a new Kafka producer and starts its operation.
$conf is an optional RdKafka\Conf instance that will be used instead of the default configuration.
The $conf object is copied, and changing $conf after that has no effect on the producer.
See RdKafka\Conf for more information.
$topic = $producer->newTopic(string $topic, RdKafka\TopicConf $conf = null);
Creates a new RdKafka\ProducerTopic instance for the topic named $topic.
$conf is an optional configuration for the topic that will be used instead of the default topic configuration.
The $conf object is copied by this function, and changing $conf after that has no effect on the topic.
See RdKafka\TopicConf for more information.
$qlen = $producer->outqLen();
Returns the current out queue length: messages waiting to be sent to, or acknowledged by, the broker.
$producer->poll(int $timeout_ms);
Polls the Producer handle for events.
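A common pattern before a script exits is to combine the two calls above: keep polling while the out queue is not empty. The following is a minimal sketch, not an official API; `drainOutQueue()` and its `$pollMs` / `$maxPolls` parameters are hypothetical names, and the function accepts any object that exposes `outqLen()` and `poll()` as documented here.

```php
<?php
// Hypothetical helper: polls until the out queue is empty or the poll budget
// is exhausted. $producer is any object exposing outqLen() and
// poll(int $timeout_ms), such as a RdKafka\Producer instance.
function drainOutQueue($producer, int $pollMs = 50, int $maxPolls = 200): bool
{
    for ($i = 0; $i < $maxPolls && $producer->outqLen() > 0; $i++) {
        $producer->poll($pollMs);
    }

    // True when nothing is left waiting to be sent or acknowledged.
    return $producer->outqLen() === 0;
}
```

Bounding the number of polls keeps a request from hanging forever when no broker is reachable; the caller can decide what to do when the function returns false.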
$consumer = new RdKafka\Consumer(RdKafka\Conf $conf = null);
Creates a new Kafka consumer and starts its operation.
$conf is an optional RdKafka\Conf instance that will be used instead of the default configuration.
The $conf object is copied, and changing $conf after that has no effect on the consumer.
See RdKafka\Conf for more information.
$queue = $consumer->newQueue();
Returns a RdKafka\Queue instance.
$topic = $consumer->newTopic(string $topic, RdKafka\TopicConf $conf = null);
Creates a new RdKafka\ConsumerTopic instance for the topic named $topic.
$conf is an optional configuration for the topic that will be used instead of the default topic configuration.
The $conf object is copied by this function, and changing $conf after that has no effect on the topic.
See RdKafka\TopicConf for more information.
RdKafka is the base class for RdKafka\Producer and RdKafka\Consumer.
$rk->addBrokers(string $brokerList);
Adds one or more brokers to the instance's list of initial brokers. Additional brokers will be discovered automatically as soon as rdkafka connects to a broker by querying the broker metadata.
If a broker name resolves to multiple addresses (and possibly address families) all will be used for connection attempts in round-robin fashion.
$brokerList is a comma-separated list of brokers.
Returns the number of brokers successfully added.
NOTE: Brokers may also be defined with a configuration property.
$rk->setLogLevel(int $level);
Specifies the maximum logging level produced by internal kafka logging and debugging.
If the debug configuration property is set, the level is automatically adjusted to LOG_DEBUG.
Valid values for $level are any of the syslog LOG_* priorities.
$metadata = $rk->metadata(bool $all_topics, RdKafka\Topic $only_topic = null, int $timeout_ms);
Requests metadata from the broker.
- all_topics - if true: request info about all topics in the cluster; if false: only request info about locally known topics.
- only_topic - only request info about this topic.
- timeout_ms - maximum response time before failing.
Returns a RdKafka\Metadata instance.
$conf = new RdKafka\Conf();
Creates a new configuration. The list of available configuration properties is documented at
$dump = $conf->dump();
Dump the configuration properties and values to an array.
$conf->set(string $name, string $value);
Sets a configuration property.
Throws a RdKafka\Exception on error.
$conf = new RdKafka\TopicConf();
Creates a new topic configuration. See RdKafka\Conf
Set partitioner callback.
RdKafka\Topic is the base class for RdKafka\ConsumerTopic and RdKafka\ProducerTopic.
$name = $topic->getName();
Returns the topic name.
New ConsumerTopic instances can be created by calling RdKafka\Consumer::newTopic().
$topic->consumeStart(int $partition, int $offset);
Start consuming messages for $partition at offset $offset, which may either be a proper offset (0..N) or one of the special offsets: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED, rd_kafka_offset_tail(..).
rdkafka will attempt to keep queued.min.messages (config property) messages in the local queue by repeatedly fetching batches of messages from the broker until the threshold is reached.
The application shall use the consume() method to consume messages from the local queue, each kafka message being represented as a RdKafka\Message object.
consumeStart() must not be called multiple times for the same topic and partition without stopping consumption first with consumeStop().
Throws a RdKafka\Exception on error.
$topic->consumeStop(int $partition);
Stop consuming messages for $partition, purging all messages currently in the local queue.
Throws a RdKafka\Exception on error.
$topic->consumeQueueStart(int $partition, int $offset, RdKafka\Queue $queue);
Same as consumeStart() but re-routes incoming messages to the provided queue $queue.
The application must use one of the RdKafka\Queue::consume*() functions to receive fetched messages.
consumeQueueStart() must not be called multiple times for the same topic and partition without stopping consumption first with consumeStop().
consumeStart() and consumeQueueStart() must not be combined for the same topic and partition.
Throws a RdKafka\Exception on error.
$message = $topic->consume(int $partition, int $timeout_ms);
Consume a single message from $partition.
$timeout_ms is the maximum amount of time to wait for a message to be received.
The consumer must have been previously started with consumeStart().
Returns NULL on timeout.
Throws a RdKafka\Exception on error.
NOTE: The returned message's ..->err must be checked for errors.
NOTE: One particular ..->err value signals that the end of the partition has been reached, which should typically not be considered an error. The application should handle this case (e.g., ignore).
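The possible outcomes of a consume() call (timeout, message, end of partition, real error) can be told apart with a small helper. This is a sketch under stated assumptions: `classifyMessage()` is a hypothetical function, and the end-of-partition code is passed in as a parameter because the constant is not named in this section (in librdkafka it is believed to be RD_KAFKA_RESP_ERR__PARTITION_EOF; check your version).

```php
<?php
// Hypothetical helper: classifies the value returned by consume().
// $eofCode is the error code meaning "end of partition reached"; its exact
// constant name and value are assumptions to verify against your extension.
function classifyMessage($msg, int $eofCode): string
{
    if ($msg === null) {
        return 'timeout'; // consume() returns NULL on timeout
    }
    $err = (int) $msg->err;
    if ($err === 0) {
        return 'message'; // payload, key and offset are usable
    }
    if ($err === $eofCode) {
        return 'eof';     // not really an error; usually ignored
    }
    return 'error';       // use $msg->errstr() for details
}

$eof = -191; // placeholder value for illustration only
echo classifyMessage(null, $eof), "\n";                     // prints "timeout"
echo classifyMessage((object) ['err' => 0], $eof), "\n";    // prints "message"
echo classifyMessage((object) ['err' => $eof], $eof), "\n"; // prints "eof"
```

Plain objects stand in for RdKafka\Message here so that the example runs without a broker; in a real consumer the return value of consume() would be passed directly.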
See Topic::getName()
$topic->offsetStore($message->partition, $message->offset+1);
Stores the given offset for the given partition of this topic. The offset will be committed (written) to the offset store according to the configured auto-commit interval.
NOTE: auto.commit.enable must be set to "false" when using this API.
Throws a RdKafka\Exception on error.
New ProducerTopic instances can be created by calling RdKafka\Producer::newTopic().
See Topic::getName()
$topic->produce(int $partition, int $msgflags, string $payload, string $key = null);
Produce and send a single message to the broker.
produce() is an asynchronous, non-blocking API.
$partition is the target partition, either:
- RD_KAFKA_PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
- a fixed partition (0..N)
$msgflags must be 0.
$payload is the message payload.
$key is an optional message key; if non-NULL, it will be passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.
Throws a RdKafka\Exception on error.
A Kafka message as returned by the consuming methods.
This object has two purposes:
- provide the application with a consumed message (err == 0)
- report per-topic+partition consumer errors (err != 0)
The application must check err to decide what action to take.
Non-zero for error signaling. Use errstr() for a string representation.
Topic name
When err == 0: the message payload
When err == 0: Optional message key
When err == 0: Message offset
$errstr = $message->errstr();
When err != 0, returns the string representation of the error.
New Queue instances can be created by calling RdKafka\Consumer::newQueue().
Message queues allow the application to re-route consumed messages from multiple topic+partitions into one single queue point. This queue point, containing messages from a number of topic+partitions, may then be served by a single consume() call, rather than one per topic+partition combination.
See RdKafka\ConsumerTopic::consumeQueueStart(), RdKafka\Queue::consume().
$message = $queue->consume(int $timeout_ms);
See RdKafka\ConsumerTopic::consume()
Exceptions thrown by php-rdkafka are of this type.
Metadata container.
See RdKafka::metadata()
$id = $metadata->getOrigBrokerId();
Returns the broker originating this metadata.
$name = $metadata->getOrigBrokerName();
Returns the name of originating broker.
$brokers = $metadata->getBrokers();
printf("There are %d brokers", count($brokers));
foreach ($brokers as $broker) {
    // ...
}
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Broker.
$topics = $metadata->getTopics();
printf("There are %d topics", count($topics));
foreach ($topics as $topic) {
    // ...
}
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Topic.
Metadata: Broker information.
$id = $broker->getId();
Returns the broker id.
$host = $broker->getHost();
Returns the broker hostname.
$port = $broker->getPort();
Returns the broker port.
Metadata: Topic information.
$name = $topic->getTopic();
Returns the topic name.
$err = $topic->getErr();
Returns the topic error reported by broker.
$partitions = $topic->getPartitions();
printf("There are %d partitions", count($partitions));
foreach ($partitions as $partition) {
    // ...
}
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Partition.
Metadata: Partition information.
$id = $partition->getId();
Returns the partition id.
$err = $partition->getErr();
Returns the partition error reported by broker.
$leader = $partition->getLeader();
Returns the leader broker id.
$replicas = $partition->getReplicas();
printf("There are %d replicas", count($replicas));
foreach ($replicas as $replica) {
    // ...
}
Returns a RdKafka\Metadata\Collection of replica broker ids for this partition.
$replicas = $partition->getIsrs();
printf("There are %d In-Sync-Replicas", count($replicas));
foreach ($replicas as $replica) {
    // ...
}
Returns a RdKafka\Metadata\Collection of In-Sync-Replica broker ids for this partition.
RdKafka\Metadata\Collection implements Iterator (can be used in foreach), and Countable (can be used in count()).
Returns a human readable representation of a kafka error
Converts errno to a rd_kafka_resp_err_t error code.
Returns errno
$offset = rd_kafka_offset_tail($cnt);
Returns a special offset to start consuming $cnt messages before the topic's current end offset (.._END).
That is, if the current end offset is 12345 and $cnt is 200, it will start consuming from offset 12345-200 = 12145.
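The arithmetic in this example can be written out as follows. Note that this is only an illustration of where consumption would begin: `effectiveTailStart()` is a hypothetical helper, and the real rd_kafka_offset_tail() returns a special marker value that librdkafka resolves itself rather than the number computed here.

```php
<?php
// Hypothetical helper: the absolute offset at which consumption would start
// when asking for the last $cnt messages before the current end offset.
function effectiveTailStart(int $endOffset, int $cnt): int
{
    return $endOffset - $cnt;
}

echo effectiveTailStart(12345, 200), "\n"; // prints 12145
```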
Start consuming from beginning of kafka partition queue: oldest msg
Start consuming from end of kafka partition queue: next msg
Start consuming from offset retrieved from offset store
Unassigned partition.
The unassigned partition is used by the producer API for messages that should be partitioned using the configured or default partitioner.
librdkafka version
Interpreted as hex MM.mm.rr.xx:
- MM = Major
- mm = minor
- rr = revision
- xx = currently unused
E.g.: 0x00080100 = 0.8.1
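The byte layout described above can be decoded with a few shifts and masks. This is a minimal sketch; `decodeRdKafkaVersion()` is a hypothetical helper and not a function provided by php-rdkafka.

```php
<?php
// Hypothetical helper: decodes a version integer laid out as 0xMMmmrrxx
// into a "major.minor.revision" string.
function decodeRdKafkaVersion(int $version): string
{
    $major    = ($version >> 24) & 0xFF;
    $minor    = ($version >> 16) & 0xFF;
    $revision = ($version >> 8) & 0xFF;
    // The lowest byte (xx) is currently unused and is ignored here.

    return sprintf('%d.%d.%d', $major, $minor, $revision);
}

echo decodeRdKafkaVersion(0x00080100), "\n"; // prints "0.8.1"
```

The same function can be applied to the version constant exposed by the extension to log or compare the librdkafka version at runtime.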
begin internal error codes
Received message is incorrect
Bad/unknown compression
Broker is going away
Generic failure
Broker transport error
Critical system resource failure
Failed to resolve broker
Produced message timed out
Reached the end of the topic+partition queue on the broker. Not really an error.
Permanent: Partition does not exist in cluster.
File or filesystem error
Permanent: Topic does not exist in cluster.
All broker connections are down.
Invalid argument, or invalid configuration
Operation timed out
Queue is full
ISR count < required.acks
end internal error codes
Random partitioner.
This is the default partitioner.
Returns a random partition between 0 and the number of partitions minus 1.
Consistent partitioner.
Uses consistent hashing to map identical keys onto identical partitions.
Returns a partition between 0 and number of partitions minus 1 based on the crc value of the key.
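The two partitioning strategies can be sketched in plain PHP to show the idea. These are illustrations only: `randomPartition()` and `consistentPartition()` are hypothetical helpers, and PHP's crc32() stands in for the CRC used internally by librdkafka, so the partition numbers produced here are not guaranteed to match what the library picks.

```php
<?php
// Sketch of random partitioning: any partition in [0, $partitionCount - 1].
function randomPartition(int $partitionCount): int
{
    return mt_rand(0, $partitionCount - 1);
}

// Sketch of key-based partitioning: identical keys always map to the same
// partition. Assumes 64-bit PHP, where crc32() returns a non-negative integer.
function consistentPartition(string $key, int $partitionCount): int
{
    return crc32($key) % $partitionCount;
}

// The same key lands on the same partition every time.
var_dump(consistentPartition('user-42', 8) === consistentPartition('user-42', 8)); // bool(true)
```

Key-based partitioning is what makes per-key ordering possible, since all messages sharing a key end up in one partition; random partitioning spreads load without that guarantee.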
Documentation copied from librdkafka.h.
Authors: see contributors.
php-rdkafka is released under the MIT license.