tjjfvi/subshape

scale-based rpc library

tjjfvi opened this issue · 5 comments

Name TBD – srpc? scalar?

// api.ts

import { $func } from "srpc"

export const $api = $.object(
  $.field("a", $func($.tuple(), $.str)),
  $.field("b", $func($.tuple(), $.str)),
  $.field("c", $func($.tuple($.str, $.i8), $.field("some", $.str)))
)
// server.ts

import { $api } from "./api.ts"
import { serve } from "srpc"

const api: $.Native<typeof $api> = {
  a: async () => "a",
  b: async () => Math.random() ? "heads" : "tails",
  c: async (foo, bar) => ({ some: `${foo.toUpperCase()}-${bar.toFixed(2)}` })
}

serve($api, api)
// client.ts

import { $api } from "./api.ts"
import { connect } from "srpc"

const api = await connect($api, "ws://...")

console.log(await api.c("Hello", 46)) // HELLO-46.00

@vjjft suggests scald

  • what connection types should be supported out-of-the-box?
  • is an object definition desirable? What other schemes might we consider? In an offline convo, you mentioned deeply-nested fns.
import { $func } from "x"

export const $api = $.field(
  "a",
  $.field(
    "b",
    $.field(
      "c",
      $func($.tuple(), $.str)
    )
  )
)
  • What do we want as a convention for scale lib naming? Do we want to simply prefix the subject codec (in this case a $func) with scale- / aka. scale-func?
  • I'd personally prefer fn instead of func, but this isn't a strong opinion.

I'd personally prefer fn instead of func, but this isn't a strong opinion.

Agreed.

is an object definition desirable? What other schemes might we consider? In an offline convo, you mentioned deeply-nested fns.

Yes, $fn would be general such that you could nest fields like that, return functions from functions, etc.

What do we want as a convention for scale lib naming? Do we want to simply prefix the subject codec (in this case a $func) with scale- / aka. scale-func?

scale-foo as a general convention seems reasonable, but given that this library isn't simply a collection of codecs (but rather an api building off of scale), I think a name not following this convention makes sense. I'm fond of scald, personally.

what connection types should be supported out-of-the-box?

Web sockets and workers, at minimum. Ideally we should also make it as easy as possible to use with custom connections.

I'm fond of scald, personally.

When I first read the suggestion of scald, it sounded a bit aggressive (no one wants to be scalded). But I suppose JS tools have a sorted past of heat-related names #blazingly. Another contender: scall.

EDIT

Scall is defined as "a scurf or scabby disorder," so that's a no.

An initial implementation, extracted from paritytech/capi#823
import * as $ from "../deps/scale.ts"
import { Deferred, deferred } from "../deps/std/async.ts"

const $id = $.compact($.u32)

type Message = $.Native<typeof $message>
const $message = $.taggedUnion("type", [
  $.variant(
    "call",
    $.field("callId", $id),
    $.field("fn", $id),
    $.field("args", $.uint8Array),
  ),
  $.variant(
    "resolve",
    $.field("callId", $id),
    $.field("value", $.uint8Array),
  ),
  $.variant(
    "reject",
    $.field("callId", $id),
    $.field("error", $.str),
  ),
])

type ResolutionMessage = Extract<Message, { type: "resolve" | "reject" }>

export interface Link {
  send(data: Uint8Array): void
  recv(cb: (data: Uint8Array) => void, signal: AbortSignal): void
}

class Scald {
  constructor(readonly connection: Link, readonly signal: AbortSignal) {
    this.connection.recv((data) => {
      const message = $message.decode(data)
      this.recv(message)
    }, this.signal)
  }

  nextHeldId = 0
  held = new Map<number, (data: Uint8Array) => Promise<Uint8Array>>()

  nextCallId = 0
  pending = new Map<number, Deferred<ResolutionMessage>>()

  async recv(message: Message) {
    const { callId } = message
    if (message.type !== "call") {
      this.pending.get(callId)?.resolve(message)
      return
    }
    const held = this.held.get(message.fn)
    if (!held) {
      return this.send({ type: "reject", callId, error: "invalid callId" })
    }
    try {
      const value = await held(message.args)
      this.send({ type: "resolve", callId, value })
    } catch (e) {
      return this.send({ type: "reject", callId, error: Deno.inspect(e) })
    }
  }

  send(message: Message): void {
    this.connection.send($message.encode(message))
  }

  expose(value: (data: Uint8Array) => Promise<Uint8Array>) {
    const id = this.nextHeldId++
    this.held.set(id, value)
    return id
  }

  async call(fn: number, args: Uint8Array) {
    const callId = ++this.nextCallId
    const pending = deferred<ResolutionMessage>()
    this.pending.set(callId, pending)
    this.send({ type: "call", callId, fn, args })
    const result = await pending
    if (result.type === "resolve") return result.value
    throw new ScaldError(result.error)
  }

  async encode<T>($value: $.Codec<T>, value: T) {
    const buf = new $.EncodeBuffer($value._staticSize)
    buf.context.get(ScaldContext).scald = this
    $value._encode(buf, value)
    return buf.finishAsync()
  }

  decode<T>($value: $.Codec<T>, data: Uint8Array): T {
    const buf = new $.DecodeBuffer(data)
    buf.context.get(ScaldContext).scald = this
    return $value._decode(buf)
  }
}

export class ScaldError extends Error {
  override name = "ScaldError"
}

class ScaldContext {
  scald?: Scald
}

export function $fn<A extends unknown[], R>(
  $args: $.Codec<A>,
  $return: $.Codec<R>,
): $.Codec<(...args: A) => Promise<R>> {
  return $.createCodec({
    _metadata: $.metadata("$fn", $fn, $args, $return),
    _staticSize: $id._staticSize,
    _encode(buffer, fn) {
      const scald = buffer.context.get(ScaldContext).scald
      if (!scald) {
        throw new $.ScaleEncodeError(this, fn, "$fn can only be used in a scald context")
      }
      const id = scald.expose(async (data) => {
        const args = scald.decode($args, data)
        const result = await fn(...args)
        return scald.encode($return, result)
      })
      $id._encode(buffer, id)
    },
    _decode(buffer) {
      const scald = buffer.context.get(ScaldContext).scald
      if (!scald) {
        throw new $.ScaleDecodeError(this, buffer, "$fn can only be used in a scald context")
      }
      const id = $id._decode(buffer)
      return async (...args: A) => {
        const data = await scald.call(id, await scald.encode($args, args))
        const result = scald.decode($return, data)
        return result
      }
    },
    _assert(assert) {
      assert.typeof(this, "function")
    },
  })
}

export class WsLink implements Link {
  ready = deferred()
  constructor(readonly ws: WebSocket, signal: AbortSignal) {
    ws.binaryType = "arraybuffer"
    this.ws.addEventListener("open", () => {
      this.ready.resolve()
    })
    signal.addEventListener("abort", () => {
      this.ws.close()
    })
  }

  async send(data: Uint8Array) {
    await this.ready
    this.ws.send(data.buffer)
  }

  recv(cb: (data: Uint8Array) => void, signal: AbortSignal): void {
    this.ws.addEventListener("message", (msg) => {
      if (!(msg.data instanceof ArrayBuffer)) return
      cb(new Uint8Array(msg.data))
    }, { signal })
  }
}

export function serveScald<T>(
  $api: $.Codec<T>,
  api: T,
  connection: Link,
  signal: AbortSignal,
) {
  const scald = new Scald(connection, signal)
  scald.expose(() => scald.encode($api, api))
}

export async function connectScald<T>(
  $api: $.Codec<T>,
  connection: Link,
  signal: AbortSignal,
): Promise<T> {
  const scald = new Scald(connection, signal)
  const data = await scald.call(0, new Uint8Array())
  return scald.decode($api, data)
}