From 7fc40a305d161c1de01aeb6da399542e5d645030 Mon Sep 17 00:00:00 2001
From: Cunliang Geng
Date: Wed, 29 May 2024 10:52:21 +0200
Subject: [PATCH] update use of config in data loader and arranger

---
 src/nplinker/arranger.py                | 167 +++++++++++++-----------
 src/nplinker/config.py                  |   2 +-
 src/nplinker/defaults.py                |  18 +--
 src/nplinker/loader.py                  |  66 +++++++---
 src/nplinker/nplinker.py                |  35 +++--
 src/nplinker/scoring/metcalf_scoring.py |   4 +-
 6 files changed, 168 insertions(+), 124 deletions(-)

diff --git a/src/nplinker/arranger.py b/src/nplinker/arranger.py
index 97a57b28..50f2e73a 100644
--- a/src/nplinker/arranger.py
+++ b/src/nplinker/arranger.py
@@ -3,12 +3,9 @@
 import shutil
 from glob import glob
 from pathlib import Path
+from dynaconf import Dynaconf
 from jsonschema import validate
 import nplinker.defaults as defaults
-from nplinker.config import config
-from nplinker.defaults import GENOME_BGC_MAPPINGS_FILENAME
-from nplinker.defaults import GENOME_STATUS_FILENAME
-from nplinker.defaults import STRAIN_MAPPINGS_FILENAME
 from nplinker.genomics.antismash import podp_download_and_extract_antismash_data
 from nplinker.genomics.bigscape.runbigscape import run_bigscape
 from nplinker.genomics.mibig import download_and_extract_mibig_metadata
@@ -33,17 +30,41 @@ class DatasetArranger:
     This class is used to arrange the datasets required by NPLinker according to the configuration.
     The datasets include MIBiG, GNPS, antiSMASH, and BiG-SCAPE.
 
-    If `config.mode` is "local", the datasets are validated.
-    If `config.mode` is "podp", the datasets are downloaded or generated.
-
-    It uses the default downloads directory `defaults.DOWNLOADS_DEFAULT_PATH` to store the
-    downloaded files. Default data paths for MIBiG, GNPS, antiSMASH, and BiG-SCAPE are defined
-    in `nplinker.defaults`.
+    If `self.config.mode` is "local", the datasets are validated.
+    If `self.config.mode` is "podp", the datasets are downloaded or generated.
+
+    Attributes:
+        config: A Dynaconf object that contains the configuration settings. Check the
+            `nplinker.config` module for more details.
+        root_dir: The root directory of the datasets.
+        downloads_dir: The directory to store downloaded files.
+        mibig_dir: The directory to store MIBiG metadata.
+        gnps_dir: The directory to store GNPS data.
+        antismash_dir: The directory to store antiSMASH data.
+        bigscape_dir: The directory to store BiG-SCAPE data.
+        bigscape_running_output_dir: The directory to store the running output of BiG-SCAPE.
     """
 
-    def __init__(self) -> None:
-        # Prepare the downloads directory and/or PODP json file which are required for other methods
-        defaults.DOWNLOADS_DEFAULT_PATH.mkdir(exist_ok=True)
+    def __init__(self, config: Dynaconf) -> None:
+        """Initialize the DatasetArranger.
+
+        Args:
+            config: A Dynaconf object that contains the configuration settings. Check the
+                `nplinker.config` module for more details.
+        """
+        self.config = config
+        self.root_dir = config.root_dir
+        self.downloads_dir = self.root_dir / defaults.DOWNLOADS_DIRNAME
+        self.downloads_dir.mkdir(exist_ok=True)
+
+        self.mibig_dir = self.root_dir / defaults.MIBIG_DIRNAME
+        self.gnps_dir = self.root_dir / defaults.GNPS_DIRNAME
+        self.antismash_dir = self.root_dir / defaults.ANTISMASH_DIRNAME
+        self.bigscape_dir = self.root_dir / defaults.BIGSCAPE_DIRNAME
+        self.bigscape_running_output_dir = (
+            self.bigscape_dir / defaults.BIGSCAPE_RUNNING_OUTPUT_DIRNAME
+        )
+
         self.arrange_podp_project_json()
 
     def arrange(self) -> None:
@@ -62,18 +83,18 @@ def arrange(self) -> None:
 
     def arrange_podp_project_json(self) -> None:
         """Arrange the PODP project JSON file.
 
-        If `config.mode` is "podp", download the PODP project JSON file if it doesn't exist. Then
+        If `self.config.mode` is "podp", download the PODP project JSON file if it doesn't exist. Then
         validate the PODP project JSON file if it exists or is downloaded. The validation is
         controlled by the json schema `schemas/podp_adapted_schema.json`.
         """
-        if config.mode == "podp":
-            file_name = f"paired_datarecord_{config.podp_id}.json"
-            podp_file = defaults.DOWNLOADS_DEFAULT_PATH / file_name
+        if self.config.mode == "podp":
+            file_name = f"paired_datarecord_{self.config.podp_id}.json"
+            podp_file = self.downloads_dir / file_name
             if not podp_file.exists():
                 download_url(
-                    PODP_PROJECT_URL.format(config.podp_id),
-                    defaults.DOWNLOADS_DEFAULT_PATH,
+                    PODP_PROJECT_URL.format(self.config.podp_id),
+                    self.downloads_dir,
                     file_name,
                 )
 
@@ -84,26 +105,26 @@ def arrange_podp_project_json(self) -> None:
 
     def arrange_mibig(self) -> None:
         """Arrange the MIBiG metadata.
 
-        Always download and extract the MIBiG metadata if `config.mibig.to_use` is True.
-        If the default directory has already existed, it will be removed and re-downloaded to
-        ensure the latest version is used. So it's not allowed to manually put MIBiG metadata
-        in the default directory.
+        Always download and extract the MIBiG metadata if `self.config.mibig.to_use` is True.
+        If the default directory already exists, it will be removed and the metadata re-downloaded
+        to ensure the latest version is used. Manually placing MIBiG metadata in the default
+        directory is therefore not allowed.
         """
-        if config.mibig.to_use:
-            if defaults.MIBIG_DEFAULT_PATH.exists():
+        if self.config.mibig.to_use:
+            if self.mibig_dir.exists():
                 # remove existing mibig data
-                shutil.rmtree(defaults.MIBIG_DEFAULT_PATH)
+                shutil.rmtree(self.mibig_dir)
 
             download_and_extract_mibig_metadata(
-                defaults.DOWNLOADS_DEFAULT_PATH,
-                defaults.MIBIG_DEFAULT_PATH,
-                version=config.mibig.version,
+                self.downloads_dir,
+                self.mibig_dir,
+                version=self.config.mibig.version,
             )
 
     def arrange_gnps(self) -> None:
         """Arrange the GNPS data.
 
-        If `config.mode` is "local", validate the GNPS data directory.
-        If `config.mode` is "podp", download the GNPS data if it doesn't exist or remove the
+        If `self.config.mode` is "local", validate the GNPS data directory.
+        If `self.config.mode` is "podp", download the GNPS data if it doesn't exist or remove the
         existing GNPS data and re-download it if it is invalid.
 
         The validation process includes:
 
         - Check if the GNPS data directory exists.
         - Check if the required files exist in the GNPS data directory, including:
             - file_mappings.tsv or file_mappings.csv
             - spectra.mgf
             - molecular_families.tsv
             - annotations.tsv
         """
         pass_validation = False
-        if config.mode == "podp":
+        if self.config.mode == "podp":
             # retry downloading at most 3 times if downloaded data has problems
             for _ in range(3):
                 try:
-                    validate_gnps(defaults.GNPS_DEFAULT_PATH)
+                    validate_gnps(self.gnps_dir)
                     pass_validation = True
                     break
                 except (FileNotFoundError, ValueError):
                     # Don't need to remove downloaded archive, as it'll be overwritten
-                    shutil.rmtree(defaults.GNPS_DEFAULT_PATH, ignore_errors=True)
+                    shutil.rmtree(self.gnps_dir, ignore_errors=True)
                     self._download_and_extract_gnps()
 
         if not pass_validation:
-            validate_gnps(defaults.GNPS_DEFAULT_PATH)
+            validate_gnps(self.gnps_dir)
 
         # get the path to file_mappings file (csv or tsv)
         self.gnps_file_mappings_file = self._get_gnps_file_mappings_file()
@@ -143,8 +164,8 @@ def _get_gnps_file_mappings_file(self) -> Path:
         Returns:
             Path to the GNPS file mappings file.
         """
-        file_mappings_tsv = defaults.GNPS_DEFAULT_PATH / defaults.GNPS_FILE_MAPPINGS_TSV
-        file_mappings_csv = defaults.GNPS_DEFAULT_PATH / defaults.GNPS_FILE_MAPPINGS_CSV
+        file_mappings_tsv = self.gnps_dir / defaults.GNPS_FILE_MAPPINGS_TSV
+        file_mappings_csv = self.gnps_dir / defaults.GNPS_FILE_MAPPINGS_CSV
 
         gnps_file_mappings_file = (
             file_mappings_tsv if file_mappings_tsv.exists() else file_mappings_csv
@@ -158,23 +179,21 @@ def _download_and_extract_gnps(self) -> None:
 
         Get the GNPS task ID from the PODP project JSON file, then download and extract the GNPS
         data to the default GNPS directory.
         """
-        podp_file = defaults.DOWNLOADS_DEFAULT_PATH / f"paired_datarecord_{config.podp_id}.json"
+        podp_file = self.downloads_dir / f"paired_datarecord_{self.config.podp_id}.json"
         with open(podp_file, "r") as f:
             podp_json_data = json.load(f)
 
         gnps_task_id = podp_json_data["metabolomics"]["project"].get("molecular_network")
 
         data_archive = (
-            GNPSDownloader(gnps_task_id, defaults.DOWNLOADS_DEFAULT_PATH)
-            .download()
-            .get_download_file()
+            GNPSDownloader(gnps_task_id, self.downloads_dir).download().get_download_file()
         )
-        GNPSExtractor(data_archive, defaults.GNPS_DEFAULT_PATH)
+        GNPSExtractor(data_archive, self.gnps_dir)
 
     def arrange_antismash(self) -> None:
         """Arrange the antiSMASH data.
 
-        If `config.mode` is "local", validate the antiSMASH data directory.
-        If `config.mode` is "podp", download the antiSMASH data if it doesn't exist or remove the
+        If `self.config.mode` is "local", validate the antiSMASH data directory.
+        If `self.config.mode` is "podp", download the antiSMASH data if it doesn't exist or remove the
         existing antiSMASH data and re-download it if it is invalid.
 
         The validation process includes:
 
         - Check if the antiSMASH data directory exists.
         - Check if the antiSMASH data directory contains at least one sub-directory, and each
           sub-directory contains at least one BGC file (with the suffix ".region???.gbk" where
           ??? is the region number).
 
         The antiSMASH data directory must follow the structure below:
         ```
         antismash
             ├── genome_id_1 (one AntiSMASH output, e.g. GCF_004339725.1)
             │  ├── GCF_004339725.1.region001.gbk
             │  └── ...
             ├── genome_id_2
             │  ├── ...
             └── ...
         ```
         """
         pass_validation = False
-        if config.mode == "podp":
+        if self.config.mode == "podp":
             for _ in range(3):
                 try:
-                    validate_antismash(defaults.ANTISMASH_DEFAULT_PATH)
+                    validate_antismash(self.antismash_dir)
                     pass_validation = True
                     break
                 except FileNotFoundError:
-                    shutil.rmtree(defaults.ANTISMASH_DEFAULT_PATH, ignore_errors=True)
+                    shutil.rmtree(self.antismash_dir, ignore_errors=True)
                     self._download_and_extract_antismash()
 
         if not pass_validation:
-            validate_antismash(defaults.ANTISMASH_DEFAULT_PATH)
+            validate_antismash(self.antismash_dir)
 
     def _download_and_extract_antismash(self) -> None:
         """Download and extract the antiSMASH data.
@@ -215,80 +234,80 @@ def _download_and_extract_antismash(self) -> None:
 
         Get the antiSMASH data from the PODP project JSON file, then download and extract the
         antiSMASH data to the default antiSMASH directory.
         """
-        podp_file = defaults.DOWNLOADS_DEFAULT_PATH / f"paired_datarecord_{config.podp_id}.json"
+        podp_file = self.downloads_dir / f"paired_datarecord_{self.config.podp_id}.json"
         with open(podp_file, "r") as f:
             podp_json_data = json.load(f)
 
         podp_download_and_extract_antismash_data(
-            podp_json_data["genomes"], defaults.DOWNLOADS_DEFAULT_PATH, config.root_dir
+            podp_json_data["genomes"], self.downloads_dir, self.root_dir
         )
 
     def arrange_bigscape(self) -> None:
         """Arrange the BiG-SCAPE data.
 
-        If `config.mode` is "local", validate the BiG-SCAPE data directory.
-        If `config.mode` is "podp", run BiG-SCAPE to generate the clustering file if it doesn't
+        If `self.config.mode` is "local", validate the BiG-SCAPE data directory.
+        If `self.config.mode` is "podp", run BiG-SCAPE to generate the clustering file if it doesn't
         exist or remove the existing BiG-SCAPE data and re-run BiG-SCAPE if it is invalid.
 
         The running output of BiG-SCAPE will be saved to the directory "bigscape_running_output"
         in the default BiG-SCAPE directory, and the clustering file
-        "mix_clustering_c{config.bigscape.cutoff}.tsv" will be copied to the default BiG-SCAPE
+        "mix_clustering_c{self.config.bigscape.cutoff}.tsv" will be copied to the default BiG-SCAPE
         directory.
 
         The validation process includes:
 
         - Check if the default BiG-SCAPE data directory exists.
-        - Check if the clustering file "mix_clustering_c{config.bigscape.cutoff}.tsv" exists in the
+        - Check if the clustering file "mix_clustering_c{self.config.bigscape.cutoff}.tsv" exists in the
           BiG-SCAPE data directory.
         - Check if the 'data_sqlite.db' file exists in the BiG-SCAPE data directory.
         """
         pass_validation = False
-        if config.mode == "podp":
+        if self.config.mode == "podp":
             for _ in range(3):
                 try:
-                    validate_bigscape(defaults.BIGSCAPE_DEFAULT_PATH)
+                    validate_bigscape(self.bigscape_dir, self.config.bigscape.cutoff)
                     pass_validation = True
                     break
                 except FileNotFoundError:
-                    shutil.rmtree(defaults.BIGSCAPE_DEFAULT_PATH, ignore_errors=True)
+                    shutil.rmtree(self.bigscape_dir, ignore_errors=True)
                     self._run_bigscape()
 
         if not pass_validation:
-            validate_bigscape(defaults.BIGSCAPE_DEFAULT_PATH)
+            validate_bigscape(self.bigscape_dir, self.config.bigscape.cutoff)
 
     def _run_bigscape(self) -> None:
         """Run BiG-SCAPE to generate the clustering file.
 
-        The running output of BiG-SCAPE will be saved to the `BIGSCAPE_RUNNING_OUTPUT_PATH`.
-        The clustering file "mix_clustering_c{config.bigscape.cutoff}.tsv" will be copied to the
+        The running output of BiG-SCAPE will be saved to `self.bigscape_running_output_dir`.
+        The clustering file "mix_clustering_c{self.config.bigscape.cutoff}.tsv" will be copied to the
         default BiG-SCAPE directory.
         """
-        defaults.BIGSCAPE_RUNNING_OUTPUT_PATH.mkdir(exist_ok=True, parents=True)
+        self.bigscape_running_output_dir.mkdir(exist_ok=True, parents=True)
         run_bigscape(
-            defaults.ANTISMASH_DEFAULT_PATH,
-            defaults.BIGSCAPE_RUNNING_OUTPUT_PATH,
-            config.bigscape.parameters,
+            self.antismash_dir,
+            self.bigscape_running_output_dir,
+            self.config.bigscape.parameters,
         )
         for f in glob(
             str(
-                defaults.BIGSCAPE_RUNNING_OUTPUT_PATH
+                self.bigscape_running_output_dir
                 / "network_files"
                 / "*"
                 / "mix"
                 / "mix_clustering_c*.tsv"
             )
         ):
-            shutil.copy(f, defaults.BIGSCAPE_DEFAULT_PATH)
+            shutil.copy(f, self.bigscape_dir)
 
     def arrange_strain_mappings(self) -> None:
         """Arrange the strain mappings file.
 
-        If `config.mode` is "local", validate the strain mappings file.
-        If `config.mode` is "podp", always generate the strain mappings file and validate it.
+        If `self.config.mode` is "local", validate the strain mappings file.
+        If `self.config.mode` is "podp", always generate the strain mappings file and validate it.
 
-        The valiation checks if the strain mappings file exists and if it is a valid JSON file
+        The validation checks if the strain mappings file exists and if it is a valid JSON file
         according to the schema defined in `schemas/strain_mappings_schema.json`.
         """
-        if config.mode == "podp":
+        if self.config.mode == "podp":
             self._generate_strain_mappings()
 
         self._validate_strain_mappings()
@@ -307,7 +326,7 @@ def _validate_strain_mappings(self) -> None:
             ValidationError: If the strain mappings file is not a valid JSON file according to
                 the schema.
         """
-        strain_mappings_file = config.root_dir / STRAIN_MAPPINGS_FILENAME
+        strain_mappings_file = self.root_dir / defaults.STRAIN_MAPPINGS_FILENAME
 
         if not strain_mappings_file.exists():
             raise FileNotFoundError(f"Strain mappings file not found at {strain_mappings_file}")
@@ -319,14 +338,14 @@
 
     def _generate_strain_mappings(self) -> None:
         """Generate the strain mappings file for the PODP mode."""
-        podp_json_file = defaults.DOWNLOADS_DEFAULT_PATH / f"paired_datarecord_{config.podp_id}.json"
-        genome_status_json_file = defaults.DOWNLOADS_DEFAULT_PATH / GENOME_STATUS_FILENAME
-        genome_bgc_mappings_file = defaults.ANTISMASH_DEFAULT_PATH / GENOME_BGC_MAPPINGS_FILENAME
+        podp_json_file = self.downloads_dir / f"paired_datarecord_{self.config.podp_id}.json"
+        genome_status_json_file = self.downloads_dir / defaults.GENOME_STATUS_FILENAME
+        genome_bgc_mappings_file = self.antismash_dir / defaults.GENOME_BGC_MAPPINGS_FILENAME
         gnps_file_mapping_file = self.gnps_file_mappings_file
-        strain_mappings_file = config.root_dir / STRAIN_MAPPINGS_FILENAME
+        strain_mappings_file = self.root_dir / defaults.STRAIN_MAPPINGS_FILENAME
 
         # generate the genome_bgc_mappings_file
-        generate_mappings_genome_id_bgc_id(defaults.ANTISMASH_DEFAULT_PATH)
+        generate_mappings_genome_id_bgc_id(self.antismash_dir)
         # generate the strain_mappings_file
         podp_generate_strain_mappings(
             podp_json_file,
@@ -343,7 +362,7 @@ def arrange_strains_selected(self) -> None:
         The validation checks if the strains selected file is a valid JSON file according to the
         schema defined in `schemas/user_strains.json`.
         """
-        strains_selected_file = config.root_dir / defaults.STRAINS_SELECTED_FILENAME
+        strains_selected_file = self.root_dir / defaults.STRAINS_SELECTED_FILENAME
         if strains_selected_file.exists():
             with open(strains_selected_file, "r") as f:
                 json_data = json.load(f)
@@ -444,8 +463,8 @@
-def validate_bigscape(bigscape_dir: Path) -> None:
+def validate_bigscape(bigscape_dir: Path, cutoff: str) -> None:
     """Validate the BiG-SCAPE data directory and its contents.
 
     The BiG-SCAPE data directory must exist and contain the clustering file
-    "mix_clustering_c{config.bigscape.cutoff}.tsv" where {config.bigscape.cutoff} is the
+    "mix_clustering_c{cutoff}.tsv" where {cutoff} is the
     bigscape cutoff value set in the config file. Alternatively, the directory can contain the
     BiG-SCAPE database file generated by BiG-SCAPE v2.
 
@@ -461,7 +480,7 @@ def validate_bigscape(bigscape_dir: Path) -> None:
     if not bigscape_dir.exists():
         raise FileNotFoundError(f"BiG-SCAPE data directory not found at {bigscape_dir}")
 
-    clustering_file = bigscape_dir / f"mix_clustering_c{config.bigscape.cutoff}.tsv"
+    clustering_file = bigscape_dir / f"mix_clustering_c{cutoff}.tsv"
     database_file = bigscape_dir / "data_sqlite.db"
     if not clustering_file.exists() and not database_file.exists():
         raise FileNotFoundError(f"BiG-SCAPE data not found in {clustering_file} or {database_file}")
diff --git a/src/nplinker/config.py b/src/nplinker/config.py
index 6908c6cc..5e734ff5 100644
--- a/src/nplinker/config.py
+++ b/src/nplinker/config.py
@@ -6,7 +6,7 @@
 from nplinker.utils import transform_to_full_path
 
 
-def load_config(config_file: str | PathLike):
+def load_config(config_file: str | PathLike) -> Dynaconf:
     """Load and validate the configuration file.
 
     Args:
diff --git a/src/nplinker/defaults.py b/src/nplinker/defaults.py
index e9243454..a6d53050 100644
--- a/src/nplinker/defaults.py
+++ b/src/nplinker/defaults.py
@@ -1,7 +1,3 @@
-from pathlib import Path
-from nplinker.config import config
-
-
 STRAIN_MAPPINGS_FILENAME = "strain_mappings.json"
 GENOME_BGC_MAPPINGS_FILENAME = "genome_bgc_mappings.json"
 GENOME_STATUS_FILENAME = "genome_status.json"
@@ -13,10 +9,10 @@
 STRAINS_SELECTED_FILENAME = "strains_selected.json"
 
 
-DOWNLOADS_DEFAULT_PATH: Path = config.root_dir / "downloads"
-MIBIG_DEFAULT_PATH: Path = config.root_dir / "mibig"
-GNPS_DEFAULT_PATH: Path = config.root_dir / "gnps"
-ANTISMASH_DEFAULT_PATH: Path = config.root_dir / "antismash"
-BIGSCAPE_DEFAULT_PATH: Path = config.root_dir / "bigscape"
-BIGSCAPE_RUNNING_OUTPUT_PATH: Path = BIGSCAPE_DEFAULT_PATH / "bigscape_running_output"
-OUTPUT_DEFAULT_PATH: Path = config.root_dir / "output"
+DOWNLOADS_DIRNAME = "downloads"
+MIBIG_DIRNAME = "mibig"
+GNPS_DIRNAME = "gnps"
+ANTISMASH_DIRNAME = "antismash"
+BIGSCAPE_DIRNAME = "bigscape"
+BIGSCAPE_RUNNING_OUTPUT_DIRNAME = "bigscape_running_output"
+OUTPUT_DIRNAME = "output"
diff --git a/src/nplinker/loader.py b/src/nplinker/loader.py
index 41b84d89..cb567346 100644
--- a/src/nplinker/loader.py
+++ b/src/nplinker/loader.py
@@ -2,14 +2,8 @@
 import os
 from importlib.resources import files
 from deprecated import deprecated
+from dynaconf import Dynaconf
 from nplinker import defaults
-from nplinker.config import config
-from nplinker.defaults import GNPS_ANNOTATIONS_FILENAME
-from nplinker.defaults import GNPS_DEFAULT_PATH
-from nplinker.defaults import GNPS_MOLECULAR_FAMILY_FILENAME
-from nplinker.defaults import GNPS_SPECTRA_FILENAME
-from nplinker.defaults import STRAIN_MAPPINGS_FILENAME
-from nplinker.defaults import STRAINS_SELECTED_FILENAME
 from nplinker.genomics.antismash import AntismashBGCLoader
 from nplinker.genomics.bigscape import BigscapeGCFLoader
 from nplinker.genomics.bigscape import BigscapeV2GCFLoader
@@ -33,7 +27,22 @@
 
 
 class DatasetLoader:
-    """Class to load all data."""
+    """Class to load all data.
+
+    Attributes:
+        config: A Dynaconf object that contains the configuration settings. Check the
+            `nplinker.config` module for more information.
+        bgcs: A list of BGC objects.
+        gcfs: A list of GCF objects.
+        spectra: A list of Spectrum objects.
+        molfams: A list of MolecularFamily objects.
+        mibig_bgcs: A list of MIBiG BGC objects.
+        mibig_strains_in_use: A StrainCollection object that contains the strains in use from MIBiG.
+        product_types: A list of product types.
+        strains: A StrainCollection object that contains all strains.
+        class_matches: A ClassMatches object that contains class match info.
+        chem_classes: A ChemClassPredictions object that contains chemical class predictions.
+    """
 
     RUN_CANOPUS_DEFAULT = False
     EXTRA_CANOPUS_PARAMS_DEFAULT = "--maxmz 600 formula zodiac structure canopus"
@@ -42,8 +51,15 @@ class DatasetLoader:
     OR_CANOPUS = "canopus_dir"
     OR_MOLNETENHANCER = "molnetenhancer_dir"
 
-    def __init__(self):
-        # set public attributes
+    def __init__(self, config: Dynaconf):
+        """Initialize the DatasetLoader.
+
+        Args:
+            config: A Dynaconf object that contains the configuration settings. Check the
+                `nplinker.config` module for more information.
+        """
+        self.config = config
+
         self.bgcs, self.gcfs, self.spectra, self.molfams = [], [], [], []
         self.mibig_bgcs = []
         self.mibig_strains_in_use = StrainCollection()
@@ -74,14 +90,14 @@ def load(self):
 
     def _load_strain_mappings(self):
         # 1. load strain mappings
-        sc = StrainCollection.read_json(config.root_dir / STRAIN_MAPPINGS_FILENAME)
+        sc = StrainCollection.read_json(self.config.root_dir / defaults.STRAIN_MAPPINGS_FILENAME)
         for strain in sc:
             self.strains.add(strain)
         logger.info("Loaded {} non-MiBIG Strain objects".format(len(self.strains)))
 
-        # 2. filter user specificied strains (remove all that are not specified by user).
-        # It's not allowed to specify empty list of strains, otherwise validation will fail.
-        user_strains_file = config.root_dir / STRAINS_SELECTED_FILENAME
+        # 2. filter user specified strains (remove all that are not specified by user).
+        # It's not allowed to specify an empty list of strains, otherwise validation will fail.
+        user_strains_file = self.config.root_dir / defaults.STRAINS_SELECTED_FILENAME
         if user_strains_file.exists():
             logger.info(f"Loading user specified strains from file {user_strains_file}.")
             user_strains = load_user_strains(user_strains_file)
@@ -104,15 +120,17 @@ def _load_metabolomics(self):
         """
         logger.info(f"{'='*40}\nLoading metabolomics data starts...")
 
+        gnps_dir = self.config.root_dir / defaults.GNPS_DIRNAME
+
         # Step 1: load all Spectrum objects
-        raw_spectra = GNPSSpectrumLoader(GNPS_DEFAULT_PATH / GNPS_SPECTRA_FILENAME).spectra
+        raw_spectra = GNPSSpectrumLoader(gnps_dir / defaults.GNPS_SPECTRA_FILENAME).spectra
         # Step 2: load all GNPS annotations
         raw_annotations = GNPSAnnotationLoader(
-            GNPS_DEFAULT_PATH / GNPS_ANNOTATIONS_FILENAME
+            gnps_dir / defaults.GNPS_ANNOTATIONS_FILENAME
         ).annotations
         # Step 3: load all MolecularFamily objects
         raw_molfams = GNPSMolecularFamilyLoader(
-            GNPS_DEFAULT_PATH / GNPS_MOLECULAR_FAMILY_FILENAME
+            gnps_dir / defaults.GNPS_MOLECULAR_FAMILY_FILENAME
         ).get_mfs(keep_singleton=False)
 
         # Step 4: add GNPS annotations to Spectrum.gnps_annotations
@@ -145,21 +163,27 @@ def _load_genomics(self):
 
         # Step 1: load antismash BGC objects & add strain info
         logger.info("Parsing AntiSMASH directory...")
-        antismash_bgcs = AntismashBGCLoader(str(defaults.ANTISMASH_DEFAULT_PATH)).get_bgcs()
+        antismash_bgcs = AntismashBGCLoader(
+            str(self.config.root_dir / defaults.ANTISMASH_DIRNAME)
+        ).get_bgcs()
         antismash_bgcs_with_strain, _ = add_strain_to_bgc(self.strains, antismash_bgcs)
 
         # Step 2: load mibig BGC objects (having strain info)
-        if config.mibig.to_use:
-            self.mibig_bgcs = MibigLoader(str(defaults.MIBIG_DEFAULT_PATH)).get_bgcs()
+        if self.config.mibig.to_use:
+            self.mibig_bgcs = MibigLoader(
+                str(self.config.root_dir / defaults.MIBIG_DIRNAME)
+            ).get_bgcs()
 
         # Step 3: get all BGC objects with strain info
         all_bgcs_with_strain = antismash_bgcs_with_strain + self.mibig_bgcs
 
         # Step 4: load all GCF objects
         bigscape_cluster_file = (
-            defaults.BIGSCAPE_DEFAULT_PATH / f"mix_clustering_c{config.bigscape.cutoff}.tsv"
+            self.config.root_dir
+            / defaults.BIGSCAPE_DIRNAME
+            / f"mix_clustering_c{self.config.bigscape.cutoff}.tsv"
         )
-        bigscape_db_file = defaults.BIGSCAPE_DEFAULT_PATH / "data_sqlite.db"
+        bigscape_db_file = self.config.root_dir / defaults.BIGSCAPE_DIRNAME / "data_sqlite.db"
 
         # switch depending on found file. prefer V1 if both are found
         if bigscape_cluster_file.exists():
@@ -180,7 +204,7 @@ def _load_genomics(self):
 
         # Step 6: get mibig bgcs and strains in use from GCFs
         mibig_strains_in_use = StrainCollection()
-        if config.mibig.to_use:
+        if self.config.mibig.to_use:
             mibig_bgcs_in_use, mibig_strains_in_use = get_mibig_from_gcf(all_gcfs_with_bgc)
         else:
             mibig_bgcs_in_use = []
diff --git a/src/nplinker/nplinker.py b/src/nplinker/nplinker.py
index 64e3b34b..d8f24a83 100644
--- a/src/nplinker/nplinker.py
+++ b/src/nplinker/nplinker.py
@@ -7,6 +7,7 @@
 from . import setup_logging
 from .arranger import DatasetArranger
 from .config import load_config
+from .defaults import OUTPUT_DIRNAME
 from .genomics import BGC
 from .genomics import GCF
 from .loader import NPLINKER_APP_DATA_DIR
@@ -42,7 +43,11 @@ class NPLinker:
     }
 
     def __init__(self, config_file: str | PathLike):
-        """Initialise an NPLinker instance."""
+        """Initialise an NPLinker instance.
+
+        Args:
+            config_file: Path to the configuration file to use.
+        """
         self.config = load_config(config_file)
 
         setup_logging(
@@ -54,7 +59,8 @@ def __init__(self, config_file: str | PathLike):
             "Configuration:\n %s", pformat(self.config.as_dict(), width=20, sort_dicts=False)
         )
 
-        self._loader = DatasetLoader()
+        self.output_dir = self.config.root_dir / OUTPUT_DIRNAME
+        self.output_dir.mkdir(exist_ok=True)
 
         self._spectra = []
         self._bgcs = []
@@ -143,19 +149,20 @@ def bigscape_cutoff(self):
 
     def load_data(self):
        """Loads the basic components of a dataset."""
-        arranger = DatasetArranger()
+        arranger = DatasetArranger(self.config)
         arranger.arrange()
-        self._loader.load()
-
-        self._spectra = self._loader.spectra
-        self._molfams = self._loader.molfams
-        self._bgcs = self._loader.bgcs
-        self._gcfs = self._loader.gcfs
-        self._mibig_bgcs = self._loader.mibig_bgcs
-        self._strains = self._loader.strains
-        self._product_types = self._loader.product_types
-        self._chem_classes = self._loader.chem_classes
-        self._class_matches = self._loader.class_matches
+        loader = DatasetLoader(self.config)
+        loader.load()
+
+        self._spectra = loader.spectra
+        self._molfams = loader.molfams
+        self._bgcs = loader.bgcs
+        self._gcfs = loader.gcfs
+        self._mibig_bgcs = loader.mibig_bgcs
+        self._strains = loader.strains
+        self._product_types = loader.product_types
+        self._chem_classes = loader.chem_classes
+        self._class_matches = loader.class_matches
 
     # TODO CG: refactor this method and update its unit tests
     def get_links(
diff --git a/src/nplinker/scoring/metcalf_scoring.py b/src/nplinker/scoring/metcalf_scoring.py
index a2ac3f19..b21304c7 100644
--- a/src/nplinker/scoring/metcalf_scoring.py
+++ b/src/nplinker/scoring/metcalf_scoring.py
@@ -4,7 +4,6 @@
 from typing import TYPE_CHECKING
 import numpy as np
 import pandas as pd
-from nplinker.defaults import OUTPUT_DEFAULT_PATH
 from nplinker.genomics import GCF
 from nplinker.metabolomics import MolecularFamily
 from nplinker.metabolomics import Spectrum
@@ -72,8 +71,7 @@ def setup(npl: NPLinker):
             )
         )
 
-        OUTPUT_DEFAULT_PATH.mkdir(exist_ok=True)
-        cache_file = OUTPUT_DEFAULT_PATH / MetcalfScoring.CACHE
+        cache_file = npl.output_dir / MetcalfScoring.CACHE
 
         # the metcalf preprocessing can take a long time for large datasets, so it's
         # better to cache as the data won't change unless the number of objects does
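
After this patch, `load_config` is the single place a `Dynaconf` settings object is created; `DatasetArranger` and `DatasetLoader` receive it explicitly instead of importing a module-level `config`. A minimal sketch of the resulting wiring, mirroring what `NPLinker.load_data` does; the config file name `nplinker.toml` is an assumed example, not something the patch prescribes:

```python
# Minimal sketch of the post-patch wiring; "nplinker.toml" is an assumed
# example config path.
from nplinker.arranger import DatasetArranger
from nplinker.config import load_config
from nplinker.loader import DatasetLoader

config = load_config("nplinker.toml")  # returns a validated Dynaconf object

arranger = DatasetArranger(config)  # arranges datasets under config.root_dir
arranger.arrange()

loader = DatasetLoader(config)  # loads BGCs, GCFs, spectra and molecular families
loader.load()

print(len(loader.bgcs), len(loader.spectra))
```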
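Because `defaults.py` no longer imports the config, the `*_DEFAULT_PATH` constants are gone and callers now join the new `*_DIRNAME` constants onto `config.root_dir` themselves. A sketch of the equivalent paths, assuming an arbitrary example root directory:

```python
# Sketch of how the new *_DIRNAME constants compose with a root directory;
# "/data/my_project" is an arbitrary example (in practice it is config.root_dir).
from pathlib import Path

import nplinker.defaults as defaults

root_dir = Path("/data/my_project")

downloads_dir = root_dir / defaults.DOWNLOADS_DIRNAME  # .../downloads
antismash_dir = root_dir / defaults.ANTISMASH_DIRNAME  # .../antismash
bigscape_dir = root_dir / defaults.BIGSCAPE_DIRNAME    # .../bigscape
bigscape_out = bigscape_dir / defaults.BIGSCAPE_RUNNING_OUTPUT_DIRNAME
output_dir = root_dir / defaults.OUTPUT_DIRNAME        # holds the Metcalf scoring cache
```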
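At the top level nothing changes for users: `NPLinker` still takes a config file path, and `load_data` runs the arrange-then-load sequence internally. A sketch, with the same assumed config file:

```python
from nplinker.nplinker import NPLinker

npl = NPLinker("nplinker.toml")  # loads the config, sets up logging and the output dir
npl.load_data()                  # DatasetArranger(...).arrange(), then DatasetLoader(...).load()
```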