majordomo2

Replacement for majordomo. A Racket task manager that supports carrying state across restarts, among other things.


majordomo

A job manager that includes automatic retries with updated state on restart. Written in Racket.

Synopsis

Overview

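A user submits a function of one argument; when it runs, that argument is the task itself. For each task, two threads are created:

  • worker thread: runs the submitted function
  • manager thread: receives messages from the worker thread, restarts the worker when needed, and relays messages to the customer (the code that submitted the task)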
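The worker/manager split can be pictured with plain Racket threads and async channels. The following is a minimal sketch, not majordomo's implementation: `run-pair` and the `'done` end marker are hypothetical, and restarts are left out so that only the relaying is shown.

```racket
#lang racket
;; Hypothetical sketch of the worker/manager split; not majordomo's code.
(require racket/async-channel)

;; run-pair : (async-channel? -> any) -> list?
;; Runs worker-proc in a worker thread. A manager thread relays every
;; message it receives to the customer channel; the customer collects
;; messages until the (hypothetical) 'done marker arrives.
(define (run-pair worker-proc)
  (define to-manager (make-async-channel))  ; worker -> manager
  (define to-customer (make-async-channel)) ; manager -> customer
  ;; worker thread: runs the submitted code
  (thread (lambda () (worker-proc to-manager)))
  ;; manager thread: receives worker messages and relays them
  (thread (lambda ()
            (let relay ()
              (define msg (async-channel-get to-manager))
              (async-channel-put to-customer msg)
              (unless (eq? msg 'done) (relay)))))
  ;; customer: gather relayed messages in arrival order
  (let collect ([acc '()])
    (define msg (async-channel-get to-customer))
    (if (eq? msg 'done)
        (reverse acc)
        (collect (cons msg acc)))))

(define received
  (run-pair (lambda (ch)
              (for ([i (in-range 3)])
                (async-channel-put ch (list 'progress i)))
              (async-channel-put ch 'done))))

(displayln received) ; prints ((progress 0) (progress 1) (progress 2))
```

Because a single worker writes to a FIFO async channel and the manager forwards messages one at a time, the customer sees them in the order the worker sent them.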

The worker thread can call (tell-manager the-task msg) to send the following messages to the manager thread:

  • Task complete: a task struct whose status is 'succeeded or 'failed
  • Keepalive: sent with (keepalive the-task). Resets the timeout
  • #f: restart the task using the most recently updated state
  • Update state: any message that is not one of the above. It replaces the previous state
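The four message kinds above amount to a dispatch on the message's shape. The sketch below shows one way a manager could classify them with `match`. It is an illustration under assumptions: the two-field `task` struct is a stand-in for majordomo's richer one, and because the README does not say how a keepalive is encoded, `keepalive-msg` is a hypothetical struct.

```racket
#lang racket
;; Hypothetical stand-ins: majordomo's real task struct has more fields, and
;; the README does not say how a keepalive is encoded.
(struct task (status data) #:transparent)
(struct keepalive-msg () #:transparent)

;; classify : any/c -> list?
;; Sorts a worker message into one of the four documented kinds.
(define (classify msg)
  (match msg
    [(task (and status (or 'succeeded 'failed)) _) (list 'complete status)]
    [(keepalive-msg) (list 'keepalive)]
    [#f (list 'restart)]
    ;; anything else, including a task without a final status, is new state
    [new-state (list 'update-state new-state)]))

(classify (task 'failed (hash)))  ; => '(complete failed)
(classify (keepalive-msg))        ; => '(keepalive)
(classify #f)                     ; => '(restart)
(classify (hash 3 #t))            ; => update-state, carrying the new hash
```

Order matters here: the catch-all `new-state` clause must come last, mirroring the README's "anything that is not one of the above".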

The task will be restarted at most (max-restarts) times if any of the following applies:

  • The worker thread does not send any messages to the manager within (task-timeout) seconds
  • The worker thread exits normally and did not send a task complete message to the manager
  • The worker thread died due to an exception and did not send a task complete message to the manager

task-timeout and max-restarts are both Racket parameters, so they can be adjusted with parameterize.
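The restart rules can be sketched as a single supervising loop. This is a hypothetical reconstruction, not majordomo's code: `supervise`, the `(list 'done result)` completion message, and the `'keepalive` symbol are encodings invented for the example, and the parameter defaults are made up. It uses a synchronous channel so that a worker cannot exit while one of its messages is still unread, which keeps "worker died" from overtaking a final message.

```racket
#lang racket
;; Hypothetical sketch of the restart policy; not majordomo's implementation.
;; Parameter names mirror the README; the default values are invented.
(define task-timeout (make-parameter 1)) ; seconds of silence allowed
(define max-restarts (make-parameter 3)) ; restarts allowed after the first run

;; supervise : (channel? any/c -> any) any/c -> (list/c symbol? any/c)
;; Runs (proc ch state) in a worker thread. Returns (list 'succeeded result)
;; or, once restarts are exhausted, (list 'failed last-state).
(define (supervise proc initial-state)
  (let run ([state initial-state] [restarts 0])
    (define ch (make-channel)) ; synchronous: puts block until received
    (define worker (thread (lambda () (proc ch state))))
    (define (retry latest)
      (kill-thread worker)
      (if (>= restarts (max-restarts))
          (list 'failed latest)
          (run latest (add1 restarts))))
    (let listen ([state state])
      (match (sync/timeout (task-timeout)
                           (wrap-evt ch (lambda (m) (cons 'msg m)))
                           (wrap-evt (thread-dead-evt worker) (lambda (_) 'dead)))
        [#f (retry state)]                  ; silent for task-timeout seconds
        ['dead (retry state)]               ; exited or crashed without completing
        [(cons 'msg (list 'done result))    ; task complete
         (kill-thread worker)
         (list 'succeeded result)]
        [(cons 'msg 'keepalive) (listen state)] ; any message restarts the timer
        [(cons 'msg #f) (retry state)]          ; explicit restart request
        [(cons 'msg new-state) (listen new-state)])))) ; state update

;; Demo: the worker counts to 5, reporting each step as new state. The first
;; run crashes after reporting 3; the restart resumes from that state.
(define attempts 0)
(define result
  (parameterize ([task-timeout 2])
    (supervise
     (lambda (ch n)
       (set! attempts (add1 attempts))
       (let loop ([n n])
         (cond
           [(= n 5) (channel-put ch (list 'done n))]
           [else
            (channel-put ch (add1 n))
            (when (and (= attempts 1) (= n 2))
              (error 'worker "simulated crash"))
            (loop (add1 n))])))
     0)))

(displayln result)   ; prints (succeeded 5)
(displayln attempts) ; prints 2
```

The simulated crash prints an error message from the worker thread to stderr; that is expected. Each call to `listen` starts a fresh `sync/timeout`, which is how any message, not only a keepalive, resets the timeout in this sketch.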

Tasks

A task struct has the following public fields, each of which must satisfy the specified contract:

  • proc (-> task? any) ; The function that was submitted for processing
  • id symbol? ; Unique identifier for this task. Autogenerated
  • status (or/c 'succeeded 'failed)
  • data any/c ; Any needed state. Defaults to (hash)

The return value of proc is ignored.
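The example further down builds its final result with set-task-status and set-task-data. The sketch below shows one plausible shape for such a record and its functional-update helpers, checked with the contracts listed above. It is an assumption throughout: the plain `struct`, the `new-task` constructor, and the use of `struct-copy` are illustrative, and the initial status of `#f` is invented because the README does not say what an unfinished task holds.

```racket
#lang racket
;; Hypothetical reconstruction; majordomo's real struct and helpers may differ.
(struct task (proc id status data) #:transparent)

;; new-task : autogenerated id, data defaults to (hash).
;; The initial status #f is an assumption, not documented behavior.
(define/contract (new-task proc #:data [data (hash)])
  (->* ((-> task? any)) (#:data any/c) task?)
  (task proc (gensym 'task) #f data))

;; Functional updates: each returns a new task and leaves the original intact.
(define/contract (set-task-status t status)
  (-> task? (or/c 'succeeded 'failed) task?)
  (struct-copy task t [status status]))

(define/contract (set-task-data t data)
  (-> task? any/c task?)
  (struct-copy task t [data data]))

(define t (new-task (lambda (self) (void))))
(define done (set-task-data (set-task-status t 'succeeded) (hash 'chunks 10)))
(task-status done) ; => 'succeeded
(task-data t)      ; still (hash): the original task is unchanged
```

Calling `(set-task-status t 'pending)` in this sketch raises a contract error, since only 'succeeded and 'failed are allowed.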

Data updates

The worker thread can (but does not have to) send data-update messages to the manager. The manager keeps the most recent state and makes it available to the worker on each restart, where it can be read with (task.data the-task).
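Carried-over state is only useful if the restarted worker consults it. This sketch follows the convention used in the Example section (chunk i maps to #t once fetched and to 'wip while in progress) and computes which chunks still need work. `chunks-to-fetch` is a hypothetical helper, not part of majordomo.

```racket
#lang racket
;; Hypothetical helper; the chunk-marking convention follows the README example.

;; chunks-to-fetch : hash? exact-nonnegative-integer? -> (listof exact-nonnegative-integer?)
;; Returns the chunk indices below n not yet marked #t. Chunks left at 'wip
;; by a crashed run are fetched again.
(define (chunks-to-fetch data n)
  (for/list ([i (in-range n)]
             #:unless (eq? (hash-ref data i #f) #t))
    i))

;; State as it might look after a crash part-way through chunk 2.
(define saved (hash 0 #t 1 #t 2 'wip))
(chunks-to-fetch saved 5) ; => '(2 3 4)
```

A restarted worker could then loop over `(chunks-to-fetch (task.data the-task) 10)` instead of all ten chunks.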

Example

(require racket/async-channel racket/match racket/pretty
         majordomo2) ; module path assumed from the repository name

(define jeeves (start-majordomo))

(define the-task
  (start-task jeeves
              (lambda (the-task)
                ; do a long-running task, e.g. torrenting bits of a file
                (define data (task.data the-task))
                (define final-data
                  (for/fold ([data data])
                            ([i (in-range 10)])
                    ; tell the manager that we are working on chunk i
                    (tell-manager the-task (hash-set data i 'wip))
                    (sleep (random))   ; simulates the download

                    ; tell the manager that we got it
                    (define next (hash-set data i #t))
                    (tell-manager the-task next)
                    next))
                (keepalive the-task)  ; reset the timeout counter

                (sleep 3) ; simulates disk writes, intense processing, etc.

                (keepalive the-task)  ; reset the timeout counter

                (define result
                  (set-task-data (set-task-status the-task 'succeeded) final-data))
                (tell-manager the-task result))))

(define task-result
  (let loop ()
    (match (async-channel-get (task.customer-ch the-task))
      [(? task-msg:restart?) (loop)] ; could log the restart, etc.
      [(? task? result) result])))

(pretty-print (task.data task-result)) ; show the result

(stop-majordomo) ; custodian-shutdown-all on all running tasks