diff --git a/tests/__snapshots__/test_workflow.ambr b/tests/__snapshots__/test_workflow.ambr index 0dffafa..ecaf069 100644 --- a/tests/__snapshots__/test_workflow.ambr +++ b/tests/__snapshots__/test_workflow.ambr @@ -21,750 +21,6 @@ --- # name: test_map_isolates [ - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', - '', 'HWI-ST1410:82:C2VAGACXX:7:1101:10137:16449\t0\tNC_016509\t402\t255\t1S100M\t*\t0\t0\tACCGCCTTCTTTCTTTTCGTATCGTTCATTTCCTTCTCTTTTCTTTTTCTTTTTATCTTTACTTTTTCGCCTTCTTTCTTATTGTATCCATCGTTTCCTTT\tCCCFFFFFHHHHHJJJJJJHIJJIFHIGIIIJJIJIIJJJJJJIJJJJGIJJJJGIIIJIGGIJJJIIHJHHHFFDFFFFCEECCEEDCDDDDABDCDDDD\tAS:i:192\tXN:i:0\tXM:i:1\tXO:i:0\tXG:i:0\tNM:i:1\tMD:Z:56G43\tYT:Z:UU', 'HWI-ST1410:82:C2VAGACXX:7:1101:10321:45253\t0\tNC_016416\t4708\t255\t101M\t*\t0\t0\tAAAGTTTCGAAGGCCTGCAATACAGAGGAGCCTGCTGCTCAAACTAAGCCGACAAGAATACCAGATCCATCTTATATTAAGTTGGATGAAGAAAGAAACGC\tBCCFDFFFHHHHHJJIJJJJJJJJJJJJJJJJJJJJJJJJJJJJIJJHIIJJJJJJJHHHHHHFFFFFEEEEEEDDFFFFDDFDDDDDDDDDDDDDDDDDD\tAS:i:142\tXN:i:0\tXM:i:8\tXO:i:0\tXG:i:0\tNM:i:8\tMD:Z:8A5T2T5G21G17A0C13G22\tYT:Z:UU', 'HWI-ST1410:82:C2VAGACXX:7:1101:10368:58012\t0\tNC_016509\t594\t255\t101M\t*\t0\t0\tCCCCTTTCTTTTACTGTCTAGGTTCTATTTTTTCCTTTTCCTCCTTAGTATTTTGCAGTATCGGTTTCATTTCTAACTTTTTTATTCGTCTTGTACTTCCT\tCCCFFFFFHHHHHJJJHJJJJJHHIJIJJJJJJJJIJJJJJIJJJJJJIJJJJJIJIJIJIIJJGIIJJJJJJJHHHHHHHFDDEEEDDDDDCDDEFFDDD\tAS:i:194\tXN:i:0\tXM:i:1\tXO:i:0\tXG:i:0\tNM:i:1\tMD:Z:27A73\tYT:Z:UU', diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 79c28cf..8c5a4f8 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -187,9 +187,10 @@ async def test_map_isolates( assert intermediate.isolate_high_scores == snapshot +@pytest.mark.parametrize("none", [True, False]) @pytest.mark.datafiles(SAM_PATH, FASTQ_PATH) async def test_eliminate_subtraction( - datafiles, subtractions, work_path, run_subprocess + none, datafiles, subtractions, work_path, run_subprocess ): isolate_fastq_path = work_path / "test.fq" isolate_sam_path = work_path / "test_al.sam" @@ -198,6 +199,9 @@ async def test_eliminate_subtraction( results = {} + if none: + subtractions = [] + await eliminate_subtraction( isolate_fastq_path, isolate_sam_path, @@ -209,9 +213,13 @@ async def test_eliminate_subtraction( work_path, ) + if none: + assert results["subtracted_count"] == 0 + else: + assert results["subtracted_count"] == 4 + assert not (work_path / "to_subtraction.sam").is_file() assert (work_path / "subtracted.sam").is_file() - assert results["subtracted_count"] == 4 async def test_pathoscope( diff --git a/workflow.py b/workflow.py index d3f4c4a..3ce3bb5 100644 --- a/workflow.py +++ b/workflow.py @@ -5,7 +5,7 @@ from logging import getLogger from pathlib import Path from types import SimpleNamespace -from typing import Any, Dict, List, TextIO +from typing import Any, Dict, List, TextIO, Set import rust_utils import aiofiles @@ -22,6 +22,34 @@ logger = getLogger("pathoscope") +BAD_FIRST_SAM_CHARACTERS = {"\n", "@", "#"} + + +def read_fastq_grouped_lines(fastq_file: TextIO): + while True: + fastq_read = ( + fastq_file.readline(), + fastq_file.readline(), + fastq_file.readline(), + fastq_file.readline(), + ) + + if "" in fastq_read: + return + + yield fastq_read + + +def subtract_fastq( + current_fastq_path: Path, new_fastq_path: Path, subtracted_reads: Set[str] +): + with open(current_fastq_path, "r") as current_fastq_file, open( + new_fastq_path, "w" + ) as new_fastq_file: + for record in read_fastq_grouped_lines(current_fastq_file): + if record[0].strip("@\n") not in subtracted_reads: + new_fastq_file.write("".join(record)) + @hooks.on_failure async def delete_analysis_document(analysis_provider): @@ -46,7 +74,6 @@ async def map_default_isolates( Map sample reads to all default isolates to identify candidate OTUs. This will be used to identify candidate OTUs. - """ async def stdout_handler(line: bytes): @@ -89,7 +116,7 @@ async def stdout_handler(line: bytes): stdout_handler=stdout_handler, ) - logger.info(f"Found {len(intermediate.to_otus)} potential OTUs.") + logger.info("Found %i potential OTUs.", len(intermediate.to_otus)) @step @@ -140,7 +167,7 @@ async def map_isolates( async def stdout_handler(line: bytes): line = line.decode() - if line[0] == "@" or line == "#": + if not line or line[0] in BAD_FIRST_SAM_CHARACTERS: return sam_line = SamLine(line) @@ -158,12 +185,14 @@ async def stdout_handler(line: bytes): if sam_line.score > isolate_high_scores[sam_line.read_id]: isolate_high_scores[sam_line.read_id] = sam_line.score - await f.write(f"{line}\n") + await f.write(f"{line}") command = [ "bowtie2", "-p", str(proc), + "--no-hd", + "--no-sq", "--no-unal", "--local", "--score-min", @@ -187,32 +216,6 @@ async def stdout_handler(line: bytes): intermediate.isolate_high_scores = dict(isolate_high_scores) -def read_fastq_grouped_lines(fastq_file: TextIO): - while True: - fastq_read = ( - fastq_file.readline(), - fastq_file.readline(), - fastq_file.readline(), - fastq_file.readline(), - ) - - if "" in fastq_read: - return - - yield fastq_read - - -def subtract_fastq( - current_fastq_path: Path, new_fastq_path: Path, subtracted_reads: List[str] -): - with open(current_fastq_path, "r") as current_fastq_file, open( - new_fastq_path, "w" - ) as new_fastq_file: - for record in read_fastq_grouped_lines(current_fastq_file): - if record[0].strip("@\n") not in subtracted_reads: - new_fastq_file.write("".join(record)) - - @step async def eliminate_subtraction( isolate_fastq_path: Path, @@ -247,17 +250,22 @@ async def eliminate_subtraction( :param work_path: path to the workflow working directory """ - to_subtraction_sam_path = work_path / "to_subtraction.sam" + if len(subtractions) == 0: + logger.info("No subtractions to eliminate reads against. Skipping step.") + await asyncio.to_thread(shutil.copyfile, isolate_sam_path, subtracted_sam_path) + results["subtracted_count"] = 0 + return current_fastq_path = work_path / "current_fastq.fq" - new_fastq_path = work_path / "new_fastq.fq" + to_subtraction_sam_path = work_path / "to_subtraction.sam" # copy the original fastq file into a working fastq file # as to not disrupt possible uses elsewhere await asyncio.to_thread(shutil.copyfile, isolate_fastq_path, current_fastq_path) - # for the first subtraction, use the original isolate file - current_isolate_path = isolate_sam_path + # The SAM file that reads should be subtracted from if they map better to a + # subtraction. + current_sam_input_path = isolate_sam_path subtracted_count = 0 @@ -284,37 +292,40 @@ async def eliminate_subtraction( ) rust_utils.run_eliminate_subtraction( - str(current_isolate_path), + str(current_sam_input_path), str(to_subtraction_sam_path), str(subtracted_sam_path), ) await aiofiles.os.remove(to_subtraction_sam_path) - # set next iteration's current isolate file to the newly created isolate file - current_isolate_path = work_path / "working_isolate.sam" + current_sam_input_path = work_path / "working_isolate.sam" + await asyncio.to_thread( - shutil.copyfile, subtracted_sam_path, current_isolate_path + shutil.copyfile, subtracted_sam_path, current_sam_input_path ) - subtracted_reads = {} - async with aiofiles.open("subtracted_read_ids.txt", "r") as f: subtracted_reads = {str(line).strip("@\n") async for line in f} subtracted_count += len(subtracted_reads) + # Rewrite the input FASTQ file to exclude reads that mapped better to a + # subtraction. + new_fastq_path = work_path / "new_fastq.fq" + await asyncio.to_thread( subtract_fastq, current_fastq_path, new_fastq_path, subtracted_reads ) - # rewrite the fastq file to exclude previous reads - - # set next iteration's current fastq file to newly created fastq file + # Overwrite the previous input FASTA with the subtracted one. await asyncio.to_thread(shutil.copyfile, new_fastq_path, current_fastq_path) logger.info( - "Subtracted %s reads that mapped better to a subtraction.", subtracted_count + "Subtracted reads that mapped better to a subtraction subtraction_id=%s subtraction_name=%s count=%i", + subtraction.id, + subtraction.name, + subtracted_count, ) results["subtracted_count"] = subtracted_count @@ -335,12 +346,14 @@ async def reassignment( Tab-separated output is written to ``pathoscope.tsv``. The results are also parsed and saved to `intermediate.coverage`. - """ reassigned_path = work_path / "reassigned.sam" - logger.info(f"subtracted_sam_path: {subtracted_sam_path}") - logger.info(f"reassigned_path: {reassigned_path}") + logger.info( + "Running Pathoscope subtracted_sam_path=%s reassigned_path=%s", + subtracted_sam_path, + reassigned_path, + ) ( best_hit_initial_reads, @@ -363,6 +376,8 @@ async def reassignment( report_path = work_path / "report.tsv" + logger.info("Writing report report_path=%s", report_path) + report = await asyncio.to_thread( write_report, report_path, @@ -382,10 +397,14 @@ async def reassignment( analysis.upload(report_path, "tsv") + logger.info("Calculating coverage") + intermediate.coverage = await asyncio.to_thread( calculate_coverage, reassigned_path, intermediate.lengths ) + logger.info("Preparing hits") + hits = list() for sequence_id, hit in report.items():