A Python library to get mundane tasks done

Primary language: Python. License: The Unlicense.

Mundone

Mundone is a Python library to get mundane tasks done by building complex workflows.

Requirements

  • Python 3.9+

Installation

pip install mundone

Usage

Task

A Task is a basic processing unit to execute functions.

Parameters

  • fn (Callable): function to be executed.
  • args (list | tuple | None): list or tuple of arguments to be passed to fn.
  • kwargs (dict | None): dictionary of keyword arguments to be passed to fn.
  • name (str | None): name of the task. Used to name the job when executing the task using the LSF or Slurm job scheduler (defaults to the name of fn).
  • scheduler (dict | None): dictionary specifying the job scheduler to use and job requirements:
    • type (str): must be lsf or slurm.
    • queue (str | None): LSF queue or Slurm partition.
    • cpu (int | None): specifies the number of processors required by the job.
    • gpu (int | str | None): specifies properties of GPU resources required by the job.
    • mem (int | float | None): specifies the memory (in MB) required by the job.
    • tmp (int | float | None): specifies the amount of temporary disk space required by the job.
  • requires (list): list of the names of tasks the task directly depends on.
  • random_suffix (bool): if True (default), temporary files are created using the name of the task and a random suffix.
  • keep (bool): if True (default), temporary files are deleted once the task has completed.
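
The requires parameter implies a dependency graph between tasks. As a rough illustration of the execution order such a graph allows, the sketch below uses the standard library's graphlib module on a plain dictionary. It does not use Mundone and is not a description of how Mundone resolves dependencies internally; the task names are made up for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical task names, each mapped to the names of the tasks it
# directly depends on (the same information `requires` carries).
requires = {
    "download": [],
    "index": ["download"],
    "search": ["download", "index"],
    "report": ["search"],
}


def execution_order(requires: dict[str, list[str]]) -> list[str]:
    """Return task names in an order that satisfies every dependency."""
    # TopologicalSorter expects a mapping of node -> predecessors and
    # raises graphlib.CycleError if the dependencies are circular.
    return list(TopologicalSorter(requires).static_order())


order = execution_order(requires)
print(order)
```

A task only needs to list its direct dependencies: "report" names "search" alone, yet it still ends up after "download" and "index" in the computed order.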

Methods

is_ready()

Returns True if all

is_running()

is_done()

is_successful()

start(dir: str)

terminate(force: bool = False)

clean(seconds: int = 30, max_attempts: int = 5)

wait(seconds: int = 10)

poll()

collect()

Properties

  • state (str): represents the current state of the task (pending, running, failed, cancelled, done).
  • cputime (int | None): CPU time, for tasks that completed using a job scheduler.
  • maxmem (int | None): highest memory used, for tasks that completed using a job scheduler.
  • stdout (str): standard output of the task.
  • stderr (str): standard error of the task.
  • result: whatever is returned by the task's function, or None if the task has not successfully completed.
  • submit_time (datetime.datetime | None): date/time at which the task was submitted.
  • start_time (datetime.datetime | None): date/time at which the task actually started (when running the task using a job scheduler).
  • end_time (datetime.datetime | None): date/time at which the task finished.
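
The three timestamps can be combined to see how long a task waited before it started and how long it ran. The sketch below shows the arithmetic on a stand-in dataclass rather than on a real Task. TaskTimes, queue_wait and run_time are hypothetical names introduced for this example and are not part of Mundone; the only assumption is that the properties are datetime.datetime values or None, as documented above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class TaskTimes:
    """Stand-in holding the three documented timestamps (not a Mundone class)."""
    submit_time: Optional[datetime]
    start_time: Optional[datetime]
    end_time: Optional[datetime]


def queue_wait(t) -> Optional[timedelta]:
    """Time between submit_time and start_time, or None if either is unknown."""
    if t.submit_time is None or t.start_time is None:
        return None
    return t.start_time - t.submit_time


def run_time(t) -> Optional[timedelta]:
    """Time between start_time and end_time, or None if either is unknown."""
    if t.start_time is None or t.end_time is None:
        return None
    return t.end_time - t.start_time


# Made-up timestamps: 5 minutes waiting, then 30 minutes running.
times = TaskTimes(
    submit_time=datetime(2024, 1, 1, 12, 0, 0),
    start_time=datetime(2024, 1, 1, 12, 5, 0),
    end_time=datetime(2024, 1, 1, 12, 35, 0),
)
print(queue_wait(times), run_time(times))
```

Both helpers read the attributes by name only, so they should work on any object that exposes submit_time, start_time and end_time.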

Example

The following code defines a function which uses hmmsearch to search Swiss-Prot protein sequences using Pfam profile hidden Markov models, and a Task to execute the function using the LSF job scheduler.

import subprocess as sp
from mundone import Task


def hmmsearch(hmmfile: str, seqdb: str, output: str, **kwargs):
    num_threads = kwargs.get("threads")
    
    cmd = ["hmmsearch", "-o", output]
    
    if isinstance(num_threads, int) and num_threads >= 0:
        cmd += ["--cpu", str(num_threads)]
        
    cmd += [hmmfile, seqdb]
    sp.run(cmd, check=True)


hmmfile = "Pfam-A.hmm"
seqdb = "uniprot_sprot.fasta"
output = "Pfam-A.hits.out"
task = Task(hmmsearch, [hmmfile, seqdb, output], {"threads": 8},
            name="run-hmmsearch",
            scheduler={
                "type": "lsf",
                "queue": "standard",
                "cpu": 8,
                "mem": 1000  # memory in MB; the documented key is "mem"
            })

task.start()
task.wait()
if task.is_successful():
    print("ok")
else:
    print(f"error: {task.stdout} {task.stderr}")
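
Because the scheduler is described by a plain dictionary, a misspelled key is easy to miss. The sketch below is a hypothetical pre-flight check written against the keys and types listed under Parameters. check_scheduler is not a Mundone function, and how Mundone itself treats unknown keys is not stated in this document.

```python
# Keys and accepted types, taken from the Parameters list above.
NoneType = type(None)
ALLOWED = {
    "type": (str,),
    "queue": (str, NoneType),
    "cpu": (int, NoneType),
    "gpu": (int, str, NoneType),
    "mem": (int, float, NoneType),
    "tmp": (int, float, NoneType),
}


def check_scheduler(scheduler: dict) -> list[str]:
    """Return a list of problems found in a scheduler dictionary."""
    problems = []
    for key, value in scheduler.items():
        if key not in ALLOWED:
            problems.append(f"unknown key: {key}")
        elif not isinstance(value, ALLOWED[key]):
            problems.append(f"bad type for {key}: {type(value).__name__}")
    # The documentation states that type must be lsf or slurm.
    if scheduler.get("type") not in ("lsf", "slurm"):
        problems.append("type must be 'lsf' or 'slurm'")
    return problems


good = check_scheduler({"type": "lsf", "queue": "standard", "cpu": 8, "mem": 1000})
bad = check_scheduler({"type": "lsf", "cpu": "8", "memory": 1000})
print(good, bad)
```

Returning a list of messages instead of raising on the first problem lets a caller report every issue in one pass before a job is submitted.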

Pool

A Pool is a task pool object that controls a pool of worker processes to which tasks can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

Parameters

  • path (str):
  • max_running (int):
  • kill_on_exit (bool):
  • threads (int):

Methods

submit(task: Task)

as_completed(wait: bool = False)

terminate()

Example

Workflow

Parameters

  • tasks
  • name
  • id
  • dir
  • database

Methods

run(tasks: list[str] | None = None, dry_run: bool = False, max_retries: int = 0, monitor: bool = True)

terminate()

Example