gearmancmd is a simple command-driven gearman module for python created to ease gearman usage.
import gearmancmd
class Queue(gearmancmd.GearmanCMDQueue):
"""
Class to process commands.
From the box gearmancmd comes with expectation of JSON encoded tasks, like:
{"command": "command_name", "all_other_arguments": "will go here"}
See below how to change it.
"""
def reverse(self, gcmd, task):
"""
Every command is a simple routine within class.
Returned data will be sent back to the client
Input sent by client: {"command": "reverse", "message": "Hello"}
task variable value: {u'command': u'reverse', u'message': u'Hello'}
In reply client should receive: "olleH"
"""
message = task.get("message", None)
if message is None:
raise AttributeError("Missing 'message' key")
return message[::-1]
def default(self, gcmd, task):
""" Default task will be executed if no routine found for command. """
return None
def stop(self, gcmd, task):
""" It is possible to control gearmancmd from within the command. """
print "Stopping gcmd..."
gcmd.stop()
# Returned value will be ignored, None will be sent to client
return
def main():
# Create GearmanCMD object. See further for more options
worker = gearmancmd.GearmanCMD(['localhost:4730'])
# Register queues
worker.register_task('queue', Queue())
# Start worker
try:
worker.run()
except KeyboardInterrupt: # Worker can be stopped by Ctrl+C
worker.stop()
except Exception, e: # Here we can handle exceptions from queues
worker.stop()
if __name__ == '__main__':
main()
Next options available for GearmanCMD constructor:
servers
(required) - list of gearman servers to connect to (will be passed directly to underlying worker, see below)command
(default: "command") - command argument to be used by dispatcher when determining routine to call from queuepoll_timeout
(default: 0.1) - poll interval for gearman worker and underlying pipe polls
In order to use different worker (by default GearmanCMD is shipped with JSON aware encoder) one should inherit GearmanCMD class and override worker
attribute:
class CustomWorker(gearman.worker.GearmanWorker):
data_encoder = CustomDataEncoder
class CustomGCMD(GearmanCMD):
""" Override worker attribute. """
worker = CustomWorker
GearmanCMD can also be stopped with external stop trigger. Normally one want to send instance of Event (from threading or multiprocessing module) to run
routine. GearmanCMD will call is_set
method to check if stop trigger is triggered.
import threading
trigger = threading.Event()
# now start GearmanCMD with additional argument
worker.run(trigger)
# now, to stop worker from external thread, just call
trigger.set()
By gefault GearmanCMD will use command
value from JSON task and try to call routine with same name from queue class (therefore commands can contain only values that are proper for function names in python).
This can be overriden in next ways:
command
attribute ininit
method - defines what key to use (default: "command")dispatch
method in queue class - see example belowdispatch
method in GearmanCMD child class - can be used if you need to dispatch tasks between different queues by some custom logic.
Custom tasks dispatching within a queue:
class Queue(gearmancmd.GearmanCMDQueue):
""" Override dispatch method to implement custom dispatching logic. """
def dispatch(self, task):
"""
Return string - function name within this class to be called.
As example, let's use uppercase function names
and ignore all commands with "ignore" in it's name
(if dispatch return None, task will be ignored)
"""
command = task.get("command", "default")
if 'ignore' in command:
return None
return command.upper()
def REVERSE(self, gcmd, task):
""" Reverse routine from first example... """
return "olleH" # Dirty hack :)
def DEFAULT(self, gcmd, task):
""" Default task will be executed if no routine found for command. """
return None
def STOP(self, gcmd, task):
""" It is possible to control gearmancmd from within the command. """
print "Stopping gcmd..."
gcmd.stop()
- Internally we use
Pipes
to send data between threads. All commands from all queues are passed by one Pipe. This should not be an issue, but just be informed. - You must restart GearmanCMD if you add qeues while it's running:
worker = gearmancmd.GearmanCMD(['localhost:4730'])
worker.register_task('queue', Queue())
worker.run()
# This queue will not be used ....
worker.register_task('another_queue', Queue2())
# ... until you do
worker.stop()
worker.run()