Interim solution for adapting a Vert.x ReadStream to an RxJava2 Flowable. It also uses the Vert.x EventBus to attach multiple subscribers to a single source Flowable.
Requirements:
- Vert.x 3.5.x
- RxJava2 2.1.x (latest)
Build:
mvn clean install
<repositories>
  <repository>
    <id>com.blueskiron-public-repo</id>
    <name>com.blueskiron maven public</name>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
    <url>https://raw.githubusercontent.com/jurajzachar/com.blueskiron-public-repo/releases</url>
  </repository>
</repositories>
<dependency>
  <groupId>com.blueskiron</groupId>
  <artifactId>vertx-rxjava2</artifactId>
  <version>1.1.1</version>
</dependency>
Example: FlowableReadStream
Wrap a ReadStream and use it as a regular Flowable:
FileSystem fs = getVertx().fileSystem();
// Open the file for reading; the resulting AsyncFile is a ReadStream<Buffer>
fs.open(FILEPATH.toAbsolutePath().toString(), new OpenOptions().setRead(true), ar -> {
  if (ar.failed()) {
    context.fail(ar.cause());
  } else {
    // Wrap the ReadStream so that each emitted Buffer holds one line
    Flowable<Buffer> flowable = FlowableReadStream.newLineDelimitedReadStream(ar.result());
    flowable.subscribe(new PacedTestSubscriber(getVertx(), async, 1000));
  }
});
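The example above hands the Flowable to a paced subscriber, which is where backpressure comes in: the subscriber decides how many items it is ready for. The sketch below illustrates that demand-driven pattern using only the JDK's `java.util.concurrent.Flow` API (Java 9+), so it runs without Vert.x or RxJava on the classpath. `PacedSubscriberSketch`, `OneAtATimeSubscriber`, and `collect` are hypothetical names made up for this illustration; this is not the project's `PacedTestSubscriber`, whose implementation is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class PacedSubscriberSketch {

    /** Hypothetical subscriber that requests exactly one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(String line) {
            received.add(line);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Pushes the given lines through a publisher and returns what the subscriber saw. */
    public static List<String> collect(List<String> lines) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            lines.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items have been delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        // prints [line 1, line 2, line 3]
        System.out.println(collect(Arrays.asList("line 1", "line 2", "line 3")));
    }
}
```

An RxJava2 subscriber follows the same shape with `org.reactivestreams.Subscriber` and `Subscription`; a paced variant would presumably delay the `request(1)` call with a timer instead of issuing it immediately.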
Example: FlowableEventBusPublisher and FlowableEventBusSubscriber
FileSystem fs = getVertx().fileSystem();
fs.open(FILEPATH.toAbsolutePath().toString(), new OpenOptions().setRead(true), ar -> {
  if (ar.failed()) {
    context.fail(ar.cause());
  } else {
    Flowable<Buffer> flowable = FlowableReadStream.newLineDelimitedReadStream(ar.result());
    String sourceAddress = "testSourceAddress";
    // Publish the source Flowable on the event bus at sourceAddress
    new FlowableEventBusPublisher<>(flowable, getVertx(), sourceAddress, new DeliveryOptions());
    // Attach a subscriber to the source address over the event bus
    FlowableEventBusSubscriber<Buffer> busSubscriber = new FlowableEventBusSubscriber<>(getVertx(), sourceAddress, "testSubscriberAddress");
    busSubscriber.subscribe(new PacedTestSubscriber(getVertx(), async, 50));
  }
});
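The point of routing through the event bus is that several subscribers can consume one source. The sketch below illustrates that fan-out idea with the JDK's `SubmissionPublisher` (Java 9+) standing in for the source: two subscribers with different batch sizes each receive every item. It does not use the event bus and is not how `FlowableEventBusPublisher` or `FlowableEventBusSubscriber` are implemented; `FanOutSketch`, `BatchSubscriber`, and `fanOut` are hypothetical names for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FanOutSketch {

    /** Hypothetical subscriber that requests items in fixed-size batches. */
    static final class BatchSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final long batchSize;
        private long outstanding;
        private Flow.Subscription subscription;

        BatchSubscriber(long batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = batchSize;
            s.request(batchSize); // initial demand: one batch
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            if (--outstanding == 0) { // batch consumed, ask for the next one
                outstanding = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Delivers every item to two subscribers; returns what each of them received. */
    public static List<List<String>> fanOut(List<String> items) {
        BatchSubscriber slow = new BatchSubscriber(1);
        BatchSubscriber fast = new BatchSubscriber(2);
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            // Both subscribers are attached before the first item is submitted
            source.subscribe(slow);
            source.subscribe(fast);
            items.forEach(source::submit);
        }
        try {
            slow.done.await();
            fast.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
        return Arrays.asList(slow.received, fast.received);
    }

    public static void main(String[] args) {
        System.out.println(fanOut(Arrays.asList("a", "b", "c")));
    }
}
```

Each subscriber keeps its own demand counter, so the one with the smaller batch size does not change what the other one receives.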