MQTT bridge for Apache Kafka®
This project provides a software component which acts as a bridge between MQTT 3.1.1 and an Apache Kafka® cluster. It enables one-way communication from MQTT to Kafka, allowing MQTT clients to send data to an Apache Kafka cluster. MQTT subscriptions to read data from the Apache Kafka brokers are out of scope.
Running the bridge
On bare-metal / VM
Download the ZIP or TAR.GZ file from the GitHub release page and unpack it.
Afterwards, edit the `config/application.properties` file, which contains the configuration, and the `config/topic-mapping-rules.json` file, which contains the topic mapping rules.
Once your configuration is ready, start the bridge using:
bin/mqtt_bridge_run.sh --config-file config/application.properties --mapping-rules config/topic-mapping-rules.json
On Kubernetes and OpenShift
Download the ZIP or TAR.GZ file from the GitHub release page and unpack it.
The MQTT Bridge is deployed using a Kubernetes `Deployment`, and it is configured using a `ConfigMap`.
The `ConfigMap` contains the configuration and the topic mapping rules files.
The files under the `install` directory are used to deploy the MQTT Bridge on Kubernetes.
To deploy the MQTT Bridge use the following command:
kubectl apply -f ./install
Bridge Overview
How it works
To enable a seamless integration between MQTT and Kafka, the MQTT Bridge maps MQTT topics to Kafka topics using a set of predefined patterns and Kafka topic templates. As part of the bridge, a Kafka producer is responsible for producing the messages received from the MQTT clients to the Kafka cluster.
Topic Mapping Rules (ToMaR)
The ToMaR is a set of patterns, provided by the user, that defines how the MQTT Bridge maps MQTT topic names to Kafka topic names.
A mapping rule is a model that contains an MQTT topic pattern, a Kafka topic template, and optionally a Kafka record key.
The topic of every incoming MQTT message should match an MQTT topic pattern in the ToMaR so that the bridge knows which Kafka topic to produce the message to.
This Kafka topic is defined by a template, the Kafka topic template.
However, if the topic of an incoming MQTT message does not match any pattern in the ToMaR, the bridge produces the message to a default Kafka topic.
This default topic is configurable; if it is not specified, the bridge uses the `messages_default` topic name.
The optional Kafka record key defines the key of the Kafka record that is produced to the Kafka topic.
It is also defined by a template, and its default value is `null`.
A valid ToMaR is a JSON file that contains an array of mapping rules.
Each mapping rule is a JSON object with two mandatory properties, `mqttTopic` and `kafkaTopic`, and one optional property, `kafkaKey`.
The following is an example of a valid ToMaR:
[
  {
    "mqttTopic": "building/(\\w+)/room/(\\d{1,4})/.*",
    "kafkaTopic": "building_$1",
    "kafkaKey": "room_$2"
  },
  {
    "mqttTopic": "sensors/([^/]+)/data",
    "kafkaTopic": "sensor_data"
  },
  {
    "mqttTopic": "sensors.*",
    "kafkaTopic": "sensor_others"
  },
  {
    "mqttTopic": "devices/([^/]+)/data/(\\b(all|new|old)\\b)",
    "kafkaTopic": "device_$1_data",
    "kafkaKey": "device_$2"
  },
  {
    "mqttTopic": "locations/([^/]+)(?:\\/.*)?$",
    "kafkaTopic": "locations",
    "kafkaKey": "location_$1"
  }
]
- The expressions `.*` and `(?:\\/.*)?$` are used to represent the `#` wildcard, which in turn represents any number of levels in the MQTT topic hierarchy. However, using these two expressions interchangeably can lead to unexpected behavior. Therefore, note the following:
  - You cannot use the `.*` wildcard in capturing groups. For example, the pattern `building/(\\w+)/room/(\\d{1,4})/(.*)` is invalid because the last group captures the whole remaining subtopic, so the `$3` placeholder might include `/` characters, which would lead to an invalid Kafka topic name.
  - You can use the `.*` wildcard without a preceding `/` character, but it is not equivalent to `/.*`. For example, the pattern `building.*` matches `building`, `building/room`, `buildingroom`, and so on. On the other hand, the pattern `building/.*` matches `building/room`, `building/floor/room`, and so on, but not `buildingroom` or `building`.
  - You can also use the `(?:\\/.*)?$` wildcard to match the whole subtopic level of the pattern. For example, the pattern `locations/([^/]+)(?:\\/.*)?$` matches `locations/city/luanda/angola`, `locations/city`, `locations/city/`, and so on. It is equivalent to `locations/+/#`. Please note that `(?:\\/.*)?$` is a non-capturing group, which means that it is not used to replace the placeholders in the `kafkaTopic` and `kafkaKey` templates.
  - The `(?:\\/.*)?$` wildcard is different from the `.*` wildcard. For example, the pattern `sensors.*` matches everything after `sensors`, separated by a slash or not: `sensors`, `sensors/`, `sensorsdata`, and so on. On the other hand, the pattern `sensors(?:\\/.*)?$` matches `sensors`, `sensors/`, `sensors/data`, and so on, but not `sensorsdata`.
- The expression `([^/]+)` is used to represent the `+` wildcard, which in turn represents a single level in the MQTT topic hierarchy.

It is worth mentioning that it is the user's responsibility to adhere to the MQTT 3.1.1 naming conventions when defining the MQTT topic patterns.
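The differences between these expressions can be checked with any regex engine. The following is a minimal sketch in Python, not the bridge's own code; it assumes that a pattern has to match the whole MQTT topic, and the `matches` helper is a hypothetical name introduced only for this illustration. The patterns are written as they look after JSON decoding, so `\\/` in the ToMaR file becomes `\/` here.

```python
import re

def matches(pattern, topic):
    """Return True when the whole MQTT topic matches the pattern.

    Assumption: whole-topic matching. This is an illustration,
    not the bridge's actual matching code.
    """
    return re.fullmatch(pattern, topic) is not None

# Patterns after JSON decoding (single backslashes).
ANY_SUFFIX = r"sensors.*"            # anything after "sensors"
SUBTREE = r"sensors(?:\/.*)?$"       # "sensors" or "sensors/..." only
SINGLE_LEVEL = r"sensors/([^/]+)/data"  # exactly one level between the slashes

for topic in ["sensors", "sensors/", "sensors/data", "sensorsdata"]:
    # Print how each multi-level expression treats the same topic.
    print(topic, matches(ANY_SUFFIX, topic), matches(SUBTREE, topic))
```

Only `sensorsdata` is treated differently by the two multi-level expressions: `sensors.*` accepts it, while `sensors(?:\/.*)?$` rejects it.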
Placeholders in the `kafkaTopic` and `kafkaKey` templates are defined using the `$` character followed by a number.
The number represents the index of the capturing group in the `mqttTopic` pattern. Please note that the index starts from 1, not 0.
The MQTT Bridge uses the capturing groups in the `mqttTopic` pattern to positionally extract the values that replace the placeholders in the `kafkaTopic` and `kafkaKey` templates.
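The placeholder substitution described above can be sketched in a few lines of Python. This is an illustration under stated assumptions rather than the bridge's implementation: the `expand` helper is a hypothetical name, and whole-topic matching is assumed.

```python
import re

def expand(template, match):
    """Replace each $N in the template with capturing group N (1-based).

    Hypothetical helper for illustration; a group that did not
    participate in the match is replaced with an empty string.
    """
    return re.sub(
        r"\$(\d+)",
        lambda m: match.group(int(m.group(1))) or "",
        template,
    )

# First rule of the example ToMaR, after JSON decoding.
pattern = r"building/(\w+)/room/(\d{1,4})/.*"
match = re.fullmatch(pattern, "building/A/room/1003/floor/2")

kafka_topic = expand("building_$1", match)  # group 1 is "A"
kafka_key = expand("room_$2", match)        # group 2 is "1003"
print(kafka_topic, kafka_key)
```

Here `$1` and `$2` are resolved purely by position, which is why adding or removing a capturing group in `mqttTopic` changes the meaning of the placeholders in both templates.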
Let's go through the first four rules in the above example to understand how the MQTT Bridge uses them to map MQTT topics to Kafka topics:
- MQTT Topic: `building/(\\w+)/room/(\\d{1,4})/.*` -> Kafka Topic: `building_$1` with Kafka Key: `room_$2`
  This rule maps MQTT topics of the form `building/{some word}/room/{some number with up to 4 digits}/#` to the Kafka topic `building_$1`. For example, the MQTT topic `building/A/room/1003/floor/2` is mapped to the Kafka topic `building_A` with the key `room_1003`.
- MQTT Topic: `sensors/([^/]+)/data` -> Kafka Topic: `sensor_data` with Kafka Key: `null`
  This rule maps MQTT topics of the form `sensors/+/data` to the Kafka topic `sensor_data`. For example, the MQTT topic `sensors/temperature/data` is mapped to the Kafka topic `sensor_data`. Because the `kafkaKey` is not defined, the key of the Kafka record will be `null`.
- MQTT Topic: `sensors.*` -> Kafka Topic: `sensor_others` with Kafka Key: `null`
  This rule maps any MQTT topic starting with `sensors`, followed by any number of levels in the hierarchy, to the Kafka topic `sensor_others`. For example, the MQTT topic `sensors/temperature/living-room` is mapped to the Kafka topic `sensor_others`. Because the `kafkaKey` is not defined, the key of the Kafka record will be `null`.
- MQTT Topic: `devices/([^/]+)/data/(\\b(all|new|old)\\b)` -> Kafka Topic: `device_$1_data` with Kafka Key: `device_$2`
  This rule maps MQTT topics of the form `devices/{some word}/data/{either all, new or old}` to the Kafka topic `device_$1_data`. For example, the MQTT topic `devices/thermostat/data/all` is mapped to the Kafka topic `device_thermostat_data` with the key `device_all`. This example also shows the advantage of using regular expressions in the `mqttTopic` pattern: the last level must be exactly `all`, `new`, or `old`, delimited by word boundaries (`\b`), and anything else does not match the pattern.
The order in which the rules are defined is important.
The MQTT Bridge uses the first rule that matches the MQTT topic.
For example, the MQTT topic `sensors/temperature/data` is mapped to the Kafka topic `sensor_data` because the rule `sensors/([^/]+)/data` is defined before `sensors.*`, which also matches it.
If we swapped the positions of these two rules, the MQTT Bridge would use `sensors.*` and map the MQTT topic to the Kafka topic `sensor_others`.
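The first-match behavior and the fallback to the default topic can be illustrated with a short Python sketch. It is not the bridge's code: the `route` function and the `RULES` list are hypothetical names, whole-topic matching is assumed, and the templates used here contain no placeholders, so they are returned unchanged.

```python
import re

# Default Kafka topic used when no rule matches (see bridge.topic.default).
DEFAULT_TOPIC = "messages_default"

# Two rules from the example ToMaR, in their original order.
RULES = [
    {"mqttTopic": r"sensors/([^/]+)/data", "kafkaTopic": "sensor_data"},
    {"mqttTopic": r"sensors.*", "kafkaTopic": "sensor_others"},
]

def route(mqtt_topic, rules, default=DEFAULT_TOPIC):
    """Return the Kafka topic of the first rule whose pattern matches."""
    for rule in rules:
        if re.fullmatch(rule["mqttTopic"], mqtt_topic):
            return rule["kafkaTopic"]
    # No rule matched: fall back to the default topic.
    return default

print(route("sensors/temperature/data", RULES))
print(route("sensors/temperature/data", list(reversed(RULES))))
print(route("alarms/fire", RULES))
```

With the original order, the more specific rule wins; with the order reversed, the broader `sensors.*` rule shadows it. Placing specific patterns before general ones avoids this.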
Bridge Configuration
The user can configure the MQTT Bridge using an `application.properties` file.
This section describes the configuration properties that can be used to configure the MQTT Bridge.
Each property name starts with a prefix that indicates which part of the bridge it configures:
- `bridge.` is the prefix used for general configuration of the bridge.
- `mqtt.` is the prefix used for MQTT configuration of the bridge.
- `kafka.` is the prefix used for Kafka configuration of the bridge.
A valid configuration file should look like this:
# Bridge configuration
bridge.id=my-bridge
bridge.topic.default=default_topic
# MQTT configuration
mqtt.host=localhost
mqtt.port=1883
# Kafka configuration
kafka.bootstrap.servers=localhost:9092
The following table describes the configuration properties defined above.
| Setting | Description | Default |
|---|---|---|
| `bridge.id` | ID of the bridge | null/undefined |
| `bridge.topic.default` | Kafka topic used when an MQTT topic matches no mapping rule | `messages_default` |
| `mqtt.host` | Host address of the MQTT server | `localhost` |
| `mqtt.port` | Port number of the MQTT server | `1883` |
| `kafka.bootstrap.servers` | Bootstrap servers for Apache Kafka | `localhost:9092` |
| `kafka.producer.*` | Any Kafka producer configuration (e.g. `acks`, `linger.ms`, ...) | Kafka producer defaults |
Other than the above properties, the user can also configure the bridge using environment variables.
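To make the role of the prefixes concrete, the following Python sketch parses a properties file like the one above and groups its keys by prefix. It only illustrates the naming scheme; `parse_properties` and `group_by_prefix` are hypothetical helpers, and the bridge's own configuration loading may differ.

```python
SAMPLE = """\
# Bridge configuration
bridge.id=my-bridge
bridge.topic.default=default_topic
# MQTT configuration
mqtt.host=localhost
mqtt.port=1883
# Kafka configuration
kafka.bootstrap.servers=localhost:9092
"""

def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def group_by_prefix(props, prefixes=("bridge.", "mqtt.", "kafka.")):
    """Split properties into one dictionary per prefix, stripping the prefix."""
    groups = {prefix: {} for prefix in prefixes}
    for key, value in props.items():
        for prefix in prefixes:
            if key.startswith(prefix):
                groups[prefix][key[len(prefix):]] = value
                break
    return groups

groups = group_by_prefix(parse_properties(SAMPLE))
print(groups["kafka."])
```

Note that all values are read as strings, so a consumer of `mqtt.port` would still need to convert it to an integer.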
Contributing
You can contribute by:
- Raising any issues you find using Strimzi MQTT Bridge
- Fixing issues by opening Pull Requests
- Improving documentation
- Talking about Strimzi MQTT Bridge
All bugs, tasks or enhancements are tracked as GitHub issues. Issues which might be a good start for new contributors are marked with the "good-start" label.
The Building Strimzi MQTT Bridge guide describes how to build Strimzi MQTT Bridge and how to test your changes before submitting a patch or opening a PR.
If you want to get in touch with us first before contributing, you can use:
Learn more on how you can contribute on our Join Us page.
License
Strimzi MQTT Bridge is licensed under the Apache License, Version 2.0