Support for distributed, highly available (batch) singleton jobs, with job scheduling, locking, supervision and job status persistence. Implemented with Scala, Akka and Cassandra.
While 1.2.0 is binary backwards compatible, the routes file of the Play! module should be changed: the list action got a new (optional) parameter limit, so the related route should now look something like this:
GET /jobs/:jobType @de.kaufhof.hajobs.JobsController.list(jobType, limit: Int ?= 20)
- Overview
- Features
- Constraints
- Implementation/Solution Details
- Alternative Solutions
- Installation / Setup
- Usage
- Example 1: A Job with Cron Schedule, Persistence and Supervision
- Example 2: An Actor Job with Cron Schedule etc.
- Example 3: An Actor Job running continuously
- Play! REST API
- License
- Job Locking
  - Ensures that a job is run by at most one instance
  - Locks are automatically released when an instance crashes
  - When a lock is lost or times out (e.g. because the JVM is mostly busy with GC), the job is canceled so that another instance can step in
- Job Supervision
  - When a crashed job is detected, it is resumed/retried (possibly by another instance) up to a configured number of times
- Job Scheduling
  - Jobs can be scheduled; the Quartz scheduler is used for this
- Job Status / Reporting
  - For each job the status and result are stored; jobs can also provide details to be stored
- Job Management
  - A Play! Framework controller provides a REST API for jobs, which allows you to start a job, get a list of job executions, and get details for a specific job execution
- There's no need for work distribution, i.e. a single instance can execute a job.
Job locking currently uses Cassandra's lightweight transactions. Locks are stored with a certain TTL (e.g. 30 seconds) in a dedicated table. As long as a job is running, an (Akka) actor keeps the job's lock active. When the job completes, the actor is stopped and the lock is released/deleted. When the instance running the job crashes, the lock is no longer refreshed and is deleted once its TTL expires.
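The lock lifecycle described above (acquire only if absent, keep alive while running, expire after a crash) can be sketched with a small in-memory model. This is not the ha-jobs LockRepository and does not talk to Cassandra: the names `LockEntry` and `TtlLockTable` are hypothetical, and the current time is passed in explicitly so the behavior is easy to follow.

```scala
// Hypothetical in-memory model of a TTL-based lock table (not the ha-jobs API).
final case class LockEntry(owner: String, expiresAtMillis: Long)

final class TtlLockTable(ttlMillis: Long) {
  private var locks = Map.empty[String, LockEntry]

  // A lock only counts while its TTL has not expired.
  private def live(lockType: String, now: Long): Option[LockEntry] =
    locks.get(lockType).filter(_.expiresAtMillis > now)

  /** Models an "insert if not exists with TTL": succeeds only if no live lock exists. */
  def acquire(lockType: String, owner: String, now: Long): Boolean = synchronized {
    if (live(lockType, now).isDefined) false
    else {
      locks = locks.updated(lockType, LockEntry(owner, now + ttlMillis))
      true
    }
  }

  /** Models the lock keeper: extends the TTL, but only for the current owner. */
  def refresh(lockType: String, owner: String, now: Long): Boolean = synchronized {
    live(lockType, now) match {
      case Some(entry) if entry.owner == owner =>
        locks = locks.updated(lockType, LockEntry(owner, now + ttlMillis))
        true
      case _ =>
        false // lock lost or timed out: the running job should be canceled
    }
  }

  /** Models the release when a job completes. */
  def release(lockType: String, owner: String): Unit = synchronized {
    if (locks.get(lockType).exists(_.owner == owner)) locks = locks - lockType
  }
}
```

In this model a second instance calling `acquire` is rejected as long as the first instance keeps calling `refresh`. Once refreshing stops (the crash case), the entry expires and the next `acquire` succeeds.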
Job supervision is done by a job that detects dead jobs and restarts them up to the configured number of times.
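The supervision decision can be pictured roughly as follows. This is only a sketch, not the JobSupervisor implementation: it assumes that a job counts as dead when its stored status is still "Running" while nobody holds its lock, and the names `ObservedJob`, `SupervisionAction` and `Supervision` are hypothetical.

```scala
// Hypothetical model of the supervision decision (not the ha-jobs JobSupervisor).
sealed trait SupervisionAction
case object LeaveAlone extends SupervisionAction
case object Retrigger extends SupervisionAction
case object GiveUp extends SupervisionAction

final case class ObservedJob(state: String, lockHeld: Boolean, retriggersSoFar: Int)

object Supervision {
  /** Assumption: "dead" means the status says Running but the lock is gone. */
  def isDead(job: ObservedJob): Boolean =
    job.state == "Running" && !job.lockHeld

  /** Retrigger dead jobs until the configured retriggerCount is used up. */
  def decide(job: ObservedJob, retriggerCount: Int): SupervisionAction =
    if (!isDead(job)) LeaveAlone
    else if (job.retriggersSoFar < retriggerCount) Retrigger
    else GiveUp
}
```

With `retriggerCount = 3`, a dead job that has already been retriggered three times is no longer restarted.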
There's the JobManager, which allows you to start or restart a job and schedules jobs based on their scheduling pattern.
A job must extend Job, which has the following interface:
abstract class Job(val jobType: JobType,
                   val retriggerCount: Int,
                   val cronExpression: Option[String] = None,
                   val lockTimeout: FiniteDuration = 60 seconds) {

  /**
   * Starts a new job and returns a [[de.kaufhof.hajobs.JobExecution JobExecution]].
   * The [[de.kaufhof.hajobs.JobExecution#result JobExecution.result]] future must be
   * completed when the job execution is finished.
   *
   * This method (`run`) should not "block", all the work must be performed
   * by the returned `JobExecution`.
   */
  def run()(implicit context: JobContext): JobExecution

}
The JobType identifies the job; it is defined by a name and a LockType. JobType and LockType are distinct so that several jobs can share one LockType: jobs with the same LockType cannot run simultaneously. A LockType is defined just by its name.
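A short sketch of how a shared LockType keeps two different jobs from running at the same time. The case classes below are minimal stand-ins that only mirror the shape described above (a LockType has a name, a JobType has a name and a LockType); they are not the ha-jobs classes, and the helper `mayRunTogether` is hypothetical.

```scala
// Stand-ins mirroring the shape described in the text (not the ha-jobs classes).
final case class LockType(name: String)
final case class JobType(name: String, lockType: LockType)

object JobTypeLocks {
  /** Two jobs may run simultaneously only if they do not share a LockType. */
  def mayRunTogether(a: JobType, b: JobType): Boolean =
    a.lockType != b.lockType
}
```

For example, a product import and a price import that both use an "ImportLock" would exclude each other, while a job with its own LockType could run alongside either of them.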
If your job is implemented as an actor, you can just use the ActorJob, as shown in the examples below.
- For job locking Zookeeper could be used.
- For job locking and job supervision Akka's Cluster Singleton could be used. There's also a nice Activator template for distributed workers with Akka that provides an example for the cluster singleton.
- Chronos: a fault-tolerant job scheduler for Mesos that handles dependencies and ISO 8601 based schedules. Batteries included.
You must add ha-jobs to the dependencies of the build file, e.g. add to build.sbt:
libraryDependencies += "de.kaufhof" %% "ha-jobs" % "1.2.5"
It is published to Maven Central for both Scala 2.10 and 2.11.
You must create the tables lock, job_status_data and job_status_meta in the Cassandra keyspace you use, according to the (Pillar) migration scripts in ha-jobs-core/src/test/resources/migrations.
For a single job you have to
- Define the LockType (with name)
- Define the JobType (with name and LockType)
- Implement your concrete job, which must extend Job.
- If you're using an Akka actor for your job, you can just use the ActorJob as Job implementation and configure it with the Props creating your actor (see also the example below).
To complete the setup you also need
- the JobSupervisor job, which detects dead jobs and retriggers them on an alive instance
- the JobManager, which schedules jobs according to their cron patterns
You also need to set up/configure two or three more things (such as the repositories and the ActorSystem used in the example below), which should be self-explanatory.
Here's a fully working example of a job (which might e.g. import products) that is started every hour, as defined by its Quartz cron expression. The job stores its status when it starts and when it finishes; it could also do so during the import so that its progress can be tracked. In this example the JobSupervisor is also configured, so that failed/dead jobs are detected.
import akka.actor.ActorSystem
import de.kaufhof.hajobs.JobState._
import de.kaufhof.hajobs._
import play.api.libs.json.Json
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Promise, Future}
import scala.util.{Failure, Success, Try}

// == Product Import Job
val ProductImportLockType = LockType("ProductImportLock")
val ProductImportJobType = JobType("ProductImport", ProductImportLockType)

/**
 * A job that would normally import products, but here only prints a message.
 */
class ProductImport(override val jobStatusRepository: JobStatusRepository,
                    cronExpression: Option[String]) extends Job(
  ProductImportJobType, retriggerCount = 3, cronExpression = cronExpression) with WriteStatus {

  override def run()(implicit context: JobContext): JobExecution = new JobExecution() {

    private val promise = Promise[Unit]()
    override val result = promise.future

    override def cancel(): Unit = {
      // We might update some flag that could be checked by `importProducts()`
    }

    writeStatus(Running)

    // onComplete: after updating our status we must complete the result. This will
    // release the lock and stop the lock keeper actor.
    importProducts().onComplete(updateStatus.andThen(_ => promise.success(())))

    // A not so long running operation, but still producing some side effect
    private def importProducts(): Future[Int] = {
      Future.successful {
        println("Importing products ... done.")
        42 // products imported
      }
    }

    private def updateStatus(implicit context: JobContext): Try[Int] => Future[JobStatus] = {
      case Success(count) =>
        writeStatus(Finished, Some(Json.obj("count" -> count)))
      case Failure(e) =>
        writeStatus(Failed, Some(Json.obj("error" -> e.getMessage)))
    }
  }
}

// Setup repos needed for jobs + job manager
// session: the Cassandra Session (com.datastax.driver.core.Session)
val statusRepo = new JobStatusRepository(session, jobTypes = JobTypes(ProductImportJobType))
val lockRepo = new LockRepository(session, LockTypes(ProductImportLockType))

// Setup jobs
// Quartz cron "0 0 * * * ?": at minute 0 of every hour
val productImporter = new ProductImport(statusRepo, Some("0 0 * * * ?"))
// Quartz cron "0 * * * * ?": every minute.
// Note: `manager` is defined further below, so these definitions must be members of
// a class or object; local vals cannot be forward-referenced.
val jobSupervisor = new JobSupervisor(manager, lockRepo, statusRepo, Some("0 * * * * ?"))

val system = ActorSystem("example1")

// Setup the JobManager
val manager: JobManager = new JobManager(Seq(productImporter, jobSupervisor), lockRepo, statusRepo, system)
// You should `shutdown()` the manager when the application is stopped.
In the given example, once the JobManager is created, it will schedule the productImporter according to its cronExpression.
The JobTypes and LockTypes must provide all your custom JobTypes and LockTypes; they're needed by the repositories when loading db records, which is done when a job status is reported (e.g. via the Play! REST API).
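The `cancel()` method in the example is left empty, with a comment suggesting a flag that the import could check. A minimal sketch of that idea, using only the Scala standard library: the class `CancellableImport` is hypothetical and not part of ha-jobs, and the per-item work is reduced to counting.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of ha-jobs): processes items until done or canceled.
final class CancellableImport(items: Seq[String]) {
  private val canceled = new AtomicBoolean(false)

  /** Something a `JobExecution.cancel()` implementation could delegate to. */
  def cancel(): Unit = canceled.set(true)

  /** Handles items one by one and returns how many were handled before stopping. */
  def run()(implicit ec: ExecutionContext): Future[Int] = Future {
    items.iterator
      .takeWhile(_ => !canceled.get()) // re-check the flag before each item
      .foldLeft(0) { (count, _) =>
        // the real import work for one item would go here
        count + 1
      }
  }
}
```

Because the flag is checked before each item, a cancel takes effect at the next item boundary rather than interrupting work that is already in progress. The returned count could be written to the job status as in `updateStatus` above.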
If the previous job were implemented as an actor, it would be used together with the ActorJob. The ProductImport class and its instance would then be replaced by this:
import akka.actor.{Actor, Props}

class ProductImportActor(override val jobStatusRepository: JobStatusRepository)
                        (implicit jobContext: JobContext) extends Actor with WriteStatus {

  writeStatus(Running)
  self ! "go"

  override def receive: Receive = {
    case "go" =>
      println("Importing products ... done.")
      writeStatus(Finished, Some(Json.obj("count" -> 42)))
      // no need to tell the context that we're finished, this will be done by ActorJob when we're stopped.
      context.stop(self)
    case ActorJob.Cancel =>
      // We should support ActorJob.Cancel and stop() processing.
      writeStatus(Canceled)
      context.stop(self)
  }
}

object ProductImportActor {
  def props(statusRepo: JobStatusRepository)(jobContext: JobContext) =
    Props(new ProductImportActor(statusRepo)(jobContext))
}

// the `ActorJob` creates a new actor from the given Props on each schedule
val productImporter = new ActorJob(ProductImportJobType, ProductImportActor.props(statusRepo),
  system, cronExpression = Some("0 0 * * * ?"))
A use case for this might be an actor that consumes a queue regularly and at a high frequency. The actor job should then be started on system start, grab the lock, and run indefinitely.
The relevant parts from the example above would be modified like this:
import scala.concurrent.duration._

class QueueConsumerActor(interval: FiniteDuration,
                         override val jobStatusRepository: JobStatusRepository)
                        (implicit jobContext: JobContext) extends Actor with WriteStatus {

  writeStatus(Running)
  self ! "consume"

  override def receive: Receive = consuming(0)

  private def consuming(consumed: Int): Receive = {
    case "consume" =>
      println(s"Consuming, until now consumed $consumed items...")
      writeStatus(Running, Some(Json.obj("consumed" -> consumed)))
      context.system.scheduler.scheduleOnce(interval, self, "consume")
      context.become(consuming(consumed + 42))
    case ActorJob.Cancel =>
      // We should support ActorJob.Cancel and stop() processing.
      writeStatus(Canceled)
      context.stop(self)
  }
}

object QueueConsumerActor {
  def props(interval: FiniteDuration, statusRepo: JobStatusRepository)(jobContext: JobContext) =
    Props(new QueueConsumerActor(interval, statusRepo)(jobContext))
}

// ConsumerJobType: a JobType defined like ProductImportJobType in the first example
// the ActorJob does not define a `cronExpression`
val queueConsumer = new ActorJob(ConsumerJobType, QueueConsumerActor.props(2 seconds, statusRepo),
  system, cronExpression = None)

val manager: JobManager = new JobManager(Seq(queueConsumer, jobSupervisor), lockRepo, statusRepo, system)

// manually trigger the job
manager.triggerJob(ConsumerJobType) onComplete {
  case Success(Started(jobId, _)) => println(s"Started queue consumer job $jobId")
  // The Success case can also carry LockedStatus or Error
  case Success(els) => println(s"Could not start queue consumer: $els")
  case Failure(e) => println(s"An exception occurred when trying to start queue consumer: $e")
}
The module ha-jobs-play provides a Play! controller that allows you to start jobs and retrieve the job status via HTTP.
To use this you must add the following to the build file:
libraryDependencies += "de.kaufhof" %% "ha-jobs-play" % "1.2.5"
In your routes file you have to add these routes (of course you may choose different URLs):
POST /jobs/:jobType @de.kaufhof.hajobs.JobsController.run(jobType)
GET /jobs/:jobType @de.kaufhof.hajobs.JobsController.list(jobType, limit: Int ?= 20)
GET /jobs/:jobType/latest @de.kaufhof.hajobs.JobsController.latest(jobType)
GET /jobs/:jobType/:jobId @de.kaufhof.hajobs.JobsController.status(jobType, jobId)
Use your preferred dependency injection mechanism to provide the managed JobsController to your GlobalSettings:
val jobManager = ... // the JobManager
val jobTypes = ... // e.g. JobTypes(ProductImportJobType) in the 1st example
new JobsController(jobManager, jobTypes, de.kaufhof.hajobs.routes.JobsController)
The de.kaufhof.hajobs.routes.JobsController is the reverse router (ReverseJobsController) created by Play! on compilation.
Then you can manage your jobs via HTTP, e.g. using the following for a job of JobType("productimport"):
# get a list of all job executions
curl http://localhost:9000/jobs/productimport
# get redirected to the status of the latest job execution
curl -L http://localhost:9000/jobs/productimport/latest
# get job execution status/details
curl http://localhost:9000/jobs/productimport/a13037f0-9076-11e4-a8d6-4ff0e8bdfb24
# execute the job
curl -X POST http://localhost:9000/jobs/productimport
The license is Apache 2.0, see LICENSE.