[core] GcsSubscriber only supports 1 subscription per key
rynewang commented
What happened + What you expected to happen
If you call ray::pubsub::Subscriber::Subscribe multiple times with the same publisher_address and key_id, only one of the subscriber callbacks is invoked. This is because we index callbacks by (publisher_id, key_id) to a single SubscriptionInfo here:
ray/src/ray/pubsub/subscriber.cc, lines 37 to 41 at commit 6ae3f8c
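To make the failure mode concrete, here is a minimal, self-contained sketch of an index that maps (publisher_id, key_id) to a single callback. It is a hypothetical simplification, not Ray's code: the names SingleSlotIndex, Message, Callback and SubscriptionKey are invented for illustration. Whether the real subscriber keeps the first or the last callback depends on how it inserts into its map; this sketch assumes std::map::emplace, so the first callback wins and the second is dropped without any error.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified model of a callback index keyed by
// (publisher_id, key_id). None of these names are Ray's actual API.
using Message = std::string;
using Callback = std::function<void(const Message &)>;
using SubscriptionKey = std::pair<std::string, std::string>;  // (publisher_id, key_id)

class SingleSlotIndex {
 public:
  // Stores at most one callback per key. std::map::emplace does not overwrite
  // an existing entry, so a second Subscribe on the same key is silently
  // dropped; the return value reports whether the insert took effect.
  bool Subscribe(const std::string &publisher_id,
                 const std::string &key_id,
                 Callback callback) {
    return index_.emplace(SubscriptionKey{publisher_id, key_id}, std::move(callback))
        .second;
  }

  // Delivers a message to the single callback registered for the key, if any.
  void Publish(const std::string &publisher_id,
               const std::string &key_id,
               const Message &message) const {
    auto it = index_.find(SubscriptionKey{publisher_id, key_id});
    if (it != index_.end()) {
      it->second(message);
    }
  }

 private:
  std::map<SubscriptionKey, Callback> index_;
};

// Subscribes two callbacks to the same (publisher_id, key_id), publishes once,
// and returns how many times each callback ran.
std::pair<int, int> CountDeliveriesSingleSlot() {
  SingleSlotIndex index;
  int calls_1 = 0;
  int calls_2 = 0;
  index.Subscribe("owner", "object", [&](const Message &) { ++calls_1; });
  index.Subscribe("owner", "object", [&](const Message &) { ++calls_2; });
  index.Publish("owner", "object", "payload");
  return {calls_1, calls_2};  // {1, 0}: the second callback never runs
}
```

Because the second Subscribe neither replaces the first entry nor reports an error to the caller, the caller has no way to notice that its callback was discarded, which matches the symptom described above.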
I wonder if this single-callback-per-key behavior is intentional. In my view, we should either support multiple callbacks per key or make any subsequent Subscribe call on the same key fail.
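Both options can be sketched with a hypothetical, simplified index keyed by (publisher_id, key_id). MultiCallbackIndex, StrictIndex and the other names below are illustrative only and not part of Ray's API. Option 1 stores a vector of callbacks per key and invokes all of them on publish; option 2 keeps one callback per key but reports a duplicate Subscribe to the caller (here, as a false return value) instead of dropping it silently.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketches of the two proposed behaviors. The names are
// illustrative only and do not correspond to Ray's actual classes.
using Message = std::string;
using Callback = std::function<void(const Message &)>;
using SubscriptionKey = std::pair<std::string, std::string>;  // (publisher_id, key_id)

// Option 1: keep every callback registered for a key and invoke all of them.
class MultiCallbackIndex {
 public:
  void Subscribe(const SubscriptionKey &key, Callback callback) {
    index_[key].push_back(std::move(callback));
  }

  // Removes all callbacks for the key; returns false if none were registered.
  bool Unsubscribe(const SubscriptionKey &key) { return index_.erase(key) > 0; }

  void Publish(const SubscriptionKey &key, const Message &message) const {
    auto it = index_.find(key);
    if (it == index_.end()) return;
    for (const auto &callback : it->second) callback(message);
  }

 private:
  std::map<SubscriptionKey, std::vector<Callback>> index_;
};

// Option 2: allow one callback per key, but make a duplicate Subscribe fail
// visibly (by returning false) instead of being ignored.
class StrictIndex {
 public:
  [[nodiscard]] bool Subscribe(const SubscriptionKey &key, Callback callback) {
    return index_.emplace(key, std::move(callback)).second;
  }

  void Publish(const SubscriptionKey &key, const Message &message) const {
    auto it = index_.find(key);
    if (it != index_.end()) it->second(message);
  }

 private:
  std::map<SubscriptionKey, Callback> index_;
};

// Two callbacks on one key, one publish: returns how often each ran.
std::pair<int, int> CountDeliveriesMulti() {
  MultiCallbackIndex index;
  const SubscriptionKey key{"owner", "object"};
  int calls_1 = 0;
  int calls_2 = 0;
  index.Subscribe(key, [&](const Message &) { ++calls_1; });
  index.Subscribe(key, [&](const Message &) { ++calls_2; });
  index.Publish(key, "payload");
  return {calls_1, calls_2};  // {1, 1}: both callbacks run
}

// Returns whether the first and the second Subscribe on one key succeeded.
std::pair<bool, bool> SubscribeTwiceStrict() {
  StrictIndex index;
  const SubscriptionKey key{"owner", "object"};
  const bool first = index.Subscribe(key, [](const Message &) {});
  const bool second = index.Subscribe(key, [](const Message &) {});
  return {first, second};  // {true, false}: the duplicate is rejected
}
```

One design question option 1 raises: Unsubscribe in the reproduction below takes only (channel, publisher address, key_id), so in this sketch it removes every callback for the key. Removing a single subscriber while keeping the others would need some per-subscription handle to be passed to Unsubscribe.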
Versions / Dependencies
master
Reproduction script
This fails:
TEST_F(SubscriberTest, TestMultipleSubscriptionsOnSameKey) {
  // Each callback counts how many messages it received per object.
  std::unordered_map<ObjectID, int64_t> object_subscribed_1, object_subscribed_2;
  auto subscription_callback1 = [&](const rpc::PubMessage &msg) {
    object_subscribed_1[ObjectID::FromBinary(msg.key_id())]++;
  };
  auto subscription_callback2 = [&](const rpc::PubMessage &msg) {
    object_subscribed_2[ObjectID::FromBinary(msg.key_id())]++;
  };
  auto failure_callback = EMPTY_FAILURE_CALLBACK;
  const auto owner_addr = GenerateOwnerAddress();
  const auto object_id = ObjectID::FromRandom();
  ASSERT_FALSE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
  ASSERT_TRUE(owner_client->ReplyCommandBatch());
  // Subscribe twice to the same (owner_addr, key_id), each time with a different callback.
  subscriber_->Subscribe(GenerateSubMessage(object_id),
                         channel,
                         owner_addr,
                         object_id.Binary(),
                         /*subscribe_done_callback=*/nullptr,
                         subscription_callback1,
                         failure_callback);
  subscriber_->Subscribe(GenerateSubMessage(object_id),
                         channel,
                         owner_addr,
                         object_id.Binary(),
                         /*subscribe_done_callback=*/nullptr,
                         subscription_callback2,
                         failure_callback);
  ASSERT_TRUE(owner_client->ReplyCommandBatch());
  ASSERT_TRUE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
  std::vector<ObjectID> objects_batched;
  objects_batched.push_back(object_id);
  ASSERT_TRUE(ReplyLongPolling(channel, objects_batched));
  // Make sure the long polling batch works as expected: both callbacks should fire.
  for (const auto &id : objects_batched) {
    ASSERT_EQ(object_subscribed_1[id], 1);
    ASSERT_EQ(object_subscribed_2[id], 1);
  }
  // Publish the objects again; both callbacks should receive them.
  ASSERT_TRUE(ReplyLongPolling(channel, objects_batched));
  for (const auto &id : objects_batched) {
    ASSERT_EQ(object_subscribed_1[id], 2);
    ASSERT_EQ(object_subscribed_2[id], 2);
  }
  ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
  ASSERT_TRUE(owner_client->ReplyCommandBatch());
  ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
  // Here, once the long polling request is replied, the metadata is cleaned up.
  ASSERT_TRUE(ReplyLongPolling(channel, objects_batched));
  // ASSERT_TRUE(subscriber_->CheckNoLeaks());
}
Issue Severity
Medium: It is a significant difficulty but I can work around it.