jsaak/ruby-mqtt3

Only receiving the first message with QOS = 1 and QOS = 2

Closed this issue · 7 comments

Sorry, it's me again :) I'm using code taken largely from the example:

require './lib/ruby-mqtt3'
require 'pp'
require 'async'
require 'fiber'
require 'socket'

scheduler = Async::Scheduler.new
Fiber.set_scheduler scheduler

m = Mqtt3.new(
  keepalive_sec: 30,
  persistence_filename: 'persist.data',
  clean_session: true,
  reconnect: true,
  host: 'localhost',
  port: '1883'
)

m.debug = true

m.on_disconnect do |reason|
  m.debug "TCP disconnected #{reason}"
end

m.on_connect do |session_present|
  m.debug 'on_connect'
  m.subscribe [['test', 1], ['test2', 1]]
  # m.invalid
  puts "Session Present: #{session_present}"
  # m.publish('test', 'message')
end

m.on_tcp_connect_error do |e,counter|
  m.debug e.inspect + ", waiting #{counter} sec"
  sleep counter
  false
end

m.on_tcp_connect_error do |e,counter|
  m.debug e.inspect + ", waiting #{counter} sec"
  true
end

m.on_message do |topic, message, qos, packet_id|
  m.debug "Incoming topic: #{topic} message: #{message} qos: #{qos} packet_id: #{packet_id}"
end

m.on_publish_finished do |packet_id|
  m.debug "packet published #{packet_id}"
end

Signal.trap("INT") do
  m.save
  exit
end

Signal.trap("TERM") do
  m.save
  exit
end

m.run

scheduler.run

When I run it, I see this:

$ ruby example.rb
2022.05.26 20:15:57.656 removing file persist.data
2022.05.26 20:15:57.657 TCP connected
2022.05.26 20:15:57.657 --- CONNECT flags: 0  101f00044d5154540402001e00136578616d706c652e72622d575745596e513663
2022.05.26 20:15:57.657 +++ CONNACK flags: 0  length: 2  data: 0000
2022.05.26 20:15:57.658 on_connect
2022.05.26 20:15:57.658 --- SUBSCRIBE flags: 2  82110001000474657374020005746573743202
Session Present: false
2022.05.26 20:15:57.658 +++ SUBACK flags: 0  length: 4  data: 00010202
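The CONNECT line in this log can be decoded by hand to check what the client actually sends. In MQTT 3.1.1 the connect flags byte follows the protocol name and level, and bit 1 (0x02) is Clean Session; the next two bytes are the keepalive in seconds (0x001e = 30, matching keepalive_sec: 30). A minimal sketch, assuming the 3.1.1 layout and a one-byte remaining length; decode_connect is a hypothetical helper, not part of ruby-mqtt3:

```ruby
# Hypothetical helper: decode an MQTT 3.1.1 CONNECT packet from the hex string
# printed in the debug log. Assumes the remaining length fits in one byte (< 128).
def decode_connect(hex)
  bytes = [hex].pack('H*').bytes
  raise ArgumentError, 'not a CONNECT packet' unless (bytes[0] >> 4) == 1
  raise ArgumentError, 'multi-byte remaining length not handled' if bytes[1] >= 128

  body = bytes[2..]                               # variable header + payload
  u16 = ->(i) { (body[i] << 8) | body[i + 1] }    # big-endian 16-bit field at offset i

  name_len = u16.(0)
  i = 2 + name_len                                # offset just past the protocol name
  flags = body[i + 1]
  id_len = u16.(i + 4)
  {
    protocol: body[2, name_len].pack('C*'),
    level: body[i],                               # 4 means MQTT 3.1.1
    clean_session: (flags & 0x02) != 0,
    keepalive_sec: u16.(i + 2),
    client_id: body[i + 6, id_len].pack('C*')
  }
end

info = decode_connect('101f00044d5154540402001e00136578616d706c652e72622d575745596e513663')
puts info[:clean_session] # => true
puts info[:keepalive_sec] # => 30
puts info[:client_id]     # => example.rb-WWEYnQ6c
```

The same helper applied to the later CONNECT lines that contain "0400001e" reports clean_session as false, which matches the "Session Present: true" replies in that run.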

So far so good! Then I run:

$ mosquitto_pub -t test -m "test message" -q 1

I see this:

2022.05.26 20:16:18.883 +++ PUBLISH flags: 2  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 20:16:18.883 Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 20:16:18.883 --- PUBACK flags: 2  42020001
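The PUBLISH line can be read the same way. In MQTT 3.1.1 the low nibble of the first byte carries DUP (bit 3), QoS (bits 2 and 1) and RETAIN (bit 0), so "flags: 2" is QoS 1, and the "flags: 10" lines further down would be QoS 1 with DUP set, i.e. a redelivery attempt. The "data" column holds the topic, the packet identifier (QoS 1 and 2 only) and the payload. A minimal decoding sketch; decode_publish is a hypothetical helper, not part of ruby-mqtt3:

```ruby
# Hypothetical helper: decode an incoming PUBLISH from the "flags" and hex "data"
# columns of the debug log ("data" is everything after the fixed header).
def decode_publish(flags, hex)
  data = [hex].pack('H*')
  qos = (flags >> 1) & 0x03
  topic_len = data.byteslice(0, 2).unpack1('n')   # big-endian 16-bit length
  offset = 2 + topic_len
  packet_id = nil
  if qos > 0 # the packet identifier is only present for QoS 1 and 2
    packet_id = data.byteslice(offset, 2).unpack1('n')
    offset += 2
  end
  {
    dup: (flags & 0x08) != 0,
    qos: qos,
    retain: (flags & 0x01) != 0,
    topic: data.byteslice(2, topic_len),
    packet_id: packet_id,
    payload: data.byteslice(offset, data.bytesize - offset)
  }
end

msg = decode_publish(2, '000474657374000174657374206d657373616765')
puts msg[:qos]       # => 1
puts msg[:packet_id] # => 1
puts msg[:payload]   # => test message
```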

However, if I run the mosquitto_pub command again, no more messages arrive :( Only the first one gets through, and after that nothing arrives even if I send messages with QOS=0.

I now restart the process and run:

$  mosquitto_pub -t test -m "test message" -q 0
$  mosquitto_pub -t test -m "test message" -q 0
$  mosquitto_pub -t test -m "test message" -q 0

I see all 3 messages, and this keeps working with QOS=0, but as soon as I use QOS 1 or 2 it stops working:

$ ruby example.rb
2022.05.26 20:17:40.667 removing file persist.data
2022.05.26 20:17:40.668 TCP connected
2022.05.26 20:17:40.668 --- CONNECT flags: 0  101f00044d5154540402001e00136578616d706c652e72622d6653475a594b4557
2022.05.26 20:17:40.669 +++ CONNACK flags: 0  length: 2  data: 0000
2022.05.26 20:17:40.669 on_connect
2022.05.26 20:17:40.669 --- SUBSCRIBE flags: 2  82110001000474657374020005746573743202
Session Present: false
2022.05.26 20:17:40.669 +++ SUBACK flags: 0  length: 4  data: 00010202
2022.05.26 20:17:44.872 +++ PUBLISH flags: 0  length: 18  data: 00047465737474657374206d657373616765
2022.05.26 20:17:44.872 Incoming topic: test message: test message qos: 0 packet_id:
2022.05.26 20:17:45.368 +++ PUBLISH flags: 0  length: 18  data: 00047465737474657374206d657373616765
2022.05.26 20:17:45.368 Incoming topic: test message: test message qos: 0 packet_id:
2022.05.26 20:17:45.667 +++ PUBLISH flags: 0  length: 18  data: 00047465737474657374206d657373616765
2022.05.26 20:17:45.667 Incoming topic: test message: test message qos: 0 packet_id:
2022.05.26 20:18:10.669 --- PINGREQ flags: 0  c000
2022.05.26 20:18:10.669 +++ PINGRESP flags: 0  length: 0  data:

Any ideas? It sounds like maybe the PUBACK isn't working right.

I enabled the extra debug statements that were commented out in the source, and when I send a QOS=1 message, it shows this:

2022.05.26 20:44:15.308  +++ PUBLISH flags: 2  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 20:44:15.308  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 20:44:15.308  --- PUBACK flags: 2  42020001
2022.05.26 20:44:15.308  exiting ping fiber#<Fiber:0x000000011256e970 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>

It seems to be failing here:

  def read_bytes(count)
    buffer = ''
    while buffer.length != count
      # TODO: rescue
      chunk = @socket.read(count - buffer.length)
      raise Mqtt3NormalExitException if chunk == '' || chunk.nil?

chunk is nil here for some reason, and this kills the process; reconnect doesn't seem to kick in. As far as I can tell, raising this exception disables reconnect?
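In Ruby, IO#read(length) returns nil when the stream is already at end-of-file, i.e. the other side has closed the connection and nothing is left to read; a short read before EOF returns whatever did arrive. So a nil chunk in read_bytes most likely means the broker closed the TCP connection. A minimal standalone demo with a local UNIXSocket pair (assumes a Unix-like OS; it does not use the library):

```ruby
require 'socket'

# Two connected stream sockets; "writer" plays the broker, "reader" the client.
reader, writer = UNIXSocket.pair
writer.write('ab')
writer.close # peer hangs up after sending only 2 bytes

first = reader.read(4)  # short read: EOF reached before 4 bytes, returns "ab"
second = reader.read(4) # already at EOF: returns nil
reader.close

p first  # prints "ab"
p second # prints nil
```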

If I remove the raise, I get hundreds of messages per second that look like this:

2022.05.26 21:31:32.723  +++ PUBLISH flags: 10  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:31:32.723  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:31:32.723  --- PUBACK flags: 2  42020001
2022.05.26 21:31:32.723  +++ SUBACK flags: 0  length: 3  data: 035201
2022.05.26 21:31:32.723  TCP disconnected
2022.05.26 21:31:32.723  TCP connected
2022.05.26 21:31:32.723  --- CONNECT flags: 0  102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e
2022.05.26 21:31:32.723  entering ping fiber #<Fiber:0x0000000117665680 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:31:32.723  +++ CONNACK flags: 0  length: 2  data: 0100
2022.05.26 21:31:32.723  on_connect
2022.05.26 21:31:32.724  sleeping for 29.999921 sec
2022.05.26 21:31:32.724  --- SUBSCRIBE flags: 2  8209035300047465737401
Session Present: true

2022.05.26 21:31:32.724  +++ PUBLISH flags: 10  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:31:32.724  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:31:32.724  --- PUBACK flags: 2  42020001
2022.05.26 21:31:32.724  +++ SUBACK flags: 0  length: 3  data: 035301
2022.05.26 21:31:32.724  TCP disconnected
2022.05.26 21:31:32.724  TCP connected
2022.05.26 21:31:32.724  --- CONNECT flags: 0  102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e
2022.05.26 21:31:32.724  entering ping fiber #<Fiber:0x000000011765d228 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:31:32.724  +++ CONNACK flags: 0  length: 2  data: 0100
2022.05.26 21:31:32.724  on_connect
2022.05.26 21:31:32.724  sleeping for 29.999923 sec
2022.05.26 21:31:32.724  --- SUBSCRIBE flags: 2  8209035400047465737401
Session Present: true

2022.05.26 21:31:32.724  +++ PUBLISH flags: 10  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:31:32.724  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:31:32.724  --- PUBACK flags: 2  42020001
2022.05.26 21:31:32.724  +++ SUBACK flags: 0  length: 3  data: 035401
2022.05.26 21:31:32.724  TCP disconnected
2022.05.26 21:31:32.724  TCP connected
2022.05.26 21:31:32.724  --- CONNECT flags: 0  102000044d5154540400001e00146578616d706c652e72622d207a5a6d517336476e
2022.05.26 21:31:32.724  entering ping fiber #<Fiber:0x0000000117654d30 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:31:32.724  +++ CONNACK flags: 0  length: 2  data: 0100
2022.05.26 21:31:32.724  on_connect
2022.05.26 21:31:32.724  sleeping for 29.999919 sec
2022.05.26 21:31:32.724  --- SUBSCRIBE flags: 2  8209035500047465737401
Session Present: true

...

Interestingly, if I set clean_session: true, it successfully reconnects after every message, but then I see a lot of extra "sleeping for" messages:

Session Present: false
2022.05.26 21:33:23.181  +++ SUBACK flags: 0  length: 3  data: 000e01
2022.05.26 21:33:23.644  +++ PUBLISH flags: 2  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:33:23.644  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:23.644  --- PUBACK flags: 2  42020001
2022.05.26 21:33:23.644  TCP disconnected
2022.05.26 21:33:23.644  TCP connected
2022.05.26 21:33:23.644  --- CONNECT flags: 0  102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:23.644  entering ping fiber #<Fiber:0x000000010adb7850 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:33:23.644  +++ CONNACK flags: 0  length: 2  data: 0000
2022.05.26 21:33:23.645  on_connect
2022.05.26 21:33:23.645  sleeping for 29.998856 sec
2022.05.26 21:33:23.645  --- SUBSCRIBE flags: 2  8209000f00047465737401

Session Present: false
2022.05.26 21:33:23.646  +++ SUBACK flags: 0  length: 3  data: 000f01
2022.05.26 21:33:24.604  +++ PUBLISH flags: 2  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:33:24.604  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:24.604  --- PUBACK flags: 2  42020001
2022.05.26 21:33:24.604  TCP disconnected
2022.05.26 21:33:24.605  TCP connected
2022.05.26 21:33:24.605  --- CONNECT flags: 0  102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:24.605  entering ping fiber #<Fiber:0x000000010ada7bd0 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:33:24.605  +++ CONNACK flags: 0  length: 2  data: 0000
2022.05.26 21:33:24.605  sleeping for 30 sec
2022.05.26 21:33:24.605  on_connect
2022.05.26 21:33:24.607  --- SUBSCRIBE flags: 2  8209001000047465737401

Session Present: false
2022.05.26 21:33:24.607  +++ SUBACK flags: 0  length: 3  data: 001001
2022.05.26 21:33:42.064  sleeping for 12.543218 sec
2022.05.26 21:33:43.783  sleeping for 10.823659 sec
2022.05.26 21:33:46.971  sleeping for 7.636008 sec
2022.05.26 21:33:47.374  +++ PUBLISH flags: 2  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:33:47.374  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:47.374  --- PUBACK flags: 2  42020001
2022.05.26 21:33:47.374  TCP disconnected
2022.05.26 21:33:47.375  TCP connected
2022.05.26 21:33:47.375  --- CONNECT flags: 0  102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:47.375  entering ping fiber #<Fiber:0x000000010ad9d018 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:33:47.375  +++ CONNACK flags: 0  length: 2  data: 0000
2022.05.26 21:33:47.381  on_connect
2022.05.26 21:33:47.381  sleeping for 29.993233 sec
2022.05.26 21:33:47.382  --- SUBSCRIBE flags: 2  8209001100047465737401

Session Present: false
2022.05.26 21:33:47.382  +++ SUBACK flags: 0  length: 3  data: 001101
2022.05.26 21:33:47.826  sleeping for 29.556088 sec
2022.05.26 21:33:48.291  sleeping for 29.090683 sec
2022.05.26 21:33:48.741  sleeping for 28.640531 sec
2022.05.26 21:33:49.265  sleeping for 28.116997 sec
2022.05.26 21:33:49.789  sleeping for 27.592246 sec
2022.05.26 21:33:50.887  sleeping for 26.494276 sec
2022.05.26 21:33:51.441  sleeping for 25.94052 sec
2022.05.26 21:33:51.921  sleeping for 25.460815 sec
2022.05.26 21:33:52.371  sleeping for 25.01077 sec
2022.05.26 21:33:52.760  sleeping for 24.62138 sec
2022.05.26 21:33:53.181  sleeping for 24.201019 sec
2022.05.26 21:33:53.645  sleeping for 23.736735 sec
2022.05.26 21:33:54.608  sleeping for 22.77354 sec
2022.05.26 21:33:54.608  sleeping for 22.773487 sec
2022.05.26 21:33:54.608  sleeping for 22.773467 sec
2022.05.26 21:33:54.608  sleeping for 22.77345 sec
2022.05.26 21:33:56.629  +++ PUBLISH flags: 2  length: 20  data: 000474657374000174657374206d657373616765
2022.05.26 21:33:56.629  Incoming topic: test message: test message qos: 1 packet_id: 1
2022.05.26 21:33:56.629  --- PUBACK flags: 2  42020001
2022.05.26 21:33:56.630  TCP disconnected
2022.05.26 21:33:56.630  TCP connected
2022.05.26 21:33:56.630  --- CONNECT flags: 0  102000044d5154540402001e00146578616d706c652e72622d204a42414f65566c4a
2022.05.26 21:33:56.630  entering ping fiber #<Fiber:0x000000010ad86778 /usr/local/lib/ruby/gems/3.1.0/gems/async-2.0.2/lib/async/task.rb:254 (resumed)>
2022.05.26 21:33:56.630  +++ CONNACK flags: 0  length: 2  data: 0000
2022.05.26 21:33:56.637  on_connect
2022.05.26 21:33:56.637  sleeping for 29.992504 sec
2022.05.26 21:33:56.637  --- SUBSCRIBE flags: 2  8209001200047465737401

Session Present: false
2022.05.26 21:33:56.638  +++ SUBACK flags: 0  length: 3  data: 001201
jsaak commented

Can you please try it with libev or async 1.x?
I really have no idea what is going on; I am using async 1.x and it works flawlessly.
I am guessing that async 2.x is the culprit.

Exactly the same behaviour with libev_scheduler. Any ideas?

jsaak commented

Thanks for giving it a try.
You were right.
It seems my mosquitto is not as picky as yours.
The library sent --- PUBACK flags: 2 42020001,
but this is not a legal response according to the spec: the first byte should be 0x40.
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043
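The first byte of every MQTT packet packs the packet type into the high nibble and the flags into the low nibble, so 0x42 is PUBACK (type 4) with flags 0010, whereas the 3.1.1 flag table requires 0000 for PUBACK. The spec also says that on invalid flags the receiver MUST close the network connection [MQTT-2.2.2-2], which would fit the broker dropping the connection and chunk coming back nil. A minimal sketch of that check, assuming the 3.1.1 flag table; REQUIRED_FLAGS and valid_fixed_header? are hypothetical names, not part of ruby-mqtt3:

```ruby
# Fixed-header flag nibbles required by MQTT 3.1.1 (section 2.2.2) per packet type.
# PUBLISH (type 3) is handled separately: its flags are DUP/QoS/RETAIN.
REQUIRED_FLAGS = {
  1 => 0b0000,  # CONNECT
  2 => 0b0000,  # CONNACK
  4 => 0b0000,  # PUBACK
  5 => 0b0000,  # PUBREC
  6 => 0b0010,  # PUBREL
  7 => 0b0000,  # PUBCOMP
  8 => 0b0010,  # SUBSCRIBE
  9 => 0b0000,  # SUBACK
  10 => 0b0010, # UNSUBSCRIBE
  11 => 0b0000, # UNSUBACK
  12 => 0b0000, # PINGREQ
  13 => 0b0000, # PINGRESP
  14 => 0b0000  # DISCONNECT
}.freeze

# True if the first byte of a packet carries legal flags for its packet type.
def valid_fixed_header?(first_byte)
  type = first_byte >> 4
  flags = first_byte & 0x0F
  # PUBLISH: any DUP/RETAIN combination, but both QoS bits set (QoS 3) is illegal.
  return ((flags >> 1) & 0x03) != 3 if type == 3

  REQUIRED_FLAGS[type] == flags # unknown/reserved types give nil, i.e. false
end

p valid_fixed_header?(0x42) # PUBACK with flags 0010, as in the log: false
p valid_fixed_header?(0x40) # PUBACK with flags 0000: true
p valid_fixed_header?(0x82) # SUBSCRIBE with flags 0010: true
```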

I fixed this.
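For reference, a correctly framed PUBACK is just four bytes: 0x40 (type 4, flags 0000), a remaining length of 2, then the packet identifier being acknowledged as a big-endian 16-bit integer. A minimal sketch using Array#pack; build_puback is a hypothetical name, and this is not the library's actual fix:

```ruby
# Hypothetical helper: frame a PUBACK for the given packet identifier.
# 'C' packs one unsigned byte, 'n' packs a 16-bit big-endian unsigned integer.
def build_puback(packet_id)
  [0x40, 0x02, packet_id].pack('CCn')
end

puts build_puback(1).unpack1('H*') # prints 40020001
```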

There is also a problem with reconnection; I improved it a little, but I fear it is still not perfect.
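The clean_session log above shows several "sleeping for" lines printed at the same timestamp, which would be consistent with ping loops from earlier connections still running after a reconnect. That reading is a guess, not something confirmed in this thread. A hypothetical sketch of one way to guarantee a single keepalive loop across reconnects, using plain Ruby threads instead of the library's Async fibers; KeepaliveSupervisor, restart and stop are illustrative names, not ruby-mqtt3 API:

```ruby
# Hypothetical sketch (not ruby-mqtt3 code): keep at most one keepalive loop
# alive, no matter how many times the client reconnects.
class KeepaliveSupervisor
  def initialize(interval_sec)
    @interval_sec = interval_sec
    @thread = nil
  end

  # Call on every (re)connect. The previous loop is stopped first, so loops
  # from older connections cannot pile up.
  def restart(&ping)
    stop
    @thread = Thread.new do
      loop do
        sleep @interval_sec
        ping.call
      end
    end
  end

  # Call on disconnect: kill the loop and wait until it has actually exited.
  def stop
    return unless @thread

    @thread.kill
    @thread.join
    @thread = nil
  end

  def running?
    !@thread.nil? && @thread.alive?
  end
end

supervisor = KeepaliveSupervisor.new(30)
supervisor.restart { puts 'PINGREQ' } # on connect
supervisor.restart { puts 'PINGREQ' } # on reconnect: the old loop is stopped first
supervisor.stop                       # on disconnect
```

The design choice is simply to make stop-then-start the only way to launch a loop, so the number of live keepalive loops can never exceed one.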