Flink学习笔记
Flink 1.18.1、MySQL CDC 3.0.1
基于JsonDebeziumDeserializationSchema自带的序列化器
JSON字符串中默认是一个Base64编码的字符串
解决方式:
Map<String, Object> customConverterConfigs = Map.of(
// Decimal类型使用数字而不是base64编码
JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name()
);
var deserializer = new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
- DATE:JSON字符串展示的是原日期到1970-01-01的天数
- DATETIME:把这个时间作为UTC时区的时间,然后返回的是纪元毫秒数,这样对于国内东八区来说,这个毫秒数就多个8个小时。是个错误的值,因为把东八区的时间直接作为UTC的时间来计算,而不是转换成UTC的时间再计算毫秒数。
- TIMESTAMP:由于MySQL存的是纪元秒数,在JSON字符串中基于MySQL的
time_zone
系统变量转换成UTC时区的时间,时间是对的,相比国内东八区而言,会少8个小时,解析时需要带着时区去解析。形如2018-06-20T13:37:03Z
对于上面说的情况,转成Java对象时,都需要自己写Jackson的反序列化器来解决。
参见自定义的(非内置)MysqlDateDeserialize
、MysqlDateTimeDeserialize
、MysqlTimestampDeserialize
。
- 在JDK8中,由于没有模块化机制限制,可以正常在POJO中使用。
- 在JDK11中,虽然有模块化限制,但是对于unamed module而言,是可以通过反射访问模块中的私有变量的,只不过会有一个警告。
- 而在JDK17+版本,由于Java增强了模块化限制,即使是一个module放在classpath上作为一个unnamed module时,通过反射访问非公共属性时还是会报错,需要手动增加启动参数,公开反射访问权限。否则无法在Flink中使用,Flink在序列化时会报错。
解决方式: 增加以下jvm启动参数即可
# 公开java.base模块的java.util、java.time包的反射访问权限给所有的未命名模块
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.time=ALL-UNNAMED
注:如果是在flink集群中运行任务,则无需添加,因为flink发行包中已经加了这些启动参数。