
Hpguppi_pypeline

Hpguppi_pypeline aims to provide a framework for pipelining the execution of modular Python scripts, enabling the creation of a custom post-processing pipeline for the data captured by a hashpipe (hpguppi_daq) instance.

Approach

The hpguppi_pypeline is governed by key-value pairs within a redis hash. Currently this hash matches the one generated by an hpguppi_redis_gateway instance: hashpipe://${hostname}/${instanceID}/
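
For illustration, the governing hash can be read with the redis-py client; a minimal sketch, assuming a local redis server and instance 0 (the hash name follows the gateway convention above):

import socket
import redis

r = redis.Redis(host='localhost')
hash_name = 'hashpipe://{}/{}/'.format(socket.gethostname(), 0)
daqstate = r.hget(hash_name, 'DAQSTATE')  # e.g. b'recording'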

The post-processing pipeline starts processing the data captured by a hashpipe instance when the DAQSTATE key's value transitions from 'recording' to anything else. The stages in the pipeline are determined by the names listed in the POSTPROC key's value: at each stage, the run() function of a python script whose filename is related to the stage's name is called. An artificial stage with the name hpguppi is inferred for the hpguppi RAW acquisition.
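
As a sketch of that trigger condition (illustrative only, not the primary script's actual loop), the transition out of 'recording' could be detected by polling DAQSTATE:

import time

def wait_for_recording_end(redis_conn, hash_name):
    # Poll DAQSTATE until it transitions from 'recording' to anything else.
    was_recording = False
    while True:
        value = redis_conn.hget(hash_name, 'DAQSTATE')
        daqstate = value.decode() if value is not None else ''
        if was_recording and daqstate != 'recording':
            return  # recording just ended: start the post-processing pipeline
        was_recording = (daqstate == 'recording')
        time.sleep(1)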

The python script for a stage is able to provide the names of 3 keys whose values will be pre-processed and passed as arguments to its run(): a key each for its INPUTS, ARGUMENTS and ENVIRONMENT variables. Because the keys' values are just strings, they are pre-processed by the primary pypeline script, which replaces keywords in the strings with dynamic values. In this way the arguments of a stage's run() can be the outputs of a previous stage's run(): for instance, an INPUT key with the value hpguppi would be pre-processed to replace hpguppi with the single filepath (sans %d{4}.raw) of the latest RAW file recorded (which is the output of the artificial hpguppi stage). The primary script also holds some static values which can be received by each stage's run().

The INPUT and ARGUMENT keys' values can be comma delimited to provide a number of input-sets and argument-sets for which the related stage must be run(). The permutations of {INPUT, ARGUMENT} for each stage are exhausted by the primary pypeline script.
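
A sketch of how those permutations are exhausted for a single stage (illustrative, not the primary script's own code; the values shown are placeholders):

from itertools import product

# Comma-delimited INPUT and ARGUMENT values, as described above.
input_sets = 'fileA,fileB'.split(',')
argument_sets = '-x 1,-x 2'.split(',')

# Each {INPUT, ARGUMENT} permutation results in a call of the stage's run().
for inputs, argstr in product(input_sets, argument_sets):
    print('run() with inputs={!r}, argstr={!r}'.format(inputs, argstr))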

At the end of each stage, the primary script moves on to the next stage if there is another listed in the POSTPROC key's value; otherwise it rewinds up the list of stages to the last stage with an input/argument permutation that has not yet been processed.

Stages can produce more than one output (each run() must return a list). The permutations of a stage's input are the exhaustive combinations of the INPUT's references to previous stages' outputs (as listed in the value of the stage's PROC_INP_KEY).

Of course, it may be desired that a stage's list of outputs be input all at once, instead of sequentially. To this end, and a few others, there are syntactical markers on the keywords within INPUT values that adjust the pre-processing applied.

Post-processing Pipeline Stages (POSTPROC)

The value of the POSTPROC key is a space-delimited list of the stage-names that make up the post-processing pipeline. Each stage-name listed names the postproc_stagename.py script to be loaded from the directory of the primary hpguppi_pypeline.py script.
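
A sketch of that mapping, assuming the stage-scripts are importable from the primary script's directory (illustrative, not the actual loader):

import importlib

postproc_value = 'rawspec turboseti candidate_filter log rm'
stages = {}
for stagename in postproc_value.split(' '):
    # Each stage-name 'x' maps to a postproc_x.py script.
    stages[stagename] = importlib.import_module('postproc_{}'.format(stagename))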

Stage Requirements

An artificial stage is inferred for the hpguppi RAW acquisition with the name hpguppi. Its output is the single filepath (sans %d{4}.raw) of the latest RAW file recorded.

Each stage's script is expected to have a run() function with the following signature, as well as the following variables (the last being situational; a minimal example follows the list):

def run(argstr: str, inputs: list, envvar: str) -> list:
    return outputs
  • PROC_ARG_KEY : names the key whose value determines the 1st argument for run()
  • PROC_INP_KEY : names the key whose value determines the 2nd argument for run()
  • PROC_ENV_KEY : names the key whose value determines the 3rd argument for run()
  • PROC_NAME : the display name of the stage
  • POPENED : (situational) a list of the Popen objects spawned
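
A minimal sketch of a stage script that satisfies these requirements (the filename postproc_echo.py and the two key names are hypothetical):

# postproc_echo.py
PROC_ARG_KEY = 'PPECHARG'  # key whose value becomes run()'s 1st argument
PROC_INP_KEY = 'PPECHINP'  # key whose value becomes run()'s 2nd argument
PROC_ENV_KEY = None        # no environment variables are needed
PROC_NAME = 'echo'

def run(argstr, inputs, envvar):
    # Pass the inputs straight through as this stage's outputs.
    print(PROC_NAME, argstr, inputs, envvar)
    return list(inputs)  # run() must return a list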

Stages Spawning Detached Processes

A convention is in place for stages that spawn detached processes (with subprocess.Popen): the stage's name should end with an ampersand (&) and the Popen objects should be collected in the stage's global POPENED. With these two pieces in place, the primary pypeline script will await the termination of a stage's previous POPENED. See the mv_& stage for a practical example.
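
A sketch of such a stage, assuming an mv-like file-moving process (the key names here are hypothetical; see the repository's mv_& stage for the real thing):

import subprocess

PROC_ARG_KEY = 'PPMVARG'
PROC_INP_KEY = 'PPMVINP'
PROC_ENV_KEY = None
PROC_NAME = 'mv_&'
POPENED = []  # the primary script awaits these Popen objects

def run(argstr, inputs, envvar):
    destination = argstr
    for filepath in inputs:
        # Spawn the move detached; do not wait for it here.
        POPENED.append(subprocess.Popen(['mv', filepath, destination]))
    return [destination]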

INPUT Keywords and Modifiers

The values of INPUT keys are pre-processed for the names of previous stages, which are the only keywords processed. It is assumed that each word (sequence of characters surrounded by spaces) is the name of a previous stage. It is possible, however, to mark a word as 'verbatim input' by prefixing it with &, in which case only the ampersand is removed. Otherwise, the occurrence of a stage's name is replaced by one of that stage's output values (the output values are exhausted across reruns of the referencing stage). To have a stage's name replaced by its last input, prefix the name with ^. To have the name replaced by all of the stage's outputs (all at once), prefix the stage's name with *.

  • &: the word is passed verbatim (ampersand removed)
  • ^: replaced by the named stage's last input
  • *: replaced by all of the named stage's outputs at once

Multiple words in the INPUT value are listed separated by spaces, and multiple input-sets are separated by commas (,).
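
A sketch of the per-word pre-processing these modifiers imply (a simplification; the exhaustion of outputs across reruns is handled by the primary script):

def preprocess_word(word, outputs_of, last_input_of):
    # outputs_of maps a stage's name to its list of outputs;
    # last_input_of maps a stage's name to its last input.
    if word.startswith('&'):
        return [word[1:]]                 # verbatim: only the ampersand is removed
    if word.startswith('^'):
        return [last_input_of[word[1:]]]  # the named stage's last input
    if word.startswith('*'):
        return outputs_of[word[1:]]       # all of the stage's outputs, at once
    return [outputs_of[word][0]]          # one output per rerun (simplified here)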

ARGUMENT and ENVIRONMENT Keywords

Keywords within the ARGUMENT and ENVIRONMENT keys' values are surrounded by $ and are replaced by values held by the primary script.

  • inst: the instanceID that the script is attached to
  • hnme: the hostname of the machine
  • stem: the stem of the latest RAW dump
  • beg: the time.time() value produced as DAQSTATE changed to recording
  • end: the time.time() value produced as DAQSTATE changed from recording
  • time: the duration (in seconds) of each preceding stage's run as a list of floats
  • proc: the PROC_NAME of each preceding stage's run as a list of strings

Multiple words in the ARGUMENT and ENVIRONMENT values are listed separated by spaces, and multiple argument-sets are separated by commas (,).
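
A sketch of the $keyword$ substitution described above (the static values shown are placeholders, not real acquisition values):

import re

static_values = {'inst': 0, 'hnme': 'blpc0', 'stem': 'guppi_59000_12345'}

def substitute(value):
    # Replace each $keyword$ with the primary script's value for that keyword.
    return re.sub(r'\$(\w+)\$', lambda m: str(static_values[m.group(1)]), value)

print(substitute('-d /mnt/buf$inst$/rawspec/$stem$/'))
# -d /mnt/buf0/rawspec/guppi_59000_12345/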

An Example Showcased by the Values of Stages' Keys

hashpipe_check_status is used to set the value of the key named by -k to the string given by -s.

Specify the 'postproc_*' names of the stages to be run in the post-processing pipeline, in order:

  • hashpipe_check_status -k POSTPROC -s "rawspec turboseti candidate_filter log rm"

Specify that the input of rawspec (RWS) is the output of hpguppi, then the static arguments and the environment variables of the rawspec command:

  • hashpipe_check_status -k PPRWSINP -s "hpguppi"
  • hashpipe_check_status -k PPRWSARG -s "-f 116480 -t 2 -I 1.0 -d /mnt/buf$inst$/rawspec/$stem$/"
  • hashpipe_check_status -k PPRWSENV -s "CUDA_VISIBLE_DEVICES:$inst$"

Specify that the input of turboSETI (TBS) is the output of rawspec, then the static arguments and the environment variables of the turboSETI command:

  • hashpipe_check_status -k PPTBSINP -s "rawspec"
  • hashpipe_check_status -k PPTBSARG -s "-M 10 -g n -p 12 -n 1440 -o /mnt/buf$inst$/turboseti/$stem$/"
  • hashpipe_check_status -k PPTBSENV -s "CUDA_VISIBLE_DEVICES:$inst$"

Specify that the input of the candidate filter (CND) is the output and the input of turboseti, then the static arguments of the candidate filter script:

  • hashpipe_check_status -k PPCNDINP -s "turboseti ^turboseti"
  • hashpipe_check_status -k PPCNDARG -s "-r 1 -s 10 -o auto -n bla"

Development of a Bespoke Pipeline

Development starts with creating a 'stage' in a Python script postproc_stagename.py. Set up the names of the keys required by creating the PROC_ARG/INP/ENV_KEY variables (set the value of the ARG/ENV_KEY variables to None if they are to be ignored). Then create the run(argstr, inputs, envvar) function that details the stage's process. Finally, ensure that the redis hash has the necessary keys for the stage, with appropriate values.
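
As a sketch, the necessary keys could also be supplied with redis-py, analogously to the hashpipe_check_status examples above (the stage and key names here are hypothetical, and whether direct hash writes coexist with a running gateway is deployment-specific):

import socket
import redis

r = redis.Redis(host='localhost')
hash_name = 'hashpipe://{}/0/'.format(socket.gethostname())
r.hset(hash_name, 'POSTPROC', 'mystage')
r.hset(hash_name, 'PPMYSINP', 'hpguppi')
r.hset(hash_name, 'PPMYSARG', '-o /mnt/buf$inst$/mystage/$stem$/')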

Example stage scripts for rawspec and turboSETI, as well as some others, exist within this repository.