Can rubico handle backpressure better than RxJS?
cjk opened this issue · 5 comments
Hi @richytong
what a great library, thanks for writing and maintaining it!
I’m a heavy RxJS user but have been fighting with some of its shortcomings. My main problem right now is backpressure, which shows up when I work with massive amounts of data that is read from files / REST APIs, transformed, and then written to another file.
Since reading files is faster than writing them, after running for a couple of minutes or hours my heap memory gets exhausted because data piles up before it gets written. RxJS is push-driven, so it’s hard to handle the backpressure problem without a lot of extra code.
From reading your Medium post I had the impression rubico could address this issue, but I’m not sure how - should I use (async) transducers, and will rubico handle the control flow if the source produces faster than downstream can handle? Or did I get this wrong?
Sorry for asking here, but until #161 is resolved, I was not sure where to ask such questions.
Hello, creating issues for now is fine. I am assuming, since you mention data from files, that you are working with JavaScript that runs on the server. Are you using Node.js? If so, have you tried out their streams / run into any issues with backpressure there? Node.js streams could be an easy solution just to get things working. Otherwise, rubico may be able to help you - and yes, you could certainly use rubico's transducers, but the concept of handling all the backpressure without worry comes from the real champ - a newish language feature of JavaScript called async iterators.
In particular, rubico's transformer functions map, filter, reduce, transform, and flatMap look for Symbol.asyncIterator on an incoming object and, if present, iterate through all the values that come out of that async iterator. rubico does no extra work to handle backpressure because async iterators solve the problem by design - they are inherently a "pull" based system, while Observables (and Node.js streams) are "push" based systems with internals to handle backpressure. All rubico does is provide a way to consume this async iterator API.
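To make the pull-based idea concrete, here is a minimal sketch in plain Node.js (readLines and writeSlowly are hypothetical stand-ins for your real file/REST reads and writes): the producer only generates the next value when the consumer awaits it, so data never piles up in memory.

// hypothetical producer - yields values lazily, one per pull
const readLines = async function* () {
  for (let i = 0; i < 1e6; i += 1) {
    yield `line ${i}\n`
  }
}

// hypothetical slow sink
const writeSlowly = async line => {
  await new Promise(resolve => setTimeout(resolve, 5)) // simulate a slow write
}

const main = async () => {
  for await (const line of readLines()) { // each iteration is a "pull"
    await writeSlowly(line) // the generator does not advance until this resolves
  }
}

main()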
I bring up Node.js streams because they actually implement the async iterator API, so it is possible to use rubico to consume a Node.js stream. I think at some point on the master branch RxJS also implemented it, but then they decided to pull it out before RxJS 7. That said, I recommend you try the Node.js streams way first. They have dealt with backpressure problems a lot, so perhaps they have dealt with yours in particular. You can use their .pipe method to chain streams together.
const fs = require('fs')
const zlib = require('zlib')

// assumes a fetch implementation is available (e.g. node-fetch)
// .pipe connects the read stream to the gzip transform and propagates backpressure
fetch('https://my-upload-endpoint.com', {
  method: 'post',
  body: fs.createReadStream('my/path').pipe(zlib.createGzip()),
  headers: {
    'Content-Type': 'content-type-of-your-file',
  },
})
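For the file-to-file case you describe, the same built-in backpressure handling applies when the target is a write stream. Here is a minimal sketch using Node's stream.pipeline (the file paths are hypothetical):

const fs = require('fs')
const zlib = require('zlib')
const { pipeline } = require('stream')

// pipeline wires backpressure automatically: the read stream pauses whenever
// the gzip transform or the write stream cannot keep up
pipeline(
  fs.createReadStream('input.log'),
  zlib.createGzip(),
  fs.createWriteStream('output.log.gz'),
  err => {
    if (err) console.error('pipeline failed', err)
    else console.log('pipeline succeeded')
  },
)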
If the Node.js way does not work, rubico could come in and help deal with backpressure.
const fs = require('fs')
const zlib = require('zlib')
const stream = require('stream')
const { promisify } = require('util')
const { map } = require('rubico')

const deflate = promisify(zlib.deflate) // zlib.deflate is callback-based

fetch('https://my-upload-endpoint.com', {
  method: 'post',
  body: stream.Readable.from(map(async chunk => {
    await new Promise(resolve => setTimeout(resolve, 10)) // sleep 10ms each chunk
    return deflate(chunk)
  })(fs.createReadStream('my/path'))),
  headers: {
    'Content-Type': 'content-type-of-your-file',
  },
})
You can replace the sleep 10ms part with any Promise, for example one that resolves on a backpressure signal coming from a separate system.
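A minimal sketch of that idea, assuming a hypothetical waitForDownstreamReady hook that resolves whenever the consuming system signals it can accept more data:

const zlib = require('zlib')
const { promisify } = require('util')
const { map } = require('rubico')

const deflate = promisify(zlib.deflate)

// hypothetical hook - it could await a 'drain' event, poll a queue-depth
// metric, or resolve on a message from the downstream system
const waitForDownstreamReady = async () => {
  await new Promise(resolve => setTimeout(resolve, 10)) // placeholder delay
}

const throttledDeflate = map(async chunk => {
  await waitForDownstreamReady() // the source is not pulled again until this resolves
  return deflate(chunk)
})

throttledDeflate can then be applied to an async iterable source such as a read stream, exactly as in the example above.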
Ah, I see, then I should have a second look at async iterators / Node.js streams first. I did look at them once, but they seemed cumbersome at the time (perhaps 2 years ago). Back then I was really disappointed that RxJS had no way to interact with Node streams. Still, RxJS looked promising since it provided a clear and solid path for complex sync/async transformations with its flexible pipe-like principles.
But thanks a lot for the clarification, I didn't know streams implement async-iterators and that this may solve my backpressure-problems.
Since rubico's functional way looks so elegant, I'm pretty confident about giving it a shot for implementing the necessary logic - I need some plumbing logic to connect the various streams to my flow in a way I can still understand months later 😄
Let's close this then, your examples above will get me going 👍
I think Node.js streams have come a long way, that's for sure. And getting file contents to places on a network is probably in their core problem domain. My thinking is that the first example should be sufficient.
Also, thank you for reminding me to update the examples. I am now redoing the rubico examples.