Hello Mess Pipeline

This Nextflow DSL2 pipeline demonstrates how to integrate the Zero-Mess library to send events to a Kafka-compatible broker.

Description

The pipeline consists of four processes, each of which appends a line (in a different language) to a file that is consumed by the next process. At the end of the execution, the content of the final file is displayed.
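
For reference, here is a minimal sketch of what the DSL2 wiring might look like; the actual process and channel names live in main.nf and may differ:

workflow {
    // Each process appends its greeting to the file produced by the previous one
    A()
    B(A.out)
    C(B.out)
    D(C.out)

    // Show the content of the final file at the end of the run
    D.out.view { it.text }
}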

Messages

There are two types of messages:

  • Stateful messages (PipelineMessage, ProcessMessage)
  • Stateless messages (BashMessage)

Stateful messages are used inside the main pipeline code and can draw on the current state of the execution.

Stateless messages are executed locally or remotely inside a process execution (task). They have no state of their own and cannot access the pipeline's state, so they require more parameters because they must be self-contained.

Pipeline Messages

Pipeline messages can be sent at any point in the pipeline Groovy code (main workflow, subworkflows, etc.).

The following code shows how to build and send a message that notifies the pipeline has started:

PipelineMessage.started().forTopic('pipelineEvents')
        .data('launch time', "${workflow.start.format('dd-MMM-yyyy HH:mm:ss')}")
        .data('hope', 'wish it works').send()

or

PipelineMessage.started(workflow).forTopic('pipelineEvents')
        .data('hope', 'wish it works').send()

In the latter case, PipelineMessage extracts some default metadata from the workflow object and adds it to the message.

data() can be invoked as many times as needed, and it keeps appending information to the message payload.
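
For instance, a start notification can be fired right at the top of the main workflow block; the topic name and payload fields below are only illustrative:

workflow {
    // Notify the broker that the run has started, reusing Nextflow's workflow metadata
    PipelineMessage.started(workflow).forTopic('pipelineEvents')
            .data('runName', workflow.runName)
            .data('profile', workflow.profile)
            .send()

    // ... rest of the pipeline ...
}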

Process Messages

A process can log started and completed events using ProcessMessage inside the beforeScript and afterScript directives.

These two directives let you specify one or more commands to be executed in a shell before and after the process execution. Because they expect shell commands rather than Groovy code that sends the message immediately, buildCommand() should be invoked instead of send().

process A {
    // Messaging: emit a 'started' event before the task runs and a 'completed' event after it
    beforeScript ProcessMessage.started('A').forTopic('processEvents').data('my info','message').buildCommand()

    afterScript ProcessMessage.completed('A').forTopic('processEvents').data('another info','another message').buildCommand()

    output:
    path 'helloA.txt'

    script:
    """
    echo 'Hello from A' > helloA.txt
    """
}

As for PipelineMessage, data() can be invoked as many times as needed.

Bash Messages

If the process's script is a Bash script, the BashMessage function can be used at any point in the script.

The expected parameters are:

  • the topic to publish the message to
  • the data for the payload, in the format:
    "name1":"value1","name2":"value2","name3":"value3"
    surrounded by single quotes, as in the example below:

BashMessage 'taskEvents' '"process":"A"','"messageStatus":"about to echo from A"'
echo 'Hello from A' > helloA.txt
if [[ $? != 0 ]]; then
    BashMessage 'A.failure' '"process":"A"','"exitStatus":"127"','"message":"A failed to echo"'
    exit 127
fi

Execution

Clone the project and run:

nextflow run main.nf  

or with pipeline sharing, just run:

nextflow run eipm/hello-mess-nf

Note: By default the pipeline will run but won't publish any messages. To publish them, start an instance of Kafka-Dispatcher and configure the related endpoint in nextflow.config.
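
As a purely illustrative sketch (the actual setting names are defined by Zero-Mess and Kafka-Dispatcher, so check their documentation), the endpoint configuration in nextflow.config could look something like this:

// nextflow.config -- hypothetical snippet; the parameter name and value are placeholders
params {
    // URL of a locally running Kafka-Dispatcher instance (illustrative)
    messaging_endpoint = 'http://localhost:8080'
}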