clingen-data-model/clinvar-streams

Validate that the releases received match the expected sequence of releases


Read the release sequence produced by the clinvar-ftp-watcher and use that sequence to validate releases on the broad-dsp-clinvar stream before producing the clinvar-raw stream.

This could be done in the process-clinvar-drop function, or right before it is called. When a message is read off the stream, its release date should be validated. If the date does not match the expected release, processing should halt and nothing further should be produced.
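A minimal sketch of the check itself, under stated assumptions: the release sequence from clinvar-ftp-watcher has already been read into a vector of release-date strings (oldest first), and the last successfully processed release is tracked somewhere. The function names `next-expected-release` and `validate-release!` are hypothetical, and so are the dates in the usage example.

```clojure
;; Hypothetical helpers; none of these names exist in clinvar-streams.

(defn next-expected-release
  "Returns the release date that should follow `last-processed` in
  `expected-releases` (a vector of date strings, oldest first).
  Returns the first release when nothing has been processed yet, and
  nil when `last-processed` is unknown or is already the newest release."
  [expected-releases last-processed]
  (if (nil? last-processed)
    (first expected-releases)
    (->> expected-releases
         (drop-while #(not= % last-processed))
         second)))

(defn validate-release!
  "Returns `release-date` when it is the next expected release.
  Otherwise throws, so the caller stops before producing any output."
  [expected-releases last-processed release-date]
  (let [expected (next-expected-release expected-releases last-processed)]
    (when (or (nil? expected) (not= expected release-date))
      (throw (ex-info "Release date does not match the expected sequence"
                      {:expected expected
                       :received release-date
                       :last-processed last-processed})))
    release-date))
```

With made-up dates, `(validate-release! ["2023-01-07" "2023-01-15" "2023-01-21"] "2023-01-07" "2023-01-15")` returns the release date, while passing `"2023-01-21"` as the received date throws with the expected and received values attached as `ex-data`. Throwing is only one way to halt. The consumer could instead log the mismatch and stop polling.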

(defn process-clinvar-drop
  "Constructs a lazy sequence of output messages based on an input drop file
  from the upstream DSP service.
  Caller should avoid realizing whole sequence into memory."
  ;; TODO SPEC
  [msg {:keys [storage-protocol]
        :or {storage-protocol "gs://"}}]
  ; 1. parse the drop message to determine where the files are
  ;    this will return the folder and bucket and file manifest
  (log/info {:fn :process-clinvar-drop :msg "Processing drop message" :drop-message msg})
  (let [parsed-drop-record (if (string? msg) (json/parse-string msg true) msg)
        release-date (:release_date parsed-drop-record)]
    (lazy-cat
     [(create-sentinel-message release-date :start)]
     (generate-messages-from-diff parsed-drop-record storage-protocol)
     [(create-sentinel-message release-date :end)])))
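Because the function above returns a lazy sequence, where the check sits matters. A check placed inside `lazy-cat` runs only once the sequence is consumed, while a check in the function body before `lazy-cat`, or in a wrapper around the call, runs immediately. The wrapper below is one hedged way to validate "right before" processing. `guarded-process-drop` is a hypothetical name, `process-fn` stands in for `process-clinvar-drop`, and the message is assumed to be an already-parsed map with a `:release_date` key.

```clojure
;; Hypothetical wrapper, not part of clinvar-streams.

(defn guarded-process-drop
  "Checks the drop message's release date eagerly, then delegates to
  `process-fn`. Throws before `process-fn` is called when the date is
  not `expected-release`, so no output messages are ever constructed."
  [process-fn expected-release parsed-msg]
  (let [release-date (:release_date parsed-msg)]
    (when (not= expected-release release-date)
      (throw (ex-info "Unexpected release; halting before processing"
                      {:expected expected-release
                       :received release-date})))
    (process-fn parsed-msg)))

;; Usage with a stand-in processing function and made-up dates:
(comment
  (guarded-process-drop (fn [m] [(:release_date m)])
                        "2023-01-15"
                        {:release_date "2023-01-15"}))
```

Keeping the guard outside the processing function leaves `process-clinvar-drop` unchanged. The consuming loop must then decide what to do with the exception, for example stop reading from the broad-dsp-clinvar stream.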

(doseq [filtered-message
        (->> (process-clinvar-drop m-value
                                   (select-keys opts [:STORAGE_PROTOCOL]))