Support defining time attribute on KafkaSource
yunfengzhou-hub opened this issue · 0 comments
yunfengzhou-hub commented
kafka_utils.py:
# TODO: The Kafka source throws an exception when a UDF-computed column is
# defined as the row time attribute. We therefore only compute the timestamp
# here, without defining the row time attribute. The table is then converted
# to a DataStream and back to a Table, and the row time attribute is defined
# during that conversion back to a Table. This logic can be removed once
# UNIX_TIMESTAMP supports returning a millisecond epoch.
# https://issues.apache.org/jira/browse/FLINK-19200
builder = Schema.new_builder().from_schema(flink_schema)
# TODO: The row time attribute is defined here, while converting the
# DataStream back to a Table. This logic can be removed once UNIX_TIMESTAMP
# supports returning a millisecond epoch.
# https://issues.apache.org/jira/browse/FLINK-19200
schema = to_flink_schema(schema)

# The configured out-of-orderness bound is padded by one millisecond before
# it is rendered as a Flink SQL interval.
# NOTE(review): the reason for the extra millisecond is not visible in this
# excerpt -- confirm against the watermark semantics the caller expects.
padded_out_of_orderness = kafka_source.max_out_of_orderness + timedelta(
    milliseconds=1
)
max_out_of_orderness_interval = timedelta_to_flink_sql_interval(
    padded_out_of_orderness, day_precision=3
)

# Watermark expression: the event-time column minus the padded interval.
event_time_type = DataTypes.TIMESTAMP_LTZ(3)
event_time_watermark_expr = (
    f"`{EVENT_TIME_ATTRIBUTE_NAME}` - {max_out_of_orderness_interval}"
)

# Extend the converted schema with the event-time column and its watermark.
schema = (
    Schema.new_builder()
    .from_schema(schema)
    .column(EVENT_TIME_ATTRIBUTE_NAME, event_time_type)
    .watermark(EVENT_TIME_ATTRIBUTE_NAME, watermark_expr=event_time_watermark_expr)
    .build()
)

# Round-trip Table -> DataStream -> Table so the extended schema (and with it
# the watermark declaration) is applied on the way back.
data_stream = t_env.to_data_stream(table)
table = t_env.from_data_stream(data_stream, schema)