NodeJS Stream Meeting Notes - Async Iterators
benjamingr opened this issue · 13 comments
Hey,
Node discussed Symbol.asyncIterator streams in the last WG meeting yesterday (I had to miss it at the last minute :( ).
Symbol.asyncIterator can be supported inside streams without internal changes; however, merging it into core depends on which change for Promises is adopted in core. We are happy to maintain the prototype and expand it if needed. We should take care over what the story is for writing to streams, as it should somehow match the story for reading, i.e. we should be able to write with promises as well.
One issue raised was how to make async iterators work with writable streams - that is, there needs to be a way to do:
async function* foo() {
  yield 1;
  yield 2;
  yield 3;
}
And have that coerced into a NodeJS stream - I don't think it should be too problematic to do in Node and I'm excited that the streams WG is keeping a prototype for interop. Pinging @mcollina who attended the meeting.
Thanks @benjamingr. I hope this will clarify things further.
Node.js Streams are composed of four types/classes:
- Readable, the source of the data
- Writable, the destination of the data
- Duplex, both Readable and Writable, for sockets etc.
- Transform, both Readable and Writable, to be used to transform data as it passes by
If we are not interested in controlling the amount of data (backpressure) that the Node.js process accepts, we can use the 'data' event:
stream.on('data', function (chunk) {
  // do something with chunk
})
stream.on('end', function () {
  // data finished
})
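For reference, a minimal runnable sketch of the 'data'/'end' pattern above. The `collect` and `makeSource` helpers are hypothetical names introduced only for this illustration; the source is a small object-mode Readable that emits three values.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: gather every chunk using the 'data' and 'end' events.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('end', () => resolve(chunks));
    stream.on('error', reject);
  });
}

// Hypothetical source: an object-mode Readable that emits 1, 2, 3 and then ends.
function makeSource() {
  const values = [1, 2, 3];
  return new Readable({
    objectMode: true,
    read() {
      // Pushing null signals the end of the stream.
      this.push(values.length ? values.shift() : null);
    }
  });
}

collect(makeSource()).then((chunks) => console.log(chunks)); // [ 1, 2, 3 ]
```

Attaching a 'data' listener switches the stream into flowing mode, so chunks arrive as fast as the source produces them, which is why this style gives no control over backpressure.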
If we are interested in controlling the amount of data that the Node.js process accepts, we can use pipe():
readable.pipe(transform).pipe(writable)
We can also concatenate multiple transforms:
readable.pipe(transform1).pipe(transform2).pipe(writable)
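A small sketch of such a chain, assuming object-mode streams. `doubler` and `runPipeline` are hypothetical names used only for this example; the two transforms each double the value passing through.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Hypothetical transform: doubles each number passing through.
function doubler() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      callback(null, chunk * 2);
    }
  });
}

// Hypothetical driver: pipes the values through two transforms into a sink.
function runPipeline(values) {
  return new Promise((resolve, reject) => {
    const out = [];
    let i = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push(i < values.length ? values[i++] : null);
      }
    });
    const writable = new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        out.push(chunk);
        callback();
      }
    });
    writable.on('finish', () => resolve(out));
    writable.on('error', reject);
    readable.pipe(doubler()).pipe(doubler()).pipe(writable);
  });
}

runPipeline([1, 2, 3]).then((out) => console.log(out)); // [ 4, 8, 12 ]
```

Note that pipe() does not forward errors from one stage to the next, so in real code each stream in the chain needs its own 'error' handler.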
AsyncIterator is another way we can interact with streams, without using pipe(), but still controlling the amount of data that we can accept, respecting backpressure.
In @calvinmetcalf's prototype, we can do:
async function test() {
  for await (let x of new MyStream()) {
    console.log(x);
  }
}
test().then(()=>console.log('done'), e=>{console.log(e)});
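A self-contained variant of the snippet above, assuming a Node.js release in which Readable streams implement Symbol.asyncIterator natively (later releases do; the prototype discussed here predates that). The `MyStream` class below is a hypothetical stand-in, not the one from the prototype.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for MyStream: emits 'a', 'b', 'c' and then ends.
class MyStream extends Readable {
  constructor() {
    super({ objectMode: true });
    this.items = ['a', 'b', 'c'];
  }

  _read() {
    this.push(this.items.length ? this.items.shift() : null);
  }
}

// The loop asks for the next chunk only after the body has finished,
// so a slow consumer naturally slows down the reads.
async function readAll(stream) {
  const seen = [];
  for await (const x of stream) {
    seen.push(x);
  }
  return seen;
}

readAll(new MyStream()).then((seen) => console.log(seen)); // [ 'a', 'b', 'c' ]
```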
This allows us to read from a stream just using a language feature. However it is not clear how we could write() to another stream, or create a processing pipeline like pipe().
Ideally we should be able to do:
async function* doubleStream (iter) {
  for await (let x of iter) {
    yield x * 2
  }
}
async function toDest (iter, dest) {
  for await (let x of iter) {
    await dest.write(x)
  }
  await dest.end()
}
toDest(doubleStream(doubleStream(stream)), writable)
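Writable#write() in Node.js returns a boolean rather than a promise, so the idea above can only be approximated today. The sketch below is one possible approach, not an agreed design: it waits for 'drain' whenever write() returns false, using `events.once` (available in later Node.js releases). `numbers`, `makeSink` and `main` are hypothetical names for this example.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical source: any async iterable works here.
async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

async function* doubleStream(iter) {
  for await (const x of iter) {
    yield x * 2;
  }
}

async function toDest(iter, dest) {
  for await (const x of iter) {
    // write() returns false when the internal buffer is full; wait for 'drain'.
    if (!dest.write(x)) {
      await once(dest, 'drain');
    }
  }
  // Register the listener before end() so 'finish' cannot be missed.
  const finished = once(dest, 'finish');
  dest.end();
  await finished;
}

// Hypothetical slow sink with a tiny buffer, to exercise the 'drain' path.
function makeSink(received) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(chunk, encoding, callback) {
      received.push(chunk);
      setImmediate(callback);
    }
  });
}

async function main() {
  const received = [];
  await toDest(doubleStream(doubleStream(numbers())), makeSink(received));
  return received;
}

main().then((received) => console.log(received)); // [ 4, 8, 12 ]
```

`once()` also rejects if the stream emits 'error' while waiting, so a failing destination surfaces as a rejected promise from `toDest`.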
Error handling was also a topic in the meeting, but it is not clear what happens if something inside the for await loop throws. As it seems, it should land in the AsyncIterator.throw method if we understand this correctly, but I do not see it happening in our demo. We might want to destroy the underlying resources if that happens; however, we should probably discuss this further between ourselves.
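As far as the language semantics go, an exception thrown inside the body of a for await loop closes the iterator by calling its return() method, not throw(); the error then propagates to the code around the loop. The sketch below illustrates this with a plain async generator rather than a stream, so it says nothing about how the prototype behaves. `makeSource` and `demo` are hypothetical names.

```javascript
// Hypothetical source that records how the consumer closed it.
function makeSource(log) {
  return (async function* () {
    try {
      yield 1;
      yield 2;
    } catch (err) {
      // Would run only if the consumer called throw(); a throwing loop body does not.
      log.push('throw() path: ' + err.message);
    } finally {
      // Runs when the loop calls return(); a natural place to release resources.
      log.push('cleanup');
    }
  })();
}

async function demo() {
  const log = [];
  try {
    for await (const x of makeSource(log)) {
      throw new Error('boom at ' + x);
    }
  } catch (err) {
    log.push('caller saw: ' + err.message);
  }
  return log;
}

demo().then((log) => console.log(log)); // [ 'cleanup', 'caller saw: boom at 1' ]
```

For a stream-backed iterator, return() would therefore be the hook in which to destroy the underlying resources.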
Would it make sense to allow Writable.write to take an async (or sync) iterator and write the contents to the stream by iterating over it?
async function *messagesFromSomewhere(somewhere){
  // etc...
}
var messages = messagesFromSomewhere("here");
fs.createWriteStream("messages.txt").write(messages);
This would make it easy to consume sequences with streams, and I think would be intuitive to use without breaking backwards compatibility.
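A different route to the same goal, sketched under the assumption of a recent Node.js release that ships the `stream/promises` module: its `pipeline()` accepts an async iterable as the source. This is not the write() overload proposed above. The generator body and the in-memory destination are hypothetical stand-ins so that the example runs without touching the file system.

```javascript
const { Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical body: the real message source is not specified in this thread.
async function* messagesFromSomewhere(somewhere) {
  yield `first message from ${somewhere}\n`;
  yield `second message from ${somewhere}\n`;
}

// Hypothetical driver: writes the messages into an in-memory Writable,
// standing in for fs.createWriteStream("messages.txt").
async function writeMessages(somewhere) {
  const written = [];
  const dest = new Writable({
    decodeStrings: false, // keep string chunks as strings
    write(chunk, encoding, callback) {
      written.push(String(chunk));
      callback();
    }
  });
  // pipeline() resolves once the destination has finished, and rejects on error.
  await pipeline(messagesFromSomewhere(somewhere), dest);
  return written.join('');
}

writeMessages('here').then((text) => process.stdout.write(text));
```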
async function *messagesFromSomewhere(somewhere){
  // etc...
}
var source = Readable.wrap(messagesFromSomewhere("here"))
var dest = fs.createWriteStream("messages.txt")
source.pipe(dest)
Can I know if messagesFromSomewhere is finished?
Yes, the promise result includes a done member. Having the ability to create a readable from an async iterator seems like a good idea.
Would it make sense to call it Readable.from() to be similar to Array.from and similar methods in other types like Observable?
Would it make sense to allow regular iterables like Array or Set, then? This would potentially help get rid of the need for user libraries like stream-array.
We'd probably need it to be more like Readable.prototype.wrap so that options could be specified (i.e. objectMode, highWaterMark and encoding).
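For what it's worth, later Node.js releases ship a static `Readable.from(iterable[, options])` that accepts both sync and async iterables and takes stream options. The sketch below assumes such a release; the generator body, `collect` and `demo` are hypothetical and exist only for this example.

```javascript
const { Readable } = require('stream');

// Hypothetical body: the real message source is not specified in this thread.
async function* messagesFromSomewhere(somewhere) {
  yield `hello from ${somewhere}`;
  yield `goodbye from ${somewhere}`;
}

// Hypothetical helper: drain a readable into an array.
async function collect(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return chunks;
}

async function demo() {
  // Async generator as the source.
  const fromGenerator = Readable.from(messagesFromSomewhere('here'));
  // A plain iterable (a Set) as the source, with explicit stream options.
  const fromSet = Readable.from(new Set([1, 2, 3]), {
    objectMode: true,
    highWaterMark: 2
  });
  return {
    messages: await collect(fromGenerator),
    numbers: await collect(fromSet)
  };
}

demo().then((result) => console.log(result));
```

The returned object is an ordinary Readable, so it can also be passed to pipe() like any other stream.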
@RangerMauve I agree. However it's not the point of the discussion, we can have that in our readable-stream repo.
I think this issue is only about async iterators. Whatever the method name is, we need to make sure we can wrap them easily, and possibly assemble them in a transform pipeline.
@calvinmetcalf do you know how you would implement Readable.wrap()?
yes I'll open a pull for async-iter
This is all very interesting; it's really great to hear that async iteration may fit seamlessly with Node Streams. Do you have any feedback that could affect the semantics of this proposal? Do the currently proposed semantics work well for Node?
I've been following this spec for a while and am a big fan and think it's a great fit.
I didn't know about it, and I'm really happy with the spec.
What is the officially recommended way for using for-await-of with streams on Node.js at the moment? An npm package such as async-iterate-stream?