/runnify

Primary LanguageTypeScript

🏃🏼‍♂️‍➡️🏃🏼‍➡️ Runnify 🏃🏽‍♀️‍➡️

Typescript library that allows you to create chains of executable code to manipulate an initial state.

It has the following features:

  • The state is immutable
  • Execution can be in parallel
  • Every step can be async
  • It can be used in streaming as a Transformer
  • Implements an event emitter to let emit events during the execution. Emitted events can be listened to for telemetry purposes or to send progressive responses to clients
  • For each chain or for each step in the chain it is possible to set:
    • Retry
    • Cache
    • Timeout
    • Fallback
    • Circuit Breaker
    • Bulk Head Limiter
  • Natively implements OpenTelemetry to track each execution step
  • Perform conditional jumps within the chain (eg. to do loops or to act as node graph)
  • Execute loops for iterable state objects (eg. Arrays)
  • Pass a context to the chain to make it available to all steps
  • Nest chains within chains
  • Use signal to Abort chain run
  • Use Zod to manipulate the state

Documentations

🚀 Read full docs here

Install

npm install runnify

Examples

Chain with nested chains

const subSequence = Runnable.from(
  [
    { k: async () => "O", j: 1 },
    async (state: any) => {
      state.y = "ciao";
      return state;
    }
  ],
  { name: "sub:seq" }
);

const subSubSequence = Runnable.from(
  [
    { z: async () => "Z" },
    {
      y: async (state: any) => {
        return "Y";
      }
    }
  ],
  { name: "sub:sub:seq" }
);

const main = Runnable.init({ name: "full:main:seq" })
  .assign("year",() => new Date.getYear(), { name: "year" })
  .assign({ b: 2 }, { name: "b" })
  .pipe((state)=> {
	  state.joy = "high";
	  return state;
  })
  .push((state) =>{
	  return {a: state.a + 1}
  }, { name: "increment:a" })
  .passThrough((state,{emit}) =>{
	  if (state.a === 1) emit("check", "a is ok");
  }, { name: "emit:check" })
  .push(setC, { name: "set:c" }) // setC is a fnc
  .milestone("state:analyzed")
  .push(subSequence)
  .branch(
    [
      { if: A_is_1, then: subSubSequence },
      { if: A_is_5, then: setE }
    ],
    { name: "on:a:1:or:5" }
  )
  .parallel([setF, setG], { name: "set:f:and:g" })
  .go(
    [
      { to: "increment:a", if: A_less_4 },
      { to: "state:analyzed", if: A_more_9 }
    ],
    { name: "go:checking:a" }
  )
  .assign({
    blocks: [
      { id: 1, items: [{ id: 1 }, { id: 2 }, { id: 3 }] },
      { id: 2, items: [{ id: 1 }, { id: 2 }, { id: 3 }] }
    ]
  })
  .push(processBlocks) // processBlocks is a nested chain
  .on("check", (msg: string) => true);
  
  const res = await main.run({ a: 0 });

Loops

const setBlockIndex = async (state: any) => {
  state.element.index = state.index;
  return state;
};
const setBlockCheck = async (state: any) => {
  state.element.check = true;
  return state;
};
const setItemTitle = async (state: any) => {
  state.element.title = "title";
  return state;
};
const setItemDescription = async (state: any) => {
  state.element.description = "description";
  return state;
};

const processBlock = Runnable.init({ name: "block:chain" }).parallel(
  [setBlockIndex, setBlockCheck],
  { name: "set:index:and:check" }
);
const processItems = Runnable.init({ name: "items:chain" }).loop({
  key: "items",
  chain: (chain: Runnable) =>
    chain.parallel([setItemTitle, setItemDescription], {
      name: "set:title:and:description"
    })
});
const processBlocks = Runnable.init({ name: "blocks:chain" }).loop({
  key: "blocks",
  chain: (chain: Runnable) =>
    chain.parallel([processBlock, processItems], {
      name: "process:block:and:items"
    })
});

const main = Runnable.init({ name: "loop:seq" }).push(processBlocks);

const res = res = await main
    .run({
      blocks: [
        { id: 1, items: [{ id: 1 }, { id: 2 }, { id: 3 }] },
        { id: 2, items: [{ id: 1 }, { id: 2 }, { id: 3 }] }
      ]
    })

Streaming

import { Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

interface State {
  a: number;
  b?: number;
}

const read: Readable = Readable.from(
  Array.from({ length: 10 }, (_, a) => ({ a }))
);

const main = Runnable.init({ name: "stream:main:seq" }).assign({
  b: async (state: State) => state.a + 1
});

// get the transform stream from chain
const transform = main.stream();

const risultati: State[] = [];

const write = new Writable({
  objectMode: true,
  write: (state: State, _, next) => {
    risultati.push(state);
    next();
  }
});

await pipeline(read, transform, write);

Retry, Fallback and Timeout

let errors: number;

// Retry
const r = Runnable.init({
  name: "wrap:main:seq",
  circuit: { retry: 3 }
}).push(function throwError(): never {
  throw new Error(`Error ${++errors}`);
});

// Retry + Timeout
const r = Runnable.init({
  name: "wrap:main:seq"
}).push(
  function throwError() {
    throw new Error(`Error ${++errors}`);
  },
  { circuit: { retry: 3, timeout: 100000 } }
);

// Retry + Circuit Breaker
const r = Runnable.init({
  name: "wrap:main:seq"
}).push(
  function throwError() {
    throw new Error(`Error ${++errors}`);
  },
  { circuit: { retry: 3, circuitBreaker: { consecutiveFaillures: 2 } } }
);

// Retry + Circuit Breaker + Fallback
const r = Runnable.init({
  name: "wrap:main:seq"
}).push(
  function throwError() {
    throw new Error(`Error ${++errors}`);
  },
  {
    circuit: {
      retry: 3,
      circuitBreaker: { consecutiveFaillures: 2 },
      fallback: async (params: any) => {
        await sleep(10);
        return { fallback: "hallo" };
      }
    }
  }
);

Caching

The cache must be set in the 'circuit' parameter and can be applied to the entire chain and/or to each individual step. It's important to assign a name to the cache so that only one cache object can be instantiated for each name/label (useful for external store adapters to avoid opening N connections). The cache system used is Keyv thus any Keyv-compatible adapter can be used. By default, the QuickLRU cache is used.

// Fixed Params
const cache = {
{
      name: 'cache-map',
      store: new Map(),
      active: true,
      cacheKeyStrategy: [ 'a', 'b', 'c' ], // get those keys
      ttlStrategy: 10000, // ms
      timeout: 200 // ms
}

// Dynamic Params
{
      name: 'cache-LRU',
      store: undefined, // if omitted -> dafault QuickLRU cache
      maxSize: 100, // max object in LRU cache [default 1000] 
      active: (state:any) => state.type !== "j",
      cacheKeyStrategy: (state:any) =>{
	      if(staet.type === "k") return `${state.a}:${state.c}`
	      return `${state.b}:${state.c}`
      },
      ttlStrategy: (state:any) =>{
	      if(state.type === "k") return 1000;
	      return 1000 * 60 * 24;
      },
      timeout: 200
    }

// Cache Store Adaptor eg. Redis
import KeyvRedis from "@keyv/redis";

 const keyvRedis = new KeyvRedis("redis://localhost:6379");
const cache = {
	name: "cache-redis",
	store: keyvRedis
}
 
// Set Chain Cache
const chain = Runnable.init({circuit:{ cache }})

// Set Step Cache
const chain = Runnable.init({}).pipe((state:any)=>{
	...
},{circuit:{ cache }})

Goto

const seq = Runnable.init({ name: "operators:pipe:seq" })
    .pipe(
      (state: any) => {
        state.a = (state.a || 0) + 1;
        return state;
      },
      { name: "increment:a" }
    )
    .go({
      to: "increment:a",
      if: (state: any) => state.a < 10
    })
    .pipe((state: any) => {
      state.c = state.a / 2;
      return state;
    });

const state = await seq.run({ b: 0 });

External Context

const ctx = {
  config: {
    get: async (key: string) => "value"
  },
  gRPC: {
    ingress: {
      get: async (key: string) => "ingress"
    }
  }
};

const main = Runnable.init({ name: "context:main:seq", ctx }).assign({
  conf: async function (state: any, params: RunFncParams) {
    return await params.ctx.config.get("key");
  },
  remote: async function (state: any, params: RunFncParams) {
    return await params.ctx.gRPC.ingress.get("key");
  }
});

const state = await main.run({});

State Manipulation (Zod)

import { z } from "zod";
const schema: z.ZodType = z.object({ name: z.string(), age: z.number() });


// Simple Pick (array of keys)
const simplePick = Runnable.init({ name: "zod:pick:pipe:seq" })
  .pick(["keyA","keyB"], { name: "pick:keysArray" })
  
// Zod Pick
const pickSeq = Runnable.init({ name: "zod:pick:seq" }).pick(schema);

// Zod Pipe
const pipeSeq = Runnable.init({ name: "zod:pipe:seq" }).push(
  (state: any) => {
    state.extra = state.extra;
    return state;
  },
  { schema, name: "pipe:pick" }
);

// Zod Pipe Pick
const pipeAndPickSeq = Runnable.init({ name: "zod:pipe:pick:seq" })
  .push((state: any) => state, { schema })
  .pick(schema, { name: "filter:egress" });
  
// Zod Pick Pipe
const pickAndPipeSeq = Runnable.init({ name: "zod:pick:pipe:seq" })
  .pick(schema, { name: "filter:ingress" })
  .push((state: any) => state);

Signal

// Immediately Abort
const signal = AbortSignal.abort();
const res = await chain.run({}, { signal }));

// Abort on Timeout
const signal = AbortSignal.timeout(50);
const res = await chain.run({}, { signal }));

Progressive Response (HTTP SSE)

import Fastify from "fastify";
import { FastifySSEPlugin } from "fastify-sse-v2";
import cors from "@fastify/cors";
import { RunState, RunFncParams } from "../dist/types.js";

interface State extends RunState {
  a: number;
  done?: boolean;
}
const main = Runnable.init({ name: "progressive:seq" })
  .passThrough((state: State, params: RunFncParams) => {
    params.emit("progress", { a: 1 });
  })
  .passThrough((state: State, params: RunFncParams) => {
    params.emit("progress", { a: 2 });
  })
  .passThrough((state: State, params: RunFncParams) => {
    params.emit("progress", { a: 3, done: true });
  });

async function* getProgress() {
  const emitter = new EventEmitter();
  main.run({}, { emitter });

  for await (const [state] of on(emitter, "progress")) {
    yield state;
    if (state.done) break;
  }
}

const fastify = Fastify({
  logger: true
});
await fastify.register(cors, {
  origin: true
});
fastify.register(FastifySSEPlugin);

fastify.get("/progress", async function (req, res) {
  res.sse(getProgress());
});

Stream Steps

interface State extends RunState {
  a: number;
  done?: boolean;
}
const main = Runnable.init({ name: "progressive:seq" })
  .passThrough((state: State, params: RunFncParams) => {
    params.emit("progress", { a: 1 });
  })
  .passThrough((state: State, params: RunFncParams) => {
    params.emit("progress", { a: 2 });
  })
  .passThrough((state: State, params: RunFncParams) => {
    params.emit("progress", { a: 3, done: true });
  });
  
 const states: string[] = [];

for await (const step of main.streamSteps()) {
	states.push(step.type);
	if (step.type === "end") break;
}

OpenTelemetry

// instrumentation.js
import { NodeSDK } from "@opentelemetry/sdk-node";
import { ConsoleSpanExporter } from "@opentelemetry/sdk-trace-node";
import { Resource } from "@opentelemetry/resources";
import {
  SEMRESATTRS_SERVICE_NAME,
  SEMRESATTRS_SERVICE_VERSION
} from "@opentelemetry/semantic-conventions";

import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { JaegerExporter } from "@opentelemetry/exporter-jaeger";
import { PrometheusExporter } from "@opentelemetry/exporter-prometheus";
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";

const sdk = new NodeSDK({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: "Runnify",
    [SEMRESATTRS_SERVICE_VERSION]: "1.0"
  }),
  spanProcessors: [
    new SimpleSpanProcessor(new JaegerExporter())
    //new SimpleSpanProcessor(new ConsoleSpanExporter())
  ],
  metricReader: new PrometheusExporter(),
  instrumentations: [getNodeAutoInstrumentations()]
});

sdk.start();

Jeager Example

License

Licensed under MIT.