ray-project/ray

[core] GcsSubscriber only supports 1 subscription per key

Opened this issue · 0 comments

What happened + What you expected to happen

Calling ray::pubsub::Subscriber::Subscribe multiple times with the same publisher_address and key_id invokes only one of the subscription callbacks. Callbacks are indexed by (publisher_id, key_id) into a single SubscriptionInfo. `try_emplace` does nothing when that key already exists, so the callbacks passed to later Subscribe calls are silently dropped:

return subscription_map_[publisher_id]
.per_entity_subscription
.try_emplace(*key_id,
SubscriptionInfo(std::move(subscription_callback),
std::move(subscription_failure_callback)))

I wonder whether this is intentional. In my view, we should either support multiple callbacks per key, or make subsequent Subscribe calls on the same key fail.

Versions / Dependencies

master

Reproduction script

The following test fails:

// Reproduction: two Subscribe() calls on the same (publisher address, key_id)
// pair, each with its own subscription callback. Every published message for
// the key is expected to reach BOTH callbacks.
TEST_F(SubscriberTest, TestMultipleSubscriptionsOnSameKey) {
  // Per-callback counters: how many messages each subscription received,
  // keyed by the object ID carried in the message's key_id.
  std::unordered_map<ObjectID, int64_t> object_subscribed_1;
  std::unordered_map<ObjectID, int64_t> object_subscribed_2;
  auto subscription_callback1 = [&](const rpc::PubMessage &msg) {
    object_subscribed_1[ObjectID::FromBinary(msg.key_id())]++;
  };
  auto subscription_callback2 = [&](const rpc::PubMessage &msg) {
    object_subscribed_2[ObjectID::FromBinary(msg.key_id())]++;
  };
  auto failure_callback = EMPTY_FAILURE_CALLBACK;

  const auto owner_addr = GenerateOwnerAddress();
  const auto object_id = ObjectID::FromRandom();
  // Unsubscribe on a key that was never subscribed is expected to return false.
  ASSERT_FALSE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
  ASSERT_TRUE(owner_client->ReplyCommandBatch());

  // Subscribe twice to the same key, with a different callback each time.
  subscriber_->Subscribe(GenerateSubMessage(object_id),
                         channel,
                         owner_addr,
                         object_id.Binary(),
                         /*subscribe_done_callback=*/nullptr,
                         subscription_callback1,
                         failure_callback);
  subscriber_->Subscribe(GenerateSubMessage(object_id),
                         channel,
                         owner_addr,
                         object_id.Binary(),
                         /*subscribe_done_callback=*/nullptr,
                         subscription_callback2,
                         failure_callback);
  ASSERT_TRUE(owner_client->ReplyCommandBatch());
  ASSERT_TRUE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));

  std::vector<ObjectID> objects_batched;
  objects_batched.push_back(object_id);
  ASSERT_TRUE(ReplyLongPolling(channel, objects_batched));
  // Make sure the long polling batch works as expected: each callback fired once.
  for (const auto &batched_id : objects_batched) {
    ASSERT_TRUE(object_subscribed_1[batched_id] == 1);
    ASSERT_TRUE(object_subscribed_2[batched_id] == 1);
  }

  // Publish the objects again; both subscriptions should receive them again.
  ASSERT_TRUE(ReplyLongPolling(channel, objects_batched));
  for (const auto &batched_id : objects_batched) {
    // Check the map owned by subscription_callback1. The earlier reference to
    // `object_subscribed_` pointed at a different map.
    ASSERT_TRUE(object_subscribed_1[batched_id] == 2);
    ASSERT_TRUE(object_subscribed_2[batched_id] == 2);
  }

  ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
  ASSERT_TRUE(owner_client->ReplyCommandBatch());
  ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));

  // Here, once the long polling request is replied, the metadata is cleaned up.
  ASSERT_TRUE(ReplyLongPolling(channel, objects_batched));
  // ASSERT_TRUE(subscriber_->CheckNoLeaks());
}

Issue Severity

Medium: It is a significant difficulty but I can work around it.