apache/rocketmq-streams

The watermark check that decides whether to discard incoming data may cause data loss.

starmilkxin opened this issue

  long time = this.context.getDataTime();

  long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
  if (time < watermark) {
      // Window has already fired; discard the data
      logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}",
              data, time, watermark);
      return;
  }

I expected dataA to be included in its time window, but the current logic discards it immediately because its data time is behind the watermark.

Shouldn't this check be based on the end time of the time window instead of the data time?
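To make the difference concrete, here is a minimal, self-contained sketch. It assumes tumbling windows of a fixed size, a watermark computed as the largest event time seen minus allowDelay, and a window that fires once the watermark reaches its end time. The class and method names (LateDataCheck, windowEnd, discardByDataTime, discardByWindowEnd) and the constants are hypothetical and are not part of rocketmq-streams.

```java
public class LateDataCheck {
    // Hypothetical settings, for illustration only.
    static final long WINDOW_SIZE = 10;
    static final long ALLOW_DELAY = 2;

    // Exclusive end of the tumbling window that contains eventTime.
    static long windowEnd(long eventTime) {
        return (Math.floorDiv(eventTime, WINDOW_SIZE) + 1) * WINDOW_SIZE;
    }

    // Rule from the snippet above: discard when the data time is behind the watermark.
    static boolean discardByDataTime(long eventTime, long watermark) {
        return eventTime < watermark;
    }

    // Alternative rule: discard only when the data's window has already fired,
    // i.e. the watermark has reached the window's end time.
    static boolean discardByWindowEnd(long eventTime, long watermark) {
        return windowEnd(eventTime) <= watermark;
    }

    public static void main(String[] args) {
        long maxEventTime = 9;                       // latest record seen so far
        long watermark = maxEventTime - ALLOW_DELAY; // 7
        long dataA = 5;                              // late record, window [0, 10)

        System.out.println(windowEnd(dataA));                     // 10
        System.out.println(discardByDataTime(dataA, watermark));  // true  (5 < 7)
        System.out.println(discardByWindowEnd(dataA, watermark)); // false (10 > 7)
    }
}
```

Under these assumptions, the data-time rule drops dataA as soon as a later record pushes the watermark past it, while the window-end rule keeps it until window [0, 10) has actually fired.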

ni-ze commented

The watermark is already based on the endTime of the window.

But I agree: the watermark should only be committed (stored) into RocksDB after the window has fired.

Storing the watermark as soon as data arrives is not correct.
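A minimal sketch of the change discussed here, under stated assumptions: the in-memory watermark advances on every record, but a value is written to the state store only after a window fires, and an incoming record is discarded only if its window's end time is at or below the committed value. A HashMap stands in for RocksDB, and every name (WatermarkCommitSketch, onData, queueKey, WINDOW_SIZE, ALLOW_DELAY) is hypothetical, not the rocketmq-streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class WatermarkCommitSketch {
    static final long WINDOW_SIZE = 10; // hypothetical tumbling-window size
    static final long ALLOW_DELAY = 2;  // hypothetical allowed delay

    // Stand-in for the RocksDB state store (hypothetical, not the real API).
    final Map<String, Long> store = new HashMap<>();
    final String queueKey = "queue-0"; // hypothetical message-queue key
    // End times of windows that have received data but not fired yet.
    final TreeSet<Long> openWindowEnds = new TreeSet<>();
    // In-memory watermark: advances on every record, never persisted directly.
    long currentWatermark = Long.MIN_VALUE;

    static long windowEnd(long eventTime) {
        return (Math.floorDiv(eventTime, WINDOW_SIZE) + 1) * WINDOW_SIZE;
    }

    long committedWatermark() {
        return store.getOrDefault(queueKey, Long.MIN_VALUE);
    }

    // Returns true if the record is accepted, false if it is discarded.
    boolean onData(long eventTime) {
        long end = windowEnd(eventTime);
        if (end <= committedWatermark()) {
            return false; // its window already fired and was committed
        }
        openWindowEnds.add(end);
        // Only the in-memory watermark moves here; nothing is stored yet.
        currentWatermark = Math.max(currentWatermark, eventTime - ALLOW_DELAY);
        // Fire every window the watermark has reached, then commit its end time.
        while (!openWindowEnds.isEmpty() && openWindowEnds.first() <= currentWatermark) {
            long firedEnd = openWindowEnds.pollFirst();
            store.put(queueKey, firedEnd); // commit only after the window fired
        }
        return true;
    }

    public static void main(String[] args) {
        WatermarkCommitSketch s = new WatermarkCommitSketch();
        System.out.println(s.onData(9));            // true: opens [0, 10), watermark 7
        System.out.println(s.onData(5));            // true: dataA, [0, 10) still open
        System.out.println(s.onData(13));           // true: watermark 11 fires [0, 10)
        System.out.println(s.onData(6));            // false: [0, 10) already fired
        System.out.println(s.committedWatermark()); // 10
    }
}
```

Committing the fired window's end time, rather than the raw in-memory watermark, is one possible design choice: the persisted value never runs ahead of what has actually been emitted.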

ni-ze commented

Can you submit a PR to fix this? Thanks.