whatwg/streams

ReadableStream.from(X)

ricea opened this issue · 17 comments

ricea commented

It would be good to have a convenience method to create a ReadableStream in a DWIM-ish fashion from something else.

Logic being something like

  1. If X is an async iterable, then create a stream and enqueue each value as a chunk, applying flow control, closing when the async iterable is done.
  2. Else if X is an iterable, then create a stream and enqueue each value as a chunk, closing when the iterable is done.
  3. Else if X is a Blob, return the original value of Blob.prototype.stream called on X.
  4. Else create a stream, enqueue X, and close it.
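For discussion, here is a rough sketch of what steps 1 and 2 could look like in userland. The name `streamFromIterable` is hypothetical, and so is the choice of a `highWaterMark` of 0 (so that a value is only pulled when a read is pending). Steps 3 and 4 are left out, and anything that is not iterable throws. This is an approximation under those assumptions, not a specification of the eventual algorithm.

```javascript
// Hypothetical helper approximating steps 1 and 2 only.
function streamFromIterable(x) {
  const getAsync = x?.[Symbol.asyncIterator];
  const getSync = x?.[Symbol.iterator];
  let iterator;
  if (typeof getAsync === 'function') {
    iterator = getAsync.call(x);      // step 1: async iterable
  } else if (typeof getSync === 'function') {
    iterator = getSync.call(x);       // step 2: sync iterable
  } else {
    throw new TypeError('Expected an iterable or async iterable');
  }
  return new ReadableStream({
    // pull() runs only when the consumer wants more, which gives flow control.
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    // Let the iterator clean up if the consumer cancels early.
    async cancel(reason) {
      await iterator.return?.(reason);
    },
  }, { highWaterMark: 0 }); // assumption: never read ahead of the consumer
}
```

With this helper, `streamFromIterable([1, 2, 3])` yields three chunks and then closes, while `streamFromIterable(42)` throws a `TypeError`.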

I wonder if we want to omit (4) for future-proofing, in case there are other idiomatic types we want to add to this.

(3) is also somewhat dubious as it's not much value over blob.stream().

What are code examples where this would be most helpful?

ricea commented

The main motivating example is this from the CompressionStream explainer:

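// Note: the parameter is named `input` because `in` is a reserved word in JavaScript.
async function compressArrayBuffer(input) {
  const cs = new CompressionStream('deflate');
  const writer = cs.writable.getWriter();
  // Not awaited here: awaiting before reading could stall on backpressure.
  writer.write(input);
  writer.close();
  const out = [];
  const reader = cs.readable.getReader();
  let totalSize = 0;
  // Collect every compressed chunk and track the total byte length.
  while (true) {
    const { value, done } = await reader.read();
    if (done)
      break;
    out.push(value);
    totalSize += value.byteLength;
  }
  // Concatenate the chunks into a single Uint8Array.
  const concatenated = new Uint8Array(totalSize);
  let offset = 0;
  for (const array of out) {
    concatenated.set(array, offset);
    offset += array.byteLength;
  }
  return concatenated;
}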

where it took 3 lines just to create a stream containing a single ArrayBuffer chunk, something that feels like it should be a one-liner. This is also a possible solution to whatwg/fetch#809.

Edit:

By "three lines" I meant turning

  const writer = cs.writable.getWriter();
  writer.write(input);
  writer.close();

into

  ReadableStream.from(input).pipeTo(cs.writable);

ricea commented

I just realised that

ReadableStream.from([input]).pipeTo(cs.writable);

would also work, is only two more characters, and requires less DWIM.
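To make the comparison concrete, here is a sketch of how the whole function might shrink once the source is a one-chunk stream. Since `ReadableStream.from` is only being proposed here, the sketch builds the single-chunk stream by hand. Using `Response` to collect the output is an assumption of this sketch, not something the explainer does.

```javascript
// Sketch only: compress one buffer by piping a single-chunk stream
// through CompressionStream and collecting the result via Response.
async function compressArrayBuffer(input) {
  // Stand-in for the proposed ReadableStream.from([input]).
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(input);
      controller.close();
    },
  });
  const compressed = source.pipeThrough(new CompressionStream('deflate'));
  // Response drains the stream and concatenates the chunks for us.
  return new Uint8Array(await new Response(compressed).arrayBuffer());
}
```

The manual read loop and concatenation disappear; what remains is the stream construction that a `from()` helper would replace.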

Awesome. Yeah just doing (1) and (2) seems totally uncontroversial (although it'll need a bit of Web IDL work, which I'm happy to do).

I like this proposal. It could make conversion between Node and WHATWG streams easy if both have @@asyncIterator.

Btw, an idea that popped up in whatwg/fetch#809 is that we could maybe have something like Symbol.toStream... (just throwing out alternative ideas)

PS. have BYOB reader in mind also when designing this?

I just realised that

ReadableStream.from([input]).pipeTo(cs.writable);

That's not that far off from

new Response(input).body.pipeTo(cs.writable);

...except that Response can't handle iterators, but it can handle a lot of other things like strings, FormData, blobs, typed arrays... and you can only get a byte stream back.

Btw, an idea that popped up in whatwg/fetch#809 is that we could maybe have something like Symbol.toStream... (just throwing out alternative ideas)

In the future, we could consider calling x[Symbol.toStream]() (if it exists) from ReadableStream.from(x) to allow for custom conversions. But for now, I think Symbol.asyncIterator will suffice for most use cases.

PS. have BYOB reader in mind also when designing this?

You wouldn't benefit much from having a BYOB reader. We'll be reading chunks with AsyncIterator.next(), which has no way of passing in a buffer to use for the returned chunk. So even if we were to support a BYOB reader for the constructed stream, we would still be copying bytes from it.next()'s result over to the buffer provided to reader.read(view).

That said, it would still be nice if there were an easy way to convert any stream with byte chunks into a proper byte stream, even if it incurred extra copies. Something like this could work:

ReadableStream.from(x, { type: "bytes" })

This would be roughly equivalent to a "special" identity transform:

ReadableStream.from(x).pipeThrough(new TransformStream({ readableType: "bytes" }))

Note: this does not work yet. This assumes that TransformStream will at some point support setting readableType and/or writableType to "bytes". These properties are already reserved for this purpose (I think?) in the TransformStream constructor, but they can only be undefined at the moment.

Fwiw, I agree with @domenic. Allow 1 & 2, throw otherwise.

Actually, thinking about it more:

ReadableStream.from(uint8Array);

Seems a bit odd that this creates a stream of numbers. Maybe we should special-case buffer source types, which would create a readable byte stream. Or have an opt-in like @MattiasBuelens suggests.

ReadableStream.from(uint8Array, { type: 'bytes' });

Or, since the option impacts both the accepted input and produced output:

ReadableStream.fromBytes(uint8Array);

It would behave the same as Array.from(uint8array). Perhaps we should add ReadableStream.of(...chunks) to mirror Array.of(...elements)?

ricea commented

It would behave the same as Array.from(uint8array). Perhaps we should add ReadableStream.of(...chunks) to mirror Array.of(...elements)?

I prefer this to making rs.from(uint8array) a special case. If we special-case buffer source types, then we also have to special-case strings. Strings are arguably worse, since if you rs.from('word') then you get a stream of four 1-character strings, which will appear to work but perform horribly.
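A small sketch of the difference, assuming an `of()` that simply enqueues each argument as one chunk. `streamOf` is a hypothetical stand-in for the suggested `ReadableStream.of(...chunks)`. The two array expressions show what an iterable-based `from()` would see when handed a string or a typed array.

```javascript
// Hypothetical stand-in for ReadableStream.of(...chunks):
// every argument becomes exactly one chunk.
function streamOf(...chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
}

// What iterating these values produces, i.e. the chunks from() would enqueue:
const perCharacter = [...'word'];                       // ['w', 'o', 'r', 'd']
const perByte = Array.from(new Uint8Array([1, 2, 3]));  // [1, 2, 3]

// By contrast, of() keeps each value whole:
const wholeString = streamOf('word');                   // one chunk: 'word'
const wholeBuffer = streamOf(new Uint8Array([1, 2, 3])); // one chunk: the Uint8Array
```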

Is there a way to express of(...) in WebIDL?

Is there a way to express of(...) in WebIDL?

Looks like a variadic operation is a thing in WebIDL, so yes.

ricea commented

Looks like a variadic operation is a thing in WebIDL, so yes.

Thanks!

I saw that Node has already added Readable.from.

Maybe it would be great if they could be similar (isomorphic)?

ricea commented

Maybe it would be great if they could be similar (isomorphic)?

If I'm reading the doc correctly, it will be identical, except we probably won't need an options parameter.

If this is added, it would be really nice if .from also accepted a strategy. This way, given, say, an async generator, we could pass highWaterMark and thus have the stream pull extra values so they're ready more quickly, e.g.:

const chunks = ReadableStream.from(
    chunkGenerator,
    new CountQueuingStrategy({ highWaterMark: 10 }), // Try to have chunks available immediately when possible
);
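To illustrate the read-ahead effect, here is a sketch using a hand-written helper in place of the proposed two-argument `from()`. `streamFromWithStrategy`, the `produced` counter, and this particular `chunkGenerator` are all made up for the example. The helper forwards the strategy to the `ReadableStream` constructor, which then keeps calling `pull()` until the queue reaches `highWaterMark`.

```javascript
// Hypothetical helper: like a from() that also takes a queuing strategy.
function streamFromWithStrategy(iterable, strategy) {
  const iterator =
    iterable[Symbol.asyncIterator]?.() ?? iterable[Symbol.iterator]();
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    async cancel(reason) {
      await iterator.return?.(reason);
    },
  }, strategy);
}

// Counts how many values the generator has been asked for.
let produced = 0;
async function* chunkGenerator() {
  for (let i = 0; ; i++) {
    produced++;
    yield i;
  }
}

// The stream starts filling its queue before anyone reads from it.
const chunks = streamFromWithStrategy(
  chunkGenerator(),
  new CountQueuingStrategy({ highWaterMark: 10 }),
);
```

Without any reads, the stream pulls until ten chunks are buffered and then waits. With the default strategy it would buffer only one.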

@Jamesernator I agree, that would be a good addition. We're still discussing whether we want to add that now, or leave it for a future extension. Feel free to chime in. 🙂