BeansClient is a pure 7.1+ dependency-free client for beanstalkd work queue with thorough unit-testing. Library uses PSR-4 autoloader standard and always has 100% tests coverage.
Library gives you a simple way to provide your own Connection implementation, in cases when you need to log requests and responses or to proxy traffic to non-standard transport.
BeansClient supports whole bunch of commands and responses specified in protocol for version 1.10
- It is well documented
- I'm eating my own sweet pie=)
- The only library i can compare beansclient is pheanstalk. And the only and the hugest beansclient's advantage - it is much more handy to use.
4ex: pheanstalk will throw an exception in almost every time when you are not expecting it =)
Want to ignore the last tube watched? Exception.
Want to kick job, but other worker already done that? One more!
Want to pause the tube but other worker deleted the last job (tube vanishes when nobody is watching it and there are no jobs)? Here you go, my friend, you are loving them so much! And the weirdest one: there is possibility that server will run out of memory trying to grow the priority queue data structure. In that case job will be assigned the id, and it will be placed to the buried queue. But with pheanstalk the only way you can figure out that id is by parsing the exception message ;)
There is no huge problem in it, but pheanstalk always forces you to wrap its every call withtry/catch
blocks in order to handle exception that will be thrown in common situation actually.
It became the main reason why i wrote beansclient.
- PHP 7.1+
- beanstalkd 1.10+
Install with composer
composer require xobotyi/beansclient
<?php
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;
$connection = new Connection('127.0.0.1', 11300, 2, true);
$beansClient = new BeansClient($connection);
## ##
# PRODUCER #
## ##
$beansClient->useTube('myAwesomeTube')
->put("job's payload");
## ##
# WORKER #
## ##
$job = $beansClient->watchTube('myAwesomeTube')
->reserve();
if ($job->id) {
echo "Hey, i received first {$job->payload} of job with id {$job->id}\n";
$job->delete();
echo "And i've done it!\n";
}
else {
echo "So sad, i have nothing to do";
}
echo "Am I still connected? \n" . ($beansClient->getConnection()->isActive() ? 'Yes' : 'No') . "\n";
Connection class responsible for transport between client and beanstalkd server.
Parameters:
- host
string
[optional, default: 127.0.0.1] - can be a host, or the path to a unix domain socket - port
int
[optional, default: 11300] - port to connect, -1 for sockets - connectionTimeout
int
[optional, default: 2] - connection timeout in seconds, 0 means unlimited - persistent
bool
[optional, default: false] - whether to use persistent connection or not. Iftrue
- connection will not be closed with destruction of Connection instance
Throws:
xobotyi\beansclient\Exception\Connection
- on inability to open connection
Example:
use xobotyi\beansclient\Connection;
$socket = new Connection(); // defaults
$connection = new Connection(null,null,null,new SocketFactory('unix:///tmp/beanstalkd.sock',-1,2,SocketFactory::IMPL_STREAM)); // unix domain socket.
The main class of library. Puts everything together and makes the magic! Parameters:
- connection
xobotyi\beansclient\Exception\Connection
- Connection instance - serializer
xobotyi\beansclient\Serializer\Json
[optional, default: null] - Serializer instance
Throws:
xobotyi\beansclient\Exception\Client
- if constructor got inactive connection
Example:
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Connection;
$client = new BeansClient(new Connection());
$client->getConnection()->isActive(); // true
$client->getConnection()->getHost(); // 127.0.0.1
This class provides handy way to manage a single job. Even if it havent been reserved by worker.
Due to the fact that we cant get all the data in one request (job's payload available through the peek
command and all the other data through the stats-job
command) and the desire to minimize the number of requests - needed data will bw requested only in case of it's usage, 4ex:
$job = new Job($beansclientInstance, 13); // creating Job instance
$job->payload; // here will be performed a single 'peek' request to the beanstalkd server
// and vice-versa
$job->tube; // will perform 'stats-job' request
There is a cool and useful thing about Job: if it has delayed or reserved state it's instance will have a non-zero property releaseTime
and always-actual property timeLeft
. When the time has come - will be performed extra stats-job
request to synchronize it's data.
But be careful! Due to calculation of that value, it can have a deviation if range of 1 second
$job = new Job($beansclientInstance, 13, 'reserved');
$job->timeLeft; // for example it has 13 seconds to release
sleep(3);
$job->timeLeft; // 10
$job->sate; // reserved
sleep(11);
$job->timeLeft; // 0
$job->sate; // ready
Beanstalkd job's payload can be only a string, so if we want to use non-string payload we have to serialize it.
Serializer
is an interface that requires only 2 methods: serialize(mixed $data):string
and unserialize(string $str):mixed
Beansclient provides JSON serializer out of the box, but you can use any serializer you want, just implement the Serializer
interface.
use xobotyi\beansclient\BeansClient;
use xobotyi\beansclient\Serializer\JsonSerializer;
$client = new BeansClient(new Connection(), new JsonSerializer());
$client->getSerializer(); // instance of \xobotyi\beansclient\Serializer\JsonSerializer
#or
$client = new BeansClient(new Connection());
$beansClient->setSerializer(new JsonSerializer())
->getSerializer(); // instance of \xobotyi\beansclient\Serializer\Json
If you will not provide serializer with second parameter of BeansClient
constructor, payload in put
command mist be string or stringable value.
Inserts a job into the client's currently used tube (see the "useTube")
Return value:
\xobotyi\beansclient\Job
instance
Example:
$client->put('myAwesomePayload', 2048, 0, 60)->payload; // myAwesomePayload
# or, if we use payload encoder
$client->put(['it'=>'can be any', 'thing'], 2048, 0, 60)->id; //2
Returns a newly-reserved job. Once a job is reserved for the client, the client has limited time to run (TTR) the job before the job times out. When the job times out, the server will put the job back into the ready queue. Both the TTR and the actual time left can be found in response to the statsJob command. If more than one job is ready, beanstalkd will choose the one with the smallest priority value. Within each priority, it will choose the one that was received first.
Return value:
\xobotyi\beansclient\Job
instance
NULL
if there is no ready jobs in queue
Example:
$client->reserve()->id; // 1
$client->reserve()->id; // 2
$client->reserve()->id; // null
Removes a job from the server entirely. It is normally used by the client when the job has successfully run to completion. A client can delete jobs that it has reserved, ready jobs, delayed jobs, and jobs that are buried.
Return value:
BOOLEAN
true
if job exists and been deleted;false
if job not exists.
Example:
$client->delete(1); // true
$client->delete(3); // false
Puts a reserved job back into the ready queue (and marks its state as "ready") to be run by any client. It is normally used when the job fails because of a transitory error.
Return value:
STRING
- 'RELEASED' if job been released;
- 'BURIED' if the server ran out of memory trying to grow the priority queue data structure.
NULL
if job not exists
Example:
$client->delete(2, 2048, 0); // 'RELEASED'
$client->delete(3); // null
Puts a job into the "buried" state. Buried jobs are put into a FIFO linked list and will not be touched by the server again until a client kicks them with the kick or kickJob command.
Return value:
BOOLEAN
true
if job exists and been buried;false
if job not exists.
Example:
$client->bury(2, 2048, 0); // true
$client->bury(3); // false
Allows a worker to request more time to work on a job. This is useful for jobs that potentially take a long time, but you still want the benefits of a TTR pulling a job away from an unresponsive worker. A worker may periodically tell the server that it's still alive and processing a job (e.g. it may do this on DEADLINE_SOON). The command postpones the auto release of a reserved job until TTR seconds from when the command is issued.
Return value:
BOOLEAN
true
if job exists and been touched;false
if job not exists.
Example:
$client->touch(2); // true
$client->touch(3); // false
Applies only to the currently used tube. It moves jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs.
Return value:
INTEGER
number of jobs actually kicked.
Example:
$client->kick(3); // 1
$client->kick(3); // 0
A variant of kick that operates with a single job identified by its job id. If the given job id exists and is in a buried or delayed state, it will be moved to the ready queue of the the same tube where it currently belongs.
Return value:
BOOLEAN
true
if job exists and been kicked;false
if job not exists.
Example:
$client->kickJob(2); // true
$client->kickJob(3); // false
Returns the tube currently being used by the client.
Return value:
STRING
Example:
$client->listTubeUsed(); // 'default'
Subsequent put commands will put jobs into the tube specified by this command. If no use command has been issued, jobs will be put into the tube named "default"
Return value:
xobotyi\beansclient\BeansClient
instance
Example:
$client->useTube('awesomeTube')
->listTubeUsed(); // 'awesomeTube'
Returns a list of all existing tubes.
Return value:
ARRAY
Example:
$client->listTubes(); // ['default', 'awesomeTube']
Returns a list tubes currently being watched by the client.
Return value:
ARRAY
Example:
$client->listTubesWatched(); // ['default']
Command adds the named tube to the watch list for the current connection. A reserve command will take a job from any of the tubes in the watch list. For each new connection, the watch list initially consists of one tube, named "default".
Return value:
xobotyi\beansclient\BeansClient
instance
Example:
$client->listTubesWatched(); // ['default']
$client->watchTube('awesomeTube')
->listTubesWatched(); // ['default', 'awesomeTube']
Removes the named tube from the watch list for the current connection.
Return value:
xobotyi\beansclient\BeansClient
instance
Example:
$client->listTubesWatched(); // ['default']
$client->watchTube('awesomeTube')
->listTubesWatched(); // ['default', 'awesomeTube']
$client->ignoreTube('awesomeTube')
->ignoreTube('myAwesomeTube2')
->listTubesWatched(); // ['default']
Gives statistical information about the system as a whole.
Return value:
ARRAY
Example:
$client->stats();
/*[
'current-jobs-urgent' => '0',
'current-jobs-ready' => '0',
'current-jobs-reserved' => '0',
'current-jobs-delayed' => '0',
'current-jobs-buried' => '0',
'cmd-put' => '12',
'cmd-peek' => '0',
'cmd-peek-ready' => '0',
'cmd-peek-delayed' => '0',
'cmd-peek-buried' => '0',
'cmd-reserve' => '0',
'cmd-reserve-with-timeout' => '12',
'cmd-delete' => '12',
'cmd-release' => '0',
'cmd-use' => '12',
'cmd-watch' => '14',
'cmd-ignore' => '0',
'cmd-bury' => '0',
'cmd-kick' => '0',
'cmd-touch' => '0',
'cmd-stats' => '1',
'cmd-stats-job' => '0',
'cmd-stats-tube' => '0',
'cmd-list-tubes' => '6',
'cmd-list-tube-used' => '0',
'cmd-list-tubes-watched' => '15',
'cmd-pause-tube' => '0',
'job-timeouts' => '0',
'total-jobs' => '12',
'max-job-size' => '65535',
'current-tubes' => '3',
'current-connections' => '2',
'current-producers' => '2',
'current-workers' => '2',
'current-waiting' => '0',
'total-connections' => '6',
'pid' => '1',
'version' => '1.10' (length=4),
'rusage-utime' => '0.040000',
'rusage-stime' => '0.000000',
'uptime' => '41384',
'binlog-oldest-index' => '0',
'binlog-current-index' => '0',
'binlog-records-migrated' => '0',
'binlog-records-written' => '0',
'binlog-max-size' => '10485760',
'id' => 'f7546f4280926fcd',
'hostname' => '2feafb46e549'
]*/
Gives statistical information about the specified tube if it exists.
Return value:
ARRAY
NULL
if tube not exists.
Example:
$client->statsTube('myAwesomeTube');
/*[
'name' => 'myAwesomeTube',
'current-jobs-urgent' => '0',
'current-jobs-ready' => '0',
'current-jobs-reserved' => '0',
'current-jobs-delayed' => '0',
'current-jobs-buried' => '0',
'total-jobs' => '0',
'current-using' => '0',
'current-watching' => '1',
'current-waiting' => '0',
'cmd-delete' => '0',
'cmd-pause-tube' => '0',
'pause' => '0',
'pause-time-left' => '0'
]*/
Gives statistical information about the specified job if it exists.
Return value:
ARRAY
NULL
if job not exists.
Example:
$client->statsJob(2);
/*[
'id' => '2',
'tube' => 'myAwesomeTube',
'state' => 'reserved',
'pri' => '2048',
'age' => '0',
'delay' => '0',
'ttr' => '30',
'time-left' => '29',
'file' => '0',
'reserves' => '1',
'timeouts' => '0',
'releases' => '0',
'buries' => '0',
'kicks' => '0'
]*/