shanewholloway/js-u8-mqtt

Question with respect to on_topic

Closed this issue · 9 comments

gwww commented

I'm getting multiple matches on my topics, I'd like to get just the first match. What I have is this:

mqtt
  .on_topic('zigbee2mqtt/bridge/:msg', (pkt, params, ctx) => { print_pkt('rule 1', pkt, params, ctx) })
  .on_topic('zigbee2mqtt/:device/availability', (pkt, params, ctx) => { print_pkt('rule 2', pkt, params, ctx) })
  .on_topic('zigbee2mqtt/:device', (pkt, params, ctx) => { print_pkt('rule 3', pkt, params, ctx) })
  .on_topic('zigbee2mqtt/*', (pkt, params, ctx) => { print_pkt('rule 4', pkt, params, ctx) })
  .subscribe_topic('zigbee2mqtt/#', (pkt, params, ctx) => { print_pkt('rule sub', pkt, params, ctx) })

A message with topic zigbee2mqtt/bridge/state matches rule sub, rule 1, and rule 4 (those refer to the first param to print_pkt).

I can understand why I could get multiple matches. Is there a way to stop matching after the first match?

Thanks! BTW, I found your lib through this: mqttjs/MQTT.js#1541. I'm experimenting with transitioning some working code that is using MQTT.js to your lib. If you're interested, my project, still in its infancy, is here: https://github.com/gwww/z2m-frontend.

I'm glad you found u8-mqtt!

To skip following matches, set ctx.done = true (or another truthy value) in the handler. I've documented it briefly in mqtt.router.invoke(pkt). Peek at the implementation in invoke's source code.
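
To illustrate the idea, here is a minimal sketch of first-match-wins routing driven by a `ctx.done` flag. It is a simplified stand-in written for this example, not u8-mqtt's actual router: `matchRoute`, `invoke`, `Route`, and `Ctx` are all hypothetical names, and the matching rules only mirror what was observed above (`:name` captures one level, `*` matches the rest).

```typescript
// Hypothetical stand-in for a topic router; NOT u8-mqtt's implementation.
type Ctx = { done?: unknown };
type Params = Record<string, string>;
type Handler = (topic: string, params: Params, ctx: Ctx) => void;
type Route = { pattern: string; handler: Handler };

// Match a pattern such as 'zigbee2mqtt/:device' or 'zigbee2mqtt/*' against a topic.
// Returns the captured params, or null when the pattern does not match.
function matchRoute(pattern: string, topic: string): Params | null {
  const pat = pattern.split("/");
  const parts = topic.split("/");
  const params: Params = {};
  for (let i = 0; i < pat.length; i++) {
    if (pat[i] === "*") return params; // wildcard: accept whatever remains
    if (i >= parts.length) return null; // topic is shorter than the pattern
    if (pat[i].startsWith(":")) params[pat[i].slice(1)] = parts[i];
    else if (pat[i] !== parts[i]) return null;
  }
  return pat.length === parts.length ? params : null;
}

// Call each matching handler in registration order; stop once a handler sets ctx.done.
function invoke(routes: Route[], topic: string): void {
  const ctx: Ctx = {};
  for (const { pattern, handler } of routes) {
    const params = matchRoute(pattern, topic);
    if (params === null) continue;
    handler(topic, params, ctx);
    if (ctx.done) break; // any truthy value skips the remaining matches
  }
}

const hits: string[] = [];
const routes: Route[] = [
  {
    pattern: "zigbee2mqtt/bridge/:msg",
    handler: (_topic, _params, ctx) => {
      hits.push("rule 1");
      ctx.done = true; // later rules would also match, but are skipped
    },
  },
  { pattern: "zigbee2mqtt/:device", handler: () => { hits.push("rule 3"); } },
  { pattern: "zigbee2mqtt/*", handler: () => { hits.push("rule 4"); } },
];

invoke(routes, "zigbee2mqtt/bridge/state");
console.log(hits);
```

In the code from the question, the equivalent change would be setting `ctx.done = true` inside the rule 1 handler so that rule 4 no longer fires for the same packet.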

gwww commented

Thank you! Your pointers helped! I had to first figure out what the heck jsy code was :) Once I figured that bit out and worked my way through a couple of dumb errors, I got matches working the way I had hoped. I love the simplicity of the route matching paradigm.

I think I've played with this enough to drop it into my code base to replace MQTT.js.

Thanks for writing this!

gwww commented

I just integrated the code with the browser (I was doing my tests with node) and it connects and receives packets, but I get many errors like this:

ReferenceError: WebSocket is not defined
    at file:///Users/glenn/Development/mqtt/z2m-frontend/node_modules/.pnpm/u8-mqtt@0.3.1/node_modules/u8-mqtt/esm/web/index.js:1532:15
    at self._use_conn (file:///Users/glenn/Development/mqtt/z2m-frontend/node_modules/.pnpm/u8-mqtt@0.3.1/node_modules/u8-mqtt/esm/web/index.js:1460:45)
    at self.with_websock (file:///Users/glenn/Development/mqtt/z2m-frontend/node_modules/.pnpm/u8-mqtt@0.3.1/node_modules/u8-mqtt/esm/web/index.js:1530:19)
    at new MQTT (/Users/glenn/Development/mqtt/z2m-frontend/src/lib/mqtt.ts:22:7)
    at /Users/glenn/Development/mqtt/z2m-frontend/src/lib/mqtt.ts:97:13

My Google-fu is not turning up anything that fixes this. I did try one thing I saw, which was Object.assign(global, {WebSocket: require('ws')});, but that did not feel right and did not work. As you can see, I'm using the esm/web version, which seems right.

Thanks in advance!

gwww commented

Never mind. I'm using the Svelte framework, and code must be flagged to run in the browser. It turns out it was trying to run the code server side, which of course does not have WebSocket.
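
For anyone hitting the same error, one way to keep the connection code from running where there is no WebSocket global is a small feature check. This is only a sketch: `hasWebSocket` and `connectInBrowser` are hypothetical helper names, and the `scope` parameter exists purely so the check can be exercised without a browser.

```typescript
// Hypothetical helper: reports whether `scope` exposes a WebSocket constructor.
function hasWebSocket(scope: object = globalThis): boolean {
  return typeof (scope as { WebSocket?: unknown }).WebSocket === "function";
}

// Hypothetical wrapper: run `connect` only when WebSocket is available.
// Returns true if `connect` was called, false if it was skipped.
function connectInBrowser(connect: () => void, scope: object = globalThis): boolean {
  if (!hasWebSocket(scope)) return false; // e.g. a server-side render: skip quietly
  connect();
  return true;
}

// Simulated environments, so the behaviour does not depend on where this runs.
const serverLike = {};
const browserLike = { WebSocket: class FakeWebSocket {} };

let connectCalls = 0;
const ranOnServer = connectInBrowser(() => { connectCalls++; }, serverLike);
const ranInBrowser = connectInBrowser(() => { connectCalls++; }, browserLike);
console.log(ranOnServer, ranInBrowser, connectCalls);
```

In real code the `connect` callback would be whatever creates the client and calls `with_websock`. SvelteKit also exports a `browser` flag from `$app/environment` that can serve the same purpose.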

Thanks for leaving a breadcrumb to follow!

gwww commented

I do have it running in the browser now! I still have a Svelte issue that causes the code to run on the server, but I can work around that for the time being.

Once I figured out a few of the concepts in your library (thank you!) the switch from MQTT.js to your library was seamless.

@gwww we also use js-u8-mqtt in Svelte.
We wrapped it in a store:

/store/mqtt.ts

import { writable } from "svelte/store";
import mqtt_client from "u8-mqtt/esm/web/v5.mjs";
import type { Params } from "$store/params";
import { throttle } from "../throttle";

const client = writable<MqttClient | undefined>();
export const mqttReady = writable(false);

function onLiveCallback(mqtt: MqttClient) {
	console.log("Live!");
	client.set(mqtt);
}

function onReconnectCallback() {
	console.warn("Websocket lost, reconnecting in 3s...");
	mqttReady.set(false);
	client.set(undefined);
}

function initWebsocket(params: Params) {
	mqtt_client()
		.with_live(onLiveCallback)
		.with_reconnect(onReconnectCallback)
		.with_websock(params.mqtt.origin);
}

export default {
	...client,
	initWebsocket: throttle(initWebsocket, 3000),
};

But typings for it are currently missing.
You can put the following in
/src//u8-mqtt.d.ts

type RouteParams<R extends string> = R extends `:${infer S}/${infer Rest}`
	? { [k in S]: string } & RouteParams<Rest>
	: R extends `${string}/${infer Rest}`
	? RouteParams<Rest>
	: R extends `:${infer S}`
	? { [k in S]: string }
	: unknown;

type OnLiveCb = (mqtt: MqttClient) => any;
type OnReconnectCb = (mqtt: MqttClient) => any;
type OnTopicCallback<T extends string = string> = (pkt: Packet, params: RouteParams<T>) => any;

interface Packet {
	payload: Uint8Array;
	utf8(): string;
	text(): string;
	json(): object;
}

// type CutLeadingSlash<T extends string> = T extends `/${infer R}`
// 	? CutLeadingSlash<R>
// 	: R;
// type RouteParams<T extends string> = T extends `:${infer P}/${infer Rest}`
// 	? { [k in P]: string } & RouteParams<Rest>
// 	: T extends `:${infer P}`
// 	? { [k in P]: string }
// 	: unknown;

type Payload = string | Uint8Array | null;
type JsonPayload = number | string | object;
type SendReturn = Promise<unknown>;

interface ConnectOpts {
	username?: string;
	password?: string;
	client_id?: string | string[];
	flags?: {
		clean_start?: boolean;
	};
	will?: {
		topic: string;
		payload?: Payload;
	};
}

type QosLevel = 0 | 1 | 2;

interface PubPacket {
	qos?: QosLevel;
	retain?: boolean;
	topic: string;
	payload: Payload;
}

type PubOpt = {
	/** an alternate encoding to JSON.stringify(msg) */
	fn_encode?(msg: any): Payload;
	/** are copied over pkt.props, if present. */
	props?: any;
	/** is called before publishing pkt = xform(pkt), if present. */
	xform?(pkt: Packet): void;
};

type SubscribeFlags = {
	qos?: QosLevel;
	/** In fact No Local */
	retain?: boolean;
};

interface MqttClient {
	with_live(fn: OnLiveCb): this;
	with_reconnect(fn: OnReconnectCb): this;
	with_websock(url: string): this;

	connect(opts?: ConnectOpts): Promise<unknown>;

	on_topic<T extends string>(topic: T, callback: OnTopicCallback<T>): this;
	subscribe_topic<T extends string>(topic: T, callback: OnTopicCallback<T>): this;

	publish(pkt: PubPacket, opt?: PubOpt): SendReturn;
	pub: this["publish"];

	/**
	 * Alias for
	 * `mqtt.publish({qos: 0, topic, payload}, pub_opt)`
	 */
	post(topic: string, payload: Payload, opt?: PubOpt): SendReturn;
	post(topic: string): (payload: Payload) => SendReturn;
	/**
	 * Alias for
	 * `mqtt.publish({qos: 1, topic, payload}, pub_opt)`
	 */
	send(topic: string, payload: Payload, opt?: PubOpt): SendReturn;
	send(topic: string): (payload?: Payload) => SendReturn;
	/**
	 * Alias for
	 * `mqtt.publish({qos: 1, retain: true, topic, payload}, pub_opt)`
	 */
	store(topic: string, payload: Payload, opt?: PubOpt): SendReturn;
	store(topic: string): (payload?: Payload) => SendReturn;

	/**
	 * Alias for
	 * `mqtt.publish({qos: 1, topic, payload: JSON.stringify(msg)}, pub_opt)`
	 */
	json_send(topic: string, payload: JsonPayload, opt?: PubOpt): SendReturn;
	json_send(topic: string): (payload: JsonPayload) => SendReturn;
	/**
	 * Alias for
	 * `mqtt.publish({qos: 0, topic, payload: JSON.stringify(msg)}, pub_opt)`
	 */
	json_post(topic: string, payload: JsonPayload, opt?: PubOpt): SendReturn;
	json_post(topic: string): (payload: JsonPayload) => SendReturn;
	/**
	 * Alias for
	 * `mqtt.publish({qos: 1, retain:true, topic, payload: JSON.stringify(msg)}, pub_opt)`
	 */
	json_store(topic: string, payload: JsonPayload, opt?: PubOpt): SendReturn;
	json_store(topic: string): (payload: JsonPayload, opt?: PubOpt) => SendReturn;

	subscribe(topic: string | string[], ex?: SubscribeFlags): SendReturn;
}

interface ClientOpts {
	on_live?: OnLiveCb;
	on_reconnect?: OnReconnectCb;
}

declare function client(opts?: ClientOpts): MqttClient;

declare module "u8-mqtt/esm/web/v4.mjs" {
	export default client;
}

declare module "u8-mqtt/esm/web/v5.mjs" {
	export default client;
}

declare module "u8-mqtt" {
	export default client;
}

It will improve the situation a bit, but as you can see, some further steps are needed for correct typing.
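
As a rough illustration of what the `RouteParams` type from the typings infers, the sketch below pairs it with a small runtime parser. `extractParams` is a hypothetical helper written for this example and is not part of u8-mqtt; it only handles literal segments and `:name` captures, not wildcards.

```typescript
// Copied from the typings above: collects the `:name` segments of a route string.
type RouteParams<R extends string> = R extends `:${infer S}/${infer Rest}`
  ? { [k in S]: string } & RouteParams<Rest>
  : R extends `${string}/${infer Rest}`
  ? RouteParams<Rest>
  : R extends `:${infer S}`
  ? { [k in S]: string }
  : unknown;

// Hypothetical helper (not part of u8-mqtt): extract params from a topic at runtime.
// Returns null when the topic does not fit the route.
function extractParams<R extends string>(route: R, topic: string): RouteParams<R> | null {
  const routeParts = route.split("/");
  const topicParts = topic.split("/");
  if (routeParts.length !== topicParts.length) return null;
  const params: Record<string, string> = {};
  for (let i = 0; i < routeParts.length; i++) {
    const seg = routeParts[i];
    if (seg.startsWith(":")) params[seg.slice(1)] = topicParts[i];
    else if (seg !== topicParts[i]) return null;
  }
  // The compiler cannot relate the runtime object to the conditional type,
  // so the cast goes through `unknown`.
  return params as unknown as RouteParams<R>;
}

// Here the inferred type is { device: string } | null, so `.device` type-checks.
const found = extractParams(
  "zigbee2mqtt/:device/availability",
  "zigbee2mqtt/lamp/availability",
);
const device: string | undefined = found?.device;
console.log(device);
```

The same inference is what lets `on_topic` callbacks in the typings receive a `params` object whose keys follow the route string.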

gwww commented

Thanks @nosovk! I'll take a look deeper at a later time. I got around the typing by just defining a module statement. It's good enough for now. All my MQTT code is isolated in a single file. My repo where I'm working on this stuff is here: https://github.com/gwww/z2m-frontend. It is in various states of working as it's active development.

One thing is that the MQTT lib has been solid. Other than having a couple of learning hiccups it does exactly what I need, it seems fast, and the code base is interesting to read.

I'm a total newbie at Javascript, Typescript, and web development. I started this project to learn and have some fun. Beyond learning the tech listed above, I'm playing with the creative side and trying different UX stuff. I'm spending lots of time trying to "get it right" before building out the features. Much of what I've done has been refactored multiple times. And, as I learn more about the base technologies, I try and go back and refactor that in. I'm trying for a clean code base.

Feel free to ping me on my repo (an issue is fine for a discussion).