Commit 7fc40a3

update use of config in data loader and arranger

CunliangGeng committed May 29, 2024
1 parent cbffd58 commit 7fc40a3
Showing 6 changed files with 168 additions and 124 deletions.
167 changes: 93 additions & 74 deletions src/nplinker/arranger.py
@@ -3,12 +3,9 @@
import shutil
from glob import glob
from pathlib import Path
from dynaconf import Dynaconf
from jsonschema import validate
import nplinker.defaults as defaults
from nplinker.config import config
from nplinker.defaults import GENOME_BGC_MAPPINGS_FILENAME
from nplinker.defaults import GENOME_STATUS_FILENAME
from nplinker.defaults import STRAIN_MAPPINGS_FILENAME
from nplinker.genomics.antismash import podp_download_and_extract_antismash_data
from nplinker.genomics.bigscape.runbigscape import run_bigscape
from nplinker.genomics.mibig import download_and_extract_mibig_metadata
@@ -33,17 +30,41 @@ class DatasetArranger:
This class is used to arrange the datasets required by NPLinker according to the
configuration. The datasets include MIBiG, GNPS, antiSMASH, and BiG-SCAPE.
If `config.mode` is "local", the datasets are validated.
If `config.mode` is "podp", the datasets are downloaded or generated.
It uses the default downloads directory `defaults.DOWNLOADS_DEFAULT_PATH` to store the
downloaded files. Default data paths for MIBiG, GNPS, antiSMASH, and BiG-SCAPE are defined
in `nplinker.defaults`.
If `self.config.mode` is "local", the datasets are validated.
If `self.config.mode` is "podp", the datasets are downloaded or generated.
Attributes:
config: A Dynaconf object that contains the configuration settings. Check `nplinker.config`
module for more details.
root_dir: The root directory of the datasets.
downloads_dir: The directory to store downloaded files.
mibig_dir: The directory to store MIBiG metadata.
gnps_dir: The directory to store GNPS data.
antismash_dir: The directory to store antiSMASH data.
bigscape_dir: The directory to store BiG-SCAPE data.
bigscape_running_output_dir: The directory to store the running output of BiG-SCAPE.
"""

def __init__(self) -> None:
# Prepare the downloads directory and/or PODP json file which are required for other methods
defaults.DOWNLOADS_DEFAULT_PATH.mkdir(exist_ok=True)
def __init__(self, config: Dynaconf) -> None:
"""Initialize the DatasetArranger.
Args:
config: A Dynaconf object that contains the configuration settings. Check `nplinker.config`
module for more details.
"""
self.config = config
self.root_dir = config.root_dir
self.downloads_dir = self.root_dir / defaults.DOWNLOADS_DIRNAME
self.downloads_dir.mkdir(exist_ok=True)

self.mibig_dir = self.root_dir / defaults.MIBIG_DIRNAME
self.gnps_dir = self.root_dir / defaults.GNPS_DIRNAME
self.antismash_dir = self.root_dir / defaults.ANTISMASH_DIRNAME
self.bigscape_dir = self.root_dir / defaults.BIGSCAPE_DIRNAME
self.bigscape_running_output_dir = (
self.bigscape_dir / defaults.BIGSCAPE_RUNNING_OUTPUT_DIRNAME
)

self.arrange_podp_project_json()
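With this change the arranger no longer imports a global `config` singleton; callers build a `Dynaconf` settings object via `load_config` and inject it. A minimal usage sketch (the settings file name below is illustrative, not taken from the commit):

```python
from nplinker.arranger import DatasetArranger
from nplinker.config import load_config

config = load_config("nplinker.toml")  # illustrative path to the settings file
arranger = DatasetArranger(config)     # config is injected instead of imported
arranger.arrange()
```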

def arrange(self) -> None:
@@ -62,18 +83,18 @@ def arrange(self) -> None:
def arrange_podp_project_json(self) -> None:
"""Arrange the PODP project JSON file.
If `config.mode` is "podp", download the PODP project JSON file if it doesn't exist. Then
If `self.config.mode` is "podp", download the PODP project JSON file if it doesn't exist. Then
validate the PODP project JSON file if it exists or is downloaded.
The validation is controlled by the json schema `schemas/podp_adapted_schema.json`.
"""
if config.mode == "podp":
file_name = f"paired_datarecord_{config.podp_id}.json"
podp_file = defaults.DOWNLOADS_DEFAULT_PATH / file_name
if self.config.mode == "podp":
file_name = f"paired_datarecord_{self.config.podp_id}.json"
podp_file = self.downloads_dir / file_name
if not podp_file.exists():
download_url(
PODP_PROJECT_URL.format(config.podp_id),
defaults.DOWNLOADS_DEFAULT_PATH,
PODP_PROJECT_URL.format(self.config.podp_id),
self.downloads_dir,
file_name,
)
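The validation step itself is collapsed in the next hunk; it checks the downloaded record against `schemas/podp_adapted_schema.json`. A hedged reconstruction, assuming the parsed schema is exposed as a constant named `PODP_ADAPTED_SCHEMA` (that name is an assumption, not shown in this commit):

```python
with open(podp_file, "r") as f:
    json_data = json.load(f)
# PODP_ADAPTED_SCHEMA is an assumed name for the parsed JSON schema object
validate(instance=json_data, schema=PODP_ADAPTED_SCHEMA)
```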

@@ -84,26 +105,26 @@ def arrange_podp_project_json(self) -> None:
def arrange_mibig(self) -> None:
"""Arrange the MIBiG metadata.
Always download and extract the MIBiG metadata if `config.mibig.to_use` is True.
Always download and extract the MIBiG metadata if `self.config.mibig.to_use` is True.
If the default directory already exists, it will be removed and the metadata re-downloaded to
ensure that the latest version is used. Manually placing MIBiG metadata in the default
directory is therefore not allowed.
"""
if config.mibig.to_use:
if defaults.MIBIG_DEFAULT_PATH.exists():
if self.config.mibig.to_use:
if self.mibig_dir.exists():
# remove existing mibig data
shutil.rmtree(defaults.MIBIG_DEFAULT_PATH)
shutil.rmtree(self.mibig_dir)
download_and_extract_mibig_metadata(
defaults.DOWNLOADS_DEFAULT_PATH,
defaults.MIBIG_DEFAULT_PATH,
version=config.mibig.version,
self.downloads_dir,
self.mibig_dir,
version=self.config.mibig.version,
)

def arrange_gnps(self) -> None:
"""Arrange the GNPS data.
If `config.mode` is "local", validate the GNPS data directory.
If `config.mode` is "podp", download the GNPS data if it doesn't exist or remove the
If `self.config.mode` is "local", validate the GNPS data directory.
If `self.config.mode` is "podp", download the GNPS data if it doesn't exist or remove the
existing GNPS data and re-download it if it is invalid.
The validation process includes:
@@ -116,20 +137,20 @@ def arrange_gnps(self) -> None:
- annotations.tsv
"""
pass_validation = False
if config.mode == "podp":
if self.config.mode == "podp":
# retry downloading at most 3 times if downloaded data has problems
for _ in range(3):
try:
validate_gnps(defaults.GNPS_DEFAULT_PATH)
validate_gnps(self.gnps_dir)
pass_validation = True
break
except (FileNotFoundError, ValueError):
# Don't need to remove downloaded archive, as it'll be overwritten
shutil.rmtree(defaults.GNPS_DEFAULT_PATH, ignore_errors=True)
shutil.rmtree(self.gnps_dir, ignore_errors=True)
self._download_and_extract_gnps()

if not pass_validation:
validate_gnps(defaults.GNPS_DEFAULT_PATH)
validate_gnps(self.gnps_dir)

# get the path to file_mappings file (csv or tsv)
self.gnps_file_mappings_file = self._get_gnps_file_mappings_file()
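The download-validate-retry loop above recurs for the antiSMASH and BiG-SCAPE data below. A generic helper expressing the same idea, as a sketch (not part of the commit):

```python
import shutil
from pathlib import Path
from typing import Callable

def arrange_with_retry(
    data_dir: Path,
    validate: Callable[[Path], None],
    fetch: Callable[[], None],
    max_retries: int = 3,
) -> None:
    """Validate `data_dir`, re-fetching the data on failure, at most `max_retries` times."""
    for _ in range(max_retries):
        try:
            validate(data_dir)
            return
        except (FileNotFoundError, ValueError):
            shutil.rmtree(data_dir, ignore_errors=True)
            fetch()
    # Final attempt: a persistent validation error propagates to the caller.
    validate(data_dir)
```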
@@ -143,8 +164,8 @@ def _get_gnps_file_mappings_file(self) -> Path:
Returns:
Path to the GNPS file mappings file.
"""
file_mappings_tsv = defaults.GNPS_DEFAULT_PATH / defaults.GNPS_FILE_MAPPINGS_TSV
file_mappings_csv = defaults.GNPS_DEFAULT_PATH / defaults.GNPS_FILE_MAPPINGS_CSV
file_mappings_tsv = self.gnps_dir / defaults.GNPS_FILE_MAPPINGS_TSV
file_mappings_csv = self.gnps_dir / defaults.GNPS_FILE_MAPPINGS_CSV

gnps_file_mappings_file = (
file_mappings_tsv if file_mappings_tsv.exists() else file_mappings_csv
@@ -158,23 +179,21 @@ def _download_and_extract_gnps(self) -> None:
Get the GNPS task ID from the PODP project JSON file, then download and extract the GNPS
data to the default GNPS directory.
"""
podp_file = defaults.DOWNLOADS_DEFAULT_PATH / f"paired_datarecord_{config.podp_id}.json"
podp_file = self.downloads_dir / f"paired_datarecord_{self.config.podp_id}.json"
with open(podp_file, "r") as f:
podp_json_data = json.load(f)
gnps_task_id = podp_json_data["metabolomics"]["project"].get("molecular_network")

data_archive = (
GNPSDownloader(gnps_task_id, defaults.DOWNLOADS_DEFAULT_PATH)
.download()
.get_download_file()
GNPSDownloader(gnps_task_id, self.downloads_dir).download().get_download_file()
)
GNPSExtractor(data_archive, defaults.GNPS_DEFAULT_PATH)
GNPSExtractor(data_archive, self.gnps_dir)
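For reference, the downloader/extractor chain in isolation; the import path and the task ID below are assumptions for illustration only:

```python
from pathlib import Path
from nplinker.metabolomics.gnps import GNPSDownloader, GNPSExtractor  # import path assumed

downloads_dir = Path("root_dir/downloads")  # illustrative
task_id = "0123456789abcdef"  # hypothetical GNPS task ID
archive = GNPSDownloader(task_id, downloads_dir).download().get_download_file()
GNPSExtractor(archive, Path("root_dir/gnps"))  # extraction runs on construction
```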

def arrange_antismash(self) -> None:
"""Arrange the antiSMASH data.
If `config.mode` is "local", validate the antiSMASH data directory.
If `config.mode` is "podp", download the antiSMASH data if it doesn't exist or remove the
If `self.config.mode` is "local", validate the antiSMASH data directory.
If `self.config.mode` is "podp", download the antiSMASH data if it doesn't exist or remove the
existing antiSMASH data and re-download it if it is invalid.
The validation process includes:
@@ -196,99 +215,99 @@ def arrange_antismash(self) -> None:
```
"""
pass_validation = False
if config.mode == "podp":
if self.config.mode == "podp":
for _ in range(3):
try:
validate_antismash(defaults.ANTISMASH_DEFAULT_PATH)
validate_antismash(self.antismash_dir)
pass_validation = True
break
except FileNotFoundError:
shutil.rmtree(defaults.ANTISMASH_DEFAULT_PATH, ignore_errors=True)
shutil.rmtree(self.antismash_dir, ignore_errors=True)
self._download_and_extract_antismash()

if not pass_validation:
validate_antismash(defaults.ANTISMASH_DEFAULT_PATH)
validate_antismash(self.antismash_dir)

def _download_and_extract_antismash(self) -> None:
"""Download and extract the antiSMASH data.
Get the antiSMASH data from the PODP project JSON file, then download and extract the
antiSMASH data to the default antiSMASH directory.
"""
podp_file = defaults.DOWNLOADS_DEFAULT_PATH / f"paired_datarecord_{config.podp_id}.json"
podp_file = self.downloads_dir / f"paired_datarecord_{self.config.podp_id}.json"
with open(podp_file, "r") as f:
podp_json_data = json.load(f)
podp_download_and_extract_antismash_data(
podp_json_data["genomes"], defaults.DOWNLOADS_DEFAULT_PATH, config.root_dir
podp_json_data["genomes"], self.downloads_dir, self.root_dir
)

def arrange_bigscape(self) -> None:
"""Arrange the BiG-SCAPE data.
If `config.mode` is "local", validate the BiG-SCAPE data directory.
If `config.mode` is "podp", run BiG-SCAPE to generate the clustering file if it doesn't
If `self.config.mode` is "local", validate the BiG-SCAPE data directory.
If `self.config.mode` is "podp", run BiG-SCAPE to generate the clustering file if it doesn't
exist or remove the existing BiG-SCAPE data and re-run BiG-SCAPE if it is invalid.
The running output of BiG-SCAPE will be saved to the directory "bigscape_running_output"
in the default BiG-SCAPE directory, and the clustering file
"mix_clustering_c{config.bigscape.cutoff}.tsv" will be copied to the default BiG-SCAPE
"mix_clustering_c{self.config.bigscape.cutoff}.tsv" will be copied to the default BiG-SCAPE
directory.
The validation process includes:
- Check if the default BiG-SCAPE data directory exists.
- Check if the clustering file "mix_clustering_c{config.bigscape.cutoff}.tsv" exists in the
- Check if the clustering file "mix_clustering_c{self.config.bigscape.cutoff}.tsv" exists in the
BiG-SCAPE data directory.
- Check if the 'data_sqlite.db' file exists in the BiG-SCAPE data directory.
"""
pass_validation = False
if config.mode == "podp":
if self.config.mode == "podp":
for _ in range(3):
try:
validate_bigscape(defaults.BIGSCAPE_DEFAULT_PATH)
validate_bigscape(self.bigscape_dir)
pass_validation = True
break
except FileNotFoundError:
shutil.rmtree(defaults.BIGSCAPE_DEFAULT_PATH, ignore_errors=True)
shutil.rmtree(self.bigscape_dir, ignore_errors=True)
self._run_bigscape()

if not pass_validation:
validate_bigscape(defaults.BIGSCAPE_DEFAULT_PATH)
validate_bigscape(self.bigscape_dir)

def _run_bigscape(self) -> None:
"""Run BiG-SCAPE to generate the clustering file.
The running output of BiG-SCAPE will be saved to the `BIGSCAPE_RUNNING_OUTPUT_PATH`.
The clustering file "mix_clustering_c{config.bigscape.cutoff}.tsv" will be copied to the
The clustering file "mix_clustering_c{self.config.bigscape.cutoff}.tsv" will be copied to the
default BiG-SCAPE directory.
"""
defaults.BIGSCAPE_RUNNING_OUTPUT_PATH.mkdir(exist_ok=True, parents=True)
self.bigscape_running_output_dir.mkdir(exist_ok=True, parents=True)
run_bigscape(
defaults.ANTISMASH_DEFAULT_PATH,
defaults.BIGSCAPE_RUNNING_OUTPUT_PATH,
config.bigscape.parameters,
self.antismash_dir,
self.bigscape_running_output_dir,
self.config.bigscape.parameters,
)
for f in glob(
str(
defaults.BIGSCAPE_RUNNING_OUTPUT_PATH
self.bigscape_running_output_dir
/ "network_files"
/ "*"
/ "mix"
/ "mix_clustering_c*.tsv"
)
):
shutil.copy(f, defaults.BIGSCAPE_DEFAULT_PATH)
shutil.copy(f, self.bigscape_dir)
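The same copy step could use `pathlib.Path.glob` directly, avoiding the `glob()`/`str()` round-trip; a sketch of the equivalent:

```python
# Equivalent to the glob(str(...)) construction above.
for f in self.bigscape_running_output_dir.glob("network_files/*/mix/mix_clustering_c*.tsv"):
    shutil.copy(f, self.bigscape_dir)
```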

def arrange_strain_mappings(self) -> None:
"""Arrange the strain mappings file.
If `config.mode` is "local", validate the strain mappings file.
If `config.mode` is "podp", always generate the strain mappings file and validate it.
If `self.config.mode` is "local", validate the strain mappings file.
If `self.config.mode` is "podp", always generate the strain mappings file and validate it.
The validation checks whether the strain mappings file exists and whether it is a valid JSON file
according to the schema defined in `schemas/strain_mappings_schema.json`.
"""
if config.mode == "podp":
if self.config.mode == "podp":
self._generate_strain_mappings()

self._validate_strain_mappings()
@@ -307,7 +326,7 @@ def _validate_strain_mappings(self) -> None:
ValidationError: If the strain mappings file is not a valid JSON file according to the
schema.
"""
strain_mappings_file = config.root_dir / STRAIN_MAPPINGS_FILENAME
strain_mappings_file = self.root_dir / defaults.STRAIN_MAPPINGS_FILENAME

if not strain_mappings_file.exists():
raise FileNotFoundError(f"Strain mappings file not found at {strain_mappings_file}")
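The schema check itself is collapsed in the next hunk. A hedged reconstruction, assuming the parsed schema is exposed as a constant named `STRAIN_MAPPINGS_SCHEMA` (the name is an assumption):

```python
with open(strain_mappings_file, "r") as f:
    json_data = json.load(f)
# STRAIN_MAPPINGS_SCHEMA is an assumed name for the parsed JSON schema object
validate(instance=json_data, schema=STRAIN_MAPPINGS_SCHEMA)
```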
@@ -319,14 +338,14 @@

def _generate_strain_mappings(self) -> None:
"""Generate the strain mappings file for the PODP mode."""
podp_json_file = defaults.DOWNLOADS_DEFAULT_PATH / f"paired_datarecord_{config.podp_id}.json"
genome_status_json_file = defaults.DOWNLOADS_DEFAULT_PATH / GENOME_STATUS_FILENAME
genome_bgc_mappings_file = defaults.ANTISMASH_DEFAULT_PATH / GENOME_BGC_MAPPINGS_FILENAME
podp_json_file = self.downloads_dir / f"paired_datarecord_{self.config.podp_id}.json"
genome_status_json_file = self.downloads_dir / defaults.GENOME_STATUS_FILENAME
genome_bgc_mappings_file = self.antismash_dir / defaults.GENOME_BGC_MAPPINGS_FILENAME
gnps_file_mapping_file = self.gnps_file_mappings_file
strain_mappings_file = config.root_dir / STRAIN_MAPPINGS_FILENAME
strain_mappings_file = self.root_dir / defaults.STRAIN_MAPPINGS_FILENAME

# generate the genome_bgc_mappings_file
generate_mappings_genome_id_bgc_id(defaults.ANTISMASH_DEFAULT_PATH)
generate_mappings_genome_id_bgc_id(self.antismash_dir)
# generate the strain_mappings_file
podp_generate_strain_mappings(
podp_json_file,
@@ -343,7 +362,7 @@ def arrange_strains_selected(self) -> None:
The validation checks if the strains selected file is a valid JSON file according to the
schema defined in `schemas/user_strains.json`.
"""
strains_selected_file = config.root_dir / defaults.STRAINS_SELECTED_FILENAME
strains_selected_file = self.root_dir / defaults.STRAINS_SELECTED_FILENAME
if strains_selected_file.exists():
with open(strains_selected_file, "r") as f:
json_data = json.load(f)
@@ -445,7 +464,7 @@ def validate_bigscape(bigscape_dir: Path) -> None:
"""Validate the BiG-SCAPE data directory and its contents.
The BiG-SCAPE data directory must exist and contain the clustering file
"mix_clustering_c{config.bigscape.cutoff}.tsv" where {config.bigscape.cutoff} is the
"mix_clustering_c{self.config.bigscape.cutoff}.tsv" where {self.config.bigscape.cutoff} is the
bigscape cutoff value set in the config file.
Alternatively, the directory can contain the BiG-SCAPE database file generated by BiG-SCAPE v2.
Expand All @@ -461,7 +480,7 @@ def validate_bigscape(bigscape_dir: Path) -> None:
if not bigscape_dir.exists():
raise FileNotFoundError(f"BiG-SCAPE data directory not found at {bigscape_dir}")

clustering_file = bigscape_dir / f"mix_clustering_c{config.bigscape.cutoff}.tsv"
database_file = bigscape_dir / "data_sqlite.db"
if not clustering_file.exists() and not database_file.exists():
raise FileNotFoundError(f"BiG-SCAPE data not found in {clustering_file} or {database_file}")
2 changes: 1 addition & 1 deletion src/nplinker/config.py
@@ -6,7 +6,7 @@
from nplinker.utils import transform_to_full_path


def load_config(config_file: str | PathLike):
def load_config(config_file: str | PathLike) -> Dynaconf:
"""Load and validate the configuration file.
Args:
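Annotating the return type documents that callers receive a `Dynaconf` settings object, whose attribute-style access is what the arranger relies on. A small usage sketch (the file path is illustrative):

```python
from nplinker.config import load_config

config = load_config("nplinker.toml")  # illustrative path
# Dynaconf supports attribute-style access to the validated settings:
mode = config.mode               # "local" or "podp"
root_dir = config.root_dir       # root directory of the dataset
cutoff = config.bigscape.cutoff  # BiG-SCAPE clustering cutoff
```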