weiboad/kafka-php

How do I use this library in a long-running Swoole process?


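I'm building a project with PHP and Swoole. It runs as a long-lived process and needs to send API logs to Kafka. My questions:
1. In the GitHub examples, both the async and the sync producer create a new producer every time. In a long-running process, how can I build a producer connection pool so that I don't have to create a new producer each time?
2. When testing the async producer, each message appears to be sent to Kafka several times. Why does this happen?
3. With the sync producer, I assign the producer to a class property so that I don't have to create a new one each time, but I get the following error: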
Fatal error: Uncaught Kafka\Exception\Socket: After 4 attempts could not write 444 bytes to stream, completed writing only 0 bytes in kafka/nmred/kafka-php/src/Kafka/SocketSync.php:380

<?php
use Kafka\Producer;
use Kafka\ProducerConfig;
use Monolog\Handler\ErrorLogHandler;
use Monolog\Logger;
use Swoole\Http\Request;
use Swoole\Http\Response;

// Autoloading
require 'vendor/autoload.php';

$server = new \swoole_http_server("0.0.0.0", 9501);
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.111.200:9092');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$config->setBrokerVersion('2.2.0');
$logger = new Logger('my_logger');
$logger->pushHandler(new ErrorLogHandler());

$producer = new Producer();
//$producer->setLogger($logger);

$server->on('request', function (Request $request, Response $response) use (&$producer) {
    $response->header("Content-Type", "text/html; charset=utf-8");
    $ret = $producer->send([[
        'topic' => 'test',
        'value' => 'test....message.',
        'key' => '',
    ]]);
    $response->end(json_encode($ret));
});
$server->start();
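For question 1: in Swoole each worker is a separate process, so a cross-process pool is not really needed; one producer per worker, created on first use, is enough. Note that in the snippet above `$producer` is built before `$server->start()`, i.e. in the master process, and every forked worker inherits that same object, including any broker socket it may already have opened. A minimal sketch of a lazily created, per-process holder follows. `WorkerProducer` is a hypothetical helper, not part of kafka-php, and the factory is whatever code builds your real producer.

```php
<?php
// Hypothetical helper (not part of kafka-php): one producer per OS process.
// Static properties are not shared between Swoole workers, because each
// worker is a separate process with its own memory.
final class WorkerProducer
{
    /** @var object|null producer for the current process */
    private static $producer = null;
    /** @var int|null PID of the process that created it */
    private static $ownerPid = null;

    /**
     * Return this process's producer, creating it on first use.
     * An instance created by another process (e.g. inherited from the
     * master through fork) is discarded and rebuilt.
     */
    public static function get(callable $factory)
    {
        $pid = getmypid();
        if (self::$producer === null || self::$ownerPid !== $pid) {
            self::$producer = $factory();
            self::$ownerPid = $pid;
        }
        return self::$producer;
    }

    /** Drop the cached instance so that the next get() builds a new one. */
    public static function reset(): void
    {
        self::$producer = null;
        self::$ownerPid = null;
    }
}
```

In the request handler you would then call `WorkerProducer::get(function () { return new Producer(); })` instead of capturing a `$producer` that was built in the master.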

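I ran into this too. It works fine at first, but after being idle for a while, the next request fails and the Worker exits. My guess is that the client lost its connection.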

PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Fatal error:  Uncaught Kafka\Exception\Socket: After 4 attempts could not write 94 bytes to stream, completed writing only 0 bytes in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php:380
Stack trace:
#0 /var/www/vendor/nmred/kafka-php/src/Kafka/Producer/SyncProcess.php(84): Kafka\SocketSync->write('\x00\x00\x00Z\x00\x00\x00\x02\x00\x00\x00\x00\x00\tk...')
#1 /var/www/vendor/nmred/kafka-php/src/Kafka/Producer.php(81): Kafka\Producer\SyncProcess->send(Array)
#2 /var/www/server.php(99): Kafka\Producer->send(Array)
#3 {main}
  thrown in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 380
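One plausible cause: Kafka brokers close client connections that stay idle longer than `connections.max.idle.ms` (10 minutes by default), so the next `fwrite()` hits a closed socket ("Broken pipe") and the uncaught `Kafka\Exception\Socket` kills the worker. A minimal mitigation is to catch the exception, rebuild the producer and retry. The sketch below is library-agnostic: `sendWithReconnect` is a hypothetical helper, and whether constructing a new `Kafka\Producer` really opens a fresh socket depends on kafka-php internals (it may keep broker connections in a shared instance), which you should verify.

```php
<?php
/**
 * Hypothetical helper: call $send with the current producer; on failure,
 * rebuild the producer with $factory and retry, up to $maxAttempts in total.
 *
 * @param object|null $producer current producer, replaced on failure (by reference)
 * @param callable    $factory  builds a new producer
 * @param callable    $send     function ($producer) that performs the send
 * @return mixed whatever $send returns
 */
function sendWithReconnect(&$producer, callable $factory, callable $send, int $maxAttempts = 2)
{
    $maxAttempts = max(1, $maxAttempts);
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        if ($producer === null) {
            $producer = $factory();
        }
        try {
            return $send($producer);
        } catch (\Exception $e) {
            // e.g. Kafka\Exception\Socket after "Broken pipe"
            $lastError = $e;
            $producer = null; // force a rebuild on the next attempt
        }
    }
    // Every attempt failed: let the caller decide (log, drop, buffer...)
    throw $lastError;
}
```

Catching the exception at least keeps the worker alive; an exception escaping the request callback is what makes the Worker exit.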

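I tried php-rdkafka, written the same way as my code above, with multiple requests sharing one connection, but Kafka received no data. How did you solve the shared-connection problem?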

This is what I'm using now:
<?php
class kafka {
    private $topic = null;       // topic name
    private $broker_list = null; // comma-separated broker list
    private $producer = null;    // producer
    private $topic_conf = null;
    private $topic_obj = null;

    public function __construct($topic, $broker_list) {
        if (empty($topic)) {
            throw new Exception('topic is empty');
        }
        if (empty($broker_list)) {
            throw new Exception('broker_list is empty');
        }

        $this->topic = $topic;
        $this->broker_list = $broker_list;

        $conf = new RdKafka\Conf();
        // Delivery report callback; only fires from poll()/flush()
        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err) {
                error_log('Kafka delivery failed: ' . $message->errstr());
            }
            // file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            error_log(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
            // file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
        });

        $this->producer = new RdKafka\Producer($conf);
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($this->broker_list);

        $this->topic_conf = new RdKafka\TopicConf();
        // acks: -1 = wait until all in-sync replicas have the message, 1 = leader only, 0 = no acknowledgement.
        // With 0 the delivery callback carries no offset; with 1 or -1 it does.
        // This can be used to confirm that a message was produced, though not with 100% certainty,
        // since a Kafka broker may still go down midway.
        $this->topic_conf->set('request.required.acks', 0);

        $this->topic_obj = $this->producer->newTopic($this->topic, $this->topic_conf);
    }

    public function send($messages = []) {
        // The 2nd argument of produce() is msgflags (must be 0), not a partition;
        // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition.
        $this->topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($messages));
        // Serve queued delivery/error callbacks without blocking
        $this->producer->poll(0);
        return true;
    }
}

Then, in Swoole I use it like this:
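public function onWorkerStart($server, $worker_id) {
    global $CONFIG; // assumes $CONFIG is defined at file scope; otherwise it is undefined inside this method
    if ($server->taskworker) {
        // Create the kafka object once per task process
        require_once(__DIR__ . '/kafka.php');
        $server->kafka = new kafka($CONFIG['kafka']['topic'], $CONFIG['kafka']['broker_list']);
    }
}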

Each task process has its own Kafka connection, and it can keep using that connection for as long as the task process lives.
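With php-rdkafka, `produce()` only places the message in librdkafka's local queue; a background thread does the actual sending, and delivery-report and error callbacks only run when the application calls `poll()` (or `flush()`). librdkafka is also not fork-safe, so an `RdKafka\Producer` created in the master before `$server->start()` will most likely not deliver anything from the forked workers, which may explain the "Kafka received no data" symptom above. A minimal sketch of the produce-then-poll step follows; `produceAndPoll` is a hypothetical name, and the objects are duck-typed so the sketch also loads without the extension.

```php
<?php
// Fallback so the sketch loads without the rdkafka extension;
// when the extension is loaded, its own constant (-1) is used.
if (!defined('RD_KAFKA_PARTITION_UA')) {
    define('RD_KAFKA_PARTITION_UA', -1);
}

/**
 * Hypothetical helper: enqueue one message and serve pending callbacks.
 * $producer is an RdKafka\Producer, $topic an RdKafka\ProducerTopic.
 * Returns the number of events served by poll().
 */
function produceAndPoll($producer, $topic, string $payload, ?string $key = null): int
{
    // 2nd argument is msgflags, not a partition; it must be 0 here.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $key);
    // Timeout 0: do not block, just run callbacks that are already queued.
    return $producer->poll(0);
}
```

Calling `poll(0)` on every request keeps the delivery-report queue from growing without bound when a delivery callback is configured, as in the class above.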

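@laiason Thanks. My approach is to keep the connection on the server object and produce messages synchronously (blocking). I think synchronous producing is acceptable: Kafka is a queueing system to begin with, and producing a message is cheap, so there is no need to move it into a separate process.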

<?php
use Swoole\Http\Request;
use Swoole\Http\Response;

// Autoloading
require 'vendor/autoload.php';

$server = new \swoole_http_server("0.0.0.0", 9501);

// WorkerStart runs once in each worker process (after the fork), so every worker gets its own producer
$server->on('WorkerStart', function (\Swoole\Http\Server $server, int $workerId) {
    if (!isset($server->topic)) {
        $rk = new RdKafka\Producer();
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers("10.0.75.1:9092,10.0.75.1:9092");
        $topic = $rk->newTopic("test");
        $server->topic = $topic;
    }
});

$server->on('request', function (Request $request, Response $response) use ($server) {
    $response->header("Content-Type", "text/html; charset=utf-8");
    $server->topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
    $response->end(json_encode([]));
});

$server->start();
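  • Give each Worker its own Kafka client, shared by all requests that Worker handles. That way, if one client's connection breaks, the other Workers can still send data.
  • Call Kafka after end(), so that it does not slow down the normal HTTP response, similar to fastcgi_finish_request in PHP-FPM.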
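<?php
$http = new Swoole\Http\Server('127.0.0.1', 5200);
$http->on('workerStart', function (Swoole\Http\Server $server, $workerId) {
    // One producer per worker process, created after the fork
    global $producer, $topic;

    $brokerList = '192.168.15.83:9092';
    $topicName = 'bi_test';

    $conf = new RdKafka\Conf();
    // Conf::set() takes string values
    $conf->set('queue.buffering.max.messages', '1000000');

    $producer = new RdKafka\Producer($conf);
    $producer->setLogLevel(LOG_WARNING);
    $producer->addBrokers($brokerList);

    $topic = $producer->newTopic($topicName);
});
$http->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
    global $producer, $topic;
    $payload = 'Message payload: ' . date('Y-m-d H:i:s ') . microtime(true);
    // Send the HTTP response first so Kafka adds no latency to it
    $response->end($payload);
    // Enqueue the message (2nd argument is msgflags and must be 0)
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
    // Serve any pending delivery/error callbacks without blocking
    $producer->poll(0);
});
$http->start();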
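Because messages sit in librdkafka's in-memory queue until its background thread sends them, anything still queued when a worker exits (reload, max_request, shutdown) can be lost. php-rdkafka 4.0 and later expose `RdKafka\Producer::flush($timeoutMs)`, which returns `RD_KAFKA_RESP_ERR_NO_ERROR` once the queue has been drained. A sketch of a bounded flush, meant to be called from a Swoole `WorkerStop` handler; `flushWithRetries` is a hypothetical name.

```php
<?php
// Fallback so the sketch loads without the rdkafka extension;
// the extension defines the same constant with value 0.
if (!defined('RD_KAFKA_RESP_ERR_NO_ERROR')) {
    define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
}

/**
 * Hypothetical helper: try to drain the producer queue, calling flush()
 * at most $attempts times. Returns true if everything was delivered.
 */
function flushWithRetries($producer, int $timeoutMs = 1000, int $attempts = 10): bool
{
    for ($i = 0; $i < $attempts; $i++) {
        if ($producer->flush($timeoutMs) === RD_KAFKA_RESP_ERR_NO_ERROR) {
            return true;
        }
    }
    return false; // messages are still queued and will be lost on exit
}
```

Usage: `$http->on('workerStop', function ($server, $workerId) { global $producer; flushWithRetries($producer); });`. This requires keeping a reference to the `RdKafka\Producer` itself, not only to the topic.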

The rdkafka package is large: building it into a Docker image adds several hundred MB.