/emqx_plugin_kafka_1

EMQX Kafka Plugin, 基于EMQX v5.4.0版本,支持转发Kafka设置SASL/PLAIN模式

Primary LanguageErlangApache License 2.0Apache-2.0

emqx_plugin_kafka

Kafka plugin for EMQX V5.4 版本,支持EMQX转Kafka配置SASL/PLAIN模式。

Usage

Build the EMQX broker

  • 本文中内容,在操作系统 RockyLinux 8.5 和 RockyLinux 9.2 中,已经分别验证过; 理论上兼容RHEL8 和 RHEL9的操作系统,都是可以。

  • 在服务器上,先安装相关依赖组件,通过dnf方式安装,如下:

dnf -y install gcc gcc-c++ cpp glibc  glibc-devel glibc-headers kernel-devel kernel-headers cmake make m4 ncurses ncurses-devel openssl openssl-devel openssl-libs zlib zlib-devel libselinux-devel xmlto perl git wget zip unzip gtk2-devel binutils-devel unixODBC libtool wxWidgets bzip2 binutils-devel  
  • 在服务器上,安装Erlang/OTP ( emqx v5.4 安装 Erlang/OTP 的版本 25.3.2.10 )
下载地址
https://www.erlang.org/patches/otp-25.3.2.10 

解压 Erlang
tar xvf  otp_src_25.3.2.10.tar.gz
 
进入解压目录
cd otp_src_25.3.2.10

编译  
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

编译并安装
make && make install

修改环境变量
vim /etc/profile
 
将以下内容存储至profile文件中,保存并退出
export ERLPATH=/usr/local/erlang
export PATH=$ERLPATH/bin:$PATH
 
使环境变量刷新并生效
source /etc/profile
 
验证erlang是否安装成功
erl

  • 在服务器上,安装 Rebar3 ( rebar3 的版本 3.22.0 )
下载地址:
https://github.com/erlang/rebar3/archive/refs/tags/3.22.0.tar.gz 

解压 Rebar3:  
tar -xvf  rebar3-3.22.0.tar.gz 

进入解压目录
cd rebar3-3.22.0

修改rebar3中 rebar.config 文件,  在文件末尾添加 {plugins, [rebar3_hex]}.  

编译
./bootstrap
./rebar3 local install

添加到PATH,修改环境变量
vi ~/.bashrc

将以下内容存储至文件中,保存并退出
export PATH=$PATH:~/.cache/rebar3/bin

使环境变量刷新并生效
source ~/.bashrc   

验证版本 
rebar3 -v
rebar3 hex
  • 编译EMQX Broker
拷贝源码:  git clone -b v5.4.0  https://github.com/emqx/emqx.git emqx-v5.4.0
编译执行:  export BUILD_WITHOUT_QUIC=1;make

下载EMQX的Kafka插件源码并编译

> 下载Kafka插件的源码: git clone https://github.com/caijinpeng1113/emqx_plugin_kafka.git
> 进入目录: cd emqx_plugin_kafka
> 执行编译Kafka插件的命令: make rel

在对应的源码目录下,生成编译后插件包,如下:
_build/default/emqx_plugrel/emqx_plugin_kafka-<vsn>.tar.gz

启动EMQX并配置Kafka插件

  • 启动EMQX服务,然后将编译后emqx_plugin_kafka插件包,通过EMQX的插件管理页面,进行安装Kafka插件。(先不要启动Kafka插件)
  • 在EMQX服务中,检查 emqx-v5.4.0/_build/emqx/rel/emqx/etc/ 目录下,是否存在 emqx_plugin_kafka.hocon 文件(如果此文件不存在,需要新建),配置文件内容如下,然后进行再启动Kafka插件。 (注意: 检查Kafka的地址和用户密码是否正确,相关topic是否已创建,如:mqtt_data和emqx_test )
plugin_kafka {
  // required
  connection {
    // Kafka address.
    bootstrap_hosts = ["10.3.64.220:9192", "10.3.64.221:9292", "10.3.64.222:9392"]

    // enum: per_partition | per_broker
    // optional   default:per_partition
    connection_strategy = per_partition
    // optional   default:5s
    min_metadata_refresh_interval = 5s

    sasl {
      // enum:  plain | scram_sha_256 | scram_sha_512
      mechanism = plain
      username = "admin"
      password = "admin"
    }
    ssl {
      enable = false
    }
  }

  // optional
  producer {
    // Most number of bytes to collect into a produce request.
    // optional   default:896KB
    max_batch_bytes = 896KB
    // enum:  no_compression | snappy | gzip
    // optional   default:no_compression
    compression = no_compression
    // enum:  random | roundrobin | first_key_dispatch
    // optional   default:random
    partition_strategy = random

    // enum:  plain | base64
    encode_payload_type = plain
  }

  // create kafka topic [mqtt_data] and [emqx_test]
  hooks = [
    {endpoint = client.connect}
    , {endpoint = client.connack}
    , {endpoint = client.connected, kafka_topic = mqtt_data}
    , {endpoint = client.disconnected, kafka_topic = mqtt_data}
    , {endpoint = client.authenticate}
    , {endpoint = client.authorize}
    , {endpoint = client.authenticate}
    , {endpoint = client.check_authz_complete}
    , {endpoint = session.created}
    , {endpoint = session.subscribed}
    , {endpoint = session.unsubscribed}
    , {endpoint = session.resumed}
    , {endpoint = session.discarded}
    , {endpoint = session.takenover}
    , {endpoint = session.terminated}
    , {endpoint = message.publish, kafka_topic = mqtt_data,  filter = "sys/#"}
    , {endpoint = message.delivered, kafka_topic = mqtt_data, filter = "sys/#"}
    , {endpoint = message.acked, filter = "sys/#"}
    , {endpoint = message.dropped, filter = "sys/#"}
  ]
}

Some examples in the directory priv/example/.

Hook Point

endpoint filter
client.connect /
client.connack /
client.connected /
client.disconnected /
client.authenticate /
client.authorize /
client.authenticate /
client.check_authz_complete /
session.created /
session.subscribed /
session.unsubscribed /
session.resumed /
session.discarded /
session.takenover /
session.terminated /
message.publish required
message.delivered required
message.acked required
message.dropped required

Path

  • Default path: emqx/etc/emqx_plugin_kafka.hocon
  • Attach to path: set system environment variables export EMQX_PLUGIN_KAFKA_CONF="absolute_path"

1111111