a python Module to create DAG Task and Manage Task on SGE
Python (2.5 or later)
Sun Grid Engine(SGE)
DAGflow has been tested on CentOS 7.2.1511 and Ubuntu 12.04.4 LTS
Download the code and unzip it into desired installation directory
git clone https://github.com/FlyPythons/DAGflow.git
The following tutorial shows how to create a DAGflow and run it
Now i have a set of fasta files and i want to blast them to a db named 'db.fasta'.
To complete this work, a workflow as following is needed
At first, you should write your workflow script
import os
from dagflow import DAG, Task, ParallelTask, do_dag
inputs = ['1.fasta', "2.fasta", "3.fasta", "4.fasta"]
db = "db.fasta"
db = os.path.abspath(db)
# create a DAG object
my_dag = DAG("blast")
# create the first task 'make_db'
make_db = Task(
id="make_db", # your task id, should be unique
work_dir=".", # you task work directory
type="local", # the way your task run. if "sge", task will submit with qsub
option={}, # the option of "sge" or "local"
script="makeblastdb -in %s -dbtype nucl" % db # the command of the task
)
# when you create a task, then add it to DAG object
my_dag.add_task(make_db)
# then add blast tasks
blast_tasks = ParallelTask(id="blast",
work_dir="{id}",
type="sge",
option="-pe smp 4 -q all.q",
script="blastn -in {query} -db %s -outfmt 6 -out {query}.m6",
query=inputs)
my_dag.add_task(*blast_tasks)
make_db.set_downstream(*blast_tasks)
# add blast_join task to join blast results
blast_join = Task(
id="blast_join",
work_dir=".",
type="local", # option is default
script="cat */*.m6 > blast.all.m6"
)
# you should always remember to add you task to DAG object when created
my_dag.add_task(blast_join)
# this task need a list of tasks in blast_task all done
blast_join.set_upstream(*blast_tasks)
# all of you tasks were added to you workflow, you can run it
do_dag(my_dag)
Now, your workflow script is completed, you can name it as 'workflow.py'
You can run you workflow script as a python script using the following commands.
python workflow.py
For some reason, you workflow was broken with some tasks undone.
You can use the same command python workflow.py
to re-run the undone jobs.
Sometimes you may want to add a workflow to another workflow, this can be down as following:
from DAGflow import *
# two workflow wf1 and wf2
wf1 = DAG("workflow1")
wf2 = DAG("workflow2")
task1 = Task(
id="task",
work_dir=".",
script="hello, i am a task"
)
# set wf2 depends on wf1
wf1.add_dag(wf2)
# set task1 depends on wf2
task1.set_upstream(wf2.tasks.values())