# rocketmq-connect-doris ``` 注: 目前支持将根据mysql ,openMLDB对应的source connect以流式的方式导入到rocketmq中的数据导入到doris ``` ## rocketmq-connect-doirs 打包 ``` mvn clean package -Dmaven.test.skip=true ``` ## rocketmq-connect-doris 启动 * **jdbc-source-connector** 启动 ``` POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-source-connector-name} { "connector.class":"org.apache.rocketmq.connect.doris.connector.DorisSinkConnector", "max-task":"1", "tableName":"doris_test_sink", "connect.topicname":"doris_test_sink", "connect.topicnames":"doris_test_sink", "host":"xx.xx.xx.xx", "port":"7030", "user":"***", "passwd":"***", "database":"dbname", "insert.mode":"INSERT", "db.timezone":"UTC", "table.types":"TABLE", "auto.create":"true", "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter", "key-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "value-converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"} } ``` * **doris-sink-connector** 启动 ``` POST http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-sink-connector-name} { "connector-class":"org.apache.rocketmq.connect.doris.connector.DorisSinkConnector", "max-task":"1", "connect-topicname":"topicname", "host":"localhost//127.0.0.1", "port":"3306", "user":"*****", "passwd":"*****", "database":"database", "insert.mode":"INSERT", "db.timezone":"UTC", "table.types":"TABLE", "auto.create":"true", "source-record-converter":"org.apache.rocketmq.connect.doris.converter.JsonConverter", "key-converter":"org.apache.rocketmq.connect.doris.converter.record.json.JsonConverter", "value-converter":"org.apache.rocketmq.connect.doris.converter.record.json.JsonConverter" } ``` > **注:** `rocketmq-jdbc-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 ## rocketmq-connect-jdbc 停止 ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/stop ``` ## rocketmq-connect-jdbc 参数说明 * **jdbc-source-connector 参数说明** | KEY | TYPE | Must be filled | Description| Example |------------------------|----|---------|---------------|------------------| |connection.url | String | YES | source端 jdbc连接 | jdbc:mysql://XXXXXXXXX:3306| |connection.user | String | YES | source端 DB 用户名 | root | |connection.password | String | YES | source端 DB 密码 | root | |connection.attempts | String | YES | source端 DB连接重试次数 | 3 | |connection.backoff.ms | Long | YES | | |poll.interval.ms | Long | YES |拉取间隔时间 | 3000ms | |batch.max.rows | Integer | NO |每次拉取数量 | 300 | |mode | Integer | NO |拉取模式 | bulk、timestamp、incrementing、timestamp+incrementing | |incrementing.column.name | Integer | NO |增量字段,常用ID | id | |timestamp.column.name | String | YES |时间增量字段 | modified_time | |table.whitelist | String | YES |需要扫描的表 | db.table,db.table01 | |max-task | Integer | YES |任务数量,最大不能大于表的数量 | 2 | |source-record-converter | Integer | YES |data转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter | ``` 注:1.source拉取的数据写入到以表名自动创建的topic中,如果需要写入特定的topic中则需要指定"connect-topicname" 参数 2.topic.prefix参数可以为自动创建的topic增加前缀,用来进行逻辑的隔离 ``` * **jdbc-sink-connector 参数说明** | KEY | TYPE | Must be filled | Description| Example |------------------------|----|---------|---------------|------------------| |connection.url | String | YES | sink端 jdbc连接 | jdbc:mysql://XXXXXXXXX:3306| |connection.user | String | YES | sink端 DB 用户名 | root | |connection.password | String | YES | sink端 DB 密码 | root | |host | String | YES |doris host | 192.168.0.1 | |port | String | YES |doris http port | 8030 | |user | String | YES |监听的topic | root | |passwd | String | YES |监听的topic | passwd | |max-task | Integer | NO |任务数量 | 2 | |source-record-converter | Integer | YES |data转换器 | org.apache.rocketmq.connect.doris.converter.JsonConverter | ``` 注: openMLDB maven包的引入: ---------MacOS 系统下运行------------- <dependency> <groupId>com.4paradigm.openmldb</groupId> <artifactId>openmldb-native</artifactId> <version>0.5.0-macos</version> </dependency> <dependency> <groupId>com.4paradigm.openmldb</groupId> <artifactId>openmldb-jdbc</artifactId> <version>0.5.0</version> <exclusions> <exclusion> <groupId>com.4paradigm.openmldb</groupId> <artifactId>openmldb-native</artifactId> </exclusion> </exclusions> </dependency> ---------Linux 系统下运行------------- <dependency> <groupId>com.4paradigm.openmldb</groupId> <artifactId>openmldb-jdbc</artifactId> <version>0.5.0</version> </dependency> ```