Only receiving the first message with QOS = 1 and QOS = 2
Sorry, it's me again :) I'm using code largely taken from the example:
require './lib/ruby-mqtt3'
require 'pp'
require 'async'
require 'fiber'
require 'socket'
scheduler = Async::Scheduler.new
Fiber.set_scheduler scheduler
m = Mqtt3.new(
  keepalive_sec: 30,
  persistence_filename: 'persist.data',
  clean_session: true,
  reconnect: true,
  host: 'localhost',
  port: '1883'
)
m.debug = true
m.on_disconnect do |reason|
  m.debug "TCP disconnected #{reason}"
end
m.on_connect do |session_present|
  m.debug 'on_connect'
  m.subscribe [['test', 1], ['test2', 1]]
  # m.invalid
  puts "Session Present: #{session_present}"
  # m.publish('test', 'message')
end
m.on_tcp_connect_error do |e,counter|
  m.debug e.inspect + ", waiting #{counter} sec"
  sleep counter
  false
end
m.on_tcp_connect_error do |e,counter|
  m.debug e.inspect + ", waiting #{counter} sec"
  true
end
m.on_message do |topic, message, qos, packet_id|
  m.debug "Incoming topic: #{topic} message: #{message} qos: #{qos} packet_id: #{packet_id}"
end
m.on_publish_finished do |packet_id|
  m.debug "packet published #{packet_id}"
end
Signal.trap("INT") do
  m.save
  exit
end
Signal.trap("TERM") do
  m.save
  exit
end
m.run
scheduler.run
When I run it I see this:
$ ruby example.rb
2022.05.26 20:15:57.656 removing file persist.data
2022.05.26 20:15:57.657 TCP connected
2022.05.26 20:15:57.657 --- CONNECT flags: 0 101f00044d5154540402001e00136578616d706c652e72622d575745596e513663
2022.05.26 20:15:57.657 +++ CONNACK flags: 0 length: 2 data: 0000
2022.05.26 20:15:57.658 on_connect
2022.05.26 20:15:57.658 --- SUBSCRIBE flags: 2 82110001000474657374020005746573743202
Session Present: false
2022.05.26 20:15:57.658 +++ SUBACK flags: 0 length: 4 data: 00010202
So far so good! Then I run:
$ mosquitto_pub -t test -m "test message" -q 1
I see this:
2022.05.26 20:16:18.883 +++ PUBLISH flags: 2 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 20:16:18.883 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 20:16:18.883 --- PUBACK flags: 2 42020001
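For reference, the data: dump of the QoS 1 PUBLISH above can be decoded by hand: a 2-byte topic length, the topic, a 2-byte packet identifier (present only for QoS 1 and 2), then the payload. The helper below is a hypothetical sketch, not part of ruby-mqtt3; it assumes the flags value in the log is the low nibble of the fixed header.

```ruby
# Hypothetical parser for the body of an incoming MQTT 3.1.1 PUBLISH packet,
# given the hex "data:" dump and the fixed-header flags from the debug log.
def parse_publish(hex, flags)
  data = [hex].pack('H*')            # hex dump -> binary string
  qos = (flags >> 1) & 0b11          # bits 2-1 of the flags nibble
  topic_len = data.unpack1('n')      # 16-bit big-endian topic length
  topic = data.byteslice(2, topic_len)
  pos = 2 + topic_len
  packet_id = nil
  if qos > 0                         # only QoS 1 and 2 carry a packet identifier
    packet_id = data.byteslice(pos, 2).unpack1('n')
    pos += 2
  end
  { topic: topic, qos: qos, packet_id: packet_id, payload: data.byteslice(pos..) }
end

p parse_publish('000474657374000174657374206d657373616765', 2)
```

The QoS 0 dumps further down (flags: 0) decode the same way but without a packet identifier, which matches the empty packet_id: in those log lines.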
However, if I run mosquitto_pub again, no more messages arrive :( Only the first one gets through, even if I then send messages with QOS=0.
I now restart the process and run:
$ mosquitto_pub -t test -m "test message" -q 0
$ mosquitto_pub -t test -m "test message" -q 0
$ mosquitto_pub -t test -m "test message" -q 0
I see all 3 messages. This keeps working with QOS=0, but as soon as I use QOS 1 or 2, it stops working:
$ ruby example.rb
2022.05.26 20:17:40.667 removing file persist.data
2022.05.26 20:17:40.668 TCP connected
2022.05.26 20:17:40.668 --- CONNECT flags: 0 101f00044d5154540402001e00136578616d706c652e72622d6653475a594b4557
2022.05.26 20:17:40.669 +++ CONNACK flags: 0 length: 2 data: 0000
2022.05.26 20:17:40.669 on_connect
2022.05.26 20:17:40.669 --- SUBSCRIBE flags: 2 82110001000474657374020005746573743202
Session Present: false
2022.05.26 20:17:40.669 +++ SUBACK flags: 0 length: 4 data: 00010202
2022.05.26 20:17:44.872 +++ PUBLISH flags: 0 length: 18 data: 00047465737474657374206d657373616765
2022.05.26 20:17:44.872 Incoming topic: test message: test message qos: 0 packet_id:
2022.05.26 20:17:45.368 +++ PUBLISH flags: 0 length: 18 data: 00047465737474657374206d657373616765
2022.05.26 20:17:45.368 Incoming topic: test message: test message qos: 0 packet_id:
2022.05.26 20:17:45.667 +++ PUBLISH flags: 0 length: 18 data: 00047465737474657374206d657373616765
2022.05.26 20:17:45.667 Incoming topic: test message: test message qos: 0 packet_id:
2022.05.26 20:18:10.669 --- PINGREQ flags: 0 c000
2022.05.26 20:18:10.669 +++ PINGRESP flags: 0 length: 0 data:
Any ideas? It sounds like maybe the PUBACK isn't working right?
I enabled the extra debug statements that were commented out in the source, and when I send a QOS=1 message, it shows this:
2022.05.26 20:44:15.308 +++ PUBLISH flags: 2 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 20:44:15.308 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 20:44:15.308 --- PUBACK flags: 2 42020001
2022.05.26 20:44:15.308 exiting ping fiber#<Fiber:0x000000011256e970 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
It seems to be failing here:
def read_bytes(count)
  buffer = ''
  while buffer.length != count
    # TODO: rescue
    chunk = @socket.read(count - buffer.length)
    raise Mqtt3NormalExitException if chunk == '' || chunk.nil?
chunk is nil here for some reason, and this kills the process; reconnect doesn't seem to work. It looks like this exception disables reconnect?
If I remove the raise, I get hundreds of messages per second that look like this:
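IO#read(n) returns nil once the stream is at end-of-file, so a nil chunk most likely means the broker closed the TCP connection, not that the read itself failed. A minimal sketch of the same loop, written as a hypothetical standalone method read_exactly (not the library's code), which counts bytes with bytesize rather than characters and raises Ruby's EOFError instead of Mqtt3NormalExitException:

```ruby
require 'stringio'

# Hypothetical standalone variant of read_bytes: reads exactly `count` bytes
# from any IO-like object, raising EOFError if the peer closes the stream early.
def read_exactly(io, count)
  buffer = String.new(encoding: Encoding::BINARY)
  while buffer.bytesize < count
    chunk = io.read(count - buffer.bytesize)
    # IO#read(n) returns nil at end-of-file, i.e. the broker has closed
    # the TCP connection.
    if chunk.nil? || chunk.empty?
      raise EOFError, "connection closed after #{buffer.bytesize} of #{count} bytes"
    end
    buffer << chunk
  end
  buffer
end

p read_exactly(StringIO.new("\x40\x02\x00\x01".b), 4).unpack1('H*')
```

Raising a distinct error for "peer closed the socket" would let a caller decide to reconnect instead of treating it as a normal exit; whether that fits ruby-mqtt3's reconnect logic is an assumption.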
2022.05.26 21:31:32.723 +++ PUBLISH flags: 10 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:31:32.723 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:31:32.723 --- PUBACK flags: 2 42020001
2022.05.26 21:31:32.723 +++ SUBACK flags: 0 length: 3 data: 035201
2022.05.26 21:31:32.723 TCP disconnected
2022.05.26 21:31:32.723 TCP connected
2022.05.26 21:31:32.723 --- CONNECT flags: 0 102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e
2022.05.26 21:31:32.723 entering ping fiber #<Fiber:0x0000000117665680 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:31:32.723 +++ CONNACK flags: 0 length: 2 data: 0100
2022.05.26 21:31:32.723 on_connect
2022.05.26 21:31:32.724 sleeping for 29.999921 sec
2022.05.26 21:31:32.724 --- SUBSCRIBE flags: 2 8209035300047465737401
Session Present: true
2022.05.26 21:31:32.724 +++ PUBLISH flags: 10 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:31:32.724 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:31:32.724 --- PUBACK flags: 2 42020001
2022.05.26 21:31:32.724 +++ SUBACK flags: 0 length: 3 data: 035301
2022.05.26 21:31:32.724 TCP disconnected
2022.05.26 21:31:32.724 TCP connected
2022.05.26 21:31:32.724 --- CONNECT flags: 0 102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e
2022.05.26 21:31:32.724 entering ping fiber #<Fiber:0x000000011765d228 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:31:32.724 +++ CONNACK flags: 0 length: 2 data: 0100
2022.05.26 21:31:32.724 on_connect
2022.05.26 21:31:32.724 sleeping for 29.999923 sec
2022.05.26 21:31:32.724 --- SUBSCRIBE flags: 2 8209035400047465737401
Session Present: true
2022.05.26 21:31:32.724 +++ PUBLISH flags: 10 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:31:32.724 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:31:32.724 --- PUBACK flags: 2 42020001
2022.05.26 21:31:32.724 +++ SUBACK flags: 0 length: 3 data: 035401
2022.05.26 21:31:32.724 TCP disconnected
2022.05.26 21:31:32.724 TCP connected
2022.05.26 21:31:32.724 --- CONNECT flags: 0 102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e
2022.05.26 21:31:32.724 entering ping fiber #<Fiber:0x0000000117654d30 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:31:32.724 +++ CONNACK flags: 0 length: 2 data: 0100
2022.05.26 21:31:32.724 on_connect
2022.05.26 21:31:32.724 sleeping for 29.999919 sec
2022.05.26 21:31:32.724 --- SUBSCRIBE flags: 2 8209035500047465737401
Session Present: true
...
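Assuming the flags value in these log lines is printed in decimal, flags: 10 is 0b1010: DUP set with QoS 1, whereas the first delivery earlier had flags: 2 (QoS 1, no DUP). That would fit the broker re-sending the same unacknowledged message (packet_id 1) after every reconnect with Session Present: true. A hypothetical decoder for that nibble:

```ruby
# Hypothetical decoder for the flags nibble of an MQTT 3.1.1 PUBLISH packet.
# (Member is named :duplicate to avoid shadowing Object#dup.)
PublishFlags = Struct.new(:duplicate, :qos, :retain)

def decode_publish_flags(flags)
  PublishFlags.new(
    (flags & 0b1000) != 0, # bit 3: DUP, set on a re-delivery attempt
    (flags >> 1) & 0b11,   # bits 2-1: QoS level
    (flags & 0b0001) != 0  # bit 0: RETAIN
  )
end

p decode_publish_flags(2)  # first delivery
p decode_publish_flags(10) # the repeated deliveries above
```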
Interestingly, if I set clean_session: true, I see something like this: it successfully reconnects after every message, but then I see a lot of extra sleep messages:
Session Present: false
2022.05.26 21:33:23.181 +++ SUBACK flags: 0 length: 3 data: 000e01
2022.05.26 21:33:23.644 +++ PUBLISH flags: 2 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:33:23.644 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:23.644 --- PUBACK flags: 2 42020001
2022.05.26 21:33:23.644 TCP disconnected
2022.05.26 21:33:23.644 TCP connected
2022.05.26 21:33:23.644 --- CONNECT flags: 0 102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:23.644 entering ping fiber #<Fiber:0x000000010adb7850 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:33:23.644 +++ CONNACK flags: 0 length: 2 data: 0000
2022.05.26 21:33:23.645 on_connect
2022.05.26 21:33:23.645 sleeping for 29.998856 sec
2022.05.26 21:33:23.645 --- SUBSCRIBE flags: 2 8209000f00047465737401
Session Present: false
2022.05.26 21:33:23.646 +++ SUBACK flags: 0 length: 3 data: 000f01
2022.05.26 21:33:24.604 +++ PUBLISH flags: 2 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:33:24.604 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:24.604 --- PUBACK flags: 2 42020001
2022.05.26 21:33:24.604 TCP disconnected
2022.05.26 21:33:24.605 TCP connected
2022.05.26 21:33:24.605 --- CONNECT flags: 0 102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:24.605 entering ping fiber #<Fiber:0x000000010ada7bd0 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:33:24.605 +++ CONNACK flags: 0 length: 2 data: 0000
2022.05.26 21:33:24.605 sleeping for 30 sec
2022.05.26 21:33:24.605 on_connect
2022.05.26 21:33:24.607 --- SUBSCRIBE flags: 2 8209001000047465737401
Session Present: false
2022.05.26 21:33:24.607 +++ SUBACK flags: 0 length: 3 data: 001001
2022.05.26 21:33:42.064 sleeping for 12.543218 sec
2022.05.26 21:33:43.783 sleeping for 10.823659 sec
2022.05.26 21:33:46.971 sleeping for 7.636008 sec
2022.05.26 21:33:47.374 +++ PUBLISH flags: 2 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:33:47.374 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:47.374 --- PUBACK flags: 2 42020001
2022.05.26 21:33:47.374 TCP disconnected
2022.05.26 21:33:47.375 TCP connected
2022.05.26 21:33:47.375 --- CONNECT flags: 0 102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:47.375 entering ping fiber #<Fiber:0x000000010ad9d018 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:33:47.375 +++ CONNACK flags: 0 length: 2 data: 0000
2022.05.26 21:33:47.381 on_connect
2022.05.26 21:33:47.381 sleeping for 29.993233 sec
2022.05.26 21:33:47.382 --- SUBSCRIBE flags: 2 8209001100047465737401
Session Present: false
2022.05.26 21:33:47.382 +++ SUBACK flags: 0 length: 3 data: 001101
2022.05.26 21:33:47.826 sleeping for 29.556088 sec
2022.05.26 21:33:48.291 sleeping for 29.090683 sec
2022.05.26 21:33:48.741 sleeping for 28.640531 sec
2022.05.26 21:33:49.265 sleeping for 28.116997 sec
2022.05.26 21:33:49.789 sleeping for 27.592246 sec
2022.05.26 21:33:50.887 sleeping for 26.494276 sec
2022.05.26 21:33:51.441 sleeping for 25.94052 sec
2022.05.26 21:33:51.921 sleeping for 25.460815 sec
2022.05.26 21:33:52.371 sleeping for 25.01077 sec
2022.05.26 21:33:52.760 sleeping for 24.62138 sec
2022.05.26 21:33:53.181 sleeping for 24.201019 sec
2022.05.26 21:33:53.645 sleeping for 23.736735 sec
2022.05.26 21:33:54.608 sleeping for 22.77354 sec2022.05.26 21:33:54.608 sleeping for 22.773487 sec2022.05.26 21:33:54.608 sleeping for 22.773467 sec2022.05.26 21:33:54.608 sleeping for 22.77345 sec
2022.05.26 21:33:56.629 +++ PUBLISH flags: 2 length: 20 data: 000474657374000174657374206d657373616765
2022.05.26 21:33:56.629 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:56.629 --- PUBACK flags: 2 42020001
2022.05.26 21:33:56.630 TCP disconnected
2022.05.26 21:33:56.630 TCP connected
2022.05.26 21:33:56.630 --- CONNECT flags: 0 102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:56.630 entering ping fiber #<Fiber:0x000000010ad86778 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>2022.05.26 21:33:56.630 +++ CONNACK flags: 0 length: 2 data: 0000
2022.05.26 21:33:56.637 on_connect
2022.05.26 21:33:56.637 sleeping for 29.992504 sec
2022.05.26 21:33:56.637 --- SUBSCRIBE flags: 2 8209001200047465737401
Session Present: false
2022.05.26 21:33:56.638 +++ SUBACK flags: 0 length: 3 data: 001201
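The clean_session setting is visible in the CONNECT hex dumps: after the 2-byte fixed header, the protocol name "MQTT" and protocol level 4, the next byte holds the connect flags (0x02 = clean session), followed by the 16-bit keep-alive (0x001e = 30 s). In CONNACK, bit 0 of the first byte is Session Present. A hypothetical sketch that decodes both from the logged hex, assuming a single-byte remaining length (true for these short packets):

```ruby
# Hypothetical decoder for the MQTT 3.1.1 CONNECT variable header, fed with the
# hex printed by the "--- CONNECT" debug lines. Assumes remaining length < 128.
def decode_connect(hex)
  bytes = [hex].pack('H*').bytes
  # bytes[0] = packet type/flags (0x10), bytes[1] = remaining length
  name_len = (bytes[2] << 8) | bytes[3]
  pos = 4 + name_len                        # skip the protocol name "MQTT"
  flags = bytes[pos + 1]
  {
    protocol_level: bytes[pos],
    clean_session: (flags & 0x02) != 0,
    keepalive: (bytes[pos + 2] << 8) | bytes[pos + 3]
  }
end

# CONNACK "data:" is two bytes: acknowledge flags, then return code.
def decode_connack(hex)
  ack_flags, return_code = [hex].pack('H*').bytes
  { session_present: (ack_flags & 0x01) == 1, return_code: return_code }
end

p decode_connect('102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e')
p decode_connack('0100')
```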
Can you please try it with libev or async 1.x?
I really have no idea what is going on; I am using async 1.x and it works flawlessly.
I am guessing that async 2.x is the culprit.
Exact same behaviour with libev_scheduler. Any ideas?
Thanks for giving it a try.
You were right.
It seems my mosquitto is not as picky as yours.
The library sent --- PUBACK flags: 2 42020001
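But this is not a legal response according to the spec: the first byte should be 0x40 (packet type 4 in the upper four bits, reserved flag bits 0), whereas 0x42 sets the flag bits to 2.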
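The first byte of every MQTT 3.1.1 control packet holds the packet type in its upper four bits and type-specific flags in the lower four; for PUBACK those flags are reserved and must be 0. A hypothetical sketch (these helper names are not the library's API) that splits that byte and builds a compliant PUBACK:

```ruby
# Hypothetical helpers illustrating the first byte of the MQTT 3.1.1 fixed header.
PUBACK = 4 # control packet type for PUBACK

# Split the first fixed-header byte into [packet type, flags].
def split_first_byte(byte)
  [byte >> 4, byte & 0x0F]
end

# Build a spec-compliant PUBACK: type 4 in the high nibble, reserved flags 0,
# remaining length 2, then the 16-bit big-endian packet identifier.
def build_puback(packet_id)
  [PUBACK << 4, 2, packet_id].pack('CCn')
end

p split_first_byte(0x42)        # => [4, 2]  (what the library sent)
p build_puback(1).unpack1('H*') # => "40020001"
```

A lenient broker may accept 0x42, while a strict one treats it as a protocol violation and closes the connection, which would be consistent with the nil chunk seen in read_bytes.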
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043
I fixed this.
There is also a problem with reconnection. I improved it a little, but I fear it is still not perfect.