/queue

基于php实现kafka和redis的队列

Primary LanguagePHP

queue

PHP的一个队列包,目前已有kafka和redis两种队列驱动,实现方式是用swoole的进程池 + symfony的console + enqueue的队列包

kafka队列使用需要先安装 rdkafka 拓展

redis队列的使用需要有一个redis的操作包,这里指定了predis

后续可能会继续拓展新的队列驱动,目前只存在这两种

版本要求

"php": "^7.1.3",
"ext-swoole": "^2.0 || ^3.0 || ^4.0",

安装教程

建议在composer.json里加上

{
    "require": {
        xxxxxx,
        "egret/queue": "^1.0"
    },
    "config": {
        "bin-dir": "bin"
    }
}
composer update egret/queue

当然也可以直接composer require,但是这样可执行文件就不会放到跟composer.json同级的bin目录下,比较难找

composer require egret/queue

使用指引

创建一个简单的redis队列

<?php

namespace Egret\Queue\Test;

use Egret\Queue\AbstractQueueCommand;
use Monolog\Handler\RotatingFileHandler;
use Monolog\Logger;
use Egret\Queue\DingTalk;

// 继承这个抽象类
class TestRedisQueue extends AbstractQueueCommand
{
    protected function configure()
    {
        // 调用父类的configure方法,必须要有
        parent::configure();
        // 设置队列名称,必须要有,设置队列描述(非必须)
        $this->setName('redis')->setDescription('测试redis队列');
        // 设置队列驱动,不设置默认redis,目前只支持redis/kafka
        $this->driver = 'redis';
        // 设置redis连接的配置,这里测试时用,相关的配置信息往下看
        $this->config = [
            'host' => '127.0.0.1',
            'port' => 6379,
            'password' => '123456',
            'database' => 10,
            'predis_options' => [
                // 指定缓存的前缀
                'prefix'  => 'queue:test:'
            ]
        ];
        // 设置pid文件的存放路径,可以不设置,不设置不能用命令停止队列,这里配置的是我测试的路径,大家按自己的情况设置值
        $this->pidPath = CONSOLE_PATH . '/test/runtime';
    }

    // 设置Monolog的日志,设置之后会写入任务的相关日志,可不设置
    protected function getLogger()
    {
        $logger = new Logger('queue');
        $rotating = new RotatingFileHandler(CONSOLE_PATH . '/test/runtime/info.log', 30);
        $logger->pushHandler($rotating);
        return $logger;
    }

    // 配置钉钉机器人,任务异常时会报警到钉钉上,可不设置
    protected function getDingTalk()
    {
        $config = [
            'token' => 'xxxx',
            'secret' => 'xxxx',
            'sign' => true
        ];
        return new DingTalk($config['token'], $config['sign'], $config['secret']);
    }
}

启动队列命令

./queue 队列别名 start topic名称(redis的键/kafka的topic)

Options:
  -w, --workerNum=WORKERNUM                    进程数量 [default: 1]
  -r, --maxRetry=MAXRETRY                      任务失败最大重试次数 [default: 3]
  -t, --timeout=TIMEOUT                        单次等待超时时间,单位毫秒 [default: 30000]
  -c, --ding_trace_num=DING_TRACE_NUM          队列异常钉钉通知展示调试信息数,一般可以不用修改 [default: 5]
  -m, --ding_notice_mobile=DING_NOTICE_MOBILE  队列异常钉钉通知@人,要用手机号,多个用,隔开,格式是13400000000,13500000000: [default: ""]
  -d, --daemon                                 以守护进程运行

启动上面的队列,指定topic为test

./queue redis start test

查看队列

./queue redis status

Queue: redis
PID file: /queue-redis.pid, PID: 0

+-----------+-------+-------+------+--------+------------------------------+
| USER      | PID   | RSS   | STAT | START  | COMMAND                      |
+-----------+-------+-------+------+--------+------------------------------+
| zbangtang | 11993 | 6720  | S+   | 3:28PM | php ./queue redis start test |
| zbangtang | 11984 | 16056 | S+   | 3:28PM | php ./queue redis start test |
+-----------+-------+-------+------+--------+------------------------------+

停止队列,需要有PID文件才可以

./queue redis stop

上面就完成了一个消费者队列的创建

下面展示一下生产者,先创建一个工作类,所有工作类都必须继承AbstractJob

<?php


namespace Egret\Queue\Test;

use Egret\Queue\AbstractJob;

class TestJob extends AbstractJob
{
    protected $name;
    protected $flag;

    public function __construct($name, $flag = true)
    {
        $this->name = $name;
        $this->flag = $flag;
    }

    public function execute()
    {
        echo sprintf('queue %s is run' . PHP_EOL, $this->name);
        return $this->flag;
    }
}

Redis生产者代码

$job = new TestJob($this->getName(), false);
(new RedisQueue($redisConf, $topic))->produce($job);

Kafka生产者代码

$job = new TestJob($this->getName(), false);
$kafkaConf = [
     'global' => [
         'group.id' => 'test-group',
         'metadata.broker.list' => '127.0.0.1:9092',
         'enable.auto.commit' => 'false',
     ],
     'topic' => [
         'auto.offset.reset' => 'latest',
     ],
];
(new KafkaQueue($kafkaConf, $topic))->produce($job);

到这里就完成了队列的生产者和消费者的创建了,还差一个步骤就是把消费者加入到queue命令行中,用命令便可以启动和管理

此处采用文件加载的方式,使用的时候可以在composer.json同级的目录,或者同级的app或者application亦或者src目录下创建console.php,内容示例如下:

<?php

use Egret\Queue\Test\TestConsumer;
use Egret\Queue\Test\TestCustomErrorQueue;
use Egret\Queue\Test\TestDingTalkQueue;
use Egret\Queue\Test\TestKafkaQueue;
use Egret\Queue\Test\TestLoggerQueue;
use Egret\Queue\Test\TestRedisQueue;

return [
    TestDingTalkQueue::class,
    TestKafkaQueue::class,
    TestLoggerQueue::class,
    TestRedisQueue::class,
    TestConsumer::class,
    TestCustomErrorQueue::class,

];

必须把console.php存放到指定的几个地方之一,不然读取不到,文件存放位置示例:

-- 项目demo  
  -- app文件夹  
    -- 可在app文件夹下创建console.php  
  -- application文件夹   
    -- 可在application文件夹下创建console.php  
  -- src文件夹   
    -- 可在src文件夹下创建console.php  
  -- vendor  
  -- 可在这一层目录下创建console.php  
  -- composer.json  
  -- composer.lock

创建完后执行./queue便可以看到相关的队列了

./queue

________                               
\_____  \  __ __   ____  __ __   ____  
 /  / \  \|  |  \_/ __ \|  |  \_/ __ \ 
/   \_/.  \  |  /\  ___/|  |  /\  ___/ 
\_____\ \_/____/  \___  >____/  \___  >
       \__>           \/            \/ 
                                            v1.0.0                                                                   

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  consumer  队列生产者
  dingtalk  测试钉钉报警
  error     设置自定义的错误处理
  help      Displays help for a command
  kafka     测试kafka队列
  list      Lists commands
  logger    测试monolog日志
  redis     测试redis队列