De-spaghettify your event logic! Declaratively compose event logic with Topics and Streams.
npm install linguine
yarn add linguine
Linguine is a library for declaratively composing event logic.
Linguine is built around two concepts: Topics and Streams.
A Topic is something you can write values to. A Stream takes values from a Topic and manipulates them. The beauty of linguine is Streams can branch and merge, allowing you to compose complex event logic in a declarative way.
import { Topic } from 'linguine'
const numberTopic = new Topic<number>()
const doubledNumberTopic = new Topic<number>()
const numberStream = numberTopic.stream()
numberStream.map((value) => value * 2).to(doubledNumberTopic)
numberStream.destroy()
Calls the given function for each value in the stream.
const topic = new Topic<number>()
// double each value
topic.stream().map((value) => value * 2)
Calls the given function for each value in the stream, and returns multiple messages. The given function must return an array.
const topic = new Topic<number>()
// return the original value, plus the doubled value
topic.stream().flatMap((value) => [value, value * 2])
Only pass values through the stream that match a predicate.
const topic = new Topic<number>()
// only pass even numbers through the stream
topic.stream().filter((value) => value % 2 === 0)
Call a function on each value in the stream.
const topic = new Topic<number>()
// log each value
topic.stream().forEach((value) => console.log(value))
Write each value in the stream to a topic. This is a terminal operation.
const inputTopic = new Topic<number>()
const outputTopic = new Topic<number>()
// write each value to another topic
topic.stream().to(outputTopic)
Merge two streams together.
const topicA = new Topic<number>()
const topicB = new Topic<string>()
const streamA = topicA.stream()
const streamB = topicB.stream()
// merge the two streams together
const mergedStream = streamA.merge(streamB)
Merging streams is typesafe! The merged stream's type will be the union of the merged streams.
Catches errors in the following streams.
const topic = new Topic<number>()
// catch errors in the following streams
topic.stream().catchError((error) => console.error(error))
Skip duplicate values in the stream.
const topic = new Topic<number>()
// skip duplicate values and log the results
topic
.stream()
.skipDuplicates()
.forEach((value) => console.log(value))
topic.write(1)
topic.write(1)
// stdout:
// `1`
Debounce the stream. A message will only be passed through the stream if there are no other messages for a given number of milliseconds.
const topic = new Topic<number>()
// debounce by 1000ms
topic
.stream()
.debounce(1000)
.forEach((value) => console.log(value))
// only `2` and `3` will be logged
topic.write(1)
topic.write(2)
setTimeout(() => {
topic.write(3)
}, 1000)
Delay the stream by a given number of milliseconds.
const topic = new Topic<number>()
// delay by 1000ms
topic
.stream()
.delay(1000)
.forEach((value) => console.log(value))
// `1` will be logged after 1000ms
topic.write(1)
Throttle the stream by time. Only one message will be passed through the stream every ms
milliseconds, others will be ignored.
const topic = new Topic<number>()
// only pass one message through the stream every 1000ms
topic
.stream()
.throttleByTime(1000)
.forEach((value) => console.log(value))
// only `1` and `3` will be logged
topic.write(1)
topic.write(2)
setTimeout(() => {
topic.write(3)
}, 1000)
Buffer messages in the stream by time. Messages will be passed through the stream in batches, with each batch containing messages that were written to the stream within a given number of milliseconds of each other.
const topic = new Topic<number>()
// buffer messages by 1000ms
topic
.stream()
.bufferByTime(1000)
.forEach((values) => console.log(values))
topic.write(1)
topic.write(2)
setTimeout(() => {
topic.write(3)
}, 1000)
// stdout:
// `[1, 2]`
// `[3]`
Buffer messages in the stream by count. Messages will be passed through the stream in batches, with each batch containing a given number of messages.
const topic = new Topic<number>()
// buffer messages in groups of 2
topic
.stream()
.bufferByCount(2)
.forEach((values) => console.log(values))
topic.write(1)
topic.write(2)
topic.write(3)
// stdout:
// `[1, 2]`
Buffer messages in the stream until a given function returns true.
const topic = new Topic<number>()
// buffer messages until the buffer inclues the number 3
topic
.stream()
.bufferUntil((values) => values.includes(3))
.forEach((values) => console.log(values))
topic.write(1)
topic.write(2)
topic.write(3)
// stdout:
// `[1, 2, 3]`
Destroys the stream from the current node.
const topic = new Topic<number>()
const stream = topic.stream()
stream
.map((value) => value * 2)
.forEach((value) => console.log(value))
// should log `2`
topic.write(1)
// destroy the stream
stream.destroy()
// should not log anything
topic.write(1)