Initial import
bjpop committed Aug 26, 2011
commit eeca2ca (0 parents)
Showing 8 changed files with 467 additions and 0 deletions.
23 changes: 23 additions & 0 deletions chrom_info.py
@@ -0,0 +1,23 @@
import sys

from shell_command import shellCommand

def chromInfo(refFile):
    (stdout, stderr, code) = shellCommand('infoseq -noheading ' + refFile)
    if code != 0:
        print('error from infoseq: ')
        print(stderr)
        sys.exit(code)
    lines = stdout.splitlines()

    # extract the chromosome name and length from the output of infoseq
    chroms = []
    for l in lines:
        words = l.split()
        # just skip bad lines
        if len(words) >= 6:
            chrName = words[2]
            chrLen = words[5]
            chroms.append((chrName, chrLen))

    return chroms
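
For reference, a rough sketch of the infoseq line this parser assumes; the column layout (Name at words[2], Length at words[5]) is inferred from the indices above and may vary between EMBOSS versions:

# Hypothetical 'infoseq -noheading' line; columns assumed to be
#   USA  Database  Name  Accession  Type  Length  ...
line = 'fasta::ref.fa:chr1  -  chr1  -  N  4641652  50.79  -'
words = line.split()
print((words[2], words[5]))   # -> ('chr1', '4641652'); note the length is returned as a string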
76 changes: 76 additions & 0 deletions cluster_job.py
@@ -0,0 +1,76 @@
from shell_command import shellCommand
import sys
from time import sleep
from tempfile import NamedTemporaryFile

def isJobCompleted(jobID):
    (stdout, stderr, returnCode) = shellCommand("qstat %s" % jobID)
    # qstat exits non-zero once the job is no longer known to the queue
    if returnCode != 0:
        return True
    try:
        lines = stdout.split('\n')
        statusLine = lines[2]
        statusVal = statusLine.split()[4]
    except IndexError:
        raise Exception('bad result from qstat: ' + stdout)
    return statusVal == 'C'

def waitForJobCompletion(jobID):
    while not isJobCompleted(jobID):
        sleep(10)

def runJobAndWait(script):
    jobID = script.launch()
    waitForJobCompletion(jobID)

class PBS_Script(object):
    def __init__(self, command, walltime=None, name=None, memInGB=None, queue='batch', moduleList=None):
        self.command = command
        if queue in ['batch', 'smp']:
            self.queue = queue
        else:
            self.queue = 'batch'
        self.name = name
        self.memInGB = memInGB
        self.walltime = walltime
        self.moduleList = moduleList

    def __str__(self):
        script = ['#!/bin/bash']
        # XXX hack: assumes a log/ directory exists in the working directory
        script.append('#PBS -o log/%s' % self.name)
        script.append('#PBS -e log/%s' % self.name)
        script.append('#PBS -q %s' % self.queue)
        if self.name:
            script.append('#PBS -N %s' % self.name)
        if self.memInGB:
            # the smp queue takes a whole-job memory limit; other queues a per-process one
            if self.queue == 'smp':
                script.append('#PBS -l mem=%sgb' % self.memInGB)
            else:
                script.append('#PBS -l pvmem=%sgb' % self.memInGB)
        if self.walltime:
            script.append('#PBS -l walltime=%s' % self.walltime)
        if type(self.moduleList) == list and len(self.moduleList) > 0:
            script.append('module load %s' % ' '.join(self.moduleList))
        script.append('cd $PBS_O_WORKDIR')
        script.append(self.command)
        return '\n'.join(script) + '\n'

    def launch(self):
        # write the job script to a temporary file and submit it with qsub
        file = NamedTemporaryFile()
        file.write(str(self))
        file.flush()
        command = 'qsub ' + file.name
        (stdout, stderr, returnCode) = shellCommand(command)
        file.close()
        if returnCode == 0:
            # qsub prints the new job's identifier on stdout
            return stdout
        else:
            raise Exception('qsub command failed with exit status: ' + str(returnCode))

#waitForJobCompletion(jobID)
#jobScript = PBS_Script("sleep 50", "00:00:50", "sleepy", "1")
#runJobAndWait(jobScript)
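
A short usage sketch along the lines of the commented-out example above (resource values hypothetical); printing the script shows the generated job file before submission:

jobScript = PBS_Script('sleep 50', walltime='00:00:50', name='sleepy', memInGB=1,
                       moduleList=['samtools-gcc/0.1.8'])
print(str(jobScript))       # inspect the generated '#PBS ...' job script
# runJobAndWait(jobScript)  # would qsub it and poll qstat every 10 seconds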
36 changes: 36 additions & 0 deletions docs/alignment_pipeline.txt
@@ -0,0 +1,36 @@
### these steps need to be run for each set of fastq read files + reference file
# generate alignment
# two steps can run in parallel
bwa aln -t 8 ref.fasta data/reads_1.fastq > reads_1.sai
bwa aln -t 8 ref.fasta data/reads_2.fastq > reads_2.sai

# make sorted bam file (depends on .sai files created in previous step)
# each step depends on prior step
bwa sampe ref.fasta reads_1.sai reads_2.sai reads_1.fastq reads_2.fastq > reads.sam
samtools import ref.fasta reads.sam reads.bam
samtools sort reads.bam reads_sorted

# generate full pileup (depends on sorted bam)
samtools pileup -cf ref.fasta reads_sorted.bam > reads_full.pileup

# call SNPs (depends on sorted bam)
samtools pileup -vcf ref.fasta reads_sorted.bam > reads.var.pileup

### the filtering step needs to be run for each of the output pileups, with each chromosome provided in the reference file ($chrname below)
### i.e. for reference file:
### >chr1
### >chr2
### >plasmid1
### the filter step would be run using 'chr1', 'chr2', 'plasmid1' as values of $chrname

# filter SNPs based on depth (depends on full pileup and SNP pileup)
# note this may need to be done for multiple values of '$chrname'
# (double-quote $chrname so the shell expands it; single-quote the awk program so awk sees $8)
grep "$chrname" reads_full.pileup | awk 'BEGIN{total=0} {total += $8} END{print total}' > ref_$chrname.depth

### this is a bit of perl I use to obtain the parameters (minimum & maximum depth) for filtering of SNPs
### ($length is the length of the chromosome named by $chrname)
my $depth = `cat ref_$chrname.depth`;
my $minD = int($depth / $length / 2) + 1;
my $maxD = int($depth / $length * 2) - 1;

### these parameters are then used in the varFilter:
samtools.pl varFilter -D $maxD -d $minD reads.var.pileup | awk '$3!="*" && $4!="*" && ($4=="A" || $4=="C" || $4=="G" || $4=="T") && $5 > 30 && $1=="'$chrname'"' > reads_${chrname}_var_filtered.pileup
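
A worked example of those thresholds (numbers hypothetical), matching the perl arithmetic above:

depth = 92833040                      # total depth over the chromosome (sum of pileup column 8)
length = 4641652                      # chromosome length, so mean depth is 20x
minD = int(depth / length / 2) + 1    # -> 11 (half the mean depth, plus one)
maxD = int(depth / length * 2) - 1    # -> 39 (twice the mean depth, minus one)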
34 changes: 34 additions & 0 deletions ngs_pipeline.opt
@@ -0,0 +1,34 @@
reference: /vlsci/VLSCI/bjpop/scratch/references/kat.holt/Klebs_MGH78578_withPlasmid.mfasta
sequences: /vlsci/VLSCI/bjpop/scratch/sequences/kat.holt/*.fastq
pipeline:
   logDir: log
   logFile: pipeline.log
   style: run
   procs: 8
   paired: True
stageDefaults:
   distributed: False
   walltime: "02:00:00"
   memInGB: 10
   modules:
      - "bwa-gcc"
      - "samtools-gcc/0.1.8"
stages:
   mkRefDataBase:
      command: "lambda ref, out: 'bwa index %s -a bwtsw' % ref"
   alignSequence:
      command: "lambda ref, seq, out: 'bwa aln -t 8 %s %s > %s' % (ref, seq, out)"
      walltime: "06:00:00"
      queue: smp
   alignToSam:
      command: "lambda ref, align1, align2, seq1, seq2, out: 'bwa sampe %s %s %s %s %s > %s' % (ref, align1, align2, seq1, seq2, out)"
   samToBam:
      command: "lambda ref, sam, out: 'samtools import %s %s %s' % (ref, sam, out)"
   sortBam:
      command: "lambda bam, out: 'samtools sort %s %s' % (bam, out)"
   pileupFull:
      command: "lambda ref, bam, out: 'samtools pileup -cf %s %s > %s' % (ref, bam, out)"
   callSNPs:
      command: "lambda ref, bam, out: 'samtools pileup -vcf %s %s > %s' % (ref, bam, out)"
   varFilter:
      command: "lambda fullPileup, varPileup, chromName, chromLength, out: './varFilter %s %s %s %s %s' % (fullPileup, varPileup, chromName, chromLength, out)"
128 changes: 128 additions & 0 deletions pipeline.py
@@ -0,0 +1,128 @@
#!/usr/bin/env python

'''
BWA pipeline for Kat Holt.
Authors: Bernie Pope, Kat Holt.
Description:
This program implements a workflow pipeline for next generation
sequencing, predominantly the read mapping task, up until variant
detection.
It uses the Ruffus library to make the description of the pipeline
more declarative.
It supports parallel evaluation of independent pipeline stages,
and can run stages on a cluster environment.
The pipeline is configured by an options file in YAML format,
including the actual commands which are run at each stage.
'''

from ruffus import *
import os.path
import shutil
from utils import (runStage, splitPath, getOptions, initLog, getCommand)
from chrom_info import (chromInfo)
import sys
import glob

# Read the configuration options from file; determine the reference file
# and the list of sequence files.
options = getOptions()
reference = options['reference']
sequencePatterns = options['sequences']
sequences = []
if type(sequencePatterns) == list:
    for pattern in sequencePatterns:
        # extend, not append: append would nest each glob's results in a sub-list
        sequences.extend(glob.glob(pattern))
else:
    sequences = glob.glob(sequencePatterns)
# Start the logging process.
logger = initLog(options)
# Get information about the chromosomes in the reference file.
chromosomes = chromInfo(reference)

# Index the reference file.
@files(reference, reference + '.bwt', logger)
def mkRefDataBase(reference, output, logger):
    runStage('mkRefDataBase', logger, options, reference, output)

# Align sequence reads to the reference genome.
@follows(mkRefDataBase)
@transform(sequences, suffix('.fastq'), '.sai', logger)
def alignSequence(sequence, output, logger):
    runStage('alignSequence', logger, options, reference, sequence, output)

# Pair each '_1' alignment with its '_2' partner and the two read files.
inputPattern = r'(.+)_1\.sai'
extraInputs = [r'\1_2.sai', r'\1_1.fastq', r'\1_2.fastq']

# Convert alignments to SAM format.
@transform(alignSequence, regex(inputPattern), add_inputs(extraInputs), r'\1.sam', logger)
def alignToSam(inputs, output, logger):
    align1, [align2, seq1, seq2] = inputs
    runStage('alignToSam', logger, options, reference, align1, align2, seq1, seq2, output)

# Convert SAM alignments to BAM format.
@transform(alignToSam, suffix('.sam'), '.bam', logger)
def samToBam(samFile, output, logger):
    runStage('samToBam', logger, options, reference, samFile, output)

# Sort BAM alignments by leftmost coordinate.
@transform(samToBam, suffix('.bam'), '.sorted.bam', logger)
def sortBam(bamFile, output, logger):
    # samtools sort takes the output prefix, not the full '.bam' filename
    (prefix, name, ext) = splitPath(output)
    outFile = os.path.join(prefix, name)
    runStage('sortBam', logger, options, bamFile, outFile)

# Convert sorted BAM alignments to (full) pileup format.
@transform(sortBam, regex(r'(.+)\.sorted\.bam'), r'\1.full.pileup', logger)
def pileupFull(bamFile, output, logger):
    runStage('pileupFull', logger, options, reference, bamFile, output)

# Call SNPs (variant-only pileup).
@transform(sortBam, regex(r'(.+)\.sorted\.bam'), r'\1.var.pileup', logger)
def callSNPs(bamFile, output, logger):
    runStage('callSNPs', logger, options, reference, bamFile, output)

(prefix, seqName, ext) = splitPath(sequences[0])
# note: globbed at startup, so this only sees full pileups from a previous run
fullPileups = glob.glob(os.path.join(prefix, '*.full.pileup'))

# Generate one filtering task per (chromosome, full pileup) pair.
def filterInputs():
    for chromName, chromLength in chromosomes:
        for fullPileup in fullPileups:
            (prefix, seqName, ext) = splitPath(fullPileup)
            varPileup = os.path.join(prefix, seqName + '.var.pileup')
            output = os.path.join(prefix, seqName + '.' + chromName + '.var_filtered.pileup')
            yield([fullPileup, output, varPileup, chromName, chromLength, logger])

# Filter SNPs by depth, per chromosome.
@follows(pileupFull)
@follows(callSNPs)
@files(filterInputs)
def varFilter(fullPileup, output, varPileup, chromName, chromLength, logger):
    runStage('varFilter', logger, options, fullPileup, varPileup, chromName, chromLength, output)

# Define the end-points of the pipeline.
pipeline = [varFilter]

# Invoke the pipeline.
pipelineOptions = options['pipeline']
if pipelineOptions['style'] == 'run':
    # Perform the pipeline steps.
    pipeline_run(pipeline, multiprocess=pipelineOptions['procs'], logger=black_hole_logger)
elif pipelineOptions['style'] == 'flowchart':
    # Draw the pipeline as a diagram.
    pipeline_printout_graph('flowchart.svg', 'svg', pipeline, no_key_legend=False)
7 changes: 7 additions & 0 deletions shell_command.py
@@ -0,0 +1,7 @@
import subprocess

def shellCommand(command):
    '''Run command in a subshell, returning (stdout, stderr, exit status).'''
    process = subprocess.Popen(command, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, shell=True)
    stdoutStr, stderrStr = process.communicate()
    return (stdoutStr, stderrStr, process.returncode)
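
Usage sketch: the command string runs under a shell, and both output streams are captured in full (communicate() blocks until the process exits):

(stdout, stderr, code) = shellCommand('echo hello')
if code == 0:
    print(stdout)    # -> hello
else:
    print('command failed: ' + stderr)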