Para este playground se ha instalado Kafka Connect en AKS
Confluent Operator
en versión: 0.921.20
helm repo update
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes --namespace confluent
Kafka Connect
teniendo como backend un Confluent Cloud desplegado en la misma VNET linkada con la red Confluent.
Para Kafka Connect usaremos la versión 7.6.1
con init-container 2.8.2
Para autenticar con CCLOUD necesitaremos tener las API-KEYS adecuadas.
Para ello necesitamos crear los secretos adecuados:
Credenciales para Cluster Kafka
kubectl create secret generic ccloud-credentials --from-file=plain.txt=ccloud-credentials.txt -n confluent --dry-run=client -o yaml > ccloud-credentials.yaml
donde ccloud-credentials.txt
contiene:
username=<CCLOUD-API-KEY>
password=<CCLOUD-API-SECRET>
En caso de necesitar crear Topics desde el cluster AKS, necesitaremos una rest credential
kubectl create secret generic ccloud-rest-credentials --from-file=basic.txt=ccloud-credentials.txt --namespace confluent --dry-run=client -o yaml > ccloud-rest-credentials.yaml
Credenciales Schema Registry
kubectl create secret generic ccloud-sr-credentials --from-file=basic.txt=ccloud-sr-credentials.txt -n confluent --dry-run=client -o yaml > ccloud-sr-credentials.yaml
donde ccloud-credentials.txt
contiene:
username=<CCLOUD-API-KEY>
password=<CCLOUD-API-SECRET>
Esto generá los recursos yaml para crear los secretos en el namespace confluent
, por tanto solo nos queda aplicarlos:
kubectl apply -f ccloud-credentials.yaml
kubectl apply -f ccloud-sr-credentials.yaml
Usaremos el CRD de Connect
para deplegar nuestro cluster de connect, con varias cosas a tener en cuenta:
- Clausula
build
que instala el plugin para MQTT Source Connector en la propia instalación:
build:
type: onDemand
onDemand:
plugins:
locationType: confluentHub
confluentHub:
- name: kafka-connect-mqtt
owner: confluentinc
version: 1.7.0
- name: kafka-connect-datagen
owner: confluentinc
version: 0.4.0
- Argumento extra de JVM para dar permisos al usuario de los conectores a escribir (necesario para el MQTT Source Connector):
configOverrides:
jvm:
- "-Duser.dir=/tmp/"
Con esto tendremos nuestro Worker de Connect preparado para crear replicar datos a un topic.
Utilizaremos el CRD Connector
para crear el connector MQTT con las siguientes configuraciones:
key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
mqtt.topics: "hello,bye"
kafka.topic: hello
mqtt.server.uri: "tcp://20.238.230.187:1883"
mqtt.qos: "0"
mqtt.clean.session.enabled: "false"
a destacar:
- Usamos String para deserializar la Key de los mensaje a producir
- ByteArray de modo que no convertimos el mensaje tal cual llega de MQTT
mqtt.topics
: lista de topics a leer del servidor MQTTkafka.topic
: Tópic Confluent donde volcaremos los datos de los topic MQTTmqtt.clean.session.enabled
: Deshabilitamos el cleanup de la session en MQTT para asegurar la semántica at-least-once
Usamos el CRD Topic para crear el Kafka Topic usado por el connector.
Instalamos el core de EMQX haciendo uso de su operador:
helm repo add emqx https://repos.emqx.io/charts
helm repo update
helm upgrade --install emqx-operator emqx/emqx-operator
--namespace emqx-operator-system
--create-namespace
Y un broker EMQ -> emqx-broker,yaml
Aplicamos el recurso hello-mqtt-connector.yaml
Usaremos el cliente mqttx
para producir mensajes dos topics mqtt hello
y bye
:
mqttx pub -t 'hello' -h 20.238.230.187 -p 1883 -m'{"greeting": "Hello World"}'
mqttx pub -t 'bye' -h 20.238.230.187 -p 1883 -m '{"greeting": "Bye bye World"}'
Usaremos el cliente confluent desplegado con el recurso confluent-cli.yaml
para ver los mensajes producidos al topic hello
:
confluent login --save
confluent kafka topic consume hello --cluster lkc-m5ogd2 --api-key CIJWRHA2XP2VRTI4 --api-secret yGo0hM1pmpm84AMfmvvuHWvIAAeCT3JN11bx6G0deBUC6j1WrGb1+6PRCmOAfPV8 -b
Starting Kafka Consumer. Use Ctrl-C to exit.
{"greeting": "Bye bye World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Bye bye World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Bye bye World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Hello World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Hello World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Hello World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Bye bye World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"greeting": "Hello World"}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
La carpeta mqtt-cflt-avro-record
contiene un proyecto Java en el que hacemos uso del KafkaAvroSerializer
configurado con los datos de acceso del cluster de Schema Registry en Confluent Cloud para serializar los mensajes que enviamos a EMQX
siendo compatibles con la serialización confluent y por tanto usables desde el conector MQTT.
Como primer paso generamos una clase AVRO Message usando el Avro Maven Plugin
esta clase se genera en tiempo de construcción del proyecto maven.
El código de serialización se encuentra en la clase AvroMessageSerializer
.
La clase MqttAvroRecord
genera un mensaje aleatorio usando el POJO Avro generado con el plugin y lo publica en un topic MQTT
Lo siguiente es configurar nuestro conector:
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
name: mqtt-avro
namespace: confluent
spec:
class: "io.confluent.connect.mqtt.MqttSourceConnector"
taskMax: 1
connectClusterRef:
name: connect
configs:
key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
mqtt.topics: "cflt-record"
kafka.topic: "mqtt-avro"
mqtt.server.uri: "tcp://20.238.230.187:1883"
mqtt.qos: "0"
mqtt.clean.session.enabled: "false"
Es importante destacar que al escribir el mensaje ya serializado y en bytearray directamente en el topic mqtt y no existir soporte de esquema en el, el conector no hará ninguna conversion (de ahi el uso del ByteArrayConverter).
Podemos usar el confluent cli
para consumir los mensajes del topic destino ya en avro:
confluent-cli-fc9f87dc6-zncfc:/$ confluent kafka topic consume mqtt-avro --cluster lkc-m5ogd2 --api-key CIJWRHA2XP2VRTI4 --api-secret yGo0hM1pmpm84AMfmvvuHWvIAAeCT3JN11bx6G0deBUC6j1WrGb1+6PRCmOAfPV8 --value-format avro --schema-registry-endpoint https://psrc-4kk0p.westeurope.azure.confluent.cloud --schema-registry-api-key 67HOWLYGDRHBQXP4 --schema-registry-api-secret gJuk7OGlz3Kf+ugrjScjrB2VyYe7nVRVYlo/YPYfZuzfGw+wAR59PuVirndcGGL2 -b
confluent-cli-fc9f87dc6-zncfc:/$ confluent login --save
Logged in as "ogomezsoriano+svcs-cemea@confluent.io" for organization "c9bc4776-9ab9-473b-8b16-c76108251133" ("Confluent PS Service CEMEA").
confluent-cli-fc9f87dc6-zncfc:/$ confluent kafka topic consume mqtt-avro --cluster lkc-m5ogd2 --api-key CIJWRHA2XP2VRTI4 --api-secret yGo0hM1pmpm84AMfmvvuHWvIAAeCT3JN11bx6G0deBUC6j1WrGb1+6PRCmOAfPV8 --value-format avro --schema-registry-endpoint https://psrc-4kk0p.westeurope.azure.confluent.cloud --schema-registry-api-key 67HOWLYGDRHBQXP4 --schema-registry-api-secret gJuk7OGlz3Kf+ugrjScjrB2VyYe7nVRVYlo/YPYfZuzfGw+wAR59PuVirndcGGL2 -b
Starting Kafka Consumer. Use Ctrl-C to exit.
{"station":"station","time":1718889958926,"temp":1}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="true" mqtt.duplicate="false"]
{"station":"station","time":1718897073658,"temp":1}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
{"temp":7,"station":"station9","time":920}
% Headers: [mqtt.message.id="0" mqtt.qos="0" mqtt.retained="false" mqtt.duplicate="false"]
El objetivo de la funcionalidad es que el conector MQTT sea capaz de producir cada mensaje MQTT en el topic Kafka adecuado usando para ello el contenido de la cabecera topicName
que contiene el nombre del topic kafka destino.
Para ello usaremos la SMT io.confluent.connect.transforms.ExtractTopic$Header
.
Por tanto necesitaremos instalar el plugin connect-transforms
, añadiendolo a la cláusula build del CR de connect:
build:
type: onDemand
onDemand:
plugins:
locationType: confluentHub
confluentHub:
- name: kafka-connect-mqtt
owner: confluentinc
version: 1.7.0
- name: kafka-connect-datagen
owner: confluentinc
version: 0.4.0
- name: connect-transforms
owner: confluentinc
version: latest
A partir de aquí podemos usarlo facilmente en la configuración de nuestro conector:
---
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
name: mqtt-avro
namespace: confluent
spec:
class: "io.confluent.connect.mqtt.MqttSourceConnector"
taskMax: 1
connectClusterRef:
name: connect
configs:
key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
mqtt.topics: "cflt-record"
kafka.topic: "best-topic-ever"
mqtt.server.uri: "tcp://20.238.230.187:1883"
mqtt.qos: "0"
mqtt.clean.session.enabled: "false"
transforms: "TopicName"
transforms.TopicName.type: "io.confluent.connect.transforms.ExtractTopic$Header"
transforms.TopicName.field: "topicName"