user-signal/fs2-mqtt

Trying to get LocalSubscriber example to work

Closed this issue · 6 comments

tulth commented

I ran the local subscriber with no arguments. Note that I changed the host from localhost to my mqtt-broker.

sbt:root> project examples
sbt:examples> runMain net.sigusr.mqtt.examples.LocalSubscriber
[warn] Credentials file /home/me/.ivy2/.credentials does not exist
[warn] Credentials file /home/me/.ivy2/.credentials does not exist
...
[info] running net.sigusr.mqtt.examples.LocalSubscriber 
Transport disconnected
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(true,true,false,0,false,false,5),Local-Subscriber,None,None,Some(Local-Subscriber),Some(yolo))
 ← ConnackFrame(Header(false,0,false),0)
Session started
 → SubscribeFrame(Header(false,1,false),0,Vector((Local-Subscriber/stop,2), (AtMostOnce,0), (AtLeastOnce,1), (ExactlyOnce,2)))
Transport disconnected
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(true,true,false,0,false,false,5),Local-Subscriber,None,None,Some(Local-Subscriber),Some(yolo))
 ← ConnackFrame(Header(false,0,false),0)
 → SubscribeFrame(Header(true,1,false),0,Vector((Local-Subscriber/stop,2), (AtMostOnce,0), (AtLeastOnce,1), (ExactlyOnce,2)))
Session started
Transport disconnected
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(true,true,false,0,false,false,5),Local-Subscriber,None,None,Some(Local-Subscriber),Some(yolo))
 ← ConnackFrame(Header(false,0,false),0)
Session started
 → PingReqFrame(Header(false,0,false))
 ← PingRespFrame(Header(false,0,false))
 → PingReqFrame(Header(false,0,false))

It fails to stop on the Local-Subscriber/stop topic and prints nothing on the AtMostOnce topic.
I never see it report a SubAck.
Also, maybe it's related, but it reports "Transport disconnected" right after the SubscribeFrame.

Any suggestions welcome!

It fails to stop because there is no successful subscription to the topic Local-Subscriber/stop. As far as I can see, it is the subscription itself that fails: as you mentioned, no SubAck is traced, and the disconnection occurs just after the subscription attempt, which is a normal sign that something went wrong.

Just to get further insight, what kind of broker are you using? Could you try to subscribe to this broker with mosquitto_sub? What would be valuable for me is to see the complete mosquitto_sub command line you use, run with the -d option to get some message exchange traces. Moreover, if you look at the code, the Local-Subscriber provides the credentials Local-Subscriber/yolo to the broker. This may be the cause of your issue here, but in any case it would be nice to incorporate them in your mosquitto_sub command line.

Let's see if there is something interesting out of this...

tulth commented

Ok infos:

I have these diffs, starting from hash bcd04b3:

@@ -54,7 +54,7 @@ object LocalSubscriber extends App {
     )
     val transportConfig =
       TransportConfig[Task](
-        "localhost",
+        "mqtt-broker",
         1883,
         // TLS support looks like
         // 8883,
@@ -66,9 +66,9 @@ object LocalSubscriber extends App {
       SessionConfig(
         s"$localSubscriber",
         cleanSession = false,
-        user = Some(localSubscriber),
-        password = Some("yolo"),
         keepAlive = 5
+//        user = Some(localSubscriber),
+//        password = Some("yolo"),

Reran, same behavior:

[tulth@crm-114 fs2-mqtt]$ sbt
[info] Loading settings for project global-plugins from metals.sbt ...
[info] Loading global plugins from /home/tulth/.sbt/1.0/plugins
[info] Loading settings for project fs2-mqtt-build from plugins.sbt ...
[info] Loading project definition from /mnt/nfs/tulth/scala/mqtt-fs2-fun/fs2-mqtt/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to root (in build file:/mnt/nfs/tulth/scala/mqtt-fs2-fun/fs2-mqtt/)
[info] sbt server started at local:///home/tulth/.sbt/1.0/server/ce06dbe2da41d5f9aae8/sock
sbt:root> project examples
[info] Set current project to examples (in build file:/mnt/nfs/tulth/scala/mqtt-fs2-fun/fs2-mqtt/)
sbt:examples> runMain net.sigusr.mqtt.examples.LocalSubscriber
[warn] Credentials file /home/tulth/.ivy2/.credentials does not exist
[warn] Credentials file /home/tulth/.ivy2/.credentials does not exist
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
[info] running net.sigusr.mqtt.examples.LocalSubscriber 
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(false,false,false,0,false,false,5),Local-Subscriber,None,None,None,None)
 ← ConnackFrame(Header(false,0,false),0)
Session started
 → SubscribeFrame(Header(false,1,false),0,Vector((Local-Subscriber/stop,2), (AtMostOnce,0), (AtLeastOnce,1), (ExactlyOnce,2)))
Transport disconnected
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(false,false,false,0,false,false,5),Local-Subscriber,None,None,None,None)
 ← ConnackFrame(Header(false,0,false),0)
 → SubscribeFrame(Header(true,1,false),0,Vector((Local-Subscriber/stop,2), (AtMostOnce,0), (AtLeastOnce,1), (ExactlyOnce,2)))
Session started
Transport disconnected
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(false,false,false,0,false,false,5),Local-Subscriber,None,None,None,None)
 ← ConnackFrame(Header(false,0,false),0)
Session started
 → PingReqFrame(Header(false,0,false))
 ← PingRespFrame(Header(false,0,false))
 → PingReqFrame(Header(false,0,false))
 ← PingRespFrame(Header(false,0,false))
^C
[warn] Canceling execution...
[error] Total time: 14 s, completed Nov 8, 2020 10:01:11 AM
sbt:examples>  → DisconnectFrame(Header(false,0,false))
Transport disconnected
2020-11-08 10:01:11,198 shutdown-hooks-run-all ERROR No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
Transport connected
 → ConnectFrame(Header(false,0,false),ConnectVariableHeader(false,false,false,0,false,false,5),Local-Subscriber,None,None,None,None)
 ← ConnackFrame(Header(false,0,false),0)
Session started
[tulth@crm-114 fs2-mqtt]$ 

I tried something similar with mosquitto_sub; at the end I sent it some events using the MQTT Explorer app:

$ mosquitto_sub -d -h mqtt-broker -t Local-Subscriber/stop
Client mosq-x7lRbXfewhffMZaFb9 sending CONNECT
Client mosq-x7lRbXfewhffMZaFb9 received CONNACK (0)
Client mosq-x7lRbXfewhffMZaFb9 sending SUBSCRIBE (Mid: 1, Topic: Local-Subscriber/stop, QoS: 0, Options: 0x00)
Client mosq-x7lRbXfewhffMZaFb9 received SUBACK
Subscribed (mid: 1): 0
Client mosq-x7lRbXfewhffMZaFb9 received PUBLISH (d0, q0, r0, m0, 'Local-Subscriber/stop', ... (1 bytes))
1
Client mosq-x7lRbXfewhffMZaFb9 received PUBLISH (d0, q0, r0, m0, 'Local-Subscriber/stop', ... (1 bytes))
0
Client mosq-x7lRbXfewhffMZaFb9 sending DISCONNECT

tulth commented

I notice the message ID in mosquitto_sub is 1, whereas it seems to be 0 in fs2-mqtt (the value right after the header in the SubscribeFrame traces).
The spec says:

2.3.1 Packet Identifier
SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in cases where QoS > 0) Control Packets MUST contain a
non-zero 16-bit Packet Identifier

Maybe this is my root issue?
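
To make the quoted rule concrete, here is a minimal sketch in plain Scala. `PacketIdCheck` and `isValidPacketId` are hypothetical names used only for illustration; they are not part of fs2-mqtt. The sketch assumes only what the spec excerpt says: the identifier is a 16-bit value and must be non-zero.

```scala
// Hypothetical helper, not part of fs2-mqtt: encodes the rule quoted
// from section 2.3.1 of the spec.
object PacketIdCheck {
  // Largest value that fits in an unsigned 16-bit field.
  val MaxPacketId: Int = 65535

  // SUBSCRIBE, UNSUBSCRIBE and PUBLISH (QoS > 0) need an id in 1..65535.
  def isValidPacketId(id: Int): Boolean =
    id >= 1 && id <= MaxPacketId
}
```

Under this check the identifier 0 seen in the SubscribeFrame traces above is rejected, while the 1 used by mosquitto_sub is accepted.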

tulth commented

I applied these diffs and it got a suback:

@@ -35,10 +35,10 @@ object IdGenerator {
   private def idQueue[F[_]: Concurrent](q: Queue[F, Int]): F[Fiber[F, Unit]] = {
     def go(v: Int): Stream[Pure, Int] =
       v match {
-        case 65535 => Stream.emit(0) ++ go(1)
+        case 65535 => Stream.emit(1) ++ go(2)
         case _     => Stream.emit(v) ++ go(v + 1)
       }
-    go(0).through(q.enqueue(_)).compile.drain.start
+    go(1).through(q.enqueue(_)).compile.drain.start
   }

I am not sure this is the correct patch as I am not familiar with the code.
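
For comparison, here is a sketch of the same idea without fs2, using only the Scala standard library. `PacketIds` and `ids` are hypothetical names, and this is not the library's implementation; it only illustrates an identifier sequence that wraps from 65535 back to 1 and never yields 0.

```scala
// Hypothetical stand-alone sketch, not the fs2-mqtt IdGenerator.
object PacketIds {
  val MaxId: Int = 65535

  // Infinite iterator: 1, 2, ..., 65535, 1, 2, ...
  // Zero is never produced, so every id is usable in a SUBSCRIBE.
  def ids: Iterator[Int] =
    Iterator.iterate(1)(v => if (v == MaxId) 1 else v + 1)
}
```

One difference from the diff above: as written, the patched `go` emits 1 when it reaches `v == 65535`, so the value 65535 itself is never handed out. That looks harmless, since it only leaves one identifier unused, but the sketch here does include it.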

Well spotted!
Thank you for the fix.
I've left this project abandoned for too long... I'll take advantage of this to update some dependencies, check that everything still works, and release a 0.4.1 version.

tulth commented

happy to help! I am excited to replace some of my home automation python mqtt services with scala versions (why? fun/learning!).