alibaba/feathub

Support defining time attribute on KafkaSource

yunfengzhou-hub opened this issue · 0 comments

kafka_utils.py:

            # TODO: Kafka Source throws an exception when a UDF computed column is
            #  defined as the row time attribute, so we only compute the timestamp
            #  here without defining the row time attribute. The table is then
            #  converted to a DataStream and back to a Table, and its row time
            #  attribute is defined when it is converted back to a Table. This logic
            #  can be removed once UNIX_TIMESTAMP supports returning millisecond
            #  epochs.
            #  https://issues.apache.org/jira/browse/FLINK-19200
            builder = Schema.new_builder().from_schema(flink_schema)
        # TODO: Define the row time attribute when converting back from DataStream.
        #  This logic can be removed once UNIX_TIMESTAMP supports returning
        #  millisecond epochs.
        #  https://issues.apache.org/jira/browse/FLINK-19200
        schema = to_flink_schema(schema)
        max_out_of_orderness_interval = timedelta_to_flink_sql_interval(
            kafka_source.max_out_of_orderness + timedelta(milliseconds=1),
            day_precision=3,
        )
        schema = (
            Schema.new_builder()
            .from_schema(schema)
            .column(EVENT_TIME_ATTRIBUTE_NAME, DataTypes.TIMESTAMP_LTZ(3))
            .watermark(
                EVENT_TIME_ATTRIBUTE_NAME,
                watermark_expr=f"`{EVENT_TIME_ATTRIBUTE_NAME}` "
                f"- {max_out_of_orderness_interval}",
            )
            .build()
        )
        table = t_env.from_data_stream(t_env.to_data_stream(table), schema)