Calling Unsubscribe from a channel is still receiving update messages
GrimLothar opened this issue · 4 comments
I have these 2 methods:
/// <summary>
/// Subscribes to UPDATE events on the "aw-tournaments" table for a single tournament row.
/// </summary>
/// <remarks>
/// One channel per tournament is cached in <c>channels</c> under the name "tournaments:{id}".
/// The Postgres filter and the change handler are attached only when the channel is first
/// created; on later calls the cached channel is simply subscribed again.
/// NOTE(review): because of that, a different <paramref name="handler"/> passed on a later call
/// for the same tournament is never attached - confirm this is intended.
/// NOTE(review): this is <c>async void</c>, so callers cannot await or observe failures from
/// <c>Subscribe()</c>.
/// </remarks>
/// <param name="tournament_id">Id of the tournament row to watch (used in the "id=eq." filter).</param>
/// <param name="handler">Callback posted to <c>_mainThreadContext</c> with the updated model.</param>
public async void SubscribeToTournamentChanges(int tournament_id, TournamentChangedHandler handler)
{
    string topic = "tournaments:" + tournament_id;
    channels.TryGetValue(topic, out RealtimeChannel cached);

    if (cached == null)
    {
        // First subscription for this tournament: create, cache and configure the channel.
        cached = _supabase.Realtime.Channel(topic);
        channels.Add(topic, cached);

        string rowFilter = "id=eq." + tournament_id;
        var updateOptions = new PostgresChangesOptions("public", "aw-tournaments", ListenType.Updates, rowFilter);
        cached.Register(updateOptions);

        cached.AddPostgresChangeHandler(ListenType.Updates, (sender, change) =>
        {
            // Ignore anything that is not a change on the tournaments table.
            if (change.Payload?.Data?.Table != "aw-tournaments")
            {
                return;
            }

            //Tournament data changed
            var updated = change.Model<Tournament>();
            if (updated == null)
            {
                return;
            }

            // Hand the model to the caller via _mainThreadContext rather than invoking it inline.
            _mainThreadContext.Post(state => handler.Invoke(updated), null);
        });
    }

    await cached.Subscribe();
}
/// <summary>
/// Unsubscribes the cached realtime channel for the given tournament, if one exists.
/// </summary>
/// <remarks>
/// Only <c>Unsubscribe()</c> is called; the channel stays in <c>channels</c> afterwards, so it
/// keeps whatever was attached to it when it was created.
/// </remarks>
/// <param name="tournament_id">Id of the tournament whose channel should be unsubscribed.</param>
public void UnsubscribeToTournamentChanges(int tournament_id)
{
    string topic = "tournaments:" + tournament_id;
    channels.TryGetValue(topic, out RealtimeChannel cached);

    // Nothing was ever subscribed for this tournament.
    if (cached == null)
    {
        return;
    }

    cached.Unsubscribe();
}
When I call channel.Unsubscribe();
I see in the logs that something gets changed:
Socket Push [topic: realtime:tournaments:5, event: phx_leave, ref: 5505e55a-d2d9-484d-8d98-34fc9d3c3d9d]:
null
UnityEngine.Debug:Log (object)
SupabaseManager/<>c:<Start>b__9_0 (object,string,System.Exception) (at Assets/TcgEngine/SupabaseManager.cs:60)
Supabase.Realtime.Debugger:Log (object,string,System.Exception)
Supabase.Realtime.RealtimeSocket:Push (Supabase.Realtime.Socket.SocketRequest)
Supabase.Realtime.Channel.Push:Send ()
Supabase.Realtime.RealtimeChannel:Unsubscribe ()
SupabaseManager:UnsubscribeToTournamentChanges (int) (at Assets/TcgEngine/SupabaseManager.cs:222)
TcgEngine.UI.TournamentDetailsPanel:OnBack () (at Assets/TcgEngine/Scripts/UI/TournamentDetailsPanel.cs:149)
UnityEngine.Events.UnityEvent:Invoke ()
TcgEngine.UI.AWButtonNew:OnClick () (at Assets/TcgEngine/Scripts/UI/AWButtonNew.cs:34)
UnityEngine.EventSystems.EventSystem:Update () (at ./Library/PackageCache/com.unity.ugui@1.0.0/Runtime/EventSystem/EventSystem.cs:530)
But when I change my tournaments table in the DB, I still get the message. I would expect Unsubscribe to stop getting these (thus saving bandwidth).
What is Unsubscribe
meant to do if not this?
Thanks!
Thanks for the issue!
To clarify, the websocket is still receiving messages? Or your callback is continuing to be called?
Thanks for the issue!
To clarify, the websocket is still receiving messages? Or your callback is continuing to be called?
Both!
@grimAgent I'm struggling to duplicate - are you sure you don't have multiple socket clients connected?
I wrote the following test in the RealtimeTests project (see the realtime-csharp repo) using a primary channel (demonstrating the Unsubscribe
functionality) and a secondary channel (different channel with same postgres_changes parameters but remaining subscribed).
It passes as expected:
/// <summary>
/// Checks that a channel stops invoking its handler after <c>Unsubscribe()</c>, while a second
/// channel registered with the same postgres_changes parameters keeps receiving inserts.
/// </summary>
[TestMethod("Channel: Unsubscribes")]
public async Task ChannelDisconnectsAppropriately()
{
    var firstInsertSeen = new TaskCompletionSource<bool>();
    var primaryHits = 0;
    var secondaryHits = 0;

    // Both channels listen with identical options; each call builds a fresh instance.
    PostgresChangesOptions TodoInsertOptions()
    {
        return new PostgresChangesOptions("public", "todos", ListenType.Inserts, "user_id=eq.1");
    }

    // Primary channel: the one that will be unsubscribed part-way through.
    var primary = _socketClient!.Channel("test:unsubscribes");
    primary.Register(TodoInsertOptions());
    primary.AddPostgresChangeHandler(ListenType.Inserts, (sender, change) =>
    {
        primaryHits++;
        firstInsertSeen.TrySetResult(true);
    });
    await primary.Subscribe();

    // Secondary channel: stays subscribed for the whole test.
    var secondary = _socketClient!.Channel("test:unsubscribes:2");
    secondary.Register(TodoInsertOptions());
    secondary.AddPostgresChangeHandler(ListenType.Inserts, (sender, change) =>
    {
        secondaryHits++;
    });
    await secondary.Subscribe();

    // First insert: wait until the primary handler has fired once.
    await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1 });
    await firstInsertSeen.Task;

    primary.Unsubscribe();

    // Two more inserts made after the primary channel has been unsubscribed.
    await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1 });
    await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1 });

    // Give the socket time to deliver any pending messages before asserting.
    await Task.Delay(3000);

    Assert.AreEqual(1, primaryHits);
    Assert.AreEqual(3, secondaryHits);
}
With the following console output:
Debug Trace:
Socket Reconnection: Initial
Socket Connected to: ws://realtime-dev.localhost:4000/socket/websocket?apikey=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiIiLCJpYXQiOjE2NzEyMzc4NzMsImV4cCI6MjAwMjc3Mzk5MywiYXVkIjoiIiwic3ViIjoiIiwicm9sZSI6ImF1dGhlbnRpY2F0ZWQifQ.qoYdljDZ9rjfs1DKj5_OqMweNtj7yk20LZKlGNLpUO8&vsn=1.0.0
Socket State Change: Open
Socket Push [topic: phoenix, event: heartbeat, ref: 91de2812-09d7-4fcc-8263-2834ae862423]:
{}
Socket Push [topic: realtime:test:unsubscribes, event: phx_join, ref: 23775bd6-5117-405d-bdaa-99074b798f30]:
{
"config": {
"broadcast": {
"self": false,
"ack": false
},
"presence": {
"key": ""
},
"postgres_changes": [
{
"schema": "public",
"table": "todos",
"filter": "user_id=eq.1",
"event": "INSERT"
}
]
}
}
Socket Message Received:
{"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"91de2812-09d7-4fcc-8263-2834ae862423","topic":"phoenix"}
Socket Message Received:
{"event":"phx_reply","payload":{"response":{"postgres_changes":[{"id":48556260,"event":"INSERT","filter":"user_id=eq.1","schema":"public","table":"todos"}]},"status":"ok"},"ref":"23775bd6-5117-405d-bdaa-99074b798f30","topic":"realtime:test:unsubscribes"}
Socket Message Received:
{"event":"presence_state","payload":{},"ref":null,"topic":"realtime:test:unsubscribes"}
Socket Message Received:
{"event":"system","payload":{"channel":"test:unsubscribes","extension":"postgres_changes","message":"Subscribed to PostgreSQL","status":"ok"},"ref":null,"topic":"realtime:test:unsubscribes"}
Socket Push [topic: realtime:test:unsubscribes:2, event: phx_join, ref: 2fcb5d42-4470-4be3-8660-49b3c081dda3]:
{
"config": {
"broadcast": {
"self": false,
"ack": false
},
"presence": {
"key": ""
},
"postgres_changes": [
{
"schema": "public",
"table": "todos",
"filter": "user_id=eq.1",
"event": "INSERT"
}
]
}
}
Socket Message Received:
{"event":"phx_reply","payload":{"response":{"postgres_changes":[{"id":48556260,"event":"INSERT","filter":"user_id=eq.1","schema":"public","table":"todos"}]},"status":"ok"},"ref":"2fcb5d42-4470-4be3-8660-49b3c081dda3","topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
{"event":"presence_state","payload":{},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
{"event":"system","payload":{"channel":"test:unsubscribes:2","extension":"postgres_changes","message":"Subscribed to PostgreSQL","status":"ok"},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
{"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.106Z","errors":null,"record":{"details":null,"done":false,"id":25,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
{"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.106Z","errors":null,"record":{"details":null,"done":false,"id":25,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes"}
Socket Push [topic: realtime:test:unsubscribes, event: phx_leave, ref: 998db8ee-771d-43f8-be6a-11d82334af56]:
null
Socket Message Received:
{"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"998db8ee-771d-43f8-be6a-11d82334af56","topic":"realtime:test:unsubscribes"}
Socket Message Received:
{"event":"phx_close","payload":{},"ref":"23775bd6-5117-405d-bdaa-99074b798f30","topic":"realtime:test:unsubscribes"}
Socket Message Received:
{"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.150Z","errors":null,"record":{"details":null,"done":false,"id":26,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Message Received:
{"event":"postgres_changes","payload":{"data":{"columns":[{"name":"id","type":"int8"},{"name":"name","type":"text"},{"name":"notes","type":"text"},{"name":"done","type":"bool"},{"name":"details","type":"text"},{"name":"inserted_at","type":"timestamp"},{"name":"numbers","type":"_int4"},{"name":"user_id","type":"text"},{"name":"status","type":"todo_status"}],"commit_timestamp":"2023-11-29T03:43:19.155Z","errors":null,"record":{"details":null,"done":false,"id":27,"inserted_at":null,"name":null,"notes":null,"numbers":[],"status":"NOT STARTED","user_id":"1"},"schema":"public","table":"todos","type":"INSERT"},"ids":[48556260]},"ref":null,"topic":"realtime:test:unsubscribes:2"}
Socket Closed at 11/28/2023 9:43:22 PM
I don't think I have multiple clients connected... you can see my full implementation here: #126