Allow overrriding `OnSubscribe` and `OnUnsubscribe` functions per Stream.
Opened this issue · 0 comments
Goal
While the Server
supports initializing the server with global OnSubscribe
and OnUnsubscribe
callbacks, this applies to all streams. Allowing customized behavior for these events on a per-stream basis provides greater flexibility.
Problem
The Server
contains only an unexported getStream
method, so the consumer of this library cannot set the OnSubscribe
callback on the Stream
directly. This seems like a sensible choice, especially since access to resources on the Server
requires synchronization via a sync.Mutex
.
However the programmer can still kind of get around this by doing doing the following:
stream := server.CreateStream(stream) // only creates the stream if it doesn't already exist
stream.OnSubscribe = customSubscribeFunc // not atomic, since CreateStream uses a mutex
While this kind of works, it seems like this was not the intent of the API and seems unsafe to rely on.
Proposed Solution
Export a method on the Server
instance that allows customizing the callback per stream, for example:
func (s *Server) OverrideOnSubscribeCallbackForStream(streamID string, callback func(string, *Subscriber)) {
s.mu.Lock()
defer s.mu.Unlock()
s.Streams[streamID].OnSubscribe = callback
}
Example use case:
s.OverrideOnSubscribeCallback(stream, myCustomStreamFunc)