xemantic/xemantic-osc

That’s cool!

damianogiusti opened this issue · 7 comments

Hi! This project looks very promising, could you please detail in the README file how to use it? As far as I understood, it looks like a converter that is pluggable in Ktor, right?

Thanks a lot!

I got about this far:

    val osc = osc {
        hostname = "127.0.0.1"
        port = 5555
    }

    val theOtherThing = osc.output {
        hostname = "127.0.0.1"
        port = 9000
    }

    theOtherThing.send("/chatbox/input", listOf("Hi", true))

Didn't do anything - didn't error either. Mystery box.

Also took a second shot:

    osc {
        hostname = "127.0.0.1"
        port = 5555
    }.use { osc ->
        osc.output {
            hostname = "127.0.0.1"
            port = 9000
        }.use { theOtherThing ->
            theOtherThing.send("/chatbox/input", listOf("Hi", true))
        }
    }
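
For checking what actually arrives on port 9000 (for example with a packet sniffer), here is a rough sketch of the bytes an OSC 1.0 message with a string and a boolean argument would contain. `oscString` and `encodeMessage` are hypothetical helpers written only for this illustration, not part of xemantic-osc, and the sketch assumes the receiver accepts the `s` and `T`/`F` type tags.

```kotlin
// Converts text to an OSC-string: the raw bytes, at least one NUL terminator,
// and zero padding up to a multiple of 4 bytes.
fun oscString(text: String): ByteArray {
    val raw = text.encodeToByteArray()
    val paddedSize = (raw.size / 4 + 1) * 4
    return raw.copyOf(paddedSize) // copyOf zero-fills the tail
}

// Hypothetical encoder that only understands String and Boolean arguments.
fun encodeMessage(address: String, args: List<Any>): ByteArray {
    val typeTag = StringBuilder(",")
    var payload = ByteArray(0)
    for (arg in args) {
        when (arg) {
            is String -> { typeTag.append('s'); payload += oscString(arg) }
            // Booleans live in the type tag only and carry no payload bytes.
            is Boolean -> typeTag.append(if (arg) 'T' else 'F')
            else -> throw IllegalArgumentException("Unsupported argument: $arg")
        }
    }
    return oscString(address) + oscString(typeTag.toString()) + payload
}
```

Under these assumptions, `encodeMessage("/chatbox/input", listOf("Hi", true))` yields 24 bytes: 16 for the padded address, 4 for the `,sT` type tag, and 4 for the padded `Hi`.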

The code that has gone into the repo so far is more of a sketch. I am already using a much more mature version in production, but I still have to review all the APIs before committing the first alpha version. I am allocating some time for this at the beginning of 2023. Anyway, this code has already powered one show:

https://vimeo.com/kazikpogoda/404-clip
https://xemantic.com/404/

I've been bashing my own API together just in case I will need it. Since it's quite a small sort-of-standard, the amount of code needed to meet the spec isn't all that much, but it certainly gets interesting once you try to put a clean API on it, as there are a lot of ways I could imagine people wanting to use it.

The minimum viable API I had in mind for mine was something like:

    route("/container1/method1") { value1: Int, value2: String -> doTheThing() }
    route("/container1/method2") { value1: Int, value2: Boolean -> doTheOtherThing() }

But I can also imagine someone wanting to do something like...

    class MyThing {
        @OscMethod("method1")
        fun doTheThing(value1: Int, value2: String) { ... }

        @OscMethod("method2")
        fun doTheOtherThing(value1: Int, value2: Boolean) { ... }
    }

    route("/container1", MyThing());

Which would behave exactly the same but gives you a different way to structure the callbacks if you had a lot of them.
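
A minimal sketch of how the lambda-based variant could be wired up without annotations, using only the standard library. `OscRouter`, `routeRaw` and `dispatch` are hypothetical names for illustration, and the sketch assumes arguments arrive already decoded as a `List<Any?>`.

```kotlin
// Hypothetical minimal router: one handler per exact OSC address.
class OscRouter {
    private val handlers = mutableMapOf<String, (List<Any?>) -> Unit>()

    // Registers an untyped handler that receives the decoded argument list.
    fun routeRaw(address: String, handler: (List<Any?>) -> Unit) {
        handlers[address] = handler
    }

    // Returns true when a handler was registered for the address.
    fun dispatch(address: String, args: List<Any?>): Boolean {
        val handler = handlers[address] ?: return false
        handler(args)
        return true
    }
}

// Typed convenience for two-argument methods. The reified casts throw a
// ClassCastException when an argument has the wrong type.
inline fun <reified A, reified B> OscRouter.route(
    address: String,
    crossinline handler: (A, B) -> Unit
) = routeRaw(address) { args ->
    require(args.size == 2) { "$address expects 2 arguments, got ${args.size}" }
    handler(args[0] as A, args[1] as B)
}
```

With this shape, `router.route("/container1/method1") { value1: Int, value2: String -> ... }` reads much like the first snippet, and an annotation-driven variant could register its methods through `routeRaw`.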

And then of course what I'm actually trying to build in my end application is a node-based editor, where the user drags and drops an OSC method endpoint onto the graph and wires it up to the other stuff in the graph. I think I can build that on basic lambda-based routes, but I won't know until I get far enough to try. In the first place, node-based editor design is hard and I haven't quite figured out the right way to do something event-driven.

Annotation scanning would be possible, but I consider it an outer layer. In my particular case I use the xemantic-state project to define state through delegates instead of annotations. This way I can use the OSC protocol as a transport layer for transparently distributing the state of complex hierarchical objects, while still allowing mutation of this state through standard OSC clients without any additional code. Maybe I will embrace your routing nomenclature.

What I am struggling with at the moment, having seen how the OSC protocol is typically used, is this: initially I assumed that all address routing is set up only once, when OSC is initialized on a certain port. It seems, however, that routing is typically dynamic, and even the OSC query protocol supports notifications when it changes. So I think I will switch to this scenario, which will require something like CopyOnWriteArrayList for keeping the routing information. It's not available in Kotlin MPP, so I have to think about the best replacement that still handles concurrency decently.
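
One possible shape for such a replacement is an immutable list behind an atomic reference: readers take a snapshot, writers copy and swap. The sketch below is JVM-only, because it uses `java.util.concurrent.atomic.AtomicReference`. A multiplatform version would need an atomic reference from a library such as kotlinx-atomicfu, which is an assumption not verified here. `Route` and `RouteTable` are hypothetical names.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical route entry: an address plus the name of its converter.
data class Route(val address: String, val converterName: String)

// Copy-on-write registry: readers get an immutable snapshot without locking,
// writers build a new list and swap it in atomically.
class RouteTable {
    private val routes = AtomicReference<List<Route>>(emptyList())

    fun add(route: Route) {
        routes.updateAndGet { current -> current + route }
    }

    fun remove(address: String) {
        routes.updateAndGet { current -> current.filterNot { it.address == address } }
    }

    // The returned list never changes, so it is safe to iterate while others write.
    fun snapshot(): List<Route> = routes.get()

    fun find(address: String): Route? = snapshot().firstOrNull { it.address == address }
}
```

The trade-off is the same as with CopyOnWriteArrayList: writes cost a full copy, which is acceptable as long as routing changes are rare compared to incoming messages.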

Here is my current test case, which I still have to improve:

package com.xemantic.osc

import io.kotest.matchers.collections.containExactlyInAnyOrder
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import kotlin.test.AfterTest
import kotlin.test.Test

@OptIn(ExperimentalCoroutinesApi::class)
class OscTest {

  val listOfStringsConverter = Osc.Message.Converter(
    decode = {
      typeTag.forEach {
        check(it == 's') {
          "Invalid type tag from list of strings: $typeTag"
        }
      }
      typeTag.map { string() }
    },
    encode = {
      typeTag("s".repeat(value.size))
      value.forEach {
        string(it)
      }
    }
  )

  private lateinit var receivingOsc: Osc

  private lateinit var sendingOsc: Osc

  @AfterTest
  fun shutDownOsc() {
    if (this::sendingOsc.isInitialized) { sendingOsc.close() }
    if (this::receivingOsc.isInitialized) { receivingOsc.close() }
  }

  @Test
  fun writeOscTypeTagWithPadding() {
    // when
    val floatTag = toBytes { writeOscTypeTag("f") }
    val vec2Tag = toBytes { writeOscTypeTag("ff") }
    val vec3Tag = toBytes { writeOscTypeTag("fff") }
    val vec4Tag = toBytes { writeOscTypeTag("ffff") }

    // then
    floatTag shouldBe byteArrayOf(COMMA_BYTE, FLOAT_BYTE, 0, 0)
    vec2Tag shouldBe byteArrayOf(COMMA_BYTE, FLOAT_BYTE, FLOAT_BYTE, 0)
    vec3Tag shouldBe byteArrayOf(COMMA_BYTE, FLOAT_BYTE, FLOAT_BYTE, FLOAT_BYTE, 0, 0, 0, 0)
    vec4Tag shouldBe byteArrayOf(
      COMMA_BYTE,
      FLOAT_BYTE,
      FLOAT_BYTE,
      FLOAT_BYTE,
      FLOAT_BYTE,
      0,
      0,
      0
    )
  }

  @Test
  fun shouldSendAndReceiveOscMessages() {
    receivingOsc = osc {
      converter<List<String>>(listOfStringsConverter)
      address<Int>("/int")
      address<Float>("/float")
      address<Double>("/double")
      address<String>("/string")
      address<Boolean>("/boolean")
      address("/listOfStrings1", listOfStringsConverter)
      address("/listOfStrings2", listOfStringsConverter)
      address<List<String>>("/listOfStrings3")
    }
    // technically we could use the same osc port for input and output
    // but let's make it more similar to actual use case
    sendingOsc = osc { }

    sendingOsc.output {
      port = receivingOsc.port
      converter<List<String>>(listOfStringsConverter)
      address<Int>("/int")
      address<Float>("/float")
      address<Double>("/double")
      address<String>("/string")
      address<Boolean>("/boolean")
      address("/listOfStrings1", listOfStringsConverter)
      address("/listOfStrings2", listOfStringsConverter)
      address<List<String>>("/listOfStrings3")
    }.use { output ->

      runTest {
        val receivedMessages = mutableListOf<Osc.Message<*>>()

        val job = launch {
          receivingOsc.messageFlow.collect {
            receivedMessages.add(it)
            if (receivedMessages.size == 6) {
              coroutineContext.job.cancel()
            }
          }
        }

        output.send("/double", 42.0)
        output.send("/float", 4242.0f)
        output.send("/string", "foo")
        output.send("/boolean", true)
        output.send("/boolean", false)
        output.send("/listOfStrings1", listOf("bar", "buzz"))

        job.join()

        receivedMessages shouldHaveSize 6
        // slight packet reordering might happen
        receivedMessages.map { it.value } shouldBe containExactlyInAnyOrder(
          42.0,
          4242.0f,
          "foo",
          true,
          false,
          listOf("bar", "buzz")
        )
      }
    }
  }
}

fun toBytes(
  output: Output.() -> Unit
): ByteArray = buildPacket {
  output()
}.readBytes()

const val FLOAT_BYTE = 'f'.code.toByte()

As you can see, I am receiving all the OSC messages as a flow/stream. Your routing would amount to registering a converter with the proper type and then hooking into messageFlow to react to received data. I will think about this use case and possibly simplify it in the API itself. Your action lambdas would probably have to be suspending, so interacting with osc this way would need to happen within a coroutine, which actually makes sense.

Here is the current API, which is independent of the UDP/Ktor transport (the transport could also be HTTP/WebSockets or TCP):

package com.xemantic.osc

import io.ktor.utils.io.core.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import mu.KLogger
import kotlin.reflect.KType
import kotlin.reflect.typeOf

interface Osc : Closeable {

  open class Builder(val direction: String = "INPUT") {

    var hostname: String = "0.0.0.0"

    var port: Int = 0

    val converterMap = mutableMapOf<KType, Message.Converter<*>>(
      typeOf<Int>() to Message.Converter.INT,
      typeOf<Float>() to Message.Converter.FLOAT,
      typeOf<Double>() to Message.Converter.DOUBLE,
      typeOf<String>() to Message.Converter.STRING,
      typeOf<Boolean>() to Message.Converter.BOOLEAN,
      typeOf<List<Int>>() to Message.Converter.LIST_OF_INTS,
      typeOf<List<Float>>() to Message.Converter.LIST_OF_FLOATS,
      typeOf<List<Double>>() to Message.Converter.LIST_OF_DOUBLES,
      typeOf<List<String>>() to Message.Converter.LIST_OF_STRINGS,
    )

    val conversions = mutableListOf<Pair<String, Message.Converter<*>>>()

    inline fun <reified T> converter(converter: Message.Converter<T>) {
      converterMap[typeOf<T>()] = converter
    }

    inline fun <reified T> address(address: String) {
      val type = typeOf<T>()
      val converter = converterMap[type]
        ?: throw IllegalArgumentException("No converter for type: $type")
      conversions.add(Pair(address, converter))
    }

    fun address(address: String, converter: Message.Converter<*>) {
      conversions.add(Pair(address, converter))
    }

  }

  val coroutineScope: CoroutineScope

  val hostname: String

  val port: Int

  val messageFlow: Flow<Message<*>>

  fun <T> valueFlow(address: String): Flow<T>

  interface Output : Closeable {

    class Builder : Osc.Builder(direction = "OUTPUT")

    // TODO do we need address here? It seems that it's not used.
    val hostname: String

    val port: Int

    fun send(packet: Packet)

    /**
     * @throws IllegalArgumentException if no converter was registered for the [address].
     */
    fun <T> send(address: String, value: T)

  }

  interface Packet

  data class Message<T>(
    val address: String,
    val value: T,
    val hostname: String,
    val port: Int
  ) : Packet {

    class Reader(
      val typeTag: String,
      val input: Input
    ) {
      fun int(): Int = input.readInt()
      fun float(): Float = input.readFloat()
      fun double(): Double = input.readDouble()
      fun string(): String = input.readOscString()
    }

    class Writer<T>(
      val value: T,
      val output: io.ktor.utils.io.core.Output
    ) {
      fun typeTag(tag: String) = output.writeOscTypeTag(tag)
      fun int(x: Int) = output.writeInt(x)
      fun float(x: Float) = output.writeFloat(x)
      fun double(x: Double) = output.writeDouble(x)
      fun string(x: String) = output.writeOscString(x)
    }

    class Converter<T>(
      val typeTag: String? = null,
      val decode: Message.Reader.() -> T,
      val encode: Message.Writer<T>.() -> Unit
    ) {

      class Error(message: String) : RuntimeException(message)

      companion object {

        val INT = Converter(
          typeTag = "i",
          decode = { int() },
          encode = { int(value) }
        )

        val FLOAT = Converter(
          typeTag = "f",
          decode = { float() },
          encode = { float(value) }
        )

        val DOUBLE = Converter(
          typeTag = "d",
          decode = { double() },
          encode = { double(value) }
        )

        val STRING = Converter(
          typeTag = "s",
          decode = { string() },
          encode = { string(value) }
        )

        val BOOLEAN = Converter(
          decode = {
            when (typeTag) {
              "T" -> true
              "F" -> false
              else -> throw IllegalStateException(
                "Invalid typeTag for boolean: $typeTag"
              )
            }
          },
          encode = {
            typeTag(
              if (value) "T"
              else "F"
            )
          }
        )

        val LIST_OF_INTS = listConverter(
          elementTypeTag = 'i',
          elementReader = { int() },
          elementWriter = { int(value) }
        )

        val LIST_OF_FLOATS = listConverter(
          elementTypeTag = 'f',
          elementReader = { float() },
          elementWriter = { float(value) }
        )

        val LIST_OF_DOUBLES = listConverter(
          elementTypeTag = 'd',
          elementReader = { double() },
          elementWriter = { double(value) }
        )

        val LIST_OF_STRINGS = listConverter(
          elementTypeTag = 's',
          elementReader = { string() },
          elementWriter = { string(value) }
        )

        private inline fun <reified T> listConverter(
          elementTypeTag: Char,
          crossinline elementReader: Reader.() -> T,
          crossinline elementWriter: Writer<T>.() -> Unit
        ): Converter<List<T>> {
          return Converter(
            decode = {
              if (typeTag.any { it != elementTypeTag }) {
                throw Error(
                  "Cannot decode List<${typeOf<T>()}>, typeTag must consist " +
                      "of '$elementTypeTag' characters only, but was: $typeTag"
                )
              }
              typeTag.map {
                elementReader(this)
              }
            },
            encode = {
              typeTag(CharArray(value.size) { elementTypeTag }.concatToString())
              value.forEach {
                elementWriter(Writer(it, output))
              }
            }
          )
        }

      }

    }

  }

  data class Bundle(
    val packets: List<Packet> // TODO list of packets or list of messages?
  ) : Packet

  fun output(build: Output.Builder.() -> Unit): Output

  val outputs: List<Output>

  val logger: KLogger

  // Returns null when no converter is registered for the address.
  fun read(remoteAddress: String, packet: Input): Message<*>? {
    val address = packet.readOscString()

    val maybeConverter = conversionMap[address]
    return if (maybeConverter == null) {
      logger.error {
        "No conversion registered for address: $address"
      }
      null
    } else {
      @Suppress("UNCHECKED_CAST")
      val converter = maybeConverter as Osc.Message.Converter<Any>

      packet.discardUntilDelimiter(COMMA_BYTE)

      val typeTag = packet.readOscString().removePrefix(",")
      if (converter.typeTag != null && converter.typeTag != typeTag) {
        logger.error {
          "Invalid typeTag, expected: ${converter.typeTag}, was: $typeTag"
        }
      }

      val reader = Osc.Message.Reader(typeTag, packet)
      val value = converter.decode(reader)

      logger.trace {
        "OSC Message IN: udp:$remoteAddress -> $address=$value"
      }

      Osc.Message(
        address = address,
        value = value,
        hostname = remoteAddress.hostname,
        port = remoteAddress.port
      )
    }
  }

}

const val COMMA_BYTE = ','.code.toByte()

// TODO it seems to belong to implementation more
fun Input.readOscString(): String = buildPacket {
  readUntilDelimiter(0, this)
  val padding = 4 - ((size) % 4)
  discard(padding)
}.readText()

fun Output.writeOscTypeTag(tag: String) {
  require(tag.isNotBlank()) {
    "tag cannot be blank"
  }
  writeByte(COMMA_BYTE)
  writeText(tag)
  val padding = 4 - ((tag.length + 1) % 4)
  writeZeros(count = padding)
}

fun Output.writeOscString(value: String) {
  writeText(value)
  val padding = 4 - ((value.length) % 4)
  writeZeros(padding)
}

fun Output.writeZeros(
  count: Int
) = fill(times = count.toLong(), 0)
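
To make the padding rule used above easier to check by hand, here is a standard-library-only sketch that reads the address and type tag from a raw packet. `decodeOscString` and `readHeader` are hypothetical helpers, not part of the API shown above, and they assume well-formed input in which every string is NUL-terminated.

```kotlin
// Reads a NUL-terminated, 4-byte-aligned OSC-string starting at offset.
// Returns the text and the offset of the next field.
fun decodeOscString(bytes: ByteArray, offset: Int): Pair<String, Int> {
    var end = offset
    while (bytes[end] != 0.toByte()) end++
    val text = bytes.decodeToString(offset, end)
    val length = end - offset
    // Same rule as `4 - (size % 4)`: skip the terminator plus alignment padding.
    val next = offset + (length / 4 + 1) * 4
    return text to next
}

// Splits a message into address, type tag (without the leading comma),
// and the offset where the argument data begins.
fun readHeader(bytes: ByteArray): Triple<String, String, Int> {
    val (address, tagOffset) = decodeOscString(bytes, 0)
    val (rawTag, argsOffset) = decodeOscString(bytes, tagOffset)
    require(rawTag.startsWith(",")) { "Type tag must start with ','" }
    return Triple(address, rawTag.removePrefix(","), argsOffset)
}
```

For a packet carrying `/int` with a single `i` argument, the address occupies 8 bytes (4 characters plus 4 zero bytes) and the type tag 4 bytes, so the argument data starts at offset 12.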

I agree that my kind of routing is sort of the next layer out. In my own code, it's in a separate router class, while the lower-level socket class just exposes a flow of messages. I kept it superficially similar to this library in case I wanted to swap out the backend but keep the routing stuff. There's a lot of reflection garbage to do just to support lambdas the way I'm doing it.

I also realised perhaps a bit too late that sometimes I will want to forward entire sub-branches to another destination without looking at the contents, and that's something my router completely misses.
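
A rough sketch of what prefix-based forwarding could look like if the router kept the undecoded packet around. `BranchForwarder` and the `/avatar` prefix in the usage note are hypothetical, and the destination is modelled as a plain callback instead of a real socket.

```kotlin
// Hypothetical forwarder: hands every message under an address prefix to a
// destination callback without decoding the payload.
class BranchForwarder {
    private val branches = mutableListOf<Pair<String, (String, ByteArray) -> Unit>>()

    fun forward(prefix: String, destination: (address: String, raw: ByteArray) -> Unit) {
        branches += prefix.trimEnd('/') to destination
    }

    // Returns the number of destinations the raw packet was handed to.
    fun dispatch(address: String, raw: ByteArray): Int {
        val matches = branches.filter { (prefix, _) ->
            // Match the container itself or anything below it, but not siblings
            // that merely share the same leading characters.
            address == prefix || address.startsWith("$prefix/")
        }
        matches.forEach { (_, destination) -> destination(address, raw) }
        return matches.size
    }
}
```

Registering `forward("/avatar") { address, raw -> ... }` would then pass along `/avatar/parameters/x` untouched while ignoring `/avatarish`.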

I think I will park my project for a while anyway. I was looking at it for VRChat automation, and my VR setup is still very much inaccessible.