Observing every element of a collection is too expensive
Composite observables often need to observe many dependencies at once; e.g. array.flatMap { $0.observableField }
needs to observe not only the array itself, but also the specified observable field of each of its individual elements. Currently, each individual observation requires at least five allocations:
- The subscription sink itself is always a closure that notifies the dependent observable; it is heap-allocated (allocation no. 1).
- The Connection disposable that controls the lifetime of the subscription is a heap-allocated object (allocation no. 2). Connection holds an array of disconnection callbacks and a lock:
  - The lock is of a class type (allocation no. 3).
  - The array's storage is heap-allocated (allocation no. 4).
  - There is always at least one disconnection callback; it's a closure with a strong reference to the associated signal. It's heap-allocated (allocation no. 5).
This seems incredibly wasteful, and it should be changed to minimize the number of allocations. Ideally, we'd be able to subscribe to a set of observables with no per-subscription allocations.
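To make the cost concrete, here is a deliberately simplified model of the closure-based shape described above. It is a sketch, not the library's actual code: the Signal and Connection types below are hypothetical stand-ins, and the comments only mark where each of the five allocations would typically come from.

```swift
import Foundation

// Hypothetical stand-in for the Connection disposable described above.
final class Connection {                          // the object itself: allocation no. 2
    private let lock = NSLock()                   // lock of a class type: allocation no. 3
    private var callbacks: [() -> Void] = []      // array storage once non-empty: allocation no. 4

    init(onDisconnect: @escaping () -> Void) {
        callbacks.append(onDisconnect)            // escaping closure context: allocation no. 5
    }

    func disconnect() {
        lock.lock()
        let pending = callbacks
        callbacks.removeAll()
        lock.unlock()
        pending.forEach { $0() }
    }
}

// Hypothetical stand-in for a signal with closure sinks.
final class Signal<Value> {
    private var sinks: [Int: (Value) -> Void] = [:]
    private var nextID = 0

    // The sink is an escaping closure; when it captures state, its
    // context is typically heap-allocated: allocation no. 1.
    func connect(_ sink: @escaping (Value) -> Void) -> Connection {
        let id = nextID
        nextID += 1
        sinks[id] = sink
        // The disconnection callback holds a strong reference to the signal.
        return Connection { self.sinks[id] = nil }
    }

    func send(_ value: Value) {
        for sink in sinks.values { sink(value) }
    }
}

let signal = Signal<Int>()
var received: [Int] = []
let connection = signal.connect { received.append($0) }
signal.send(2)
connection.disconnect()
signal.send(3)    // no longer delivered
```

Observing an N-element array this way repeats all of the above N times, which is where the cost adds up.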
Notes about fixing this
- SourceType should not be defined in terms of the type-lifted Sink struct; its connect function should be generic, so that we can use sinks that don't contain closures:

      public protocol SourceType {
          associatedtype SourceValue
          func connect<S: SinkType>(_ sink: S) -> Connection where S.SinkValue == SourceValue
      }

      struct FooSink { // A sink that doesn't need anything allocated
          let target: Foo
          func receive(_ value: Int) { target.doSomething(value) }
      }

  (I think most aggregate sinks will fit the space reserved for such generic parameters without the allocation of a box.)
- We could convert Connection into a class hierarchy, and move the responsibility of storing the sinks into it. I.e., define

      class ConcreteConnection<Source: SourceType, Sink: SinkType>: BaseConnection, SinkType where Source.SourceValue == Sink.SinkValue {
          typealias Value = Source.SourceValue
          // Optional so that disconnect() can release both references.
          var source: Source?
          var sink: Sink?
          deinit { disconnect() }
          func disconnect() { source?.disconnect(self); source = nil; sink = nil }
          func receive(_ value: Value) { sink?.receive(value) }
      }

  Connection is currently a mish-mash of various hooks, bells and whistles (RefListElement, locking, additional callbacks, etc.). These should be ripped out and/or replaced by specific subclasses, as needed.

  In this approach, sources wouldn't create the connections any more; the connection would be created outside the source, then given to it using new API. (In this scheme, Connection should probably be renamed Sink, replacing the existing sink concept.)

- Ideally, Connection would be a protocol so that structs could also implement it. However, the protocol must not have associated types, so that it has existentials that can be stored in a collection. Therefore, it cannot contain the receive function above. However, the connection has a reference to the source and knows all types involved, so we could use it to define a workaround:

      public protocol SourceType {
          associatedtype SourceValue
          func register(_ connection: Connection)
          func unregister(_ connection: Connection)
          func _getValue() -> SourceValue // Only callable from Connection.receiveValue()
      }

      protocol Connection {
          func disconnect()
          func receiveValue() // Calls source._getValue() to get the value, and sends it to the sink
      }

  (Hopefully there is a way to make this less ugly.) This approach means that a single connection may not subscribe to more than one source at a time, but that's not really a limitation, since instantiating a new connection gets much cheaper.
- A binary observable operator like + could then simply observe its dependencies without any allocations:

      struct LeftSink<Source: SourceType, Value: IntegerArithmetic>: SinkType, Connection where Source.SourceValue == SimpleChange<Value> {
          let source: Source
          let target: BinaryCompositeObservable<Value>
          func receive(_ change: SimpleChange<Value>) { target.applyLeft(change) }
      }
      // struct RightSink is defined similarly.

      class BinaryCompositeObservable: ... {
          init<O: ObservableType>(left: O, right: O, combinator: @escaping (Value, Value) -> Value) {
              …
              left.changes.register(LeftSink(source: left.changes, target: self))
              right.changes.register(RightSink(source: right.changes, target: self))
          }
          deinit {
              left.changes.unregister(LeftSink(source: left.changes, target: self))
              right.changes.unregister(RightSink(source: right.changes, target: self))
          }
      }
- All of this should be done so that the closure-based API remains unchanged:

      let connection = signal.connect { value in print(value) }
      signal.send(2)
      signal.send(3)
      connection.disconnect()

  The compatibility definition of connect above would simply create a Connection/Sink containing the specified closure, register it to the source, and return it.
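The notes above can be combined into a small self-contained sketch: a generic connect that accepts any sink type, a connection class that stores the sink itself, and a closure-based connect layered on top for compatibility. All names here (BaseConnection, ConcreteConnection, ClosureSink, Counter, CounterSink) are illustrative assumptions rather than the library's API, and this variant still allocates one connection object per subscription, so it corresponds to the class-hierarchy note rather than the fully allocation-free goal.

```swift
// Hypothetical sink protocol, mirroring the SinkType used in the notes above.
protocol SinkType {
    associatedtype SinkValue
    func receive(_ value: SinkValue)
}

// Type-erased base class, so a signal can store connections for any sink type.
class BaseConnection<Value> {
    func receive(_ value: Value) {}
    func disconnect() {}
}

// Stores the concrete sink as a generic stored property, so a small
// struct sink should not need a separate box or closure context.
final class ConcreteConnection<Sink: SinkType>: BaseConnection<Sink.SinkValue> {
    private var sink: Sink?
    private weak var signal: Signal<Sink.SinkValue>?

    init(signal: Signal<Sink.SinkValue>, sink: Sink) {
        self.signal = signal
        self.sink = sink
        super.init()
    }

    override func receive(_ value: Sink.SinkValue) { sink?.receive(value) }

    override func disconnect() {
        signal?.remove(self)
        signal = nil
        sink = nil
    }
}

// Compatibility sink: wraps a closure so the closure-based API keeps working.
struct ClosureSink<Value>: SinkType {
    let body: (Value) -> Void
    func receive(_ value: Value) { body(value) }
}

final class Signal<Value> {
    private var connections: [ObjectIdentifier: BaseConnection<Value>] = [:]

    // Generic connect: accepts any sink whose value type matches.
    func connect<S: SinkType>(_ sink: S) -> BaseConnection<Value> where S.SinkValue == Value {
        let connection = ConcreteConnection(signal: self, sink: sink)
        connections[ObjectIdentifier(connection)] = connection
        return connection
    }

    // Closure-based connect, defined in terms of the generic one.
    func connect(_ body: @escaping (Value) -> Void) -> BaseConnection<Value> {
        connect(ClosureSink(body: body))
    }

    func remove(_ connection: BaseConnection<Value>) {
        connections[ObjectIdentifier(connection)] = nil
    }

    func send(_ value: Value) {
        for connection in connections.values { connection.receive(value) }
    }
}

// A closure-free sink: just a reference to its target.
final class Counter {
    var total = 0
    func add(_ value: Int) { total += value }
}

struct CounterSink: SinkType {
    let target: Counter
    func receive(_ value: Int) { target.add(value) }
}

let signal = Signal<Int>()
let counter = Counter()
var log: [Int] = []

let direct = signal.connect(CounterSink(target: counter))    // struct sink
let legacy = signal.connect { value in log.append(value) }   // closure API unchanged

signal.send(2)
legacy.disconnect()
signal.send(3)
direct.disconnect()
```

In this sketch the signal owns its connections strongly, which keeps the example short; a real design would need to decide whether dropping the returned connection should end the subscription, as the deinit in the ConcreteConnection note suggests.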
After some cooperative planning on Twitter (thanks!), work on this is now progressing on the new-signals branch.
Sources, Sinks and Signals have already been refactored to have a much more pleasant API that's also much faster.