| Overview | Phone Setup | MQTT Server | MQTT to Kafka |
|---|---|---|---|
Race mapper for displaying participant progress and location, built with OwnTracks - a Kafka, KSQL, Kibana and MQTT based integration.
- clone this repo!
- have a k8s cluster available, or deploy each component of the project with Docker
- enjoy :)
We use EMQX as the MQTT broker.
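For local testing, EMQX can be run with Docker and fed a sample message. The sketch below is illustrative only: the mosquitto_pub client, topic, and payload values are assumptions, chosen to match the JSON shape that OwnTracks publishes.

```bash
# Start a local EMQX broker (MQTT on 1883, dashboard on 18083)
docker run -d --name emqx -p 1883:1883 -p 18083:18083 emqx/emqx:latest

# Publish one OwnTracks-style location message so there is something to consume
# (hypothetical values; real messages come from the OwnTracks phone app)
mosquitto_pub -h localhost -p 1883 -t 'owntracks/demo/phone1' \
  -m '{"_type":"location","lat":-33.8688,"lon":151.2093,"tst":1650000000,"batt":87,"vel":12,"alt":40}'
```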
If you want to have a play with demonstration data (and not bother with phone setup or an MQTT server), skip to the next section.
If you want to set up the entire project, you will need to start with Phone Setup.
- Kafka
- Kafka Connect
- KSQL
- Kafka UI
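Before posting the connector config below, it is worth checking that the Connect worker has the Confluent MQTT plugin installed. A quick sketch, assuming jq is available:

```bash
# The MQTT source connector requires io.confluent.connect.mqtt.MqttSourceConnector
# to appear in the worker's installed plugins
curl -s http://localhost:8083/connector-plugins | jq '.[].class'
```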
We use this config to start the MQTT source connector:
```bash
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "mqtt",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "mqtt.server.uri": "tcp://external.abriment.com:30733",
    "mqtt.topics": "owntracks/#",
    "kafka.topic": "data_mqtt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max": "1",
    "confluent.topic.bootstrap.servers": "kafka-60046646:9092",
    "confluent.topic.replication.factor": "1"
  }
}' http://localhost:8083/connectors
```
The Kafka Connect MQTT Source Connector is used to integrate with existing MQTT servers: it connects to an MQTT broker and subscribes to the specified topics. The connector is compatible with Confluent Platform version 4.1 and later; prior versions do not work with it. SSL is supported. For information on how to create SSL keys and certificates, see Creating SSL Keys and Certificates. For the relevant configuration properties, see the MQTT Source Connector configuration reference.
You can use this connector for a 30-day trial period without a license key.
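Once the connector is posted, a quick sanity check looks something like this (hostnames follow the config above; adjust them and the consumer command to your own deployment):

```bash
# The connector and its task should report RUNNING
curl -s http://localhost:8083/connectors/mqtt/status | jq

# Peek at the raw MQTT payloads landing in the data_mqtt topic
kafka-console-consumer --bootstrap-server kafka-60046646:9092 \
  --topic data_mqtt --from-beginning --max-messages 5
```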
First we set up ksqldb-server and ksqldb-cli, then create streams and tables with KSQL:
```bash
$ ksql http://ksql-server:8088
```

```sql
CREATE STREAM data_demo_stream (who VARCHAR KEY, batt INTEGER, lon DOUBLE, lat DOUBLE, tst BIGINT, alt INTEGER, vel DOUBLE)
  WITH (kafka_topic='data_mqtt', value_format='JSON', key_format='KAFKA');

CREATE TABLE runner_status WITH (value_format='JSON') AS
  SELECT who
       , MIN(vel) AS min_speed
       , MAX(vel) AS max_speed
       , MIN(GEO_DISTANCE(lat, lon, -33.87014, 151.211945, 'km')) AS dist_to_finish
       , COUNT(*) AS num_events
    FROM data_demo_stream WINDOW TUMBLING (SIZE 5 MINUTE)
   GROUP BY who;

CREATE STREAM runner_location WITH (value_format='JSON') AS
  SELECT who
       , tst AS event_time
       , CONCAT(CONCAT(CAST(lat AS STRING), ','), CAST(lon AS STRING)) AS location
       , alt
       , batt
       , vel
    FROM data_demo_stream;
```
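To check that events are flowing through the derived stream, you can run a short push query non-interactively; this is a sketch assuming the ksqldb-cli -e flag is available in your version:

```bash
# Print a few rows from the derived stream and exit
ksql http://ksql-server:8088 -e "SELECT who, location, vel FROM runner_location EMIT CHANGES LIMIT 3;"

# Or inspect the raw source topic directly from ksqlDB
ksql http://ksql-server:8088 -e "PRINT 'data_mqtt' FROM BEGINNING LIMIT 3;"
```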
First we set up Elasticsearch and Kibana, then create an index template named kafkaconnect covering the runner_location and runner_status indices (this uses the composable index template API, available in Elasticsearch 7.8 and later):
```bash
curl -XPUT "http://localhost:9200/_index_template/kafkaconnect" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["runner_location", "runner_status"],
  "template": {
    "settings": {
      "index": {
        "number_of_shards": "1",
        "number_of_replicas": "0"
      }
    },
    "mappings": {
      "dynamic": "false",
      "dynamic_templates": [],
      "properties": {
        "ALT":            { "type": "integer" },
        "BATT":           { "type": "integer" },
        "DIST_TO_FINISH": { "type": "double" },
        "EVENT_TIME": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis||epoch_second"
        },
        "LOCATION": {
          "type": "geo_point",
          "ignore_malformed": false,
          "ignore_z_value": true
        },
        "MAX_SPEED":  { "type": "integer" },
        "MIN_SPEED":  { "type": "integer" },
        "NUM_EVENTS": { "type": "integer" },
        "VEL":        { "type": "double" }
      }
    },
    "aliases": {}
  }
}'
```
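You can confirm that the template was stored and that LOCATION will be mapped as a geo_point (jq assumed):

```bash
# Show the stored template; the LOCATION property should have "type": "geo_point"
curl -s "http://localhost:9200/_index_template/kafkaconnect" | jq
```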
We also use the Elasticsearch Sink connector to transfer the data into Elasticsearch, with these configs:
curl -X "POST" "localhost:8083/connectors/" \
-H "Content-Type: application/json" \
-d $'{
"name": "es_sink_runner_location",
"config": {
"schema.ignore": "true",
"topics": "RUNNER_LOCATION",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": false,
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.ignore": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"type.name": "kafkaconnect",
"topic.index.map": "RUNNER_LOCATION:runner_location",
"connection.url": "http://elastic-81592013:9200",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field" : "EVENT_TIME"
}
}'
curl -X "POST" "localhost:8083/connectors/" \
-H "Content-Type: application/json" \
-d $'{
"name": "es_sink_runner_status",
"config": {
"schema.ignore": "true",
"topics": "RUNNER_STATUS",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": false,
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.ignore": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"type.name": "kafkaconnect",
"topic.index.map": "RUNNER_STATUS:runner_status",
"connection.url": "http://elastic-81592013:9200",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field" : "EVENT_TIME"
}
}'
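Once both sink connectors are running, documents should start showing up in Elasticsearch. A rough check (queried via localhost here; use whatever host your Elasticsearch and Connect worker are exposed on):

```bash
# Both sink connectors should report RUNNING
curl -s http://localhost:8083/connectors/es_sink_runner_location/status | jq '.tasks[].state'
curl -s http://localhost:8083/connectors/es_sink_runner_status/status | jq '.tasks[].state'

# A sample document from the runner_location index
curl -s "http://localhost:9200/runner_location/_search?size=1&pretty"
```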
We can now see the indices in Kibana. To check the data, create two index patterns, runner_location and runner_status. The dashboard can be imported via Management > Stack Management > Kibana > Saved Objects > Import, and viewed under Analytics > Dashboards.
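If you prefer the command line, the same import can be done through Kibana's saved objects API. The file name below is a placeholder for the exported NDJSON dashboard, and Kibana is assumed to be on localhost:5601:

```bash
# Import an exported dashboard (NDJSON) into Kibana
curl -X POST "http://localhost:5601/api/saved_objects/_import?overwrite=true" \
  -H "kbn-xsrf: true" \
  --form file=@dashboard.ndjson
```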
If you want to contribute to this project, feel free to fork the repository and submit pull requests. We welcome any contributions, including bug fixes, feature enhancements, and documentation improvements.