clingen-data-model/clinvar-streams

Implement strategy for preventing message loss and preventing message duplication


When a pod restarts, we need to ensure that on startup it reloads its previous state and resumes the output stream at the correct location, based on where it stopped producing before.

The pod can keep the sqlite database in a persistent volume and reload state from there, or just rebuild it by re-reading the input topic. Reading from a topic is slower and potentially more expensive, depending on how often it needs to be done.

I think properly resuming input/output at the correct offset can be managed in a couple ways:
(1) Commit offsets back to Kafka after consuming a batch of polled messages (see the sketch after this list). Next time the pod starts, it can reload state (either from the topic or from a persistent volume) and resume processing as normal from the committed consumer group offset. This requires a SIGTERM handler to stop the pod from being shut down in the middle of processing a batch of consumed messages.
(2) Same as (1), but store the most recently processed message offset in a row of the database, or in a file next to the database file.
(3) Don't commit/remember any offsets. When restarted, re-read the whole input stream from scratch into the local database, rebuild all the output messages, and rely on Kafka key compaction to deduplicate output messages. I have not tested this enough to know whether it truly works as we'd want. There are costs to consider in re-reading gigabytes of data from Kafka on each restart, but it may not be a big problem. There is a simplicity benefit to the pod not keeping any persistent state across restarts.
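
A minimal sketch of option (1), using the Python confluent-kafka client purely for illustration. The broker address, topic name, group id, and the `load_state`/`process_batch` helpers are placeholders, not the actual implementation:

```python
from confluent_kafka import Consumer

def load_state():
    """Placeholder: reload the sqlite database from a persistent volume,
    or rebuild it by re-reading the input topic."""
    pass

def process_batch(messages):
    """Placeholder: apply the messages to the local sqlite state and
    produce any resulting output messages."""
    pass

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "clinvar-raw",              # placeholder consumer group
    "enable.auto.commit": False,            # offsets are committed manually below
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["clinvar-raw-input"])   # placeholder topic name

load_state()
while True:
    batch = consumer.consume(num_messages=500, timeout=1.0)
    if not batch:
        continue
    process_batch(batch)
    # Commit only after the whole batch has been processed, so a restart
    # resumes from the last fully-processed batch (at-least-once delivery).
    consumer.commit(asynchronous=False)
```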

If clinvar-raw operates under an at-least-once strategy for Kafka consuming, then clinvar-combiner downstream from it will need to be able to handle duplicate records, which it currently cannot. Since at-least-once seems to be a safer and simpler way to implement this across the pipeline than exactly-once, I will go forward with updating clinvar-combiner to be able to work this way.
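
For illustration, one way to make the combiner tolerate duplicates is to key its local state on the Kafka message key and upsert, so replaying a message simply overwrites the existing row. A sketch, assuming a hypothetical single-table schema (Python and this schema are placeholders, not the actual combiner code):

```python
import sqlite3

# Placeholder schema: the real combiner state is more involved; this just
# illustrates keying local state on the Kafka message key.
conn = sqlite3.connect("combiner-state.db")  # placeholder path on the persistent volume
conn.execute(
    """CREATE TABLE IF NOT EXISTS records (
           key   TEXT PRIMARY KEY,  -- kafka message key
           value TEXT NOT NULL,     -- latest payload seen for this key
           dirty INTEGER NOT NULL   -- 1 if not yet emitted downstream
       )"""
)

def apply_message(key: str, value: str) -> None:
    """Upsert keyed on the message key: replaying a duplicate message just
    overwrites the same row, so at-least-once delivery is harmless."""
    conn.execute(
        "INSERT OR REPLACE INTO records (key, value, dirty) VALUES (?, ?, 1)",
        (key, value),
    )
    conn.commit()
```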

The combiner pod should still use a persistent volume for storing the sqlite database, so that dirty flags are persisted and the entire clinvar-raw stream is not pushed out to the clinvar-combined topic every time the clinvar-combiner pod gets restarted. This will reduce read fees in Confluent.
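
Continuing the sketch above: with the sqlite file on a persistent volume, the dirty column survives restarts, and only dirty rows need to be re-emitted (the topic name and schema are still placeholders; `producer` is assumed to be a confluent-kafka Producer):

```python
def flush_dirty(producer) -> None:
    """Emit only rows flagged dirty to the output topic, then clear the flags,
    so a restart does not re-push the entire stream."""
    rows = conn.execute("SELECT key, value FROM records WHERE dirty = 1").fetchall()
    for key, value in rows:
        producer.produce("clinvar-combined", key=key, value=value)
    producer.flush()
    # Clear only the rows we actually emitted.
    conn.executemany("UPDATE records SET dirty = 0 WHERE key = ?",
                     [(key,) for key, _ in rows])
    conn.commit()
```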

Kafka key-based compaction/deduplication should be turned on for clinvar-raw and clinvar-combined. My understanding is that consumers which are continuously consuming new messages will still see duplicates, but the duplicates will not be stored persistently in Kafka; only the latest value for each key is retained after compaction runs. So if an identical message is produced twice, a consumer consuming over that period will receive both, but a new consumer that starts consuming after the second instance of the key (and after compaction has run on that segment) will only receive the second instance. This will not cause any out-of-order problems.

https://kafka.apache.org/documentation/#compaction
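
As an illustration only, a sketch of creating the two topics with compaction enabled via the Python AdminClient; the partition and replication settings are placeholders, and existing topics would instead have their config altered (e.g. through the Confluent console or CLI):

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder

# Create both topics with log compaction enabled.
topics = [
    NewTopic(name, num_partitions=1, replication_factor=1,
             config={"cleanup.policy": "compact"})
    for name in ("clinvar-raw", "clinvar-combined")
]
for name, future in admin.create_topics(topics).items():
    future.result()  # raises if creation failed
```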

Current plan is to finish implementing the snapshot functionality so a pod can resume from a known prior state.

We will also implement manual consumer group offset commits, along with a signal handler to gracefully handle pod termination: if a pod receives SIGTERM, it cuts off the poll loop but finishes the current batch, which should complete within a reasonable window. This will also use a persistent volume to store live state.
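
Continuing the earlier consumer sketch, a SIGTERM handler could flip a flag that is checked at the top of the poll loop, so the in-flight batch is processed and committed before the pod exits (Kubernetes allows a termination grace period, 30 seconds by default, before sending SIGKILL). Again a sketch, not the actual implementation:

```python
import signal

shutting_down = False

def handle_sigterm(signum, frame):
    """Stop polling for new batches, but let the current batch finish."""
    global shutting_down
    shutting_down = True

signal.signal(signal.SIGTERM, handle_sigterm)

while not shutting_down:
    batch = consumer.consume(num_messages=500, timeout=1.0)
    if not batch:
        continue
    process_batch(batch)
    consumer.commit(asynchronous=False)

consumer.close()  # leave the group cleanly so partitions rebalance promptly
```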

Completed for clinvar-combiner. Implementation for clinvar-raw in progress.