Create an AsyncIterableIterator from anything (on any modern platform) while handling back-pressure!
yarn add -E @n1ru4l/push-pull-async-iterable-iterator
Standalone Usage
import { makePushPullAsyncIterableIterator } from "@n1ru4l/push-pull-async-iterable-iterator";
// Create the iterator together with the function that feeds values into it.
const { pushValue, asyncIterableIterator } = makePushPullAsyncIterableIterator();

// Push three values before anything starts consuming the iterator.
for (const value of [1, 2, 3]) {
  pushValue(value);
}

// prints 1, 2, 3
for await (const received of asyncIterableIterator) {
  console.log(received);
}
Check if something is an AsyncIterable
import { isAsyncIterable } from "@n1ru4l/push-pull-async-iterable-iterator";
// Only iterate when the value is recognized as an AsyncIterable.
if (isAsyncIterable(something)) {
  for await (const item of something) {
    console.log(item);
  }
}
Note: On Safari iOS, Symbol.asyncIterator is not available; therefore, all async iterators used must be built using AsyncGenerators.
If an AsyncIterable that is not an AsyncGenerator is passed to isAsyncIterable in the Safari iOS environment, it returns false.
Wrap a Sink
import { makeAsyncIterableIteratorFromSink } from "@n1ru4l/push-pull-async-iterable-iterator";
// let's use some GraphQL client :)
import { createClient } from "graphql-ws/lib/use/ws";
const client = createClient({ url: "ws://localhost:3000/graphql" });

const asyncIterableIterator = makeAsyncIterableIteratorFromSink(sink => {
  // Forward every subscription event to the matching sink handler.
  const { next, error, complete } = sink;
  const unsubscribe = client.subscribe(
    { query: "{ hello }" },
    { next, error, complete }
  );
  // Hand back a function that invokes the subscription's dispose callback.
  return () => unsubscribe();
});

for await (const result of asyncIterableIterator) {
  console.log(result);
}
Apply an AsyncIterableIterator to a sink
import Observable from "zen-observable";
import {
makePushPullAsyncIterableIterator,
applyAsyncIterableIteratorToSink
} from "@n1ru4l/push-pull-async-iterable-iterator";
// pushValue is needed further down to publish values into the iterator.
const { pushValue, asyncIterableIterator } = makePushPullAsyncIterableIterator();

const observable = new Observable(sink => {
  const dispose = applyAsyncIterableIteratorToSink(asyncIterableIterator, sink);
  // dispose will be called when the observable subscription got destroyed
  // the dispose call will ensure that the async iterator is completed.
  return () => dispose();
});

const subscription = observable.subscribe({
  next: console.log,
  complete: () => console.log("done."),
  error: () => console.log("error.")
});

// Publish a value once per second.
const interval = setInterval(() => {
  pushValue("hi");
}, 1000);

// After five seconds, unsubscribe and stop publishing.
setTimeout(() => {
  subscription.unsubscribe();
  clearInterval(interval);
}, 5000);
Put it all together
import {
  GraphQLResponse,
  Observable,
  RequestParameters,
  Variables
} from "relay-runtime";
import { createClient } from "graphql-ws/lib/use/ws";
import {
  makeAsyncIterableIteratorFromSink,
  applyAsyncIterableIteratorToSink
} from "@n1ru4l/push-pull-async-iterable-iterator";
import { createApplyLiveQueryPatch } from "@n1ru4l/graphql-live-query-patch";

const client = createClient({
  url: "ws://localhost:3000/graphql"
});

/**
 * Relay execute function that runs an operation over the graphql-ws client
 * and exposes the results as a relay Observable.
 *
 * @param request - relay request parameters; `request.text` is used as the query document.
 * @param variables - relay variables for the operation (not forwarded by this example).
 * @returns an Observable that emits the patched execution results.
 * @throws Error when `request.text` is missing.
 */
export const execute = (request: RequestParameters, variables: Variables) => {
  if (!request.text) {
    throw new Error("Missing document.");
  }
  const query = request.text;

  return Observable.create<GraphQLResponse>(sink => {
    // Create our asyncIterator from a Sink
    const executionResultIterator = makeAsyncIterableIteratorFromSink(wsSink => {
      const dispose = client.subscribe({ query }, wsSink);
      return () => dispose();
    });

    const applyLiveQueryPatch = createApplyLiveQueryPatch();

    // apply some middleware to our asyncIterator
    const compositeIterator = applyLiveQueryPatch(executionResultIterator);

    // Apply our async iterable to the relay sink
    // unfortunately relay cannot consume an async iterable right now.
    const dispose = applyAsyncIterableIteratorToSink(compositeIterator, sink);
    // dispose will be called by relay when the observable is disposed
    // the dispose call will ensure that the async iterator is completed.
    return () => dispose();
  });
};
This package also ships a few utilities that make your life easier!
Map a source
import { map } from "@n1ru4l/push-pull-async-iterable-iterator";
// Source generator that yields the numbers 1, 2 and 3.
async function* source() {
  for (const n of [1, 2, 3]) {
    yield n;
  }
}

// map takes the projection and returns a function that wraps a source.
const square = map((value: number): number => value ** 2);

for await (const squared of square(source())) {
  console.log(squared);
}
// logs 1, 4, 9
Filter a source
import { filter } from "@n1ru4l/push-pull-async-iterable-iterator";
// Source generator that yields the numbers 1, 2 and 3.
async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

// The predicate returns a boolean: true keeps the value, false drops it.
const biggerThan1 = filter((value: number): boolean => value > 1);

for await (const value of biggerThan1(source())) {
  console.log(value);
}
// logs 2, 3
Attach a return and throw handler to a source.
import { withHandlers } from "@n1ru4l/push-pull-async-iterable-iterator";

// Source generator that yields the numbers 1, 2 and 3.
async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const sourceInstance = source();

// Wrap the source; the second and third arguments are the return and throw handlers.
const newSourceWithHandlers = withHandlers(
  sourceInstance,
  () => sourceInstance.return(),
  err => sourceInstance.throw(err)
);

// Iterate the wrapped source so the attached handlers are the ones in effect.
for await (const value of newSourceWithHandlers) {
  // ...
}