Hera is an Argo Workflows Python SDK. Hera aims to make workflow construction and submission easy and accessible to everyone! Hera abstracts away workflow setup details while still maintaining a consistent vocabulary with Argo Workflows.

Primary language: Python. License: MIT.

Hera (hera-workflows)

The Argo was constructed by the shipwright Argus,
and its crew were specially protected by the goddess Hera.

(https://en.wikipedia.org/wiki/Argo)

Hera is a Python framework for constructing and submitting Argo Workflows. The main goal of Hera is to make the Argo ecosystem accessible by simplifying workflow construction and submission.

An introductory Hera presentation was given at the Argo Workflows and Events Community Meeting on 20 Oct 2021.

Requirements

Hera requires an Argo server deployed to a Kubernetes cluster. Currently, Hera assumes that the Argo server sits behind an authentication layer that authenticates workflow submission requests using the Bearer token on the request. To deploy Argo to your own Kubernetes cluster, follow the Argo Workflows guide.

To submit workflows without the authentication layer, you can instead port forward to your Argo server deployment and submit workflows to localhost:2746 (2746 is the default port, but you are free to use another). See the Argo Workflows documentation for the port forward command.
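
To make the Bearer token assumption concrete, here is a minimal sketch, using only the Python standard library, of what an authenticated submission request to an Argo server could look like. It is not how Hera submits workflows internally. The helper name build_submit_request, the "argo" namespace, the token value, and the tiny manifest are hypothetical placeholders. The sketch also assumes the Argo Server REST path /api/v1/workflows/{namespace} and a port-forwarded server reachable over plain http; adjust both to your deployment.

```python
import json
import urllib.request
from typing import Optional


def build_submit_request(
    host: str,
    namespace: str,
    manifest: dict,
    token: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a workflow submission request.

    Hypothetical helper for illustration only; not part of Hera.
    """
    headers = {"Content-Type": "application/json"}
    if token is not None:
        # The authentication layer is assumed to check this header.
        headers["Authorization"] = f"Bearer {token}"

    # Assumed request body shape: the manifest wrapped under "workflow".
    body = json.dumps({"workflow": manifest}).encode("utf-8")
    return urllib.request.Request(
        url=f"{host}/api/v1/workflows/{namespace}",
        data=body,
        headers=headers,
        method="POST",
    )


# Port-forwarded server without an authentication layer: no token needed.
local_request = build_submit_request(
    "http://localhost:2746", "argo", {"metadata": {"generateName": "diamond-"}}
)

# Server behind an authentication layer: pass the Bearer token.
authed_request = build_submit_request(
    "http://localhost:2746",
    "argo",
    {"metadata": {"generateName": "diamond-"}},
    token="my-token",
)
```

Passing either request to urllib.request.urlopen would actually send it; the sketch stops short of that so it can be read and run without a cluster.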

Installation

Source        Command
PyPI          pip install hera-workflows
Conda         conda install -c conda-forge hera-workflows
GitHub repo   python -m pip install git+https://github.com/argoproj-labs/hera-workflows --ignore-installed
              (or, from a local clone: pip install .)

Examples

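from hera import Task, Workflow


def say(message: str):
    print(message)


with Workflow("diamond") as w:
    # Each task runs the say function with its own message.
    a = Task('a', say, ['This is task A!'])
    b = Task('b', say, ['This is task B!'])
    c = Task('c', say, ['This is task C!'])
    d = Task('d', say, ['This is task D!'])

    # Diamond shape: b and c depend on a, and d depends on both b and c.
    a >> [b, c] >> d

# Submit the workflow to the Argo server.
w.create()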

See the examples directory for a collection of Argo workflows constructed and submitted via Hera.
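
The a >> [b, c] >> d line in the example above relies on Python operator overloading. The following toy sketch shows one way such chaining can be made to work; ToyTask and its depends_on attribute are hypothetical and are not Hera's Task implementation. The key detail is that a plain list has no >> operator, so [b, c] >> d is handled by the reflected method __rrshift__ on d.

```python
from typing import List


class ToyTask:
    """Hypothetical stand-in for a workflow task; not Hera's Task class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.depends_on: List[str] = []

    def __rshift__(self, other):
        # self >> other: every task on the right depends on self.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.depends_on.append(self.name)
        # Return the right-hand side so the chain can continue.
        return other

    def __rrshift__(self, other):
        # [x, y] >> self: lists do not define >>, so Python falls back here.
        sources = other if isinstance(other, list) else [other]
        for source in sources:
            self.depends_on.append(source.name)
        return self


a, b, c, d = (ToyTask(name) for name in "abcd")
a >> [b, c] >> d

for task in (a, b, c, d):
    print(task.name, "depends on", task.depends_on)
```

Returning the right-hand operand from each method is what lets the expression be read left to right as a chain of dependencies.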

Contributing

If you plan to submit contributions to Hera you can install Hera in a virtual environment managed by poetry:

poetry install

In your activated poetry shell, you can use the tasks found in tox.ini, e.g.:

To run tests on all supported Python versions with coverage, run tox:

tox

To list all available tox envs run:

tox -a

To run selected tox envs, e.g. for a specific Python version with coverage, run:

tox -e py37,coverage

As coverage depends on py37, it will run after py37.

See the project's tox.ini for more details.

Also, see the contributing guide!

Comparison

There are other libraries currently available for structuring and submitting Argo Workflows:

  • Couler, which aims to provide a unified interface for constructing and managing workflows on different workflow engines;
  • Argo Python DSL, which allows you to programmatically define Argo workflows using Python.

While these libraries provide extensive functionality for Argo workflow construction and submission, they require an advanced understanding of Argo concepts. When Dyno Therapeutics started using Argo Workflows, it was challenging to construct and submit experimental machine learning workflows. Scientists and engineers at Dyno Therapeutics spent a lot of time on workflow definition rather than on implementing the atomic unit of execution, the Python function, that performed, for instance, model training.

Hera presents a much simpler interface for task and workflow construction, letting users focus on their own executable payloads rather than workflow setup. Here is the same diamond workflow written with Hera, Couler, and Argo Python DSL:

Hera:

from hera import Task, Workflow


def say(message: str):
    print(message)


with Workflow("diamond") as w:
    a = Task('a', say, ['This is task A!'])
    b = Task('b', say, ['This is task B!'])
    c = Task('c', say, ['This is task C!'])
    d = Task('d', say, ['This is task D!'])

    a >> [b, c] >> d

w.create()

Couler:

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def job(name):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[name],
        step_name=name,
    )


def diamond():
    couler.dag(
        [
            [lambda: job(name="A")],
            [lambda: job(name="A"), lambda: job(name="B")],  # A -> B
            [lambda: job(name="A"), lambda: job(name="C")],  # A -> C
            [lambda: job(name="B"), lambda: job(name="D")],  # B -> D
            [lambda: job(name="C"), lambda: job(name="D")],  # C -> D
        ]
    )


diamond()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)

Argo Python DSL:

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
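
All three listings describe the same diamond-shaped dependency graph: A runs first, B and C both depend on A, and D depends on B and C. As a library-independent illustration, the sketch below writes that graph as a plain dictionary and uses graphlib from the standard library (Python 3.9 or newer) to group tasks into batches that could run in parallel. The execution_batches helper is hypothetical, and this is not how Argo schedules tasks; it only shows the ordering that the dependencies imply.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each task maps to the set of tasks it depends on.
diamond: Dict[str, Set[str]] = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
}


def execution_batches(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into batches whose dependencies are already satisfied."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        # Every task returned here has all of its dependencies done.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = execution_batches(diamond)
print(batches)
```

For the diamond, the first batch holds only A, the second holds B and C together, and the last holds D.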