d-exclaimation/pioneer

Redis backed AsyncPubSub

Closed this issue · 5 comments

It might make sense to consider adding something analogous to the following:

https://www.apollographql.com/blog/backend/subscriptions/graphql-subscriptions-with-redis-pub-sub/

While AsyncPubSub is great for many use cases, in production where we might have multiple server instances, it may be necessary to use something like Redis instead of an in memory pubsub to handle subscriptions. It would be cool if there could be an option to use Redis when configuring Pioneer and it would be even cooler if we could use the same API as AsyncPubSub.

So it would be a simple configuration change for the developer, and then we could switch between redis and the in memory AsyncPubSub without changing our EventStream code.

Do you think this is possible? I know this might be a challenge.

It's very much possible. Pioneer can take any EventStream created from any AsyncSequence. Looking at this page of the Vapor docs, subscribing to a channel looked like

app.redis.subscribe(
  to: "channel_1", "channel_2",
  messageReceiver: { channel, message in
    switch channel {
    case "channel_1": // do something with the message
    default: break
    }
  },
  onUnsubscribe: { channel, subscriptionCount in
    print("unsubscribed from \(channel)")
    print("subscriptions remaining: \(subscriptionCount)")
  })

which is easy to translate to AsyncStream.

AsyncStream { con in
    app.redis.subscribe(
        to: ...,
        messageReceiver: { _, message in
            con.yield(message)
        },
        onUnsubscribe: { _, _ in
            con.finish()
        })
    
    ...
}

I think implementing something from that which have similar API to AsyncPubSub should be relatively straightforward. All you need to is to make a struct, wrap that code above into a method for the struct named asyncStream, and wrap the RediStack publish method into a method as well named publish.

The only problem is that I don't think it belong here. It make more sense as a separate package.

AsyncPubSub is provided in this library because I wanted to provide at least a basic in memory solution for Pub/Sub that can be used to start with immediately without extra setup.

I might try to make a separate package for this and maybe make AsyncPubSubBase protocol so that you can build something that will have similar API to AsyncPubSub, but I can't make any guarantees for that.

I agree. I think this approach makes a lot of sense.

I might try to make a separate package for this and maybe make AsyncPubSubBase protocol so that you can build something that will have similar API to AsyncPubSub, but I can't make any guarantees for that.

I have a working solutions for the public protocol from AsyncPubSub so that you can build a custom pubsub implementation with similar API to AsyncPubSub. More details on the changes is on #45

The changes are not final yet, I might change my mind on it to make it simpler. Would appreciate your thoughts on it.

I think the PubSub protocol should be available in v0.8.0. Theres should also be part of documentation that explains a bit more about creating a custom PubSub implementation.