基于flink1.11.2的单词统计,包括table api和streaming使用,以及常见的数据源读写
Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。
Table wcTable = tEnv.fromDataStream(ds, Expressions.$("word"), Expressions.$("frequency"));
tEnv.createTemporaryView("word_count", wcTable);
tEnv.sqlQuery("select word, frequency from word_count ").execute().print();
DataStreamSource<String> textDs = env.socketTextStream(hostName, port, "\n");
ds.assignTimestampsAndWatermarks(WatermarkStrategy
// 延迟三秒
.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
ds.addSink(new StdPrintSink());
// 定义
@PublicEvolving
public class StdPrintSink<IN> extends PrintSinkFunction<IN> {
@Override
public void invoke(IN record) {
super.invoke((IN) ("word count print>" + record.toString()));
}
}
// 获取外部参数 --configPath /xxx/xx/config.properties
ParameterTool parameters = ParameterTool.fromArgs(args);
// 读取configPath参数的值:配置文件目录
String configPath = parameters.get("configPath", null);
// 读取配置文件
ParameterTool paramFromProps = ParameterTool.fromPropertiesFile(configPath);
// 把配置参数传入运行时上下文
env.getConfig().setGlobalJobParameters(paramFromProps);
// 设定事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
继承RichSourceFunction,从mysql读取数据
public class MysqlDataSource extends RichSourceFunction> {
@Override public void open(Configuration parameters) throws Exception { // 创建数据库连接 // 从上下文获取配置参数 String dbHost = parameters.getString("dbHost"); ... } @Override public void run(SourceContext<List<String>> ctx) throws Exception { // 执行查询并获取结果 ... // 查询结果放入上下文,并添加时间戳和水位线 ctx.collectWithTimestamp(textQueryList, System.currentTimeMillis()); ctx.emitWatermark(new Watermark(System.currentTimeMillis())); textQueryList.clear(); } @Override public void cancel() { // 关闭数据库连接 ... }
}
DataStream<List<String>> ds0 = env.addSource(new MysqlDataSource());
继承RichSinkFunction,数据存入mysql
public class MysqlSink extends RichSinkFunction {
@Override public void open(Configuration parameters) throws Exception { // 创建数据库连接 ... } @Override public void close() throws Exception { // 关闭数据库连接 ... } @Override public void invoke(Wc value, @SuppressWarnings("rawtypes") Context context) throws Exception { // 执行 ps.setString(1, value.getWord()); ps.setInt(2, value.getFrequency()); ps.execute(); }
}
ds.addSink(new MysqlSink());
// 提交执行
env.execute("Mysql WordCount");