- TableDataSource: DataSet-typed data
- ReadTableDataSet: reads DataSet-typed data
- WriteTableDataSet: writes out DataSet-typed data
- FileDataSource: file-typed data
- HashMapFileData: file data whose content is a HashMap
- ReadHashMapFileData: reads file data whose content is a HashMap (a Java sketch of these interfaces follows below)
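Viewed as code, the hierarchy above amounts to a small set of read/write interfaces. The sketch below is purely illustrative: the method names, signatures, and the HashMap value type are assumptions, not the framework's actual declarations.
- Java sketch:
import java.util.HashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Illustrative only: method names, signatures, and the HashMap value type
// are assumptions, not the framework's actual declarations.
interface TableDataSource {}                         // DataSet-typed data

interface ReadTableDataSet extends TableDataSource {
    Dataset<Row> read();                             // read DataSet-typed data
}

interface WriteTableDataSet extends TableDataSource {
    void write(Dataset<Row> dataset);                // write out DataSet-typed data
}

interface FileDataSource {}                          // file-typed data

interface HashMapFileData extends FileDataSource {}  // file content is a HashMap

interface ReadHashMapFileData extends HashMapFileData {
    HashMap<String, byte[]> read();                  // read the HashMap content
}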
Feature: implements the ReadHashMapFileData interface; fetches a PKG package over HTTP (see the Java sketch after the examples).
- JSON usage:
"httpPkgFileEx": {
"data_id": "pkg_http_file",
"username": "restUser",
"password": "8ang$un",
"httpUrl": "http://xx.xx.xxx.xxx/adm/example/inputs/"
}
- YAML usage:
httpPkgFileEx:
data_id: pkg_http_file
username: restUser
password: 8ang$un
httpUrl: http://xx.xx.xxx.xxx/adm/example/inputs/
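Functionally this is an authenticated HTTP download. Below is a minimal, self-contained sketch using the JDK's built-in java.net.http client (Java 11+); the URL and credentials come from the example above, while the Basic-auth scheme and the raw-byte payload are assumptions about the endpoint.
- Java sketch:
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class HttpPkgFetch {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Basic-auth header built from the username/password fields above.
        String auth = Base64.getEncoder()
                .encodeToString("restUser:8ang$un".getBytes(StandardCharsets.UTF_8));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://xx.xx.xxx.xxx/adm/example/inputs/")) // httpUrl
                .header("Authorization", "Basic " + auth)
                .GET()
                .build();

        // Fetch the PKG package as raw bytes.
        HttpResponse<byte[]> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofByteArray());
        System.out.println("status " + response.statusCode()
                + ", " + response.body().length + " bytes");
    }
}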
Feature: implements the WriteTableDataSet interface; writes a dataset out to a JDBC database (see the Spark JDBC sketch after the examples).
- JSON usage:
"outputDataExample": {
"data_id": "jdbc_table",
"dburl": "jdbc:mysql://10.100.2.202:3306/sdm_7190?useSSL=false;useUnicode=true;characterEncoding=UTF-8",
"dbname": "dom230",
"dbpassword": "bangsun",
"dbdriver": "com.mysql.cj.jdbc.Driver",
"dbtablename": "/user/example/outputs/"
}
- YAML usage:
outputDataExample:
  data_id: jdbc_table
  dburl: jdbc:mysql://10.100.2.202:3306/sdm_7190?useSSL=false&useUnicode=true&characterEncoding=UTF-8
  dbname: dom230
  dbpassword: bangsun
  dbdriver: com.mysql.cj.jdbc.Driver
  dbtablename: example_output_table
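The config keys map one-to-one onto a standard Spark JDBC write. The sketch below shows the equivalent direct call; it assumes dbname carries the database user, and example_output_table (the dbtablename value) is a placeholder for the target table.
- Java sketch:
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class JdbcWriteSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("jdbc-write").getOrCreate();
        Dataset<Row> dataset = spark.read().parquet("/user/example/inputs/"); // any source

        Properties props = new Properties();
        props.setProperty("user", "dom230");                      // dbname (assumed: DB user)
        props.setProperty("password", "bangsun");                 // dbpassword
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver");  // dbdriver

        // dburl and dbtablename become the two positional arguments.
        dataset.write().mode(SaveMode.Append).jdbc(
                "jdbc:mysql://10.100.2.202:3306/sdm_7190?useSSL=false&useUnicode=true&characterEncoding=UTF-8",
                "example_output_table",
                props);
        spark.stop();
    }
}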
Feature: implements the WriteTableDataSet interface; pushes dataset data to Kafka (see the Spark Kafka sketch after the examples).
- JSON usage:
"outputDataExample": {
"data_id": "kafka_data",
"kafkaServers": "xx.xx.xx.xx:9092",
"kafkaTopic": "kafka_test_topic",
"options": ""
}
- YAML usage:
outputDataExample:
data_id: kafka_data
kafkaServers: xx.xx.xx.xx:9092
kafkaTopic: kafka_test_topic
options:
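This corresponds to Spark's batch Kafka sink (the spark-sql-kafka-0-10 package must be on the classpath). The sketch below is one plausible rendering: the sink requires a value column, so each row is serialized to JSON, which is an assumption about the payload format; entries from the options field would become additional kafka.-prefixed options.
- Java sketch:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaWriteSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("kafka-write").getOrCreate();
        Dataset<Row> dataset = spark.read().parquet("/user/example/inputs/"); // any source

        // The Kafka sink expects a "value" column; serialize each row to JSON.
        dataset.selectExpr("to_json(struct(*)) AS value")
                .write()
                .format("kafka")
                .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") // kafkaServers
                .option("topic", "kafka_test_topic")                   // kafkaTopic
                .save();
        spark.stop();
    }
}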
Feature: implements the ReadTableDataSet and WriteTableDataSet interfaces; reads a DataSet from memory and stores one there temporarily. When several components run in combination, this is the way to pass a dataset from one to the next (see the sketch after the examples).
- JSON usage:
"inputMemoryCacheDataSetEx": {
"data_id": "memory_cache",
"keyId": "memory_data_key"
},
"outputMemoryCacheDataSetEx": {
"data_id": "memory_cache",
"keyId": "memory_data_key"
}
- YAML usage:
inputMemoryCacheDataSetEx:
  data_id: memory_cache
  keyId: memory_data_key
outputMemoryCacheDataSetEx:
  data_id: memory_cache
  keyId: memory_data_key
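A plausible shape for this handoff is a process-wide map keyed by keyId: the output side parks the in-memory Dataset reference and the input side picks it up under the same key. The class and method names below are hypothetical, not the framework's code.
- Java sketch:
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical sketch of the memory_cache handoff; not the framework's code.
public final class MemoryCacheSketch {
    private static final ConcurrentHashMap<String, Dataset<Row>> CACHE =
            new ConcurrentHashMap<>();

    // What the output side would do: park the dataset under its keyId.
    public static void write(String keyId, Dataset<Row> dataset) {
        CACHE.put(keyId, dataset);
    }

    // What the input side would do: retrieve the dataset by the same keyId.
    public static Dataset<Row> read(String keyId) {
        Dataset<Row> dataset = CACHE.get(keyId);
        if (dataset == null) {
            throw new IllegalStateException("no dataset cached under " + keyId);
        }
        return dataset;
    }
}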
Feature: implements the ReadTableDataSet and WriteTableDataSet interfaces; reads parquet data from an HDFS path and can also store a dataset to HDFS (see the sketch after the examples).
- JSON usage:
"inputDataExample": {
"data_id": "parquet_table",
"path": "/user/example/inputs/"
},
"outputDataExample": {
"data_id": "parquet_table",
"path": "/user/example/outputs/"
}
- YAML usage:
inputDataExample:
data_id: parquet_table
path: /user/example/inputs/
outputDataExample:
  data_id: parquet_table
  path: /user/example/outputs/
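In Spark terms the two configs boil down to a parquet read and a parquet write on HDFS paths:
- Java sketch:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class ParquetRoundTrip {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("parquet-io").getOrCreate();

        // Read parquet from the input path (inputDataExample).
        Dataset<Row> dataset = spark.read().parquet("/user/example/inputs/");

        // Write the dataset back out as parquet (outputDataExample).
        // The save mode is a choice here, not prescribed by the config.
        dataset.write().mode(SaveMode.Overwrite).parquet("/user/example/outputs/");
        spark.stop();
    }
}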
Feature: implements the ReadTableDataSet and WriteTableDataSet interfaces; reads partitioned parquet data from an HDFS path and can also write a dataset to HDFS partitioned by the configured columns (see the sketch after the examples).
- JSON usage:
"inputDataExample": {
"data_id": "partition_parquet_table",
"path": "/user/example/inputs/",
"partitionColumn":["event_code","biz_data"]
},
"outputDataExample": {
"data_id": "partition_parquet_table",
"path": "/user/example/outputs/",
"partitionColumn":["event_code","biz_data"]
}
- YAML usage:
inputDataExample:
data_id: partition_parquet_table
path: /user/example/inputs/
partitionColumn:
- event_code
- biz_data
outputDataExample:
  data_id: partition_parquet_table
  path: /user/example/outputs/
  partitionColumn:
    - event_code
    - biz_data
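partitionColumn corresponds to Spark's partitionBy: on write it produces event_code=.../biz_data=... subdirectories, and on read Spark discovers those directories and surfaces them as columns:
- Java sketch:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PartitionedParquetSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("partitioned-parquet").getOrCreate();

        // Partition discovery: event_code and biz_data come back as columns.
        Dataset<Row> dataset = spark.read().parquet("/user/example/inputs/");

        // partitionColumn -> partitionBy: one subdirectory per column value.
        dataset.write()
                .mode(SaveMode.Overwrite)
                .partitionBy("event_code", "biz_data")
                .parquet("/user/example/outputs/");
        spark.stop();
    }
}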
Feature: implements the ReadHashMapFileData interface; reads a PKG package directly from an HDFS path (see the sketch after the examples).
- JSON usage:
"pathPkgFileEx": {
"data_id": "pkg_path_file",
"path": "/user/example/inputs/pkg"
}
- YAML usage:
pathPkgFileEx:
  data_id: pkg_path_file
  path: /user/example/inputs/pkg
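Reading the package directly from HDFS is a plain FileSystem open on the configured path. A minimal sketch with the Hadoop client API (treating the PKG as raw bytes is an assumption):
- Java sketch:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsPkgFetch {
    public static void main(String[] args) throws IOException {
        // Resolves the default filesystem (HDFS when run with cluster config).
        FileSystem fs = FileSystem.get(new Configuration());

        // Read the PKG package from the configured path as raw bytes.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (FSDataInputStream in = fs.open(new Path("/user/example/inputs/pkg"))) {
            IOUtils.copyBytes(in, out, 4096, false);
        }
        System.out.println("read " + out.size() + " bytes");
    }
}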
Feature: supports running multiple components in series, combining them to achieve the desired functionality (see the sketch after the examples).
- JSON usage:
{
"cmpts": [
{
"inputTableDataSet": {
"data_id": "parquet_path_table",
"path": "/user/dom/3000/input"
},
"cmpt_id": "data_migration",
"outputDataSet": {
"data_id": "memory_cache",
"keyId": "memory_data_set"
}
},
{
"inputTableDataSet": {
"data_id": "memory_cache",
"keyId": "memory_data_set"
},
"cmpt_id": "data_migration",
"outputDataSet": {
"data_id": "parquet_path_table",
"path": "/user/dom/3000/output"
}
}
],
"cmpt_id": "chain_cmpt"
}
- YAML usage:
rulebacktestingtask:
  cmpt_id: chain_cmpt
  cmpts:
    - cmpt_id: data_migration            # ID of the first component
      inputTableDataSet:
        data_id: parquet_path_table
        path: /user/dom/3000/input       # parquet data is read from this path
      outputDataSet:
        data_id: memory_cache            # keep it in memory so the next component can use it
        keyId: memory_data_set           # any custom name works, as long as it stays unique
    - cmpt_id: data_migration            # ID of the second component
      inputTableDataSet:
        data_id: memory_cache
        keyId: memory_data_set           # must match the previous output to retrieve its data
      outputDataSet:
        data_id: parquet_path_table
        path: /user/dom/3000/output      # HDFS path where the data is stored
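Conceptually, chain_cmpt just runs the cmpts entries in order, and the memory_cache output/input pair is what carries the dataset from one component to the next. The sketch below reduces each component to a dataset transformation to show that control flow; it is a hypothetical rendering, not the framework's implementation.
- Java sketch:
import java.util.List;
import java.util.function.UnaryOperator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical control flow of chain_cmpt; not the framework's code.
public class ChainSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("chain").getOrCreate();

        // Each component is reduced to a dataset transformation here;
        // data_migration is modeled as a pass-through.
        List<UnaryOperator<Dataset<Row>>> cmpts = List.of(ds -> ds, ds -> ds);

        // First component's input: read parquet from the chain's input path.
        Dataset<Row> current = spark.read().parquet("/user/dom/3000/input");

        // The memory_cache handoff collapses to passing the in-memory reference.
        for (UnaryOperator<Dataset<Row>> cmpt : cmpts) {
            current = cmpt.apply(current);
        }

        // Last component's output: write parquet to the chain's output path.
        current.write().parquet("/user/dom/3000/output");
        spark.stop();
    }
}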