Code-Sharp/WampSharp

Publishing to topic in .SubscriptionAdded seems to be impossible?

aaaaaaaaargh opened this issue · 6 comments

Hey, I'll try to keep it short, so bear with me. I'm having the following scenario: I want to publish something to a subject as soon as the remote side has subscribed to the topic. This code demonstrates what I'm trying to achieve:

// this is on router side
var topic = realm.TopicContainer.CreateTopicByUri(topicUri, false);
topic.SubscriptionAdded += (sender, args) => 
{
    realm.TopicContainer.Publish(WampObjectFormatter.Value, new PublishOptions(), topicUrl), new object[]{...});
}

However, after investigating why the object is not being received on subscriber side it turns out it is impossible to do this in the event listener due to the following implementation in WampRawTopic:

public void Subscribe(ISubscribeRequest<TMessage> request, SubscribeOptions options)
    {
      RemoteWampTopicSubscriber subscriber = new RemoteWampTopicSubscriber(this.SubscriptionId, (IWampSubscriber) request.Client);
      WampRawTopic<TMessage>.RemoteObserver remoteObserver = this.mSubscriberBook.Subscribe(request.Client);
      if (!remoteObserver.IsOpen)
        this.RaiseSubscriptionAdding(subscriber, options);
      request.Subscribed(this.SubscriptionId);
      if (remoteObserver.IsOpen)
        return;
      this.RaiseSubscriptionAdded(subscriber, options); // <--- this is where the code above is executed
      remoteObserver.Open(); <-- this is where the observer is opened
    }

So, accoring to this method (also WampRawTopic) any published message is discarded unless the remote observer has been opened:

public void Message(WampMessage<object> message)
{
  if (!this.IsOpen)
    return; // <-- this is what's happening in my case
  this.mClient.Message(message);
}

In conclusion seems to be impossible to publish messages when the subscription has been opened. Is there any recommended way of doing this? I'm thinking about delaying the execution with a deferred Task but that feels really dirty and not very good in terms of perfomance. Do you have any suggestions how this could be solved?

Thank in advance!

darkl commented

Thanks for the quick response. Unfortunately when it comes to calling Subscriber.Event I am totally lost at this point. I don't have access to mBinding so I'm unable to create the serialized message payload.

It would be way more convenient to be able to create the event via Publish. May I suggest another event maybe? For example, there could be a post subscription event that is called after Open(). It could be named .SubscriptionInitialized or something comparable. This way it would still be possible to keep the current behaviour but allow both sides to send the first message. Also I think implementation would be very straight-forward and nonintrusive. What are your thoughts on that?

darkl commented

Well, I'd be happy to implement it the way you suggested (although a dedicated event would be far more convenient), but I'm really struggling with constructing the event here. It is not so easy right now to dig into the implementation and understand what needs to be done here. So to simplify the question:

How would I transform this call

realm.TopicContainer.Publish(WampObjectFormatter.Value,new PublishOptions(), topicUri, new object[] { payload }

into a respective call to Subscriber.Event ? I seem to be missing so many things here. How do I get an EventDetails instance? The PublishOptionsExtensions interface is internal, so no chance of calling the static getter. Also I don't have access to mOptions. That's what I meant by another event delegate wouldn't hurt here but maybe I'm just doing things wrong here. Can you give some kind of advice on how to do it please?

By the way: For the time being I've had some success with a wrapped Task.Run(() => ...) call but that just doesn't feel like a good solution by any means.

darkl commented

Something like this should work. I don't understand why you wrote you need mBinding.

private static readonly WampIdGenerator mWampIdGenerator = new WampIdGenerator();

// ...
static void Main(string[] args)
{
    WampHost host = new WampHost();
    // ...

    IWampTopic myTopic = host.RealmContainer.GetRealmByName("realm1")
                         .TopicContainer.GetOrCreateTopicByUri("mytopic");

    myTopic.SubscriptionAdded +=
        (sender, eventArgs) =>
            eventArgs.Subscriber.Event(mWampIdGenerator.Generate(),
                                       new EventDetails(),
                                       new object[] {"foo", 3, "bar"});
}

I see. That is indeed an easy to reproduce solution. I was looking up the implementations of various methods involved in the call of the Publish() method and at one point mBinding is being used like so:

WampMessage<object> rawMessage = this.mBinding.GetRawMessage(message);

So I concluded I needed it to create the message object that is being passed to Message(). Luckily it seems I was on the wrong track here.

I will test your example tomorrow, can't wait to get this implemented in a more decent way. Thank you very much for the support!