attaswift/GlueKit

Observing every element of a collection is too expensive

Closed this issue · 1 comment

Composite observables often need to observe many dependencies at once; e.g. array.flatMap { $0.observableField } needs to observe not only the array itself, but also the specified observable field of each of its individual elements. Currently, each individual observation requires at least five allocations:

  • The subscription sink itself is always a closure that notifies the dependent observable; it is heap-allocated (allocation no. 1).
  • The Connection disposable that controls the lifetime of the subscription is a heap-allocated object (allocation no. 2).
  • Connection holds an array of disconnection callbacks and a lock:
    • The lock is of a class type (allocation no. 3).
    • The array's storage is heap allocated (allocation no. 4).
    • There is always at least one disconnection callback; it's a closure with a strong reference to the associated signal. It's heap-allocated (allocation no. 5).

This seems incredibly wasteful, and it should be changed to minimize the number of allocations. Ideally, we'd be able to subscribe to a set of observables with no per-subscription allocations.
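A minimal sketch of why a closure-free sink helps, assuming a stripped-down `SinkType` protocol with a single `receive(_:)` requirement. `Counter`, `ClosureSink`, `CounterSink` and `deliver(_:to:)` are hypothetical names for illustration, not GlueKit API. The closure-based sink has to carry its captured state in a closure context, which usually means a heap allocation per sink. `CounterSink` is just a struct holding one reference, and a function that is generic over `S: SinkType` can be specialized for it, so no closure or box is needed.

```swift
// Hypothetical minimal protocol, modelled on the SinkType used in the notes below.
protocol SinkType {
    associatedtype SinkValue
    func receive(_ value: SinkValue)
}

final class Counter {
    private(set) var total = 0
    func add(_ value: Int) { total += value }
}

// Closure-based sink: the closure usually needs a heap-allocated context for its captures.
struct ClosureSink<Value>: SinkType {
    let body: (Value) -> Void
    func receive(_ value: Value) { body(value) }
}

// Closure-free sink: a plain struct holding a single class reference.
struct CounterSink: SinkType {
    let counter: Counter
    func receive(_ value: Int) { counter.add(value) }
}

// Generic over the sink type, so the compiler can specialize it for CounterSink
// instead of going through a closure or an existential box.
func deliver<S: SinkType>(_ values: [S.SinkValue], to sink: S) {
    for value in values { sink.receive(value) }
}
```

Both sinks deliver values the same way; only the closure version needs a closure context.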

Notes about fixing this

  • SourceType should not be defined in terms of the type-lifted Sink struct; its connect function should be generic, so that we can use sinks that don't contain closures:

    public protocol SourceType {
       associatedtype SourceValue
       // Generic over the sink type, so sinks don't have to be wrapped in closures.
       func connect<S: SinkType>(_ sink: S) -> Connection where S.SinkValue == SourceValue
    }
    // A sink that doesn't need anything allocated: a plain struct holding one reference.
    struct FooSink: SinkType {
       let target: Foo
       func receive(_ value: Int) { target.doSomething(value) }
    }

    (I think most aggregate sinks will be small enough to fit in the inline space reserved for such generic values, so they won't need a separately allocated box.)

  • We could convert Connection into a class hierarchy, and move the responsibility of storing the sinks into it. I.e., define

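    class ConcreteConnection<Source: SourceType, Sink: SinkType>: BaseConnection, SinkType
    where Source.SourceValue == Sink.SinkValue {
      typealias SinkValue = Source.SourceValue
      // Optional, so that disconnect() can release both ends.
      var source: Source?
      var sink: Sink?

      // Assumes BaseConnection has a plain init().
      init(source: Source, sink: Sink) { self.source = source; self.sink = sink; super.init() }
      deinit { disconnect() }
      // Assumes BaseConnection declares disconnect(); unregisters once, then drops both references.
      override func disconnect() { source?.disconnect(self); source = nil; sink = nil }
      // Forwards values to the wrapped sink until disconnected.
      func receive(_ value: SinkValue) { sink?.receive(value) }
    }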

    Connection is currently a mish-mash of various hooks, bells and whistles (RefListElement, locking, additional callbacks, etc.). These should be ripped out and/or replaced by specific subclasses, as needed.

    In this approach, sources wouldn't create the connections any more — the connection would be created outside the source, then given to it using new API. (In this scheme, Connection should probably be renamed Sink, replacing the existing sink concept.)

  • Ideally, Connection would be a protocol so that structs could also implement it. However, the protocol must not have associated types, so that its existentials can be stored in a collection. That rules out the receive function above. But since the connection has a reference to the source and knows all the types involved, we could use it for a workaround:

    public protocol SourceType {
       associatedtype SourceValue
       func register(_ connection: Connection)
       func unregister(_ connection: Connection)
       func _getValue() -> SourceValue // Only callable from Connection.receiveValue()
    }
    protocol Connection {
      func disconnect()
      func receiveValue() // Calls source._getValue() to get the value, and sends it to the sink
    }

    (Hopefully there is a way to make this less ugly.) This approach means that a single connection may not subscribe to more than one source at a time, but that's not really a limitation, since creating a new connection becomes much cheaper.

  • A binary observable operator like + could then simply observe its dependencies without any allocations:

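    struct LeftSink<Source: SourceType, Value: IntegerArithmetic>: SinkType, Connection
    where Source.SourceValue == SimpleChange<Value> {
      let source: Source
      // weak: otherwise the registered sink would keep the observable alive and deinit would never run.
      weak var target: BinaryCompositeObservable<Value>?
      func receive(_ change: SimpleChange<Value>) { target?.applyLeft(change) }
      // Connection requirements:
      func disconnect() { source.unregister(self) }
      func receiveValue() { receive(source._getValue()) }
    }
    // struct RightSink is defined similarly, but calls target?.applyRight(change).
    
    class BinaryCompositeObservable<Value: IntegerArithmetic>: ... {
      // Kept as Connection existentials so that deinit can unregister both sinks.
      private var leftConnection: Connection? = nil
      private var rightConnection: Connection? = nil

      init<O: ObservableType>(left: O, right: O, combinator: @escaping (Value, Value) -> Value) {
         // ...
         let leftSink = LeftSink(source: left.changes, target: self)
         let rightSink = RightSink(source: right.changes, target: self)
         left.changes.register(leftSink)
         right.changes.register(rightSink)
         leftConnection = leftSink
         rightConnection = rightSink
      }
      deinit {
         leftConnection?.disconnect()
         rightConnection?.disconnect()
      }
    }

  • All of this should be done so that the closure-based API remains unchanged: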

    let connection = signal.connect { value in print(value) }
    signal.send(2)
    signal.send(3)
    connection.disconnect()

    The compatibility definition of connect above would simply create a Connection/Sink containing the specified closure, register it with the source, and return it.
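A self-contained sketch of how these notes could fit together, assuming a simple synchronous `Signal` source. It stores associated-type-free `Connection` existentials, delivers values through the `receiveValue()`/`_getValue()` pull workaround, feeds a binary sum from two closure-free struct sinks, and keeps the closure-based `connect` as a thin wrapper. Everything here is hypothetical (`Signal`, `SideSink`, `Sum`, `ClosureConnection`), and in particular the `identity` requirement is an assumption: the notes don't say how `unregister` should recognize a struct connection, so this sketch compares the `ObjectIdentifier` of the object each connection targets. It is not GlueKit's implementation.

```swift
// Hypothetical sketch of the associated-type-free Connection design; not GlueKit API.
protocol Connection {
    // Assumption (not in the notes): a key that unregister uses to find the connection again.
    var identity: ObjectIdentifier { get }
    func disconnect()
    func receiveValue()  // pulls the current value from its typed source
}

protocol SourceType: AnyObject {
    associatedtype SourceValue
    func register(_ connection: Connection)
    func unregister(_ connection: Connection)
    func _getValue() -> SourceValue  // only meant to be called from receiveValue()
}

final class Signal<Value>: SourceType {
    private var connections: [Connection] = []  // existentials, so structs can be stored too
    private var current: Value?

    func register(_ connection: Connection) { connections.append(connection) }
    func unregister(_ connection: Connection) {
        if let i = connections.firstIndex(where: { $0.identity == connection.identity }) {
            connections.remove(at: i)
        }
    }
    func _getValue() -> Value { return current! }

    func send(_ value: Value) {
        let saved = current  // restored afterwards, so nested sends still work
        current = value
        for connection in connections { connection.receiveValue() }
        current = saved
    }
}

// A closure-free struct connection feeding one side of a binary sum.
struct SideSink: Connection {
    let source: Signal<Int>
    weak var target: Sum?           // weak, so a registered sink doesn't keep the sum alive
    let isLeft: Bool
    let identity: ObjectIdentifier  // captured up front, so unregister never reads `target`

    init(source: Signal<Int>, target: Sum, isLeft: Bool) {
        self.source = source
        self.target = target
        self.isLeft = isLeft
        self.identity = ObjectIdentifier(target)
    }
    func disconnect() { source.unregister(self) }
    func receiveValue() { target?.apply(source._getValue(), isLeft: isLeft) }
}

final class Sum {
    private(set) var left = 0, right = 0
    var value: Int { return left + right }
    private var leftSink: SideSink?
    private var rightSink: SideSink?

    init(_ l: Signal<Int>, _ r: Signal<Int>) {
        let ls = SideSink(source: l, target: self, isLeft: true)
        let rs = SideSink(source: r, target: self, isLeft: false)
        l.register(ls)
        r.register(rs)
        leftSink = ls
        rightSink = rs
    }
    deinit {
        leftSink?.disconnect()
        rightSink?.disconnect()
    }
    func apply(_ newValue: Int, isLeft: Bool) {
        if isLeft { left = newValue } else { right = newValue }
    }
}

// Closure-based compatibility API: the closure is wrapped in a class-based connection.
final class ClosureConnection<Value>: Connection {
    private let source: Signal<Value>
    private let body: (Value) -> Void
    init(source: Signal<Value>, body: @escaping (Value) -> Void) {
        self.source = source
        self.body = body
    }
    var identity: ObjectIdentifier { return ObjectIdentifier(self) }
    func disconnect() { source.unregister(self) }
    func receiveValue() { body(source._getValue()) }
}

extension Signal {
    func connect(_ body: @escaping (Value) -> Void) -> Connection {
        let connection = ClosureConnection(source: self, body: body)
        register(connection)
        return connection
    }
}
```

The struct sinks hold the sum weakly and carry a precomputed identity, so the sum's deinit can unregister both sides without touching the object being deinitialized.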

After some cooperative planning on Twitter (thanks!), work on this is now progressing on the new-signals branch.

Sources, Sinks and Signals have already been refactored to have a much more pleasant API that's also much faster.