/tp3-resque

thinkphp3集成的php-resque队列

Primary LanguagePHPMIT LicenseMIT

ThinkPHP3.2 集成 php-resque: PHP Resque Worker

php-resque是php环境中一个轻量级的队列服务。具体队列服务是做什么用的,请自行百度!

运行环境

  • PHP 5.2+
  • Redis 2.2+

集成方法

将源码放到ThinkPHP的Vendor目录中

将源码更新到 ThinkPHP/Library/Vendor/php-resque/ 目录中

注意要定义应用目录,之前发的内容没定义应用目录导致部分小伙伴引发了找不到Queue类的异常

在项目根目录中创建resque入口脚本

#!/usr/bin/env php
<?php
ini_set('display_errors', true);
error_reporting(E_ERROR);
set_time_limit(0);

// 定义应用目录
define('APP_PATH','./Application/');

define('MODE_NAME', 'cli');	// 自定义cli模式
define('BIND_MODULE', 'Home');	// 绑定到Home模块
define('BIND_CONTROLLER', 'Queue');	// 绑定到Queue控制器
define('BIND_ACTION', 'index');	// 绑定到index方法

// 处理自定义参数
$act = isset($argv[1]) ? $argv[1] : 'start';
putenv("Q_ACTION={$act}");
putenv("Q_ARGV=" . json_encode($argv));

require './ThinkPHP/ThinkPHP.php';

创建Queue控制器

Home模块的Controller中创建Queue控制器

<?php
namespace Home\Controller;

if (!IS_CLI)  die('The file can only be run in cli mode!');
use Exception;
use Resque;

/***
 * queue入口
 * Class Worker
 * @package Common\Controller
 */
class QueueController
{
    protected $vendor;
    protected $args = [];
    protected $keys = [];
    protected $queues = '*';

    public function __construct()
    {
        vendor('php-resque.autoload');
        $argv = json_decode(getenv('Q_ARGV'));
        foreach ($argv as $item) {
            if (strpos($item, '=')) {
                list($key, $val) = explode('=', $item);
            } else {
                $key = $val = $item;
            }
            $this->keys[] = $key;
            $this->args[$key] = $val;
        }

        $this->init();
    }

    /**
     * 执行队列
     * 环境变量参数值:
     * --queue|QUEUE: 需要执行的队列的名字
     * --interval|INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒
     * --app|APP_INCLUDE:需要自动载入PHP文件路径,Worker需要知道你的Job的位置并载入Job
     * --count|COUNT:需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker
     * --debug|VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息
     * --pid|PIDFILE:手动指定PID文件的位置,适用于单Worker运行方式
     */
    private function init()
    {
        $is_sington = false; //是否单例运行,单例运行会在tmp目录下建立一个唯一的PID

        // 根据参数设置QUEUE环境变量
        $QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*';
        if (empty($QUEUE)) {
            die("Set QUEUE env var containing the list of queues to work.\n");
        }
        $this->queues = explode(',', $QUEUE);

        // 根据参数设置INTERVAL环境变量
        $interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5;
        putenv("INTERVAL={$interval}");

        // 根据参数设置COUNT环境变量
        $count = in_array('--count', $this->keys) ? $this->args['--count'] : 1;
        putenv("COUNT={$count}");

        // 根据参数设置APP_INCLUDE环境变量
        $app = in_array('--app', $this->keys) ? $this->args['--app'] : '';
        putenv("APP_INCLUDE={$app}");

        // 根据参数设置PIDFILE环境变量
        $pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : '';
        putenv("PIDFILE={$pid}");

        // 根据参数设置VVERBOSE环境变量
        $debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : '';
        putenv("VVERBOSE={$debug}");
    }

    public function index()
    {
        $act = getenv('Q_ACTION');
        switch ($act) {
            case 'stop':
                $this->stop();
                break;
            case 'status':
                $this->status();
                break;
            default:
                $this->start();
        }
    }

    /**
     * 开始队列
     */
    public function start()
    {
        // 载入任务类
        $path = COMMON_PATH . "Job";
        $flag = \FilesystemIterator::KEY_AS_FILENAME;
        $glob = new \FilesystemIterator($path, $flag);
        foreach ($glob as $file) {
            if('php' === pathinfo($file, PATHINFO_EXTENSION))
                require realpath($file);
        }

        $logLevel = 0;
        $LOGGING = getenv('LOGGING');
        $VERBOSE = getenv('VERBOSE');
        $VVERBOSE = getenv('VVERBOSE');
        if (!empty($LOGGING) || !empty($VERBOSE)) {
            $logLevel = Resque\Worker::LOG_NORMAL;
        } else {
            if (!empty($VVERBOSE)) {
                $logLevel = Resque\Worker::LOG_VERBOSE;
            }
        }

        $APP_INCLUDE = getenv('APP_INCLUDE');
        if ($APP_INCLUDE) {
            if (!file_exists($APP_INCLUDE)) {
                die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n");
            }
            require_once $APP_INCLUDE;
        }

        $interval = 5;
        $INTERVAL = getenv('INTERVAL');
        if (!empty($INTERVAL)) {
            $interval = $INTERVAL;
        }

        $count = 1;
        $COUNT = getenv('COUNT');
        if (!empty($COUNT) && $COUNT > 1) {
            $count = $COUNT;
        }

        if ($count > 1) {
            for ($i = 0; $i < $count; ++$i) {
                $pid = pcntl_fork();
                if ($pid == -1) {
                    die("Could not fork worker " . $i . "\n");
                } // Child, start the worker
                else {
                    if (!$pid) {
                        $worker = new Resque\Worker($this->queues);
                        $worker->logLevel = $logLevel;
                        fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
                        $worker->work($interval);
                        break;
                    }
                }
            }
        } // Start a single worker
        else {
            $worker = new Resque\Worker($this->queues);
            $worker->logLevel = $logLevel;

            $PIDFILE = getenv('PIDFILE');
            if ($PIDFILE) {
                file_put_contents($PIDFILE, getmypid()) or
                die('Could not write PID information to ' . $PIDFILE);
            }

            fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
            $worker->work($interval);
        }
    }

    /**
     * 停止队列
     */
    public function stop()
    {
        $worker = new Resque\Worker($this->queues);
        $worker->shutdown();
    }

    /**
     * 查看某个任务状态
     */
    public function status()
    {
        $id = in_array('--id', $this->keys) ? $this->args['--id'] : '';
        $status = new \Resque\Job\Status($id);
        if (!$status->isTracking()) {
            die("Resque is not tracking the status of this job.\n");
        }

        echo "Tracking status of " . $id . ". Press [break] to stop.\n\n";
        while (true) {
            fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n");
            sleep(1);
        }
    }
}

新增队列配置

在公共config.php中新增队列配置,如下

/* 消息队列配置 */
'QUEUE' => array(
    'type' => 'redis',
    'host' => '127.0.0.1',
    'port' =>  '6379',
    'persistent' => false, //是否启用
    'prefix' => 'queue',
    'password' =>  '', // 密码
),

新增队列初始化行为

app_init行为中新增队列初始化的行为,run内容为

public function run()
{
	// 处理队列配置
    $config = C('QUEUE');
    if ($config) {
        vendor('php-resque.autoload');
        // 初始化队列服务
        $select = isset($config['select']) ? $config['select'] : 0;
        $password = isset($config['password']) ? $config['password'] : null;
        $persistent = isset($config['persistent']) ? $config['persistent'] : false;
        $timeout = isset($config['timeout']) ? $config['timeout'] : 30;
        $server = $config['host'] . ":" . $config['port'];
        \Resque::setBackend($server, $select, $password, $persistent, $timeout);
        // 初始化缓存前缀
        if(isset($config['prefix']) && !empty($config['prefix'])){
            \Resque\Redis::prefix($config['prefix']);
        }
    }
}

到此,整个队列服务基本已配置完成。

接下来就要创建队列执行的任务了

Jobs

创建 Jobs

目前任务类固定在Common模块的Job中,命名格式为XxxxJob.class.php

<?php
namespace Common\Job;
class XxxxJob
{
    public function perform()
    {
        $args = $this->args;
        fwrite(STDOUT, json_encode($args) . PHP_EOL);
    }
}

要获取队列中传入的参数值请使用$this->args

任务perform方法中抛出的任何异常都会导致任务失败,所以在写任务业务时要小心,并且处理异常情况。

任务也有setUptearDown方法,如果定义了一个setUp方法,那么它将在perform方法之前调用,如果定义了一个tearDown方法,那么它将会在perform方法之后调用。

<?php
namespace Common\Job;
class XxxxJob
{
	public function setUp()
	{
		// ... Set up environment for this job
	}
	
	public function perform()
	{
		// .. Run job
	}
	
	public function tearDown()
	{
		// ... Remove environment for this job
	}
}

添加任务到队列中

在程序控制器的任意方法中引入队列类库时,使用Resque::enqueue方法执行入栈,Resque::enqueue方法有四个参数,第一个是当前的队列名称,第二个参数为任务类,第三个是传入的参数,第四个表示是否返回工作状态的令牌

vendor('php-resque.autoload');	// 引入队列类库
$job = '\\Common\\Job\\XxxxJob'; // 定义任务类
// 定义参数
$args = array(
    'time' => time(),
    'array' => array(
        'test' => 'test',
    ),
);
// 入栈
$jobId = \Resque::enqueue('default', $job, $args, true);
echo "Queued job ".$jobId."\n\n";

如果要查看当前任务的工作状态可以使用如下方法:

$status = new \Resque\Job\Status($jobId);
echo $status->get(); // Outputs the status

任务的工作状态值有专门的常量\Resque\Job\Status对应类。 具体的对应关系如下:

  • Resque\Job\Status::STATUS_WAITING - 任务在队列中
  • Resque\Job\Status::STATUS_RUNNING - 任务正在运行
  • Resque\Job\Status::STATUS_FAILED - 任务执行失败
  • Resque\Job\Status::STATUS_COMPLETE - 任务执行完成
  • false - 无法获取状态 - 检查令牌是否有效?

任务的过期时间为任务完成后的24小时后,也可以定义过期类的stop()方法

队列任务启动

在命令行中转到项目根目录,执行

$ php resque start

即可启动服务

启动时也可以加入部分参数:

  • --queue - 需要执行的队列的名字,可以为空,也可以多个以,分割
  • --interval -在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒
  • --count - 需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker
  • --debug - 设置“1”启用更啰嗦模式,会输出详细的调试信息
  • --pid - 手动指定PID文件的位置,适用于单Worker运行方式

如:

$ php resque start --queue=default --pid=/tmp/resque.pid --debug=1

如果要使用守护进程方式启动则需要在最后加入&即可

如:

$ php resque start --queue=default --pid=/tmp/resque.pid --debug=1 &

也可以配合supervisord实现进程长驻

更多的操作请参考php-resque官方文档。