-
Notifications
You must be signed in to change notification settings - Fork 6
/
workflow.py
117 lines (94 loc) · 3.15 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import asyncio
import shutil
from pathlib import Path
from types import SimpleNamespace
from virtool_core.utils import compress_file, decompress_file, is_gzipped
from virtool_workflow import hooks, step
from virtool_workflow.data.subtractions import WFNewSubtraction
from virtool_workflow.runtime.run_subprocess import RunSubprocess
@hooks.on_failure
async def delete_subtraction(new_subtraction: WFNewSubtraction):
    """Clean up the subtraction being created when the workflow fails.

    Registered through ``hooks.on_failure``. It awaits
    ``new_subtraction.delete()`` and returns nothing.
    """
    await new_subtraction.delete()
@step(name="Decompress FASTA")
async def decompress(
    decompressed_fasta_path: Path,
    new_subtraction: WFNewSubtraction,
    proc: int,
):
    """Write an uncompressed copy of the input FASTA to ``decompressed_fasta_path``.

    A gzipped input is decompressed using ``proc`` processes; any other input
    is copied verbatim. All file work runs in a worker thread so the event
    loop is not blocked.
    """
    source_path = new_subtraction.fasta_path

    if not await asyncio.to_thread(is_gzipped, source_path):
        # Already plain text: a straight copy is enough.
        await asyncio.to_thread(
            shutil.copyfile, source_path, decompressed_fasta_path
        )
        return

    await asyncio.to_thread(
        decompress_file,
        source_path,
        decompressed_fasta_path,
        processes=proc,
    )
@step(name="Compute GC and Count")
async def compute_gc_and_count(
    decompressed_fasta_path: Path, intermediate: SimpleNamespace
):
    """Count the sequences and nucleotide composition of the decompressed FASTA.

    Sets two attributes on ``intermediate`` for use by later steps:

    * ``count``: the number of header lines (lines starting with ``>``).
    * ``gc``: for each of ``a``, ``t``, ``g``, ``c`` and ``n``, its share of all
      such characters counted case-insensitively on non-header lines, rounded
      to three decimal places. Any other characters are not counted.

    :raises ValueError: if the file does not have a FASTA file suffix, or if it
        contains no ``a``/``t``/``g``/``c``/``n`` characters (which would
        otherwise cause a division by zero).
    """
    # ``Path.suffix`` includes the leading dot (e.g. ".fa"), so compare against
    # dotted suffixes.
    fasta_suffixes = {".fa", ".fasta", ".fna", ".fas"}
    bases = ("a", "t", "g", "c", "n")

    def tally(path: Path):
        """Return ``(header_count, per-base counts)`` for the FASTA at ``path``."""
        if path.suffix.lower() not in fasta_suffixes:
            raise ValueError("Input file is not a FASTA file.")

        record_count = 0
        counts = dict.fromkeys(bases, 0)

        with open(path, "r") as f:
            for line in f:
                if line.startswith(">"):
                    record_count += 1
                    continue

                # Lowercase once per line so both upper- and lowercase bases
                # are counted.
                lowered = line.lower()
                for base in bases:
                    counts[base] += lowered.count(base)

        return record_count, counts

    # Parsing is blocking file I/O, so run it off the event loop.
    count, nucleotides = await asyncio.to_thread(tally, decompressed_fasta_path)

    nucleotides_sum = sum(nucleotides.values())
    if nucleotides_sum == 0:
        raise ValueError("Input FASTA file contains no nucleotide sequence data.")

    intermediate.count = count
    intermediate.gc = {
        key: round(value / nucleotides_sum, 3) for key, value in nucleotides.items()
    }
@step
async def build_index(
    bowtie_index_path: Path,
    decompressed_fasta_path: Path,
    proc: int,
    run_subprocess: RunSubprocess,
):
    """Build a Bowtie2 index from the decompressed FASTA file.

    Runs ``bowtie2-build`` with ``proc`` threads. The index files are written
    into ``bowtie_index_path`` with the basename ``subtraction``.
    """
    await run_subprocess(
        [
            "bowtie2-build",
            "-f",
            "--threads",
            str(proc),
            # Pass every argument as ``str`` so the command is a uniform list
            # of strings.
            str(decompressed_fasta_path),
            # Index basename inside the index directory.
            str(bowtie_index_path / "subtraction"),
        ]
    )
@step
async def finalize(
    bowtie_index_path: Path,
    decompressed_fasta_path: Path,
    intermediate: SimpleNamespace,
    new_subtraction: WFNewSubtraction,
    proc: int,
    work_path: Path,
):
    """Compress and upload the subtraction data, then finalize the subtraction.

    The decompressed FASTA is gzipped to ``work_path / "subtraction.fa.gz"``
    using ``proc`` processes. That file and every ``*.bt2`` file in
    ``bowtie_index_path`` are uploaded, in that order. Finally the ``gc`` and
    ``count`` values stored on ``intermediate`` are passed to
    ``new_subtraction.finalize``.
    """
    gzipped_fasta = work_path / "subtraction.fa.gz"

    # Compression is blocking, so run it in a worker thread.
    await asyncio.to_thread(
        compress_file, decompressed_fasta_path, gzipped_fasta, processes=proc
    )

    to_upload = [gzipped_fasta, *bowtie_index_path.glob("*.bt2")]
    for file_path in to_upload:
        await new_subtraction.upload(file_path)

    await new_subtraction.finalize(intermediate.gc, intermediate.count)