Running an intermediary daemon and an advanced job protocol on top of Gearman and a (My)SQL table, Gearbox provides job scheduling, job dependency, retries, the possibility of advanced analytics and a complete monitoring and audit trail for your job system.
The protocol is fairly simple to implement, and there is a full suite of ready-made command-line tools to hit the ground running.
- Jobs pre-emptively fail if the Gearman function that runs them is not available when they're ready for scheduling. This is in contrast to Gearman itself, which queues the job until the function becomes available.
- Jobs can depend on other jobs in two different ways: `after` requirements and `before` requirements. A job can only have one of each, but together these two low-level controls compose into useful patterns. Both requirements declare that a job may only run after one or more others have completed: `after` is N:1 (many jobs running after one), `before` is 1:N (one job running only after N others are all done).
- Coming from Gearman, the `dedupe` of a job is like the `uniqueid`, except that for scheduled or dependent jobs, two jobs with the same dedupe can be input so long as they're not scheduled to run immediately; and if one job later becomes schedulable while another with the same dedupe is currently running, the former is marked as a duplicate of the latter, with `watch` queries being redirected transparently.
- There are three special methods under the `gearbox\core` namespace. They discard all input unless specified, return only `null`, and are always available. They act as meta jobs for dependency composition:
  - `::noop` runs immediately as scheduled.
  - `::noopwait` runs as scheduled but pauses immediately and waits.
  - `::noopgo` takes the ID of a `::noopwait` job and unpauses it.
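Composed with the `after_id` queue parameter documented in the RPC section below, `::noopwait` can act as a gate for a batch of jobs. Here is a sketch of the queue-call params involved; how the calls are actually submitted, and whether `::noopgo` takes the gate's ID in its `args` like this, are assumptions made for illustration:

```javascript
// Gate pattern sketched with the noop meta jobs and the queue parameters
// documented below. The job ID (42) is illustrative, and passing the gate
// ID via args to ::noopgo is an assumption.
const gate = { name: 'gearbox\\core::noopwait', args: [] };
// Suppose queuing `gate` returned job ID 42; dependents then wait on it:
const dependent = { name: 'some\\app::job', args: ['x'], after_id: 42 };
// When everything is in place, release the gate:
const release = { name: 'gearbox\\core::noopgo', args: [42] };
```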
All tools log to the console, and also have a debug facility. Generally, debugging can be enabled by running with the `DEBUG=gearbox:*` environment variable, and SQL queries can be shown with `DEBUG=knex:query`.
While methods formally follow the `name\space::method` format, in command-line arguments they can be written with forward slashes instead (`name/space::method`) for ease of use and to avoid escaping.
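A sketch of that normalisation; it assumes the mapping is a plain slash-to-backslash substitution, with no further escaping rules (the hypothetical helper name is not part of the tools):

```javascript
// Convert the command-line slash form (name/space::method) to the
// formal backslash form (name\space::method). Assumes every forward
// slash in a method name maps to a backslash.
function normaliseMethodName(cliName) {
  return cliName.replace(/\//g, '\\');
}
```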
The Gearbox daemon. Connects to MySQL, connects to Gearman, and manages the lot. It takes no options; instead it is controlled via the environment:
- MySQL connection: `MYSQL_HOSTNAME`, `MYSQL_DATABASE`, `MYSQL_USER`, `MYSQL_PASSWORD`
- Gearman connection (also used by the other tools below): `GEARMAN_SERVER`
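As an illustration, the environment might be set up like this before launching the daemon. All values are placeholders; in particular, the `host:port` format for `GEARMAN_SERVER` is an assumption based on the usual gearmand default port:

```shell
# Placeholder environment for the daemon (illustrative values only)
export MYSQL_HOSTNAME=localhost
export MYSQL_DATABASE=gearbox
export MYSQL_USER=gearbox
export MYSQL_PASSWORD=secret
# Assumed host:port format for the Gearman server address:
export GEARMAN_SERVER=localhost:4730
```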
A full command-line client to the gearbox interface. You can:

- `queue` jobs, which displays a job ID and returns immediately;
- `watch` jobs, which waits for a job given its ID and prints its output;
- `run` jobs, which does both of the above in one convenient command;
- issue `raw` gearbox RPC calls, for advanced use or debugging;
- get the `status` of all current jobs or of specific job IDs;
- get some `stats` about a method, including average run times and current use.
You can watch jobs several times! That is, you can:
```shell
# Queue a job...
$ g-client queue a::job
...
==> Job ID: 208

# Watch it...
$ g-client watch 208

# In another terminal, on another machine, watch it also:
$ g-client watch 208

# ^ both of these will return and print the job's output when the job ends.
```

You can also "watch" a completed (or errored) job after the fact, which will return immediately and print the job's output.
When passing arguments for jobs:

- `a b c` is interpreted as `["a", "b", "c"]`;
- `a=b c=d` is interpreted as `{"a": "b", "c": "d"}`;
- `a=b c` is interpreted as `{"a": "b", "c": null}`;
- `1 2 3` is interpreted as `["1", "2", "3"]`;
- `1 2 3` with `-J` is interpreted as `[1, 2, 3]`;
- `a=true c` with `-J` is interpreted as `{"a": true, "c": null}`;
- `'{"a":[123,true]}'` with `-J` is interpreted as `{"a":[123,true]}`;
- `'{"a":[123,true]}'` without `-J` is interpreted as `"{"a":[123,true]}"`.
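These rules can be sketched as a small function. This is a hypothetical re-implementation for illustration only, not the client's actual parser; edge cases (such as a single non-object argument without `-J`) may differ:

```javascript
// Coerce a string value: under -J (json = true), try JSON first.
function coerce(s, json) {
  if (!json) return s;
  try { return JSON.parse(s); } catch (e) { return s; }
}

// Interpret command-line job arguments per the rules above.
function parseJobArgs(args, json = false) {
  // Under -J, a single argument that parses to a JSON object or array
  // is passed through as-is.
  if (json && args.length === 1) {
    try {
      const value = JSON.parse(args[0]);
      if (typeof value === 'object' && value !== null) return value;
    } catch (e) { /* not JSON; fall through to the generic rules */ }
  }

  // If any argument looks like key=value, build an object...
  if (args.some((a) => a.includes('='))) {
    const obj = {};
    for (const a of args) {
      const i = a.indexOf('=');
      if (i === -1) obj[a] = null; // bare key: null value
      else obj[a.slice(0, i)] = coerce(a.slice(i + 1), json);
    }
    return obj;
  }

  // ...otherwise build an array.
  return args.map((a) => coerce(a, json));
}
```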
Operates a single gearbox method based on command-line arguments.
```
Usage: worker [options] <name/space::method> <command> [arguments...]

Options:
  --help             Show help                                         [boolean]
  --version          Show version number                               [boolean]
  --concurrency, -j  how many jobs can run at the same time
                                                           [number] [default: 1]
  --input, -I        handle job input
              [choices: "stdin", "append", "prepend", "ignore"] [default: "stdin"]
  --output, -O       handle command output
     [choices: "string", "buffer", "json", "nl-json", "ignore"] [default: "string"]
  --log              log every run to a file                            [string]
  --log-output       also write the output of each job to the log      [boolean]
  --log-no-time      don't prepend timestamps to log lines             [boolean]
  --quiet, -q        output minimally                                  [boolean]
```
See the multiworker option description below for details.
Reads one or more TOML configuration files and sets up methods and workers as described. Supports reloading the config via signal and gearman job.
```shell
$ g-multiworker /path/to/config.toml
```

Here's a sample config file:

```toml
[config]
reload_worker = true

[Test.sleep]
command = "sleep"
input = "append"
output = "ignore"
concurrency = 4

[Test.Foo.echo]
command = "echo test"
input = "append"
concurrency = 10

[Php.eval]
command = ['php', '-R', '$expr = implode(" ", json_decode($argn)); echo json_encode(eval("return $expr;"))."\n";']
output = "nl-json"
concurrency = 1
log = "/var/log/gearbox/php_eval.log"
log_output = true
```

This defines three methods:
- `Test::sleep` runs `sleep` in a shell (`command` is a string) with any arguments the job brings as inputs appended to the command string. It discards (`ignore`) any output from the command, and up to 4 jobs run at the same time.
- `Test\Foo::echo` runs `echo test` in a shell with job inputs appended to the command. It interprets the output of the command as a string and can run up to 10 jobs at once.
- `Php::eval` runs `php -R ...` as a direct program call (which is safer) with job input passed to the program on STDIN. It interprets the output of the command as newline-separated JSON and can only run one job at a time. It also writes a full log, including job output, to /var/log/gearbox/...
- `reload_worker` (boolean): Installs a method called `gearbox\reloadmw::UUID`, where `UUID` is the instance ID (randomised at tool start), which reloads the configuration files and updates the workers when called.
- `command`: What the method runs for each job. (Required.) There are several forms:
  - (string) Runs the command within a shell (usually `/bin/sh`). `input = "prepend"` is not available in this form.
  - (array of strings) Runs the command directly, where the first string is the program, and any others are arguments.
- `input` (string): How job inputs are handled. Defaults to `stdin`:
  - `stdin`: Writes the inputs as JSON to STDIN.
  - `append`: Transforms the inputs to strings* and appends them to the arguments.
  - `prepend`: Transforms the inputs to strings* and prepends them to the arguments.
  - `ignore`: Discards any inputs.
- `output` (string): How job output is handled. Defaults to `string`:
  - `string`: Sent back as a string.
  - `buffer`: Sent back as a byte array, of the form `{"type":"Buffer","data":[byte,byte,byte]}` where bytes are integers. This is more appropriate than `string` for binary data.
  - `json`: Parsed as JSON (will throw if malformed!) and sent back as such.
  - `nl-json`: Each line is parsed as JSON (will throw!) and collected in an array.
  - `ignore`: Discarded.
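To illustrate the difference between `json` and `nl-json`, the `nl-json` mode behaves roughly like the sketch below (not the actual implementation; blank-line handling is an assumption):

```javascript
// Rough sketch of the nl-json output mode: each non-empty line of the
// command's output is parsed as JSON and the results are collected
// into an array. Throws on a malformed line, as documented.
function collectNlJson(output) {
  return output
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line));
}
```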
- `log` (string): Logs job events to a file.
- `log_output` (boolean): With `log`, also writes the output of each job to the log, even if `output = "ignore"`.
- `log_no_time` (boolean): With `log`, don't prepend timestamps to log lines.
- `concurrency` (unsigned integer): How many jobs can run in parallel in this multiworker instance. (NB: this doesn't control how many jobs can run across the entire gearman cluster, nor even for other multiworkers.) If this is zero or less, the method is disabled.
- `disabled` (boolean): If true, the method is ignored.
TOML parsing errors will crash the multiworker; schema errors will only skip the relevant method definition.
The gearbox protocol layers JSON-RPC over Gearman jobs. All data, whether calls result in success or error, is returned via WORK_DATA, and the underlying gearman tasks are marked as WORK_COMPLETE, unless a hard error occurs.
Queues a job for scheduling and returns its job id, which is the handle by which the job can be watched or managed from then on.
- `name` (string): Method name. (Required.)
- `args` (any): Arguments. (Required.)
- `priority` (one of `normal`, `high`, `low`): Priority.
- `dedupe` (string): Arbitrary string that serves to de-duplicate jobs when scheduling. If a job with a dedupe is running at the time another job with that same dedupe is getting scheduled, the newer job will immediately be marked as a duplicate of the first.
- `after_date` (iso8601 string): If provided, the job will not be scheduled before then.
- `after_id` (unsigned integer): If provided, the job will not be scheduled until the other job described by that ID is complete (that is, successful). If the other job fails (errors and cannot be retried anymore), this job will also be marked as failed. Note that this doesn't currently work properly with duplicated jobs (see the dedupe description).
- `before_id` (unsigned integer): If provided, the job described by that ID will not be scheduled until this and all other jobs with this `before_id` are complete (that is, successful). If jobs within that “pool” fail, the descendent job will be marked as failed, but only once all jobs within the pool have completed or failed.
- `max_retries` (unsigned integer): If provided, the job will be automatically retried up to that number of times until it succeeds. Jobs marked invalid or duplicate are not retried.
- `retry_delay` (unsigned integer): The time in seconds between each retry. Defaults to 1 second.
(unsigned integer) the job id
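For illustration, the params object for a queue call can be assembled as below. This is only a sketch using the field names documented above; how the RPC is actually submitted over Gearman (function name, envelope) is not shown, and the validation is an assumption:

```javascript
// Sketch: assemble the params for a gearbox queue call from the fields
// documented above. Only name and args are required; everything else is
// passed through when present.
function buildQueueParams({ name, args, ...options }) {
  if (typeof name !== 'string') throw new Error('name is required');
  if (args === undefined) throw new Error('args is required');

  const allowed = [
    'priority', 'dedupe', 'after_date', 'after_id',
    'before_id', 'max_retries', 'retry_delay',
  ];
  const params = { name, args };
  for (const key of allowed) {
    if (options[key] !== undefined) params[key] = options[key];
  }
  return params;
}
```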
Watches a job until it either completes or fails, then returns its result.
- `id` (unsigned integer): Job id. (Required.)
- `wait_for_id` (boolean): If true, doesn't immediately fail if the job id provided doesn't exist, but instead waits until that id becomes real, and then watches that job. May be useful in some race-condition situations.
(mixed) the job result
Updates a running job's data, status, or progress. To be used by workers, usually as an implementation detail of the worker abstraction. (Data returned via Gearman's WORK_DATA / JSON-RPC for scheduled jobs is ignored and discarded.)
- `id` (unsigned integer): Job id. (Required.)
- `data` (mixed): Some data to replace the job results with.
- `status` (either `complete` or `errored`): If provided, updates a running job to that status. Ignored if the job isn't running.
- `progress` (floating-point number): If provided, updates the job's progress field. (The number is arbitrary but generally is interpreted as a percentage out of 100.)
- `append` (boolean or string): If true or a string, data is appended instead of replacing the previous result. If the value is a string, it is used as a separator; otherwise a newline is used.
Nothing. (Method should be called as a notification.)
Returns status information about jobs. By default returns the status of all current (running and scheduled) jobs.
An array of job ids (unsigned integers).
An array of status objects, like so:

```json
[{
  "id": 115,
  "method_name": "sample::method",
  "arguments": [
    "a",
    "b",
    "c"
  ],
  "priority": "normal",
  "created": "2019-06-18T02:25:18.000Z",
  "updated": "2019-06-18T02:25:18.000Z",
  "status": "running",
  "after_date": null,
  "after_id": null,
  "before_id": null,
  "completed": null,
  "retries": false,
  "dedupe": "1984d092-5f6b-4e45-8672-bcc402df2fe4",
  "progress": false,
  "data": null
}]
```

Returns current and historical stats about a particular method.
- `method` (string): Method. (Required.)
- `totalRuns` (unsigned integer): Total number of jobs ever run for this method.
- `earliest` (iso8601 string): Earliest run of this method.
- `latest` (iso8601 string): Latest run of this method.
- `averageCompletionTime` (floating-point number): In seconds.
- `averageRetries` (floating-point number).
- `stdAverageCompletionTime` (floating-point number): Average normalised without outliers.
- `stdAverageRetries` (floating-point number): Average normalised without outliers.
- `states` (object): Count of jobs in each status.
The payload a worker gets per job is an RPC request:
```json
{
  "jsonrpc": "2.0",
  "method": "Name\\Space::method",
  "id": 12345,
  "_meta": {
    "gearbox_id": 987
  },
  "params": ...
}
```

Workers must provide the contents of `params` to the actual job, may do some processing to present these more cromulently to the expectant application, and may provide the `_meta` contents as an additional argument. They should not provide access to the rest of the RPC envelope.
Workers must run the job then answer with an RPC response as WORK_DATA, and end the job as WORK_COMPLETE, regardless of the actual status of the job. The RPC `result` field should be `null`. Note that jobs are requests, not RPC notifications: the worker must answer.

Workers may send an RPC error back with WORK_COMPLETE, but that should be reserved for worker failure, i.e. not for application errors.

Workers may end the job as WORK_ERROR as a last resort. In that case, the payload need not necessarily be formatted as JSON-RPC, and will be assumed to be an error in any case if it is received by gearbox.
```json
{
  "jsonrpc": "2.0",
  "id": 12345,
  "result": null
}
```

Job data and status must be returned via a gearman background job, with an RPC notification payload, to the method `gearbox\core::job_data`, as described in its documentation above. Whenever possible, that notification should be sent after the gearman job is ended (e.g. with `setImmediate` in Node to run on the next tick).
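Putting the payloads above together, a worker's envelope handling can be sketched as plain functions. The Gearman transport itself is omitted; these only build and pick apart the JSON documented in this section:

```javascript
// Parse an incoming job payload (the RPC request shown above) and
// extract what the worker needs: params for the application, _meta as
// an optional extra argument, the RPC id to answer with, and the
// gearbox_id for job_data notifications.
function parseJobPayload(raw) {
  const req = JSON.parse(raw);
  return {
    params: req.params,
    meta: req._meta,
    rpcId: req.id,
    gearboxId: req._meta.gearbox_id,
  };
}

// The WORK_DATA answer: per the protocol, result is always null.
function buildWorkResponse(rpcId) {
  return { jsonrpc: '2.0', id: rpcId, result: null };
}

// The job_data notification (no "id" field, since it's a notification),
// to be sent as a gearman background job to gearbox\core::job_data.
function buildJobDataNotification(gearboxId, data, status) {
  return {
    jsonrpc: '2.0',
    method: 'gearbox\\core::job_data',
    params: { id: gearboxId, data, status },
  };
}
```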
It’s technically possible to set a job’s data, status, and progress from a completely different job, or even another system altogether. Don’t do it. Thanks.