diff --git a/configs/template_config.ini b/configs/template_config.ini index 6a056de..e12c496 100644 --- a/configs/template_config.ini +++ b/configs/template_config.ini @@ -10,6 +10,7 @@ host_seq_file = /absolute/path/to/host_data/${name:host}/data.fna.gz host_cls_file = /absolute/path/to/host_data/${name:host}/class.csv metagenome_seq_file = /absolute/path/to/data/to/analyse/${name:metagenome}/data.fna.gz outdir = /absolute/path/to/directory/to/output/analysis +workdir = /absolute/path/to/directory/to/spill/memory/if/needed [settings] k = 20 @@ -23,4 +24,4 @@ classification_threshold = 0.8 [outputs] mpa-style = True kronagram = True -abundance_report = True +report = True diff --git a/src/Caribou_classification.py b/src/Caribou_classification.py index 074c5fd..e8e2fa5 100644 --- a/src/Caribou_classification.py +++ b/src/Caribou_classification.py @@ -1,7 +1,7 @@ #!/usr/bin python3 import ray -import os.path +import json import argparse from utils import * @@ -19,6 +19,7 @@ def bacteria_classification(opt): # Verify existence of files and load data data_bacteria = verify_load_data(opt['data_bacteria']) data_metagenome = verify_load_data(opt['data_metagenome']) + preclassified = verify_preclassified(data_metagenome) k_length = len(data_bacteria['kmers'][0]) # Verify that model type is valid / choose default depending on host presence @@ -35,7 +36,12 @@ def bacteria_classification(opt): list_taxas = verify_taxas(opt['taxa'], data_bacteria['taxas']) # Initialize cluster - ray.init() + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } + ) # Definition of model for bacteria taxonomic classification + training ################################################################################ @@ -55,44 +61,25 @@ def bacteria_classification(opt): clf.execute_training() t_end = time() t_train = t_end - t_start + # Execution of bacteria taxonomic classification on metagenome + 
save results ################################################################################ - def populate_save_data(clf, end_taxa): - clf_data = {'sequence' : clf.classified_data['sequence'].copy()} - if end_taxa is not None: - clf_data['sequence'] = clf_data['sequence'][:clf_data['sequence'].index(end_taxa)] - - if 'domain' in clf_data['sequence'] and len(data_metagenome['classified_ids']) > 0: - clf_data['domain'] = { - 'profile' : data_metagenome['profile'], - 'kmers' : data_metagenome['kmers'], - 'ids' : data_metagenome['ids'], - 'classification' : data_metagenome['classification'], - 'classified_ids' : data_metagenome['classified_ids'], - 'unknown_profile' : data_metagenome['unknown_profile'], - 'unknown_ids' : data_metagenome['unknown_ids'] - } - if 'host' in clf_data.keys(): - clf_data['domain']['host_classification'] = data_metagenome['host_classification'] - clf_data['domain']['host_ids'] = data_metagenome['host_ids'] - - for taxa in clf_data['sequence']: - clf_data[taxa] = { - 'profile' : clf.classified_data[taxa]['unknown'], - 'kmers' : data_metagenome['kmers'], - 'ids' : clf.classified_data[taxa]['unknown_ids'], - 'classification' : clf.classified_data[taxa]['classification'], - 'classified_ids' : clf.classified_data[taxa]['classified_ids'], - } - - clf_file = os.path.join(outdirs['results_dir'], opt['metagenome_name'] + '_classified.npz') - save_Xy_data(clf_data, clf_file) - + t_start = time() - end_taxa = clf.execute_classification(data_metagenome) + if preclassified is not None: + end_taxa = clf.execute_classification(data_metagenome[preclassified]) + else: + end_taxa = clf.execute_classification(data_metagenome) t_end = time() t_classif = t_end - t_start - populate_save_data(clf, end_taxa) + clf_data = populate_save_data( + clf.classified_data, + data_metagenome, + end_taxa, + outdirs['results_dir'], + opt['metagenome_name'], + preclassified = preclassified, + ) if end_taxa is None: print(f"Caribou finished training the {opt['model_type']} model 
and classifying bacterial sequences at {opt['taxa']} taxonomic level with it. \ \nThe training step took {t_train} seconds to execute and the classification step took {t_classif} seconds to execute.") @@ -114,7 +101,7 @@ def populate_save_data(clf, end_taxa): parser.add_argument('-e','--training_epochs', default=100, type=int, help='The number of training iterations for the neural networks models if one ise chosen, defaults to 100') parser.add_argument('-v','--verbose', action='store_true', help='Should the program be verbose') parser.add_argument('-o','--outdir', required=True, type=Path, help='PATH to a directory on file where outputs will be saved') - parser.add_argument('-wd','--workdir', default=None, type=Path, help='Optional. Path to a working directory where Ray Tune will output and spill tuning data') + parser.add_argument('-wd','--workdir', default='/tmp/spill', type=Path, help='Optional. Path to a working directory where Ray Tune will output and spill tuning data') args = parser.parse_args() opt = vars(args) diff --git a/src/Caribou_classification_train_cv.py b/src/Caribou_classification_train_cv.py index 5bc9db9..1aa0244 100644 --- a/src/Caribou_classification_train_cv.py +++ b/src/Caribou_classification_train_cv.py @@ -1,6 +1,7 @@ #!/usr/bin python3 import ray +import json import argparse from utils import * @@ -32,7 +33,12 @@ def bacteria_classification_train_cv(opt): lst_taxas.remove('domain') # Initialize cluster - ray.init() + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } + ) # Training and cross-validation of models for classification of bacterias ################################################################################ @@ -64,7 +70,7 @@ def bacteria_classification_train_cv(opt): parser.add_argument('-e','--training_epochs', default=100, type=int, help='The number of training iterations for the neural networks models if one ise chosen, 
defaults to 100') parser.add_argument('-v','--verbose', action='store_true', help='Should the program be verbose') parser.add_argument('-o','--outdir', required=True, type=Path, help='PATH to a directory on file where outputs will be saved') - parser.add_argument('-wd','--workdir', default=None, type=Path, help='Optional. Path to a working directory where Ray Tune will output and spill tuning data') + parser.add_argument('-wd','--workdir', default='/tmp/spill', type=Path, help='Optional. Path to a working directory where Ray Tune will output and spill tuning data') args = parser.parse_args() opt = vars(args) diff --git a/src/Caribou_extraction.py b/src/Caribou_extraction.py index d55ecfe..9e2e137 100644 --- a/src/Caribou_extraction.py +++ b/src/Caribou_extraction.py @@ -1,7 +1,7 @@ #!/usr/bin python3 import ray -import os.path +import json import argparse from utils import * @@ -38,7 +38,12 @@ def bacteria_extraction(opt): outdirs = define_create_outdirs(opt['outdir']) # Initialize cluster - ray.init() + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } + ) # Definition of model for bacteria extraction / host removal + execution ################################################################################ @@ -75,28 +80,20 @@ def bacteria_extraction(opt): # Execution of bacteria extraction / host removal on metagenome + save results ################################################################################ - def populate_save_data(clf): - clf_data = { - 'sequence': clf.classified_data['sequence'].copy(), - 'profile' : clf.classified_data['domain']['bacteria'], - 'kmers' : data_metagenome['kmers'], - 'ids' : clf.classified_data['domain']['bacteria_ids'], - 'unknown_profile' : clf.classified_data['domain']['unknown'], - 'unknown_ids' : clf.classified_data['domain']['unknown_ids'], - } - if 'host' in clf.classified_data.keys(): - clf_data['host_profile'] = 
clf.classified_data['host']['classification'] - clf_data['host_ids'] = clf.classified_data['host']['classified_ids'] - clf_file = os.path.join(outdirs['results_dir'], opt['metagenome_name'] + '_extracted.npz') - save_Xy_data(clf_data, clf_file) - + t_start = time() end_taxa = clf.execute_classification(data_metagenome) t_end = time() t_classify = t_end - t_start if end_taxa is None: - populate_save_data(clf) + clf_data = populate_save_data( + clf.classified_data, + data_bacteria, + end_taxa, + outdirs['results_dir'], + opt['metagenome_name'], + ) print(f"Caribou finished training the {opt['model_type']} model and extracting bacteria with it. \ \nThe training step took {t_train} seconds and the classification step took {t_classify} seconds.") else: @@ -118,7 +115,7 @@ def populate_save_data(clf): parser.add_argument('-e','--training_epochs', default=100, type=int, help='The number of training iterations for the neural networks models if one ise chosen, defaults to 100') parser.add_argument('-v','--verbose', action='store_true', help='Should the program be verbose') parser.add_argument('-o','--outdir', required=True, type=Path, help='PATH to a directory on file where outputs will be saved') - parser.add_argument('-wd','--workdir', default=None, type=Path, help='Optional. Path to a working directory where Ray Tune will output and spill tuning data') + parser.add_argument('-wd','--workdir', default='/tmp/spill', type=Path, help='Optional. 
Path to a working directory where Ray Tune will output and spill tuning data') args = parser.parse_args() opt = vars(args) diff --git a/src/Caribou_extraction_train_cv.py b/src/Caribou_extraction_train_cv.py index 36aeb14..aa3e5ea 100644 --- a/src/Caribou_extraction_train_cv.py +++ b/src/Caribou_extraction_train_cv.py @@ -1,6 +1,7 @@ #!/usr/bin python3 import ray +import json import argparse from utils import * @@ -35,7 +36,12 @@ def bacteria_extraction_train_cv(opt): outdirs = define_create_outdirs(opt['outdir']) # Initialize cluster - ray.init() + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } + ) # Training and cross-validation of models for bacteria extraction / host removal ################################################################################ @@ -84,7 +90,7 @@ def bacteria_extraction_train_cv(opt): parser.add_argument('-e','--training_epochs', default=100, type=int, help='The number of training iterations for the neural networks models if one is chosen, defaults to 100') parser.add_argument('-v','--verbose', action='store_true', help='Should the program be verbose') parser.add_argument('-o','--outdir', required=True, type=Path, help='PATH to a directory on file where outputs will be saved') - parser.add_argument('-wd','--workdir', default=None, type=Path, help='Optional. Path to a working directory where Ray Tune will output and spill tuning data') + parser.add_argument('-wd','--workdir', default='/tmp/spill', type=Path, help='Optional. 
Path to a working directory where Ray Tune will output and spill tuning data') args = parser.parse_args() opt = vars(args) diff --git a/src/Caribou_kmers.py b/src/Caribou_kmers.py index 9c43a2e..dfed773 100644 --- a/src/Caribou_kmers.py +++ b/src/Caribou_kmers.py @@ -1,6 +1,7 @@ #!/usr/bin python3 import ray +import json import pathlib import os.path import argparse @@ -36,7 +37,12 @@ def kmers_dataset(opt): outdirs = define_create_outdirs(opt['outdir']) # Initialize cluster - ray.init() + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } + ) # K-mers profile extraction ################################################################################ @@ -139,6 +145,7 @@ def kmers_dataset(opt): parser.add_argument('-k','--k_length', required=True, type=int, help='Length of k-mers to extract') parser.add_argument('-l','--kmers_list', default=None, type=pathlib.Path, help='PATH to a file containing a list of k-mers to be extracted if the dataset is not a training database') parser.add_argument('-o','--outdir', required=True, type=pathlib.Path, help='PATH to a directory on file where outputs will be saved') + parser.add_argument('-wd','--workdir', default='/tmp/spill', type=pathlib.Path, help='Optional. 
Path to a working directory where tuning data will be spilled') args = parser.parse_args() opt = vars(args) diff --git a/src/Caribou_outputs.py b/src/Caribou_outputs.py index 55fa727..eb65fb2 100644 --- a/src/Caribou_outputs.py +++ b/src/Caribou_outputs.py @@ -1,5 +1,7 @@ #!/usr/bin python3 +import ray +import json import argparse from utils import * @@ -52,9 +54,17 @@ def out_2_user(opt): parser.add_argument('-m','--mpa', action='store_true', help='Should the mpa-style output be generated?') parser.add_argument('-k','--kronagram', action='store_true', help='Should the interactive kronagram be generated?') parser.add_argument('-r','--report', action='store_true', help='Should the abundance report be generated?') + parser.add_argument('-wd','--workdir', default='/tmp/spill', type=Path, help='Optional. Path to a working directory where tuning data will be spilled') # parser.add_argument('-b', '--biom', action='store_true', help='Should the biom file be generated?') args = parser.parse_args() opt = vars(args) + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } + ) + out_2_user(opt) \ No newline at end of file diff --git a/src/Caribou_pipeline.py b/src/Caribou_pipeline.py index 0ea0dc6..7d66b64 100644 --- a/src/Caribou_pipeline.py +++ b/src/Caribou_pipeline.py @@ -1,6 +1,7 @@ #!/usr/bin python3 import ray +import json import argparse import configparser @@ -37,6 +38,7 @@ def caribou(opt): host_cls_file = config.get('io', 'host_cls_file', fallback = None) metagenome_seq_file = config.get('io', 'metagenome_seq_file') outdir = config.get('io', 'outdir') + workdir = config.get('io', 'workdir', fallback = '/tmp/spill') # settings k_length = config.getint('settings', 'k', fallback = 35) @@ -51,7 +53,7 @@ def caribou(opt): # outputs mpa_style = config.getboolean('outputs', 'mpa-style', fallback = True) kronagram = config.getboolean('outputs', 'kronagram', fallback = True) - 
abundance_report = config.getboolean('outputs', 'abundance_report', fallback = True) + report = config.getboolean('outputs', 'report', fallback = True) # Part 0.5 - Validation of parameters and environment ################################################################################ @@ -81,7 +83,7 @@ def caribou(opt): # outputs verify_boolean(mpa_style, 'output in mpa-style table form') verify_boolean(kronagram, 'output in Kronagram form') - verify_boolean(abundance_report, 'output in abundance report form') + verify_boolean(report, 'output in abundance report form') # Check batch_size if multi_classifier in ['cnn','widecnn'] and training_batch_size < 20: @@ -91,14 +93,20 @@ def caribou(opt): outdirs = define_create_outdirs(outdir) # Initialize cluster - ray.init() + ray.init( + _system_config = { + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(workdir)}}) + } + ) # Part 1 - K-mers profile extraction ################################################################################ t_start = time() if host is not None: # Reference Database and Host - k_profile_database, k_profile_host = build_load_save_data((database_seq_file, database_cls_file), + k_profile_database, k_profile_host = build_load_save_data( + (database_seq_file, database_cls_file), (host_seq_file, host_cls_file), outdirs['data_dir'], database, @@ -107,7 +115,8 @@ def caribou(opt): ) else: # Reference Database Only - k_profile_database = build_load_save_data((database_seq_file, database_cls_file), + k_profile_database = build_load_save_data( + (database_seq_file, database_cls_file), host, outdirs['data_dir'], database, @@ -116,12 +125,14 @@ def caribou(opt): ) # Metagenome to analyse - k_profile_metagenome = build_load_save_data(metagenome_seq_file, + k_profile_metagenome = build_load_save_data( + metagenome_seq_file, None, outdirs['data_dir'], metagenome, host, - kmers_list = k_profile_database['kmers'] + kmers_list = 
k_profile_database['kmers'], + k = k_length, ) t_end = time() t_kmers = t_end - t_start @@ -171,32 +182,41 @@ def caribou(opt): # Classify the data from the metagenome t_start = time() - recursive_classifier.execute_classification(k_profile_metagenome) + end_taxa = recursive_classifier.execute_classification(k_profile_metagenome) t_end = time() t_classif = t_end - t_start - # Get classification results dictionnary - classified_data = recursive_classifier.classified_data + # Build / Save classification results dictionnary + classified_data = populate_save_data( + recursive_classifier.classified_data, + k_profile_database, + end_taxa, + outdirs['results_dir'], + metagenome + ) + # Part 4 - Outputs for biological analysis of bacterial population ################################################################################ t_start = time() - outputs = Outputs(k_profile_database, - outdirs['results_dir'], - k_length, - multi_classifier, - metagenome, - host, - classified_data) + outputs = Outputs( + k_profile_database, + outdirs['results_dir'], + k_length, + multi_classifier, + metagenome, + host, + classified_data + ) # Output desired files according to parameters if mpa_style is True: outputs.mpa_style() if kronagram is True: outputs.kronagram() - if abundance_report is True: - outputs.abundance_report() + if report is True: + outputs.report() t_end = time() t_outputs = t_end - t_start diff --git a/src/outputs/out.py b/src/outputs/out.py index ffc998e..c7cf0d4 100644 --- a/src/outputs/out.py +++ b/src/outputs/out.py @@ -1,8 +1,6 @@ import os -import gzip import numpy as np -from Bio import SeqIO import pandas as pd from subprocess import run diff --git a/src/supplement/sklearn_tuning.py b/src/supplement/sklearn_tuning.py index 9da9e35..3466ffc 100644 --- a/src/supplement/sklearn_tuning.py +++ b/src/supplement/sklearn_tuning.py @@ -150,13 +150,19 @@ def sim_4_cv(df, kmers_ds, name, taxa, cols, k, scaler): parser.add_argument('-t','--taxa', required=True, help='The 
taxa for which the tuning should be done') parser.add_argument('-k','--kmers_length', required=True, help='Length of k-mers') parser.add_argument('-o','--outdir', required=True, type=Path, help='Path to folder for outputing tuning results') -parser.add_argument('-wd','--workdir', default='~/ray', type=Path, help='Optional. Path to a working directory where tuning data will be spilled') +parser.add_argument('-wd','--workdir', default='/tmp/spill', type=Path, help='Optional. Path to a working directory where tuning data will be spilled') args = parser.parse_args() opt = vars(args) -ray.init(logging_level=logging.ERROR, _system_config={'object_spilling_config': json.dumps({'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}})}) +ray.init( + logging_level=logging.ERROR, + _system_config={ + 'object_spilling_config': json.dumps( + {'type': 'filesystem', 'params': {'directory_path': str(opt['workdir'])}}) + } +) # Data ################################################################################ diff --git a/src/utils.py b/src/utils.py index bd0bf03..8f0ed9c 100644 --- a/src/utils.py +++ b/src/utils.py @@ -26,7 +26,10 @@ 'verify_load_data', 'verify_concordance_klength', 'verify_taxas', - 'verify_load_classified' + 'verify_preclassified', + 'verify_load_classified', + 'populate_save_data_domain', + 'populate_save_data' ] # Data handling @@ -46,11 +49,11 @@ def save_Xy_data(df, Xy_file): def verify_file(file : Path): if file is not None and not os.path.exists(file): - raise ValueError('Cannot find file {} !'.format(file)) + raise ValueError(f'Cannot find file {file} !') def verify_data_path(dir : Path): if not os.path.exists(dir): - raise ValueError("Cannot find data folder {} ! Exiting".format(dir)) + raise ValueError(f"Cannot find data folder {dir} ! 
Exiting") def verify_saving_path(dir : Path): path, folder = os.path.split(dir) @@ -72,23 +75,23 @@ def verify_host_params(host : str, host_seq_file : Path, host_cls_file : Path): def verify_boolean(val : bool, parameter : str): if val not in [True,False,None]: raise ValueError( - 'Invalid value for {} ! Please use boolean values !\n'.format(parameter) + + f'Invalid value for {parameter} ! Please use boolean values !\n' + 'Please refer to the wiki for further details : https://github.com/bioinfoUQAM/Caribou/wiki') def verify_positive_int(val : int, parameter : str): if type(val) != int or val < 0: raise ValueError( - 'Invalid value for {} ! Please use a positive integer !\n'.format(parameter) + + f'Invalid value for {parameter} ! Please use a positive integer !\n' + 'Please refer to the wiki for further details : https://github.com/bioinfoUQAM/Caribou/wiki') def verify_0_1(val : float, parameter : str): if type(val) != float: raise ValueError( - 'Invalid value for {} ! Please use a float between 0 and 1 !\n'.format(parameter) + + f'Invalid value for {parameter} ! Please use a float between 0 and 1 !\n' + 'Please refer to the wiki for further details : https://github.com/bioinfoUQAM/Caribou/wiki') elif not 0 <= val <= 1: raise ValueError( - 'Invalid value for {} ! Please use a float between 0 and 1 !\n'.format(parameter) + + f'Invalid value for {parameter} ! Please use a float between 0 and 1 !\n' + 'Please refer to the wiki for further details : https://github.com/bioinfoUQAM/Caribou/wiki') def verify_binary_classifier(clf : str): @@ -110,7 +113,7 @@ def verify_seqfiles(seqfile : Path, seqfile_host : Path): def verify_concordance_klength(klen1 : int, klen2 : int): if klen1 != klen2: raise ValueError("K length between datasets is inconsistent ! 
Exiting\n" + - "K length of bacteria dataset is {} while K length from host is {}").format(klen1, klen2) + f"K length of bacteria dataset is {klen1} while K length from host is {klen2}") # Verif + handling ######################################################################################################### @@ -176,11 +179,97 @@ def verify_load_classified(classified_data: Path): return data - def verify_taxas(taxas : str, db_taxas : list): taxas = str.split(taxas, ',') for taxa in taxas: if taxa not in db_taxas: - raise ValueError("One of the chosen classification taxa {} is not present in the database!".format(taxas)) + raise ValueError(f"One of the chosen classification taxa {taxas} is not present in the database!") return taxas - \ No newline at end of file
+
+
+def verify_preclassified(data: dict):
+    """Return the key of the single taxa already classified in ``data``.
+
+    A dataset counts as preclassified when it has a 'sequence' key; every key
+    other than 'sequence' and 'kmers' is then taken to be a taxa entry.
+
+    Returns the taxa key, or None when ``data`` has no 'sequence' key or no
+    taxa entry. Raises ValueError when more than one taxa entry is present.
+    """
+    preclassified = None
+    if 'sequence' in data:
+        # dict.keys() is a view without .remove(), so collect the taxa keys in a list
+        taxas = [key for key in data if key not in ('sequence', 'kmers')]
+        if len(taxas) > 1:
+            raise ValueError('More than one classified taxa present in data.\n' +
+                             'Please provide data containing only one or relaunch classification with an empty k-mers dataset !')
+        elif taxas:
+            preclassified = taxas[0]
+    return preclassified
+
+# Saving
+#########################################################################################################
+
+def populate_save_data_domain(clf_data : dict):
+    """Build the 'domain' entry of the saved results from ``clf_data``.
+
+    Reads the bacteria and unknown profiles / ids under clf_data['domain'] and,
+    when a 'host' key is present, the host classification and classified ids.
+    Returns a new dict; no key is added to ``clf_data``.
+    """
+    clf_dict = {
+        'profile' : clf_data['domain']['bacteria'],
+        'ids' : clf_data['domain']['bacteria_ids'],
+        'unknown_profile' : clf_data['domain']['unknown'],
+        'unknown_ids' : clf_data['domain']['unknown_ids'],
+    }
+    if 'host' in clf_data:
+        # Host results must land in the returned dict, not in the input
+        clf_dict['host_profile'] = clf_data['host']['classification']
+        clf_dict['host_ids'] = clf_data['host']['classified_ids']
+
+    return clf_dict
+
+def populate_save_data(
+    clf_data : dict,
+    db_data : dict,
+    end_taxa : str,
+    outdir : Path,
+    metagenome : str,
+    preclassified : str = None,
+):
+    """Assemble the classification results, save them and return them.
+
+    Keeps the taxa of clf_data['sequence'] that precede ``end_taxa`` (all of
+    them when ``end_taxa`` is None) and attaches db_data['kmers']. When
+    ``preclassified`` is given, db_data[preclassified] is added and that taxa
+    is put first in the sequence.
+
+    The result is written with save_Xy_data to
+    <outdir>/<metagenome>_classified.npz.
+    """
+    clf_dict = {
+        'sequence': clf_data['sequence'].copy(),
+        'kmers': db_data['kmers'],
+    }
+    if end_taxa is not None:
+        clf_dict['sequence'] = clf_dict['sequence'][:clf_dict['sequence'].index(end_taxa)]
+
+    for taxa in clf_dict['sequence']:
+        if taxa == 'domain':
+            clf_dict[taxa] = populate_save_data_domain(clf_data)
+        else:
+            clf_dict[taxa] = {
+                'profile': clf_data[taxa]['unknown'],
+                'ids': clf_data[taxa]['unknown_ids'],
+                'classification': clf_data[taxa]['classification'],
+                'classified_ids': clf_data[taxa]['classified_ids'],
+            }
+
+    if preclassified is not None:
+        clf_dict[preclassified] = db_data[preclassified]
+        clf_dict['sequence'].insert(0, preclassified)
+
+    clf_file = os.path.join(outdir, f'{metagenome}_classified.npz')
+    save_Xy_data(clf_dict, clf_file)
+    return clf_dict