logstash-plugins/logstash-integration-kafka

Add support for fetching specific ranges on a topic

yaauie opened this issue

A feature request for this plugin came in on Logstash core elastic/logstash#14510 from @ktpktr0:

Kafka can re-consume messages from a specified time

I noticed that when Logstash consumes from Kafka, some errors make it necessary to re-consume messages. However, the auto_offset_reset setting has only three options. Kafka supports the following offset reset strategies, including one that re-consumes messages from a specified time.

Offset dimension

Earliest

Reset the offset to the current earliest offset
The Earliest strategy resets the offset to the topic's current earliest offset. This is not necessarily 0: in a production environment, Kafka deletes messages once they are old enough, so the current earliest offset is likely to be greater than 0. If you want to re-consume all the messages in a topic, use the Earliest strategy.
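A minimal pure-Python sketch of why the earliest offset is often not 0, assuming retention works by advancing a partition's log start offset. `PartitionLog` and its methods are hypothetical names for illustration, not a Kafka client API; with Kafka's own CLI this strategy corresponds to `kafka-consumer-groups.sh --reset-offsets --to-earliest`.

```python
from dataclasses import dataclass


@dataclass
class PartitionLog:
    """Hypothetical model of one Kafka partition's offset range."""
    log_start_offset: int = 0   # first offset still stored on the broker
    log_end_offset: int = 0     # offset the next appended message will get

    def append(self, count: int) -> None:
        # Each appended message takes the next offset.
        self.log_end_offset += count

    def delete_before(self, offset: int) -> None:
        # Retention removes old messages by advancing the log start offset,
        # never moving it backwards or past the end of the log.
        self.log_start_offset = min(max(self.log_start_offset, offset),
                                    self.log_end_offset)

    def earliest(self) -> int:
        # The Earliest strategy targets the log start offset, not 0.
        return self.log_start_offset


log = PartitionLog()
log.append(100)          # offsets 0..99
log.delete_before(40)    # retention deletes offsets 0..39
print(log.earliest())    # 40
```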


Latest

Reset the offset to the current latest offset
The Latest strategy resets the offset to the log end offset. If a total of 15 messages have been sent to a partition, its log end offset is 15. If you want to skip all historical messages and start consuming only new messages, use the Latest strategy.
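The sketch below is a hypothetical pure-Python model, not a Kafka API. It shows that the log end offset is one past the last stored message, so a consumer reset to Latest receives only messages appended afterwards. The matching CLI option is `--to-latest`.

```python
from dataclasses import dataclass


@dataclass
class PartitionLog:
    """Hypothetical model of a single partition's offset range."""
    log_end_offset: int = 0  # offset the next appended message will receive

    def append(self, count: int) -> None:
        self.log_end_offset += count

    def latest(self) -> int:
        # The Latest strategy targets the log end offset, one past the last message.
        return self.log_end_offset


log = PartitionLog()
log.append(15)                       # messages occupy offsets 0..14
position = log.latest()              # reset to Latest
print(position)                      # 15

log.append(3)                        # three new messages arrive: offsets 15..17
print(list(range(position, log.log_end_offset)))  # [15, 16, 17]
```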


Current

Reset the offset to the latest committed offset
The Current strategy resets the offset to the latest offset the consumer has committed. Suppose you modify the consumer code, restart the consumer, and then find that the new code has a problem. You roll back the code change and need to reset the offset to where it was when the consumer restarted. The Current strategy does exactly this.
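A minimal sketch of the Current strategy, assuming the consumer's in-memory position may have moved past its last committed offset. `reset_to_current` is a hypothetical helper; the corresponding CLI option is `--to-current`. How real tooling treats partitions that never committed an offset is not modeled here; the sketch simply raises.

```python
def reset_to_current(committed: dict[int, int], partitions: list[int]) -> dict[int, int]:
    """Hypothetical helper: target offset per partition for the Current strategy."""
    targets = {}
    for partition in partitions:
        if partition not in committed:
            # Handling of never-committed partitions is outside this sketch.
            raise KeyError(f"partition {partition} has no committed offset")
        # The target is the last committed offset; uncommitted progress is dropped.
        targets[partition] = committed[partition]
    return targets


committed = {0: 1200, 1: 900}   # offsets committed when the consumer restarted
in_memory = {0: 1350, 1: 980}   # where the faulty code had read to
targets = reset_to_current(committed, [0, 1])
print(targets)                                              # {0: 1200, 1: 900}
print({p: in_memory[p] - t for p, t in targets.items()})    # {0: 150, 1: 80} to re-read
```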


Specified-Offset

Reset the offset to a specified offset
The Specified-Offset strategy is a common one: the consumer's offset is set to the value you specify. A typical use is manually skipping a message the consumer cannot process. In practice, a corrupted message may be impossible to consume; the consumer program then throws an exception and cannot continue. When that happens, you can use the Specified-Offset strategy to move past the message.
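This sketch skips a corrupted message by resetting to the offset right after it. The helper names are hypothetical, and clamping out-of-range requests into the retained range is an assumption of this sketch rather than documented behavior. The corresponding CLI option is `--to-offset`.

```python
def reset_to_offset(target: int, log_start_offset: int, log_end_offset: int) -> int:
    """Clamp a requested offset into [log_start_offset, log_end_offset]."""
    # Assumption: out-of-range requests are clamped rather than rejected.
    return max(log_start_offset, min(target, log_end_offset))


def skip_poison_message(bad_offset: int, log_start_offset: int, log_end_offset: int) -> int:
    """Resume right after a message the consumer cannot process."""
    return reset_to_offset(bad_offset + 1, log_start_offset, log_end_offset)


print(skip_poison_message(5021, 0, 9000))   # 5022
print(reset_to_offset(12000, 100, 9000))    # 9000
print(reset_to_offset(10, 100, 9000))       # 100
```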


Shift-By-N

Reset the offset to the current offset + n (n can be negative)
Whereas the Specified-Offset strategy requires an absolute offset, the Shift-By-N strategy takes a relative value: how many messages to skip. The shift works in both directions, forward or backward. For example, to reset the offset to 100 messages before the current offset, specify n as -100.
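Shift-By-N is arithmetic on the current offset. A minimal sketch with a hypothetical `shift_by` helper, assuming that results outside the retained range are clamped into it; the corresponding CLI option is `--shift-by`.

```python
def shift_by(current: int, n: int, log_start_offset: int, log_end_offset: int) -> int:
    """Move the offset n positions from the current one (n may be negative)."""
    shifted = current + n
    # Assumption: results outside the retained range are clamped to it.
    return max(log_start_offset, min(shifted, log_end_offset))


print(shift_by(500, -100, 0, 1000))   # 400 (rewind 100 messages)
print(shift_by(500, 250, 0, 1000))    # 750 (skip 250 messages)
print(shift_by(50, -100, 20, 1000))   # 20 (cannot rewind past retained data)
```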

Time dimension

DateTime

Reset the offset to the earliest offset at or after a given time
The DateTime strategy lets you specify a time and resets the offset to the earliest offset at or after that time. A common scenario: to re-consume yesterday's data, reset the offset to 00:00 yesterday.
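A pure-Python sketch of the DateTime lookup. It mirrors the semantics documented for the Java consumer's `offsetsForTimes` (the earliest offset whose timestamp is greater than or equal to the given timestamp) and the CLI option `--to-datetime`, but the helper names and data here are hypothetical. It assumes message timestamps are non-decreasing so a binary search is valid, and it interprets "midnight yesterday" in UTC.

```python
import bisect
from datetime import datetime, timedelta, timezone


def to_ms(moment: datetime) -> int:
    """Epoch milliseconds for a timezone-aware datetime (whole seconds)."""
    return int(moment.timestamp()) * 1000


def offset_for_time(timestamps_ms: list[int], first_offset: int, target_ms: int) -> int:
    """Earliest offset whose timestamp is >= target_ms.

    timestamps_ms[i] is the timestamp of the message at offset first_offset + i.
    If every message is older than target_ms, the result is the log end offset.
    """
    index = bisect.bisect_left(timestamps_ms, target_ms)
    return first_offset + index


# Hypothetical partition: offsets 1000..1059, one message per hour from 2022-09-13 00:00 UTC.
start = datetime(2022, 9, 13, tzinfo=timezone.utc)
timestamps = [to_ms(start + timedelta(hours=i)) for i in range(60)]

now = datetime(2022, 9, 15, 10, 30, tzinfo=timezone.utc)
midnight_yesterday = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
print(offset_for_time(timestamps, 1000, to_ms(midnight_yesterday)))  # 1024
```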

Duration

Reset the offset to the offset at a given interval before the current time
The Duration strategy takes a relative time interval and resets the offset to the offset that lies that interval before the current time. The format is PnDTnHnMnS. If you know Java 8's Duration class, this format will look familiar: it is the ISO-8601 duration format. It starts with the letter P and has four parts, D, H, M, and S, which stand for days, hours, minutes, and seconds. For example, to move the offset back to 15 minutes ago, specify PT0H15M0S.
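A minimal sketch of how a PnDTnHnMnS value turns into a target time. Python's standard library has no ISO-8601 duration parser, so `parse_duration` is a hypothetical, simplified one (whole numbers only, no sign or fractional seconds; letters accepted in either case). In Java, `java.time.Duration.parse` handles this format, and the matching Kafka CLI option is `--by-duration`.

```python
import re
from datetime import datetime, timedelta, timezone

# PnDTnHnMnS: days before the optional "T", then hours/minutes/seconds after it.
_DURATION_RE = re.compile(
    r"P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?",
    re.IGNORECASE,
)


def parse_duration(text: str) -> timedelta:
    """Parse a simplified ISO-8601 duration such as PT0H15M0S."""
    match = _DURATION_RE.fullmatch(text)
    # Reject bare "P", a trailing "T", and anything the pattern does not cover.
    if match is None or text[-1].upper() not in "DHMS":
        raise ValueError(f"not a PnDTnHnMnS duration: {text!r}")
    parts = {name: int(value) for name, value in match.groupdict().items() if value}
    return timedelta(**parts)


def target_time(now: datetime, duration_text: str) -> datetime:
    """The Duration strategy seeks to the offset at (now - duration)."""
    return now - parse_duration(duration_text)


now = datetime(2022, 9, 15, 10, 30, tzinfo=timezone.utc)
print(target_time(now, "PT0H15M0S").isoformat())  # 2022-09-15T10:15:00+00:00
```

The resulting time is then resolved to an offset in the same way as the DateTime strategy.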