A job manager that includes automatic retries with updated state on restart. Written in Racket.
A user submits a function of one argument. Two threads are created:
- worker thread: runs the provided code
- manager thread: receives messages from the worker thread, restarts it when needed, relays messages to the customer
The worker thread can use the (tell-manager the-task msg) function to send the following messages to the manager thread:
- Task complete : a task struct with status 'succeeded or 'failed
- Keepalive : use (keepalive the-task). Resets the timeout
- #f : restart the task using the most recently updated state
- Update state : anything that is not one of the above. Replaces the previous state
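The four message kinds above can be told apart with a single pattern match. The following is a minimal sketch of that dispatch, not the library's actual code: the two-field task struct, the keepalive-msg struct, and the classify function are hypothetical stand-ins introduced only for illustration.

```racket
#lang racket/base
(require racket/match)

;; Hypothetical stand-in for the library's task struct, reduced to the
;; two fields this sketch needs.
(struct task (status data) #:transparent)

;; Hypothetical marker value for a keepalive message. How the real
;; (keepalive the-task) encodes its message is not stated in the text.
(struct keepalive-msg ())

;; classify : any/c -> symbol?
;; Names the kind of message the manager received, checking the
;; specific cases first and treating everything else as a state update.
(define (classify msg)
  (match msg
    [(task (or 'succeeded 'failed) _) 'task-complete]
    [(? keepalive-msg?)               'keepalive]
    [#f                               'restart]
    [_                                'update-state]))
```

Because "update state" is the catch-all case, the order of the clauses matters: the specific message kinds have to be tested before the wildcard.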
The task will be restarted at most (max-restarts) times if any of the following applies:
- The worker thread does not send any messages to the manager within (task-timeout) seconds
- The worker thread exits normally and did not send a task complete message to the manager
- The worker thread died due to an exception and did not send a task complete message to the manager
task-timeout is a parameter. max-restarts is a parameter.
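The restart rules and the two parameters can be pictured as a small supervising loop. This is a sketch under stated assumptions rather than the library's implementation: supervise and counting-worker are hypothetical names, the default parameter values are invented for the example, a plain synchronous channel stands in for the manager channel, and messages are simplified to (cons 'done value), 'keepalive, #f, or a new state.

```racket
#lang racket/base

;; Stand-ins for the library's parameters. The defaults are assumptions.
(define task-timeout (make-parameter 5)) ; seconds
(define max-restarts (make-parameter 3))

;; supervise : (-> channel? any/c any) any/c -> any/c
;; Runs proc in a worker thread and restarts it, handing over the most
;; recent state, when it times out, exits, or dies without reporting.
(define (supervise proc initial-state)
  (define ch (make-channel))
  (let restart ([restarts-left (max-restarts)] [state initial-state])
    (define worker (thread (lambda () (proc ch state))))
    (define dead (thread-dead-evt worker))
    (let wait ([state state])
      ;; sync/timeout returns #f on timeout; the dead event returns itself
      (define msg (sync/timeout (task-timeout) ch dead))
      (cond
        [(and (pair? msg) (eq? (car msg) 'done)) (cdr msg)] ; task complete
        [(eq? msg 'keepalive) (wait state)]                 ; timer resets
        [(or (not msg) (eq? msg dead))                      ; needs a restart
         (kill-thread worker)
         (if (zero? restarts-left)
             'gave-up
             (restart (sub1 restarts-left) state))]
        [else (wait msg)]))))                               ; state update

;; Hypothetical worker: bumps the state once per run, then exits without
;; reporting completion until the state reaches 3.
(define (counting-worker ch state)
  (define next (add1 state))
  (channel-put ch next)
  (when (>= next 3)
    (channel-put ch (cons 'done next))))
```

With the assumed defaults, (supervise counting-worker 0) needs two restarts before the worker reports completion. Wrapping the call in (parameterize ([max-restarts 1]) ...) makes the loop give up first, which is the usual way to adjust a Racket parameter for one dynamic extent.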
A task struct has the following public fields, each of which must satisfy the specified contract:
- proc (-> task? any) ; The function that was submitted for processing
- id symbol? ; unique identifier for this task. autogenerated
- status (or/c 'succeeded 'failed)
- data any/c ; Any needed state. Defaults to (hash)
The return value of proc is ignored.
The worker thread can (but does not have to) send data-update messages to the manager. The manager will keep track of this new state and will make it available to each restart.
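One way a restarted worker might use the saved state is to skip work that was already finished. The sketch below assumes the state is a hash mapping a chunk index to 'wip or #t, which is the convention used by the usage example in this document; remaining-chunks is a hypothetical helper, not part of the library.

```racket
#lang racket/base

;; remaining-chunks : hash? exact-nonnegative-integer? -> (listof exact-nonnegative-integer?)
;; Lists the chunk indexes that are not yet marked #t in the saved state,
;; so a restarted worker only redoes chunks that are missing or still 'wip.
(define (remaining-chunks data total)
  (for/list ([i (in-range total)]
             #:unless (eq? (hash-ref data i #f) #t))
    i))
```

A worker could then iterate over (remaining-chunks (task.data the-task) 10) instead of the full range, so that a restart resumes where the last update left off.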
(define jeeves (start-majordomo))
(define the-task
  (start-task jeeves
              (lambda (the-task)
                ; do a long-running task, e.g. torrenting bits of a file
                (define to-manager (task.manager-ch the-task))
                (define data (task.data the-task))
                (define final-data
                  (for/fold ([data data])
                            ([i (in-range 10)])
                    ; tell the manager that we are working on chunk i
                    (tell-manager the-task (hash-meld data (hash i 'wip)))
                    (sleep (random)) ; simulates the download
                    ; tell the manager that we got it
                    (define next (hash-meld data (hash i #t)))
                    (tell-manager the-task next)
                    next))
                (keepalive the-task) ; reset the timeout counter
                (sleep 3) ; simulates disk writes, intense processing, etc
                (keepalive the-task) ; reset the timeout counter
                ; build the finished task and send it as the task complete message
                (define result (set-task-data (set-task-status the-task 'succeeded) final-data))
                (tell-manager the-task result))))
(define task-result
  (let loop ()
    (match (async-channel-get (task.customer-ch the-task))
      [(? task-msg:restart?) (loop)] ; could log the restart or etc
      [(? task? result) result])))
(pretty-print (task.data task-result)) ; show the result
(stop-majordomo) ; custodian-shutdown-all on all running tasks