Adding Subjects
chfischerx opened this issue · 0 comments
Is your feature request related to a problem? Please describe.
The ReactiveX "standard" defines several types of subjects that allow multiple subscribers to observe an observable (see https://reactivex.io/documentation/subject.html). While much the same can be achieved with a connectable Observable, some features are missing, most importantly adding and removing subscribers dynamically. An RxGo connectable Observable allows adding multiple subscribers, each of which receives a copy of all items. But once added, a subscriber cannot be removed (it cannot unsubscribe).
Subjects are useful when trying to emulate an in-memory message broker. A typical example is fanning out a single data source to multiple clients within a web server, e.g. over WebSockets. Every new WebSocket connection is added as a Subject subscriber. Once the client disconnects, the subscription must be removed.
Describe the solution you'd like
I suggest three Subject types:
- Subject ... simple fan-out with unsubscribe
- BehaviorSubject ... Subject which replays the last received item to new subscribers
- ReplaySubject ... Subject which replays the last n received items to new subscribers
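One way to read the three types is as a single replay buffer with a different capacity: 0 for Subject, 1 for BehaviorSubject, and n for ReplaySubject. The sketch below illustrates only that idea. `replayBuffer`, `add`, and `snapshot` are hypothetical names, not part of RxGo or of the proposed implementation, and the initial value that a ReactiveX BehaviorSubject can carry is left out.

```go
package main

import "fmt"

// replayBuffer is a hypothetical helper: it keeps the last `size` items
// so they can be replayed to a subscriber that joins late.
type replayBuffer struct {
	size  int
	items []any
}

// add records an item, discarding the oldest one when the buffer is full.
func (b *replayBuffer) add(v any) {
	if b.size == 0 {
		return // plain Subject: nothing is retained
	}
	if len(b.items) == b.size {
		b.items = b.items[1:]
	}
	b.items = append(b.items, v)
}

// snapshot returns a copy of what a new subscriber would receive first.
func (b *replayBuffer) snapshot() []any {
	return append([]any(nil), b.items...)
}

func main() {
	subject := &replayBuffer{size: 0}  // Subject: no replay
	behavior := &replayBuffer{size: 1} // BehaviorSubject: last item
	replay := &replayBuffer{size: 3}   // ReplaySubject with n = 3

	for i := 1; i <= 5; i++ {
		subject.add(i)
		behavior.add(i)
		replay.add(i)
	}

	fmt.Println(subject.snapshot())  // []
	fmt.Println(behavior.snapshot()) // [5]
	fmt.Println(replay.snapshot())   // [3 4 5]
}
```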
Suggested Subject API:
type ISubject interface {
	Subscribe() (Subscription, rxgo.Observable)
	Next(value any)
	Error(err error)
	Complete()
}
And the Subscription API:
type Subscription interface {
	Unsubscribe()
}
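To make the intended behavior concrete, here is a minimal sketch of a plain Subject with the method set above. It is not the implementation from the private repo. Two assumptions keep it standard-library only: a receive-only channel stands in for `rxgo.Observable`, and `Error` simply delivers the error as the last item before completing. `NewSubject`, `handle`, and the buffer size of 8 are hypothetical choices.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscription matches the interface proposed above.
type Subscription interface {
	Unsubscribe()
}

// Subject is a hypothetical fan-out: every item passed to Next is copied
// to each current subscriber. Assumption: channels replace rxgo.Observable.
type Subject struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]chan any
	done   bool
}

func NewSubject() *Subject {
	return &Subject{subs: make(map[int]chan any)}
}

// handle removes one subscriber from its Subject when Unsubscribe is called.
type handle struct {
	subject *Subject
	id      int
}

func (h handle) Unsubscribe() {
	h.subject.mu.Lock()
	defer h.subject.mu.Unlock()
	if ch, ok := h.subject.subs[h.id]; ok {
		delete(h.subject.subs, h.id)
		close(ch)
	}
}

func (s *Subject) Subscribe() (Subscription, <-chan any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan any, 8) // buffer size is an arbitrary choice for the sketch
	id := s.nextID
	s.nextID++
	if s.done {
		close(ch) // subscribing after completion yields a closed channel
	} else {
		s.subs[id] = ch
	}
	return handle{subject: s, id: id}, ch
}

func (s *Subject) Next(value any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.subs {
		select {
		case ch <- value:
		default: // subscriber buffer is full: drop rather than block the producer
		}
	}
}

// Error delivers err as the final item and then completes the Subject.
func (s *Subject) Error(err error) {
	s.Next(err)
	s.Complete()
}

func (s *Subject) Complete() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.done = true
	for id, ch := range s.subs {
		delete(s.subs, id)
		close(ch)
	}
}

func main() {
	s := NewSubject()
	subA, a := s.Subscribe()
	_, b := s.Subscribe()

	s.Next("first")
	subA.Unsubscribe() // e.g. a WebSocket client disconnecting
	s.Next("second")
	s.Complete()

	for v := range a {
		fmt.Println("A:", v) // A only sees "first"
	}
	for v := range b {
		fmt.Println("B:", v) // B sees "first" and "second"
	}
}
```

Dropping items for a slow subscriber is only one possible policy. Blocking the producer or disconnecting the subscriber are alternatives a real implementation would need to choose between.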
Additional context
I have already implemented all of the above in a private repo, based on RxGo Observables. I would be glad to contribute my code to this project (and of course maintain it) if there is interest.