Disruptor is an (almost) pure-Swift implementation of the inter-thread messaging library & concurrent design pattern of the same name. It aims to reach feature parity with LMAX's reference implementation.
Have a read of the LMAX literature if you're not familiar with it.
Swift Disruptor uses SwiftPM as its build tool. Add the package in the usual way, first with a new dependencies clause:
dependencies: [
    .package(url: "https://github.com/tuplestream/swift-disruptor.git", from: "0.14.0")
]
then add the Disruptor module to your target dependencies:
dependencies: [.product(name: "Disruptor", package: "swift-disruptor")]
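For context, here is a minimal sketch of how the two fragments above might sit in a complete manifest. The package and target name `MyApp` and the tools version are placeholders chosen for illustration, not something this project prescribes:

```swift
// swift-tools-version:5.3
import PackageDescription

let package = Package(
    name: "MyApp", // hypothetical package name
    dependencies: [
        // Package-level dependency on swift-disruptor
        .package(url: "https://github.com/tuplestream/swift-disruptor.git", from: "0.14.0")
    ],
    targets: [
        .target(
            name: "MyApp", // hypothetical target name
            // Target-level dependency on the Disruptor module
            dependencies: [.product(name: "Disruptor", package: "swift-disruptor")]
        )
    ]
)
```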
Although you can create and manipulate ring buffers and sequencers directly, in most cases it's easier to work with the higher-level Disruptor and Translator components for publishing and handling messages:
// 1) Import the Disruptor module
import Disruptor
// 2) Declare a message type we want to pass, and a factory for its
// pre-allocation in the ring buffer
final class LongEvent {
    var value: Int64 = 0
}
class LongEventFactory: EventFactory {
    typealias Event = LongEvent
    func newInstance() -> LongEvent {
        return LongEvent()
    }
}
// 3) Implement an EventHandler to be run from a consuming thread:
final class LongEventHandler: EventHandler {
    typealias Event = LongEvent
    func onEvent(_ event: LongEvent, sequence: Int64, endOfBatch: Bool) {
        print("🚀 \(event.value)")
    }
}
Since the LongEvent objects are allocated only once, when we create the disruptor, we need a way of passing new values from producers. For this we need to implement an EventTranslator:
// 4) Our translator assigns newly produced long values to a claimed LongEvent from the ring buffer
class LongEventTranslator: EventTranslator {
    typealias Event = LongEvent
    typealias Input = Int64
    func translateTo(_ event: inout LongEvent, sequence: Int64, input: Int64) {
        event.value = input
    }
}
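To make the pre-allocation idea concrete, here is a toy sketch that does not use the Disruptor module at all. `Slot` and `translate` are hypothetical names invented for this illustration. It only shows that a translator overwrites the fields of an object that already exists, rather than allocating a new one per message:

```swift
// Toy illustration only: none of these names are part of the Disruptor API.
final class Slot {
    var value: Int64 = 0
}

// Allocate every slot once, up front, as a ring buffer would at creation time.
let slots: [Slot] = (0..<4).map { _ in Slot() }
let firstSlotIdentity = ObjectIdentifier(slots[0])

// A "translator" copies the producer's input into an existing slot.
func translate(_ slot: Slot, input: Int64) {
    slot.value = input
}

// Publishing twice to the same slot reuses the same object.
translate(slots[0], input: 42)
translate(slots[0], input: 43)

print(slots[0].value)                                  // 43
print(ObjectIdentifier(slots[0]) == firstSlotIdentity) // true
```

Because the slot is reused, a handler should treat an event's contents as valid only for the duration of its callback.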
Finally, we need to wire it all together. Let's instantiate a disruptor. The ring buffer can be sized fairly large, but it should fit inside the L3 cache, and its size must be a power of 2:
// 5) Instantiate a Disruptor
let disruptor = Disruptor<LongEvent>(eventFactory: LongEventFactory(), ringBufferSize: (1 << 16), producerType: .single)
// let's just add a single consumer:
disruptor.handleEventsWith(eventHandlers: [LongEventHandler()])
// start the consumers
disruptor.start()
// publish an event
disruptor.publishEvent(LongEventTranslator(), input: 1)
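The power-of-2 requirement on the ring buffer size is common to ring buffer designs, including LMAX's reference implementation, because it lets an ever-increasing sequence number be mapped to a slot with a bit mask instead of a modulo. The sketch below illustrates that idea. `isPowerOfTwo` and `slotIndex` are hypothetical helpers, and whether this package computes indices in exactly this way is an assumption:

```swift
// Hypothetical helpers for illustration; not part of the Disruptor module.

// A positive integer is a power of two if it has exactly one bit set.
func isPowerOfTwo(_ n: Int) -> Bool {
    return n > 0 && (n & (n - 1)) == 0
}

// Map a monotonically increasing sequence onto a slot using a mask.
// For power-of-two sizes this is equivalent to `sequence % bufferSize`.
func slotIndex(forSequence sequence: Int64, bufferSize: Int) -> Int {
    precondition(isPowerOfTwo(bufferSize), "bufferSize must be a power of 2")
    return Int(sequence & Int64(bufferSize - 1))
}

let size = 1 << 16
print(isPowerOfTwo(size))                               // true
print(isPowerOfTwo(1000))                               // false
print(slotIndex(forSequence: 65_536, bufferSize: size)) // 0
print(slotIndex(forSequence: 65_541, bufferSize: size)) // 5
```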
Have a look at the list of issues. Bug reports (including perceived performance issues) are much appreciated.
This package uses the Swift Atomics library and needs double-word atomic support explicitly enabled on Linux. For example:
swift test -Xcc -mcx16 -Xswiftc -DENABLE_DOUBLEWIDE_ATOMICS
Benchmarks are pretty unstable right now. There's a separate target with an executable, which you can invoke:
swift run -c release
Currently there's a single-producer/single-consumer scenario (with no batching), run with both the disruptor and GCD. Most disruptor implementations diverge more markedly from queues in multi-producer/multi-consumer environments, so reproducing this and finding bottlenecks is a priority.
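As a rough idea of what the GCD side of a single-producer/single-consumer comparison can look like, here is a minimal sketch. It is not the benchmark code from this package: `Counter` and `runGCDBaseline` are hypothetical names, and the timing approach is deliberately simplistic.

```swift
import Dispatch

// Hypothetical accumulator; only ever touched on the consumer queue,
// which is why it is marked @unchecked Sendable.
final class Counter: @unchecked Sendable {
    var sum: Int64 = 0
}

// Push `count` values from the calling thread (the single producer)
// through a serial queue (the single consumer), one item at a time.
func runGCDBaseline(count: Int) -> (sum: Int64, nanoseconds: UInt64) {
    let consumer = DispatchQueue(label: "consumer") // serial by default
    let counter = Counter()

    let start = DispatchTime.now().uptimeNanoseconds
    for i in 0..<count {
        let value = Int64(i)
        consumer.async { counter.sum += value }
    }
    // A sync block on a serial queue runs after all earlier async blocks,
    // so this waits for the consumer to drain and reads the result.
    let sum = consumer.sync { counter.sum }
    let end = DispatchTime.now().uptimeNanoseconds

    return (sum, end - start)
}

let result = runGCDBaseline(count: 1_000)
print("sum: \(result.sum), elapsed: \(result.nanoseconds) ns")
```

A single run like this is noisy, so it is worth repeating the measurement several times in a release build before drawing conclusions.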