Laravel Kafka

This package is based on anam-hossain's example

Installation

  1. Install the librdkafka library

  2. Install the php-rdkafka PECL extension

  3. Install this package using composer:

composer require thiagobrauer/laravel-kafka

  4. Publish the package's configuration file:

php artisan vendor:publish --provider="ThiagoBrauer\LaravelKafka\ServiceProvider"

  5. Add these properties to your .env file, changing the values as needed:

KAFKA_PRODUCER_SERVERS=kafka:9092
KAFKA_PRODUCER_DEBUG=true
KAFKA_CONSUMER_SERVERS=kafka:9092
KAFKA_CONSUMER_TOPICS=inventories
KAFKA_CONSUMER_GROUP_ID=group1

You can set multiple producer servers, consumer servers, and consumer topics by separating the values with commas.
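To illustrate the comma-separated format, here is a minimal sketch of how such a value maps to a list of entries. The splitList helper and the kafka1/kafka2 host names are hypothetical and only serve as an example; the package's own parsing may differ.

```php
<?php

/**
 * Split a comma-separated setting into a clean list of entries.
 * Hypothetical helper for illustration; not part of the package.
 */
function splitList(string $value): array
{
    // Break on commas, trim surrounding whitespace, and drop empty entries.
    $parts = array_map('trim', explode(',', $value));

    return array_values(array_filter($parts, fn (string $part): bool => $part !== ''));
}

// Example value for KAFKA_CONSUMER_SERVERS with two brokers (assumed host names).
print_r(splitList('kafka1:9092,kafka2:9092'));
```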

Usage

Producer

To produce a message, use the KafkaProducer class:

use ThiagoBrauer\LaravelKafka\KafkaProducer;

...

$producer = new KafkaProducer();
$producer->setTopic('topic1')->send('message');
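Since send() is shown taking a string, structured data probably needs to be serialized first. The sketch below assumes JSON as the wire format; the encodeEvent helper, the event name, and the field names are hypothetical and not part of the package.

```php
<?php

/**
 * Encode an event as a JSON string that can be passed to send().
 * Hypothetical helper for illustration; not part of the package.
 */
function encodeEvent(string $type, array $data): string
{
    // JSON_THROW_ON_ERROR raises a JsonException instead of silently returning false.
    return json_encode(['type' => $type, 'data' => $data], JSON_THROW_ON_ERROR);
}

// Print the string that would be handed to the producer.
echo encodeEvent('inventory.updated', ['sku' => 'ABC-1', 'quantity' => 3]), PHP_EOL;
```

Inside a Laravel application with this package installed, the result could then be sent with $producer->setTopic('topic1')->send(encodeEvent('inventory.updated', ['sku' => 'ABC-1', 'quantity' => 3])).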

Consumer

First, create a class to handle the received messages. The class must extend ThiagoBrauer\LaravelKafka\Handlers\MessageHandler and implement the handle method, as in the example below:

<?php

namespace App\Kafka\Handlers;

use ThiagoBrauer\LaravelKafka\Handlers\MessageHandler;
use RdKafka\Message;

class KafkaMessageHandler extends MessageHandler
{
    public function handle(Message $message)
    {
        var_dump($message);
    }
}
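In practice a handler will usually do more than dump the message. The sketch below assumes the producer sends JSON and shows one way to decode the payload defensively; the decodePayload helper and the sample fields are hypothetical and not part of the package.

```php
<?php

/**
 * Decode a JSON message payload into an array.
 * Returns null when the payload is empty or does not decode to an array.
 * Hypothetical helper for illustration; not part of the package.
 */
function decodePayload(?string $payload): ?array
{
    if ($payload === null || $payload === '') {
        return null;
    }

    // Decode JSON objects as associative arrays.
    $decoded = json_decode($payload, true);

    return is_array($decoded) ? $decoded : null;
}

var_dump(decodePayload('{"sku":"ABC-1","quantity":3}'));
var_dump(decodePayload('not json'));
```

Inside handle(), this could be called as decodePayload($message->payload), since RdKafka\Message exposes the raw message body through its payload property.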

Then, add your class to the message_handlers section of your config/laravel_kafka.php file, organized by topic:

...

'message_handlers' => [
    'topic1' => [
        App\Kafka\Handlers\KafkaMessageHandler::class,
    ],
    'topic2' => [
        App\Kafka\Handlers\KafkaMessageHandler::class,
    ],
]

...

Then run php artisan config:cache to rebuild the cached configuration.
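Because each topic in message_handlers maps to an array, it appears that more than one handler can be registered per topic. The fragment below is a sketch under that assumption; the topic name and the two handler classes (InventoryLogger, InventoryUpdater) are hypothetical, and how the package dispatches to multiple handlers is not confirmed here.

```php
<?php

// Fragment of config/laravel_kafka.php (sketch; other keys omitted).
return [
    'message_handlers' => [
        // Assumption: every handler listed for a topic receives its messages.
        'inventories' => [
            App\Kafka\Handlers\InventoryLogger::class,  // hypothetical handler
            App\Kafka\Handlers\InventoryUpdater::class, // hypothetical handler
        ],
    ],
];
```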

After that, you're ready to start the consumer:

php artisan kafka:consume

You can configure the producer and consumer using variables in your .env file, and the consumer also through command options:

KAFKA_PRODUCER_SERVERS=kafka:9092
KAFKA_PRODUCER_DEBUG=true
KAFKA_PRODUCER_COMPRESSION=snappy
KAFKA_CONSUMER_SERVERS=kafka:9092
KAFKA_CONSUMER_TOPICS=inventories
KAFKA_CONSUMER_GROUP_ID=group1
KAFKA_CONSUMER_COMMIT_ASYNC=true
KAFKA_CONSUMER_TIMEOUT_MS=120000
KAFKA_CONSUMER_AUTO_OFFSET_RESET=earliest
KAFKA_CONSUMER_AUTO_COMMIT=true

php artisan kafka:consume --servers=kafka:9092
php artisan kafka:consume --topics=inventories
php artisan kafka:consume --group_id=group1
php artisan kafka:consume --timeout_ms=120000
php artisan kafka:consume --auto_offset_reset=earliest
php artisan kafka:consume --commit_async
php artisan kafka:consume --auto_commit
php artisan kafka:consume --once # consume only one message