Flink团队提供用于比赛的Flink分支,选手进行Flink 代码优化以提升Flink SQL执行效率, 组委会提供Benchmark工具 供选手自行测试, 比赛阶段,选手将最终优化后的Flink代码打包,通过天池平台提交,天池平台会使用相同的性能测评工具,进行评测、排名。
1.比赛使用的测试数据集为100G,用户自行用Benchmark工具生成,生成数据集的随机因子组委会不公开。
2.比赛使用SQL query为TPC-DS工具生成的99个SQL(含变种SQL),共计103个,组委会从中选择20条SQL进行评测。
3.选手可以进行SQL优化,执行Plan算子优化,引擎优化,调度优化,数据传输优化等,以便让SQL作业执行更快速。
4.选手每天有三次提交作业的机会,每个自然日提供5次代码运行机会。
5.组委会保留对赛题修改的权利。
1.选手从Flink分支拉取Flink代码进行优化。
2.选手从Benchmark工具 获取query。
3.选手通过Benchmark工具 生成测试数据,建议利用Benchmark工具将数据转换成parquet格式以获取更好的性能。
4.本地利用Benchmark工具评测优化效果。
5.选手将优化后的代码打包提交天池平台。
一共两个工程,建议工程组织:
+--/home
+--/hadoop
+--/flink
+--/flink-dist
+--/flink-core
+--...
+--/flink-community-perf
+--/src
+--/bin
+--/resource
flink工程目录不做细讲,flink-community-perf 工程目录结构如下:
+--/src
+--/main
+--/java
+--/scala
+--/bin
+--/params.conf
+--/runBenchMark.sh
+--/runParquetCnnvertTool.sh
+--/resource
+--/tpcds
+--/compileTpcds.sh
+--/compileTpcdsMac.sh
+--/datagen
+--/querygen
+--/tpc-ds-tool
+--/tpc-ds-tool-mac
Flink比赛使用分支:Flink代码是从社区apache/flink代码拉取分支,并根据比赛需要新增了一些feature和bug修复,请所有选手基于这份代码进行优化 Benchmark工具: Flink 用来做TPC-DS性能测试的工具,提供生成数据集、生成测试query,同时提供了数据集从CSV转parquet的功能,Benchmark工具中有TPC-DS全量99个SQL。
(1) 编译最新的tpc-ds tools(v2.11.0), 命令:
cd flink-community-perf/resource/tpcds
sh compileTpcds.sh
tpc-ds tools对Mac系统支持不好,Mac用户可以使用下面命令,这个工具源自tpcds-kit ,命令:
cd flink-community-perf/resource/tpcds
sh compileTpcdsMac.sh
(2) 指定 scaleFactor(GB) 和 RNGSEED (随机种子) 生成测试数据,默认:
cd flink-community-perf/resource/tpcds/datagen
sh generateTpcdsData.sh 1 0
测试数据集可以适当小些,数据位于flink-community-perf/resource/tpcds/datagen/data下
(3) 生成query,提供了全量99个query位于 flink-community-perf/resource/tpcds/querygen/queries下,其中 SF1包含两个测试SQL,SF100包含全量99个SQL
(1) 拉取比赛使用的Flink代码tpcds-master,commands:
cd /home/hadoop/flink
git remote add flink-tpc-ds git@github.com:flink-tpc-ds/flink.git
git fetch flink-tpc-ds
git checkout tpcds-master
(2) 优化 也是比赛最主要的部分, 优化Flink代码和配置,包含但不限于以下模块:
SQL 优化,优化出更合理的SQL执行Plan。
算子优化, 优化出更高效的SQL执行算子。
引擎优化, 优化调度,数据传输等等,可以让作业执行更快速。
(3) 将优化后的Flink代码打包上传,并将压缩包上传至天池:
cd /home/hadoop/flink
mvn clean install -DskipTests=true -Dfast
# optional, compiling parallel to speed up
mvn clean install -DskipTests=true -Dfast -T 2C -Dmaven.compile.fork=true
选手可以在本地IDE中进行测试,也可以在选手自己的集群上进行测试:
(1) 找到主类:
src/main/scala/com.alibaba/flink/benchmark/QueryBenchmark
(2)修改 pom.xml 依赖.
<dependency.scope>compile</dependency.scope>
(3) 设置运行参数:
运行一个 sql 作业(如query1.sql),设置数据集大小为 1G, 执行次数为 3,测试数据格式为csv,运行参数如下:
--sqlQueries query1.sql
--numIters 3
--scaleFactor 1
--dataLocation /home/hadoop/flink-community-perf/resource/tpcds/datagen/data/SF1
--sqlLocation /home/hadoop/flink-community-perf/resource/tpcds/querygen/queries/SF1
--sqlType tpcds
--sourceType csv
--optimizedPlanCollect true
--dumpFileOfOptimizedPlan /tmp/plan/tpcds
--operatorMetricCollect false
--dumpFileOfPlanWithMetrics /tmp/metrics
--analyzeTable false
运行所有 sql 作业,设置数据集大小为 1G, 执行次数为 3,测试数据格式为parquet,运行参数如下:
--sqlQueries all
--numIters 3
--scaleFactor 1
--dataLocation /home/hadoop/flink-community-perf/resource/tpcds/datagen/data/SF1
--sqlLocation /home/hadoop/flink-community-perf/resource/tpcds/querygen/queries/SF1
--sqlType tpcds
--sourceType parquet
--optimizedPlanCollect true
--dumpFileOfOptimizedPlan /tmp/plan/tpcds
--operatorMetricCollect false
--dumpFileOfPlanWithMetrics /tmp/metrics
--analyzeTable false
选手可以根据自己的需求进行修改,其中analyzeTable表示在运行SQL前是否对所有数据集进行信息统计。
(1) 比赛使用standalone集群,命令如下:
cd /home/hadoop/flink/flink-target/flink-dist/target/flink-1.9-tpcds-master-bin
# ---:parquet Sink Source dependencies,copy here when using parquet---
cp ~/.m2/repository/org/apache/parquet/parquet-column/1.8.2/parquet-column-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-hadoop/1.8.2/parquet-hadoop-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-common/1.8.2/parquet-common-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-encoding/1.8.2/parquet-encoding-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-format/2.3.1/parquet-format-2.3.1.jar ./lib/
cp ~/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar ./lib/
cp ~/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar ./lib/
cp ~/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.4.1-7.0/flink-shaded-hadoop-2-uber-2.4.1-7.0.jar ./lib/
# ---:parquet Sink Source dependencies,copy here when using parquet---
cd /home/hadoop/flink/flink-dist/target/flink-1.9-tpcds-master-bin/flink-1.9-tpcds-master/bin
sh start-cluster.sh
你可以在http:localhost:8081 管理作业,在拉起集群前,你可以修改flink配置文件以获得更好的性能
(2)设置运行参数:
cd /home/hadoop/flink-community-perf/bin
vim param.conf
关键的参数有:
### flink binaries package directory
export FLINK_DIR=/home/hadoop/flink/flink-dist/target/flink-1.9-tpcds-master-bin
### file directory stores tpc raw data
export DATA_LOCATION=/home/hadoop/flink-community-perf/resource/tpcds/datagen/data/SF${SF}
### sql queries local directory
export SQL_LOCATION=/home/hadoop/flink-community-perf/resource/tpcds/querygen/queries/SF${SF}
### flink-perf jar local file
export JAR_FILE=/home/hadoop/flink-community-perf/target/flink-community-perf-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)打包 Modify the dependencies scope in pom.xml for run in cluster.
<dependency.scope>provided</dependency.scope>
Build the jar
cd /home/hadoop/flink-community-perf/
mvn assembly:assembly
(4)提交 tpc-ds benchmark 作业
cd /home/hadoop/flink-community-perf/bin
运行一个sql, 设置数据集大小为 10G, 执行次数为 3, 运行参数如下:
sh runBenchmark.sh query1.sql 3 SF=10
运行所有 sql, 设置数据集大小为 100G, 执行次数为 1, 运行参数如下:
sh runBenchmark.sh all 1 SF=100
选手可以根据自己的需求进行修改param.conf文件,其中analyzeTable表示在运行SQL前是否对所有数据集进行信息统计。
(1) 代码优化完成后,打包提交到天池平台,打包命令如下:
cd /home/hadoop/flink/flink-target/flink-dist/target/flink-1.9-tpcds-master-bin
# ---:parquet Sink Source依赖,使用parquet时需要引入---
cp ~/.m2/repository/org/apache/parquet/parquet-column/1.8.2/parquet-column-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-hadoop/1.8.2/parquet-hadoop-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-common/1.8.2/parquet-common-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-encoding/1.8.2/parquet-encoding-1.8.2.jar ./lib/
cp ~/.m2/repository/org/apache/parquet/parquet-format/2.3.1/parquet-format-2.3.1.jar ./lib/
cp ~/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar ./lib/
cp ~/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar ./lib/
cp ~/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.4.1-7.0/flink-shaded-hadoop-2-uber-2.4.1-7.0.jar ./lib/
# ---:parquet Sink Source依赖,使用parquet时需要引入---
tar --exclude=flink-1.9-tpcds-master/opt/ -zcvf flink-1.9-tpcds-master.tar.gz flink-1.9-tpcds-master/
上传刚刚压缩的flink-1.9-tpcds-master.tar.gz到天池进行测评
# zip打包:zip -v -r flink-1.9-tpcds-master.zip flink-1.9-tpcds-master/ -x \*flink-1.9-tpcds-master/opt\*
parquet在大量数据的的读取和查询上有更好的性能,Flink也支持该格式,tpc-ds工具生成的数据集格式是csv的,benchmark提供了工具进行换。
(1) 找到主类:
src/main/scala/com.alibaba/flink/benchmark/tools/CsvToParquet
(2) 修改 pom.xml:
<dependency.scope>compile</dependency.scope>
(3) 设置主类运行参数:
进行数据规模为1G的csv数据转为parquet数据:
--scaleFactor 1
--sqlType tpcds
--dataLocation /home/hadoop/flink-community-perf/resource/tpcds/datagen/data/SF1
--compressionCodec snappy
你可以根据自己需求进行参数设置
(1) 拉起flink集群,命令:
cd /home/hadoop/flink/flink-dist/target/flink-1.9-tpcds-master-bin/flink-1.9-tpcds-master/bin
sh start-cluster.sh
(2)设置参数:
cd /home/hadoop/flink-community-perf/bin
vim param.conf
关键参数有:
### flink parquet compression type,parquet format has better performance than csv format
export COMPRESSION_TYPE=snappy
### flink binaries package directory
export FLINK_DIR=/home/hadoop/flink/flink-dist/target/flink-1.9-tpcds-master-bin
### file directory stores tpc raw data
export DATA_LOCATION=/home/hadoop/flink-community-perf/resource/tpcds/datagen/data/SF${SF}
### flink-perf jar local file
export JAR_FILE=/home/hadoop/flink-community-perf/target/flink-community-perf-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)打包: 修改依赖scope
<dependency.scope>provided</dependency.scope>
编译打包
cd /home/hadoop/flink-community-perf/
mvn assembly:assembly
(4)提交作业
cd /home/hadoop/flink-community-perf/bin
执行格式转换作业
sh runParquetConvertTool.sh SF=10
如有任何关于Flink TPC-DS 比赛的疑问,请联系我们。 钉钉群号:21950145