Implement strategy for preventing message loss and preventing message duplication
When a pod restarts, we need to ensure that when it starts up it reloads its previous state and resumes the output stream at the correct location, based on where it stopped producing before.
The pod can keep the sqlite database in a persistent volume and reload state from there, or re-read it from the input topic. Reading from a topic is slower and potentially more expensive, depending on how often it needs to be done.
I think properly resuming input/output at the correct offset can be managed in a couple ways:
(1) Commit offsets back to kafka after consuming a batch of polled messages. Next time the pod resumes, it can reload state (either from the topic or from a persistent volume) and resume processing as normal from the committed consumer group offset. This requires a SIGTERM handler to stop the pod from being shut down in the middle of processing a batch of consumed messages (see the sketch after this list).
(2) Same as (1), but also store the most recently processed message offset in a row of the database, or in a file next to the database file.
(3) Don't commit/remember any offsets. When restarted, re-read the whole input stream from scratch into the local database, rebuild all the output messages, and rely on kafka key compaction to deduplicate output messages. I have not tested this enough to know whether it truly works as we'd want. There are costs to consider in re-reading gigabytes of data from kafka on each restart, but it may not be a big problem. There are some simplicity benefits to the pod not keeping any persistent state across restarts.
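For reference, a minimal sketch of what (1) and (2) could look like with confluent-kafka-python; the broker address, batch size, table schema, and `process_batch` helper are placeholders, not the actual combiner code:

```python
import sqlite3
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "clinvar-combiner",
    "enable.auto.commit": False,            # commit manually, once per batch
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["clinvar-raw"])

db = sqlite3.connect("state.db")
db.execute(
    "CREATE TABLE IF NOT EXISTS offsets "
    "(topic TEXT, partition INTEGER, next_offset INTEGER, PRIMARY KEY (topic, partition))"
)

def process_batch(messages, db):
    """Hypothetical helper: apply a batch of messages to the local database."""
    ...

while True:
    batch = consumer.consume(num_messages=500, timeout=1.0)
    if not batch:
        continue
    process_batch(batch, db)
    # Option (2): record the next offset to process in the same sqlite database.
    for msg in batch:
        db.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (msg.topic(), msg.partition(), msg.offset() + 1),
        )
    db.commit()
    # Option (1): commit the consumer group offsets back to kafka synchronously,
    # only after the whole batch has been applied to local state.
    consumer.commit(asynchronous=False)
```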
If `clinvar-raw` operates under an at-least-once strategy for kafka consuming, then `clinvar-combiner` downstream from it will need to be able to handle duplicate records, which it currently cannot. Since at-least-once seems to be a safer and simpler way to implement this across the pipeline than exactly-once, I will go forward with updating `clinvar-combiner` to be able to work this way.
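The core of handling duplicates is making the combiner's writes idempotent. A minimal sketch, using an illustrative table schema (not the actual combiner schema), keyed on the message key so reprocessing a record is a no-op:

```python
import sqlite3

db = sqlite3.connect("state.db")
db.execute(
    "CREATE TABLE IF NOT EXISTS records "
    "(key TEXT PRIMARY KEY, value TEXT, dirty INTEGER DEFAULT 1)"
)

def apply_message(key: str, value: str) -> None:
    # INSERT OR REPLACE makes the write idempotent: a second delivery of the
    # same (key, value) pair simply overwrites the first with identical data.
    db.execute(
        "INSERT OR REPLACE INTO records (key, value, dirty) VALUES (?, ?, 1)",
        (key, value),
    )
    db.commit()
```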
The combiner pod should still use a persistent volume for storing the sqlite database, so that dirty flags are persisted and the entire `clinvar-raw` stream is not pushed out to the `clinvar-combined` topic every time the `clinvar-combiner` pod gets restarted. This will reduce read fees in Confluent.
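A sketch of how the persisted dirty flags would limit output, assuming the illustrative `records` table above and a persistent volume mounted at a hypothetical `/mnt/state` path:

```python
import sqlite3
from confluent_kafka import Producer

db = sqlite3.connect("/mnt/state/state.db")  # assumed persistent volume mount point
producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker

# Only rows changed since the last run are produced to clinvar-combined;
# the flag is cleared afterward so a restart does not replay the whole stream.
for key, value in db.execute("SELECT key, value FROM records WHERE dirty = 1"):
    producer.produce("clinvar-combined", key=key, value=value)
db.execute("UPDATE records SET dirty = 0 WHERE dirty = 1")
db.commit()
producer.flush()
```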
Kafka key-based compaction/deduplication should be turned on for `clinvar-raw` and `clinvar-combined`. My understanding is that consumers will still see duplicates if they are continuously consuming new messages, but the duplicates will not be stored persistently in kafka; only the latest version of each key is retained. So if an identical message is produced 2 times, a consumer consuming over that period will receive both, but a new consumer who starts consuming after the occurrence of the latest (2nd) instance of the key will only receive the 2nd instance. This will not cause any out-of-order problems.
Current plan is to finish implementing the snapshot functionality so a pod can resume from a known prior state.
But also implement manual consumer group offset commits, as well as a signal handler to gracefully handle pod terminations. If a pod receives a terminate signal, cut off the poll loop but finish the current batch; the batch should be able to finish within a reasonable window. This will also use a persistent volume to store live state.
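A minimal sketch of the graceful-termination piece, reusing the `consumer`, `db`, and `process_batch` names from the batch-commit sketch above; SIGTERM only sets a flag, so the in-flight batch finishes and is committed before shutdown:

```python
import signal

shutting_down = False

def handle_sigterm(signum, frame):
    global shutting_down
    shutting_down = True  # stop polling; do not interrupt the current batch

signal.signal(signal.SIGTERM, handle_sigterm)

while not shutting_down:
    batch = consumer.consume(num_messages=500, timeout=1.0)
    if not batch:
        continue
    process_batch(batch, db)
    consumer.commit(asynchronous=False)  # commit only after the batch is applied

consumer.close()  # leave the consumer group cleanly after the final commit
```

The pod's `terminationGracePeriodSeconds` would need to be at least as long as the worst-case batch processing time so Kubernetes does not SIGKILL the process before the final commit.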
Completed for `clinvar-combiner`. Implementation for `clinvar-raw` in progress.