- references
- https://www.zionomicon.com
- https://zio.dev/version-1.x/datatypes/stream/zstream/
- Zymposium - ZIO Streams Part 3 (Streaming Sandwiches)
- ZIO Stream — Part 2 — Sinks!
- https://blog.rockthejvm.com/zio-streams/
- ZIO-streams tutorial - build a Bitcoin ticker in 10 minutes
- https://j3t.ch/tech/zio-streams-trappings/
- https://github.com/adamgfraser/0-to-100-with-zio-test
- https://www.manning.com/books/functional-programming-in-scala-second-edition
- The Streaming Future by Adam Fraser
- The Next Generation Of Streaming by Adam Fraser
- goals of this workshop
    - introduction to zio streams
        - Stream
        - Sink
        - Pipeline
    - workshop task
- task1: migration from fs2 to ZStream
    - base repo: https://github.com/mtumilowicz/scala-zio2-fs2-refined-newtype-workshop
    - replace fs2 with ZStream
- task2: get BTC-EUR price every 5 seconds (answer: BitcoinTicker object)
    ```scala
    import zio.stream.ZStream
    import zio.{Scope, ZIO, ZIOAppArgs, ZIOAppDefault, durationInt}

    import scala.io.Source

    object BitcoinTicker extends ZIOAppDefault {

      val url = "https://api.kraken.com/0/public/Ticker?pair=BTCEUR"

      val getPrice = ZIO.fromAutoCloseable(ZIO.attempt(Source.fromURL(url)))
        .map(_.mkString)
        .map(extractPrice)

      def extractPrice(json: String): BigDecimal =
        BigDecimal(
          """c":\["([0-9.]+)"""
            .r("priceGroup")
            .findAllIn(json)
            .group("priceGroup")
        )

      override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
        ZStream
          // constantly call getPrice, hint: repeatZIO
          // every 5 seconds, hint: throttleShape
          // println price, hint: foreach, s"BTC-EUR: ${price.setScale(2)}"
    }
    ```
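    - one possible shape of the missing part (a sketch, not the only answer; the complete solution lives in the `BitcoinTicker` object):
        ```scala
        override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
          ZStream
            .repeatZIO(getPrice)                  // constantly call getPrice
            .throttleShape(1, 5.seconds)(_ => 1L) // let at most one element through every 5 seconds
            .foreach(price => zio.Console.printLine(s"BTC-EUR: ${price.setScale(2)}"))
        ```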
- task3: implement decoding using `ZChannel`
- input: concatenated products char by char
    ```scala
    ProductService.encodedProducts.runCollect.flatMap(zio.Console.printLine(_))
    ```
    ```
    Chunk(C,o,m,p,u,t,e,r,W,a,s,h,i,n,g,M,a,c,h,i,n,e,T,V,T,V)
    ```
- output: parsed products
    ```scala
    ProductService.products.runCollect.flatMap(zio.Console.printLine(_))
    ```
    ```
    Chunk(Computer,WashingMachine,TV,TV)
    ```
- method to implement
    ```scala
    val decodeProduct: ZPipeline[Any, Nothing, Char, Product] = {
      // read some input
      // decode as many products as we can, maybe some leftovers appear
      // emit decoded products
      // read more input and add it to the leftovers
      // repeat
      def read(buffer: Chunk[Char]): ZChannel[Any, ZNothing, Chunk[Char], Any, Nothing, Chunk[Product], Any] = {
        // read from input, hint: ZChannel.readWith
        // process the buffer, hint: (leftovers, products)
        // write to the channel, hint: ZChannel.writeAll
        // repeat with leftovers, hint: read(leftovers)
        // handle the error channel, hint: ZNothing, ZChannel.fail
        // handle done, hint: if the buffer is not empty - ZIO.debug error, otherwise ZChannel.succeed
        ???
      }

      ZPipeline.fromChannel(read(Chunk.empty))
    }
    ```
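- a sketch of the `ZChannel.readWith` shape (assuming a hypothetical helper `processBuffer: Chunk[Char] => (Chunk[Char], Chunk[Product])` that returns leftovers and decoded products):
    ```scala
    def read(buffer: Chunk[Char]): ZChannel[Any, ZNothing, Chunk[Char], Any, Nothing, Chunk[Product], Any] =
      ZChannel.readWith(
        (in: Chunk[Char]) => {
          val (leftovers, products) = processBuffer(buffer ++ in) // hypothetical helper
          ZChannel.writeAll(products) *> read(leftovers)          // emit products, repeat with leftovers
        },
        (err: ZNothing) => ZChannel.fail(err), // can never fire: the input error type is ZNothing
        (_: Any) =>
          if (buffer.nonEmpty) ZChannel.fromZIO(ZIO.debug(s"undecoded leftovers: $buffer"))
          else ZChannel.succeed(())
      )
    ```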
- solution: `ProductService`
- task5: read from `src/test/resources/contributors/data.txt` and group contributors by repository
    - you could verify the number of unique repositories using:
        ```
        cat ~/IdeaProjects/scala-zio2-zstream-workshop/src/test/resources/contributors/data.txt | awk -F, '{ print $1 }' | sort -u | wc -l
        ```
    - solution: `ContributorService`
- why do we need streaming instead of just `IO`?
    - the `IO` type fundamentally provides us with the same level of abstraction as ordinary imperative programming
    - writing efficient, streaming I/O will generally involve monolithic loops
        - monolithic loops are not composable
- example: program that checks whether the number of lines in a file is greater than 5
    ```scala
    def linesGt(filename: String, limit: Int): IO[Boolean] = IO {
      val src = io.Source.fromFile(filename)
      try
        var count = 0
        val lines: Iterator[String] = src.getLines()
        while count <= limit && lines.hasNext do
          lines.next()
          count += 1
        count > limit
      finally src.close()
    }
    ```
- entangles the high-level algorithm with low-level concerns about iteration and file access
    - not easy to read
    - a barrier to composition
    - difficult to extend later
        - followup: find a line index before 40,000 where the first letters of consecutive lines spell out "hello"
        - we'd need to modify our loop to keep track of some further state
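- for contrast, the same check expressed as a stream (a sketch using ZIO's `ZStream.fromFileName` and built-in pipelines, not the book's exact code): iteration, resource safety, and the predicate become separate, composable pieces
    ```scala
    def linesGt(filename: String, limit: Int): ZIO[Any, Throwable, Boolean] =
      ZStream
        .fromFileName(filename)                             // byte stream, resource-safe
        .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines) // decode bytes and split into lines
        .take(limit + 1)                                    // pull only as many lines as needed
        .runCount
        .map(_ > limit)
    ```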
- components
- `ZStream[R, E, O]`
- effectual stream
    - requires an environment `R`
    - may fail with an error `E`
        - if a stream fails with an error, it is not well defined to pull from that stream again
    - succeeds with zero or more values of type `O`
- pull-based
    - elements are processed by being "pulled through the stream" by the sink
- vs ZIO: a ZIO produces a single value (with no intermediate results) and after that will never produce another result
- example
    ```scala
    def publish(queue: Queue[Int]): ZIO[Any, Nothing, Any] =
      Random.nextInt.flatMap(queue.offer).delay(1.second).forever.fork
    ```
    - does its work incrementally
    - the work is not reflected in the return type (it contains no meaningful information)
    - we cannot do anything with that work, e.g. transform it or take a certain number of values
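- for comparison, the same producer expressed as a stream (a sketch): the emitted values now appear in the type, so we can transform them or take just some of them
    ```scala
    val publishStream: ZStream[Any, Nothing, Int] =
      ZStream.repeatZIO(Random.nextInt).schedule(Schedule.spaced(1.second))

    val firstFive: ZIO[Any, Nothing, Chunk[Int]] =
      publishStream.take(5).runCollect
    ```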
- `trait ZSink[-Env, +Err, -In, +Leftover, +Summary]`
- describe ways of consuming elements
- composable aggregation strategy
- strategy for aggregating zero or more elements into a summary value
- a sink may emit zero or more leftover values of type `Chunk[Leftover]`
    - represents inputs that were received but not included in the aggregation
    - in some cases it can be useful to keep leftovers for further processing
    - chunking can lead to leftovers
        - the sink does not need all of the elements in the chunk to produce a summary value
        - example: suppose we want to take 3 elements, but the input is produced as two two-element chunks - the fourth element becomes a leftover
    - example: `ZSink.collectAllWhile(_ == "a")`
        - suppose the inputs are "a" then "b"
            - "b" is a leftover because we had to consume it (execute the check `_ == "a"`) to decide whether the sink is done
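    - a sketch demonstrating leftovers, using `collectLeftover` to keep the leftovers alongside the summary value:
        ```scala
        val sink: ZSink[Any, Nothing, String, String, Chunk[String]] =
          ZSink.collectAllWhile[String](_ == "a")

        // (summary, leftovers) == (Chunk(a), Chunk(b))
        val result: ZIO[Any, Nothing, (Chunk[String], Chunk[String])] =
          ZStream("a", "b").run(sink.collectLeftover)
        ```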
- how to create?
    ```scala
    ZSink.fromFileName("README2.md")
    ```
- example
    ```scala
    def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, A, Any, B]): ZIO[R1, E1, B]

    stream.run(ZSink.collectAll)
    ```
- combining sinks
    ```scala
    outputSink1.zipPar(outputSink2) // send inputs to both
    ```
- asynchronous aggregations
    - aggregate by chunk size + duration (so we don't wait forever for a chunk to fill up)
    - example
        ```scala
        def aggregateAsyncWithin[R1 <: R, E1 >: E, A1 >: A, B](
          sink: => ZSink[R1, E1, A1, A1, B],
          schedule: => Schedule[R1, Option[B], Any]
        )
        ```
    - if the sink is done first => the aggregated value is emitted downstream
        - the pending schedule recurrence is canceled
        - then the sink is run again, along with the next recurrence of the schedule
    - if the schedule is done first => a done value is written to the sink
        - the sink writes its aggregated value downstream immediately
        - then the sink is run again, along with the next recurrence of the schedule
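    - for example, a sketch (assuming `stream: ZStream[Any, Nothing, Int]`) that batches up to 100 elements but flushes at least every 5 seconds:
        ```scala
        val batched: ZStream[Any, Nothing, Chunk[Int]] =
          stream.aggregateAsyncWithin(
            ZSink.collectAllN[Int](100), // the sink is done after 100 elements
            Schedule.fixed(5.seconds)    // the schedule recurs every 5 seconds
          )
        ```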
- `trait ZPipeline[-Env, +Err, -In, +Out]`
- represents the "middle" of the stream
    - streams: the beginning of a data flow process
    - sinks: the end of a data flow process
- takes a stream as input and returns a new stream, possibly with a different element type
- the definition is extremely broad
    - almost any stream operator can be described as a pipeline
    - can: map, filter, aggregate, append, etc.
    - can't: provide the environment or handle stream errors
        - a pipeline is a strategy for describing transformations, not error handling
- use case: encoders and decoders
- encoding or decoding should be completely independent of the logic of a particular stream
- how to create?
    ```scala
    object ZPipeline {
      def map[In, Out](f: In => Out)(implicit trace: Trace): ZPipeline[Any, Nothing, In, Out]
    }
    ```
- example of usage
    ```scala
    def via[R1 <: R, E1 >: E, B](pipeline: ZPipeline[R1, E1, A, B]): ZStream[R1, E1, B]

    stream.via(ZPipeline.utf8Decode)
    ```
- contramap
    - useful when we have a fixed output type and an existing consumer that cannot accept those outputs as inputs
    - motivation
        - we already have some logic to process a stream
        - we want to apply that logic to a stream with a different element type
- category theory
    - covariant Functor: `map`
        - produces values of type `A`
        - example: a covariant `Decoder[A]`
            - JSON => A
    - contravariant Functor: `contramap`
        - consumes values of type `A`
        - example: a contravariant `Encoder[A]`
            - A => JSON
- example
    ```scala
    val numericSum: ZSink[Any, Nothing, Int, Nothing, Int] =
      ZSink.sum[Int]
    val stringSum: ZSink[Any, Nothing, String, Nothing, Int] =
      numericSum.contramap((x: String) => x.toInt)

    // done on the sink side (contramap)
    val sum1: ZIO[Any, Nothing, Int] =
      ZStream("1", "2", "3", "4", "5").run(stringSum)

    // done on the stream side (map)
    val sum2: ZIO[Any, Nothing, Int] =
      ZStream("1", "2", "3", "4", "5").map(_.toInt).run(numericSum)
    ```
- `trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDone]`
- unifies streams, sinks, and pipelines
```scala
type ZStream[-R, +E, +A] = ZChannel[R, Any, Any, Any, E, Chunk[A], Any]
```
- `Any` = does not need / does not produce
```scala
type ZSink[-R, +E, -In, +L, +Z] = ZChannel[R, Nothing, Chunk[In], Any, E, Chunk[L], Z]
```
- the sink itself doesn't know how to handle any errors, so its input error type has to be `Nothing`
    - if the stream potentially fails with an error of type `E`, use `pipeToOrFail`
        - fails with the error of the first channel without passing it through to the second channel
        - example:
            ```scala
            stream.channel.pipeToOrFail(sink.channel)
            ```
- it can receive exactly one done value (`-InDone`) of type `Any`
    - elements might be produced asynchronously
        - the sink needs some way to know that there will not be more elements in the future
- no inputs for its error type (`-InErr`)
    - sinks are not strategies for handling errors
- a sink will eventually terminate, if at all
    - with either a summary value of type `+OutDone`
    - or an error of type `+OutErr`
```scala
type ZPipeline[-R, +E, -In, +Out] = ZChannel[R, Nothing, Chunk[In], Any, E, Chunk[Out], Any]
```
- this is the reason why we can hook a pipeline up to a sink or a stream
    - if a pipeline were just a function, we would have a very limited ability to compose it with other streaming data types
    - the result of combining a pipeline with a sink/stream is a new sink/stream
```scala
type ZIO[-R, +E, +A] = ZChannel[R, Any, Any, Any, E, Nothing, A]
```
- ZIO is just a special case of a channel that does not read any input and does not produce any incremental results
- everything we know about how to run an individual ZIO workflow applies to a channel
- we only need to handle a few additional cases
    - `write` - emit an incremental output
        - similar to `ZIO.succeed`, but `ZIO.succeed` means "produce this final output and be done", while `write` says "produce this incremental output and then potentially keep going"
    - `read` - read an input element, error, or done value
    - `pipeTo` - send the output of one channel to the input of another
    - `concatMap` - generate a new channel for each element of a channel and combine them
        - similar to `flatMap` on the elements
- we shouldn't think of streams as a higher-level abstraction built on top of effect systems
    - channels are fundamental, and effect systems are specialized versions of them
    - the operations supported by ZIO are a subset of the operations supported by `ZChannel`
        ```scala
        private final case class FromZIO[R, E, A](zio: ZIO[R, E, A])
          extends ZChannel[R, Any, Any, Any, E, Nothing, A]
        ```
    - a runtime that can execute channels can also execute individual ZIO workflows
- under the hood
    - implicit chunking
        - however, operators like `filter` and `map` work on individual values
            ```scala
            trait ZStream[-R, +E, +O] {
              // motivation: efficiency
              def process: ZIO[R with Scope, Option[E], Chunk[O]]
            }
            ```
    - to run a channel you start a channel fiber: `ChannelFiber`
        - just like running a ZIO workflow: you start a ZIO fiber
- useful operators
- collect = map + filter
- concat - switch to another stream after this stream is done
- mapAccum - map with a stateful function
    - declaration
        ```scala
        def mapAccum[S, A1](s: => S)(f: (S, A) => (S, A1))
        ```
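- sketches of the three operators above:
    ```scala
    val collected  = ZStream(1, 2, 3, 4).collect { case n if n % 2 == 0 => n * 2 }   // emits 4, 8
    val combined   = ZStream(1, 2).concat(ZStream(3, 4))                             // emits 1, 2, 3, 4
    val runningSum = ZStream(1, 2, 3, 4).mapAccum(0)((acc, n) => (acc + n, acc + n)) // emits 1, 3, 6, 10
    ```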
- unfold
    - declaration
        ```scala
        def unfold[S, A](s: S)(f: S => Option[(A, S)]): ZStream[Any, Nothing, A]
        ```
    - only evaluated as values are pulled
        - can be used to describe streams that continue forever
    - effectual variant: `unfoldZIO`
        - example: reading incrementally from a data source while maintaining some cursor
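    - example: counting down from a seed (a sketch); the stream ends when the function returns `None`
        ```scala
        val countdown: ZStream[Any, Nothing, Int] =
          ZStream.unfold(5)(n => if (n > 0) Some((n, n - 1)) else None) // emits: 5, 4, 3, 2, 1
        ```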
- groupByKey
    - example
        ```scala
        stream.groupByKey(_.key) { case (key, stream) => /* operations on the sub-stream */ }
        ```
    - helpfully pushes entries with the same key to their own sub-stream
    - the `apply` function of groupBy/groupByKey already uses `flatMapPar` under the hood
        - reason: the sub-streams are potentially infinite
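    - a concrete sketch (the `Contribution` type is hypothetical): counting contributions per repository
        ```scala
        case class Contribution(repo: String, author: String)

        def countsPerRepo(contributions: ZStream[Any, Nothing, Contribution]): ZStream[Any, Nothing, (String, Long)] =
          contributions.groupByKey(_.repo) { (repo, substream) =>
            ZStream.fromZIO(substream.runCount.map(count => (repo, count)))
          }
        ```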
- many operators have effectual variants (ZIO suffix)
- for effectual variants - many have parallel variants (Par suffix)
- example: map, mapZIO, mapZIOPar
- digression
    - problem: in comes your 100 MB/sec production stream of ~200,000 messages each second, and suddenly your app can't keep up, even though its CPU usage seems desperately low
    - reason: absence of parallelism
        - if you've expressed your logic using only `.map` or `.flatMap` on your stream, that particular stage of your processing pipeline is guaranteed to run on a single fiber
    - solution: replace the `mapZIO`/`flatMap` that applies the business logic to the stream with `mapZIOPar` (or one of its variants), as sketched below
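        - a sketch of the fix (`processMessage` is a hypothetical effectful handler):
            ```scala
            // single fiber: elements are processed one at a time
            stream.mapZIO(processMessage)

            // up to 16 elements processed concurrently
            stream.mapZIOPar(16)(processMessage)
            ```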
- running stream
    - transform the ZStream into a ZIO effect
        - a ZStream produces potentially infinitely many values
        - how do we run a stream to produce a single value (a ZIO effect)? (sketches below)
            - run the stream and discard the results (`runDrain`)
            - return the first value (`runHead`)
            - fold to produce a summary, consuming only as many elements as necessary
            - example: get tweets -> transform -> save to db
                - the entire program is described as a stream
                - no need for any result, just run it
    - execute the resulting ZIO effect
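    - sketches of the running variants:
        ```scala
        val numbers = ZStream(1, 2, 3, 4, 5)

        val drained: ZIO[Any, Nothing, Unit]        = numbers.runDrain          // run for effects, discard values
        val first:   ZIO[Any, Nothing, Option[Int]] = numbers.runHead           // first value, if any
        val total:   ZIO[Any, Nothing, Int]         = numbers.runFold(0)(_ + _) // fold into a summary value
        ```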
- scope
    - a ZIO workflow never produces any incremental output
        - so it is clear that its finalizer should be run immediately after the ZIO completes execution
    - general rule: a finalizer should be run immediately after its stream completes execution
        - we don't want to run finalizers associated with an upstream channel while a downstream channel is still processing elements
    - example
        ```scala
        val finalizer: URIO[Any, Unit] =
          Console.printLine("finalizer finished").orDie

        def logging(prefix: String): Any => URIO[Any, Unit] =
          v => Console.printLine(s"$prefix " + v).orDie

        val businessLogic: Int => UStream[Int] = (v: Int) =>
          ZStream.fromZIO(Console.printLine(s"flatMap $v").orDie) *> ZStream.succeed(v)

        val stream1 = ZStream(1, 2, 3, 4)
        val stream2 = ZStream(5, 6, 7, 8)

        val ex1 = stream1
          .ensuring(finalizer)
          .tap(logging("first tapping"))
          .flatMap(businessLogic)
          .concat(stream2)
          .tap(logging("second tapping"))
          .runDrain
        ```
        results:
        ```
        first tapping 1
        flatMap 1
        second tapping 1
        ...
        second tapping 4
        finalizer finished
        second tapping 5
        ...
        second tapping 8
        ```