Nest.js MQTT broker that can run on any stream server
- What is MQTT ?
- What is Pigeon MQTT ?
- Installation
- Usage
- Handlers
- Events
- Methods
- Packets
- Middleware & Plugins
- Dependencies
- Stay in touch
- License
- Example
MQTT is a lightweight IoT messaging protocol based on the publish/subscribe model. It provides real-time, reliable messaging for networked devices with very little code and bandwidth, and is widely used in industries such as IoT, mobile Internet, smart hardware, Internet of Vehicles, and power and energy.
MQTT (Message Queuing Telemetry Transport) is an open, lightweight messaging protocol optimized for low latency. It provides a scalable and cost-efficient way to connect devices using a publish/subscribe model. A communication system built on MQTT consists of publishing clients, a broker, and one or more subscribing clients. It is designed for constrained devices and low-bandwidth, high-latency, or unreliable networks.
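The publish/subscribe model described above can be sketched in a few lines of plain TypeScript. This is only an illustration of the idea, not how Pigeon-MQTT-Nest or any real broker is implemented: `TinyBroker`, `Handler`, and the topic names are hypothetical, and the sketch matches topics exactly, with no wildcards, QoS, or networking.

```typescript
// A handler receives the topic and payload of each published message.
type Handler = (topic: string, payload: string) => void;

// Hypothetical in-memory broker: publishers and subscribers never talk to
// each other directly, they only share topic names through the broker.
class TinyBroker {
  // Prototype-less object so topic names cannot collide with Object.prototype keys.
  private readonly subscribers: Record<string, Handler[]> = Object.create(null);

  // Register a handler for exactly one topic.
  subscribe(topic: string, handler: Handler): void {
    const handlers = this.subscribers[topic] || [];
    handlers.push(handler);
    this.subscribers[topic] = handlers;
  }

  // Deliver the message to every handler of the topic and
  // return how many handlers received it.
  publish(topic: string, payload: string): number {
    const handlers = this.subscribers[topic] || [];
    handlers.forEach((handler) => handler(topic, payload));
    return handlers.length;
  }
}

const tinyBroker = new TinyBroker();
const receivedMessages: string[] = [];
tinyBroker.subscribe('sensors/temp', (topic, payload) => {
  receivedMessages.push(topic + '=' + payload);
});

const deliveredToSubscribed = tinyBroker.publish('sensors/temp', '21.5');
const deliveredToUnknown = tinyBroker.publish('sensors/humidity', '40');
```

The publisher only needs to know the topic name; whether anyone is listening is the broker's concern, which is why the second `publish` call simply reaches no handlers.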
Pigeon-MQTT-Nest is a lightweight and easy-to-use library that provides a simple MQTT broker for your NestJS applications. With this library, you can easily integrate MQTT messaging into your NestJS application and communicate with MQTT clients using topics and messages.
This library provides a customizable MQTT broker that can be easily integrated into your NestJS project. It uses Aedes as the underlying MQTT broker engine (the options, handlers, and events documented below are those of Aedes) and supports the MQTT 3.1, 3.1.1, and 5.0 protocol versions.
Pigeon-MQTT-Nest is designed to be flexible and easy to use. It provides a simple API that allows you to create topics, subscribe to topics, and publish messages to topics with just a few lines of code. Additionally, it provides advanced features such as message persistence and quality of service (QoS) levels.
Whether you're building a real-time chat application, IoT device management system, or any other kind of application that requires messaging, Pigeon-MQTT-Nest makes it easy to add MQTT messaging to your NestJS project.
Install from npm:
$ npm i pigeon-mqtt-nest
Pigeon-MQTT-Nest registers as a global module. Import it in your root module with its configuration:
@Module({
  imports: [
    PigeonModule.forRoot({
      port: 1884, // Port of the MQTT TCP server
      transport: Transport.TCP,
      id: "binarybeast",
      concurrency: 100,
      queueLimit: 42,
      maxClientsIdLength: 23,
      connectTimeout: 30000,
      heartbeatInterval: 60000,
    })
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
}
- options <object>
  - mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is also used to share messages between broker instances. Default: mqemitter
  - concurrency <number> maximum number of concurrent messages delivered by mq. Default: 100
  - persistence <Persistence> middleware that stores QoS > 0, retained, will packets and subscriptions. Default: aedes-persistence (in memory)
  - queueLimit <number> maximum number of queued messages before the client session is established. If the number of queued items exceeds this limit, connectionError throws the error "Client queue limit reached". Default: 42
  - maxClientsIdLength <number> option to override the MQTT 3.1.0 client Id length limit. Default: 23
  - heartbeatInterval <number> interval in milliseconds at which the server beats its health signal in the $SYS/<aedes.id>/heartbeat topic. Default: 60000
  - id <string> aedes broker unique identifier. Default: uuidv4()
  - connectTimeout <number> maximum time in milliseconds to wait for a CONNECT packet. Default: 30000
- Returns <Aedes>
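To make the queueLimit option more concrete, here is a minimal sketch of a bounded queue that fails with the same error message once it is full. `PreSessionQueue` is a hypothetical class written for this illustration and is not part of Pigeon-MQTT-Nest or Aedes; in particular, rejecting the push once the queue already holds queueLimit items is an assumption about the exact boundary.

```typescript
// Hypothetical bounded queue for messages that arrive before the
// client session is established.
class PreSessionQueue<T> {
  private readonly items: T[] = [];

  // 42 mirrors the documented default of queueLimit.
  constructor(private readonly queueLimit: number = 42) {}

  // Add an item, or throw once the queue already holds queueLimit items
  // (assumed boundary, see the note above).
  push(item: T): void {
    if (this.items.length >= this.queueLimit) {
      throw new Error('Client queue limit reached');
    }
    this.items.push(item);
  }

  // Number of items currently waiting.
  size(): number {
    return this.items.length;
  }
}

const preSessionQueue = new PreSessionQueue<string>(2);
preSessionQueue.push('first');
preSessionQueue.push('second');

let queueErrorMessage = '';
try {
  preSessionQueue.push('third'); // exceeds the limit of 2
} catch (error) {
  queueErrorMessage = (error as Error).message;
}
```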
Handler | Invoked When |
---|---|
preConnect | The server receives a valid CONNECT packet. |
authenticate | After preConnect . |
authorizePublish | The server publishes an LWT to all online clients, or a client publishes a packet. |
authorizeSubscribe | Subscriptions are restored in a non-clean session, or a client sends SUBSCRIBE . |
published | Same as Event: publish , but provides backpressure functionality. |
- client: <Client>
- packet: <object> CONNECT
- callback: <Function> (error, successful) => void
  - error <Error> | null
  - successful <boolean>
Invoked when the server receives a valid CONNECT packet. The packet can be modified.
The client object is in its default state. If the callback is invoked with no errors and successful is true, the server continues to establish a session.
Any error will be raised in the connectionError event.
Some use cases:
- Rate limit / throttle by client.conn.remoteAddress
- Check aedes.connectedClients to limit maximum connections
- IP blacklisting
@Injectable()
export class TestService {
  @onPreConnect()
  onPreConnect(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onPreConnect()");
    // No error and successful = true: the server continues to establish the session
    return done(null, true);
  }
}
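The rate-limit use case listed above could look roughly like the following. This is a sketch under stated assumptions rather than library code: `createConnectThrottle`, its parameters, and the error message are hypothetical, and the only thing taken from the handler description is the (error, successful) callback shape. In a real handler the address would come from client.conn.remoteAddress.

```typescript
// Same shape as the preConnect callback described above.
type PreConnectCallback = (error: Error | null, successful: boolean) => void;

// Hypothetical helper: allow at most maxAttempts CONNECT packets per remote
// address within a sliding window of windowMs milliseconds.
function createConnectThrottle(
  maxAttempts: number,
  windowMs: number,
  now: () => number = Date.now, // injectable clock, handy for testing
) {
  const attempts: Record<string, number[]> = Object.create(null);

  return function check(remoteAddress: string, done: PreConnectCallback): void {
    const cutoff = now() - windowMs;
    // Keep only the attempts that are still inside the window.
    const recent = (attempts[remoteAddress] || []).filter((time) => time > cutoff);
    if (recent.length >= maxAttempts) {
      attempts[remoteAddress] = recent;
      done(new Error('Too many connection attempts'), false);
      return;
    }
    recent.push(now());
    attempts[remoteAddress] = recent;
    done(null, true);
  };
}

// Usage with a fake clock so the behaviour is deterministic.
let fakeTime = 0;
const throttle = createConnectThrottle(2, 1000, () => fakeTime);
const throttleResults: boolean[] = [];
const recordResult: PreConnectCallback = (_error, successful) => {
  throttleResults.push(successful);
};

throttle('10.0.0.1', recordResult); // accepted
throttle('10.0.0.1', recordResult); // accepted
throttle('10.0.0.1', recordResult); // rejected: limit reached within the window
fakeTime = 1500;
throttle('10.0.0.1', recordResult); // accepted: earlier attempts have expired
```

Inside onPreConnect, the `check` function would simply be called with the client's address and the `done` callback.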
- client: <Client>
- credential: <string>
- callback: <Function> (error, successful) => void
  - error <Error> | null
  - successful <boolean>
Invoked after preConnect.
The server parses the CONNECT packet and initializes the client object, setting client.id to match the one in the CONNECT packet, then extracts username and password as parameters for the user-defined authentication flow.
If the callback is invoked with no errors and successful is true, the server authenticates the client and continues to set up the client session.
If authenticated, the server acknowledges with a CONNACK with returnCode=0, otherwise returnCode=5. Users can define a value between 2 and 5 by setting a returnCode property on the error object.
@Injectable()
export class TestService {
  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    // Accept the client
    return done(null, true);
  }
}
@Injectable()
export class TestService {
  @onAuthenticate()
  onAuthenticate(@Client() client, @Credential() credential, @Function() done) {
    console.log("Function: @onAuthenticate()");
    // Reject the client with a custom CONNACK return code
    const error = Object.assign(new Error('Auth error'), { returnCode: 4 });
    return done(error, false);
  }
}
Please refer to Connect Return Code to see their meanings.
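The return-code rule described for authenticate can be illustrated with a small standalone sketch. Everything here is hypothetical (`authenticateUser`, `connackReturnCode`, the in-memory `knownUsers` table with a plain-text password); only the callback shape, the 0 / 5 defaults, and the 2 to 5 range for a custom returnCode come from the text above. The two named codes are the MQTT 3.1.1 values for "bad user name or password" (4) and "not authorized" (5).

```typescript
// Error carrying an optional CONNACK return code, as described above.
interface AuthError extends Error {
  returnCode?: number;
}
type AuthCallback = (error: AuthError | null, successful: boolean) => void;

// MQTT 3.1.1 CONNACK return codes relevant to authentication.
const BAD_USERNAME_OR_PASSWORD = 4;
const NOT_AUTHORIZED = 5;

function authError(message: string, returnCode: number): AuthError {
  const error: AuthError = new Error(message);
  error.returnCode = returnCode;
  return error;
}

// Hypothetical user table, for illustration only (never store plain-text passwords).
const knownUsers: Record<string, string> = { alice: 'secret' };

function authenticateUser(
  username: string | undefined,
  password: string | undefined,
  done: AuthCallback,
): void {
  if (!username || !password) {
    return done(authError('Missing credentials', BAD_USERNAME_OR_PASSWORD), false);
  }
  if (knownUsers[username] !== password) {
    return done(authError('Not authorized', NOT_AUTHORIZED), false);
  }
  done(null, true);
}

// 0 on success; otherwise error.returnCode when it lies in 2..5, else 5.
function connackReturnCode(error: AuthError | null, successful: boolean): number {
  if (!error && successful) return 0;
  const code = error ? error.returnCode : undefined;
  return code !== undefined && code >= 2 && code <= 5 ? code : 5;
}

const connackCodes: number[] = [];
const recordCode: AuthCallback = (error, successful) => {
  connackCodes.push(connackReturnCode(error, successful));
};

authenticateUser('alice', 'secret', recordCode);    // success
authenticateUser('alice', 'wrong', recordCode);     // wrong password
authenticateUser(undefined, undefined, recordCode); // no credentials
```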
- client: <Client> | null
- packet: <object> PUBLISH
- callback: <Function> (error) => void
  - error <Error> | null
Invoked when:
- the server publishes an LWT to all online clients
- a client publishes a packet
client is null when aedes publishes an obsolete LWT without connected clients.
If the callback is invoked with no errors, the server authorizes the packet; otherwise it emits clientError with the error. If an error occurs, the client connection will be closed, but no error is returned to the client (MQTT-3.3.5-2).
@Injectable()
export class TestService {
  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    console.log("Function: @onAuthorizePublish()");
    if (packet.topic === 'aaaa') {
      // Reject the publish
      return done(new Error('wrong topic'))
    }
    if (packet.topic === 'bbb') {
      // Overwrite the payload before it is delivered
      packet.payload = Buffer.from('overwrite packet payload')
    }
    return done(null);
  }
}
By default, authorizePublish throws an error when a client publishes to topics with the $SYS/ prefix, to prevent a possible DoS (see #597). If you write your own implementation of authorizePublish, we suggest you add a check for this. Default implementation:
// Prefix of the topics reserved for broker-internal messages
const $SYS_PREFIX = '$SYS/';

@Injectable()
export class TestService {
  @onAuthorizePublish()
  onAuthorizePublish(@Client() client, @Packet() packet, @Function() done) {
    if (packet.topic.startsWith($SYS_PREFIX)) {
      return done(new Error($SYS_PREFIX + ' topic is reserved'))
    }
    return done(null);
  }
}
- client: <Client>
- subscription: <object>
- callback: <Function> (error, subscription) => void
  - error <Error> | null
  - subscription: <object> | null
Invoked when:
- subscriptions are restored in a non-clean session
- a client sends SUBSCRIBE
subscription is a dictionary object like { topic: 'hello', qos: 0 }.
If the callback is invoked with no errors, the server authorizes the packet; otherwise it emits clientError with the error.
In general, the user should not touch the subscription and should pass it to the callback unchanged, but the server gives an option to change the subscription on the fly.
@Injectable()
export class TestService {
  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    console.log("Function: @onAuthorizeSubscribe()");
    if (subscription.topic === 'aaaa') {
      return done(new Error('wrong topic'))
    }
    if (subscription.topic === 'bbb') {
      // overwrites subscription
      subscription.topic = 'foo'
      subscription.qos = 1
    }
    return done(null, subscription);
  }
}
To negate a subscription, set the subscription to null. Aedes ignores the negated subscription, and the qos in the SUBACK is set to 128, based on the MQTT 3.1.1 spec:
@Injectable()
export class TestService {
  @onAuthorizeSubscribe()
  onAuthorizeSubscribe(@Client() client, @Subscription() subscription, @Function() done) {
    // Passing null negates the subscription
    done(null, subscription.topic === 'aaaa' ? null : subscription)
  }
}
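As a rough illustration of how negated subscriptions show up in the SUBACK, the sketch below maps each requested subscription through an authorizer and reports 128 for every one that comes back as null. The names `grantedQos`, `Authorizer`, and `denySecret` are hypothetical and this is not the broker's actual code path; only the "null means negated, negated means 128" rule is taken from the text above.

```typescript
interface Subscription {
  topic: string;
  qos: number;
}

// Returns the (possibly modified) subscription, or null to negate it.
type Authorizer = (subscription: Subscription) => Subscription | null;

// SUBACK return code for a failed or negated subscription (0x80).
const SUBACK_FAILURE = 128;

// Build the list of granted QoS values in the order the topics were requested.
function grantedQos(requested: Subscription[], authorize: Authorizer): number[] {
  return requested.map((subscription) => {
    const result = authorize(subscription);
    return result === null ? SUBACK_FAILURE : result.qos;
  });
}

// Hypothetical policy: nobody may subscribe to the 'secret' topic.
const denySecret: Authorizer = (subscription) =>
  subscription.topic === 'secret' ? null : subscription;

const granted = grantedQos(
  [
    { topic: 'news', qos: 1 },
    { topic: 'secret', qos: 2 },
    { topic: 'logs', qos: 0 },
  ],
  denySecret,
);
```

The client can therefore tell from the position of 128 in the granted list which of its requested topics was refused.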
Same as Event: publish, but provides backpressure functionality. TL;DR: if your operations on a packet MUST finish before the next packet is handled, use this handler; otherwise, especially for long-running operations, use Event: publish instead.
@Injectable()
export class TestService {
  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }
}
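Backpressure, as used above, means that the next packet is not handed over until the handler has signalled that it is finished with the current one. The following callback-based sketch shows the idea in isolation; `deliverWithBackpressure` and the other names are hypothetical, and the real broker's internals may differ.

```typescript
type Done = () => void;
type PacketHandler = (packet: string, done: Done) => void;

// Hand packets to the handler one at a time: the next packet is only
// started after the handler calls done() for the current one.
function deliverWithBackpressure(
  packets: string[],
  handler: PacketHandler,
  finished: Done,
): void {
  let index = 0;
  const next = (): void => {
    if (index >= packets.length) {
      finished();
      return;
    }
    const packet = packets[index];
    index += 1;
    handler(packet, next);
  };
  next();
}

// Usage: the handler parks each done() callback so that it can be
// released later, simulating a slow operation.
const deliveryLog: string[] = [];
const pendingDone: Done[] = [];
const releaseNext = (): void => {
  const done = pendingDone.shift();
  if (done) done();
};

deliverWithBackpressure(
  ['a', 'b'],
  (packet, done) => {
    deliveryLog.push('start ' + packet);
    pendingDone.push(done);
  },
  () => {
    deliveryLog.push('finished');
  },
);

// Only 'a' has started so far; 'b' is held back until 'a' is released.
const logBeforeRelease = deliveryLog.slice();
releaseNext(); // completes 'a', which starts 'b'
releaseNext(); // completes 'b', which triggers finished()
```

A plain event listener would instead receive 'a' and 'b' immediately, which is the better fit for long-running work that does not need strict ordering.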
Name | Emitted When |
---|---|
Client | The client registers itself to the server. |
Client Ready | The client has received all its offline messages and is initialized. |
Client Disconnect | The client disconnects. |
Client Error | An error occurs. |
Connection Error | An error occurs while the client is uninitialized. |
Keep Alive Timeout | A timeout happens in the client keepalive. |
Publish | The server delivers the packet to subscribed clients. |
Ack | The packet is successfully delivered to the client. |
Subscribe | The client successfully subscribes to the subscriptions in the server. |
Unsubscribe | The client successfully unsubscribes from the subscriptions in the server. |
Connack Sent | The server sends an acknowledgement to the client. |
Closed | The server is closed. |
- client <Client>
Emitted when the client registers itself to the server. The client is not ready yet; its connecting state equals true.
The server publishes a SYS topic $SYS/<aedes.id>/new/clients to inform that it has registered the client into its registration pool. client.id is the payload.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onClient()
  OnNewClient(@Client() client) {
    console.log("Function: @onClient()");
  }
}
- client <Client>
Emitted when the client has received all its offline messages and is initialized. The client's connected state equals true and it is ready to process incoming messages.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onClientReady()
  async onClientReady(@Client() client) {
    console.log("Function: @onClientReady()");
  }
}
- client <Client>
Emitted when a client disconnects.
The server publishes a SYS topic $SYS/<aedes.id>/disconnect/clients to inform that it has deregistered the client. client.id is the payload.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onClientDisconnect()
  OnClientDisconnect(@Client() client) {
    console.log("Function: @OnClientDisconnect()");
  }
}
- client <Client>
- error <Error>
Emitted when an error occurs.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onClientError()
  OnClientError(@Client() client, @Error() error) {
    console.log("Function: @onClientError()");
  }
}
- client <Client>
- error <Error>
Emitted when an error occurs. Unlike clientError, it is raised only when the client is uninitialized.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onConnectionError()
  OnConnectionError(@Client() client, @Error() error) {
    console.log("Function: @OnConnectionError()");
  }
}
- client <Client>
Emitted when a timeout happens in the client keepalive.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onKeepLiveTimeout()
  onKeepLiveTimeout(@Client() client) {
    console.log("Function: @onKeepLiveTimeout()");
  }
}
Emitted when the server delivers the packet to subscribed clients. If no clients are subscribed to the packet topic, the server still publishes the packet and emits the event. client is null when the packet is an internal message, such as the aedes heartbeat message or an LWT.
Note! packet belongs to the aedes-packet type. Some properties belong to aedes internals; any changes to them will break the aedes internal flow.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onPublish()
  OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    console.log("Function: @OnPublish()");
  }
}
Emitted on a QoS 1 or 2 acknowledgement, that is, when the packet has been successfully delivered to the client.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onAck()
  onAck(@Client() client, @Packet() packet) {
    console.log("Function: @onAck()");
  }
}
- subscriptions <object>
- client <Client>
Emitted when the client successfully subscribes to the subscriptions in the server.
subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated subscriptions, where qos equals 128. See more on authorizeSubscribe.
The server publishes a SYS topic $SYS/<aedes.id>/new/subscribers to inform that a client successfully subscribed to one or more topics. The payload is a JSON object with clientId and subs props; subs equals the subscriptions array.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onSubscribe()
  OnSubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnSubscribe()");
  }
}
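To visualise the SYS message described above, this sketch builds the topic and JSON payload for a new subscriber. It is an illustration only: `sysTopic`, `uniqueByTopic`, and `newSubscribersMessage` are hypothetical helpers, and keeping the first occurrence of a duplicated topic is an assumption, since the text only says duplicates are excluded.

```typescript
interface Subscription {
  topic: string;
  qos: number;
}

// Build a $SYS/<aedes.id>/<suffix> topic name.
function sysTopic(brokerId: string, suffix: string): string {
  return '$SYS/' + brokerId + '/' + suffix;
}

// Drop duplicated topics, keeping the first occurrence (assumed behaviour).
function uniqueByTopic(subscriptions: Subscription[]): Subscription[] {
  const seen: Record<string, boolean> = Object.create(null);
  return subscriptions.filter((subscription) => {
    if (seen[subscription.topic]) return false;
    seen[subscription.topic] = true;
    return true;
  });
}

// Topic and JSON payload announcing a new subscriber.
function newSubscribersMessage(
  brokerId: string,
  clientId: string,
  subscriptions: Subscription[],
): { topic: string; payload: string } {
  return {
    topic: sysTopic(brokerId, 'new/subscribers'),
    payload: JSON.stringify({ clientId: clientId, subs: uniqueByTopic(subscriptions) }),
  };
}

const subscriberMessage = newSubscribersMessage('broker-1', 'client-7', [
  { topic: 'a', qos: 0 },
  { topic: 'a', qos: 1 },   // duplicate topic, dropped
  { topic: 'b', qos: 128 }, // negated subscription, kept
]);
```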
- unsubscriptions Array<string>
- client <Client>
Emitted when the client successfully unsubscribes from the subscriptions in the server.
unsubscriptions is an array of unsubscribed topics.
The server publishes a SYS topic $SYS/<aedes.id>/new/unsubscribers to inform that a client successfully unsubscribed from one or more topics. The payload is a JSON object with clientId and subs props; subs equals the unsubscriptions array.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onUnsubscribe()
  OnUnsubscribe(@Subscription() subscription, @Client() client) {
    console.log("Function: @OnUnsubscribe()");
  }
}
Emitted when the server sends an acknowledgement to the client. Please refer to the MQTT specification for an explanation of the returnCode property in CONNACK.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onConnackSent()
  onConnackSent(@Client() client, @Packet() packet) {
    console.log("Function: @onConnackSent()");
  }
}
Emitted when the server is closed.
@Injectable()
export class TestService {
  constructor(@Inject(PigeonService) private readonly aedesService: PigeonService) {
  }
  @onClosed()
  onClosed(@Client() client, @Packet() packet) {
    console.log("Function: @onClosed()");
  }
}
Method | Description |
---|---|
Publish | Directly delivers a packet on behalf of the server to subscribed clients. |
Close | Closes the aedes server and disconnects all clients. |
- packet <object> PUBLISH
Directly delivers packet on behalf of the server to subscribed clients. Bypasses authorizePublish.
callback will be invoked with an error argument after finishing.
@Injectable()
export class TestService {
  // inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }
  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    // skip our own message, otherwise this handler would re-trigger itself
    if (topic === "test2") {
      return;
    }
    // use this method to publish
    await this.pigeonService.publish({
      topic: "test2", qos: 0, cmd: "publish", payload: "", dup: false, retain: false
    });
  }
}
- callback: <Function>
Closes the aedes server and disconnects all clients.
callback will be invoked when the server is closed.
@Injectable()
export class TestService {
  // inject Pigeon Service
  constructor(@Inject(PigeonService) private readonly pigeonService: PigeonService) {
  }
  @onPublish()
  async OnPublish(@Topic() topic, @Packet() packet, @Payload() payload, @Client() client) {
    // use this method to close the broker
    await this.pigeonService.close();
  }
}
This section describes the format of all packets
Packet | - |
---|---|
Connect | --- |
Connack | --- |
Subscribe | --- |
Suback | --- |
Unsubscribe | --- |
Unsuback | --- |
Publish | --- |
Puback | --- |
Pubrec | --- |
Pubrel | --- |
Pubcomp | --- |
Pingreq | --- |
Pingresp | --- |
Disconnect | --- |
{
cmd: 'connect',
protocolId: 'MQTT', // Or 'MQIsdp' in MQTT 3.1
protocolVersion: 4, // Or 3 in MQTT 3.1, or 5 in MQTT 5.0
clean: true, // Can also be false
clientId: 'my-device',
keepalive: 0, // Seconds which can be any positive number, with 0 as the default setting
username: 'matteo',
password: Buffer.from('collina'), // Passwords are buffers
will: {
topic: 'mydevice/status',
payload: Buffer.from('dead'), // Payloads are buffers
properties: { // MQTT 5.0
willDelayInterval: 1234,
payloadFormatIndicator: false,
messageExpiryInterval: 4321,
contentType: 'test',
responseTopic: 'topic',
correlationData: Buffer.from([1, 2, 3, 4]),
userProperties: {
'test': 'test'
}
}
},
properties: { // MQTT 5.0 properties
sessionExpiryInterval: 1234,
receiveMaximum: 432,
maximumPacketSize: 100,
topicAliasMaximum: 456,
requestResponseInformation: true,
requestProblemInformation: true,
userProperties: {
'test': 'test'
},
authenticationMethod: 'test',
authenticationData: Buffer.from([1, 2, 3, 4])
}
}
If password or will.payload are passed as strings, they will automatically be converted into a Buffer.
{
cmd: 'connack',
returnCode: 0, // Or whatever else you see fit MQTT < 5.0
sessionPresent: false, // Can also be true.
reasonCode: 0, // reason code MQTT 5.0
properties: { // MQTT 5.0 properties
sessionExpiryInterval: 1234,
receiveMaximum: 432,
maximumQoS: 2,
retainAvailable: true,
maximumPacketSize: 100,
assignedClientIdentifier: 'test',
topicAliasMaximum: 456,
reasonString: 'test',
userProperties: {
'test': 'test'
},
wildcardSubscriptionAvailable: true,
subscriptionIdentifiersAvailable: true,
sharedSubscriptionAvailable: false,
serverKeepAlive: 1234,
responseInformation: 'test',
serverReference: 'test',
authenticationMethod: 'test',
authenticationData: Buffer.from([1, 2, 3, 4])
}
}
{
cmd: 'subscribe',
messageId: 42,
properties: { // MQTT 5.0 properties
subscriptionIdentifier: 145,
userProperties: {
test: 'test'
}
},
subscriptions: [{
topic: 'test',
qos: 0,
nl: false, // no Local MQTT 5.0 flag
rap: true, // Retain as Published MQTT 5.0 flag
rh: 1 // Retain Handling MQTT 5.0
}]
}
{
cmd: 'suback',
messageId: 42,
properties: { // MQTT 5.0 properties
reasonString: 'test',
userProperties: {
'test': 'test'
}
},
granted: [0, 1, 2, 128]
}
{
cmd: 'unsubscribe',
messageId: 42,
properties: { // MQTT 5.0 properties
userProperties: {
'test': 'test'
}
},
unsubscriptions: [
'test',
'a/topic'
]
}
{
cmd: 'unsuback',
messageId: 42,
properties: { // MQTT 5.0 properties
reasonString: 'test',
userProperties: {
'test': 'test'
}
}
}
{
cmd: 'publish',
messageId: 42,
qos: 2,
dup: false,
topic: 'test',
payload: Buffer.from('test'),
retain: false,
properties: { // optional properties MQTT 5.0
payloadFormatIndicator: true,
messageExpiryInterval: 4321,
topicAlias: 100,
responseTopic: 'topic',
correlationData: Buffer.from([1, 2, 3, 4]),
userProperties: {
'test': 'test'
},
subscriptionIdentifier: 120, // can be an Array in message from broker, if message included in few another subscriptions
contentType: 'test'
}
}
{
cmd: 'puback',
messageId: 42,
reasonCode: 16, // only for MQTT 5.0
properties: { // MQTT 5.0 properties
reasonString: 'test',
userProperties: {
'test': 'test'
}
}
}
{
cmd: 'pubrec',
messageId: 42,
reasonCode: 16, // only for MQTT 5.0
properties: { // properties MQTT 5.0
reasonString: 'test',
userProperties: {
'test': 'test'
}
}
}
{
cmd: 'pubrel',
messageId: 42,
reasonCode: 16, // only for MQTT 5.0
properties: { // properties MQTT 5.0
reasonString: 'test',
userProperties: {
'test': 'test'
}
}
}
{
cmd: 'pubcomp',
messageId: 42,
reasonCode: 16, // only for MQTT 5.0
properties: { // properties MQTT 5.0
reasonString: 'test',
userProperties: {
'test': 'test'
}
}
}
{
cmd: 'pingreq'
}
{
cmd: 'pingresp'
}
{
cmd: 'disconnect',
reasonCode: 0, // MQTT 5.0 code
properties: { // properties MQTT 5.0
sessionExpiryInterval: 145,
reasonString: 'test',
userProperties: {
'test': 'test'
},
serverReference: 'test'
}
}
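Since every packet above carries a cmd field, packets lend themselves to a TypeScript discriminated union. The sketch below covers only three of the packet kinds and a subset of their fields; the interface names and the `summarize` function are hypothetical and are not types exported by Pigeon-MQTT-Nest. Uint8Array is used as a stand-in for Buffer so the example has no Node.js type dependency.

```typescript
interface PublishPacket {
  cmd: 'publish';
  topic: string;
  payload: Uint8Array | string; // Buffer in the packet format above
  qos: 0 | 1 | 2;
  dup: boolean;
  retain: boolean;
  messageId?: number;
}

interface SubscribePacket {
  cmd: 'subscribe';
  messageId: number;
  subscriptions: { topic: string; qos: 0 | 1 | 2 }[];
}

interface PingPacket {
  cmd: 'pingreq' | 'pingresp';
}

// The cmd field tells TypeScript which shape a packet has.
type MqttPacket = PublishPacket | SubscribePacket | PingPacket;

// Produce a short human-readable description of a packet.
function summarize(packet: MqttPacket): string {
  switch (packet.cmd) {
    case 'publish':
      return 'publish to ' + packet.topic + ' at QoS ' + packet.qos;
    case 'subscribe':
      return 'subscribe to ' + packet.subscriptions.map((s) => s.topic).join(', ');
    default:
      // Only 'pingreq' and 'pingresp' remain here.
      return packet.cmd;
  }
}

const publishSummary = summarize({
  cmd: 'publish',
  messageId: 42,
  qos: 2,
  dup: false,
  topic: 'test',
  payload: 'test',
  retain: false,
});
const subscribeSummary = summarize({
  cmd: 'subscribe',
  messageId: 42,
  subscriptions: [{ topic: 'test', qos: 0 }, { topic: 'a/topic', qos: 1 }],
});
const pingSummary = summarize({ cmd: 'pingreq' });
```

Narrowing on cmd means that accessing packet.topic inside the 'subscribe' branch would be a compile-time error, which catches a common class of mistakes when handling mixed packet types.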
- [aedes-persistence]: In-memory implementation of an Aedes persistence
- [aedes-persistence-mongodb]: MongoDB persistence for Aedes
- [aedes-persistence-redis]: Redis persistence for Aedes
- [aedes-persistence-level]: LevelDB persistence for Aedes
- [aedes-persistence-nedb]: NeDB persistence for Aedes
- [mqemitter]: An opinionated memory Message Queue with an emitter-style API
- [mqemitter-redis]: Redis-powered mqemitter
- [mqemitter-mongodb]: Mongodb based mqemitter
- [mqemitter-child-process]: Share the same mqemitter between a hierarchy of child processes
- [mqemitter-cs]: Expose a MQEmitter via a simple client/server protocol
- [mqemitter-p2p]: A P2P implementation of MQEmitter, based on HyperEmitter and a Merkle DAG
- [mqemitter-aerospike]: Aerospike mqemitter
- Author Twitter - @binarybeast
MIT License
Copyright (c) 2021 binaryb3ast
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.