Ingestion loop when Snowflake Ingestion fails
lsimac opened this issue · 3 comments
Hi,
about a week and a half ago we found an edge case that is not covered in Snowflake Streaming ingestion (potential bug). Edge case occurs when inserting rows into Snowflake fails and when the next message that needs to be inserted is the null message. The root cause analysis of this issue is provided down below.
Root cause analysis
Root cause analysis started by inspecting Kafka Connector logs for the connector around the time when Consumer lag started to increase.
While analyzing the logs, a log entry that indicates that Snowpipe Streaming ingestion failed was found (for partitions 37 and 10)
- [2024-01-02 22:27:09,854] WARN [connector-name|task-3] [SF_KAFKA_CONNECTOR] Failed Attempt to invoke the insertRows API for buffer:StreamingBuffer{numOfRecords=522, bufferSizeBytes=50090186, firstOffset=18977858, lastOffset=18978380} (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
-
[2024-01-02 22:27:09,855] WARN [connector-name|task-3] [SF_KAFKA_CONNECTOR] [INSERT_ROWS_FALLBACK] Re-opening channel:PROD.SCHEMA.TABLE.TOPIC_37 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
-
[2024-01-02 22:27:09,935] WARN [connector-name|task-3] [SF_KAFKA_CONNECTOR] [INSERT_ROWS_FALLBACK] Fetching offsetToken after re-opening the channel:PROD.SCHEMA.TABLE.TOPIC_37 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
-
[2024-01-02 22:27:10,025] WARN [connector-name|task-3] [SF_KAFKA_CONNECTOR] [RESET_PARTITION] E**mptying current buffer:**StreamingBuffer{numOfRecords=0, bufferSizeBytes=0, firstOffset=-1, lastOffset=-1} for Channel:PROD.SCHEMA.TABLE.TOPIC_37 due to reset of offsets in kafka (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
-
[2024-01-02 22:27:10,025] WARN [connector-name|task-3] [SF_KAFKA_CONNECTOR] [INSERT_ROWS_FALLBACK] Channel:PROD.SCHEMA.TABLE.TOPIC_37, OffsetRecoveredFromSnowflake:18977478, reset kafka offset to:18977479 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
-
[2024-01-02 22:27:10,026] WARN [connector-name|task-3] [SF_KAFKA_CONNECTOR] [INSERT_BUFFERED_RECORDS] Failure inserting buffer:StreamingBuffer{numOfRecords=522, bufferSizeBytes=50090186, firstOffset=18977858, lastOffset=18978380} for channel:PROD.SCHEMA.TABLE.TOPIC_37 (com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel:82)
When searching in Snowflake Sink codebase where the logs are created it can be found that logs are generated in insertRowsWithFallback method, to be more specific streamingApiFallbackSupplier
When insertRow fallback method is called variable isOffsetResetInKafka
is set to true
(inside method resetChannelMetadataAfterRecovery, which is part of streamingApiFallbackSupplier
).
isOffsetResetInKafka
variable is important during record ingestion because if isOffsetResetInKafka is set to true then for the record to be accepted for ingestion if-statement (kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L
(GitHub link) must be true, otherwise if isOffsetResetInKafka
is false that case is not checked.
The case when isOffsetResetInKafka is set to true and if-statement must be checked caused a problem for us because of the NULL/TOMBSTONE message (otherwise it would not be a problem)
(Focusing on partition 37, but same thing is for partition 10) In our case, last successful committed offset (before failed ingestion) in Snowflake for partition 37 was 18977478
. After that next ingestion for partition 37 failed and insertRow fallback was called and that meant two things:
- isOffsetResetInKafka is set to true (which implies that the if-statement mentioned before must be true for the next record to be accepted)
- consumer group offset for that partition will be reseted to the last committed offset in Snowflake ( GitHub link ), which
18977478
. - Those two statements imply that the next record that can be ingested into Snowflake for partition 37 must have offset
18977479
Everything would work fine if the record in partition 37 with offset 18977479
was not NULL message. That NULL message caused Kafka Connect to be in the loop where the same batch of records was processing over and over but nothing was ingested.
The reason for that is that the record with offset 18977479
never got to that if-statement because it was filtered out before in the code (because it is NULL) in insert method of SnowfalkeSinkServiceV2
class. So the first record that got to the if-statement was actually record with offset 18977480
and he did not pass the if-statement (18977480(kafkaSinkRecord.kafkaOffset) - 18977478(currentProcessedOffset) is not equal to 1), and then record with offset 18977481
and he also did not pass the if-statement and so on, and then 18977482
…..
- insert (
SnowfalkeSinkServiceV2
) → insertRecordToBuffer(TopicPartitionChannel
) → shouldIgnoreAddingRecordToBuffer (TopicPartitionChannel) → if-statement
So the only way to break that loop of processing and not ingesting the same batch was to somehow change the value of the isOffsetResetInKafka
back to false so that the if-statement would be skipped (GitHub Link). Since the default value for isOffsetResetInKafka
is false solution that we used to achieve that was simply to delete and re-create the connector
After the connector was recreated messages were consumed by Kafka Connect and ingested into a Snowflake
Important Settings
- Kafka Connect Snowflake sing version: 2.1.2
- snowflake.ingestion.method: SNOWPIPE_STREAMING
- behavior.on.null.values: IGNORE
Hi @sfc-gh-xhuang,
thank you for your reply and provided information. When the new version is out we will test this use-case and if everything is okay I will close this issue.
@lsimac please upgrade to 2.2.0 and see if this issue is resolved, thanks!