The goal of this project
Provide an easy way to send and receive notifications between processes.
The structure of this project
- A Notifier client Spring Boot starter -- a Spring Boot starter that, when included in a Spring Boot project, automatically creates a Notifier client.
Configuration is done with one property,
notifier.server.url
which should be set to the URL (possibly behind a load balancer) of the Notifier server, for example
notifier.server.url=ws://localhost:6565/rsocket
- A Notifier server -- a server that uses Kafka to manage subscriptions and notifications. Configuration is done using the following properties:
# suppress inspection "UnusedProperty" for whole file
spring.rsocket.server.port=6565
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket
notifier.server.kafka.bootstrapServers=localhost:29092
notifier.server.kafka.topic=notifications
The RSocket settings (port, transport, and mapping path) must match the client's notifier.server.url.
- Notifier local env -- a Docker Compose setup with Kafka, Zipkin, Jaeger, and an OpenTelemetry Collector.
- An example -- a Spring Boot application that uses the Notifier client starter to send and receive notifications.
Using the Notifier
The Notifier interface is very simple:
interface Notifier {
    fun notify(notification: String): Mono<Void>
    fun oneway(notification: String): Mono<Void>
    fun subscribe(pattern: String, subscriberMode: SubscriberMode = SubscriberMode.Shared): Flux<Event>
}
A notification can be sent in two ways:
- sending and waiting for the notification to be written to the Kafka topic
notifier.notify("foo")
- sending without waiting
notifier.oneway("a one way")
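Both methods return a Mono<Void>, and Reactor publishers are normally lazy, so a call on its own usually only builds the pipeline; the example application further down calls block() on each send. The following is a minimal sketch of that behaviour, assuming the client follows the usual Reactor convention. CountingSender is a hypothetical stand-in written for this illustration and is not part of the Notifier client.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the sending side of the Notifier client.
// It only counts how many notifications were actually "sent".
class CountingSender {
    val sent = AtomicInteger()

    fun notify(notification: String): Mono<Void> =
        Mono.fromRunnable<Void> { sent.incrementAndGet() }
}

fun main() {
    val sender = CountingSender()

    // Building the Mono does not send anything yet.
    val pending = sender.notify("foo")
    println("before subscribing: sent=${sender.sent.get()}") // prints sent=0

    // block() subscribes and waits for completion.
    pending.block()
    println("after block(): sent=${sender.sent.get()}") // prints sent=1

    // subscribe() also triggers the send, but returns without waiting.
    sender.notify("bar").subscribe()
}
```

In a reactive pipeline it is usually preferable to return the Mono to the caller or chain it with other operators rather than calling block().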
Subscription modes
When subscribing, the client can choose between two modes:
- Shared mode, in which all subscribers share the same Kafka consumer. The advantage of this mode is that it is very lightweight; the downside is that slow subscribers are disconnected immediately, because otherwise the other subscribers would receive notifications late (there is no back-pressure). This is the default mode.
val disposable: Disposable = notifier.subscribe("*").doOnNext {
    logger.debug("shared subscriber got $it")
}.subscribe()
- Standalone mode, which is a bit heavier because each subscriber gets its own reactive Kafka consumer. The subscriber receives events as fast as it can consume them, but no faster (back-pressure is applied).
val standAloneDisposable: Disposable = notifier.subscribe("FOO *", SubscriberMode.Standalone).doOnNext {
    logger.debug("standalone subscriber got $it")
}.subscribe()
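The "slow subscribers are disconnected" behaviour of shared mode can be illustrated with plain Reactor. The server log further down prints emitter=FluxSink(ERROR), which suggests (the source does not confirm it) that the shared emitter uses FluxSink.OverflowStrategy.ERROR. The sketch below is not the server's code; the function name and event names are made up for illustration. It shows what that strategy does when a subscriber has no outstanding demand.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

// Returns the signals seen by a subscriber that only ever requests two events.
fun runSlowSubscriber(): List<String> {
    val seen = mutableListOf<String>()

    // A source that pushes five events without waiting for demand.
    val events = Flux.create<String>({ sink ->
        for (i in 1..5) sink.next("event-$i")
        sink.complete()
    }, FluxSink.OverflowStrategy.ERROR)

    events.subscribe(object : BaseSubscriber<String>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            request(2) // demand for two events, never replenished
        }

        override fun hookOnNext(value: String) {
            seen.add(value)
        }

        override fun hookOnError(throwable: Throwable) {
            // The third event arrives with no demand left, so the sink
            // terminates this subscriber with an overflow error.
            seen.add("error")
        }
    })
    return seen
}

fun main() {
    println(runSlowSubscriber()) // prints [event-1, event-2, error]
}
```

This is also why the cache example below appends retry() before subscribing: a shared subscriber that is dropped with an error resubscribes instead of staying disconnected.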
An example: invalidating a Caffeine cache using notifications
package com.totango.notifier.example

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.totango.notifier.client.Notifier
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import reactor.core.Disposable
import java.util.concurrent.TimeUnit

@SpringBootApplication
class NotifierApplication {

    // Runs the example once the application context (and the Notifier client) is ready.
    @Bean
    fun runExample(notifier: Notifier): ApplicationRunner =
        ApplicationRunner {
            example(notifier)
        }

    data class CachedValue(val payload: String)

    private fun example(notifier: Notifier) {
        val cache: Cache<String, CachedValue> = Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .maximumSize(100)
            .build()

        cache.put("880:ACCOUNT1", CachedValue("ACCOUNT1"))
        cache.put("880:ACCOUNT2", CachedValue("ACCOUNT2"))
        cache.put("880:ACCOUNT3", CachedValue("ACCOUNT3"))

        // Invalidate the matching cache entry for every "880 MODIFY ACCOUNT <account>" notification.
        val disposable: Disposable = notifier.subscribe("880 MODIFY ACCOUNT ?").doOnNext { event ->
            // The cache key is built from the first and fourth tokens of the notification.
            val key = "${event.tokens[0]}:${event.tokens[3]}"
            logger.info("invalidating $key")
            cache.invalidate(key)
            logger.info("cache contains ${cache.asMap()}")
        }.retry().subscribe()

        logger.info("cache contains ${cache.asMap()}")

        notifier.notify("880 MODIFY ACCOUNT ACCOUNT1").block()
        notifier.oneway("880 MODIFY ACCOUNT ACCOUNT3").block()
        notifier.notify("880 MODIFY ACCOUNT ACCOUNT3").block()
        // Does not match the subscription pattern, so no entry is invalidated.
        notifier.notify("881 MODIFY ACCOUNT ACCOUNT1").block()

        // Give the subscriber time to receive the notifications.
        Thread.sleep(10000)
        logger.info("at end, cache contains ${cache.asMap()}")
        disposable.dispose()
        logger.debug("Done")
    }

    companion object {
        @Suppress("unused")
        val logger: Logger = LoggerFactory.getLogger(NotifierApplication::class.java)
    }
}

fun main(args: Array<String>) {
    runApplication<NotifierApplication>(*args)
}
The output of the client should look something like this:
2022-03-13 21:18:29.682 INFO [notifier-client,,] 57013 --- [ main] c.t.n.example.NotifierApplication : cache contains {880:ACCOUNT2=CachedValue(payload=ACCOUNT2), 880:ACCOUNT1=CachedValue(payload=ACCOUNT1), 880:ACCOUNT3=CachedValue(payload=ACCOUNT3)}
2022-03-13 21:18:30.181 INFO [notifier-client,7eafddc7d9f9fe8e,7eafddc7d9f9fe8e] 57013 --- [actor-tcp-nio-2] c.t.n.example.NotifierApplication : invalidating 880:ACCOUNT1
2022-03-13 21:18:30.184 INFO [notifier-client,7eafddc7d9f9fe8e,7eafddc7d9f9fe8e] 57013 --- [actor-tcp-nio-2] c.t.n.example.NotifierApplication : cache contains {880:ACCOUNT2=CachedValue(payload=ACCOUNT2), 880:ACCOUNT3=CachedValue(payload=ACCOUNT3)}
2022-03-13 21:18:30.185 INFO [notifier-client,7eafddc7d9f9fe8e,7eafddc7d9f9fe8e] 57013 --- [actor-tcp-nio-2] c.t.n.example.NotifierApplication : invalidating 880:ACCOUNT3
2022-03-13 21:18:30.186 INFO [notifier-client,7eafddc7d9f9fe8e,7eafddc7d9f9fe8e] 57013 --- [actor-tcp-nio-2] c.t.n.example.NotifierApplication : cache contains {880:ACCOUNT2=CachedValue(payload=ACCOUNT2)}
2022-03-13 21:18:30.188 INFO [notifier-client,7eafddc7d9f9fe8e,7eafddc7d9f9fe8e] 57013 --- [actor-tcp-nio-2] c.t.n.example.NotifierApplication : invalidating 880:ACCOUNT3
2022-03-13 21:18:30.188 INFO [notifier-client,7eafddc7d9f9fe8e,7eafddc7d9f9fe8e] 57013 --- [actor-tcp-nio-2] c.t.n.example.NotifierApplication : cache contains {880:ACCOUNT2=CachedValue(payload=ACCOUNT2)}
2022-03-13 21:18:40.200 INFO [notifier-client,,] 57013 --- [ main] c.t.n.example.NotifierApplication : at end, cache contains {880:ACCOUNT2=CachedValue(payload=ACCOUNT2)}
2022-03-13 21:18:40.201 DEBUG [notifier-client,,] 57013 --- [ main] c.t.n.example.NotifierApplication : Done
and the output of the server:
2022-03-13 21:21:53.019 DEBUG [notifier,,] 57042 --- [ main] c.t.notifier.server.NotifierApplication : Event server ready WEBSOCKET 6565 /rsocket
2022-03-13 21:21:53.019 DEBUG [notifier,,] 57042 --- [ main] c.t.notifier.server.NotifierApplication : Zipkin http://localhost:9411
2022-03-13 21:21:53.019 DEBUG [notifier,,] 57042 --- [ main] c.t.notifier.server.NotifierApplication : Jaeger http://localhost:16686
2022-03-13 21:22:00.134 DEBUG [notifier,b7621673249fbe5e,24ac47e0ddea97fb] 57042 --- [ctor-http-nio-2] c.t.notifier.server.NotifierController : got a shared subscribe request, pattern: [880 MODIFY ACCOUNT ?]
2022-03-13 21:22:00.161 DEBUG [notifier,b7621673249fbe5e,24ac47e0ddea97fb] 57042 --- [ctor-http-nio-2] com.totango.notifier.server.Subscribers : adding a shared subscription Subscription(pattern=[880, MODIFY, ACCOUNT, ?], emitter=FluxSink(ERROR), uid=c0d0b865-d1cd-40de-a076-81364badfd9d)
2022-03-13 21:22:00.171 DEBUG [notifier,b2fb5a2d3a242b61,3fc22abec39fa2e0] 57042 --- [ctor-http-nio-2] c.t.notifier.server.NotifierController : got notification 880 MODIFY ACCOUNT ACCOUNT1
2022-03-13 21:22:00.281 DEBUG [notifier,b2fb5a2d3a242b61,85878a9d356bc44d] 57042 --- [otifier-group-1] c.t.notifier.server.NotifierProcessor : shared dispatch notification [880 MODIFY ACCOUNT ACCOUNT1] to subscriber c0d0b865-d1cd-40de-a076-81364badfd9d because of subscription [880, MODIFY, ACCOUNT, ?]
2022-03-13 21:22:00.302 DEBUG [notifier,7ddd2fba4d1681bf,bdacd00dcd221c3b] 57042 --- [ctor-http-nio-2] c.t.notifier.server.NotifierController : got notification 880 MODIFY ACCOUNT ACCOUNT3
2022-03-13 21:22:00.311 DEBUG [notifier,dfeb13611af19fb9,48b16aef9c28566d] 57042 --- [otifier-group-1] c.t.notifier.server.NotifierProcessor : shared dispatch notification [880 MODIFY ACCOUNT ACCOUNT3] to subscriber c0d0b865-d1cd-40de-a076-81364badfd9d because of subscription [880, MODIFY, ACCOUNT, ?]
2022-03-13 21:22:00.317 DEBUG [notifier,7ddd2fba4d1681bf,f722e0a826cb207c] 57042 --- [otifier-group-1] c.t.notifier.server.NotifierProcessor : shared dispatch notification [880 MODIFY ACCOUNT ACCOUNT3] to subscriber c0d0b865-d1cd-40de-a076-81364badfd9d because of subscription [880, MODIFY, ACCOUNT, ?]
2022-03-13 21:22:00.323 DEBUG [notifier,7640da0a5858e66e,3b50eb851e05985e] 57042 --- [ctor-http-nio-2] c.t.notifier.server.NotifierController : got notification 881 MODIFY ACCOUNT ACCOUNT1
2022-03-13 21:22:10.332 DEBUG [notifier,b7621673249fbe5e,24ac47e0ddea97fb] 57042 --- [ctor-http-nio-2] com.totango.notifier.server.Subscribers : removing a shared subscription Subscription(pattern=[880, MODIFY, ACCOUNT, ?], emitter=FluxSink(ERROR), uid=c0d0b865-d1cd-40de-a076-81364badfd9d)
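The server log shows the subscription pattern split into whitespace-separated tokens ([880, MODIFY, ACCOUNT, ?]), with the 880 notifications dispatched and the 881 notification not. The exact matching rules are not documented here, so the following is only a sketch under stated assumptions: "?" matches exactly one token, "*" matches all remaining tokens (including none), and every other token must match literally and case-sensitively. The matches function is hypothetical and is not the server's implementation.

```kotlin
// Hypothetical token matcher; the wildcard semantics are assumptions.
fun matches(pattern: String, notification: String): Boolean {
    val whitespace = Regex("\\s+")
    val patternTokens = pattern.trim().split(whitespace)
    val tokens = notification.trim().split(whitespace)

    for ((i, p) in patternTokens.withIndex()) {
        if (p == "*") return true                      // assumed: matches the rest
        if (i >= tokens.size) return false             // notification is too short
        if (p != "?" && p != tokens[i]) return false   // literal token mismatch
    }
    // Without a trailing "*", the token counts must be equal.
    return patternTokens.size == tokens.size
}

fun main() {
    println(matches("880 MODIFY ACCOUNT ?", "880 MODIFY ACCOUNT ACCOUNT1")) // prints true
    println(matches("880 MODIFY ACCOUNT ?", "881 MODIFY ACCOUNT ACCOUNT1")) // prints false
    println(matches("FOO *", "FOO bar baz"))                                // prints true
}
```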