Cannot consume produced messages in v2.0
theaungmyatmoe opened this issue · 2 comments
Producer
Kafka::publish()
->onTopic('saga')
->withConfigOption('linger.ms', 0)
->withConfigOption('queue.buffering.max.ms', 0)
->withMessage(new Message(body: [
'name' => fake()->name
]))
->send(true);
I set the message flush and config accordingly #277 .
The produced message can be consumer in the Conductor console.
![Screenshot 2024-05-13 at 9 03 56 PM](https://private-user-images.githubusercontent.com/65492233/330096993-64eadbe6-91f3-48b3-bd68-e0a79e2c11b5.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjYxMjUzNjAsIm5iZiI6MTcyNjEyNTA2MCwicGF0aCI6Ii82NTQ5MjIzMy8zMzAwOTY5OTMtNjRlYWRiZTYtOTFmMy00OGIzLWJkNjgtZTBhNzllMmMxMWI1LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA5MTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwOTEyVDA3MTEwMFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI4YjA0ZmY3NTI4MmNkYTIyZDY1YjI2Nzc5MDU0ODc2YzRkZGU0MzJiOWI2ZTA2ZWNjNzYzZDYwNGNmNmY4MGQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.Pe2Wq_IwC3071EU7F3QXAdVGW40HISmrK2xGIKNyfqo)
However it cannot be consumed via the kafka consumer.
Consumer
<?php
namespace App\Console\Commands;
use App\Kafka\Handler;
use Carbon\Exceptions\Exception;
use Illuminate\Console\Command;
use Junges\Kafka\Consumers\Consumer;
use Junges\Kafka\Contracts\KafkaMessage;
use Junges\Kafka\Exceptions\ConsumerException;
use Junges\Kafka\Facades\Kafka;
class KafkaConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'saga:listen';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Saga';
/**
* Execute the console command.
* @throws ConsumerException
* @throws Exception
*/
public function handle(): void
{
$this->line('Listening for messages...');
$consumer = Kafka::consumer(['saga'])
->subscribe('saga')
->withDlq()
->withHandler(function () {
$this->line('Message received');
})
->build();
$consumer->consume();
}
}
PHP Version
➜ laravel-kafka-pub-sub git:(main) ✗
task shell
task: [shell] docker compose exec app sh
/var/www/html # php -v
PHP 8.3.7 (cli) (built: May 10 2024 23:03:08) (ZTS)
Copyright (c) The PHP Group
Zend Engine v4.3.7, Copyright (c) Zend Technologies
with Zend OPcache v8.3.7, Copyright (c), by Zend Technologies
/var/www/html #
Working Version in v1.13
Even though it's working on v1.13 I cannot use both version in our Laravel app since it was upgraded to the Laravel 11.
<?php
namespace App\Console\Commands;
use App\Jobs\KafkaJob;
use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;
class KafkaConsumerCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'kafka:listen';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Execute the console command.
*/
public function handle()
{
$consumer = Kafka::consumer(['wallets'])
->withHandler(new KafkaJob())
->withAutoCommit()
->build();
$consumer->consume();
}
}
@aungmyatmoethegreat v2.1.0 should fix this. You no longer have to pass the shouldFlush
parameter. I also updated the docs about sending single messages (Proucer::send) and multiple messages (Producer::sendBatch).
For single messages, flush is called automatically, but keep in mind that this slows down the producer.
If you need to send multiple messages on one request, please use batching and send them all in once.
Feel free to open another issue if you problem persists.