Delayed queue

DelayedQueue is a library for enqueuing immutable events and handling them later. It is written in Java on top of Reactor and Lettuce. Redis is the only available storage engine. The queue does not provide durability guarantees, but paired with a clustered Redis installation it is suitable for many use cases.

Key features are:

  • event handling is retried with increasing intervals between attempts
  • subscription context can be passed to a handler
  • support for the @PreDestroy lifecycle annotation
  • sending metrics using Micrometer
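
The README does not say which schedule the library uses for retries at growing intervals. As a rough illustration only, the sketch below assumes a simple doubling delay with an upper cap; `RetryBackoff`, `delayForAttempt`, and the base and cap values are hypothetical names and numbers, not part of DelayedQueue.

```java
import java.time.Duration;

// Hypothetical helper, NOT part of DelayedQueue: shows one common way
// to grow the pause between retry attempts (doubling with a cap).
final class RetryBackoff {
    private RetryBackoff() {}

    /**
     * Returns the delay to wait before the given retry attempt (1-based).
     * The delay doubles with every attempt and never exceeds maxDelay.
     */
    static Duration delayForAttempt(int attempt, Duration baseDelay, Duration maxDelay) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Limit the shift so the multiplier itself cannot overflow.
        int shift = Math.min(attempt - 1, 30);
        long millis;
        try {
            millis = Math.multiplyExact(baseDelay.toMillis(), 1L << shift);
        } catch (ArithmeticException overflow) {
            // The product no longer fits in a long, so it is above any cap.
            return maxDelay;
        }
        Duration delay = Duration.ofMillis(millis);
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }

    public static void main(String[] args) {
        // Assumed values: 1 second base delay, 5 minute cap, 10 attempts.
        for (int attempt = 1; attempt <= 10; attempt++) {
            Duration delay = delayForAttempt(attempt, Duration.ofSeconds(1), Duration.ofMinutes(5));
            System.out.println("attempt " + attempt + " -> wait " + delay);
        }
    }
}
```

Capping the delay keeps late attempts from drifting arbitrarily far into the future, which matters when the number of attempts is configurable.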

DelayedQueue is highly opinionated (hence offers limited customization), with very little configuration needed to start using it. If you want more control, consider using Netflix's dyno-queue.

Installation

The minimum supported Java version is 1.8.

repositories {
    maven { url 'https://jitpack.io' }
}
dependencies {
    implementation 'com.github.fred84:delayedQueue:0.4.0'
}

Examples

Minimal configuration

import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();

Full configuration

import static com.github.fred84.queue.DelayedEventService.delayedEventService;

eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();

Add event handler

eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);

Enqueue event

eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();
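
To picture what enqueuing with a delay means, here is a small in-memory sketch of the general idea: events are stored with a due time and only handed out once that time has passed. It is an analogy under stated assumptions, not a description of how DelayedQueue stores data in Redis; `InMemoryDelayedQueue`, `enqueueWithDelay`, and `pollDue` are hypothetical names, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory analogy, NOT the library's implementation.
final class InMemoryDelayedQueue<T> {

    // An event together with the moment it becomes eligible for handling.
    private static final class Entry<T> {
        final T event;
        final Instant dueAt;

        Entry(T event, Instant dueAt) {
            this.event = event;
            this.dueAt = dueAt;
        }
    }

    // Entries ordered by due time, earliest first.
    private final PriorityQueue<Entry<T>> entries =
            new PriorityQueue<>(Comparator.comparing((Entry<T> e) -> e.dueAt));

    /** Stores the event so that it becomes due at now + delay. */
    void enqueueWithDelay(T event, Duration delay, Instant now) {
        entries.add(new Entry<>(event, now.plus(delay)));
    }

    /** Removes and returns every event whose due time is at or before now. */
    List<T> pollDue(Instant now) {
        List<T> due = new ArrayList<>();
        while (!entries.isEmpty() && !entries.peek().dueAt.isAfter(now)) {
            due.add(entries.poll().event);
        }
        return due;
    }

    public static void main(String[] args) {
        InMemoryDelayedQueue<String> queue = new InMemoryDelayedQueue<>();
        Instant start = Instant.now();
        queue.enqueueWithDelay("event-1", Duration.ofHours(1), start);

        // Nothing is due yet, so this list is empty.
        System.out.println(queue.pollDue(start));
        // One hour later the event is handed out.
        System.out.println(queue.pollDue(start.plus(Duration.ofHours(1))));
    }
}
```

A real service would also need persistence and a polling loop, which is what the library delegates to Redis and its scheduler.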

Close service

eventService.close();

Event context

eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);

eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();

Contribution

Contributions are welcome. Just create a pull request.