From eb52cde4997e11d77181ff5d3925a0a8bf630691 Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Thu, 25 Apr 2024 12:53:34 +0800 Subject: [PATCH 1/8] extract then merge --- SemiBin/generate_coverage.py | 62 ++++++++++++++++++++++++++++++++++++ SemiBin/main.py | 24 ++++---------- 2 files changed, 68 insertions(+), 18 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index 816cb02..3d77058 100644 --- a/SemiBin/generate_coverage.py +++ b/SemiBin/generate_coverage.py @@ -172,6 +172,68 @@ def combine_cov(cov_dir : str, bam_list, is_combined : bool): # bam_list : list[ data_split_cov = None return data_cov, data_split_cov + +def combine_sample_cov(sample: str, cov_dir: str, bam_list, is_combined: bool, separator): + """ + generate cov/cov_split for specific sample in one file + + Parameters + ---------- + sample_id : sample name + cov_dir : where coverage files are stored + bam_list : list of BAM files + is_combined : whether to process split files + separator: separator + + Returns + ------- + sample_cov : DataFrame + sample_cov_split : DataFrame (if is_combined) or None (otherwise) + """ + import pandas as pd + import numpy as np + + covs = [] + split_covs = [] + for bam_index, bam_file in enumerate(bam_list): + bam_fname = os.path.split(bam_file)[-1] + data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_data_cov.csv', index_col=0) + data_cov = data_cov.reset_index() + columns_list = list(data_cov.columns) + columns_list[0] = 'contig_name' + data_cov.columns = columns_list + + part_data = data_cov[data_cov['contig_name'].str.contains(sample + separator, regex=False)] + part_data = part_data.set_index("contig_name") + part_data.index.name = None + part_data.index = [ix.split(separator)[1] for ix in part_data.index] + covs.append(part_data) + + if is_combined: + data_split_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_data_split_cov.csv', index_col=0) + data_split_cov = data_split_cov.reset_index() + columns_list = list(data_split_cov.columns) + columns_list[0] = 'contig_name' + data_split_cov.columns = columns_list + + part_data = data_split_cov[data_split_cov['contig_name'].str.contains(sample + separator, regex=False)] + part_data = part_data.set_index("contig_name") + part_data.index.name = None + part_data.index = [ix.split(separator)[1] for ix in part_data.index] + split_covs.append(part_data) + + sample_cov = pd.concat(covs, axis=1) + sample_cov.index = sample_cov.index.astype(str) + if is_combined: + sample_cov_split = pd.concat(split_covs, axis=1) + sample_cov_split.index = sample_cov_split.index.astype(str) + abun_scale = (sample_cov_split.mean() / 100).apply(np.ceil) * 100 + sample_cov_split = sample_cov_split.div(abun_scale) + else: + sample_cov_split = None + return sample_cov, sample_cov_split + + def generate_cov_from_abundances(abundances, output, contig_path, contig_threshold=1000, sep=None, contig_threshold_dict=None): import pandas as pd import numpy as np diff --git a/SemiBin/main.py b/SemiBin/main.py index 491846b..3b8c9d7 100644 --- a/SemiBin/main.py +++ b/SemiBin/main.py @@ -11,7 +11,7 @@ from . 
import utils from .utils import validate_normalize_args, get_must_link_threshold, generate_cannot_link, \ set_random_seed, process_fasta, split_data, get_model_path, extract_bams -from .generate_coverage import generate_cov, combine_cov, generate_cov_from_abundances +from .generate_coverage import generate_cov, combine_cov, combine_sample_cov, generate_cov_from_abundances from .generate_kmer import generate_kmer_features_from_fasta from .fasta import fasta_iter @@ -978,29 +978,17 @@ def fasta_sample_iter(fn): sys.exit(1) # Generate cov features for every sample - data_cov, data_split_cov = combine_cov(os.path.join(args.output, 'samples'), args.bams, is_combined) - if is_combined: - data_split_cov = data_split_cov.reset_index() - columns_list = list(data_split_cov.columns) - columns_list[0] = 'contig_name' - data_split_cov.columns = columns_list - - data_cov = data_cov.reset_index() - columns_list = list(data_cov.columns) - columns_list[0] = 'contig_name' - data_cov.columns = columns_list - for sample in sample_list: output_path = os.path.join(args.output, 'samples', sample) os.makedirs(output_path, exist_ok=True) - part_data = split_data(data_cov, sample, args.separator, is_combined) - part_data.to_csv(os.path.join(output_path, 'data_cov.csv')) + sample_cov, sample_cov_split = combine_sample_cov( + sample, os.path.join(args.output, "samples"), args.bams, is_combined, args.separator) + + sample_cov.to_csv(os.path.join(output_path, 'data_cov.csv')) if is_combined: - part_data = split_data(data_split_cov, sample, args.separator, is_combined) - part_data.to_csv(os.path.join( - output_path, 'data_split_cov.csv')) + sample_cov_split.to_csv(os.path.join(output_path, 'data_split_cov.csv')) sample_contig_fasta = os.path.join( args.output, f'samples/{sample}.fa') From effd67115c6afd937c0217a06b48f9bf9b7b0068 Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Thu, 25 Apr 2024 18:39:55 +0800 Subject: [PATCH 2/8] abun scale --- SemiBin/generate_coverage.py | 32 ++++++++------------------------ SemiBin/main.py | 7 ++++--- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index 3d77058..5b6fd82 100644 --- a/SemiBin/generate_coverage.py +++ b/SemiBin/generate_coverage.py @@ -173,7 +173,7 @@ def combine_cov(cov_dir : str, bam_list, is_combined : bool): # bam_list : list[ return data_cov, data_split_cov -def combine_sample_cov(sample: str, cov_dir: str, bam_list, is_combined: bool, separator): +def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_combined: bool, separator): """ generate cov/cov_split for specific sample in one file @@ -187,17 +187,15 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, is_combined: bool, s Returns ------- - sample_cov : DataFrame - sample_cov_split : DataFrame (if is_combined) or None (otherwise) + sample_cov : DataFrame (if is_combined) or None (otherwise) """ import pandas as pd import numpy as np covs = [] - split_covs = [] for bam_index, bam_file in enumerate(bam_list): bam_fname = os.path.split(bam_file)[-1] - data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_data_cov.csv', index_col=0) + data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}', index_col=0) data_cov = data_cov.reset_index() columns_list = list(data_cov.columns) columns_list[0] = 'contig_name' @@ -209,29 +207,15 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, is_combined: bool, s part_data.index = [ix.split(separator)[1] for ix in part_data.index] 
covs.append(part_data) - if is_combined: - data_split_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_data_split_cov.csv', index_col=0) - data_split_cov = data_split_cov.reset_index() - columns_list = list(data_split_cov.columns) - columns_list[0] = 'contig_name' - data_split_cov.columns = columns_list - - part_data = data_split_cov[data_split_cov['contig_name'].str.contains(sample + separator, regex=False)] - part_data = part_data.set_index("contig_name") - part_data.index.name = None - part_data.index = [ix.split(separator)[1] for ix in part_data.index] - split_covs.append(part_data) - sample_cov = pd.concat(covs, axis=1) sample_cov.index = sample_cov.index.astype(str) + if is_combined: - sample_cov_split = pd.concat(split_covs, axis=1) - sample_cov_split.index = sample_cov_split.index.astype(str) - abun_scale = (sample_cov_split.mean() / 100).apply(np.ceil) * 100 - sample_cov_split = sample_cov_split.div(abun_scale) + abun_scale = (sample_cov.mean() / 100).apply(np.ceil) * 100 + sample_cov = sample_cov.div(abun_scale) else: - sample_cov_split = None - return sample_cov, sample_cov_split + sample_cov = None + return sample_cov def generate_cov_from_abundances(abundances, output, contig_path, contig_threshold=1000, sep=None, contig_threshold_dict=None): diff --git a/SemiBin/main.py b/SemiBin/main.py index 3b8c9d7..b75941c 100644 --- a/SemiBin/main.py +++ b/SemiBin/main.py @@ -982,12 +982,13 @@ def fasta_sample_iter(fn): output_path = os.path.join(args.output, 'samples', sample) os.makedirs(output_path, exist_ok=True) - sample_cov, sample_cov_split = combine_sample_cov( - sample, os.path.join(args.output, "samples"), args.bams, is_combined, args.separator) - + sample_cov = combine_sample_cov( + sample, os.path.join(args.output, "samples"), args.bams, "data_cov.csv", is_combined, args.separator) sample_cov.to_csv(os.path.join(output_path, 'data_cov.csv')) if is_combined: + sample_cov_split = combine_sample_cov( + sample, os.path.join(args.output, "samples"), args.bams, "data_split_cov.csv", is_combined, args.separator) sample_cov_split.to_csv(os.path.join(output_path, 'data_split_cov.csv')) sample_contig_fasta = os.path.join( From d7442d71711b7f8d3dc3cbaaadc3b316e3e8d611 Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Thu, 25 Apr 2024 18:51:21 +0800 Subject: [PATCH 3/8] return dataframe --- SemiBin/generate_coverage.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index 5b6fd82..82574dc 100644 --- a/SemiBin/generate_coverage.py +++ b/SemiBin/generate_coverage.py @@ -187,7 +187,7 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb Returns ------- - sample_cov : DataFrame (if is_combined) or None (otherwise) + sample_cov : DataFrame """ import pandas as pd import numpy as np @@ -213,8 +213,6 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb if is_combined: abun_scale = (sample_cov.mean() / 100).apply(np.ceil) * 100 sample_cov = sample_cov.div(abun_scale) - else: - sample_cov = None return sample_cov From ec3429d45c1c419dc92695f223743492f89ba5c6 Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Fri, 26 Apr 2024 11:12:27 +0800 Subject: [PATCH 4/8] parse csv using pyarrow backend --- SemiBin/generate_coverage.py | 2 +- SemiBin/main.py | 5 ++--- setup.py | 1 + 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index 82574dc..c54e40b 100644 --- a/SemiBin/generate_coverage.py +++ 
b/SemiBin/generate_coverage.py @@ -195,7 +195,7 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb covs = [] for bam_index, bam_file in enumerate(bam_list): bam_fname = os.path.split(bam_file)[-1] - data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}', index_col=0) + data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}', index_col=0, enigine="pyarrow", low_memory=False) data_cov = data_cov.reset_index() columns_list = list(data_cov.columns) columns_list[0] = 'contig_name' diff --git a/SemiBin/main.py b/SemiBin/main.py index b75941c..c2fd4d9 100644 --- a/SemiBin/main.py +++ b/SemiBin/main.py @@ -998,14 +998,13 @@ def fasta_sample_iter(fn): kmer_split = generate_kmer_features_from_fasta( sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold) - sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0) + sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow", low_memory=False) kmer_whole.index = kmer_whole.index.astype(str) sample_cov.index = sample_cov.index.astype(str) data = pd.merge(kmer_whole, sample_cov, how='inner', on=None, left_index=True, right_index=True, sort=False, copy=True) if is_combined: - sample_cov_split = pd.read_csv(os.path.join( - output_path, 'data_split_cov.csv'), index_col=0) + sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow", low_memory=False) data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None, left_index=True, right_index=True, sort=False, copy=True) else: diff --git a/setup.py b/setup.py index a1a807e..d72dbaf 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ 'torch', 'python-igraph', 'pandas', + 'pyarrow', 'scikit-learn', 'requests', 'numexpr', From 832abc349ae2aeaec87591bfe9da6bbf03410bec Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Fri, 26 Apr 2024 11:22:48 +0800 Subject: [PATCH 5/8] fix typo and parameters --- SemiBin/generate_coverage.py | 2 +- SemiBin/main.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index c54e40b..fa0e9da 100644 --- a/SemiBin/generate_coverage.py +++ b/SemiBin/generate_coverage.py @@ -195,7 +195,7 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb covs = [] for bam_index, bam_file in enumerate(bam_list): bam_fname = os.path.split(bam_file)[-1] - data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}', index_col=0, enigine="pyarrow", low_memory=False) + data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}', index_col=0, engine="pyarrow") data_cov = data_cov.reset_index() columns_list = list(data_cov.columns) columns_list[0] = 'contig_name' diff --git a/SemiBin/main.py b/SemiBin/main.py index c2fd4d9..aa87bcc 100644 --- a/SemiBin/main.py +++ b/SemiBin/main.py @@ -998,13 +998,13 @@ def fasta_sample_iter(fn): kmer_split = generate_kmer_features_from_fasta( sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold) - sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow", low_memory=False) + sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow") kmer_whole.index = kmer_whole.index.astype(str) sample_cov.index = sample_cov.index.astype(str) data = pd.merge(kmer_whole, sample_cov, how='inner', on=None, left_index=True, right_index=True, sort=False, copy=True) 
if is_combined: - sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow", low_memory=False) + sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow") data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None, left_index=True, right_index=True, sort=False, copy=True) else: From 6fa4ad7174547b8fc4ae986c6e4d0428a5d130fd Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Fri, 26 Apr 2024 18:15:41 +0800 Subject: [PATCH 6/8] faster parsing csv file using polars --- SemiBin/generate_coverage.py | 47 +++++++++++++++++++++--------------- SemiBin/main.py | 14 ++++++----- setup.py | 1 + 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index fa0e9da..de22d11 100644 --- a/SemiBin/generate_coverage.py +++ b/SemiBin/generate_coverage.py @@ -173,47 +173,56 @@ def combine_cov(cov_dir : str, bam_list, is_combined : bool): # bam_list : list[ return data_cov, data_split_cov -def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_combined: bool, separator): +def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_combined: bool, separator, logger, outcsv): """ generate cov/cov_split for specific sample in one file Parameters ---------- - sample_id : sample name + sample : sample name cov_dir : where coverage files are stored bam_list : list of BAM files + suffix: data_cov.csv or data_split_cov.csv is_combined : whether to process split files separator: separator + logger: logger + outcsv: writable path to csv file Returns ------- - sample_cov : DataFrame + True : Bool """ - import pandas as pd + import polars as pl import numpy as np covs = [] for bam_index, bam_file in enumerate(bam_list): bam_fname = os.path.split(bam_file)[-1] - data_cov = pd.read_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}', index_col=0, engine="pyarrow") - data_cov = data_cov.reset_index() - columns_list = list(data_cov.columns) - columns_list[0] = 'contig_name' - data_cov.columns = columns_list + logger.info(f"\tprocessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}") - part_data = data_cov[data_cov['contig_name'].str.contains(sample + separator, regex=False)] - part_data = part_data.set_index("contig_name") - part_data.index.name = None - part_data.index = [ix.split(separator)[1] for ix in part_data.index] - covs.append(part_data) + data_cov = pl.scan_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}')\ + .rename({"": "contig_name"})\ + .filter(pl.col("contig_name").str.contains(sample + separator))\ + .collect() + covs.append(data_cov) - sample_cov = pd.concat(covs, axis=1) - sample_cov.index = sample_cov.index.astype(str) + sample_cov = pl.concat(covs, how="align") + contig_names = [i.split(":")[1] for i in sample_cov["contig_name"]] + sample_cov = sample_cov.drop("contig_name") + headers = ["contig_name"] + list(sample_cov.columns) if is_combined: - abun_scale = (sample_cov.mean() / 100).apply(np.ceil) * 100 - sample_cov = sample_cov.div(abun_scale) - return sample_cov + abun_scale = (sample_cov.mean() / 100).map_rows(np.ceil) * 100 + divided_columns = [pl.col(col) / abun_scale[0, index] for index, col in enumerate(list(sample_cov.columns))] + + sample_cov = sample_cov.select(divided_columns) + + sample_cov = sample_cov.with_columns(pl.Series("contig_name", contig_names)) + sample_cov = sample_cov.select(headers) + + sample_cov.write_csv(outcsv) + + return True def 
generate_cov_from_abundances(abundances, output, contig_path, contig_threshold=1000, sep=None, contig_threshold_dict=None): diff --git a/SemiBin/main.py b/SemiBin/main.py index aa87bcc..b1ff51e 100644 --- a/SemiBin/main.py +++ b/SemiBin/main.py @@ -982,14 +982,16 @@ def fasta_sample_iter(fn): output_path = os.path.join(args.output, 'samples', sample) os.makedirs(output_path, exist_ok=True) - sample_cov = combine_sample_cov( - sample, os.path.join(args.output, "samples"), args.bams, "data_cov.csv", is_combined, args.separator) - sample_cov.to_csv(os.path.join(output_path, 'data_cov.csv')) + combine_sample_cov( + sample, os.path.join(args.output, "samples"), args.bams, "data_cov.csv", + is_combined, args.separator, logger, + os.path.join(output_path, "data_cov.csv")) if is_combined: - sample_cov_split = combine_sample_cov( - sample, os.path.join(args.output, "samples"), args.bams, "data_split_cov.csv", is_combined, args.separator) - sample_cov_split.to_csv(os.path.join(output_path, 'data_split_cov.csv')) + combine_sample_cov( + sample, os.path.join(args.output, "samples"), args.bams, "data_split_cov.csv", + is_combined, args.separator, logger, + os.path.join(output_path, 'data_split_cov.csv')) sample_contig_fasta = os.path.join( args.output, f'samples/{sample}.fa') diff --git a/setup.py b/setup.py index d72dbaf..16ab06a 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ 'python-igraph', 'pandas', 'pyarrow', + 'polars', 'scikit-learn', 'requests', 'numexpr', From 884776d3794d9bd2da82fef08cd42b2d2592e6fb Mon Sep 17 00:00:00 2001 From: Jie Zhu Date: Sat, 27 Apr 2024 22:34:47 +0800 Subject: [PATCH 7/8] batch concat, avoid signal SIGSEGV --- SemiBin/generate_coverage.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py index de22d11..4281270 100644 --- a/SemiBin/generate_coverage.py +++ b/SemiBin/generate_coverage.py @@ -196,9 +196,11 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb import numpy as np covs = [] + count = 0 + batch = 0 for bam_index, bam_file in enumerate(bam_list): bam_fname = os.path.split(bam_file)[-1] - logger.info(f"\tprocessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}") + logger.info(f"\tProcessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}") data_cov = pl.scan_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}')\ .rename({"": "contig_name"})\ @@ -206,7 +208,25 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb .collect() covs.append(data_cov) - sample_cov = pl.concat(covs, how="align") + count += 1 + if count > 99: + batch += 1 + logger.info(f"\tConcating #batch {batch} dataframe") + data_cov_merged = pl.concat(covs, how="align") + logger.info(f"\tConcating #batch {batch} dataframe done") + covs = [] + covs.append(data_cov_merged) + count = 0 + + if len(covs) > 1: + batch += 1 + logger.info(f"\tConcating #batch {batch} dataframe") + sample_cov = pl.concat(covs, how="align") # signal SIGSEGV (Address boundary error) + logger.info(f"\tConcating #batch {batch} dataframe done") + else: + sample_cov = covs[0] + #covs = [] + contig_names = [i.split(":")[1] for i in sample_cov["contig_name"]] sample_cov = sample_cov.drop("contig_name") headers = ["contig_name"] + list(sample_cov.columns) @@ -220,7 +240,9 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb sample_cov = sample_cov.with_columns(pl.Series("contig_name", contig_names)) sample_cov = 
sample_cov.select(headers)
+    logger.info(f"\tSaving {outcsv}")
     sample_cov.write_csv(outcsv)
+    logger.info(f"\tSaving {outcsv} done")
 
     return True

From e600aeaacd4d120f1cc2b34e13bdfea3bb595bb5 Mon Sep 17 00:00:00 2001
From: Jie Zhu
Date: Sat, 27 Apr 2024 23:36:43 +0800
Subject: [PATCH 8/8] generate contig coverage using multiple processes

---
 SemiBin/generate_coverage.py | 86 +++++++++++++++++++++++++++++++-----
 SemiBin/main.py              | 67 ++++++++++++----------------
 2 files changed, 104 insertions(+), 49 deletions(-)

diff --git a/SemiBin/generate_coverage.py b/SemiBin/generate_coverage.py
index 4281270..082afbb 100644
--- a/SemiBin/generate_coverage.py
+++ b/SemiBin/generate_coverage.py
@@ -1,5 +1,6 @@
 import os
 import subprocess
+from .generate_kmer import generate_kmer_features_from_fasta
 from .atomicwrite import atomic_write
 
 def calculate_coverage(depth_stream, bam_file, must_link_threshold, edge=75, is_combined=False,
@@ -172,10 +173,9 @@ def combine_cov(cov_dir : str, bam_list, is_combined : bool): # bam_list : list[
         data_split_cov = None
     return data_cov, data_split_cov
 
-
 def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_combined: bool, separator, logger, outcsv):
     """
-    generate cov/cov_split for specific sample in one file
+    combine cov/cov_split for a specific sample into one file
 
     Parameters
     ----------
@@ -190,17 +190,18 @@ def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_comb
 
     Returns
     -------
-    True : Bool
+    sample : sample name
     """
     import polars as pl
     import numpy as np
 
+    #logger.debug(f"\tCombining coverage information for sample {sample}")
     covs = []
     count = 0
     batch = 0
     for bam_index, bam_file in enumerate(bam_list):
         bam_fname = os.path.split(bam_file)[-1]
-        logger.info(f"\tProcessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}")
+        #logger.debug(f"\tProcessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}")
 
         data_cov = pl.scan_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}')\
             .rename({"": "contig_name"})\
@@ -211,18 +212,18 @@
         count += 1
         if count > 99:
             batch += 1
-            logger.info(f"\tConcating #batch {batch} dataframe")
+            #logger.debug(f"\tConcating #batch {batch} dataframe")
             data_cov_merged = pl.concat(covs, how="align")
-            logger.info(f"\tConcating #batch {batch} dataframe done")
+            #logger.debug(f"\tConcating #batch {batch} dataframe done")
             covs = []
             covs.append(data_cov_merged)
             count = 0
 
     if len(covs) > 1:
         batch += 1
-        logger.info(f"\tConcating #batch {batch} dataframe")
+        #logger.debug(f"\tConcating #batch {batch} dataframe")
         sample_cov = pl.concat(covs, how="align") # signal SIGSEGV (Address boundary error)
-        logger.info(f"\tConcating #batch {batch} dataframe done")
+        #logger.debug(f"\tConcating #batch {batch} dataframe done")
     else:
         sample_cov = covs[0]
     #covs = []
@@ -240,12 +241,75 @@
     sample_cov = sample_cov.with_columns(pl.Series("contig_name", contig_names))
     sample_cov = sample_cov.select(headers)
 
-    logger.info(f"\tSaving {outcsv}")
+    #logger.debug(f"\tSaving {outcsv}")
     sample_cov.write_csv(outcsv)
-    logger.info(f"\tSaving {outcsv} done")
+    #logger.debug(f"\tSaving {outcsv} done")
 
-    return True
+    #logger.debug(f"\tCombining coverage information for sample {sample} done")
+    return sample
+
+def generate_sample_cov(sample, bams, output, is_combined, separator, logger, binning_threshold, must_link_threshold):
+    """
+    generate coverage and k-mer features for a specific sample and write data.csv/data_split.csv
+
+    Parameters
+    ----------
+    sample : sample name
+    bams : list of BAM files
+    output : output directory
+    is_combined : whether to process split files
+    separator : separator between sample name and contig name
+    logger : logger
+    binning_threshold : dict of minimum contig length per sample
+    must_link_threshold : length threshold for splitting contigs into must-link pairs
+
+    Returns
+    -------
+    sample : sample name
+    """
+    import pandas as pd
+
+    logger.debug(f"\tGenerating coverage information for sample {sample}")
+
+    output_path = os.path.join(output, 'samples', sample)
+    os.makedirs(output_path, exist_ok=True)
+
+    combine_sample_cov(
+        sample, os.path.join(output, "samples"), bams, "data_cov.csv",
+        is_combined, separator, logger,
+        os.path.join(output_path, "data_cov.csv"))
+
+    if is_combined:
+        combine_sample_cov(
+            sample, os.path.join(output, "samples"), bams, "data_split_cov.csv",
+            is_combined, separator, logger,
+            os.path.join(output_path, 'data_split_cov.csv'))
+
+    sample_contig_fasta = os.path.join(output, f'samples/{sample}.fa')
+    kmer_whole = generate_kmer_features_from_fasta(
+        sample_contig_fasta, binning_threshold[sample], 4)
+    kmer_split = generate_kmer_features_from_fasta(
+        sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold)
+
+    sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow")
+    kmer_whole.index = kmer_whole.index.astype(str)
+    sample_cov.index = sample_cov.index.astype(str)
+    data = pd.merge(kmer_whole, sample_cov, how='inner', on=None,
+                    left_index=True, right_index=True, sort=False, copy=True)
+    if is_combined:
+        sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow")
+        data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None,
+                              left_index=True, right_index=True, sort=False, copy=True)
+    else:
+        data_split = kmer_split
+
+    with atomic_write(os.path.join(output_path, 'data.csv'), overwrite=True) as ofile:
+        data.to_csv(ofile)
+
+    with atomic_write(os.path.join(output_path, 'data_split.csv'), overwrite=True) as ofile:
+        data_split.to_csv(ofile)
+
+    return sample
 
 def generate_cov_from_abundances(abundances, output, contig_path, contig_threshold=1000, sep=None, contig_threshold_dict=None):
     import pandas as pd
diff --git a/SemiBin/main.py b/SemiBin/main.py
index b1ff51e..fe70413 100644
--- a/SemiBin/main.py
+++ b/SemiBin/main.py
@@ -11,7 +11,7 @@
 from . import utils
 from .utils import validate_normalize_args, get_must_link_threshold, generate_cannot_link, \
     set_random_seed, process_fasta, split_data, get_model_path, extract_bams
-from .generate_coverage import generate_cov, combine_cov, combine_sample_cov, generate_cov_from_abundances
+from .generate_coverage import generate_cov, combine_cov, combine_sample_cov, generate_sample_cov, generate_cov_from_abundances
 from .generate_kmer import generate_kmer_features_from_fasta
 from .fasta import fasta_iter
 
@@ -978,45 +978,36 @@ def fasta_sample_iter(fn):
             sys.exit(1)
 
     # Generate cov features for every sample
-    for sample in sample_list:
-        output_path = os.path.join(args.output, 'samples', sample)
-        os.makedirs(output_path, exist_ok=True)
-
-        combine_sample_cov(
-            sample, os.path.join(args.output, "samples"), args.bams, "data_cov.csv",
-            is_combined, args.separator, logger,
-            os.path.join(output_path, "data_cov.csv"))
-
-        if is_combined:
-            combine_sample_cov(
-                sample, os.path.join(args.output, "samples"), args.bams, "data_split_cov.csv",
-                is_combined, args.separator, logger,
-                os.path.join(output_path, 'data_split_cov.csv'))
-
-        sample_contig_fasta = os.path.join(
-            args.output, f'samples/{sample}.fa')
-        kmer_whole = generate_kmer_features_from_fasta(
-            sample_contig_fasta, binning_threshold[sample], 4)
-        kmer_split = generate_kmer_features_from_fasta(
-            sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold)
+    logger.info('Generating coverage for every sample.')
+    with Pool(min(args.num_process, len(sample_list))) as pool:
+        results = [
+            pool.apply_async(
+                generate_sample_cov,
+                args=(
+                    sample,
+                    args.bams,
+                    args.output,
+                    is_combined,
+                    args.separator,
+                    logger,
+                    binning_threshold,
+                    must_link_threshold
+                ))
+            for sample in sample_list]
+        for r in results:
+            s = r.get()
+            logger.info(f'Generated coverage: {s}')
 
-        sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow")
-        kmer_whole.index = kmer_whole.index.astype(str)
-        sample_cov.index = sample_cov.index.astype(str)
-        data = pd.merge(kmer_whole, sample_cov, how='inner', on=None,
-                        left_index=True, right_index=True, sort=False, copy=True)
+    for sample in sample_list:
+        if not os.path.exists(os.path.join(args.output, 'samples', sample, 'data_cov.csv')):
+            sys.stderr.write(
+                "Error: Generating coverage file failed\n")
+            sys.exit(1)
         if is_combined:
-            sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow")
-            data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None,
-                                  left_index=True, right_index=True, sort=False, copy=True)
-        else:
-            data_split = kmer_split
-
-        with atomic_write(os.path.join(output_path, 'data.csv'), overwrite=True) as ofile:
-            data.to_csv(ofile)
-
-        with atomic_write(os.path.join(output_path, 'data_split.csv'), overwrite=True) as ofile:
-            data_split.to_csv(ofile)
+            if not os.path.exists(os.path.join(args.output, 'samples', sample, 'data_split_cov.csv')):
+                sys.stderr.write(
+                    "Error: Generating coverage file failed\n")
+                sys.exit(1)
 
     if args.abundances:
         logger.info('Reading abundance information from abundance files.')
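
Note: the batching introduced in PATCH 7 (and kept, commented, in PATCH 8) is the easiest part of this series to misread, so below is a minimal standalone sketch of the same pattern: collapsing the accumulated frames every N iterations so that no single pl.concat(..., how="align") call has to align hundreds of frames at once, which is the situation that triggered the SIGSEGV. This is an illustration only; the function name, batch size, and toy frames are made up, and only the batching logic mirrors the patches.

    import polars as pl

    def batched_align_concat(frames, batch_size=100):
        # Align-concatenate DataFrames on their shared key column(s),
        # collapsing the pending list into one frame every `batch_size` items.
        # Assumes at least one frame is given.
        pending = []
        for frame in frames:
            pending.append(frame)
            if len(pending) >= batch_size:
                pending = [pl.concat(pending, how="align")]
        return pl.concat(pending, how="align") if len(pending) > 1 else pending[0]

    # Toy input: three per-BAM coverage frames sharing a 'contig_name' key,
    # standing in for the {bam_fname}_{bam_index}_data_cov.csv files.
    frames = [
        pl.DataFrame({"contig_name": ["c1", "c2"], f"bam_{i}": [1.0 + i, 0.5 + i]})
        for i in range(3)
    ]
    print(batched_align_concat(frames, batch_size=2))

Because how="align" joins on the common columns, each batch collapse preserves one row per contig while adding the new per-BAM coverage columns, so collapsing early keeps the number of frames any single concat must align bounded by the batch size.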