
PHP7.1+ client for beanstalkd work queue with no dependencies

Primary language: PHP. License: MIT.

BeansClient

About

BeansClient is a pure PHP 7.1+, dependency-free client for the beanstalkd work queue, with thorough unit testing. The library follows the PSR-4 autoloading standard and maintains 100% test coverage.
The library gives you a simple way to provide your own Connection implementation, for cases when you need to log requests and responses or to proxy traffic over a non-standard transport.

BeansClient supports the full set of commands and responses specified in the beanstalkd protocol for version 1.10.

Why BeansClient?

  1. It is well documented.
  2. I'm eating my own sweet pie =)
  3. The only library I can compare beansclient with is pheanstalk. Beansclient's only, and biggest, advantage is that it is much handier to use.
    For example, pheanstalk throws an exception almost every time you are not expecting one =)
    Want to ignore the last tube watched? Exception.
    Want to kick a job, but another worker has already done that? One more!
    Want to pause a tube, but another worker deleted its last job (a tube vanishes when nobody is watching it and it has no jobs)? Here you go, my friend, you love them so much!
    And the weirdest one: the server may run out of memory trying to grow the priority queue data structure. In that case the job is assigned an id and placed into the buried queue. But with pheanstalk the only way to find out that id is to parse the exception message ;)

    This is not a huge problem, but pheanstalk forces you to wrap every call in a try/catch block to handle exceptions that are thrown in what are actually common situations.
    That became the main reason why I wrote beansclient.

Contents

  1. Requirements
  2. Installation
  3. Usage
  4. Docs

Requirements

  • PHP 7.1+
  • beanstalkd server (protocol version 1.10)


Installation

Install with composer

composer require xobotyi/beansclient

Usage

<?php
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;

$connection  = new Connection('127.0.0.1', 11300, 2, true);
$beansClient = new BeansClient($connection);

##            ##
#   PRODUCER   #
##            ##

$beansClient->useTube('myAwesomeTube')
            ->put("job's payload");

##            ##
#    WORKER    #
##            ##

$job = $beansClient->watchTube('myAwesomeTube')
                   ->reserve();

if ($job->id) {
    echo "Hey, I received the payload '{$job->payload}' of the job with id {$job->id}\n";

    $job->delete();

    echo "And i've done it!\n";
}
else {
    echo "So sad, i have nothing to do";
}

echo "Am I still connected? \n" . ($beansClient->getConnection()->isActive() ? 'Yes' : 'No') . "\n";

Docs

Classes


beansclient\Connection

The Connection class is responsible for transport between the client and the beanstalkd server.

Parameters:

  • host string [optional, default: 127.0.0.1] - can be a host, or the path to a unix domain socket
  • port int [optional, default: 11300] - port to connect to, -1 for sockets
  • connectionTimeout int [optional, default: 2] - connection timeout in seconds, 0 means unlimited
  • persistent bool [optional, default: false] - whether to use a persistent connection. If true, the connection will not be closed when the Connection instance is destroyed

Throws:
xobotyi\beansclient\Exception\Connection - on inability to open connection

Example:

use xobotyi\beansclient\Connection;

$socket = new Connection(); // defaults
$connection = new Connection('unix:///tmp/beanstalkd.sock', -1); // unix domain socket

beansclient\BeansClient

The main class of the library. It puts everything together and makes the magic!

Parameters:

  • connection xobotyi\beansclient\Connection - Connection instance
  • serializer xobotyi\beansclient\Serializer\Json [optional, default: null] - Serializer instance

Throws:
xobotyi\beansclient\Exception\Client - if constructor got inactive connection

Example:

use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;

$client = new BeansClient(new Connection());
$client->getConnection()->isActive();   // true
$client->getConnection()->getHost();    // 127.0.0.1

beansclient\Job

This class provides a handy way to manage a single job, even if it has not been reserved by a worker.
All the data cannot be fetched in one request: the job's payload is available through the peek command, and everything else through the stats-job command. To keep the number of requests to a minimum, data is requested only when it is actually used, for example:

$job = new Job($beansclientInstance, 13); // creating Job instance

$job->payload;  // here will be performed a single 'peek' request to the beanstalkd server
                // and vice-versa
$job->tube;     // will perform 'stats-job' request
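
The on-demand loading described above can be sketched in plain PHP with the __get magic method. This is only an illustration of the idea, not the library's code: LazyJobSketch, its loaders, and the field-to-request mapping are all hypothetical, and the loaders stand in for real peek and stats-job requests.

```php
<?php
// Hypothetical stand-in for Job: each group of fields is fetched on first
// access only, then cached. Not the library's actual implementation.
final class LazyJobSketch
{
    // Which request supplies which field (assumed mapping, per the text above).
    private const SOURCE = ['payload' => 'peek', 'tube' => 'stats-job', 'state' => 'stats-job'];

    /** @var callable[] loaders keyed by request name */
    private $loaders;
    /** @var array cached field values */
    private $data = [];
    /** @var string[] log of the requests performed so far */
    public $requests = [];

    public function __construct(array $loaders)
    {
        $this->loaders = $loaders;
    }

    public function __get(string $name)
    {
        if (!isset(self::SOURCE[$name])) {
            throw new InvalidArgumentException("Unknown field: $name");
        }
        if (!array_key_exists($name, $this->data)) {
            $request          = self::SOURCE[$name];
            $this->requests[] = $request;
            // One request fills every field it provides.
            $this->data += ($this->loaders[$request])();
        }
        return $this->data[$name];
    }
}

$job = new LazyJobSketch([
    'peek'      => function () { return ['payload' => 'hello']; },
    'stats-job' => function () { return ['tube' => 'default', 'state' => 'ready']; },
]);

echo $job->payload, "\n"; // runs the 'peek' loader
echo $job->tube, "\n";    // runs the 'stats-job' loader
echo $job->state, "\n";   // served from the cache, no new request
```

After the three reads, only two loaders have run, which is the saving the lazy approach is after.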

There is a cool and useful thing about Job: if it is in the delayed or reserved state, its instance has a non-zero releaseTime property and an always-current timeLeft property. When that time comes, an extra stats-job request is performed to synchronize its data.
But be careful! Because of how that value is calculated, it can deviate by up to 1 second.

$job = new Job($beansclientInstance, 13, 'reserved');

$job->timeLeft; // for example it has 13 seconds to release
sleep(3);
$job->timeLeft; // 10
$job->state;    // reserved
sleep(11);
$job->timeLeft; // 0
$job->state;    // ready
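
One way to picture where the 1 second deviation comes from: the remaining time is presumably derived from an absolute release timestamp built out of whole-second values. The helper below is a hypothetical sketch under that assumption (timeLeftSketch is not a library function, and the clock is passed in so the demo is repeatable).

```php
<?php
// Hypothetical helper: derives the remaining seconds from an absolute release
// timestamp, in the spirit of releaseTime/timeLeft described above.
function timeLeftSketch(int $releaseTime, int $now): int
{
    // Never report a negative value once the deadline has passed.
    return max(0, $releaseTime - $now);
}

// The server reports whole seconds, so releaseTime is only known to within
// about one second of the real deadline.
$now         = 1000;       // pretend "current time" for a repeatable demo
$releaseTime = $now + 13;  // 13 seconds left at the moment of the request

echo timeLeftSketch($releaseTime, $now), "\n";      // 13
echo timeLeftSketch($releaseTime, $now + 3), "\n";  // 10
echo timeLeftSketch($releaseTime, $now + 14), "\n"; // 0
```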

beansclient\Serializer

A beanstalkd job's payload can only be a string, so a non-string payload has to be serialized. Serializer is an interface that requires only 2 methods: serialize(mixed $data):string and unserialize(string $str):mixed

Beansclient provides a JSON serializer out of the box, but you can use any serializer you want: just implement the Serializer interface.

use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;
use xobotyi\beansclient\Serializer\JsonSerializer;

$client = new BeansClient(new Connection(), new JsonSerializer());
$client->getSerializer(); // instance of \xobotyi\beansclient\Serializer\JsonSerializer

# or

$client = new BeansClient(new Connection());
$client->setSerializer(new JsonSerializer())
       ->getSerializer(); // instance of \xobotyi\beansclient\Serializer\JsonSerializer

If you do not provide a serializer as the second parameter of the BeansClient constructor, the payload in the put command must be a string or a stringable value.
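
As a rough idea of what a custom serializer could look like, here is a minimal sketch built on PHP's native serialize() and unserialize(). The interface is redeclared locally as SerializerSketch so the snippet runs on its own; it only mirrors the two methods named above and is not the library's interface. In real code the class would implement the library's Serializer interface instead.

```php
<?php
// Local stand-in for the library's Serializer interface, declared here only so
// the sketch runs on its own. "mixed" is left untyped to stay PHP 7.1 friendly.
interface SerializerSketch
{
    public function serialize($data): string;

    public function unserialize(string $str);
}

// Hypothetical implementation backed by PHP's native serialization format.
final class NativeSerializerSketch implements SerializerSketch
{
    public function serialize($data): string
    {
        return \serialize($data);
    }

    public function unserialize(string $str)
    {
        // allowed_classes => false prevents objects from being instantiated
        // out of data that came from the queue.
        return \unserialize($str, ['allowed_classes' => false]);
    }
}

$serializer = new NativeSerializerSketch();
$payload    = ['it' => 'can be any', 'thing'];
$encoded    = $serializer->serialize($payload);

var_dump(is_string($encoded));                            // bool(true)
var_dump($serializer->unserialize($encoded) === $payload); // bool(true)
```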

Jobs commands


put($payload[, int $priority[, int $delay[, int $ttr]]])

Inserts a job into the client's currently used tube (see "useTube").

Return value:
\xobotyi\beansclient\Job instance

Example:

$client->put('myAwesomePayload', 2048, 0, 60)->payload; // myAwesomePayload
# or, if a serializer is set
$client->put(['it'=>'can be any', 'thing'], 2048, 0, 60)->id; // 2

reserve([?int $timeout])

Returns a newly-reserved job. Once a job is reserved for the client, the client has limited time to run (TTR) the job before the job times out. When the job times out, the server will put the job back into the ready queue. Both the TTR and the actual time left can be found in response to the statsJob command. If more than one job is ready, beanstalkd will choose the one with the smallest priority value. Within each priority, it will choose the one that was received first.

Return value:
\xobotyi\beansclient\Job instance
NULL if there are no ready jobs in the queue

Example:

$client->reserve()->id; // 1
$client->reserve()->id; // 2
$client->reserve();     // null, no ready jobs left
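
The selection rule described above (smallest priority value first, then the job received first) can be modelled without a server. The sketch below only simulates that rule on plain arrays; nextReadyJobSketch is a hypothetical name, and it assumes job ids grow in arrival order so that the id can serve as the tie-breaker.

```php
<?php
// Sketch of the selection rule only: smallest priority first, then oldest first.
// Jobs are plain arrays; 'id' doubles as arrival order (assumption).
function nextReadyJobSketch(array $readyJobs): ?array
{
    if ($readyJobs === []) {
        return null; // nothing to reserve
    }
    usort($readyJobs, function (array $a, array $b): int {
        // Compare by priority, then by id when priorities are equal.
        return [$a['pri'], $a['id']] <=> [$b['pri'], $b['id']];
    });
    return $readyJobs[0];
}

$ready = [
    ['id' => 1, 'pri' => 2048],
    ['id' => 2, 'pri' => 1024],
    ['id' => 3, 'pri' => 1024],
];

echo nextReadyJobSketch($ready)['id'], "\n"; // 2
```

Job 2 wins because it shares the smallest priority value with job 3 but arrived earlier.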

delete(int $jobId)

Removes a job from the server entirely. It is normally used by the client when the job has successfully run to completion. A client can delete jobs that it has reserved, ready jobs, delayed jobs, and jobs that are buried.

Return value:
BOOLEAN

  • true if the job exists and has been deleted;
  • false if the job does not exist.

Example:

$client->delete(1); // true
$client->delete(3); // false

release(int $jobId[, int $priority[, int $delay]])

Puts a reserved job back into the ready queue (and marks its state as "ready") to be run by any client. It is normally used when the job fails because of a transitory error.

Return value:
STRING

  • 'RELEASED' if the job has been released;
  • 'BURIED' if the server ran out of memory trying to grow the priority queue data structure.

NULL if the job does not exist

Example:

$client->release(2, 2048, 0); // 'RELEASED'
$client->release(3); // null
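
Because the three outcomes come back as plain values, a caller can branch on them without try/catch. A minimal sketch of such a branch, using a hypothetical helper (describeReleaseSketch is not part of the library) that takes the value returned by release():

```php
<?php
// Hypothetical helper: maps the three documented outcomes of release() to a
// human-readable decision.
function describeReleaseSketch(?string $result): string
{
    if ($result === null) {
        return 'job no longer exists, nothing to do';
    }
    if ($result === 'BURIED') {
        return 'server ran out of memory, job was buried and needs a kick later';
    }
    return 'job is back in the ready queue';
}

echo describeReleaseSketch('RELEASED'), "\n";
echo describeReleaseSketch('BURIED'), "\n";
echo describeReleaseSketch(null), "\n";
```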

bury(int $jobId[, int $priority])

Puts a job into the "buried" state. Buried jobs are put into a FIFO linked list and will not be touched by the server again until a client kicks them with the kick or kickJob command.

Return value:
BOOLEAN

  • true if the job exists and has been buried;
  • false if the job does not exist.

Example:

$client->bury(2, 2048); // true
$client->bury(3); // false

touch(int $jobId)

Allows a worker to request more time to work on a job. This is useful for jobs that potentially take a long time, but you still want the benefits of a TTR pulling a job away from an unresponsive worker. A worker may periodically tell the server that it's still alive and processing a job (e.g. it may do this on DEADLINE_SOON). The command postpones the auto release of a reserved job until TTR seconds from when the command is issued.

Return value:
BOOLEAN

  • true if the job exists and has been touched;
  • false if the job does not exist.

Example:

$client->touch(2); // true
$client->touch(3); // false

kick(int $count)

Applies only to the currently used tube. It moves jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs.

Return value:
INTEGER number of jobs actually kicked.

Example:

$client->kick(3); // 1
$client->kick(3); // 0
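
The buried-before-delayed rule can be modelled with simple arithmetic. The function below is a simulation of that rule only (kickSketch is a hypothetical name and talks to no server): it takes the number of buried and delayed jobs in the used tube and returns how many jobs a kick would move.

```php
<?php
// Simulation of the kick rule: if any buried jobs exist, only buried jobs are
// kicked; delayed jobs are considered only when nothing is buried.
function kickSketch(int $buried, int $delayed, int $count): int
{
    $pool = $buried > 0 ? $buried : $delayed;
    return min($count, $pool);
}

echo kickSketch(1, 5, 3), "\n"; // 1: only the single buried job is kicked
echo kickSketch(0, 5, 3), "\n"; // 3: nothing buried, so delayed jobs are kicked
echo kickSketch(0, 0, 3), "\n"; // 0: nothing to kick
```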

kickJob(int $jobId)

A variant of kick that operates on a single job identified by its job id. If the given job id exists and is in a buried or delayed state, it will be moved to the ready queue of the same tube where it currently belongs.

Return value:
BOOLEAN

  • true if the job exists and has been kicked;
  • false if the job does not exist.

Example:

$client->kickJob(2); // true
$client->kickJob(3); // false

Tubes commands


listTubeUsed()

Returns the tube currently being used by the client.

Return value:
STRING

Example:

$client->listTubeUsed(); // 'default'

useTube(string $tube)

Subsequent put commands will put jobs into the tube specified by this command. If no use command has been issued, jobs will be put into the tube named "default"

Return value:
xobotyi\beansclient\BeansClient instance

Example:

$client->useTube('awesomeTube')
       ->listTubeUsed(); // 'awesomeTube'

listTubes()

Returns a list of all existing tubes.

Return value:
ARRAY

Example:

$client->listTubes(); // ['default', 'awesomeTube']

listTubesWatched()

Returns a list of tubes currently being watched by the client.

Return value:
ARRAY

Example:

$client->listTubesWatched(); // ['default']

watchTube(string $tube)

Command adds the named tube to the watch list for the current connection. A reserve command will take a job from any of the tubes in the watch list. For each new connection, the watch list initially consists of one tube, named "default".

Return value:
xobotyi\beansclient\BeansClient instance

Example:

$client->listTubesWatched(); // ['default']

$client->watchTube('awesomeTube')
       ->listTubesWatched(); // ['default', 'awesomeTube']

ignoreTube(string $tube)

Removes the named tube from the watch list for the current connection.

Return value:
xobotyi\beansclient\BeansClient instance

Example:

$client->listTubesWatched(); // ['default']

$client->watchTube('awesomeTube')
       ->listTubesWatched(); // ['default', 'awesomeTube']
       
$client->ignoreTube('awesomeTube')
       ->ignoreTube('myAwesomeTube2')
       ->listTubesWatched(); // ['default']
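
To make the watch list behaviour in the example easier to follow, here is a small in-memory model of it. WatchListSketch is hypothetical and does not talk to a server. It assumes two things taken from the beanstalkd protocol: ignoring a tube that is not watched changes nothing, and the server refuses to ignore the only tube left in the list.

```php
<?php
// In-memory model of a connection's watch list (not the library's code).
final class WatchListSketch
{
    /** @var string[] every new connection starts by watching "default" */
    private $tubes = ['default'];

    public function watch(string $tube): self
    {
        if (!in_array($tube, $this->tubes, true)) {
            $this->tubes[] = $tube;
        }
        return $this;
    }

    public function ignore(string $tube): self
    {
        $remaining = array_values(array_diff($this->tubes, [$tube]));
        // The last watched tube cannot be ignored, so an empty result is dropped.
        if ($remaining !== []) {
            $this->tubes = $remaining;
        }
        return $this;
    }

    public function all(): array
    {
        return $this->tubes;
    }
}

$list = new WatchListSketch();
print_r($list->watch('awesomeTube')->all());
print_r($list->ignore('awesomeTube')->ignore('myAwesomeTube2')->all());
print_r($list->ignore('default')->all());
```

The last call leaves the list untouched, since "default" is the only tube still watched at that point.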

Stats commands


stats()

Gives statistical information about the system as a whole.

Return value:
ARRAY

Example:

$client->stats();
/*[
    'current-jobs-urgent' => '0',
    'current-jobs-ready' => '0',
    'current-jobs-reserved' => '0',
    'current-jobs-delayed' => '0',
    'current-jobs-buried' => '0',
    'cmd-put' => '12',
    'cmd-peek' => '0',
    'cmd-peek-ready' => '0',
    'cmd-peek-delayed' => '0',
    'cmd-peek-buried' => '0',
    'cmd-reserve' => '0',
    'cmd-reserve-with-timeout' => '12',
    'cmd-delete' => '12',
    'cmd-release' => '0',
    'cmd-use' => '12',
    'cmd-watch' => '14',
    'cmd-ignore' => '0',
    'cmd-bury' => '0',
    'cmd-kick' => '0',
    'cmd-touch' => '0',
    'cmd-stats' => '1',
    'cmd-stats-job' => '0',
    'cmd-stats-tube' => '0',
    'cmd-list-tubes' => '6',
    'cmd-list-tube-used' => '0',
    'cmd-list-tubes-watched' => '15',
    'cmd-pause-tube' => '0',
    'job-timeouts' => '0',
    'total-jobs' => '12',
    'max-job-size' => '65535',
    'current-tubes' => '3',
    'current-connections' => '2',
    'current-producers' => '2',
    'current-workers' => '2',
    'current-waiting' => '0',
    'total-connections' => '6',
    'pid' => '1',
    'version' => '1.10',
    'rusage-utime' => '0.040000',
    'rusage-stime' => '0.000000',
    'uptime' => '41384',
    'binlog-oldest-index' => '0',
    'binlog-current-index' => '0',
    'binlog-records-migrated' => '0',
    'binlog-records-written' => '0',
    'binlog-max-size' => '10485760',
    'id' => 'f7546f4280926fcd',
    'hostname' => '2feafb46e549'
]*/
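
In the sample output above every value is a string, so counters may need a cast before doing arithmetic on them. The sketch below assumes that is how the array is returned; countersSketch is a hypothetical helper. It casts only keys with a counter-like prefix, so values such as 'version' or 'id' are never misread as numbers.

```php
<?php
// Hypothetical helper: picks the counter-like entries out of a stats array
// and casts them to integers. Other entries are left out on purpose.
function countersSketch(array $stats): array
{
    $counters = [];
    foreach ($stats as $key => $value) {
        if (preg_match('/^(current|cmd|total)-/', $key) === 1) {
            $counters[$key] = (int) $value;
        }
    }
    return $counters;
}

// A shortened copy of the sample output above.
$stats = [
    'current-jobs-ready' => '0',
    'cmd-put'            => '12',
    'total-jobs'         => '12',
    'version'            => '1.10',
    'id'                 => 'f7546f4280926fcd',
];

var_dump(countersSketch($stats));
```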

statsTube(string $tube)

Gives statistical information about the specified tube if it exists.

Return value:
ARRAY
NULL if the tube does not exist.

Example:

$client->statsTube('myAwesomeTube');
/*[
    'name' => 'myAwesomeTube',
    'current-jobs-urgent' => '0',
    'current-jobs-ready' => '0',
    'current-jobs-reserved' => '0',
    'current-jobs-delayed' => '0',
    'current-jobs-buried' => '0',
    'total-jobs' => '0',
    'current-using' => '0',
    'current-watching' => '1',
    'current-waiting' => '0',
    'cmd-delete' => '0',
    'cmd-pause-tube' => '0',
    'pause' => '0',
    'pause-time-left' => '0'
]*/

statsJob(int $jobId)

Gives statistical information about the specified job if it exists.

Return value:
ARRAY
NULL if the job does not exist.

Example:

$client->statsJob(2);
/*[
    'id' => '2',
    'tube' => 'myAwesomeTube',
    'state' => 'reserved',
    'pri' => '2048',
    'age' => '0',
    'delay' => '0',
    'ttr' => '30',
    'time-left' => '29',
    'file' => '0',
    'reserves' => '1',
    'timeouts' => '0',
    'releases' => '0',
    'buries' => '0',
    'kicks' => '0'
]*/