/FlinkSQL

仿照阿里blink使用sql开发flink的实时程序

Primary LanguageJavaApache License 2.0Apache-2.0

一.背景

阿里工作的时候是使用Blink进行流数据处理和计算,通过编写sql实现Blink的计算job,开发简单高效,产品易用。 目前尝试实现Flink产品化,类似Blink。使用SQL为统一开发规范,SQL语言的好处是:声明式,易理解,稳定可靠,自动优化。 如果采用API开发的话,最大的问题是对于job调优依赖程序员经验,比较困难,同时API开发方式侵入性太强(数据安全,集群安全等),而sql可以自动调优,避免这种问题的产生。

二.实现思路:

用户输入sql(ddl,query,dml)  -> ddl对应为Flink的source和sink
                       
                       
                            -> query/dml的insert into数据处理和计算
                       
                       
--> 封装为对应Flink的Job:env.sqlQuery/env.sqlUpdate


--> JobGraph和对应job提交,ClusterClient.submitJob或者ClusterDescriptor.deployJobCluster

三.发布版本:

v3.0.0 2020年1月

     1.使用flink 1.10版本
           1.10之前的版本自带的sql解析功能不完善,如解析function,watermark等,所以比较鸡肋,还不如不用更换以前开发的解析层功能。
     2.使用新接口ClusterClient.submitJob提交job
     3.
     4.

新特性

      1. flink自带的sql解析
      2. 使用新的job提交接口
      2. 流批处理一体化实现
      3. 钉钉/微信告警通知        

v2.0.1
v2.0.0 2019年4月

       blink-client    接口定义
       blink-sql/calcite   stream和batch table的sql解析
       blink-libraries   自定义source, sink, side开发
       blink-batch   BatchTableSource和BatchTableSink
       blink-stream  StreamTableSource和StreamTableSink
       blink-job   batch/stream job 提交

v2.0.1新特性
v2.0.0新特性

    1. 抽取sql层被流和批使用,SQL参考flink issues和对应提供的doc
    2. 增加批处理开发
    3. 增加维表功能
    4. 升级flink版本为1.7.x

v1.0.0 2018年7月

      blink-client 接口定义
      blink-sqlserver  stream table的sql解析
      blink-job  封装为stream job    

新特性

      1. 实现create function
      2. 实现sql开发流处理程序任务  
      3. 更改源码实现sql CEP     

四.样例

v3.0.0 sql开发流任务示例:

v2.0.1 sql开发流任务示例:

batch sql示例:

CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE TABLE csv_source (
    id int, 
    name varchar, 
    `date` date , 
    age int
) 
with (
    type=source,
    connect.type=json,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE TABLE csv_sink (
    `date` date, 
    age int, 
    PRIMARY KEY (`date`)
) 
with (
    type=sink,
    connect.type=csv,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

create view view_select as  
    SELECT `date`, age FROM csv_source group by `date`,age
;

INSERT INTO csv_sink 
    SELECT `date`, sum(age) FROM view_select group by `date`
;

stream sql 示例:

CREATE FUNCTION demouf AS 
      'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';
      
CREATE TABLE kafka_source (
      `date` varchar,
      amount float, 
      proctime timestamp
      ) 
with (
      type=source,
      'connect.type'=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);

CREATE TABLE mysql_sink (
      `date` varchar, 
      total_amount float, 
      PRIMARY KEY (`date`)
      ) 
with (
      type=mysql,
      'connect.type'=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=10,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);

CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafka_source 
      GROUP BY 
            `date`,
            amount
      ;

INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) as total_amount
       FROM view_select 
       GROUP BY 
          `date`
      ;

v2.0.0 sql开发流任务示例:

batch sql示例:

CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE SOURCE TABLE json_source (
      id int, 
      name varchar, 
      `date` date , 
      age int
) 
with (
      type=json,
      'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE SINK TABLE csv_sink (
      `date` date, 
      total_age int
) 
with (
      type=csv,
      'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

CREATE VIEW view_select as  
      SELECT `date`, age 
      FROM json_source 
      GROUP BY `date`,age;
  
INSERT INTO csv_sink 
      SELECT `date`, sum(age) as total_age
      FROM view_select 
      GROUP BY `date`;

stream sql 示例:

CREATE FUNCTION demouf AS 
      'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';
      
CREATE SOURCE TABLE kafka_source (
      `date` varchar,
      amount float, 
      proctime timestamp
      ) 
with (
      type=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);

CREATE SINK TABLE mysql_sink (
      `date` varchar, 
      total_amount float, 
      PRIMARY KEY (`date`)
      ) 
with (
      type=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=10,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);

CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafka_source 
      GROUP BY 
            `date`,
            amount
      ;


INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) as total_amount
       FROM view_select 
       GROUP BY 
          `date`
      ;

v1.0.0 sql开发流任务示例:

CREATE FUNCTION demouf AS 
      'ambition.api.sql.function.DemoUDF' 
USING JAR 'hdfs://flink/udf/jedis.jar',
      JAR 'hdfs://flink/udf/customudf.jar';
      
      
CREATE TABLE kafka_source (
      `date` string,
      amount float, 
      proctime timestamp
      ) 
with (
      type=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);
CREATE TABLE mysql_sink (
      `date` string, 
      amount float, 
      PRIMARY KEY (`date`,amount)
      ) 
with (
      type=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=0,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);
CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafka_source 
      GROUP BY 
            `date`,
            amount
      ;
INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) 
       FROM view_select 
       GROUP BY 
          `date`
      ;

五.代码关注

apache flink

apache calcite

uber AthenaX

DTStack flinkStreamSQL