Implementation of a non-blocking client that accesses the low-level API of the Nakadi event bus. Internally, it uses Akka and Akka HTTP for its communication tasks.
The client provides both a Scala and a Java interface.
- Java >= 1.8
- Scala >= 2.11
nakadi.client {
noListenerReconnectDelay = 10 seconds // if no listener could be found, no connection to Nakadi is established.
// noListenerReconnectDelay specifies the delay after which the actor
// should try again to connect
pollParallelism = 100 // number of parallel polls from a specific host
receiveBufferSize = 1024 bytes // initial buffer size for event retrieval
defaultBatchFlushTimeout = 5 seconds // default batch flush timeout set in ListenParameters
defaultBatchLimit = 1 // default batch limit set in ListenParameters
defaultStreamLimit = 0 // default stream limit set in ListenParameters
supervisor {
// note: supervisor strategy parameter names come from Akka; keep them as they are
maxNrOfRetries = 100
withinTimeRange = 5 minutes
resolveActorTimeout = 1 second // timeout for resolving PartitionReceiver actor reference
}
}
Scala
val klient = KlientBuilder()
.withEndpoint(new URI("localhost"))
.withPort(8080)
.withSecuredConnection(false)
.withTokenProvider(() => "<my token>")
.build()
Java
final Client client = new KlientBuilder()
.withEndpoint(new URI("localhost"))
.withPort(8080)
.withSecuredConnection(false)
.withJavaTokenProvider(() -> "<my token>")
.buildJavaClient();
NOTE: the metrics format is not defined or fixed.
Scala
klient.getMetrics map {
case Left(error) => fail(s"could not retrieve metrics: $error")
case Right(metrics: Map[String, Any]) => logger.debug(s"metrics => $metrics")
}
Java
Future<Map<String, Object>> metrics = client.getMetrics();
Scala
klient.getTopics map {
case Left(error) => fail(s"could not retrieve topics: $error")
case Right(topics: List[Topic]) => logger.debug(s"topics => $topics")
}
Java
Future<List<Topic>> topics = client.getTopics();
Partition selection is done using the defined partition resolution. The partition resolution strategy is defined per
topic and is managed by the event bus (currently resolved from a hash over ordering_key).
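The idea of resolving a partition from a hash over the ordering key can be sketched in Java as follows. This is only an illustration: the hash function the event bus actually uses is not specified here, so `String.hashCode` with a modulo is an assumption, and `PartitionSketch` and `resolvePartition` are hypothetical names that are not part of the client.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionSketch {

    // Hypothetical helper: maps an ordering key to one of the given partition ids.
    // Assumption: String.hashCode modulo the partition count stands in for the
    // real (unspecified) hash used by the event bus.
    static String resolvePartition(String orderingKey, List<String> partitionIds) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        int index = Math.floorMod(orderingKey.hashCode(), partitionIds.size());
        return partitionIds.get(index);
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("0", "1", "2", "3");
        String first = resolvePartition("orderingKey", partitions);
        String second = resolvePartition("orderingKey", partitions);
        // The same ordering key always resolves to the same partition.
        System.out.println(first.equals(second)); // prints "true"
    }
}
```

The property that matters is determinism: events sharing an ordering key land in the same partition, which is what preserves their relative order.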
Scala
val event = Event("eventType",
"orderingKey",
Map("id" -> "1234567890"),
Map("greeting" -> "hello",
"target" -> "world"))
klient.postEvent(topic, event) map {
case Left(error) => fail(s"an error occurred while posting event to topic $topic: $error")
case Right(_) => logger.debug("event post request was successful")
}
Java
HashMap<String,Object> meta = Maps.newHashMap();
meta.put("id", "1234567890");
HashMap<String, String> body = Maps.newHashMap();
body.put("greeting", "hello");
body.put("target", "world");
Future<Void> f = client.postEvent("test", new Event("eventType", "orderingKey", meta, body));
Scala
klient.getPartitions("topic") map {
case Left(error: String) => fail(s"could not retrieve partitions: $error")
case Right(partitions: List[TopicPartition]) => partitions
}
Java
Future<List<TopicPartition>> partitions = client.getPartitions("topic");
Non-blocking subscription to a topic requires a Listener
implementation. The event listener does not have to be
thread-safe because each listener is handled by its own Akka
actor.
Scala
klient.subscribeToTopic("topic", ListenParameters(), listener, autoReconnect = true) // autoReconnect default is true
Java
public final class MyListener implements JListener {...}
client.subscribeToTopic("topic",
ListenParametersUtils.defaultInstance(),
new JListenerWrapper(new MyListener()),
true);
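Why a listener that is handled by its own actor needs no synchronization can be illustrated without Akka. The Java sketch below uses a single-threaded executor as a stand-in for an actor's mailbox (an assumption made for illustration, not the client's implementation); `ListenerConfinementSketch`, `CountingListener` and `deliverAll` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ListenerConfinementSketch {

    // Hypothetical listener with plain, unsynchronized mutable state.
    static final class CountingListener {
        private int received = 0; // no lock and no volatile

        void onReceive(String event) {
            received++;
        }

        int received() {
            return received;
        }
    }

    // Delivers events one at a time on a dedicated thread, similar to how an
    // actor processes the messages in its mailbox sequentially.
    static int deliverAll(CountingListener listener, int eventCount) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        for (int i = 0; i < eventCount; i++) {
            final String event = "event-" + i;
            mailbox.submit(() -> listener.onReceive(event));
        }
        mailbox.shutdown();
        try {
            if (!mailbox.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("delivery timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return listener.received();
    }

    public static void main(String[] args) {
        System.out.println(deliverAll(new CountingListener(), 1000)); // prints "1000"
    }
}
```

Because only one thread ever touches a given listener, no update is lost. Sharing one listener instance between several subscriptions would break this confinement.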
Non-blocking subscription to the events of a specified topic and partition.
Scala
klient.listenForEvents("topic",
"partitionId",
ListenParameters(),
listener,
autoReconnect = false) // default is false
Java
public final class MyListener implements JListener {...}
client.listenForEvent("topic",
"partitionId",
ListenParametersUtils.defaultInstance(),
new JListenerWrapper(new MyListener()));
- handle case where separate clusters consisting of 1 member are built