Events channel, tag field
materkov opened this issue · 6 comments
It seems that tag
field is always equal to 1
for messages in events channel:
1|Tue, 23 May 2017 10:28:04 GMT|{"type": "client_subscribed", "channel": "user_conn_track.user-1"}
1|Tue, 23 May 2017 10:28:04 GMT|{"type": "client_unsubscribed", "channel": "user_conn_track.user-1"}
1|Tue, 23 May 2017 10:28:04 GMT|{"type": "client_subscribed", "channel": "user_conn_track.user-1"}
1|Tue, 23 May 2017 10:28:05 GMT|{"type": "client_unsubscribed", "channel": "user_conn_track.user-1"}
So, there is a small possibility to receive duplicates messages, when retrieving old message from history channel.
Is this a bug? Any workaround for this problem?
Not sure if I understood your question @materkov . Which tag
are you talking about?
The events channel should kind behave like any other channel from the perspective of publishing messages. So it increases a sequence humber when the messages are published on the same second. Where are you seeing equal tags
?
@wandenberg
I'll try to explain more. Here is config that I am using:
push_stream_shared_memory_size 128M;
push_stream_events_channel_id connection_events_channel;
push_stream_ping_message_text "ping";
push_stream_message_template "{\"id\": ~id~, \"eventid\": ~event-id~, \"channel\": \"~channel~\", \"text\": ~text~, \"tag\": ~tag~, \"time\": \"~time~\"}";
push_stream_store_messages on;
server {
listen 80 default_server;
location /pub {
# activate publisher (admin) mode for this location
push_stream_publisher admin;
# query string based channel id
push_stream_channels_path $arg_id;
}
location ~ /sub_ws/(.*) {
push_stream_subscriber websocket;
push_stream_last_received_message_time "$arg_time";
push_stream_last_received_message_tag "$arg_tag";
# positional channel path
push_stream_channels_path $1;
}
location ~ /sub_events_channel {
push_stream_allow_connections_to_events_channel on;
push_stream_subscriber websocket;
push_stream_ping_message_interval 2s;
push_stream_last_received_message_time $arg_time;
push_stream_last_received_message_tag $arg_tag;
# positional channel path
push_stream_channels_path connection_events_channel;
}
}
Then, I am sending 4 messages very quickly:
curl http://10.1.2.6:8200/pub?id=12 -d "dsad"
In normal channel, everything seems to be OK:
ws://10.1.2.6:8200/sub/11/12
{"id": 11, "eventid": , "channel": "12", "text": dsad, "tag": 1, "time": "Tue, 23 May 2017 12:41:44 GMT"}
{"id": 12, "eventid": , "channel": "12", "text": dsad, "tag": 2, "time": "Tue, 23 May 2017 12:41:44 GMT"}
{"id": 13, "eventid": , "channel": "12", "text": dsad, "tag": 3, "time": "Tue, 23 May 2017 12:41:44 GMT"}
{"id": 14, "eventid": , "channel": "12", "text": dsad, "tag": 4, "time": "Tue, 23 May 2017 12:41:44 GMT"}
Tags are different here.
Then, I've tried to quickly connect and disconnect websocket connection. Here is what I see in events channel:
{"id": 47, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 48, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 49, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 50, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 51, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 52, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 53, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 54, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_subscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 55, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "11"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
{"id": 56, "eventid": , "channel": "connection_events_channel", "text": {"type": "client_unsubscribed", "channel": "12"}, "tag": 1, "time": "Tue, 23 May 2017 12:44:41 GMT"}
As you can see, tag is 1
for every message (but it should be different, because time is Tue, 23 May 2017 12:44:41 GMT
for every message)
I will check this. Should not work like that.
Besides that this channel was not created to be used with history.
Thank you. I am looking forward to your reply.
By the way, what I am trying to do is some kind of daemon. This tiny daemon will listen connect/disconnect events and re-send them to message broker. Then, our backend will consume this messages and perform some processing (e.g. presence tracking, or some other stuff). I want to make sure that this daemon works reliable and don't loose events in periods of code deployment/restarting/or some troubles. To do this, I am tracking tag&time of the last message, and then retrieve history events when daemon connected again. If tag is 1
for every message, I am recieving all messages for this second (except first message) in next reconnect.
@materkov sorry for the delay. Can you give it a try on this branch fix_tag_in_concurrent_messages
?
Thank you. I can confirm that problem is solved in this branch.
Just one note, there was an error:
In file included from ../nginx-push-stream-module/src/ngx_http_push_stream_module.c:29:0:
../nginx-push-stream-module/src/ngx_http_push_stream_module_utils.c: In function 'ngx_http_push_stream_convert_char_to_msg_on_shared':
../nginx-push-stream-module/src/ngx_http_push_stream_module_utils.c:273:48: error: unused variable 'shm_data' [-Werror=unused-variable]
ngx_http_push_stream_shm_data_t *shm_data = mcf->shm_data;
^
cc1: all warnings being treated as errors
make[1]: *** [objs/addon/src/ngx_http_push_stream_module.o] Error 1
After removing this line, build was successful.