
sqlSubmit

SQL submission program based on Flink

Currently only Flink 1.16.0 is supported.

Learn more about Flink at https://flink.apache.org/

Derived from Jark's blog http://wuchong.me/blog/2019/09/02/flink-sql-1-9-read-from-kafka-write-into-mysql/

Features

  • Submit Flink SQL jobs to a cluster

Example

An example SQL file, demo.sql:

-- set statements are parsed as table config (see the sketch after this file)
set pipeline.name = demo_sql_job;
set table.exec.resource.default-parallelism = 1;
-- source
CREATE TABLE user_log (
    user_id VARCHAR
    ,item_id VARCHAR
    ,category_id VARCHAR
    ,behavior VARCHAR
    ,ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka'                                    -- use the kafka connector
    ,'connector.version' = 'universal'                            -- kafka version; 'universal' supports 0.11 and above
    ,'connector.topic' = 'user_behavior'                          -- kafka topic
    ,'connector.startup-mode' = 'earliest-offset'                 -- read from the earliest offset
    ,'connector.properties.bootstrap.servers' = 'localhost:9092'
    ,'update-mode' = 'append'
    ,'format.type' = 'json'                                       -- source data format is json
    ,'format.derive-schema' = 'true'                              -- derive the json parsing rules from the DDL schema
);

-- sink
CREATE TABLE pvuv_sink (
    dt VARCHAR
    ,pv BIGINT
    ,uv BIGINT
) WITH (
    'connector.type' = 'jdbc'                                     -- use the jdbc connector
    ,'connector.url' = 'jdbc:mysql://localhost:3306/flink-test'   -- jdbc url
    ,'connector.table' = 'pvuv_sink'                              -- table name
    ,'connector.username' = 'root'                                -- username
    ,'connector.password' = '123456'                              -- password
    ,'connector.write.flush.max-rows' = '1'                       -- default is 5000 rows; set to 1 for the demo
);

-- exec sql
INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt
  ,COUNT(*) AS pv
  ,COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
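The set statements at the top of demo.sql are not executed as SQL; they are applied to the table configuration. Below is a minimal sketch, in Scala, of how such parsed key/value pairs could be set on a TableEnvironment; the settings map is a hypothetical parse result, not the project's actual parser output:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SetStatementSketch {
  def main(args: Array[String]): Unit = {
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // hypothetical parse result of the set statements in demo.sql
    val settings = Map(
      "pipeline.name" -> "demo_sql_job",
      "table.exec.resource.default-parallelism" -> "1"
    )

    // apply each key/value pair to the table config
    settings.foreach { case (key, value) =>
      tableEnv.getConfig.getConfiguration.setString(key, value)
    }
  }
}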

Submit to the Flink cluster:

# --sql demo.sql                              specify the SQL file demo.sql
# --state.backend rocksdb                     set the property state.backend to rocksdb
# --job.prop.file demoJobPropFile.properties  specify a job properties file
# parameter priority: command-line parameters are highest, then job.prop.file, then the default properties [sqlSubmit.properties] (see the sketch below)
sh start_pre_job.sh --session sqlDemo --sql demo.sql --state.backend rocksdb --job.prop.file demoJobPropFile.properties
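The priority described above can be implemented with Flink's ParameterTool, whose mergeWith gives precedence to the argument. A minimal sketch, assuming the file names from this example (not the project's actual loading code):

import org.apache.flink.api.java.utils.ParameterTool

object ParamPrioritySketch {
  def main(args: Array[String]): Unit = {
    val defaults = ParameterTool.fromPropertiesFile("sqlSubmit.properties")
    val jobProps = ParameterTool.fromPropertiesFile("demoJobPropFile.properties")
    val cli      = ParameterTool.fromArgs(args)

    // mergeWith: values of the argument override the receiver's,
    // so command-line parameters end up with the highest priority
    val paraTool = defaults.mergeWith(jobProps).mergeWith(cli)
    println(paraTool.get("state.backend"))
  }
}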

Building

  • Git
  • Maven (version 3.2.5 recommended; at least 3.1.1 required)
  • Java 8 or 11 (Java 9 or 10 may work)
git clone https://github.com/springMoon/sqlSubmit.git
cd sqlSubmit
mvn clean package -DskipTests # this will take up to 10 minutes

Hive dialect

Creating a Hive table in Flink requires the Hive dialect. sqlSubmit uses a simple rule: when the SQL contains "hive_table_", the Hive dialect is applied. For example:

-- set table.sql-dialect=hive;
-- hive sink
drop table if exists hive_table_user_log_sink;
CREATE TABLE hive_table_user_log_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);
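A minimal sketch (not the project's actual code) of the dialect-switching rule: statements that reference a "hive_table_" table are executed with the Hive dialect, everything else with the default dialect.

import org.apache.flink.table.api.{SqlDialect, TableEnvironment}

object HiveDialectRuleSketch {
  // switch the SQL dialect per statement based on the "hive_table_" naming rule
  def executeWithDialect(tableEnv: TableEnvironment, sql: String): Unit = {
    if (sql.contains("hive_table_")) {
      tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    } else {
      tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    }
    tableEnv.executeSql(sql)
  }
}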

Checkpoint configuration

Checkpoints are configured through the DataStream API (Scala):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.CheckpointingMode

// choose the state backend based on the job properties
val stateBackend: StateBackend =
  if ("rocksdb".equals(paraTool.get(STATE_BACKEND))) {
    new EmbeddedRocksDBStateBackend(true) // enable incremental checkpoints
  } else {
    new HashMapStateBackend()
  }
env.setStateBackend(stateBackend)
// checkpoint
env.enableCheckpointing(paraTool.getLong(CHECKPOINT_INTERVAL) * 1000, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(paraTool.getLong(CHECKPOINT_TIMEOUT) * 1000)
// Flink 1.11.0 new feature: enables unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
// checkpoint dir
env.getCheckpointConfig.setCheckpointStorage(paraTool.get(CHECKPOINT_DIR))

UDF

Register UDFs:

// udf
env.createTemporarySystemFunction("udf_decode", new Decode)

// udtf
env.createTemporarySystemFunction("udf_split", new SplitFunction)
env.createTemporarySystemFunction("udf_parse_json", new ParseJson)
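For illustration, a hypothetical scalar UDF (the actual Decode, SplitFunction and ParseJson implementations are not shown here); any class extending ScalarFunction with an eval method can be registered the same way:

import java.net.URLDecoder
import org.apache.flink.table.functions.ScalarFunction

// hypothetical UDF: decode a URL-encoded string with the given charset
class UrlDecode extends ScalarFunction {
  def eval(s: String, charset: String): String =
    if (s == null) null else URLDecoder.decode(s, charset)
}

Registered as env.createTemporarySystemFunction("udf_url_decode", new UrlDecode), it can then be called in SQL, e.g. SELECT udf_url_decode(url, 'UTF-8') FROM user_log.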

Support


Gratitude

Thanks to JetBrains for providing an open source license.

About

Apologies for modifying the license; the change is meant to keep this project free.