- Supports any language for running your processes.
- Automatic deduction of input based on the process dependencies. Details
- Different ways of exporting output files (including gzip). Details
- Process caching (including caching using exported files). Details
- Flexible placeholder handling in output and script settings. Details
- APIs to modify channels. Details
- Different runners to run your processes on different platforms. Details
- Runner customization (you can define your own runner). Details
- Callbacks of processes. Details
- Error handling for processes. Details
- Configuration file support for pipelines. Details
- Flowchart in DOT for your pipelines. Details
- Aggregations (a predefined set of processes). Details
- Detailed documentation and API documentation.
- Linux (may work on OSX; not tested)
- Python 2.7
# install latest version
git clone https://github.com/pwwang/pyppl.git
cd pyppl
python setup.py install
# or simply:
pip install git+git://github.com/pwwang/pyppl.git
# install released version
pip install pyppl
To sort 5 files:
from pyppl import pyppl, proc
pSort = proc()
# Use sys.argv as the input channel,
# because this proc does not depend on any other proc.
# "infile" is the placeholder used to access the input in the output
# assignment and in the script.
# The ":file" suffix denotes the input type; a symbolic link will be created
# in the input directory
pSort.input = "infile:file"
# Output file (the ":file" sign) will be generated in job output directory
# "infile" is the full path of the input file, "fn" takes its filename (without extension)
pSort.output = "outfile:file:{{infile | fn}}.sorted"
# You can use placeholders to access input and output
pSort.script = """
sort -k1r {{infile}} > {{outfile}}
"""
# Assign the entrance process
pyppl().starts(pSort).run()
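To make the placeholder syntax above more concrete, here is a minimal sketch of how `{{infile}}` and `{{infile | fn}}` could be expanded. It is independent of pyppl and is not its implementation: `FILTERS`, `PLACEHOLDER` and `render` are hypothetical names, and only the `fn` filter (file name without extension) is taken from the comments above.

```python
import os
import re

# Hypothetical filter table; only "fn" is described in the text above.
FILTERS = {
    # base name without its last extension, e.g. /data/test1.txt -> test1
    "fn": lambda path: os.path.splitext(os.path.basename(path))[0],
}

# Matches {{name}} and {{name | filter}}
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*(?:\|\s*(\w+)\s*)?\}\}")

def render(template, data):
    """Replace each placeholder in template with its value from data."""
    def expand(match):
        name, filter_name = match.group(1), match.group(2)
        value = data[name]
        if filter_name is not None:
            value = FILTERS[filter_name](value)
        return str(value)
    return PLACEHOLDER.sub(expand, template)

data = {"infile": "/data/test1.txt"}
# Only the file name is computed here; a real job would place it
# inside its own output directory.
outfile = render("{{infile | fn}}.sorted", data)
script = render("sort -k1r {{infile}} > {{outfile}}",
                dict(data, outfile=outfile))
print(outfile)
print(script)
```

The sketch resolves the output name first and then makes it available to the script template, which mirrors the order in which the output and script settings are described above.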
Run it with: python test.py test?.txt
Sort each of the 5 files, then combine them into one file:
from pyppl import pyppl, proc
pSort = proc()
pSort.input = "infile:file"
pSort.output = "outfile:file:{{infile | fn}}.sorted"
pSort.script = """
sort -k1r {{infile}} > {{outfile}}
"""
pCombine = proc()
# Will use pSort's output channel as input
pCombine.depends = pSort
# Modify the channel: "collapse" returns the common directory of the files.
# The files are at: <workdir>/<job.id>/output/test?.sorted
# so the common directory is <workdir>/
pCombine.input = {"indir:file": lambda ch: ch.collapse()}
pCombine.output = "outfile:file:{{indir | fn}}.sorted"
# Export the final result file
pCombine.exdir = "./export"
pCombine.script = """
> {{outfile}}
for infile in {{indir}}/*/output/*.sorted; do
cat $infile >> {{outfile}}
done
"""
pyppl().starts(pSort).run()
Run python test.py test?.txt, then you will find the combined file named output.sorted in ./export.
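The idea behind collapsing a channel of files into their common directory can be sketched in plain Python. This is an illustration only, not pyppl's `collapse` itself: `common_directory` is a hypothetical helper, and the paths are made-up examples following the `<workdir>/<job.id>/output/` layout mentioned in the comments above.

```python
def common_directory(paths):
    """Return the longest directory shared by all paths, compared
    component by component (not character by character)."""
    # Split each path into components and drop the file name.
    split = [p.split("/")[:-1] for p in paths]
    common = []
    for parts in zip(*split):
        if len(set(parts)) != 1:
            break
        common.append(parts[0])
    return "/".join(common) or "/"

# Hypothetical job outputs, one directory per job id.
outputs = [
    "/workdir/0/output/test1.sorted",
    "/workdir/1/output/test2.sorted",
    "/workdir/2/output/test3.sorted",
]
print(common_directory(outputs))
```

Comparing whole components matters: a character-wise prefix of `/w/10/...` and `/w/11/...` would end in `/w/1`, which is not a directory.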
pPlot = proc()
# Specify input explicitly
pPlot.input = {"infile:file": ["./data.txt"]}
# data.png
pPlot.output = "outfile:file:{{infile | fn}}.png"
pPlot.lang = "Rscript"
pPlot.script = """
data <- read.table ("{{infile}}")
H <- hclust(dist(data))
png(filename = "{{outfile}}")
plot(H)
dev.off()
"""
pPlot = proc()
pPlot.input = {"infile:file": ["./data1.txt", "./data2.txt", "./data3.txt", "./data4.txt", "./data5.txt"]}
pPlot.output = "outfile:file:{{infile | fn}}.png"
pPlot.lang = "Rscript"
pPlot.runner = "sge"
# run all 5 jobs at the same time
pPlot.forks = 5
pPlot.script = """
data <- read.table ("{{infile}}")
H <- hclust(dist(data))
png(filename = "{{outfile}}")
plot(H)
dev.off()
"""
pyppl({
"proc": {
"sgeRunner": {
"sge.q" : "1-day"
}
}
}).starts(pPlot).run()
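As a rough illustration of what "one job per input, at most `forks` at a time" means, the following sketch uses only the Python standard library. It does not use pyppl or SGE: `run_job` is a hypothetical stand-in that merely derives the output name a job would produce.

```python
import os
from multiprocessing.pool import ThreadPool

def run_job(infile):
    # Stand-in for a real job: only derive the output file name.
    return os.path.splitext(os.path.basename(infile))[0] + ".png"

infiles = ["./data%d.txt" % i for i in range(1, 6)]
forks = 5  # upper bound on jobs running at the same time

pool = ThreadPool(forks)
try:
    # One job per input item; results come back in input order.
    outfiles = pool.map(run_job, infiles)
finally:
    pool.close()
    pool.join()

print(outfiles)
```

With `forks` set lower than the number of inputs, the remaining jobs would simply wait for a free slot.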
pyppl can generate the graph in DOT language.
# "A" is the tag of p1
p1 = proc("A")
p2 = proc("B")
p3 = proc("C")
p4 = proc("D")
p5 = proc("E")
p6 = proc("F")
p7 = proc("G")
p8 = proc("H")
p9 = proc("I")
p1.script = "echo 1"
p1.input = {"input": ['a']}
p8.input = {"input": ['a']}
p9.input = {"input": ['a']}
p2.input = "input"
p3.input = "input"
p4.input = "input"
p5.input = "input"
p6.input = "input"
p7.input = "input"
p1.output = "{{input}}"
p2.script = "echo 1"
p2.output = "{{input}}"
p3.script = "echo 1"
p3.output = "{{input}}"
p4.script = "echo 1"
p4.output = "{{input}}"
p5.script = "echo 1"
p5.output = "{{input}}"
p6.script = "echo 1"
p6.output = "{{input}}"
p7.script = "echo 1"
p7.output = "{{input}}"
p8.script = "echo 1"
p8.output = "{{input}}"
p9.script = "echo 1"
p9.output = "{{input}}"
"""
     1A        8H
    /  \      /
  2B     3C
    \  /
     4D(e)     9I
    /  \      /
  5E     6F(e)
    \  /
     7G(e)
"""
p2.depends = p1
p3.depends = [p1, p8]
p4.depends = [p2, p3]
p4.exdir = "./"
p5.depends = p4
p6.depends = [p4, p9]
p6.exdir = "./"
p7.depends = [p5, p6]
p7.exdir = "./"
pyppl().starts(p1, p8, p9).flowchart()
# saved to dot file: test.pyppl.dot
# saved to svg file: test.pyppl.svg
# run it after the chart generated:
# pyppl().starts(p1, p8, p9).flowchart().run()
test.pyppl.dot:
digraph PyPPL {
"p1.A" -> "p2.B"
"p1.A" -> "p3.C"
"p8.H" -> "p3.C"
"p2.B" -> "p4.D"
"p3.C" -> "p4.D"
"p4.D" -> "p5.E"
"p4.D" -> "p6.F"
"p9.I" -> "p6.F"
"p5.E" -> "p7.G"
"p6.F" -> "p7.G"
"p6.F" [shape=box, style=filled, color="#f0f998", fontcolor=red]
"p1.A" [shape=box, style=filled, color="#c9fcb3"]
"p8.H" [shape=box, style=filled, color="#c9fcb3"]
"p9.I" [shape=box, style=filled, color="#c9fcb3"]
"p7.G" [shape=box, style=filled, color="#fcc9b3", fontcolor=red]
"p4.D" [shape=box, style=filled, color="#f0f998", fontcolor=red]
}
You can use different dot renderers to render and visualize it.
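The edges in the DOT file follow directly from the `depends` assignments. The sketch below shows that mapping in plain Python; it is not how pyppl builds the chart, and `to_dot`, `start_nodes` and the `depends` list are hypothetical names that restate the dependencies from the example above.

```python
def to_dot(depends, name="PyPPL"):
    """Emit one 'parent -> child' edge per dependency."""
    lines = ["digraph %s {" % name]
    for child, parents in depends:
        for parent in parents:
            lines.append('"%s" -> "%s"' % (parent, child))
    lines.append("}")
    return "\n".join(lines)

def start_nodes(depends):
    """Nodes that appear as a parent but never as a child."""
    children = set(child for child, _ in depends)
    starts = []
    for _, parents in depends:
        for parent in parents:
            if parent not in children and parent not in starts:
                starts.append(parent)
    return starts

# (process, processes it depends on), as in the example above
depends = [
    ("p2.B", ["p1.A"]),
    ("p3.C", ["p1.A", "p8.H"]),
    ("p4.D", ["p2.B", "p3.C"]),
    ("p5.E", ["p4.D"]),
    ("p6.F", ["p4.D", "p9.I"]),
    ("p7.G", ["p5.E", "p6.F"]),
]

dot = to_dot(depends)
print(dot)
print(start_nodes(depends))
```

The node styling lines (colors for start, end and exported processes) are left out here; only the graph structure is reproduced.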