Commit: adding coverages
antgonza committed Aug 7, 2023
1 parent 179e27a commit b220a4e
Showing 8 changed files with 201 additions and 73 deletions.
4 changes: 4 additions & 0 deletions qp_woltka/support_files/coverages/coverage_1.cov
@@ -0,0 +1,4 @@
GXXX 200 300
GXXX 500 600
GXXX 100 200
GYYY 100 200
3 changes: 3 additions & 0 deletions qp_woltka/support_files/coverages/coverage_2.cov
@@ -0,0 +1,3 @@
GYYY 500 1000
GYYY 200 400
GXXX 300 400
3 changes: 3 additions & 0 deletions qp_woltka/support_files/coverages/coverage_3.cov
@@ -0,0 +1,3 @@
GYYY 500 1000
GXXX 100 400
GZZZ 200 400 500 1500
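Note: each .cov fixture line is a genome identifier followed by one or more start/end coordinate pairs; the GZZZ line above carries two ranges on a single line. A minimal parsing sketch, mirroring the parsing loop inside merge_ranges (added to qp_woltka/util.py below):

line = 'GZZZ 200 400 500 1500'
mems = line.split()
gotu = mems.pop(0)  # 'GZZZ'
# pair up the remaining tokens as (start, end) ranges
ranges = [(int(s), int(e)) for s, e in zip(*[iter(mems)] * 2)]
print(gotu, ranges)  # GZZZ [(200, 400), (500, 1500)]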
109 changes: 109 additions & 0 deletions qp_woltka/tests/test_util.py
@@ -0,0 +1,109 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2020--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

from unittest import main, TestCase
from os import environ
from os.path import join
from tempfile import TemporaryDirectory
import gzip
import io
import pandas as pd

from qp_woltka.util import (
    get_dbs, generate_woltka_dflt_params, mux, demux, search_by_filename,
    merge_ranges)


class UtilTests(TestCase):
    def setUp(self):
        self.db_path = environ["QC_WOLTKA_DB_DP"]

    def test_get_dbs(self):
        db_path = self.db_path
        obs = get_dbs(db_path)
        exp = {'wol': join(db_path, 'wol/WoLmin'),
               'rep82': join(db_path, 'rep82/5min')}

        self.assertDictEqual(obs, exp)

    def test_generate_woltka_dflt_params(self):
        obs = generate_woltka_dflt_params()
        exp = {'wol': {'Database': join(self.db_path, 'wol/WoLmin')},
               'rep82': {'Database': join(self.db_path, 'rep82/5min')}}

        self.assertDictEqual(obs, exp)

    def test_mux(self):
        f1 = b"@foo\nATGC\n+\nIIII\n"
        f2 = b"@bar\nAAAA\n+\nIIII\n"
        exp = b"@foo@@@foofile\nATGC\n+\nIIII\n@bar@@@barfile\nAAAA\n+\nIIII\n"
        with TemporaryDirectory() as d:
            f1fp = d + '/foofile.fastq'
            f2fp = d + '/barfile.fastq'
            ofp = d + '/output'
            with gzip.open(f1fp, 'wb') as fp:
                fp.write(f1)
            with gzip.open(f2fp, 'wb') as fp:
                fp.write(f2)
            with open(ofp, 'wb') as output:
                mux([f1fp, f2fp], output)
            with open(ofp, 'rb') as result:
                obs = result.read()

        self.assertEqual(obs, exp)

    def test_demux(self):
        prep = pd.DataFrame([['sample_foo', 'foofile'],
                             ['sample_bar', 'barfile']],
                            columns=['sample_name', 'run_prefix'])
        input_ = io.BytesIO(b"foo@@@foofile_R1\tATGC\t+\tIIII\nbar@@@"
                            b"barfile_R2\tAAAA\t+\tIIII\n")
        expfoo = b"foo\tATGC\t+\tIIII\n"
        expbar = b"bar\tAAAA\t+\tIIII\n"
        with TemporaryDirectory() as d:
            demux(input_, d.encode('ascii'), prep)
            foo = open(d + '/sample_foo.sam', 'rb').read()
            bar = open(d + '/sample_bar.sam', 'rb').read()

        self.assertEqual(foo, expfoo)
        self.assertEqual(bar, expbar)

    def test_search_by_filename(self):
        lookup = {'foo_bar': 'baz',
                  'foo': 'bar'}
        tests = [('foo_bar_thing', 'baz'),
                 ('foo_stuff_blah', 'bar'),
                 ('foo.stuff.blah', 'bar'),
                 ('foo_bar', 'baz')]
        for test, exp in tests:
            obs = search_by_filename(test, lookup)
            self.assertEqual(obs, exp)

        with self.assertRaises(KeyError):
            search_by_filename('does_not_exist', lookup)

    def test_merge_ranges(self):
        files = [
            'qp_woltka/support_files/coverages/coverage_1.cov',
            'qp_woltka/support_files/coverages/coverage_2.cov',
        ]
        exp = ['GXXX\t100\t400\t500\t600', 'GYYY\t100\t400\t500\t1000']
        self.assertEqual(merge_ranges(files), exp)

        files = [
            'qp_woltka/support_files/coverages/coverage_1.cov',
            'qp_woltka/support_files/coverages/coverage_2.cov',
            'qp_woltka/support_files/coverages/coverage_3.cov',
        ]
        exp = ['GXXX\t100\t400\t500\t600', 'GYYY\t100\t400\t500\t1000',
               'GZZZ\t200\t400\t500\t1500']
        self.assertEqual(merge_ranges(files), exp)


if __name__ == '__main__':
    main()
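Note: as a sanity check on the expected values in test_merge_ranges above: for GXXX, coverage_1.cov contributes (200, 300), (500, 600), (100, 200) and coverage_2.cov contributes (300, 400); sorting by start and collapsing ranges that overlap or touch (end >= next start - 1) yields (100, 400) and (500, 600). A standalone sketch of that collapse rule (the helper name is ours, for illustration only):

def collapse(ranges):
    # sort by start, then fold a range into the active one whenever it
    # overlaps or is adjacent (active end >= next start - 1)
    out = []
    for start, end in sorted(ranges):
        if out and out[-1][1] >= start - 1:
            out[-1][1] = max(out[-1][1], end)  # extend active range
        else:
            out.append([start, end])  # open a new range
    return [tuple(r) for r in out]

# GXXX ranges from coverage_1.cov and coverage_2.cov combined:
assert collapse([(200, 300), (500, 600), (100, 200), (300, 400)]) == \
    [(100, 400), (500, 600)]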
79 changes: 7 additions & 72 deletions qp_woltka/tests/test_woltka.py
@@ -12,15 +12,10 @@
 from os import remove, environ
 from os.path import exists, isdir, join, dirname, basename
 from shutil import rmtree, copyfile
-from tempfile import mkdtemp, TemporaryDirectory
+from tempfile import mkdtemp
 from json import dumps
-import gzip
-import io
-import pandas as pd

 from qp_woltka import plugin
-from qp_woltka.util import (
-    get_dbs, generate_woltka_dflt_params, mux, demux, search_by_filename)
 from qp_woltka.woltka import woltka_to_array, woltka


@@ -47,21 +42,6 @@ def tearDown(self):
                 else:
                     remove(fp)

-    def test_get_dbs(self):
-        db_path = self.db_path
-        obs = get_dbs(db_path)
-        exp = {'wol': join(db_path, 'wol/WoLmin'),
-               'rep82': join(db_path, 'rep82/5min')}
-
-        self.assertDictEqual(obs, exp)
-
-    def test_generate_woltka_dflt_params(self):
-        obs = generate_woltka_dflt_params()
-        exp = {'wol': {'Database': join(self.db_path, 'wol/WoLmin')},
-               'rep82': {'Database': join(self.db_path, 'rep82/5min')}}
-
-        self.assertDictEqual(obs, exp)
-
     def _helper_woltka_bowtie(self, prep_info_dict, database=None):
         data = {'prep_info': dumps(prep_info_dict),
                 # magic #1 = testing study
@@ -167,7 +147,8 @@ def test_woltka_to_array_rep82(self):
             'for f in `cat sample_processing_${SLURM_ARRAY_TASK_ID}.log`\n',
             'do\n',
             '  echo "woltka classify -i ${f} -o ${f}.woltka-taxa --no-demux '
-            f'--lineage {database}.tax --rank free,none"\n',
+            f'--lineage {database}.tax --rank free,none --outcov '
+            'coverages/"\n',
             'done | parallel -j 8\n',
             'for f in `cat sample_processing_${SLURM_ARRAY_TASK_ID}.log`\n',
             'do\n',
@@ -204,6 +185,7 @@ def test_woltka_to_array_rep82(self):
             'wait\n',
             '\n',
             f'cd {out_dir}; tar -cvf alignment.tar *.sam.xz\n',
+            f'cd {out_dir}; tar zxvf coverages.tgz coverages\n'
             'fi\n',
             f'finish_woltka {url} {job_id} {out_dir}\n',
             'date\n']
@@ -290,7 +272,8 @@ def test_woltka_to_array_wol(self):
             'for f in `cat sample_processing_${SLURM_ARRAY_TASK_ID}.log`\n',
             'do\n',
             '  echo "woltka classify -i ${f} -o ${f}.woltka-taxa --no-demux '
-            f'--lineage {database}.tax --rank free,none"\n',
+            f'--lineage {database}.tax --rank free,none --outcov '
+            'coverages/"\n',
             f'  echo "woltka classify -i ${{f}} -c {database}.coords '
             '-o ${f}.woltka-per-gene --no-demux"\n',
             'done | parallel -j 8\n',
@@ -331,6 +314,7 @@ def test_woltka_to_array_wol(self):
             'wait\n',
             '\n',
             f'cd {out_dir}; tar -cvf alignment.tar *.sam.xz\n',
+            f'cd {out_dir}; tar zxvf coverages.tgz coverages\n'
             'fi\n',
             f'finish_woltka {url} {job_id} {out_dir}\n',
             'date\n']
@@ -414,55 +398,6 @@ def test_creation_error_no_unique_run_prefix(self):
         self.assertEqual(str(error.exception), "Multiple run prefixes match "
                          "this fwd read: S22205_S104_L001_R1_001.fastq.gz")

-    def test_mux(self):
-        f1 = b"@foo\nATGC\n+\nIIII\n"
-        f2 = b"@bar\nAAAA\n+\nIIII\n"
-        exp = b"@foo@@@foofile\nATGC\n+\nIIII\n@bar@@@barfile\nAAAA\n+\nIIII\n"
-        with TemporaryDirectory() as d:
-            f1fp = d + '/foofile.fastq'
-            f2fp = d + '/barfile.fastq'
-            ofp = d + '/output'
-            with gzip.open(f1fp, 'wb') as fp:
-                fp.write(f1)
-            with gzip.open(f2fp, 'wb') as fp:
-                fp.write(f2)
-            with open(ofp, 'wb') as output:
-                mux([f1fp, f2fp], output)
-            with open(ofp, 'rb') as result:
-                obs = result.read()
-
-        self.assertEqual(obs, exp)
-
-    def test_search_by_filename(self):
-        lookup = {'foo_bar': 'baz',
-                  'foo': 'bar'}
-        tests = [('foo_bar_thing', 'baz'),
-                 ('foo_stuff_blah', 'bar'),
-                 ('foo.stuff.blah', 'bar'),
-                 ('foo_bar', 'baz')]
-        for test, exp in tests:
-            obs = search_by_filename(test, lookup)
-            self.assertEqual(obs, exp)
-
-        with self.assertRaises(KeyError):
-            search_by_filename('does_not_exist', lookup)
-
-    def test_demux(self):
-        prep = pd.DataFrame([['sample_foo', 'foofile'],
-                             ['sample_bar', 'barfile']],
-                            columns=['sample_name', 'run_prefix'])
-        input_ = io.BytesIO(b"foo@@@foofile_R1\tATGC\t+\tIIII\nbar@@@"
-                            b"barfile_R2\tAAAA\t+\tIIII\n")
-        expfoo = b"foo\tATGC\t+\tIIII\n"
-        expbar = b"bar\tAAAA\t+\tIIII\n"
-        with TemporaryDirectory() as d:
-            demux(input_, d.encode('ascii'), prep)
-            foo = open(d + '/sample_foo.sam', 'rb').read()
-            bar = open(d + '/sample_bar.sam', 'rb').read()
-
-        self.assertEqual(foo, expfoo)
-        self.assertEqual(bar, expbar)
-

 if __name__ == '__main__':
     main()
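Note: the test_mux, test_search_by_filename, and test_demux bodies removed here are identical to the ones added in qp_woltka/tests/test_util.py above; the commit moves them rather than dropping any assertions.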
64 changes: 64 additions & 0 deletions qp_woltka/util.py
@@ -8,6 +8,7 @@
 from os import environ, listdir
 from os.path import join, isdir, expanduser
 from configparser import ConfigParser
+from collections import defaultdict

 import gzip
 from signal import signal, SIGPIPE, SIG_DFL
@@ -163,3 +164,66 @@ def demux(input_, output, prep):
             current_fp.write(id_)
             current_fp.write(tab)
             current_fp.write(remainder)
+
+
+def merge_ranges(files):
+    # the lines below are borrowed from zebra filter but they are slightly
+    # modified; mainly, the autocompress parameter was removed so it always
+    # compresses
+    class SortedRangeList:
+        def __init__(self):
+            self.ranges = []
+
+        def add_range(self, start, end):
+            self.ranges.append((start, end))
+            self.compress()
+
+        def compress(self):
+            # Sort ranges by start index
+            self.ranges.sort(key=lambda r: r[0])
+
+            new_ranges, start_val, end_val = [], None, None
+            for r in self.ranges:
+                if end_val is None:
+                    # case 1: no active range, start active range.
+                    start_val = r[0]
+                    end_val = r[1]
+                elif end_val >= r[0] - 1:
+                    # case 2: active range continues through this range
+                    # extend active range
+                    end_val = max(end_val, r[1])
+                else:  # if end_val < r[0] - 1:
+                    # case 3: active range ends before this range begins
+                    # write new range out, then start new active range
+                    new_range = (start_val, end_val)
+                    new_ranges.append(new_range)
+                    start_val = r[0]
+                    end_val = r[1]
+
+            if end_val is not None:
+                new_range = (start_val, end_val)
+                new_ranges.append(new_range)
+
+            self.ranges = new_ranges
+
+        def compute_length(self):
+            total = 0
+            for r in self.ranges:
+                total += r[1] - r[0] + 1
+            return total
+
+    merger = defaultdict(SortedRangeList)
+    for file in files:
+        with open(file, 'r') as f:
+            for range_line in f:
+                mems = range_line.split()
+                gotu = mems.pop(0)
+                for srange, erange in zip(*[iter(mems)]*2):
+                    merger[gotu].add_range(int(srange), int(erange))
+
+    lines = []
+    for gotu in merger:
+        ranges = '\t'.join([f'{i}' for x in merger[gotu].ranges for i in x])
+        lines.append(f'{gotu}\t{ranges}')
+
+    return lines
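Note: a usage sketch of merge_ranges against the fixture files added above (paths relative to the repository root, as in test_merge_ranges):

from qp_woltka.util import merge_ranges

covs = ['qp_woltka/support_files/coverages/coverage_1.cov',
        'qp_woltka/support_files/coverages/coverage_2.cov']
for line in merge_ranges(covs):
    print(line)
# prints 'GXXX\t100\t400\t500\t600' then 'GYYY\t100\t400\t500\t1000'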
3 changes: 2 additions & 1 deletion qp_woltka/woltka.py
@@ -141,6 +141,7 @@ def woltka_to_array(files, output, database_bowtie2, prep, url, name):
         "wait",
         '\n'.join(fcmds),
         f'cd {output}; tar -cvf alignment.tar *.sam.xz\n'
+        f'cd {output}; tar zxvf coverages.tgz coverages\n'
         'fi',
         f'finish_woltka {url} {name} {output}\n'
         "date"]  # end time
@@ -166,7 +167,7 @@ def woltka_to_array(files, output, database_bowtie2, prep, url, name):
             '-o ${f}.woltka-taxa ' + \
             '--no-demux ' + \
             f'--lineage {db_files["taxonomy"]} ' + \
-            f'--rank {",".join(ranks)}'
+            f'--rank {",".join(ranks)} --outcov coverages/'

     memory = MEMORY
     if 'RS210' in database_bowtie2:
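Note: with --outcov appended above, each per-sample classify command generated by woltka_to_array now also writes per-genome coverage ranges; as asserted in test_woltka.py above, the emitted command has the form:

woltka classify -i ${f} -o ${f}.woltka-taxa --no-demux --lineage <database>.tax --rank free,none --outcov coverages/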
9 changes: 9 additions & 0 deletions scripts/woltka_merge
@@ -5,6 +5,7 @@ import biom
 import glob as glob_
 import h5py
 import click
+from qp_woltka.util import merge_ranges


 @click.command()
@@ -20,11 +21,15 @@ def merge(base, glob, name, rename):

     search = os.path.join(base, glob)
     tables = glob_.glob(search)
+    coverages = []
     for block in range(0, len(tables), chunk_size):
         chunk = tables[block:block + chunk_size]

         loaded = []
         for c in chunk:
+            if name == 'free':
+                bn = os.path.basename(c).replace('.biom', '.cov')
+                coverages.append(f'coverages/{bn}')
             skip = True
             if biom.util.is_hdf5_file(c):
                 skip = False
@@ -50,6 +55,10 @@
     with h5py.File(f'{base}/{name}.biom', 'w') as out:
         full.to_hdf5(out, 'fast-merge')

+    if coverages:
+        with open(f'{base}/artifact.cov', 'w') as out:
+            out.write('\n'.join(merge_ranges(coverages)))
+

 if __name__ == '__main__':
     merge()
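Note: with the above, when merging the 'free' rank woltka_merge collects the per-sample coverages/*.cov files written by woltka classify --outcov and writes their merged ranges to {base}/artifact.cov, one genome per line with tab-separated start/end pairs. For the three fixture files above, the merged content would be (per the expectations in test_merge_ranges):

GXXX	100	400	500	600
GYYY	100	400	500	1000
GZZZ	200	400	500	1500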
