Extract then merge #162

Status: Open. Wants to merge 8 commits into base: main.
139 changes: 139 additions & 0 deletions SemiBin/generate_coverage.py
@@ -1,5 +1,6 @@
import os
import subprocess
from .generate_kmer import generate_kmer_features_from_fasta
from .atomicwrite import atomic_write

def calculate_coverage(depth_stream, bam_file, must_link_threshold, edge=75, is_combined=False,
@@ -172,6 +173,144 @@ def combine_cov(cov_dir : str, bam_list, is_combined : bool): # bam_list : list[str]
data_split_cov = None
return data_cov, data_split_cov
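
# For context, a sketch (not part of this diff) of how combine_cov is invoked
# in the previous single-pass flow; the 'output/samples' path is illustrative:
#   data_cov, data_split_cov = combine_cov('output/samples', args.bams, is_combined=True)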

def combine_sample_cov(sample: str, cov_dir: str, bam_list, suffix: str, is_combined: bool, separator, logger, outcsv):
"""
combine cov/cov_split for specific sample in one file

Parameters
----------
sample : sample name
cov_dir : where coverage files are stored
bam_list : list of BAM files
suffix: data_cov.csv or data_split_cov.csv
is_combined : whether to process split files
separator: separator
logger: logger
outcsv: writable path to csv file

Returns
-------
sample : sample name
"""
import polars as pl
import numpy as np
#logger.debug("\tCombining coverage information for sample {sample}")

covs = []
count = 0
batch = 0
for bam_index, bam_file in enumerate(bam_list):
bam_fname = os.path.split(bam_file)[-1]
#logger.debug(f"\tProcessing #{bam_index}: {bam_fname}_{bam_index}_{suffix}")

data_cov = pl.scan_csv(f'{cov_dir}/{bam_fname}_{bam_index}_{suffix}')\
.rename({"": "contig_name"})\
.filter(pl.col("contig_name").str.contains(sample + separator))\
.collect()
covs.append(data_cov)

count += 1
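# After every 100 BAM files, collapse the accumulated frames into a single
# aligned frame so that no concat ever has to align hundreds of frames at once.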
if count > 99:
batch += 1
#logger.debug(f"\tConcating #batch {batch} dataframe")
data_cov_merged = pl.concat(covs, how="align")
#logger.debug(f"\tConcating #batch {batch} dataframe done")
covs = [data_cov_merged]
count = 0

if len(covs) > 1:
batch += 1
#logger.debug(f"\tConcating #batch {batch} dataframe")
sample_cov = pl.concat(covs, how="align") # signal SIGSEGV (Address boundary error)
#logger.debug(f"\tConcating #batch {batch} dataframe done")
else:
sample_cov = covs[0]
#covs = []

contig_names = [i.split(separator, 1)[1] for i in sample_cov["contig_name"]] # strip the '<sample><separator>' prefix
sample_cov = sample_cov.drop("contig_name")
headers = ["contig_name"] + list(sample_cov.columns)

if is_combined:
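# Rescale each coverage column by its mean rounded up to the nearest 100
# (ceil(mean / 100) * 100) so the columns end up on a comparable scale.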
abun_scale = (sample_cov.mean() / 100).map_rows(np.ceil) * 100
divided_columns = [pl.col(col) / abun_scale[0, index] for index, col in enumerate(list(sample_cov.columns))]

sample_cov = sample_cov.select(divided_columns)

sample_cov = sample_cov.with_columns(pl.Series("contig_name", contig_names))
sample_cov = sample_cov.select(headers)

#logger.debug(f"\tSaving {outcsv}")
sample_cov.write_csv(outcsv)
#logger.debug(f"\tSaving {outcsv} done")

#logger.debug(f"\tCombining coverage information for sample {sample} done")
return sample
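
# A minimal usage sketch (illustrative, not part of the PR); the sample name,
# BAM paths and output paths below are assumptions:
#   import logging
#   logger = logging.getLogger('SemiBin')
#   combine_sample_cov('S1', 'output/samples',
#       ['S1.sorted.bam', 'S2.sorted.bam'], 'data_cov.csv',
#       is_combined=True, separator=':', logger=logger,
#       outcsv='output/samples/S1/data_cov.csv')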

def generate_sample_cov(sample, bams, output, is_combined, separator, logger, binning_threshold, must_link_threshold):
"""
generate cov/cov_split for specific sample in one file

Parameters
----------
sample : sample name
bams: bams list
output : output dirname
is_combined : whether to process split files
separator: separator
logger: logger
binning_threshold
must_link_threshold

Returns
-------
sample : sample name
"""
import pandas as pd

logger.debug(f"\tGenerating coverage information for sample {sample}")

output_path = os.path.join(output, 'samples', sample)
os.makedirs(output_path, exist_ok=True)

combine_sample_cov(
sample, os.path.join(output, "samples"), bams, "data_cov.csv",
is_combined, separator, logger,
os.path.join(output_path, "data_cov.csv"))

if is_combined:
combine_sample_cov(
sample, os.path.join(output, "samples"), bams, "data_split_cov.csv",
is_combined, separator, logger,
os.path.join(output_path, 'data_split_cov.csv'))

sample_contig_fasta = os.path.join(output, f'samples/{sample}.fa')
kmer_whole = generate_kmer_features_from_fasta(
sample_contig_fasta, binning_threshold[sample], 4)
kmer_split = generate_kmer_features_from_fasta(
sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold)
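# kmer_whole holds tetranucleotide (k=4) features per whole contig; kmer_split
# holds the same features for contigs split at must_link_threshold, which are
# used downstream to build must-link constraints.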

sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0, engine="pyarrow")
kmer_whole.index = kmer_whole.index.astype(str)
sample_cov.index = sample_cov.index.astype(str)
data = pd.merge(kmer_whole, sample_cov, how='inner', on=None,
left_index=True, right_index=True, sort=False, copy=True)
if is_combined:
sample_cov_split = pd.read_csv(os.path.join(output_path, 'data_split_cov.csv'), index_col=0, engine="pyarrow")
data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None,
left_index=True, right_index=True, sort=False, copy=True)
else:
data_split = kmer_split

with atomic_write(os.path.join(output_path, 'data.csv'), overwrite=True) as ofile:
data.to_csv(ofile)

with atomic_write(os.path.join(output_path, 'data_split.csv'), overwrite=True) as ofile:
data_split.to_csv(ofile)

return sample
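
# For reference, after generate_sample_cov('S1', ...) the expected layout under
# <output>/samples/S1/ is (the split coverage file exists only when is_combined=True):
#   data_cov.csv        combined per-sample coverage
#   data_split_cov.csv  combined coverage for the split contigs
#   data.csv            k-mer features joined with coverage
#   data_split.csv      split k-mer features (joined with split coverage when is_combined)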

def generate_cov_from_abundances(abundances, output, contig_path, contig_threshold=1000, sep=None, contig_threshold_dict=None):
import pandas as pd
import numpy as np
75 changes: 28 additions & 47 deletions SemiBin/main.py
@@ -11,7 +11,7 @@
from . import utils
from .utils import validate_normalize_args, get_must_link_threshold, generate_cannot_link, \
set_random_seed, process_fasta, split_data, get_model_path, extract_bams
from .generate_coverage import generate_cov, combine_cov, generate_cov_from_abundances
from .generate_coverage import generate_cov, combine_cov, combine_sample_cov, generate_sample_cov, generate_cov_from_abundances
from .generate_kmer import generate_kmer_features_from_fasta
from .fasta import fasta_iter

@@ -978,55 +978,36 @@ def fasta_sample_iter(fn):
sys.exit(1)

# Generate cov features for every sample
data_cov, data_split_cov = combine_cov(os.path.join(args.output, 'samples'), args.bams, is_combined)
if is_combined:
data_split_cov = data_split_cov.reset_index()
columns_list = list(data_split_cov.columns)
columns_list[0] = 'contig_name'
data_split_cov.columns = columns_list

data_cov = data_cov.reset_index()
columns_list = list(data_cov.columns)
columns_list[0] = 'contig_name'
data_cov.columns = columns_list
logger.info('Generating coverage for every sample.')
with Pool(min(args.num_process, len(sample_list))) as pool:
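# One worker per sample, capped at args.num_process, so each sample's
# coverage is generated independently and in parallel.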
results = [
pool.apply_async(
generate_sample_cov,
args=(
sample,
args.bams,
args.output,
is_combined,
args.separator,
logger,
binning_threshold,
must_link_threshold
))
for sample in sample_list]
for r in results:
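# AsyncResult.get() re-raises any exception raised in the worker process, so a
# failure inside generate_sample_cov surfaces here instead of being dropped.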
s = r.get()
logger.info(f'Generated coverage: {s}')

for sample in sample_list:
output_path = os.path.join(args.output, 'samples', sample)
os.makedirs(output_path, exist_ok=True)

part_data = split_data(data_cov, sample, args.separator, is_combined)
part_data.to_csv(os.path.join(output_path, 'data_cov.csv'))

if is_combined:
part_data = split_data(data_split_cov, sample, args.separator, is_combined)
part_data.to_csv(os.path.join(
output_path, 'data_split_cov.csv'))

sample_contig_fasta = os.path.join(
args.output, f'samples/{sample}.fa')
kmer_whole = generate_kmer_features_from_fasta(
sample_contig_fasta, binning_threshold[sample], 4)
kmer_split = generate_kmer_features_from_fasta(
sample_contig_fasta, 1000, 4, split=True, split_threshold=must_link_threshold)

sample_cov = pd.read_csv(os.path.join(output_path, 'data_cov.csv'), index_col=0)
kmer_whole.index = kmer_whole.index.astype(str)
sample_cov.index = sample_cov.index.astype(str)
data = pd.merge(kmer_whole, sample_cov, how='inner', on=None,
left_index=True, right_index=True, sort=False, copy=True)
if not os.path.exists(os.path.join(args.output, 'samples', sample, 'data_cov.csv')):
sys.stderr.write(
f"Error: generating the coverage file for sample {sample} failed\n")
sys.exit(1)
if is_combined:
sample_cov_split = pd.read_csv(os.path.join(
output_path, 'data_split_cov.csv'), index_col=0)
data_split = pd.merge(kmer_split, sample_cov_split, how='inner', on=None,
left_index=True, right_index=True, sort=False, copy=True)
else:
data_split = kmer_split

with atomic_write(os.path.join(output_path, 'data.csv'), overwrite=True) as ofile:
data.to_csv(ofile)

with atomic_write(os.path.join(output_path, 'data_split.csv'), overwrite=True) as ofile:
data_split.to_csv(ofile)
if is_combined and not os.path.exists(os.path.join(args.output, 'samples', sample, 'data_split_cov.csv')):
sys.stderr.write(
f"Error: generating the split coverage file for sample {sample} failed\n")
sys.exit(1)

if args.abundances:
logger.info('Reading abundance information from abundance files.')
2 changes: 2 additions & 0 deletions setup.py
@@ -36,6 +36,8 @@
'torch',
'python-igraph',
'pandas',
'pyarrow',
'polars',
'scikit-learn',
'requests',
'numexpr',