A pipeline framework in Python for Linux

Primary language: Python. License: Apache License 2.0 (Apache-2.0).

pyppl - A lightweight pipeline framework in Python

Features

  • Supports any language for running your processes.
  • Automatic deduction of input based on process dependencies.
  • Different ways of exporting output files (including gzip).
  • Process caching (including caching using exported files).
  • Flexible placeholder handling in output and script settings.
  • APIs to modify channels.
  • Different runners to run your processes on different platforms.
  • Runner customization (you can define your own runner).
  • Callbacks for processes.
  • Error handling for processes.
  • Configuration file support for pipelines.
  • Flowchart in DOT for your pipelines.
  • Aggregations (sets of predefined processes).
  • Detailed documentation and API documentation.

Requirements

  • Linux (may work on OSX; not tested)
  • Python 2.7

Installation

# install latest version
git clone https://github.com/pwwang/pyppl.git
cd pyppl
python setup.py install
# or simply (GitHub no longer serves the unauthenticated git:// protocol, so use https):
pip install git+https://github.com/pwwang/pyppl.git

# install released version
pip install pyppl

First script

To sort 5 files:

from pyppl import pyppl, proc

pSort         = proc()
# This proc does not depend on any other process,
# so sys.argv is used as its input channel.
# "infile" is the placeholder that refers to the input
# in the output assignment and in the script.
# ":file" denotes the input type; a symbolic link to the file
# will be created in the input directory.
pSort.input   = "infile:file"
# The output file (marked by ":file") will be generated in the job output directory.
# "infile" is the full path of the input file; "fn" takes its filename (without extension).
pSort.output  = "outfile:file:{{infile | fn}}.sorted"
# You can use placeholders to access input and output
pSort.script  = """
  sort -k1r {{infile}} > {{outfile}}
""" 

# Assign the entrance process
pyppl().starts(pSort).run()

Run python test.py test?.txt to get the following output: [image: First-script-output]
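To make the placeholder idea concrete, here is a minimal standalone sketch of how a template such as "{{infile | fn}}.sorted" could be expanded. It is not pyppl's implementation: the names render, fn and FILTERS are hypothetical, and the only behaviour assumed is what the comments above state, namely that "fn" yields the file name without its extension.

```python
import os
import re

def fn(path):
    """Hypothetical stand-in for the "fn" filter: file name without its last extension."""
    return os.path.splitext(os.path.basename(path))[0]

# Hypothetical filter registry; only "fn" is sketched here.
FILTERS = {"fn": fn}

def render(template, data):
    """Replace each {{name}} or {{name | filter}} with the matching value from data."""
    def substitute(match):
        name, filter_name = match.group(1), match.group(2)
        value = data[name]
        if filter_name:
            value = FILTERS[filter_name](value)
        return str(value)
    return re.sub(r"\{\{\s*(\w+)\s*(?:\|\s*(\w+)\s*)?\}\}", substitute, template)

print(render("{{infile | fn}}.sorted", {"infile": "/data/test1.txt"}))
# prints: test1.sorted
```

The same function handles plain placeholders, so a script template like "sort -k1r {{infile}} > {{outfile}}" is filled in the same way.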

A toy example

Sort each of the 5 files, then combine them into one file:

from pyppl import pyppl, proc

pSort         = proc()
pSort.input   = "infile:file"
pSort.output  = "outfile:file:{{infile | fn}}.sorted"
pSort.script  = """
  sort -k1r {{infile}} > {{outfile}}
""" 

pCombine      = proc()
# Will use pSort's output channel as input
pCombine.depends  = pSort
# Modify the channel: "collapse" returns the common directory of the files.
# The files are at <workdir>/<job.id>/output/test?.sorted
# ("fn" drops the .txt extension), so the common directory is <workdir>/
pCombine.input    = {"indir:file": lambda ch: ch.collapse()}
pCombine.output   = "outfile:file:{{indir | fn}}.sorted"
# Export the final result file
pCombine.exdir    = "./export" 
pCombine.script   = """
> {{outfile}}
for infile in {{indir}}/*/output/*.sorted; do
	cat $infile >> {{outfile}}
done
"""

pyppl().starts(pSort).run()

Run python test.py test?.txt; you will then find the combined file, named output.sorted, in ./export.
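The "common directory" that the comments attribute to collapse can be illustrated with a short standalone sketch. This is not pyppl's code: common_directory is a hypothetical helper, the paths are made up for illustration, and absolute POSIX paths are assumed.

```python
import os

def common_directory(paths):
    """Return the deepest directory shared by all paths (absolute POSIX paths assumed)."""
    split = [os.path.dirname(p).split("/") for p in paths]
    common = []
    # Walk the path components in parallel and stop at the first mismatch.
    for parts in zip(*split):
        if len(set(parts)) != 1:
            break
        common.append(parts[0])
    return "/".join(common) or "/"

# Illustrative job output files laid out as <workdir>/<job.id>/output/<name>.sorted
files = [
    "/tmp/workdir/0/output/test1.sorted",
    "/tmp/workdir/1/output/test2.sorted",
    "/tmp/workdir/2/output/test3.sorted",
]
print(common_directory(files))
# prints: /tmp/workdir
```

Because every job writes into its own <job.id> subdirectory, the shared prefix stops at the working directory, which is why the script above globs {{indir}}/*/output/*.sorted.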

Using a different interpreter:

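from pyppl import pyppl, proc

pPlot = proc()
# Specify input explicitly
pPlot.input   = {"infile:file": ["./data.txt"]}
# The output file will be named data.png
pPlot.output  = "outfile:file:{{infile | fn}}.png"
# Run the script with Rscript instead of the default interpreter
pPlot.lang    = "Rscript"
pPlot.script  = """
data <- read.table ("{{infile}}")
H    <- hclust(dist(data))
png (filename = "{{outfile}}")
plot(H)
dev.off()
"""

pyppl().starts(pPlot).run()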

Using a different runner:

pPlot = proc()
pPlot.input   = {"infile:file": ["./data1.txt", "./data2.txt", "./data3.txt", "./data4.txt", "./data5.txt"]}
pPlot.output  = "outfile:file:{{infile | fn}}.png"
pPlot.lang    = "Rscript"
pPlot.runner  = "sge"
# run all 5 jobs at the same time
pPlot.forks   = 5
pPlot.script  = """
data <- read.table ("{{infile}}")
H    <- hclust(dist(data))
png (filename = "{{outfile}}")
plot(H)
dev.off()
"""
pyppl({
  "proc": {
    "sgeRunner": {
      "sge.q" : "1-day"
    }
  }
}).starts(pPlot).run()
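As a rough illustration of what a forks setting of 5 means (five jobs in flight at once), the following standalone sketch runs one job per input file on a pool of worker threads. It does not use pyppl or SGE: run_job and run_all are hypothetical names, and the job body is a placeholder that only derives the output file name.

```python
import os
from multiprocessing.pool import ThreadPool

def run_job(infile):
    """Placeholder job: a real job would run a script or submit it to a scheduler."""
    stem = os.path.splitext(os.path.basename(infile))[0]
    return stem + ".png"

def run_all(infiles, forks):
    """Run one job per input file, with at most `forks` jobs running concurrently."""
    pool = ThreadPool(processes=forks)
    try:
        # map() returns the results in the same order as the inputs.
        return pool.map(run_job, infiles)
    finally:
        pool.close()
        pool.join()

inputs = ["./data%d.txt" % i for i in range(1, 6)]
print(run_all(inputs, forks=5))
# prints: ['data1.png', 'data2.png', 'data3.png', 'data4.png', 'data5.png']
```

With forks lower than the number of inputs, the remaining jobs would simply wait for a free worker.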

Draw the pipeline chart

pyppl can generate the pipeline graph in the DOT language.

from pyppl import pyppl, proc

# "A" is the tag of p1
p1 = proc("A")
p2 = proc("B")
p3 = proc("C")
p4 = proc("D")
p5 = proc("E")
p6 = proc("F")
p7 = proc("G")
p8 = proc("H")
p9 = proc("I")
p1.script = "echo 1"
p1.input  = {"input": ['a']}
p8.input  = {"input": ['a']}
p9.input  = {"input": ['a']}
p2.input  = "input"
p3.input  = "input"
p4.input  = "input"
p5.input  = "input"
p6.input  = "input"
p7.input  = "input"
p1.output = "{{input}}" 
p2.script = "echo 1"
p2.output = "{{input}}" 
p3.script = "echo 1"
p3.output = "{{input}}" 
p4.script = "echo 1"
p4.output = "{{input}}" 
p5.script = "echo 1"
p5.output = "{{input}}" 
p6.script = "echo 1"
p6.output = "{{input}}" 
p7.script = "echo 1"
p7.output = "{{input}}" 
p8.script = "echo 1"
p8.output = "{{input}}" 
p9.script = "echo 1"
p9.output = "{{input}}" 
"""
			   1A         8H
			/      \      /
		 2B           3C
			\      /
			  4D(e)       9I
			/      \      /
		 5E          6F(e)
			\      /
			  7G(e)
"""
p2.depends = p1
p3.depends = [p1, p8]
p4.depends = [p2, p3]
p4.exdir   = "./"
p5.depends = p4
p6.depends = [p4, p9]
p6.exdir   = "./"
p7.depends = [p5, p6]
p7.exdir   = "./"
pyppl().starts(p1, p8, p9).flowchart()
# saved to dot file: test.pyppl.dot
# saved to svg file: test.pyppl.svg
# run it after the chart generated:
# pyppl().starts(p1, p8, p9).flowchart().run()

test.pyppl.dot:

digraph PyPPL {
	"p1.A" -> "p2.B"
	"p1.A" -> "p3.C"
	"p8.H" -> "p3.C"
	"p2.B" -> "p4.D"
	"p3.C" -> "p4.D"
	"p4.D" -> "p5.E"
	"p4.D" -> "p6.F"
	"p9.I" -> "p6.F"
	"p5.E" -> "p7.G"
	"p6.F" -> "p7.G"
	"p6.F" [shape=box, style=filled, color="#f0f998", fontcolor=red]
	"p1.A" [shape=box, style=filled, color="#c9fcb3"]
	"p8.H" [shape=box, style=filled, color="#c9fcb3"]
	"p9.I" [shape=box, style=filled, color="#c9fcb3"]
	"p7.G" [shape=box, style=filled, color="#fcc9b3" fontcolor=red]
	"p4.D" [shape=box, style=filled, color="#f0f998", fontcolor=red]
}

You can render and visualize it with any DOT renderer.
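The edge list in the DOT file follows directly from the depends assignments. The standalone sketch below shows one way such a file could be derived; it is not pyppl's flowchart code, to_dot is a hypothetical helper, and node styling (shapes and colors) is left out.

```python
def to_dot(depends, name="PyPPL"):
    """Build DOT text from (process, [processes it depends on]) pairs."""
    lines = ["digraph %s {" % name]
    for child, parents in depends:
        for parent in parents:
            # Each dependency becomes one edge pointing from parent to child.
            lines.append('\t"%s" -> "%s"' % (parent, child))
    lines.append("}")
    return "\n".join(lines)

# The same dependencies as in the example above, written as plain data.
depends = [
    ("p2.B", ["p1.A"]),
    ("p3.C", ["p1.A", "p8.H"]),
    ("p4.D", ["p2.B", "p3.C"]),
    ("p5.E", ["p4.D"]),
    ("p6.F", ["p4.D", "p9.I"]),
    ("p7.G", ["p5.E", "p6.F"]),
]
print(to_dot(depends))
```

The printed text contains the same ten edges, in the same order, as test.pyppl.dot and can be passed to a DOT renderer in the same way.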

test.pyppl.svg: [image: PyPPL chart]