r3labs/sse

Allow overriding `OnSubscribe` and `OnUnsubscribe` functions per Stream.


Goal

The Server can be initialized with global OnSubscribe and OnUnsubscribe callbacks, but these apply to every stream. Allowing these events to be customized on a per-stream basis would provide greater flexibility.

Problem

The Server contains only an unexported getStream method, so the consumer of this library cannot set the OnSubscribe callback on the Stream directly. This seems like a sensible choice, especially since access to resources on the Server requires synchronization via a sync.Mutex.

However, the programmer can still work around this by doing the following:

stream := server.CreateStream(streamID)  // only creates the stream if it doesn't already exist
stream.OnSubscribe = customSubscribeFunc // not atomic: this write happens after CreateStream has released its mutex

This works in practice, but it does not appear to be the intended use of the API, and the unsynchronized write makes it unsafe to rely on.

Proposed Solution

Export a method on the Server instance that allows customizing the callback per stream, for example:

func (s *Server) OverrideOnSubscribeCallbackForStream(streamID string, callback func(string, *Subscriber)) {
    s.mu.Lock()
    defer s.mu.Unlock()
    // Skip unknown stream IDs to avoid dereferencing a nil *Stream.
    if stream, ok := s.Streams[streamID]; ok {
        stream.OnSubscribe = callback
    }
}

Example use case:

s.OverrideOnSubscribeCallbackForStream(streamID, myCustomStreamFunc)
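
To make the proposal concrete, here is a minimal, self-contained sketch of how a per-stream override could coexist with the global callback. The `Server`, `Stream`, and `Subscriber` types below are simplified stand-ins, not the library's actual definitions, and `SetStreamOnSubscribe` and `notifySubscribe` are hypothetical names chosen for illustration. The sketch assumes a per-stream callback takes precedence when set and the global one is used otherwise.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber, Stream, and Server are simplified stand-ins for the library's
// types; only the fields needed for this sketch are modelled.
type Subscriber struct{ ID string }

type Stream struct {
	ID string
	// OnSubscribe is the per-stream override; nil means "use the server default".
	OnSubscribe func(streamID string, sub *Subscriber)
}

type Server struct {
	mu      sync.Mutex
	Streams map[string]*Stream
	// OnSubscribe is the global callback applied to streams without an override.
	OnSubscribe func(streamID string, sub *Subscriber)
}

// SetStreamOnSubscribe (hypothetical name) sets the callback for one stream
// while holding the server mutex, and reports an unknown stream as an error.
func (s *Server) SetStreamOnSubscribe(streamID string, cb func(string, *Subscriber)) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	stream, ok := s.Streams[streamID]
	if !ok {
		return fmt.Errorf("stream %q does not exist", streamID)
	}
	stream.OnSubscribe = cb
	return nil
}

// notifySubscribe (hypothetical name) picks the per-stream callback if one is
// set and falls back to the global callback otherwise.
func (s *Server) notifySubscribe(streamID string, sub *Subscriber) {
	s.mu.Lock()
	cb := s.OnSubscribe
	if stream, ok := s.Streams[streamID]; ok && stream.OnSubscribe != nil {
		cb = stream.OnSubscribe
	}
	s.mu.Unlock()
	if cb != nil {
		cb(streamID, sub) // invoked after unlocking so the callback may call back into the server
	}
}

func main() {
	s := &Server{
		Streams:     map[string]*Stream{"news": {ID: "news"}, "chat": {ID: "chat"}},
		OnSubscribe: func(id string, _ *Subscriber) { fmt.Println("global:", id) },
	}
	if err := s.SetStreamOnSubscribe("news", func(id string, _ *Subscriber) {
		fmt.Println("custom:", id)
	}); err != nil {
		fmt.Println("error:", err)
	}
	s.notifySubscribe("news", &Subscriber{ID: "a"}) // uses the per-stream override
	s.notifySubscribe("chat", &Subscriber{ID: "b"}) // falls back to the global callback
}
```

Returning an error for an unknown stream ID is one possible design; silently ignoring it or creating the stream on demand would also be reasonable, depending on how the maintainers want the API to behave.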