A collection of order-relaxing concurrent operations on fs2 streams.
- SBT 0.13.16+
- fs2-cats-effect
$ sbt publishLocal
import fs2._
import org.jameshales.fs2.pipeline._
val fruits: Stream[IO, Event] = ???
val writeToDatabase: Sink[IO, Event] = ???
val writeToIndex: Sink[IO, Event] = ???
val writeToWorkQueue: Sink[IO, Event] = ???
// Pass a Stream through multiple Sinks sequentially
fruits
.through(writeToDatabase.passthrough)
.through(writeToIndex.passthrough)
.through(writeToWorkQueue.passthrough)
.run
.unsafeRunSync
// Pass a Stream through multiple Sinks sequentially with pipelining
fruits
.pipeline(writeToDatabase.passthrough)
.pipeline(writeToIndex.passthrough)
.pipeline(writeToWorkQueue.passthrough)
.run
.unsafeRunSync
// Partition a Stream by key, processing each partition concurrently
fruits.joinPartition(4)(_.key)(
_.pipeline(writeToDatabase.passthrough)
.pipeline(writeToIndex.passthrough)
.pipeline(writeToWorkQueue.passthrough)
)
.run
.unsafeRunSync
See PipelineExample.scala for the full example.