SourceRecordReader implementations should use milliseconds to create their records
ahawtho opened this issue · 3 comments
ahawtho commented
Kafka record timestamps are defined in milliseconds, but the StreamSourceRecordReader
class and the KeySourceRecordReader
class use the following to create their records:
return new SourceRecord(..., Instant.now().getEpochSecond());
These should be switched to Instant.now().toEpochMilli()
.
The effect is that when the source connector appends a message to a topic, if the topic has retention.ms
set to a reasonable value, the messages are immediately deleted from the topic.
This can be worked around by setting the topic configuration value:
message.timestamp.type: LogAppendTime
ahawtho commented
Thanks for the quick turnaround. Any idea when will this make a release?
jruaux commented
Thank you for your contribution. This is included in version 7.2.
jruaux commented
🎉 This issue has been resolved in v0.7.5
(Release Notes)