The logic of watermark judging whether to solve the data may result in missing data.
starmilkxin opened this issue · 2 comments
starmilkxin commented
long time = this.context.getDataTime();
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
if (time < watermark) {
//已经触发,丢弃数据
logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}",
data, time, watermark);
return;
}
It is expected that the dataA should be included in the time window, but the current logic is to discard it directly.
Should it be based on the end time of the time window?
ni-ze commented
ni-ze commented
can you submit a pr to solve this? thx.