diff --git a/.dockstore.yml b/.dockstore.yml
index ff958fe50..729e7ba33 100644
--- a/.dockstore.yml
+++ b/.dockstore.yml
@@ -96,3 +96,15 @@ workflows:
 - name: CleanupIntermediate
   subclass: wdl
   primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/CleanupIntermediate.wdl
+- name: StatisticalPhasing
+  subclass: wdl
+  primaryDescriptorPath: /wdl/pipelines/PacBio/Assembly/StatisticalPhasing.wdl
+- name: BandagePlotGraph
+  subclass: wdl
+  primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/BandagePlotGraph.wdl
+- name: ConvertToHailMTT2T
+  subclass: wdl
+  primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/ConvertToHailMTT2T.wdl
+- name: MergePhasedVCF
+  subclass: wdl
+  primaryDescriptorPath: /wdl/pipelines/PacBio/Utility/MergePhasedVCF.wdl
\ No newline at end of file
diff --git a/docker/lr-bandage/Dockerfile b/docker/lr-bandage/Dockerfile
new file mode 100644
index 000000000..659a6c79a
--- /dev/null
+++ b/docker/lr-bandage/Dockerfile
@@ -0,0 +1,31 @@
+FROM flowcraft/bandage:0.8.1
+
+# ARG SAMTOOLS_VERSION=1.15.1
+# ARG BCFTOOLS_VERSION=1.15.1
+
+# ARG DEBIAN_FRONTEND=noninteractive
+# RUN apt-get -qqy update --fix-missing && \
+#     apt-get -qqy dist-upgrade && \
+#     apt-get -qqy install --no-install-recommends \
+#         ca-certificates \
+#         libbz2-dev \
+#         libcurl4-openssl-dev \
+#         liblzma-dev \
+#         libncurses5-dev \
+#         autoconf \
+#         automake \
+#         bzip2 \
+#         gcc \
+#         g++ \
+#         make \
+#         wget \
+#         zlib1g-dev && \
+
+#     wget https://github.com/samtools/samtools/releases/download/${SAMTOOLS_VERSION}/samtools-${SAMTOOLS_VERSION}.tar.bz2 && \
+#     tar xjf samtools-${SAMTOOLS_VERSION}.tar.bz2 && \
+#     cd samtools-${SAMTOOLS_VERSION} && ./configure --without-curses --enable-libcurl && make -s all all-htslib && make install install-htslib && cd - && \
+#     rm -rf samtools-${SAMTOOLS_VERSION}* && \
+#     wget https://github.com/samtools/bcftools/releases/download/${BCFTOOLS_VERSION}/bcftools-${BCFTOOLS_VERSION}.tar.bz2 && \
+#     tar xjf bcftools-${BCFTOOLS_VERSION}.tar.bz2 && \
+#     cd bcftools-${BCFTOOLS_VERSION} && ./configure --without-curses && make -s && make install && cd - && \
+#     rm -rf bcftools-${BCFTOOLS_VERSION}*
\ No newline at end of file
diff --git a/docker/lr-jellyfish/AGG.py b/docker/lr-jellyfish/AGG.py
new file mode 100644
index 000000000..5ac25bebe
--- /dev/null
+++ b/docker/lr-jellyfish/AGG.py
@@ -0,0 +1,648 @@
+import gzip
+import json
+import numpy
+
+class GraphicalGenome:
+    def __init__(self, filename):
+        self.anchor, self.edges, self.outgoing, self.incoming = self.loadgraph(filename)
+
+    def loadgraph(self, filename):
+        if filename.endswith(".gz"):
+            with gzip.open(filename, 'rb') as fp:
+                data = fp.readlines()
+        else:
+            with open(filename, 'rb') as fp:
+                data = fp.readlines()
+        Anchor_dict = {}
+        Edge_dict = {}
+        Outgoing = {}
+        Incoming = {}
+
+        for line in data:
+            line = line.decode()[:-1]
+            if line.startswith("S"):
+                itemlist = line.split('\t')
+                name = itemlist[1]
+                seq = itemlist[2]
+                annotation = itemlist[3][5:]
+                if name.startswith("A"):
+                    Anchor_dict[name] = json.loads(annotation)
+                    Anchor_dict[name]['seq'] = seq
+                elif name.startswith("E"):
+                    Edge_dict[name] = json.loads(annotation)
+                    Edge_dict[name]['seq'] = seq
+            elif line.startswith("L"):
+                itemlist = line.split('\t')
+                src = itemlist[1]
+                dst = itemlist[3]
+                Outgoing[src] = Outgoing.get(src, []) + [dst]
+                Incoming[dst] = Incoming.get(dst, []) + [src]
+
+        return Anchor_dict, Edge_dict, Outgoing, Incoming
+
+def reconstruct_refpath(graph, ref = 'NC_012920.1'):
+    src = "SOURCE"
+    Path = []
+
+    visited = set()
+
+    while src != "SINK":
+        edgelist
= graph.outgoing[src] + ref_edge = [] + for edge in edgelist: + if ref in graph.edges[edge]['reads']: + ref_edge += [edge] + if len(ref_edge) == 1: + edge = ref_edge[0] + else: + for edge in ref_edge: + if graph.outgoing[edge] == "SINK": + break + Path += [src, edge] + visited.add(src) + src = graph.outgoing[edge][0] + assert src.startswith("A") or src == "SINK" + if src in visited: + break + + reconstruct = '' + for item in Path: + if item == "SOURCE": + reconstruct += "" + elif item.startswith('A'): + reconstruct += graph.anchor[item]['seq'] + elif item.startswith("E"): + reconstruct += graph.edges[item]['seq'] + else: + print(item) + return reconstruct, Path + +def write_gfa(AnchorInfo, Edge_info, outputfilename): + header = ['H\tVN:Z:1.0\n'] + # add json annotation to edge and anchor sequences + AnchorS = [] + for anchor,D in AnchorInfo.items(): + seq = D.pop('seq') + json_string = json.dumps(D) + AnchorS += ['S\t%s\t%s\t%s\n' % (anchor, seq, "PG:J:" + json_string)] + + EdgeS = [] + Link = [] + for edge,edge_dict in Edge_info.items(): + seq = edge_dict.pop('seq') + src = edge_dict.pop('src') + dst = edge_dict.pop('dst') + + json_string = json.dumps(edge_dict) + EdgeS += ['S\t%s\t%s\t%s\t%s\n' % (edge, seq, "PG:J:" + json_string, "RC:i:" + str(len(edge_dict['reads'])))] + Link.append('L\t%s\t%s\t%s\t%s\t%s\n'% (src, "+", edge, "+", "0M")) + Link.append('L\t%s\t%s\t%s\t%s\t%s\n'% (edge, "+", dst, "+", "0M")) + + with open(outputfilename, 'w') as fp: + for h in header: + fp.write(h) + for s in AnchorS: + fp.write(s) + for s in EdgeS: + fp.write(s) + for l in Link: + fp.write(l) + +def reconstruct_sample_subgraph(graph, samplelist, outputfile): + Anchor_dict = {} + Edge_dict = {} + edgelist = list(graph.edges.keys()) + Anchor_dict = {} + Edge_dict = {} + nodelist = [] + for edge in edgelist: + if len(set(samplelist) & set(graph.edges[edge]['strain'])) > 0: + Edge_dict[edge] = graph.edges[edge] + Edge_dict[edge]['src'] = graph.incoming[edge][0] + Edge_dict[edge]['dst'] = graph.outgoing[edge][0] + nodelist += graph.incoming[edge] + nodelist += graph.outgoing[edge] + + nodelist = list(set(nodelist)) + for node in nodelist: + if node.startswith("A"): + Anchor_dict[node] = graph.anchor[node] + + write_gfa(Anchor_dict, Edge_dict, outputfile) + + return Anchor_dict, Edge_dict + +def find_most_supported_path(graph, sample, ref = '012920'): + src = sorted(graph.anchor.keys())[0] + dst = sorted(graph.anchor.keys())[-1] + Path = [] + + visited = set() + + def find_most_supported_edge(graph, sample, src, ref): + + edgelist = graph.outgoing.get(src, []) + sample_edge = [] + for edge in edgelist: + if sample in graph.edges[edge]['strain']: + sample_edge += [edge] + if len(sample_edge)<1: # no haplotype for this sample at this place the go to the reference one + for ref_edge in edgelist: + if ref in set(graph.edges[ref_edge]['strain']): + break + edge = ref_edge + + elif len(sample_edge) == 1: + edge = sample_edge[0] + else: + # find the most supported edge + read_count = [len(graph.edges[e]['reads']) for e in sample_edge] + + index = numpy.argmax(read_count) + + edge = sample_edge[index] + + return edge + + while src != dst: + edge = find_most_supported_edge(graph, sample, src, ref) + Path += [src, edge] + + visited.add(src) + + src = graph.outgoing[edge][0] + + assert src.startswith("A") or src == "SINK" + + if src in visited: + break + + if src == dst: + try: + edge = find_most_supported_edge(graph, sample, src, ref) + Path += [src, edge] + except: + Path = Path + + reconstruct = '' + for item in 
Path: + if item == "SOURCE": + reconstruct += "" + elif item.startswith('A'): + reconstruct += graph.anchor[item]['seq'] + elif item.startswith("E"): + reconstruct += graph.edges[item]['seq'] + else: + print(item) + + return reconstruct, Path + + +class Find_all_Path_between_anchors: + def __init__(self, graph, start, end, read_sets): + self.subpath = [] + self.initial_set = read_sets + self.find_path(graph, start, end, [], 0, self.initial_set) + + def find_path(self, g, start, end, sofar, depth, readset): + + if start == end: + sofar1 = sofar + [end] + if len(readset)>0: + self.subpath.append((sofar1, readset)) + return + + # path not supported + if len(readset) <1: + return + + # path not circular + if start == "SINK": + return + + depth1 = depth+ 1 + + + for dst in g.outgoing[start]: + # consider the read set in the latest 10 intervals + if dst.startswith("E"): + readset1 = readset & set(g.edges[dst]['reads']) + else: + readset1 = readset + + self.find_path(g, dst, end, sofar = sofar + [start], depth = depth1, readset = readset1) + +def reconstruct_path_seq(graph, path): + seq = "" + for item in path: + if item.startswith('A'): + seq += graph.anchor[item]['seq'] + elif item.startswith("E"): + seq += graph.edges[item]['seq'] + else: + item += "" + return seq + +def find_furthest_node(node_candidate, subgraph): + max_distance = 0 + node = "" + for n in node_candidate: + d = numpy.absolute(subgraph.anchor[node]['pos'] - subgraph.anchor['start_node']['pos']) + if d > max_distance: + node = n + return node + +class Get_Series_Parallel_Graph: + + def __init__(self, graph): + self.initial_set = self.find_all_reads(graph) + self.nodelist = self.series_parallel_graph_nodelist(graph) + #print(self.nodelist) + self.anchor, self.edges, self.outgoing, self.incoming = self.series_parallel_graph(self.nodelist, graph) + + def find_all_reads(self, graph): + read_sets = set() + edgelist = graph.edges.keys() + for item in edgelist: + readlist = graph.edges[item]['reads'] + for read in readlist: + read_sets.add(read) + return read_sets + def find_furthest_node(self, node_candidate, subgraph, start_node): + max_distance = 0 + node = "" + for n in node_candidate: + d = numpy.absolute(subgraph.anchor[n]['pos'] - subgraph.anchor[start_node]['pos']) + if d > max_distance: + node = n + return node + + def series_parallel_graph_nodelist(self, subgraph): + # need to consider the loop i.e.the last anchor to the first anchor + + start_node = sorted(subgraph.anchor.keys())[0] + Nodelist = [start_node] + + edgelist = subgraph.outgoing[start_node] + node_candidate = [] + for edge in edgelist: + nodelist = subgraph.outgoing[edge] + node_candidate += nodelist + if nodelist[0] not in subgraph.anchor: + continue + node_candidate = sorted(node_candidate) + #node = node_candidate[-1] ## node should be selected by the largest distance instead of the sorting order + node = self.find_furthest_node(node_candidate, subgraph, start_node) + + # find the furthurest anchor + Nodelist.append(node) # append the furthest node + + + + while node != start_node: + # print(node) + + edgelist = subgraph.outgoing[node] + node_candidate = [] + for edge in edgelist: + nodelist = subgraph.outgoing[edge] + # exclude deadend + if "SINK" in nodelist: + continue + if nodelist[0] not in subgraph.anchor: + continue + if nodelist[0] not in subgraph.outgoing: + continue + node_candidate += nodelist + + node_candidate = sorted(node_candidate) + node = self.find_furthest_node(node_candidate, subgraph, node) + if node in set(Nodelist): + 
Nodelist.append(node) + break + Nodelist.append(node) # append the furthest node + + return Nodelist + + def series_parallel_graph(self, Nodelist, subgraph): + Node_dict = {} + Edge_dict = {} + Outgoing_dict = {} + Incoming_dict = {} + for i, node in enumerate(Nodelist[:-1]): + start_node = node + end_node = Nodelist[i+1] + Node_dict[start_node] = subgraph.anchor[start_node] + # print(start_node, end_node) + path = Find_all_Path_between_anchors(subgraph, start_node, end_node, self.initial_set) + #print(start_node,end_node, len(path.subpath)) + index = 0 + for p, rs in path.subpath: + edgename = 'E%05d.%04d' % (int(start_node[1:]), index) + seq = reconstruct_path_seq(subgraph, p[1:-1]) + Edge_dict[edgename] = {} + Edge_dict[edgename]['seq'] = seq + Edge_dict[edgename]['src'] = start_node + Edge_dict[edgename]['dst'] = end_node + Edge_dict[edgename]['reads'] = list(rs) + Edge_dict[edgename]['strain'] = list(set([item.split("_")[-1] for item in list(rs)])) + + Outgoing_dict[start_node] = Outgoing_dict.get(start_node, []) + [edgename] + Outgoing_dict[edgename] = [end_node] + + Incoming_dict[end_node] = Incoming_dict.get(end_node, []) + [edgename] + Incoming_dict[edgename] = [start_node] + index += 1 + return Node_dict, Edge_dict, Outgoing_dict, Incoming_dict + + +class SubGraph: + def __init__(self, graph, samplelist): + self.anchor, self.edges, self.outgoing, self.incoming = self.reconstruct_sample_subgraph(graph, samplelist) + + def reconstruct_sample_subgraph(self, graph, samplelist): + Anchor_dict = {} + Edge_dict = {} + Outgoing = {} + Incoming = {} + + edgelist = list(graph.edges.keys()) + nodelist = [] + + for edge in edgelist: + if len(set(samplelist) & set(graph.edges[edge]['strain'])) > 0: + Edge_dict[edge] = graph.edges[edge] + src = graph.incoming[edge][0] + dst = graph.outgoing[edge][0] + + Edge_dict[edge]['src'] = src + Edge_dict[edge]['dst'] = dst + + Incoming[edge] = graph.incoming[edge] + Incoming[dst] = Incoming.get(dst, []) + [edge] + + Outgoing[edge] = graph.outgoing[edge] + Outgoing[src] = Outgoing.get(src, []) + [edge] + + nodelist += graph.incoming[edge] + nodelist += graph.outgoing[edge] + + nodelist = list(set(nodelist)) + for node in nodelist: + if node.startswith("A"): + Anchor_dict[node] = graph.anchor[node] + return Anchor_dict, Edge_dict, Outgoing, Incoming + +def validate_sp_graph(series_parallelgraph): + nodelist = series_parallelgraph.anchor.keys() + for node in nodelist: + edgelist = series_parallelgraph.outgoing[node] + outgoing_nodelist = [] + for edge in edgelist: + outgoing_nodelist.append(series_parallelgraph.edges[edge]['dst']) + outgoing_nodelist.append(series_parallelgraph.outgoing[edge][0]) + outgoing_nodelist = set(outgoing_nodelist) + assert len(outgoing_nodelist) == 1 + print("PASS") + +def processCigar(cigar): + """Helper Function, may not be used directly, expand Cigar string + + Parameters: + cigar: - compressed cigar + """ + out = '' + N = 0 + for symbol in cigar: + if symbol in '0123456789': + N = 10*N + int(symbol) + else: + #if (symbol != 'D'): + if (N == 0): + out += symbol + else: + out += N*symbol + N = 0 + return out + +def combineCigar(cigar): + """Helper Function, may not be used directly, compress Cigar string + + Parameters: + cigar: - expanded cigar + """ + cigar = cigar +'$' + out = '' + N = 0 + start = 0 + for i in range(1,len(cigar)): + if cigar[i-1] != cigar[i]: + out += str(i-start) + cigar[i-1] + start = i + return out + +def get_variant_position(cigar): + ref_pos = [] + alt_pos = [] + var_type = [] + alt_i = 0 + ref_i = 0 + 
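+    # Walk the expanded CIGAR with two cursors: ref_i on the reference sequence and
+    # alt_i on the alternate (edge) sequence. '=' advances both, 'X' records a mismatch
+    # at both cursors, 'I' consumes only the alternate, and 'D' only the reference.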
for i, s in enumerate(cigar): + if s == 'I': + if ref_i > 0: + ref_pos.append(ref_i-1) + else: + ref_pos.append(ref_i) + alt_pos.append(alt_i) + var_type.append("I") + + alt_i += 1 + + if s == 'D': + ref_pos.append(ref_i) + if alt_i > 0: + alt_pos.append(alt_i-1) + else: + alt_pos.append(alt_i) + var_type.append("D") + ref_i += 1 + + if s == 'X': + ref_pos.append(ref_i) + alt_pos.append(alt_i) + alt_i += 1 + ref_i += 1 + var_type.append("X") + + if s == '=': + alt_i += 1 + ref_i += 1 + + return ref_pos, alt_pos, var_type + +def findBedge(Graph, src, dst, refstrain, k): + paths = AGG.Find_all_Path_between_anchors(Graph, src, dst, {refstrain}) + subpaths = paths.subpath + + if len(subpaths) < 1: + return "" + for p, strain in subpaths: + seq = AGG.reconstruct_path_seq(spgraph, path = p) + return seq[k:-k] + +def find_all_reads(graph): + read_sets = set() + edgelist = graph.edges.keys() + for item in edgelist: + readlist = graph.edges[item]['reads'] + for read in readlist: + read_sets.add(read) + return read_sets + +def get_snps(subgraph, mpath, k, ref): + SNPs = {} # Var[refpos]['edgename'] = ['A'] + for edge in mpath: + if edge.startswith('E'): + cigar = subgraph.edges[edge].get('variants', "") + if cigar == "": + continue + src = subgraph.incoming[edge][0] + if src != "SOURCE": + refstart = int(subgraph.anchor[src]["pos"]) + k + else: + raise "SOURCE or SINK node" + + # find reference seq + dst = subgraph.outgoing[edge][0] + ref_edge_list, ref_seq = findBedge(spgraph, src, dst, "NC_012920", k) + ref_edge = ref_edge_list[0] + expanded_cigar = AGG.processCigar(cigar) + refpos, altpos, var_type = get_variant_position(AGG.processCigar(cigar)) + alt_seq = subgraph.edges[edge]['seq'] + # only record SNPs + for i, vart in enumerate(var_type): + if vart == "X": + rp = refpos[i] + ap = altpos[i] + pos = refstart + rp + SNPs[pos] = SNPs.get(pos, {}) + SNPs[pos][edge] = SNPs[pos].get(edge, {}) + SNPs[pos][edge]['base'] = SNPs[pos][edge].get('base', "") + alt_seq[ap] + SNPs[pos][edge]['altoffset'] = SNPs[pos][edge].get('altoffset', []) + [ap] + SNPs[pos][ref_edge] = SNPs[pos].get(ref_edge, {}) + SNPs[pos][ref_edge]['base'] = ref_seq[refpos[i]] + SNPs[pos]['refbase'] = SNPs[pos][ref_edge]['base'] + return SNPs + +def get_indels(subgraph, mpath, k, ref): + Indels = {} # Var[refpos]['edgename'] = ['A'] + for edge in mpath: + if edge.startswith('E'): + cigar = subgraph.edges[edge].get('variants', "") + if cigar == "": + continue + src = subgraph.incoming[edge][0] + if src != "SOURCE": + refstart = int(subgraph.anchor[src]["pos"]) + k + else: + raise "SOURCE or SINK node" + + # find reference seq + dst = subgraph.outgoing[edge][0] + ref_edge_list, ref_seq = findBedge(spgraph, src, dst, "NC_012920", k) + ref_edge = ref_edge_list[0] + expanded_cigar = AGG.processCigar(cigar) + refpos, altpos, var_type = get_variant_position(AGG.processCigar(cigar)) + alt_seq = subgraph.edges[edge]['seq'] + # only record SNPs + for i, vart in enumerate(var_type): + if vart == "I": + rp = refpos[i] + ap = altpos[i] + pos = refstart + rp + Indels[pos] = Indels.get(pos, {}) + Indels[pos][edge] = Indels[pos].get(edge, {}) + Indels[pos][edge]['base'] = Indels[pos][edge].get('base', "") + alt_seq[ap:ap+2] + Indels[pos][edge]['info'] = "INS" + Indels[pos][edge]['altoffset'] = Indels[pos][edge].get('altoffset', []) + [ap] + Indels[pos][ref_edge] = Indels[pos].get(ref_edge, {}) + Indels[pos][ref_edge]['base'] = Indels[pos][ref_edge].get("base", "") + ref_seq[rp-1] + Indels[pos]['refbase'] = Indels[pos][ref_edge]['base'] + + if vart == 
"D": + rp = refpos[i] + ap = altpos[i] + pos = refstart + rp + Indels[pos] = Indels.get(pos, {}) + Indels[pos][edge] = Indels[pos].get(edge, {}) + Indels[pos][edge]['base'] = Indels[pos][edge].get('base', "") + alt_seq[ap] + Indels[pos][edge]['info'] = "DEL" + Indels[pos][edge]['altoffset'] = Indels[pos][edge].get('altoffset', []) + [ap-1] + Indels[pos][ref_edge] = Indels[pos].get(ref_edge, {}) + Indels[pos][ref_edge]['base'] = Indels[pos][ref_edge].get("base", "") + ref_seq[rp-1:rp+1] + Indels[pos]['refbase'] = Indels[pos][ref_edge]['base'] + return Indels + + +def get_VCF_file(Var, samplelist): + Mat = {} + + for pos, D in Var.items(): + Mat[pos] = {} + for edge, B in D.items(): + if edge == "refbase": + continue + for strain in samplelist: + Mat[pos][strain] = Mat[pos].get(strain, set()) | set([B['base']]) + #print(Mat) + # write VCF + VCF = {} + for pos, D in Mat.items(): + refbase = Var[pos]['refbase'] + if list(D.values())[0] == set([refbase]): + continue + VCF[pos]= VCF.get(pos, {}) + for strain, base in D.items(): + VCF[pos]["CHROM"] = "chrM" + VCF[pos]["POS"] = pos + VCF[pos]['ID'] = "." + VCF[pos]['REF'] = refbase + #print(base - set([refbase])) + VCF[pos]['ALT'] = ",".join(base - set([refbase])) + VCF[pos]['QUAL'] = "." + VCF[pos]['FILTER'] = "." + VCF[pos]['INFO'] = "." + VCF[pos]['FORMAT'] = "GT" + #print(base) + # allelelist = [] + # if refbase in base: + # allelelist += [0] + + # for i,s in enumerate(base - set([refbase])): + # allelelist += [i+1] + # VCF[pos][strain] = "/".join([str(item) for item in allelelist]) + VCF[pos][strain] = "/".join([str(item) for item in base]) + return VCF + + +def loadFasta(filename): + """ Parses a classically formatted and possibly + compressed FASTA file into a list of headers + and fragment sequences for each sequence contained. + The resulting sequences are 0-indexed! 
""" + if (filename.endswith(".gz")): + fp = gzip.open(filename, 'rb') + else: + fp = open(filename, 'rb') + # split at headers + data = fp.read().decode().split('>') + fp.close() + # ignore whatever appears before the 1st header + data.pop(0) + headers = [] + sequences = [] + for sequence in data: + lines = sequence.split('\n') + headers.append(lines.pop(0)) + sequences.append(''.join(lines)) + return (headers, sequences) \ No newline at end of file diff --git a/docker/lr-jellyfish/Dockerfile b/docker/lr-jellyfish/Dockerfile new file mode 100644 index 000000000..6ffb5afec --- /dev/null +++ b/docker/lr-jellyfish/Dockerfile @@ -0,0 +1,51 @@ +FROM continuumio/miniconda3 + +RUN apt-get -qqy update --fix-missing && \ + apt-get -qqy dist-upgrade && \ + apt-get -qqy install --no-install-recommends \ + ca-certificates \ + libbz2-dev \ + libcurl4-openssl-dev \ + liblzma-dev \ + libncurses5-dev \ + autoconf \ + automake \ + bzip2 \ + curl \ + gcc \ + g++ \ + git \ + gettext \ + make \ + wget \ + zlib1g-dev \ + pkg-config \ + python3-pip + +# RUN curl -O https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && \ +# sh ./Miniconda3-latest-Linux-x86_64.sh -bufp /python3.10 && \ +# export PATH=/python3.10/bin:$PATH && \ +# python --version + +ENV PYTHONPATH=/opt/conda/lib/python3.11 + +RUN wget https://github.com/gmarcais/Jellyfish/releases/download/v2.3.0/jellyfish-2.3.0.tar.gz && \ + tar -xvf jellyfish-2.3.0.tar.gz && \ + cd jellyfish-2.3.0 && \ + ./configure --prefix=$HOME --enable-python-binding && \ + make -j 4 && \ + make install + + +ENV PKG_CONFIG_PATH=/jellyfish-2.3.0/ + + +RUN cd /jellyfish-2.3.0/swig/python && \ + python setup.py build && \ + python setup.py install + +RUN pip install numpy && \ + pip install matplotlib && \ + pip install argparse + +COPY AGG.py /usr/local/bin/ diff --git a/docker/lr-jellyfish/graph_construction_jellyfish.py b/docker/lr-jellyfish/graph_construction_jellyfish.py new file mode 100644 index 000000000..01176864d --- /dev/null +++ b/docker/lr-jellyfish/graph_construction_jellyfish.py @@ -0,0 +1,309 @@ +import sys +sys.path.append("/usr/local/bin") +import gzip +import argparse +from collections import defaultdict +import json +import numpy +import AGG + +def read_reference(filename): + + if filename.endswith(".gz"): + with gzip.open(filename, "rb") as fp: + data =fp.readlines() + else: + with open(filename, 'rb') as fp: + data = fp.readlines() + + kmers = [km.decode()[:-1] for km in data[1:]] + contig = "".join(kmers) + contig = "+" + contig # 1-based coordinates + return contig + +def map_to_genome(contig, anchorlist, k): + PositionDict = defaultdict(list) + for anchor in anchorlist: + PositionDict[anchor] + for i in range(1, len(contig) - k + 1): + kmer = contig[i:i+k] + if kmer in PositionDict: + PositionDict[kmer] = PositionDict.get(kmer, []) + [i] + return PositionDict + +def create_anchors(PositionDict, k): + anchor_updated_list = [] + + for kmer in PositionDict: + if len(PositionDict[kmer]) == 1: + anchor_updated_list.append(kmer) + AnchorInfo = {} + for kmer in anchor_updated_list: + pos = PositionDict[kmer][0] + anchorname = "A%06d" % (pos//k + 1) + AnchorInfo[anchorname] = {} + AnchorInfo[anchorname]['seq'] = kmer + AnchorInfo[anchorname]['pos'] = pos + + anchornames = sorted(AnchorInfo.keys()) + anchor_unadjacent_list = [] + index = 0 + sanchor = anchornames[index] + while sanchor < anchornames[-1]: + for danchor in anchornames[index+1:]: + if AnchorInfo[danchor]['pos'] - AnchorInfo[sanchor]['pos'] > k+1: + break + index += 1 + 
anchor_unadjacent_list += [sanchor, danchor] + sanchor = danchor + anchor_unadjacent_list = sorted(set(anchor_unadjacent_list))[:-1] + return AnchorInfo, anchor_unadjacent_list + +def loadFasta(filename): + """ Parses a classically formatted and possibly + compressed FASTA file into a list of headers + and fragment sequences for each sequence contained. + The resulting sequences are 0-indexed! """ + if (filename.endswith(".gz")): + fp = gzip.open(filename, 'rb') + else: + fp = open(filename, 'rb') + # split at headers + data = fp.read().decode().split('>') + fp.close() + # ignore whatever appears before the 1st header + data.pop(0) + headers = [] + sequences = [] + for sequence in data: + lines = sequence.split('\n') + headers.append(lines.pop(0)) + sequences.append(''.join(lines)) + return (headers, sequences) + +def mapping_info(AnchorInfo, contig, k): + anchor_list = list(AnchorInfo.keys()) + Anchorseq = {AnchorInfo[anchor]['seq']:anchor for anchor in anchor_list} + + seqlist = Anchorseq.keys() + PositionDict = defaultdict(list) + for anchor_seq in seqlist: + anchor_rev = ''.join([{'A':'T','C':'G','G':'C','T':'A'}[base] for base in reversed(anchor_seq)]) + PositionDict[anchor_seq] + PositionDict[anchor_rev] + + for i in range(1, len(contig) - k + 1): + kmer = contig[i:i+k] + if kmer in PositionDict: + PositionDict[kmer] = PositionDict.get(kmer, []) + [i] + + A = {} + SVs = {} + for anchor, D in AnchorInfo.items(): + anchor_seq = D['seq'] + anchor_rev = ''.join([{'A':'T','C':'G','G':'C','T':'A'}[base] for base in reversed(anchor_seq)]) + poslist = PositionDict[anchor_seq] + PositionDict[anchor_rev] + if len(poslist) == 1: + A[anchor] = poslist[0] + else: + SVs[anchor] = poslist + + return A, SVs + + +def find_edge_info(src_pos, dst_pos, k, contig, contigname, sample, Anchorseq): + E = {} # edgeinfo + # get source infomation + if src_pos == 0: + src = "SOURCE" + src_seq = "" + pr = False + else: + src_seq = contig[src_pos:src_pos + k] + try: + src = Anchorseq[src_seq] + pr = False + except: + src_seq = ''.join([{'A':'T','C':'G','G':'C','T':'A'}[base] for base in reversed(src_seq)]) + src = Anchorseq[src_seq] + pr = True + + dst_seq = contig[dst_pos:dst_pos+k] + + if dst_pos == len(contig): # fix sink edge issue 08/29/23 + dst = "SINK" + dst_seq = "" + sr = True + else: + try: + dst = Anchorseq[dst_seq] + sr = False + except: + dst_seq = ''.join([{'A':'T','C':'G','G':'C','T':'A'}[base] for base in reversed(dst_seq)]) + dst = Anchorseq[dst_seq] + sr = True + + if src_pos == 0: + edge_seq = contig[src_pos:dst_pos] # first edge fix bug 08/28/2023 + else: + edge_seq = contig[src_pos+k:dst_pos] # fix bug 08/23/2023 + + if pr and sr: + edge_seq = ''.join([{'A':'T','C':'G','G':'C','T':'A'}[base] for base in reversed(edge_seq)]) + node = src + src = dst + dst = node + + + E = {} + E['seq'] = edge_seq + E['src'] = src + E['dst'] = dst + E['reads'] = [contigname] + E['strain'] = [sample] + + + return E + +# loop through all reads +def create_edgefile(headers, sequences, AnchorInfo, k): + Edge_info = {} + Outgoing = {} + + edgenum_perread = [] + anchor_list = list(AnchorInfo.keys()) + Anchorseq = {AnchorInfo[anchor]['seq']:anchor for anchor in anchor_list} + + for contig_index in range(len(headers)): + contig = sequences[contig_index] + contig_name = headers[contig_index] + sample_name = contig_name.split('|')[-1] + A, SVs = mapping_info(AnchorInfo, contig, k) + splitposlist = sorted(A.values()) + edgeindex = 0 # + src_pos = 0 + for dst_pos in splitposlist: + E = find_edge_info(src_pos, dst_pos, k, 
contig, contig_name, sample_name, Anchorseq) + src = E['src'] + edgelist = Outgoing.get(src, []) + for edge in edgelist: + if Edge_info[edge]['dst'] != E['dst']: + continue + if Edge_info[edge]['seq'] == E['seq']: + Edge_info[edge]['reads'] += E['reads'] + Edge_info[edge]['strain'] = sorted(set(Edge_info[edge]['strain']) | set(E['strain'])) + break + else: + edgename = "E%05d.%04d" % (contig_index, edgeindex) + Edge_info[edgename] = E + Outgoing[src] = Outgoing.get(src,[]) + [edgename] + edgeindex += 1 + # update + src_pos = dst_pos + + dst_pos = len(contig) # fix bug 08/29/2023 + E = find_edge_info(src_pos, dst_pos, k, contig, contig_name, sample_name, Anchorseq) + src = E['src'] + edgelist = Outgoing.get(src, []) + for edge in edgelist: + if Edge_info[edge]['dst'] != E['dst']: + continue + if Edge_info[edge]['seq'] == E['seq']: + Edge_info[edge]['reads'] += E['reads'] + Edge_info[edge]['strain'] = sorted(set(Edge_info[edge]['strain']) | set(E['strain'])) + break + else: + edgename = "E%05d.%04d" % (contig_index, edgeindex) + Edge_info[edgename] = E + Outgoing[src] = Outgoing.get(src,[]) + [edgename] + edgeindex += 1 + + edgenum_perread.append(edgeindex) + + return Edge_info, Outgoing + #print(contig_name, edgeindex) + +def write_gfa(AnchorInfo, Edge_info, outputfilename): + header = ['H\tVN:Z:1.0\n'] + # add json annotation to edge and anchor sequences + AnchorS = [] + for anchor,D in AnchorInfo.items(): + seq = D.pop('seq') + json_string = json.dumps(D) + AnchorS += ['S\t%s\t%s\t%s\n' % (anchor, seq, "PG:J:" + json_string)] + + EdgeS = [] + Link = [] + for edge,edge_dict in Edge_info.items(): + seq = edge_dict.pop('seq') + src = edge_dict.pop('src') + dst = edge_dict.pop('dst') + + json_string = json.dumps(edge_dict) + EdgeS += ['S\t%s\t%s\t%s\t%s\n' % (edge, seq, "PG:J:" + json_string, "RC:i:" + str(len(edge_dict['reads'])))] + Link.append('L\t%s\t%s\t%s\t%s\t%s\n'% (src, "+", edge, "+", "0M")) + Link.append('L\t%s\t%s\t%s\t%s\t%s\n'% (edge, "+", dst, "+", "0M")) + + with open(outputfilename, 'w') as fp: + for h in header: + fp.write(h) + for s in AnchorS: + fp.write(s) + for s in EdgeS: + fp.write(s) + for l in Link: + fp.write(l) + +def main(): + # create parser object + parser = argparse.ArgumentParser(description = "Construct Anchor-based Graphical Genome") + + parser.add_argument("-i", "--Anchorlist", type = str, nargs = 1, + metavar = "input anchor list", default = None, + help = "a path for a list of anchor sequences") + parser.add_argument("-f", "--ReferenceFa", type = str, nargs = 1, + metavar = "Reference fasta file", help = "path for linear reference fasta file") + parser.add_argument("-n", "--referencename", type = str, nargs = 1, + metavar = "a string for reference accession", help = "a reference id") + parser.add_argument("-r", "--readFa", type = str, nargs = 1, + metavar = "a fasta file for all input reads with readname_sample information in header", help = "a fasta file for all input reads with readname_sample information in header") + + parser.add_argument("-t", "--ReadcountThreshold", type = int, nargs = 1, + metavar = "an integer, edges with less than t reads supported are purged, unless its on reference assemblies") + parser.add_argument("-o", "--Output", type = str, nargs = 1, + metavar = "output filename", help = "output gfa filename") + + args = parser.parse_args() + anchor_file = args.Anchorlist[0] + ref_fa = args.ReferenceFa[0] + ref_name = args.referencename[0] + read_fa = args.readFa[0] + outputgfa = args.Output[0] + threshold = args.ReadcountThreshold[0] + + 
+    with open(anchor_file, 'r') as fp:
+        data = fp.readlines()
+    anchorlist = [item[:-1] for item in data]
+    k = len(anchorlist[0])  # anchor k-mer length, taken from the anchor sequences themselves (no -k flag is parsed above)
+    ref_contig = read_reference(ref_fa)
+    PositionDict = map_to_genome(ref_contig, anchorlist, k)
+    AnchorInfo, anchor_unadjacent_list = create_anchors(PositionDict, k)
+    Final_anchor = {anchor:AnchorInfo[anchor] for anchor in anchor_unadjacent_list}
+
+    pos1 = [AnchorInfo[anchor]['pos'] for anchor in anchor_unadjacent_list[1:]]
+    pos = [AnchorInfo[anchor]['pos'] for anchor in anchor_unadjacent_list[:-1]]
+    assert len(numpy.where((numpy.array(pos1) - numpy.array(pos))<=k)[0]) == 0
+
+    headers, sequences = loadFasta(read_fa)
+    headers.append(ref_name)
+    sequences.append(ref_contig[1:])
+
+    Edge_info, Outgoing = create_edgefile(headers, sequences, Final_anchor, k)
+    write_gfa(Final_anchor, Edge_info, outputgfa)
+
+
+
+if __name__ == "__main__":
+    # calling the main function
+    main()
+
diff --git a/wdl/pipelines/PacBio/Assembly/ClassifyHumanMitochondriaLongReads.wdl b/wdl/pipelines/PacBio/Assembly/ClassifyHumanMitochondriaLongReads.wdl
new file mode 100644
index 000000000..dfcd7ae1a
--- /dev/null
+++ b/wdl/pipelines/PacBio/Assembly/ClassifyHumanMitochondriaLongReads.wdl
@@ -0,0 +1,267 @@
+version 1.0
+
+workflow ClassifyHumanMitochondriaLongReads {
+    meta {
+        description:
+            "A preprocessing workflow for classifying chrM-mapping reads in a human sample's long-read BAM."
+    }
+    parameter_meta {
+        classifier_docker_tag:
+            "tag of docker 'us.gcr.io/broad-dsp-lrma/mito_ccs_read_preprocessor' to use"
+
+        for_assembly_bam :
+            "mitochondria reads ready for assembly, in BAM format."
+        for_assembly_fasta :
+            "mitochondria reads ready for assembly, in FASTA(.gz) format."
+        reads_multimap :
+            "BAM holding alignment records of sequences that have multiple mapping records."
+        reads_too_short :
+            "BAM holding alignment records of sequences that are too short (currently hardcoded length of 7kbp)."
+        reads_selfoverlap_on_ref :
+            "BAM holding alignment records of sequences whose alignments overlap 'significantly' on the rCRS reference."
+        reads_break_cocircularity :
+            "BAM holding alignment records of sequences whose alignments break co-circularity."
+        read_length_histogram :
+            "A flatfile holding the length distribution of the input BAM."
+        output_notebook :
+            "A Jupyter notebook displaying information for debugging how reads are classified."
+        read_count_by_class:
+            "Count of reads in each category."
+    }
+    input {
+        File bam
+        File bai
+
+        String classifier_docker_tag # 0.0.1 is the latest as of 2024-01-11
+    }
+    output {
+        File for_assembly_bam = select_first([ClassifyMitoReads.for_assembly_bam])
+        File for_assembly_bai = select_first([ClassifyMitoReads.for_assembly_bai])
+        File for_assembly_fasta = select_first([ClassifyMitoReads.for_assembly_fasta])
+        Map[String, Int]? read_count_by_class = CountReads.count_by_case
+        File? reads_multimap = ClassifyMitoReads.reads_multimap
+        File? reads_too_short = ClassifyMitoReads.reads_too_short
+        File? reads_selfoverlap_on_ref = ClassifyMitoReads.reads_selfoverlap_on_ref
+        File? reads_break_cocircularity = ClassifyMitoReads.reads_break_cocircularity
+        File? read_length_histogram = ClassifyMitoReads.read_length_histogram
+        File output_notebook = select_first([ClassifyMitoReads.output_notebook])
+    }
+
+    call ExtractMitochondriaReads { input: bam = bam, bai = bai }
+
+    if (ExtractMitochondriaReads.is_samtools_failed) {
+        call StopWorkflow { input:
+            reason = "Streaming from bucket to subset to chrM failed. This is likely transient. Please rerun without call-caching."
+ } + } + + if (!ExtractMitochondriaReads.is_samtools_failed) { + call ClassifyMitoReads { input: + bam = ExtractMitochondriaReads.subset_bam, + bai = ExtractMitochondriaReads.subset_bai, + classifier_docker_tag = classifier_docker_tag + } + + call CountReads { input: + chrM_bam = ExtractMitochondriaReads.subset_bam, + classified_bams = select_all([ClassifyMitoReads.for_assembly_bam, + ClassifyMitoReads.reads_multimap, ClassifyMitoReads.reads_too_short, + ClassifyMitoReads.reads_selfoverlap_on_ref, ClassifyMitoReads.reads_break_cocircularity]) + } + + + } +} +task ClassifyMitoReads { + meta { + description: + "Classify chrM-mapping reads in a human sample's long reads BAM." + } + parameter_meta { + bam: + "chrM mapping long reads of a human sample" + + classifier_docker_tag: + "tag of docker 'us.gcr.io/broad-dsp-lrma/mito_ccs_read_preprocessor' to use" + } + input { + File bam + File bai + + String classifier_docker_tag + } + output { + File for_assembly_bam = "local_out_dir/~{prefix}.classified_reads.for_assembly.bam" + File for_assembly_bai = "local_out_dir/~{prefix}.classified_reads.for_assembly.bam.bai" + File for_assembly_fasta = "local_out_dir/~{prefix}.classified_reads.for_assembly.fastq.gz" + + File? reads_multimap = "local_out_dir/~{prefix}.classified_reads.multimap.bam" + File? reads_too_short = "local_out_dir/~{prefix}.classified_reads.too_short.bam" + File? reads_selfoverlap_on_ref = "local_out_dir/~{prefix}.classified_reads.ovp_pair_alns_lr.bam" + File? reads_break_cocircularity = "local_out_dir/~{prefix}.classified_reads.break_cocircularity.bam" + + File read_length_histogram = "local_out_dir/~{prefix}.lengths.histogram.flatfile" + File output_notebook = "local_out_dir/~{prefix}.classified_reads.mito_reads_analyzer.papermill_out.ipynb" + } + String prefix = basename(bam, ".bam") + + command <<< + set -eux + + bash classify_reads.sh \ + ~{bam} \ + local_out_dir + + tree local_out_dir + >>> + + Int disk_size = 20 + runtime { + cpu: 2 + memory: "8 GiB" + disks: "local-disk ~{disk_size} SSD" + preemptible: 2 + maxRetries: 1 + docker: "us.gcr.io/broad-dsp-lrma/mito_ccs_read_preprocessor:~{classifier_docker_tag}" + } +} + +task ExtractMitochondriaReads { + meta { + description: + "For subsetting a high-coverage BAM stored in GCS, without localizing (more resilient to auth. expiration)." + } + + parameter_meta { + bam: { + localization_optional: true + } + + is_samtools_failed: "if true, the streaming of BAM from the bucket didn't succeed, so consider the result BAM corrupted." + } + + input { + File bam + File bai + } + + output { + Boolean is_samtools_failed = read_boolean("samtools.failed.txt") + File subset_bam = "~{subset_prefix}.bam" + File subset_bai = "~{subset_prefix}.bam.bai" + } + + Int disk_size = 20 # 20GiB reads mapping onto chrM is problematic + + String subset_prefix = basename(bam, ".bam") + ".chrM" + + command <<< + + # the way this works is the following: + # 0) relying on the re-auth.sh script to export the credentials + # 1) perform the remote sam-view subsetting in the background + # 2) listen to the PID of the background process, while re-auth every 1200 seconds + source /opt/re-auth.sh + set -euxo pipefail + + echo "false" > "samtools.failed.txt" + + # see man page for what '-M' means + samtools view \ + -bhX \ + -M \ + -@ 1 \ + --verbosity=8 \ + --write-index \ + -o "~{subset_prefix}.bam##idx##~{subset_prefix}.bam.bai" \ + ~{bam} ~{bai} \ + 'chrM' && exit 0 || { echo "samtools seem to have failed"; echo "true" > "samtools.failed.txt"; exit 77; } & + pid=$! 
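+        # poll the backgrounded samtools job below, re-authenticating every 20 minutes;
+        # after ~2 hours of polling, mark the run as failed and exit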
+ + set +e + count=0 + while true; do + sleep 1200 && date && source /opt/re-auth.sh + count=$(( count+1 )) + if [[ ${count} -gt 6 ]]; then echo "true" > "samtools.failed.txt" && exit 0; fi # way too many attempts, get out + if ! pgrep -x -P $pid; then exit 0; fi + done + >>> + + + runtime { + cpu: 2 + memory: "8 GiB" + disks: "local-disk ~{disk_size} SSD" + preemptible: 2 + maxRetries: 1 + docker: "us.gcr.io/broad-dsp-lrma/lr-gcloud-samtools:0.1.3" + } +} + +task StopWorkflow { + + meta { + description: "Utility to stop a workflow" + } + + parameter_meta { + reason: "reason for stopping" + } + + input { + String reason + } + command <<< + echo -e "Workflow explicitly stopped because \n ~{reason}." && exit 1 + >>> + runtime {docker: "gcr.io/cloud-marketplace/google/ubuntu2004:latest"} +} + +task CountReads { + meta { + desciption: "Count the number of reads" + } + + input { + File chrM_bam + Array[File] classified_bams + } + + output { + Map[String, Int] count_by_case = read_map("count_by_case.tsv") + } + + command <<< + set -eux + + # not using 'samtools view -c' because we want count of unique molecules, not alignment records + cnt=$(samtools view ~{chrM_bam} | awk -F '\t' '{print $1}' | sort | uniq | wc -l | awk '{print $1}') + reads_class='chrM' + echo -e "${reads_class}\t${cnt}" >> count_by_case.tsv + rm ~{chrM_bam} + + for bam in ~{sep=' ' classified_bams}; + do + reads_class=$(echo "${bam}" | awk -F '.' '{print $(NF-1)}') + cnt=$(samtools view "${bam}" | awk -F '\t' '{print $1}' | sort | uniq | wc -l | awk '{print $1}') + echo -e "${reads_class}\t${cnt}" >> count_by_case.tsv + done + cat count_by_case.tsv + + all_reads_classes=("for_assembly" "multimap" "too_short" "ovp_pair_alns_lr" "break_cocircularity") + for reads_class in "${all_reads_classes[@]}"; + do + if ! 
grep -qF "${reads_class}" count_by_case.tsv; then echo -e "${reads_class}\t0" >> count_by_case.tsv; fi
+        done
+
+        sed -i.bak 's/ovp_pair_alns_lr/signif_overlap/' count_by_case.tsv
+    >>>
+
+    runtime {
+        cpu: 1
+        memory: "4 GiB"
+        disks: "local-disk 10 HDD"
+        docker: "us.gcr.io/broad-dsp-lrma/lr-gcloud-samtools:0.1.3"
+    }
+}
\ No newline at end of file
diff --git a/wdl/pipelines/PacBio/Assembly/Shapeit5.wdl b/wdl/pipelines/PacBio/Assembly/Shapeit5.wdl
new file mode 100644
index 000000000..01e8890a5
--- /dev/null
+++ b/wdl/pipelines/PacBio/Assembly/Shapeit5.wdl
@@ -0,0 +1,52 @@
+version 1.0
+
+workflow Shapeit5{
+    meta{
+        description: "a workflow that uses Shapeit5 to do statistical phasing"
+    }
+    input{
+        File wholegenomevcf
+        File wholegenometbi
+        File geneticmapping
+        String genomeregion
+        Int nthreads
+    }
+    call phasing{input: vcf_input=wholegenomevcf, vcf_index=wholegenometbi, mappingfile= geneticmapping, region=genomeregion, num_threads=nthreads}
+    output{
+        File scaffold = phasing.scaffold_vcf
+    }
+}
+
+task phasing{
+    input{
+        File vcf_input
+        File vcf_index
+        File mappingfile
+        String region
+        Int num_threads
+        Float minimal_maf = 0.001
+    }
+    command <<<
+        #bcftools +fill-tags bcftools view -Oz -o myVCF.filtag.vcf.gz
+        # add AN AC tag
+        bcftools +fill-tags ~{vcf_input} -Ob -o tmp.out.bcf -- -t AN,AC
+        bcftools index tmp.out.bcf
+        phase_common_static --input tmp.out.bcf --filter-maf ~{minimal_maf} --region ~{region} --map ~{mappingfile} --output scaffold.bcf --thread ~{num_threads}
+    >>>
+
+    output{
+        File scaffold_vcf = "scaffold.bcf"
+    }
+
+    Int disk_size = 100 + ceil(2 * size(vcf_input, "GiB"))
+
+    runtime {
+        cpu: 1
+        memory: "50 GiB"
+        disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD"
+        bootDiskSizeGb: 10
+        preemptible: 2
+        maxRetries: 1
+        docker: "lindonkambule/shapeit5_2023-05-05_d6ce1e2:v5.1.1"
+    }
+}
\ No newline at end of file
diff --git a/wdl/pipelines/PacBio/Assembly/StatisticalPhasing.wdl b/wdl/pipelines/PacBio/Assembly/StatisticalPhasing.wdl
new file mode 100644
index 000000000..24e352645
--- /dev/null
+++ b/wdl/pipelines/PacBio/Assembly/StatisticalPhasing.wdl
@@ -0,0 +1,133 @@
+version 1.0
+
+workflow StatisticalPhasing{
+    meta{
+        description: "a workflow that uses Whatshap for read-based phasing and Shapeit for statistical phasing"
+    }
+    input{
+        Array[File] baminputs
+        File reference
+        File joint_g_vcf
+        File joint_g_tbi
+        String outputpref
+        File geneticmapping
+        String genomeregion
+        Int nthreads
+    }
+    call whatshapphasing{input: inputbams=baminputs, ref=reference, outputprefix=outputpref, joint_vcf=joint_g_vcf, joint_vcf_tbi=joint_g_tbi}
+    call shapeit5{input: vcf_input=joint_g_vcf, vcf_index=joint_g_tbi, mappingfile= geneticmapping, region=genomeregion, num_threads=nthreads}
+    call shapeit4{input: vcf_input=whatshapphasing.phased_vcf, vcf_index=whatshapphasing.phase_vcf_tbi, mappingfile= geneticmapping,scaffold=shapeit5.scaffold_vcf,scaffold_index=shapeit5.scaffold_vcf_index, region=genomeregion,num_threads=nthreads}
+    output{
+        File scaffold = shapeit4.scaffold_vcf
+    }
+}
+
+task whatshapphasing {
+    input{
+        Array[File] inputbams
+        File ref
+        File joint_vcf
+        File joint_vcf_tbi
+        String outputprefix
+    }
+
+    command <<<
+
+        set -x -o pipefail
+        samtools faidx ~{ref}
+
+        samtools index -M ~{sep=' ' inputbams}
+        whatshap phase -o ~{outputprefix}.phased.vcf --tag=PS --reference=~{ref} ~{joint_vcf} ~{sep=" " inputbams}
+        bcftools +fill-tags ~{outputprefix}.phased.vcf -Ob -o tmp.phased.bcf -- -t AN,AC
+        bcftools index tmp.phased.bcf
+
+    >>>
+
+    output {
+        File phased_vcf =
"tmp.phased.bcf" + File phase_vcf_tbi = "tmp.phased.bcf.csi" + } + + + Int disk_size = 100 + + runtime { + cpu: 16 + memory: "64 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "hangsuunc/whatshap:v1" + } +} + + +task shapeit5{ + input{ + File vcf_input + File vcf_index + File mappingfile + String region + Int num_threads + Float minimal_maf = 0.001 + } + command <<< + # add AN AC tag + bcftools +fill-tags ~{vcf_input} -Ob -o tmp.out.bcf -- -t AN,AC + bcftools index tmp.out.bcf + phase_common_static --input tmp.out.bcf --filter-maf ~{minimal_maf} --region ~{region} --map ~{mappingfile} --output scaffold.bcf --thread ~{num_threads} + bcftools +fill-tags scaffold.bcf -Ob -o tmp.scaffold.bcf -- -t AN,AC + bcftools index tmp.scaffold.bcf + >>> + + output{ + File scaffold_vcf = "tmp.scaffold.bcf" + File scaffold_vcf_index = "tmp.scaffold.bcf.csi" + } + + Int disk_size = 100 + ceil(2 * size(vcf_input, "GiB")) + + runtime { + cpu: 1 + memory: "50 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "lindonkambule/shapeit5_2023-05-05_d6ce1e2:v5.1.1" + } +} + +task shapeit4{ + input{ + File vcf_input + File vcf_index + File mappingfile + File scaffold + File scaffold_index + String region + Int num_threads + } + command <<< + # add AN AC tag + shapeit4 --input ~{vcf_input} --map ~{mappingfile} --region ~{region} --scaffold ~{scaffold} --use-PS 0.0001 --output scaffold.bcf --thread ~{num_threads} --log phased.log + + >>> + + output{ + File scaffold_vcf = "scaffold.bcf" + } + + Int disk_size = 100 + ceil(2 * size(vcf_input, "GiB")) + + runtime { + cpu: 1 + memory: "50 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "lifebitai/shapeit4:latest" + } +} \ No newline at end of file diff --git a/wdl/pipelines/PacBio/Assembly/WhatshapPhasing.wdl b/wdl/pipelines/PacBio/Assembly/WhatshapPhasing.wdl new file mode 100644 index 000000000..11e3cccc2 --- /dev/null +++ b/wdl/pipelines/PacBio/Assembly/WhatshapPhasing.wdl @@ -0,0 +1,67 @@ +version 1.0 + +workflow WhatshapPhasing{ + input{ + Array[File] baminputs + File reference + File joint_g_vcf + File joint_g_tbi + String outputpref + } + + + call phasing{input: inputbams=baminputs, ref=reference, outputprefix=outputpref, joint_vcf=joint_g_vcf, joint_vcf_tbi=joint_g_tbi} + + meta{ + Purpose:" read-based phasing by whatshap" + } + + output{ + File readphased_vcf = phasing.phased_vcf + File readphased_vcf_tbi = phasing.phase_vcf_tbi + } +} + + +task phasing { + input{ + Array[File] inputbams + File ref + File joint_vcf + File joint_vcf_tbi + String outputprefix + } + + command <<< + + set -x pipefail + samtools faidx ~{ref} + + samtools index -M ~{sep=' ' inputbams} + + whatshap phase -o ~{outputprefix}.phased.vcf --tag=PS --reference=~{ref} ~{joint_vcf} ~{sep=" " inputbams} + + bgzip -c ~{outputprefix}.phased.vcf > ~{outputprefix}.phased.vcf.gz + + tabix -p vcf ~{outputprefix}.phased.vcf.gz + + >>> + + output { + File phased_vcf = "~{outputprefix}.phased.vcf.gz" + File phase_vcf_tbi = "~{outputprefix}.phased.vcf.gz.tbi" + } + + + Int disk_size = 100 + + runtime { + cpu: 16 + memory: "64 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "hangsuunc/whatshap:v1" + } +} diff --git a/wdl/pipelines/PacBio/Utility/Bam2Fastq.wdl 
b/wdl/pipelines/PacBio/Utility/Bam2Fastq.wdl new file mode 100644 index 000000000..035ad3c20 --- /dev/null +++ b/wdl/pipelines/PacBio/Utility/Bam2Fastq.wdl @@ -0,0 +1,48 @@ +version 1.0 + +workflow Bam2Fastq{ + meta{ + description: "a workflow that extract vcfs and bams in a genomic interval" + } + input{ + File bam + String prefix + } + call bam2fastq{input: bam_input=bam, pref=prefix} + output{ + File fq1 = bam2fastq.fq1 + File fq2 = bam2fastq.fq2 + } +} + + + +task bam2fastq{ + input{ + File bam_input + String pref + } + command <<< + samtools index ~{bam_input} + bedtools bamtofastq -i ~{bam_input} \ + -fq ~{pref}.aln.R1.fq \ + -fq2 ~{pref}.aln.R2.fq + >>> + + output{ + File fq1="~{pref}.aln.R1.fq" + File fq2 = "~{pref}.aln.R2.fq" + } + + Int disk_size = 10 + ceil(2 * size(bam_input, "GiB")) + + runtime { + cpu: 1 + memory: "6 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "us.gcr.io/broad-dsp-lrma/lr-basic:0.1.1" + } +} diff --git a/wdl/pipelines/PacBio/Utility/MergeFa.wdl b/wdl/pipelines/PacBio/Utility/MergeFa.wdl new file mode 100644 index 000000000..73b1d02bd --- /dev/null +++ b/wdl/pipelines/PacBio/Utility/MergeFa.wdl @@ -0,0 +1,43 @@ +version 1.0 + +workflow Bam2Fastq{ + meta{ + description: "a workflow that extract vcfs and bams in a genomic interval" + } + input{ + Array[File] fastas + String prefix + } + call catfasta{input: fas=fastas, pref=prefix} + output{ + File fq1 = catfasta.fasta + } +} + + + +task catfasta{ + input{ + Array[File] fas + String pref + } + command <<< + cat ~{sep=" " fas} > ~{pref}.aln.fa + >>> + + output{ + File fasta="~{pref}.aln.fa" + } + + Int disk_size = 100 + + runtime { + cpu: 1 + memory: "6 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "us.gcr.io/broad-dsp-lrma/lr-basic:0.1.1" + } +} diff --git a/wdl/pipelines/PacBio/Utility/MergePhasedVCF.wdl b/wdl/pipelines/PacBio/Utility/MergePhasedVCF.wdl new file mode 100644 index 000000000..5e3cd0b3a --- /dev/null +++ b/wdl/pipelines/PacBio/Utility/MergePhasedVCF.wdl @@ -0,0 +1,113 @@ +version 1.0 +import "../../../tasks/Utility/VariantUtils.wdl" as VU + + +workflow MergePhasedVCF{ + meta{ + description: "a workflow that get a vcf file from a list of perchromosome level vcf" + } + input{ + String SampleFolder + String Samplename + File reference_fasta_fai + } + call FindFiles{input: sampleFolder = SampleFolder} + scatter (bcf_file in FindFiles.vcfFiles){ + call preprocess{input: bcf = bcf_file, prefix = basename(bcf_file)} + } + + call VU.CollectDefinitions as CD { + input: + vcfs = preprocess.vcf + } + call VU.MergeAndSortVCFs as Merge { + input: + vcfs = preprocess.vcf, + ref_fasta_fai = reference_fasta_fai, + prefix = Samplename, + header_definitions_file = CD.union_definitions + } + + output{ + File wholegenomevcf = Merge.vcf + File wholegenometbi = Merge.tbi + } +} + + +task FindFiles { + input { + String sampleFolder + } + +command { + # Use GSUtil to list all files in the given directory + gsutil ls "${sampleFolder}" > vcf_files.txt + # Filter the lines with ".bam" extension and store the result in "bam_files.txt" + grep -E "\.bcf$" vcf_files.txt > output.txt + + cat output.txt + } + + output { + # Output the list of .bam files + Array[String] vcfFiles = read_lines("output.txt") + + } + + runtime { + docker: "broadinstitute/gatk:4.4.0.0" + disks: "local-disk 100 HDD" + } +} + +task preprocess { + input { + File bcf + String 
prefix + } + +command { + bcftools view -Oz -o ~{prefix}.tmp.vcf.gz ~{bcf} + # bcftools sort -o whole_genome_sorted.vcf.gz tmp.vcf.gz + # bcftools index --tbi --force tmp.vcf.gz + + + + } + + output { + # Output the list of .bam files + File vcf = "~{prefix}.tmp.vcf.gz" + # File whole_genome_vcf_tbi = "tmp.vcf.gz.tbi" + + } + + runtime { + docker: "us.gcr.io/broad-dsp-lrma/lr-basic:0.1.1" + disks: "local-disk 100 HDD" + } +} + +task MergeVcf { + input { + Array[File] vcf_files + } + +command { + bcftools concat -o whole_genome.vcf.gz ~{sep=" " vcf_files} + # bcftools sort -o whole_genome_sorted.vcf.gz whole_genome.vcf.gz + + } + + output { + # Output the list of .bam files + File whole_genome_vcf = "whole_genome.vcf.gz" + + } + + runtime { + docker: "us.gcr.io/broad-dsp-lrma/lr-basic:0.1.1" + disks: "local-disk 100 HDD" + } +} diff --git a/wdl/pipelines/TechAgnostic/Utility/BandagePlotGraph.wdl b/wdl/pipelines/TechAgnostic/Utility/BandagePlotGraph.wdl new file mode 100644 index 000000000..6eaedc03b --- /dev/null +++ b/wdl/pipelines/TechAgnostic/Utility/BandagePlotGraph.wdl @@ -0,0 +1,41 @@ +version 1.0 + +workflow BandagePlotGraph{ + meta{ + description: "a workflow that using Bandage to plot gfa graph" + } + input{ + File graph_gfa + String prefix + } + call bandage{input: gfa=graph_gfa, outputprefix=prefix} + output{ + File figure = bandage.image + } +} + +task bandage{ + input{ + File gfa + String outputprefix + } + command <<< + Bandage image ~{gfa} ~{outputprefix}.png + >>> + + output{ + File image = "~{outputprefix}.png" + } + + Int disk_size = 10 + ceil(size(gfa, "GiB")) + + runtime { + cpu: 4 + memory: "16 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 0 + maxRetries: 1 + docker: "hangsuunc/bandage:v1" + } +} \ No newline at end of file diff --git a/wdl/pipelines/TechAgnostic/Utility/ConvertToHailMTT2T.wdl b/wdl/pipelines/TechAgnostic/Utility/ConvertToHailMTT2T.wdl new file mode 100644 index 000000000..e01801851 --- /dev/null +++ b/wdl/pipelines/TechAgnostic/Utility/ConvertToHailMTT2T.wdl @@ -0,0 +1,37 @@ +version 1.0 + +import "../../../tasks/Utility/Hail.wdl" as Hail +import "../../../tasks/Utility/Finalize.wdl" as FF + +workflow ConvertToHailMTT2T { + + meta { + description: "Convert a VCF to a Hail MatrixTable" + } + parameter_meta { + } + + input { + File whole_genome_vcf + File whole_genome_vcf_tbi + String prefix + String gcs_out_root_dir + } + + String outdir = sub(gcs_out_root_dir, "/$", "") + "/Hail/~{prefix}" + + call Hail.ConvertToHailMT as RunConvertToHailMT { + input: + gvcf = whole_genome_vcf, + tbi = whole_genome_vcf_tbi, + prefix = prefix, + outdir = outdir + + } + + + output { + String joint_mt = RunConvertToHailMT.gcs_path + } +} + diff --git a/wdl/pipelines/TechAgnostic/Utility/CountReadNum.wdl b/wdl/pipelines/TechAgnostic/Utility/CountReadNum.wdl new file mode 100644 index 000000000..698ac6553 --- /dev/null +++ b/wdl/pipelines/TechAgnostic/Utility/CountReadNum.wdl @@ -0,0 +1,46 @@ +version 1.0 + +workflow ExtractVcfandBam{ + meta{ + description: "a workflow that extract vcfs and bams in a genomic interval" + } + input{ + File bam + File bai + String genomeregion + String prefix + } + call count_read_num{input: bam_input=bam, bam_index=bai, region= genomeregion, pref=prefix} + output{ + Int read_number = read_int(count_read_num.read_number) + } +} + +task count_read_num{ + input{ + File bam_input + File bam_index + String region + String pref + } + command <<< + samtools view -c ~{bam_input} > 
read_num.txt + #samtools index ~{pref}.~{region}.bam + >>> + + output{ + File read_number = "read_num.txt" + } + + Int disk_size = 10 + ceil(2 * size(bam_input, "GiB")) + + runtime { + cpu: 1 + memory: "10 GiB" + disks: "local-disk " + disk_size + " HDD" #"local-disk 100 HDD" + bootDiskSizeGb: 10 + preemptible: 2 + maxRetries: 1 + docker: "us.gcr.io/broad-dsp-lrma/lr-basic:0.1.1" + } +} \ No newline at end of file diff --git a/wdl/tasks/Utility/Hail.wdl b/wdl/tasks/Utility/Hail.wdl index 1b2c16215..39d78ccb2 100644 --- a/wdl/tasks/Utility/Hail.wdl +++ b/wdl/tasks/Utility/Hail.wdl @@ -32,7 +32,7 @@ task ConvertToHailMT { RuntimeAttr? runtime_attr_override } - Int disk_size = 1 + 3*ceil(size(gvcf, "GB")) + Int disk_size = 100 + 3*ceil(size(gvcf, "GB")) command <<< set -x diff --git a/wdl/tasks/Utility/VariantUtils.wdl b/wdl/tasks/Utility/VariantUtils.wdl index eaecfbaf9..892092b27 100644 --- a/wdl/tasks/Utility/VariantUtils.wdl +++ b/wdl/tasks/Utility/VariantUtils.wdl @@ -234,9 +234,9 @@ task CollectDefinitions { cat tmp*txt > easy.txt && rm tmp*txt - touch tmp.alt.txt tmp.ft.txt tmp.info.txt tmp.format.txt + touch tmp.ft.txt tmp.info.txt tmp.format.txt for vcf in ~{sep=' ' vcfs}; do - zgrep -F '##ALT=' "${vcf}" >> tmp.alt.txt + # zgrep -F '##ALT=' "${vcf}" >> tmp.alt.txt zgrep -F '##FILTER=' "${vcf}" >> tmp.ft.txt zgrep -F '##INFO=' "${vcf}" >> tmp.info.txt zgrep -F '##FORMAT=' "${vcf}" >> tmp.format.txt @@ -244,7 +244,7 @@ task CollectDefinitions { for txt in tmp*txt; do sort "${txt}" | uniq > "${txt}.union" done - cat tmp.alt.txt.union tmp.ft.txt.union tmp.info.txt.union tmp.format.txt.union > hard.txt + cat tmp.ft.txt.union tmp.info.txt.union tmp.format.txt.union > hard.txt cat easy.txt hard.txt > definitions.union.txt >>>