spring-projects/spring-integration-aws

KinesisMessageDrivenChannelAdapter crashes if first batch of a data stream never checkpoints (manual checkpoint, batch listenerMode)

lalkmim opened this issue · 16 comments

The relevant bit is this one: https://github.com/spring-projects/spring-integration-aws/blob/main/src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java#L1108

If the consumer receives the very first batch of records and no checkpoint is triggered, then the line above will throw a NullPointerException (lastCheckpointSequence is null), crashing the runner and stopping consumption altogether.

So, this logic:

    if (lastCheckpointSequence.equals(lastRecordSequence)) {
        logger.info("latestCheckpointSequence is same as latestRecordSequence. " +
                "Should getNextShardIterator()");
        // Means the manual checkpointer has processed the last record, Should move forward
        this.shardIterator = result.nextShardIterator();
    }

must be guarded with a != null check.

Would you mind contributing the fix?
And it looks like we don't have a test that would have caught this earlier 😢

Sure. My understanding is that if lastCheckpointSequence is null, the entire if/else needs to be skipped and we remain with the current shardIterator, correct?

Well, it looks like you don't perform a checkpoint in your code, so there is nothing to do here and we probably must go to the next iterator in the hope that you'll checkpoint at some point.
In a perfect world we really don't need checkpointing at all and can just process records as we pull them from the stream.
In that case, this:

this.shardIterator = result.nextShardIterator();

is the valid remaining logic. The sequence always moves forward, and if we checkpoint at some point, that means all lower sequences are also confirmed.
So, probably that:

    if (lastCheckpointSequence.equals(lastRecordSequence)) {
        logger.info("latestCheckpointSequence is same as latestRecordSequence. " +
                "Should getNextShardIterator()");
        // Means the manual checkpointer has processed the last record, Should move forward
        this.shardIterator = result.nextShardIterator();
    }

must become something like if (lastCheckpointSequence == null || lastCheckpointSequence.equals(lastRecordSequence)) {
But then we must adjust the logging message we have in that if block.
Or just add a separate if (lastCheckpointSequence == null) branch with its own logging message, but still move forward to the next iterator.
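
For illustration, a minimal sketch of the separate-branch variant, assuming the surrounding variables from the adapter's run loop (just one possible shape for the fix, not the final implementation):

    if (lastCheckpointSequence == null) {
        // No manual checkpoint has been performed for this shard yet:
        // nothing to compare against, so just move on to the next iterator.
        logger.info("No checkpoint so far. Should getNextShardIterator()");
        this.shardIterator = result.nextShardIterator();
    }
    else if (lastCheckpointSequence.equals(lastRecordSequence)) {
        logger.info("latestCheckpointSequence is same as latestRecordSequence. " +
                "Should getNextShardIterator()");
        this.shardIterator = result.nextShardIterator();
    }
    else {
        // Existing logic: re-seek AFTER_SEQUENCE_NUMBER lastCheckpointSequence.
    }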

So, if no checkpoint is triggered and we get the next shard iterator, then:

  1. will the same records be retrieved, or
  2. will those be skipped and a new set of records retrieved?

My main concern regards option 2: if that happens, we'll lose data and completely lose the ability to guarantee "at-least-once" delivery (which is my use case).

The Kinesis stream (to be precise, a shard) is a log of records. There is a cursor attached to the consumer, and when we ask for the next iterator that cursor moves forward.
Why would it come back to the current records if you don't ask for that?
Why would the framework make an assumption about your decision not to commit an offset?
The point of manual checkpointing is exactly what I explained before: to avoid some extra network overhead when we think we are in a perfect world, at least for some time.
When you configure manual checkpointing, it is already up to you not to lose the consumed data.
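
For context, the scenario under discussion is wired up roughly like this (a minimal sketch; the setter names follow the spring-integration-aws API, but verify the constructor against your version, and the stream and channel names are made up):

    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisAdapter(KinesisAsyncClient amazonKinesis) {
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "myStream");
        adapter.setListenerMode(ListenerMode.batch);      // deliver records in batches
        adapter.setCheckpointMode(CheckpointMode.manual); // checkpointing is up to the listener
        adapter.setOutputChannelName("kinesisChannel");
        return adapter;
    }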

You can checkpoint at a specific sequence:

	/**
	 * Checkpoint the provided sequence number, if it is bigger than already stored.
	 * @param sequenceNumber the sequence number to checkpoint.
	 * @return true if checkpoint performed; false otherwise.
	 */
	boolean checkpoint(String sequenceNumber);

So, the next poll will start from there. This indeed may lead to refetching records you've got on the previous cycle.
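
For example, a batch listener can checkpoint only up to the last record it has successfully processed (a sketch; the Checkpointer is assumed to arrive in the AwsHeaders.CHECKPOINTER header per the library's manual mode, while the listener shape and process() are illustrative):

    @ServiceActivator(inputChannel = "kinesisChannel")
    public void handleBatch(List<Record> records,
            @Header(AwsHeaders.CHECKPOINTER) Checkpointer checkpointer) {

        for (Record record : records) {
            process(record); // may throw; later records then stay unconfirmed
            // Confirm everything up to and including this record's sequence.
            checkpointer.checkpoint(record.sequenceNumber());
        }
    }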

Technically your question is more about how AWS Kinesis works than about what this framework provides for you.

The NPE concern is still valid and has to be fixed, but with a minimal and reasonable impact: if you didn't commit, then we assume that you are in the perfect world, so you are good to get the next iterator.
To be clearer: AWS Kinesis does not provide any built-in feature for storing offsets, so all the logic about lost and retriable data is up to the target project.
The checkpointing feature is an invention of this project, inspired by Apache Kafka and the KCL, just because it is hard to achieve a perfect world in the real world.

Well, looking at the code, we really do have logic for your use case with manual checkpointing:

    else {
        logger.info("latestCheckpointSequence is not the same as latestRecordSequence. " +
                "Should Get a new iterator AFTER_SEQUENCE_NUMBER latestCheckpointSequence");
        // Something wrong happened and not all records were processed.
        // Must start from the latest known checkpoint
        KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
        newOffset.setSequenceNumber(lastCheckpointSequence);
        newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
        GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
        this.shardIterator =
                KinesisMessageDrivenChannelAdapter.this.amazonKinesis
                        .getShardIterator(shardIteratorRequest)
                        .join()
                        .shardIterator();
    }

So, I was right before: you need to checkpoint at the specific sequence that you are sure was processed.

I believe my previous message wasn't clear, sorry about that. What you said makes sense and it fits our use case, but there's a long-standing issue with the KCL[1] where, if no checkpoints happen (regardless of the reason), that batch of records is simply skipped. So, back to this library, consider this example: if I receive records 1 to 10 and no checkpoint is made, will the next shardIterator send records 1 to 10 (again) or 11 to 20?

[1] awslabs/amazon-kinesis-client#10

No, it is not going to retry: it is really not this component's responsibility to retry if you didn't tell it to do so via the mentioned checkpointing.
You have plenty of options to avoid burdening the Kinesis consumer with retry logic; for example, see RequestHandlerRetryAdvice in Spring Integration, to be set on your downstream handler.
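
For instance (a sketch using the standard Spring Integration and Spring Retry APIs; the channel name and handler are made up):

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        RetryTemplate retryTemplate = new RetryTemplate();
        // Retry a failed record/batch up to 3 times before giving up.
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }

    @ServiceActivator(inputChannel = "kinesisChannel", adviceChain = "retryAdvice")
    public void processRecords(List<Record> records) {
        // business logic; an exception here triggers the retry advice
    }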

AWS Kinesis is similar to Apache Kafka; they are both about event streaming. There is nothing like an acknowledgment per event: we just have a sequence in the log to track.
Yes, we can go back to some sequence, but we have to explicitly ask for that.
Either way, in most cases we need to follow event streaming principles: always go forward. Once an event has been delivered to your process, it is up to you to deal with it one way or another.
So, in general I agree with the KCL committers' answers to that issue: it is not a Kinesis consumer's responsibility to retry failed data.
It is not even an event streaming source's responsibility to handle errors at all.

Well, that doesn't make sense to me. If I'm selecting manual checkpointing and that is only respected in some cases, then there's no point in having manual checkpointing at all.

I'm not talking about Kinesis/Kafka but about the client libraries: I expect that when I request a batch of records, my last checkpoint is respected. If I receive records 1 to 10 and I checkpoint only record 1, I expect that my following request receives records 2 to 11. And if I don't checkpoint at all for that batch, my following request should receive 2 to 11 again.

I'd expect a totally different behavior, though, if checkpointing were per record/batch/periodic. But I'm talking specifically about manual checkpointing with batch requests. Again, under this specific scenario, I'd expect the checkpoint to always be respected and nothing to be assumed or done behind the scenes.

That's my point: you are still thinking in terms of regular ack-aware messaging.
Streaming works in a fully different way, and I'm not going to change that logic.
If you don't checkpoint, we just go ahead to the next iterator. That's it, nothing more.
You need to keep this logic in mind and adapt your processing to it.

You got sequences 1-10. If you don't checkpoint, we go to 11-20. If you commit just 1, we go to 2-11.
The manual mode was introduced exactly for cases where you might not checkpoint at all, expecting to get the next batch, or checkpoint at a specific sequence to retry from it.

If you don't checkpoint, we just go ahead to the next iterator. That's it, nothing more.

Why? This means that if I don't checkpoint (for whatever reason, intentionally or not), the library assumes the behavior should shift to a batch checkpoint.

Back to the previous example: imagine that I receive a batch with records 1 to 10, and record 2 causes an issue. I'd checkpoint only record 1 and move on. The next batch will serve records 2 to 11, allowing me to effectively retry record 2, which will cause the same problem. This time, though, I won't be able to checkpoint at all. Now, instead of receiving records 2 to 11 (again), I'll receive records 12 to 21, completely skipping records 2 to 11 and removing the application's ability to guarantee at-least-once delivery.

If that's indeed the behavior, how could I achieve at-least-once delivery? Otherwise I'd honestly prefer to leave the application as is; at least that way I'm able to restart the application and not lose any data.

That is how manual checkpointing was designed. To be honest, checkpointing is just a side effect of this library.
We could just use the Kinesis API as is and let the target processor deal with sequences on its own.
And I still think that whatever you'd like to achieve is outside this library's responsibility.
The API we provide has minimal sophistication, and it is really natural to assume that if you don't checkpoint, we just go ahead in the stream of events.
The checkpoint is there for convenience, for when you restart the whole application after some crash.

I told you already: use a RequestHandlerRetryAdvice on your records processor side.

We may consider introducing some extra logic to re-seek to the beginning of the current batch if an exception is thrown from the listener, but that has to be a separate issue.
But the absence of a checkpoint in manual mode must not mean retrying the current batch.

Please don't forget that in this issue we must talk only about fixing that NPE.

While trying to fix the mentioned NPE, I looked into the logic more closely, and it really doesn't match what I have explained here, nor what manual means in my opinion.

Currently the logic always relies on a checkpoint: if you don't perform it in your code, we just assume that something is wrong and come back to the previously committed sequence.
And this feels like the expectation of an automatic batch mode. So why do we need manual at all, if we always have to commit exactly as would happen if we just used batch mode?

Therefore I'm going to revise the logic into something like this (sketched below):

  1. If everything is OK (in any mode), we just go ahead to the next iterator, even if you don't call checkpoint from your code, or batch mode does it at the end of the send cycle.
  2. If you throw a (new) RequestShardForSequenceException, we perform a GetShardIteratorRequest for the sequence carried in that exception.
  3. It will work the same way for all modes. We perform automatic commits after a successful send for batch and record modes; that exception gives us flow control that takes precedence.

I think this should meet your expectations with at-least-once delivery.
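
For illustration only (RequestShardForSequenceException is the proposed, not-yet-existing type, so its name and constructor are assumptions, as are the channel name and process()):

    @ServiceActivator(inputChannel = "kinesisChannel")
    public void handleBatch(List<Record> records) {
        for (Record record : records) {
            try {
                process(record);
            }
            catch (Exception ex) {
                // Ask the adapter for a new shard iterator at this record's
                // sequence, so it is redelivered on the next poll.
                throw new RequestShardForSequenceException(record.sequenceNumber(), ex);
            }
        }
        // No explicit checkpoint needed: per item 1, the adapter moves forward.
    }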

I completely disagree with your interpretation, especially point 1). If the checkpoint mode is manual, then nothing should be assumed or done automatically. That's the behavior I'd expect from batch checkpointing.

I haven't checked your changes yet, but from what I understood from your message, you are actually removing from any user of the library the possibility of guaranteeing at-least-once delivery. At-least-once means: "move forward if, and only if, I explicitly say so".

Regarding point 2), what will happen if the first record crashes? Will that record be part of the results for the next iterator?

Finally, on item 3), what do you mean by "send was OK"? Because if that means "the consumer received the data", then manual will be rendered completely useless.

If manual is supposed to behave like batch, as you say, then why do we need manual at all?
Batch is committed at the end of the listener process. If you do something similar with manual at the end of your process, then it is exactly the same as batch.
Therefore I disagree with this and will insist on a different meaning for manual. And it is exactly as I explained before: no problems, go ahead to the next iterator!
That's really how streaming is supposed to work. And it must not be a surprise that the Kinesis API does not provide any hooks for checkpointing.

You know, I don't see any logic in the KCL about seeking to a specific sequence after a processing error:

        try {
            shardRecordProcessor.processRecords(processRecordsInput);
        } catch (Exception e) {
            log.error("ShardId {}: Application processRecords() threw an exception when processing shard ",
                    shardInfoId, e);
            log.error("ShardId {}: Skipping over the following data records: {}", shardInfoId, records);
        }

So, whatever we come up with here for our KinesisMessageDrivenChannelAdapter would be just a bonus of Spring flexibility rather than something dictated by a standard.
Although, as I said before, the streaming standard is to not handle errors in the streaming library and just always go ahead.
I also told you about RequestHandlerRetryAdvice to achieve your at-least-once delivery.

I'm not sure what streaming solution gave you the idea of "move forward if, and only if, I explicitly say so", but in my experience it is exactly the opposite.

I'm going to revise the logic again in favor of treating any exception thrown from the record processor as a signal to rewind, since that feels more natural than searching for a specific exception to perform flow control.

Again: the point of manual is to not force a record processor to always commit, but rather to commit at some reasonable point, just because you may assume that you are in a perfect world and so don't need the extra DynamoDB overhead for checkpointing.

what will happen if the first record crashes? Will that record be part of the results for the next iterator?

You probably didn't see my RequestShardForSequenceException explanation. It has a sequenceNumber property indicating where to start the next iterator.
So, if you fail on the first record, you just throw this RequestShardForSequenceException with the sequenceNumber from that record.
My new idea is to save the first sequence of a batch, so if you don't call a manual checkpoint, we will rewind to that stored sequence on any exception thrown from the processor.

Here is also some info about KCL behavior: https://stackoverflow.com/questions/47056385/kinesis-client-library-record-processor-failure.

So, according to that, there are no at-least-once semantics in the KCL.
That is possible over there only if you fail to commit and restart the worker.
Therefore what we are trying to achieve here would be a discrepancy with the KCL.
But I guess that's OK, because the native Kinesis API does not dictate any direction to us.

Added another commit: bc9a082.

Hope this one can be treated as a middle ground between what I see as manual checkpointing and what you'd expect from at-least-once.