/vertx-rxjava2

ReadStream as Flowable

Primary Language: Java

Build Status

Vert.X + RxJava 2

An interim solution for adapting a Vert.X ReadStream to RxJava 2's Flowable. It can also use the Vert.X EventBus to subscribe multiple subscribers to a single source Flowable.

Dependencies

  • Vert.X 3.5.x
  • RxJava2 2.1.x (latest)

Build

mvn clean install

Maven

   <repositories>
    <repository>
      <id>com.blueskiron-public-repo</id>
      <name>com.blueskiron maven public</name>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <url>https://raw.githubusercontent.com/jurajzachar/com.blueskiron-public-repo/releases</url>
    </repository>
   </repositories>

   <dependency>
     <groupId>com.blueskiron</groupId>
     <artifactId>vertx-rxjava2</artifactId>
     <version>1.1.1</version>
   </dependency>

Simply wrap a ReadStream and use it as a normal Flowable:

FileSystem fs = getVertx().fileSystem();
  fs.open(FILEPATH.toAbsolutePath().toString(), new OpenOptions().setRead(true), ar -> {
    if (ar.failed()) {
      context.fail(ar.cause());
    } else {
      Flowable<Buffer> flowable = FlowableReadStream.newLineDelimitedReadStream(ar.result());
      flowable.subscribe(new PacedTestSubscriber(getVertx(), async, 1000));
    }
  });
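
To serve multiple subscribers, publish the source Flowable on the Vert.X EventBus and subscribe through the source address:

FileSystem fs = getVertx().fileSystem();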
  fs.open(FILEPATH.toAbsolutePath().toString(), new OpenOptions().setRead(true), ar -> {
    if (ar.failed()) {
      context.fail(ar.cause());
    } else {
      Flowable<Buffer> flowable = FlowableReadStream.newLineDelimitedReadStream(ar.result());
      String sourceAddress = "testSourceAddress";
      new FlowableEventBusPublisher<>(flowable, getVertx(), sourceAddress, new DeliveryOptions());
      FlowableEventBusSubscriber<Buffer> busSubscriber = new FlowableEventBusSubscriber<>(getVertx(), sourceAddress, "testSubscriberAddress");
      busSubscriber.subscribe(new PacedTestSubscriber(getVertx(), async, 50));
    }
  });