Quartz scheduler library using cats-effect
libraryDependencies ++= Seq(
  "com.itv" %% "cats-quartz-core"     % "0.7.0",
  "com.itv" %% "cats-quartz-extruder" % "0.7.0"
)
The project uses a Quartz scheduler. As scheduled messages are generated by Quartz, they are
decoded and put onto an fs2.concurrent.Queue.
- a QuartzTaskScheduler[F[_], A] which schedules jobs of type A
- a JobDataEncoder[A] which encodes job data in a map for the given job of type A
- a job factory which is triggered by Quartz when a scheduled task occurs and creates messages to put on the queue
- a JobDecoder[A] which decodes the incoming message data map into an A
- the decoded message is put onto the provided fs2.concurrent.Queue
We need to have a set of types to encode and decode.
The extruder project provides the ability to encode/decode an object as a Map[String, String],
which works perfectly for putting data into the Quartz JobDataMap.
import com.itv.scheduler.{JobDataEncoder, JobDecoder}
import com.itv.scheduler.extruder.implicits._
import extruder.map._
sealed trait ParentJob
case object ChildObjectJob extends ParentJob
case class UserJob(id: String) extends ParentJob
object ParentJob {
  implicit val jobDataEncoder: JobDataEncoder[ParentJob] = deriveEncoder[ParentJob]
  implicit val jobDecoder: JobDecoder[ParentJob]         = deriveDecoder[ParentJob]
}
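To make the Map[String, String] idea concrete, here is a hand-written sketch of the kind of round trip that the derived encoder and decoder perform. It uses no library at all: the object name, the Sketch-prefixed types and the key names "type" and "id" are made up for illustration, and the keys that extruder actually produces may well differ.

```scala
object MapCodecSketch {
  // Stand-ins for ParentJob and its children, renamed to avoid clashing with the real types.
  sealed trait SketchJob
  case object SketchChildObjectJob                 extends SketchJob
  final case class SketchUserJob(id: String) extends SketchJob

  // Flatten a job into string key/value pairs, as a JobDataMap would store them.
  // The key names are assumptions made for this sketch only.
  def encode(job: SketchJob): Map[String, String] = job match {
    case SketchChildObjectJob => Map("type" -> "ChildObjectJob")
    case SketchUserJob(id)    => Map("type" -> "UserJob", "id" -> id)
  }

  // Rebuild a job from the map, reporting missing or unknown data as a Left.
  def decode(data: Map[String, String]): Either[String, SketchJob] =
    data.get("type") match {
      case Some("ChildObjectJob") => Right(SketchChildObjectJob)
      case Some("UserJob")        => data.get("id").map(SketchUserJob(_)).toRight("missing key: id")
      case Some(other)            => Left(s"unknown job type: $other")
      case None                   => Left("missing key: type")
    }
}
```

Decoding returns an Either because the data map comes back from the scheduler's job store and may not match any known job type.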
There are 2 options when creating a CallbackJobFactory: auto-acked and manually acked messages.
With auto-acked messages, scheduled jobs from Quartz are immediately acked and the resulting message of type A
is placed on a Queue[F, A].
If the message taken from the queue isn't handled cleanly, the Quartz job won't be re-run,
as it has already been marked as successful.
import cats.effect._
import com.itv.scheduler._
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
// contextShift: ContextShift[IO] = cats.effect.internals.IOContextShift@2b00911a
val jobMessageQueue = Queue.unbounded[IO, ParentJob].unsafeRunSync()
// jobMessageQueue: Queue[IO, ParentJob] = fs2.concurrent.Queue$InPartiallyApplied$$anon$3@61313295
val autoAckJobFactory = CatsStreamJobFactory.autoAcking[IO, ParentJob](jobMessageQueue)
// autoAckJobFactory: AutoAckingQueueJobFactory[IO, ParentJob] = com.itv.scheduler.AutoAckingQueueJobFactory@1663393b
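Because an auto-acked job is already marked as successful by the time its message reaches the queue, any handler failure has to be dealt with by the consumer. The following is a minimal sketch of such a consumer, assuming cats-effect 2 and fs2 2.x as used elsewhere in this README. It uses plain String messages instead of ParentJob so that it stands alone, and the handle function and the "bad" message are hypothetical.

```scala
import cats.effect.{ContextShift, IO}
import cats.implicits._
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object AutoAckConsumerSketch {
  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical handler which fails for one particular message.
  def handle(message: String): IO[Unit] =
    if (message == "bad") IO.raiseError(new RuntimeException(s"could not handle $message"))
    else IO.unit

  // Enqueue the messages, then drain them, capturing each handler outcome.
  // `attempt` stops one failure from terminating the stream; nothing is reported back to Quartz.
  def consume(messages: List[String]): IO[List[Either[Throwable, Unit]]] =
    for {
      queue    <- Queue.unbounded[IO, String]
      _        <- messages.traverse_(queue.enqueue1)
      outcomes <- queue.dequeue
                    .take(messages.size.toLong)
                    .evalMap(message => handle(message).attempt)
                    .compile
                    .toList
    } yield outcomes
}
```

A real consumer would typically log or retry the Left outcomes itself, since the scheduler will not trigger the job again on failure.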
With manually acked messages, scheduled jobs are received but only acked with Quartz once the handler has completed, via an acker: MessageAcker[F, A].
Scheduled jobs from Quartz are bundled into a message: A and an acker: MessageAcker[F, A].
The items in the queue are each a Resource[F, A] which uses the message and acks the message as the Resource is used.
Alternatively, the lower-level way of handling each message is via a queue of
AckableMessage[F, A](message: A, acker: MessageAcker[F, A]) items, where the message is explicitly acked by the user.
In both cases, the Quartz job is only marked as complete once acker.complete(result: Either[Throwable, Unit]) is called.
// each message is wrapped as a `Resource` which acks on completion
val ackableJobResourceMessageQueue = Queue.unbounded[IO, Resource[IO, ParentJob]].unsafeRunSync()
// ackableJobResourceMessageQueue: Queue[IO, Resource[IO, ParentJob]] = fs2.concurrent.Queue$InPartiallyApplied$$anon$3@753f860
val ackingResourceJobFactory: AckingQueueJobFactory[IO, Resource, ParentJob] =
  CatsStreamJobFactory.ackingResource(ackableJobResourceMessageQueue)
// ackingResourceJobFactory: AckingQueueJobFactory[IO, Resource, ParentJob] = com.itv.scheduler.AckingQueueJobFactory@279c4c65
// each message is wrapped as an `AckableMessage` which must be explicitly acked by the user
val ackableJobMessageQueue = Queue.unbounded[IO, AckableMessage[IO, ParentJob]].unsafeRunSync()
// ackableJobMessageQueue: Queue[IO, AckableMessage[IO, ParentJob]] = fs2.concurrent.Queue$InPartiallyApplied$$anon$3@61db9c1a
val ackingJobFactory: AckingQueueJobFactory[IO, AckableMessage, ParentJob] =
  CatsStreamJobFactory.acking(ackableJobMessageQueue)
// ackingJobFactory: AckingQueueJobFactory[IO, AckableMessage, ParentJob] = com.itv.scheduler.AckingQueueJobFactory@40f435a9
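The idea of a Resource which acks as it is used can be sketched with cats-effect alone. This is an illustration of the concept under the cats-effect 2 API, not the library's actual implementation: ackingResource, the ack callback and the choice to report cancellation as a failure are all assumptions made for this sketch. The ack callback stands in for acker.complete.

```scala
import cats.effect.{ExitCase, IO, Resource}
import cats.effect.concurrent.Ref

object AckingResourceSketch {
  type AckResult = Either[Throwable, Unit]

  // Wrap a message so that the outcome of `use` is passed to `ack` when the Resource is released.
  def ackingResource[A](message: A, ack: AckResult => IO[Unit]): Resource[IO, A] =
    Resource.makeCase(IO.pure(message)) {
      case (_, ExitCase.Completed) => ack(Right(()))
      case (_, ExitCase.Error(e))  => ack(Left(e))
      // Treating cancellation as a failure is a choice made for this sketch.
      case (_, ExitCase.Canceled)  => ack(Left(new InterruptedException("handler was cancelled")))
    }

  // Run a handler against the wrapped message and return every ack that was recorded.
  def run[A](message: A)(handler: A => IO[Unit]): IO[List[AckResult]] =
    for {
      acks   <- Ref.of[IO, List[AckResult]](Nil)
      _      <- ackingResource(message, (r: AckResult) => acks.update(_ :+ r)).use(handler).attempt
      result <- acks.get
    } yield result
}
```

With this shape the consumer only calls use on each dequeued item; a successful handler produces a Right ack and a failing handler produces a Left ack, with no explicit call to the acker in user code.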
import java.util.concurrent.Executors
import com.itv.scheduler.extruder.implicits._
val quartzProperties = QuartzProperties(new java.util.Properties())
// quartzProperties: QuartzProperties = QuartzProperties(properties = {})
val blocker = Blocker.liftExecutorService(Executors.newFixedThreadPool(8))
// blocker: Blocker = cats.effect.Blocker@599321a5
val schedulerResource: Resource[IO, QuartzTaskScheduler[IO, ParentJob]] =
  QuartzTaskScheduler[IO, ParentJob](blocker, quartzProperties, autoAckJobFactory)
// schedulerResource: Resource[IO, QuartzTaskScheduler[IO, ParentJob]] = Allocate(...)
import java.time.Instant
import org.quartz.{CronExpression, JobKey, TriggerKey}
def scheduleCronJob(scheduler: QuartzTaskScheduler[IO, ParentJob]): IO[Option[Instant]] =
  scheduler.scheduleJob(
    JobKey.jobKey("child-object-job"),
    ChildObjectJob,
    TriggerKey.triggerKey("cron-test-trigger"),
    CronScheduledJob(new CronExpression("* * * ? * *"))
  )
def scheduleSingleJob(scheduler: QuartzTaskScheduler[IO, ParentJob]): IO[Option[Instant]] =
  scheduler.scheduleJob(
    JobKey.jobKey("single-user-job"),
    UserJob("user-123"),
    TriggerKey.triggerKey("scheduled-single-test-trigger"),
    JobScheduledAt(Instant.now.plusSeconds(2))
  )
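As a rough sketch of how the pieces might fit together, the function below schedules the single job and then waits for its message to arrive on the queue. It is not self-contained: it assumes that ParentJob and scheduleSingleJob from the snippets above are in scope, and that the scheduler was created with an auto-acking job factory writing to the same queue that is passed in. The name scheduleAndAwaitFirst is hypothetical. Depending on the Quartz version, an empty java.util.Properties may not be enough for the scheduler to start (for example, a thread pool size may need to be configured), so treat the properties as something to adapt.

```scala
import cats.effect.{IO, Resource}
import com.itv.scheduler.QuartzTaskScheduler
import fs2.concurrent.Queue

// Hypothetical helper: schedule one job, then block (semantically) until its message is dequeued.
def scheduleAndAwaitFirst(
    schedulerResource: Resource[IO, QuartzTaskScheduler[IO, ParentJob]],
    queue: Queue[IO, ParentJob]
): IO[ParentJob] =
  schedulerResource.use { scheduler =>
    for {
      _       <- scheduleSingleJob(scheduler) // defined above; fires roughly 2 seconds from now
      message <- queue.dequeue1               // waits until the job factory enqueues the decoded message
    } yield message
  }
```

Waiting for the message inside use matters here: the scheduler is presumably shut down when the Resource is released, so the dequeue has to complete before use returns.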