/trailpack-tasker

:package: Easily set up background workers with RabbitMQ

Primary language: JavaScript · License: MIT

trailpack-tasker

NPM version Build status Dependency Status Code Climate

Easily set up background workers with RabbitMQ and Trails.

Install

$ npm install --save trailpack-tasker

Configure

Add Trailpack

// config/main.js
module.exports = {
  packs: [
    // ... other trailpacks
    require('trailpack-tasker')
  ]
}

Configure Tasker Settings

// config/tasker.js
module.exports = {

  /**
   * Define worker profiles. Each worker of a given type listens for the
   * "tasks" defined in its profile below. The task names represent a Task
   * defined in api.services.tasks
   */
  profiles: {
    memoryBound: {
      tasks: [ 'hiMemoryTask1' ]
    },
    cpuBound: {
      tasks: [ 'VideoEncoder', 'hiCpuTask2' ]
    }
  },

  /**
   * Set RabbitMQ connection info
   */
  connection: {
    exchange: process.env.TASKER_EXCHANGE, // optional, defaults to `tasker-work-x`
    workQueueName: process.env.TASKER_WORK_QUEUE, // optional, defaults to `tasker-work-q`
    interruptQueueName: process.env.TASKER_INTERRUPT_QUEUE, // optional, defaults to `tasker-interrupt-q`

    /**
     * The RabbitMQ connection information.
     * See: https://www.rabbitmq.com/uri-spec.html
     */
     host: process.env.TASKER_RMQ_HOST,
     user: process.env.TASKER_RMQ_USER,
     pass: process.env.TASKER_RMQ_PASS,
     port: process.env.TASKER_RMQ_PORT,
     vhost: process.env.TASKER_RMQ_VHOST
  },

  /**
   * Name of the worker profile (a key of `profiles` above) whose tasks this
   * process subscribes to. If process.env.WORKER does not match a profile,
   * the application will not subscribe to any tasks.
   */
  worker: process.env.WORKER
}

Configure Worker Environment

// config/env/worker.js
module.exports = {
  main: {

    /**
     * Only load the packs needed by the workers
     */
    packs: [
      require('trailpack-core'),
      require('trailpack-tasker')
    ]
  }
}

If the worker profiles each require more granular environment configurations, create worker-cpuBound, worker-memoryBound, etc. environments.

Usage

Define tasks in api.services.tasks. Each task is run in a separate process.

// api/services/tasks/VideoEncoder.js

const Task = require('trailpack-tasker').Task
module.exports = class VideoEncoder extends Task {

  /**
   * "payload" is the message from RabbitMQ, and contains all the information
   * the worker needs to do its job. By default, sets this.payload and this.app.
   *
   * @param payload.videoFormat
   * @param payload.videoBuffer
   */
  constructor (app, payload) {
    super(app, payload)
  }

  /**
   * Do work here. When the work is finished (the Promise is resolved), send
   * "ack" to the worker queue. You must override this method.
   *
   * @return Promise
   */
  run () {
    return doWork(this.payload)
  }

  /**
   * This is a listener which is invoked when the worker is interrupted (specifically,
   * an interrupt is a particular type of message that instructs this worker to
   * stop).
   */
  interrupt () {
    this.log.warn('only encoded', this.currentIndex, 'out of', this.totalItems, 'frames')
  }

  /**
   * Perform any necessary cleanup, close connections, etc. This method will be
   * invoked regardless of whether the worker completed successfully or not.
   * @return Promise
   */
  finalize () {
    return doCleanup(this.payload)
  }
}

To start a task, publish a message via the app.tasker interface:

const taskId = app.tasker.publish('VideoEncoder', { videoUrl: 'http://...' })

To interrupt a task in progress, use the taskId that is returned from app.tasker.publish(..):

app.tasker.cancel('VideoEncoder', taskId)

Deployment

An example Procfile may look like:

web: npm start
memoryBound: NODE_ENV=worker WORKER=memoryBound npm start
cpuBound: NODE_ENV=worker WORKER=cpuBound npm start

License

MIT

Maintained By