This is an EventStore driver for Java that uses Netty for network communication and GSON for serializing objects to and from JSON (e.g. stream metadata, cluster information DTOs). The client logic follows the original client for the .NET platform.
- Java 8
- EventStore 4.0.0 - 5.0.11 (esjc v2)
- EventStore 3.2.0 - 3.9.4 (esjc v1)
- EventStore v4 (or higher) compatible
<dependency>
<groupId>com.github.msemys</groupId>
<artifactId>esjc</artifactId>
<version>2.5.0</version>
</dependency>
- EventStore v3 compatible
<dependency>
<groupId>com.github.msemys</groupId>
<artifactId>esjc</artifactId>
<version>1.8.1</version>
</dependency>
$ ./scripts/generate-ssl-cert.sh
$ docker-compose up -d
$ mvn clean verify
Client instances are created using a builder class. The examples below demonstrate how to create a default client with single-node and cluster-node configurations.
- creates a single-node client
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1113)
.userCredentials("admin", "changeit")
.build();
- creates a cluster-node client (using gossip seeds)
EventStore eventstore = EventStoreBuilder.newBuilder()
.clusterNodeUsingGossipSeeds(cluster -> cluster
.gossipSeedEndpoints(asList(
new InetSocketAddress("127.0.0.1", 2113),
new InetSocketAddress("127.0.0.1", 2213),
new InetSocketAddress("127.0.0.1", 2313))))
.userCredentials("admin", "changeit")
.build();
- creates a cluster-node client (using dns)
EventStore eventstore = EventStoreBuilder.newBuilder()
.clusterNodeUsingDns(cluster -> cluster.dns("mycluster.com"))
.userCredentials("admin", "changeit")
.build();
The driver uses a full-duplex communication channel to the server. It is recommended that only one instance is created per application.
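One way to follow the single-instance recommendation is a small lazy holder that builds the client once and hands the same object to every caller. The sketch below is not part of the driver: `SharedInstance` is a hypothetical helper written with JDK classes only, and a plain `Object` stands in for the client so the example runs without a server.

```java
import java.util.function.Supplier;

/** Hypothetical helper: lazily creates one value and returns the same instance to every caller. */
final class SharedInstance<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public T get() {
        T result = instance;
        if (result == null) {
            synchronized (this) {
                // re-check under the lock so the factory runs at most once
                result = instance;
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }
}

final class SharedInstanceDemo {
    public static void main(String[] args) {
        // In an application the factory would be one of the builder chains shown above;
        // a plain Object stands in for the client here.
        SharedInstance<Object> client = new SharedInstance<>(Object::new);
        System.out.println(client.get() == client.get()); // prints "true"
    }
}
```

A dependency-injection container with a singleton scope achieves the same effect; the holder is only useful when no such container is in place.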
To use a secure channel between the client and the server, first enable SSL on the server side by providing a secure TCP port and a server certificate.
- create private key file and self-signed certificate (for testing purposes; because of -x509, the domain.csr output is a certificate, not a certificate request)
openssl req \
-x509 -sha256 -nodes -days 365 -subj "/CN=test.com" \
-newkey rsa:2048 -keyout domain.pem -out domain.csr
- export private key file and self-signed certificate to PKCS#12 archive
openssl pkcs12 -export -inkey domain.pem -in domain.csr -out domain.p12
- start server with encrypted TCP connection
./run-node.sh --ext-secure-tcp-port 1119 --certificate-file domain.p12
Now we are ready to connect to a single node or a cluster over the secure channel. On the client side we can either verify the server certificate (checking either the signing certificate, or only the CN and expiration date) or accept any server certificate without verification.
// creates a client with secure connection to server whose certificate is trusted by the given certificate file
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1119)
.useSslConnection(new File("domain.csr"))
.userCredentials("admin", "changeit")
.build();
// creates a client with secure connection to server whose certificate Common Name (CN) matches 'test.com'
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1119)
.useSslConnection("test.com")
.userCredentials("admin", "changeit")
.build();
// creates a client with secure connection to server without certificate verification
EventStore eventstore = EventStoreBuilder.newBuilder()
.singleNodeAddress("127.0.0.1", 1119)
.useSslConnection()
.userCredentials("admin", "changeit")
.build();
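To make the "CN and expiration date only" option more concrete, here is a minimal sketch of what such a check amounts to, written with JDK classes only. It is an illustration, not the driver's implementation: `CertificateCheckSketch`, `commonName` and `accepts` are hypothetical names, and the subject is assumed to be in RFC 2253 form (for example `CN=test.com`), as returned by `X509Certificate.getSubjectX500Principal().getName()`.

```java
import java.time.Instant;
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

final class CertificateCheckSketch {

    /** Returns the CN value of a distinguished name, or null if it is absent or the name is malformed. */
    static String commonName(String distinguishedName) {
        try {
            for (Rdn rdn : new LdapName(distinguishedName).getRdns()) {
                if ("CN".equalsIgnoreCase(rdn.getType())) {
                    return String.valueOf(rdn.getValue());
                }
            }
        } catch (InvalidNameException e) {
            // a malformed subject is treated the same as a missing CN
        }
        return null;
    }

    /** Accepts a certificate when its CN matches and 'now' lies within the validity period. */
    static boolean accepts(String subjectDn, Instant notBefore, Instant notAfter,
                           String expectedCn, Instant now) {
        return expectedCn.equals(commonName(subjectDn))
            && !now.isBefore(notBefore)
            && !now.isAfter(notAfter);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // matching CN, currently valid
        System.out.println(accepts("CN=test.com", now.minusSeconds(60), now.plusSeconds(60), "test.com", now)); // prints "true"
        // matching CN, but already expired
        System.out.println(accepts("CN=test.com", now.minusSeconds(120), now.minusSeconds(60), "test.com", now)); // prints "false"
    }
}
```

Note that a check of this kind does not validate the certificate chain, which is why verifying against the signing certificate is the stricter option.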
All operations are handled fully asynchronously and return CompletableFuture<T>. To handle the result asynchronously, use the whenComplete((result, throwable) -> { ... }) or thenAccept(result -> { ... }) methods on the returned future. To handle the result synchronously, use the get() or join() methods on the future.
// handles result asynchronously
eventstore.appendToStream("foo", ExpectedVersion.ANY, asList(
EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build(),
EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build())
).thenAccept(r -> System.out.println(r.logPosition));
// handles result synchronously
eventstore.appendToStream("foo", ExpectedVersion.ANY, asList(
EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build(),
EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build())
).thenAccept(r -> System.out.println(r.logPosition)).get();
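The examples above only cover the success path. The sketch below shows how a failure surfaces on a future, using plain JDK classes so it runs without a server. `fakeOperation` is a hypothetical stand-in for a driver call, and `handle` is used as the value-returning sibling of `whenComplete`; both receive the same `(result, throwable)` pair.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FutureHandlingDemo {

    /** Hypothetical stand-in for a driver call; fails when the argument is negative. */
    static CompletableFuture<Long> fakeOperation(long value) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (value < 0) {
            future.completeExceptionally(new IllegalArgumentException("negative value"));
        } else {
            future.complete(value);
        }
        return future;
    }

    /** Maps both outcomes of the future to a message without throwing. */
    static String describe(CompletableFuture<Long> future) {
        return future
            .handle((result, throwable) ->
                throwable == null ? "ok: " + result : "failed: " + throwable.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(fakeOperation(42))); // prints "ok: 42"
        System.out.println(describe(fakeOperation(-1))); // prints "failed: negative value"

        // join() blocks and rethrows a failure wrapped in an unchecked CompletionException
        try {
            fakeOperation(-1).join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // prints "negative value"
        }
    }
}
```

The difference between the two blocking calls is that `get()` declares the checked `InterruptedException` and `ExecutionException`, while `join()` throws only unchecked exceptions.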
eventstore.appendToStream("foo", ExpectedVersion.ANY, asList(
EventData.newBuilder()
.type("bar")
.data(new byte[]{1, 2, 3, 4, 5})
.metadata(new byte[]{6, 7, 8, 9, 0})
.build(),
EventData.newBuilder()
.eventId(UUID.randomUUID())
.type("baz")
.data("dummy content")
.build(),
EventData.newBuilder()
.type("qux")
.jsonData("{ a : 1 }")
.build())
).thenAccept(r -> System.out.println(r.logPosition));
eventstore.appendToStream("foo", 2,
EventData.newBuilder()
.type("quux")
.data(new byte[0])
.metadata(new byte[0])
.build()
).thenAccept(r -> System.out.println(r.logPosition));
eventstore.tryAppendToStream("foo", ExpectedVersion.ANY,
EventData.newBuilder()
.type("corge")
.build()
).thenAccept(r -> {
if (r.status == WriteStatus.Success) {
System.out.println(r.logPosition);
} else {
System.err.println(r.status);
}
});
try (Transaction t = eventstore.startTransaction("foo", ExpectedVersion.ANY).get()) {
t.write(EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build());
t.write(EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build());
t.write(asList(
EventData.newBuilder().type("qux").jsonData("{ c : 3 }").build(),
EventData.newBuilder().type("quux").jsonData("{ d : 4 }").build())
);
t.commit();
} catch (Exception e) {
e.printStackTrace();
}
eventstore.startTransaction("foo", ExpectedVersion.ANY).thenAccept(t -> {
t.write(EventData.newBuilder().type("bar").jsonData("{ a : 1 }").build());
t.write(EventData.newBuilder().type("baz").jsonData("{ b : 2 }").build());
t.write(asList(
EventData.newBuilder().type("qux").jsonData("{ c : 3 }").build(),
EventData.newBuilder().type("quux").jsonData("{ d : 4 }").build())
);
t.rollback();
});
eventstore.readEvent("foo", 1, false).thenAccept(e ->
System.out.format("id: '%s'; type: '%s'; data: '%s'",
e.event.originalEvent().eventId,
e.event.originalEvent().eventType,
new String(e.event.originalEvent().data)));
eventstore.readStreamEventsForward("foo", 10, 5, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
i.originalEvent().eventNumber,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.readStreamEventsBackward("foo", 10, 5, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
i.originalEvent().eventNumber,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.readAllEventsForward(Position.START, 10, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
i.originalPosition,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.readAllEventsBackward(Position.END, 10, false).thenAccept(e ->
e.events.forEach(i -> System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
i.originalPosition,
i.originalEvent().eventId,
i.originalEvent().eventType,
new String(i.originalEvent().data))));
eventstore.iterateStreamEventsForward("foo", 10, 500, false).forEachRemaining(e ->
System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
e.originalEvent().eventNumber,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.iterateStreamEventsBackward("foo", 10, 500, false).forEachRemaining(e ->
System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
e.originalEvent().eventNumber,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.iterateAllEventsForward(Position.START, 500, false).forEachRemaining(e ->
System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
e.originalPosition,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.iterateAllEventsBackward(Position.END, 500, false).forEachRemaining(e ->
System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
e.originalPosition,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.streamEventsForward("foo", 10, 500, false)
.filter(e -> e.originalEvent().eventType.equals("bar"))
.forEach(e -> System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
e.originalEvent().eventNumber,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.streamEventsBackward("foo", 10, 500, false)
.filter(e -> e.originalEvent().eventType.equals("bar"))
.forEach(e -> System.out.format("#%d id: '%s'; type: '%s'; data: '%s'\n",
e.originalEvent().eventNumber,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.streamAllEventsForward(Position.START, 500, false)
.filter(e -> !e.originalEvent().eventType.startsWith("$"))
.forEach(e -> System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
e.originalPosition,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
eventstore.streamAllEventsBackward(Position.END, 500, false)
.filter(e -> !e.originalEvent().eventType.startsWith("$"))
.forEach(e -> System.out.format("@%s id: '%s'; type: '%s'; data: '%s'\n",
e.originalPosition,
e.originalEvent().eventId,
e.originalEvent().eventType,
new String(e.originalEvent().data)));
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToStream("foo", false,
new VolatileSubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(Subscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToStream("foo", false, (s, e) ->
System.out.println(e.originalEvent().eventType)
);
volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToAll(false,
new VolatileSubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(Subscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
volatileSubscription.get().close();
CompletableFuture<Subscription> volatileSubscription = eventstore.subscribeToAll(false, (s, e) ->
System.out.println(e.originalEvent().eventType)
);
volatileSubscription.get().close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 3L,
new CatchUpSubscriptionListener() {
@Override
public void onLiveProcessingStarted(CatchUpSubscription subscription) {
System.out.println("Live processing started!");
}
@Override
public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToStreamFrom("foo", 3L, (s, e) ->
System.out.println(e.originalEvent().eventType)
);
catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToAllFrom(Position.START,
new CatchUpSubscriptionListener() {
@Override
public void onLiveProcessingStarted(CatchUpSubscription subscription) {
System.out.println("Live processing started!");
}
@Override
public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
catchupSubscription.close();
CatchUpSubscription catchupSubscription = eventstore.subscribeToAllFrom(Position.of(1, 1), (s, e) ->
System.out.println(e.originalEvent().eventType)
);
catchupSubscription.close();
CompletableFuture<PersistentSubscription> persistentSubscription = eventstore.subscribeToPersistent("foo", "group",
new PersistentSubscriptionListener() {
@Override
public void onEvent(PersistentSubscription subscription, RetryableResolvedEvent event) {
System.out.println(event.originalEvent().eventType);
}
@Override
public void onClose(PersistentSubscription subscription, SubscriptionDropReason reason, Exception exception) {
System.out.println("Subscription closed: " + reason);
}
});
persistentSubscription.get().close();
CompletableFuture<PersistentSubscription> persistentSubscription = eventstore.subscribeToPersistent("foo", "group", (s, e) ->
System.out.println(e.originalEvent().eventType)
);
persistentSubscription.get().stop(Duration.ofSeconds(3));
eventstore.createPersistentSubscription("foo", "group", PersistentSubscriptionSettings.newBuilder()
.resolveLinkTos(false)
.historyBufferSize(20)
.liveBufferSize(10)
.minCheckPointCount(10)
.maxCheckPointCount(1000)
.checkPointAfter(Duration.ofSeconds(2))
.maxRetryCount(500)
.maxSubscriberCount(5)
.messageTimeout(Duration.ofSeconds(30))
.readBatchSize(500)
.startFromCurrent()
.timingStatistics(false)
.namedConsumerStrategy(SystemConsumerStrategy.ROUND_ROBIN)
.build()
).thenAccept(r -> System.out.println(r.status));
eventstore.createPersistentSubscription("bar", "group").thenAccept(r -> System.out.println(r.status));
eventstore.updatePersistentSubscription("foo", "group", PersistentSubscriptionSettings.newBuilder()
.maxRetryCount(200)
.readBatchSize(100)
.build()
).thenAccept(r -> System.out.println(r.status));
eventstore.deletePersistentSubscription("bar", "group").thenAccept(r -> System.out.println(r.status));
eventstore.deleteStream("bar", ExpectedVersion.ANY).thenAccept(r -> System.out.println(r.logPosition));
eventstore.setStreamMetadata("foo", ExpectedVersion.ANY, StreamMetadata.newBuilder()
.aclReadRoles(asList("eric", "kyle", "stan", "kenny"))
.cacheControl(Duration.ofMinutes(10))
.maxAge(Duration.ofDays(1))
.customProperty("baz", "dummy text")
.customProperty("bar", 2)
.customProperty("quux", 3.4)
.customProperty("quuux", true)
.build()
).thenAccept(r -> System.out.println(r.logPosition));
eventstore.setStreamMetadata("foo", ExpectedVersion.ANY, StreamMetadata.empty())
.thenAccept(r -> System.out.println(r.logPosition));
eventstore.getStreamMetadata("foo").thenAccept(r ->
System.out.format("deleted: %s, version: %s, stream: %s\nmetadata: %s\n",
r.isStreamDeleted,
r.metastreamVersion,
r.stream,
r.streamMetadata.toJson()));
eventstore.getStreamMetadataAsRawBytes("foo").thenAccept(r ->
System.out.format("deleted: %s, version: %s, stream: %s\nmetadata-bytes: %s\n",
r.isStreamDeleted,
r.metastreamVersion,
r.stream,
r.streamMetadata));
StreamAcl userStreamAcl = StreamAcl.newBuilder()
.readRoles(asList("eric", "kyle", "stan", "kenny"))
.writeRoles(asList("butters"))
.deleteRoles(asList("$admins"))
.metaReadRoles(asList("victoria", "mackey"))
.metaWriteRoles(asList("randy"))
.build();
StreamAcl systemStreamAcl = StreamAcl.newBuilder()
.readRoles(asList("$admins"))
.writeRoles(asList("$all"))
.deleteRoles(asList("$admins"))
.metaWriteRoles(asList("$all"))
.build();
eventstore.setSystemSettings(SystemSettings.newBuilder()
.userStreamAcl(userStreamAcl)
.systemStreamAcl(systemStreamAcl)
.build()
).thenAccept(r -> System.out.println(r.logPosition));