diff --git a/src/nplinker/genomics/antismash/antismash_downloader.py b/src/nplinker/genomics/antismash/antismash_downloader.py
index 22b2135e..342a5512 100644
--- a/src/nplinker/genomics/antismash/antismash_downloader.py
+++ b/src/nplinker/genomics/antismash/antismash_downloader.py
@@ -7,14 +7,12 @@ from nplinker.utils import list_dirs
 from nplinker.utils import list_files
 
+
 logger = LogConfig.getLogger(__name__)
 
 # urls to be given to download antismash data
-ANTISMASH_DB_PAGE_URL = 'https://antismash-db.secondarymetabolites.org/output/{}/'
 ANTISMASH_DB_DOWNLOAD_URL = 'https://antismash-db.secondarymetabolites.org/output/{}/{}'
-
 # The antiSMASH DBV2 is for the availability of the old version, better to keep it.
-ANTISMASH_DBV2_PAGE_URL = 'https://antismash-dbv2.secondarymetabolites.org/output/{}/'
 ANTISMASH_DBV2_DOWNLOAD_URL = 'https://antismash-dbv2.secondarymetabolites.org/output/{}/{}'
@@ -29,8 +27,8 @@ def download_and_extract_antismash_data(antismash_id: str,
 
     Args:
         antismash_id(str): The id used to download BGC archive from antiSMASH database.
-            If the id is versioned (e.g., "GCF_004339725.1") please be sure to 
-            specify the version as well. 
+            If the id is versioned (e.g., "GCF_004339725.1") please be sure to
+            specify the version as well.
         download_root(str | PathLike): Path to the directory to place downloaded archive in.
         extract_root(str | PathLike): Path to the directory data files will be extracted to.
             Note that an `antismash` directory will be created in the specified `extract_root` if
diff --git a/src/nplinker/pairedomics/__init__.py b/src/nplinker/pairedomics/__init__.py
index 3d77a7da..3e0bf637 100644
--- a/src/nplinker/pairedomics/__init__.py
+++ b/src/nplinker/pairedomics/__init__.py
@@ -1,10 +1,9 @@
 import logging
-from .podp_antismash_downloader import download_antismash_data
+from .podp_antismash_downloader import GENOME_STATUS_FILENAME
+from .podp_antismash_downloader import GenomeStatus
 from .podp_antismash_downloader import podp_download_and_extract_antismash_data
+
 
 logging.getLogger(__name__).addHandler(logging.NullHandler())
 
-__all__ = [
-    "download_antismash_data",
-    "podp_download_and_extract_antismash_data"
-]
\ No newline at end of file
+__all__ = ["GENOME_STATUS_FILENAME", "GenomeStatus", "podp_download_and_extract_antismash_data"]
diff --git a/src/nplinker/pairedomics/podp_antismash_downloader.py b/src/nplinker/pairedomics/podp_antismash_downloader.py
index 21c5bf2d..168e543d 100644
--- a/src/nplinker/pairedomics/podp_antismash_downloader.py
+++ b/src/nplinker/pairedomics/podp_antismash_downloader.py
@@ -1,67 +1,109 @@
-import csv
-import os
-import re
-import time
-import zipfile
+import json
 from os import PathLike
 from pathlib import Path
-import httpx
+import re
+import time
 from bs4 import BeautifulSoup
 from bs4 import NavigableString
 from bs4 import Tag
-from deprecated import deprecated
-from progress.bar import Bar
+import httpx
 from nplinker.genomics.antismash import download_and_extract_antismash_data
 from nplinker.logconfig import LogConfig
 
-logger = LogConfig.getLogger(__name__)
-
-# urls to be given to download antismash data
-ANTISMASH_DB_PAGE_URL = 'https://antismash-db.secondarymetabolites.org/output/{}/'
-ANTISMASH_DB_DOWNLOAD_URL = 'https://antismash-db.secondarymetabolites.org/output/{}/{}'
-# The antiSMASH DBV2 is for the availability of the old version, better to keep it.
-ANTISMASH_DBV2_PAGE_URL = 'https://antismash-dbv2.secondarymetabolites.org/output/{}/'
-ANTISMASH_DBV2_DOWNLOAD_URL = 'https://antismash-dbv2.secondarymetabolites.org/output/{}/{}'
+logger = LogConfig.getLogger(__name__)
 
 NCBI_LOOKUP_URL = 'https://www.ncbi.nlm.nih.gov/assembly/?term={}'
-
 JGI_GENOME_LOOKUP_URL = 'https://img.jgi.doe.gov/cgi-bin/m/main.cgi?section=TaxonDetail&page=taxonDetail&taxon_oid={}'
-
 USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0'
-
-GENOME_STATUS_FILENAME = "genome_status.csv"
+GENOME_STATUS_FILENAME = "genome_status.json"
 
 
 class GenomeStatus:
-    """To represent the data status for a certain genome ID.
+    """A class to represent the status of a single genome.
 
-    Attributes:
-        original_id(str): original ID for the genome.
-        resolved_refseq_id(str): RefSeq ID generated by resolving the original genome ID.
-        attempted(str): If the attempt of resolving the genome ID has already been done. Defaults to `False`.
-        filename(str): Path to the file that stores the genome data. Defaults to "".
+    The status of genomes is tracked in a JSON file whose name is defined
+    in the variable `GENOME_STATUS_FILENAME`.
     """
 
    def __init__(self,
                 original_id: str,
-                 resolved_refseq_id: str,
-                 attempted: str = 'False',
-                 filename: str = ""):
+                 resolved_refseq_id: str = "",
+                 resolve_attempted: bool = False,
+                 bgc_path: str = ""):
+        """Initialize a GenomeStatus object for the given genome.
+
+        Args:
+            original_id (str): The original ID of the genome.
+            resolved_refseq_id (str, optional): The resolved RefSeq ID of the
+                genome. Defaults to "".
+            resolve_attempted (bool, optional): A flag indicating whether an
+                attempt to resolve the RefSeq ID has been made. Defaults to False.
+            bgc_path (str, optional): The path to the downloaded BGC file for
+                the genome. Defaults to "".
+        """
         self.original_id = original_id
         self.resolved_refseq_id = "" if resolved_refseq_id == 'None' else resolved_refseq_id
-        self.attempted = attempted == 'True'
-        self.filename = filename
+        self.resolve_attempted = resolve_attempted
+        self.bgc_path = bgc_path
+
+    @staticmethod
+    def read_json(file: str | PathLike) -> dict[str, 'GenomeStatus']:
+        """Get a dict of GenomeStatus objects by loading the given genome status file.
+
+        Note that an empty dict is returned if the given file doesn't exist.
+
+        Args:
+            file(str | PathLike): Path to genome status file.
 
-    def to_csv(self, file: str | PathLike, mode: str = 'a') -> None:
-        """Write object to a csv file.
+        Returns:
+            dict: dict keys are genome original id and values are GenomeStatus
+                objects. An empty dict is returned if the given file doesn't exist.
         """
-        line = ','.join([
-            self.original_id, self.resolved_refseq_id,
-            str(self.attempted), self.filename
-        ])
-        with open(file, mode) as f:
-            f.write(line + '\n')
+        genome_status_dict = {}
+        if Path(file).exists():
+            with open(file, "r") as f:
+                data = json.load(f)
+            genome_status_dict = {
+                gs["original_id"]: GenomeStatus(**gs)
+                for gs in data["genome_status"]
+            }
+        return genome_status_dict
+
+    @staticmethod
+    def to_json(genome_status_dict: dict[str, 'GenomeStatus'],
+                file: str | PathLike | None = None) -> str | None:
+        """Convert the genome status dictionary to a JSON string.
+
+        If a file path is provided, the JSON string is written to the file. If
+        the file already exists, it is overwritten.
+
+        Args:
+            genome_status_dict (dict[str, 'GenomeStatus']): A dictionary of genome
+                status objects. The keys are the original genome IDs and the values
+                are GenomeStatus objects.
+            file(str | PathLike | None): The path to the output JSON file.
+                If None, the JSON string is returned but not written to a file.
+
+        Returns:
+            str | None: The JSON string if `file` is None, otherwise None.
+        """
+        gs_list = [gs._to_dict() for gs in genome_status_dict.values()]
+        json_data = {"genome_status": gs_list, "version": "1.0"}
+        if file is not None:
+            with open(file, "w") as f:
+                json.dump(json_data, f)
+            return None
+        return json.dumps(json_data)
+
+    def _to_dict(self) -> dict:
+        """Convert the GenomeStatus object to a dict."""
+        return {
+            "original_id": self.original_id,
+            "resolved_refseq_id": self.resolved_refseq_id,
+            "resolve_attempted": self.resolve_attempted,
+            "bgc_path": self.bgc_path
+        }
 
 
 def podp_download_and_extract_antismash_data(
@@ -69,20 +111,20 @@ def podp_download_and_extract_antismash_data(
         project_download_root: str | PathLike,
         project_extract_root: str | PathLike):
     """Download and extract antiSMASH BGC archive for the given genome records.
-    
+
     Args:
-        genome_records(list[dict[str, dict[str, str] | str]]): list of dicts 
-            representing genome records. The dict of each genome record contains 
-            - key(str): "genome_ID" 
-            - value(dict[str, str]): a dict containing information about genome 
+        genome_records(list[dict[str, dict[str, str] | str]]): list of dicts
+            representing genome records. The dict of each genome record contains
+            - key(str): "genome_ID"
+            - value(dict[str, str]): a dict containing information about genome
                 type, label and accession ids (RefSeq, GenBank, and/or JGI).
-        project_download_root(str | PathLike): Path to the directory to place 
-            downloaded archive in. 
-        project_extract_root(str | PathLike): Path to the directory downloaded archive 
-            will be extracted to. 
-            Note that an `antismash` directory will be created in the specified 
-            `extract_root` if it doesn't exist. The files will be extracted to 
-            `<extract_root>/antismash/<antismash_id>` directory. 
+        project_download_root(str | PathLike): Path to the directory to place
+            the downloaded archive in.
+        project_extract_root(str | PathLike): Path to the directory the downloaded
+            archive will be extracted to.
+            Note that an `antismash` directory will be created in the specified
+            `extract_root` if it doesn't exist. The files will be extracted to
+            the `<extract_root>/antismash/<antismash_id>` directory.
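+
+    Examples:
+        A minimal sketch of one call with a single, hypothetical genome record
+        (the accession and directory names are illustrative only):
+
+        >>> records = [{"genome_ID": {"RefSeq_accession": "GCF_000000000.1"}}]
+        >>> podp_download_and_extract_antismash_data(records, "downloads", "extracted")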
""" if not Path(project_download_root).exists(): @@ -90,113 +132,75 @@ def podp_download_and_extract_antismash_data( # genome_status_file can't be written Path(project_download_root).mkdir(parents=True, exist_ok=True) - genome_status_file = Path(project_download_root, GENOME_STATUS_FILENAME) - genome_status = _get_genome_status_log(genome_status_file) + gs_file = Path(project_download_root, GENOME_STATUS_FILENAME) + gs_dict = GenomeStatus.read_json(gs_file) for i, genome_record in enumerate(genome_records): # get the best available ID from the dict - raw_genome_id = _get_best_available_genome_id( - genome_record['genome_ID']) + genome_id_data = genome_record['genome_ID'] + raw_genome_id = _get_best_available_genome_id(genome_id_data) if raw_genome_id is None or len(raw_genome_id) == 0: logger.warning( f'Ignoring genome record "{genome_record}" due to missing genome ID field' ) continue - # use this to check if the lookup has already been attempted and if - # so if the file is cached locally - if raw_genome_id not in genome_status: - genome_status[raw_genome_id] = GenomeStatus(raw_genome_id, "None") + # check if genome ID exist in the genome status file + if raw_genome_id not in gs_dict: + gs_dict[raw_genome_id] = GenomeStatus(raw_genome_id) - genome_obj = genome_status[raw_genome_id] + gs_obj = gs_dict[raw_genome_id] logger.info( - f'Checking for antismash data {i + 1}/{len(genome_records)}, current genome ID={raw_genome_id}' - ) - # first check if file is cached locally - if (genome_obj.filename and Path(genome_obj.filename).exists()): - # file already downloaded + f'Checking for antismash data {i + 1}/{len(genome_records)}, ' + f'current genome ID={raw_genome_id}') + # first, check if BGC data is downloaded + if (gs_obj.bgc_path and Path(gs_obj.bgc_path).exists()): logger.info( - f'Genome ID {raw_genome_id} already downloaded to {genome_obj.filename}' + f'Genome ID {raw_genome_id} already downloaded to {gs_obj.bgc_path}' ) continue - if genome_obj.attempted: - # lookup attempted previously but failed + # second, check if lookup attempted previously + if gs_obj.resolve_attempted: logger.info( f'Genome ID {raw_genome_id} skipped due to previous failure') continue - # if no existing file and no lookup attempted, can start process of - # trying to retrieve the data - # lookup the ID + # if not downloaded or lookup attempted, then try to resolve the ID + # and download logger.info(f'Beginning lookup process for genome ID {raw_genome_id}') + gs_obj.resolved_refseq_id = _resolve_refseq_id(genome_id_data) + gs_obj.resolve_attempted = True - genome_obj.resolved_refseq_id = _resolve_refseq_id( - genome_record['genome_ID']) - - if not isinstance(genome_obj.resolved_refseq_id, str): - raise TypeError( - f"genome_obj.resolved_refseq_id should be a string. 
Instead got: {type(genome_obj.resolved_refseq_id)}" - ) - - genome_obj.attempted = True - - if genome_obj.resolved_refseq_id == "": + if gs_obj.resolved_refseq_id == "": # give up on this one logger.warning(f'Failed lookup for genome ID {raw_genome_id}') continue - # if we got a refseq ID, now try to download and extract the data from antismash - download_and_extract_antismash_data(genome_obj.resolved_refseq_id, + # if resolved id is valid, try to download and extract antismash data + download_and_extract_antismash_data(gs_obj.resolved_refseq_id, project_download_root, project_extract_root) - genome_obj.filename = str( + gs_obj.bgc_path = str( Path(project_download_root, - genome_obj.resolved_refseq_id + '.zip').absolute()) + gs_obj.resolved_refseq_id + '.zip').absolute()) + output_path = Path(project_extract_root, 'antismash', - genome_obj.resolved_refseq_id) + gs_obj.resolved_refseq_id) Path.touch(output_path / 'completed', exist_ok=True) - missing = len([x for x in genome_status.values() if len(x.filename) == 0]) - logger.info( - f'Dataset has {missing} missing sets of antiSMASH data (from a total of {len(genome_records)})' - ) + missing = len([gs for gs in gs_dict.values() if not gs.bgc_path]) + logger.info(f'Dataset has {missing} missing sets of antiSMASH data ' + f' (from a total of {len(genome_records)}).') - for obj in genome_status.values(): - obj.to_csv(genome_status_file) + # save updated genome status to json file + GenomeStatus.to_json(gs_dict, gs_file) if missing == len(genome_records): logger.warning('Failed to successfully retrieve ANY genome data!') -def _get_genome_status_log( - genome_status_file: PathLike) -> dict[str, GenomeStatus]: - """Get a dict of GenomeStatus objects by reading given genome status file. - Note that a empty dict is returned if the given file does not exist. - - Args: - genome_status_file(PathLike): Path to genome status file that records - genome IDs and local filenames to avoid repeating time-consuming - HTTP requests each time the app is loaded. - - Returns: - dict: dict keys are genome original id and values are GenomeStatus objects. - """ - - genome_status = {} - - # GENOME_STATUS_FILENAME is read, then in the for loop over the genome records it gets updated, - # and finally it is saved again in GENOME_STATUS_FILENAME which is overwritten - if Path(genome_status_file).exists(): - with open(genome_status_file) as f: - for line in csv.reader(f): - asobj = GenomeStatus(*line) - genome_status[asobj.original_id] = asobj - - return genome_status - - def _get_best_available_genome_id( genome_id_data: dict[str, str]) -> str | None: """Get the best available ID from genome_id_data dict. @@ -206,7 +210,7 @@ def _get_best_available_genome_id( for each genome record present. Returns: - str | None: ID for the genome, if present, otherwise None. + str | None: ID for the genome, if present, otherwise None. """ if 'RefSeq_accession' in genome_id_data: @@ -249,17 +253,17 @@ def _ncbi_genbank_search(genbank_id: str, def _resolve_genbank_accession(genbank_id: str) -> str: - """Try to get RefSeq id through given GenBank id. + """Try to get RefSeq id through given GenBank id. Args: - genbank_id(str): ID for GenBank accession. + genbank_id(str): ID for GenBank accession. Raises: - Exception: "Unknown HTML format" if the search of genbank does not give any results. + Exception: "Unknown HTML format" if the search of genbank does not give any results. Exception: "Expected HTML elements not found" if no match with RefSeq assembly accession is found. 
 
     Returns:
-        str | None: RefSeq ID if the search is successful, otherwise None. 
+        str | None: RefSeq ID if the search is successful, otherwise None.
     """
     logger.info(
         f'Attempting to resolve Genbank accession {genbank_id} to RefSeq accession'
     )
@@ -318,7 +322,7 @@
     """Try to get RefSeq id through given JGI id.
 
     Args:
-        jgi_id(str): JGI_Genome_ID for GenBank accession. 
+        jgi_id(str): JGI_Genome_ID for GenBank accession.
 
     Returns:
         str | None: Return RefSeq ID if search is successful, otherwise None.
@@ -356,7 +360,7 @@
         for each genome record present.
 
     Returns:
-        str: Return RefSeq ID if present, otherwise an empty string. 
+        str: Return RefSeq ID if present, otherwise an empty string.
     """
     if 'RefSeq_accession' in genome_id_data:
         # best case, can use this directly
@@ -370,249 +374,3 @@
     logger.warning(f'Unable to resolve genome_ID: {genome_id_data}')
 
     return ""
-
-
-def _get_antismash_filename(genome_obj: GenomeStatus) -> str | None:
-    """Get the Antismash filename for genome_obj ID looking it up on ANTISMASH_DB_PAGE_URL
-    and ANTISMASH_DBV2_PAGE_URL.
-
-    Args:
-        genome_obj(GenomeStatus): Defines the status relative to a certain genome ID.
-
-    Returns:
-        str | None: Antismash filename, if it exists.
-    """
-    # want to try up to 4 different links here, v1 and v2 databases, each
-    # with and without the .1 suffix on the accesssion ID
-
-    accesssions = [
-        genome_obj.resolved_refseq_id, genome_obj.resolved_refseq_id + '.1'
-    ]
-    for base_url in [ANTISMASH_DB_PAGE_URL, ANTISMASH_DBV2_PAGE_URL]:
-        for accession in accesssions:
-            url = base_url.format(accession)
-            link = None
-
-            logger.info(f'antismash DB lookup for {accession} => {url}')
-            try:
-                resp = httpx.get(url, follow_redirects=True)
-                soup = BeautifulSoup(resp.content, 'html.parser')
-                # retrieve .zip file download link
-                link = soup.find('a',
-                                 {'href': lambda url: url.endswith('.zip')})
-            except Exception as e:
-                logger.debug(f'antiSMASH DB page load failed: {e}')
-
-            if link is not None:
-                logger.info(
-                    f"antiSMASH lookup succeeded! Filename is {link['href']}")
-                # save with the .1 suffix if that worked
-                genome_obj.resolved_refseq_id = accession
-                return str(link['href'])
-
-    return None
-
-
-@deprecated(version="1.3.3",
-            reason="Use download_and_extract_antismash_data class instead.")
-def download_antismash_data(genome_records: list[dict[str,
-                                                      dict[str, str] | str]],
-                            project_download_cache: str | PathLike,
-                            project_file_cache: str | PathLike):
-
-    genome_status_file = Path(project_download_cache, GENOME_STATUS_FILENAME)
-    genome_status = _get_genome_status_log(genome_status_file)
-
-    for i, genome_record in enumerate(genome_records):
-        # get the best available ID from the dict
-        if not isinstance(genome_record['genome_ID'], dict):
-            raise TypeError(
-                f"_get_best_available_genome_id() expects a dict as input. Instead got: {type(genome_record['genome_ID'])}"
-            )
-        raw_genome_id = _get_best_available_genome_id(
-            genome_record['genome_ID'])
-        if raw_genome_id is None:
-            logger.warning(
-                f'Ignoring genome record "{genome_record}" due to missing genome ID field'
-            )
-            continue
-
-        # use this to check if the lookup has already been attempted and if
-        # so if the file is cached locally
-        if raw_genome_id not in genome_status:
-            genome_status[raw_genome_id] = GenomeStatus(raw_genome_id, "None")
-
-        genome_obj = genome_status[raw_genome_id]
-
-        logger.info(
-            f'Checking for antismash data {i + 1}/{len(genome_records)}, current genome ID={raw_genome_id}'
-        )
-        # first check if file is cached locally
-        if os.path.exists(genome_obj.filename):
-            # file already downloaded
-            logger.info(
-                f'Genome ID {raw_genome_id} already downloaded to {genome_obj.filename}'
-            )
-            genome_record['resolved_refseq_id'] = genome_obj.resolved_refseq_id
-        elif genome_obj.attempted:
-            # lookup attempted previously but failed
-            logger.info(
-                f'Genome ID {raw_genome_id} skipped due to previous failure')
-            genome_record['resolved_refseq_id'] = genome_obj.resolved_refseq_id
-        else:
-            # if no existing file and no lookup attempted, can start process of
-            # trying to retrieve the data
-
-            # lookup the ID
-            logger.info(
-                f'Beginning lookup process for genome ID {raw_genome_id}')
-
-            genome_obj.resolved_refseq_id = _resolve_refseq_id(
-                genome_record['genome_ID'])
-            genome_obj.attempted = True
-
-            if genome_obj.resolved_refseq_id == "":
-                # give up on this one
-                logger.warning(f'Failed lookup for genome ID {raw_genome_id}')
-                genome_obj.to_csv(genome_status_file)
-                continue
-
-            # if we got a refseq ID, now try to download the data from antismash
-            if _download_antismash_zip(genome_obj, project_download_cache):
-                logger.info(
-                    f'Genome data successfully downloaded for {raw_genome_id}')
-                assert isinstance(genome_obj.resolved_refseq_id, str)
-                genome_record[
-                    'resolved_refseq_id'] = genome_obj.resolved_refseq_id
-            else:
-                logger.warning(
-                    f'Failed to download antiSMASH data for genome ID {genome_obj.resolved_refseq_id} ({genome_obj.original_id})'
-                )
-
-            genome_obj.to_csv(genome_status_file)
-
-        _extract_antismash_zip(genome_obj, project_file_cache)
-
-    missing = len([x for x in genome_status.values() if len(x.filename) == 0])
-    logger.info(
-        f'Dataset has {missing} missing sets of antiSMASH data (from a total of {len(genome_records)})'
-    )
-
-    for obj in genome_status.values():
-        obj.to_csv(genome_status_file)
-
-    if missing == len(genome_records):
-        logger.warning('Failed to successfully retrieve ANY genome data!')
-
-
-@deprecated(version="1.3.3",
-            reason="Use download_and_extract_antismash_data class instead.")
-def _get_antismash_zip_data(accession_id: str, filename: str,
-                            local_path: str | PathLike) -> bool:
-    for base_url in [ANTISMASH_DB_DOWNLOAD_URL, ANTISMASH_DBV2_DOWNLOAD_URL]:
-        zipfile_url = base_url.format(accession_id, filename)
-        with open(local_path, 'wb') as f:
-            total_bytes = 0
-            try:
-                with httpx.stream('GET', zipfile_url) as r:
-                    if r.status_code == 404:
-                        logger.debug('antiSMASH download URL was a 404')
-                        continue
-
-                    logger.info(f'Downloading from antiSMASH: {zipfile_url}')
-                    filesize = int(r.headers['content-length'])
-                    bar_obj = Bar(filename,
-                                  max=filesize,
-                                  suffix='%(percent)d%%')
-                    for data in r.iter_bytes():
-                        f.write(data)
-                        total_bytes += len(data)
-                        bar_obj.next(len(data))
-                    bar_obj.finish()
-            except Exception as e:
-                logger.warning(f'antiSMASH zip download failed: {e}')
-                continue
-
-        return True
-
-    return False
-
-
-@deprecated(version="1.3.3",
-            reason="Use download_and_extract_antismash_data class instead.")
-def _download_antismash_zip(antismash_obj: GenomeStatus,
-                            project_download_cache: str | PathLike) -> bool:
-    # save zip files to avoid having to repeat above lookup every time
-    local_path = os.path.join(project_download_cache,
-                              f'{antismash_obj.resolved_refseq_id}.zip')
-    logger.debug(f'Checking for existing antismash zip at {local_path}')
-
-    cached = False
-    # if the file exists locally
-    if os.path.exists(local_path):
-        logger.info(f'Found cached file at {local_path}')
-        try:
-            # check if it's a valid zip file, if so treat it as cached
-            with zipfile.ZipFile(local_path) as _:
-                cached = True
-                antismash_obj.filename = local_path
-        except zipfile.BadZipFile as bzf:
-            # otherwise delete and redownload
-            logger.info(
-                f'Invalid antismash zipfile found ({bzf}). Will download again'
-            )
-            os.unlink(local_path)
-            antismash_obj.filename = ""
-
-    if not cached:
-        filename = _get_antismash_filename(antismash_obj)
-        if filename is None:
-            return False
-
-        _get_antismash_zip_data(antismash_obj.resolved_refseq_id, filename,
-                                local_path)
-        antismash_obj.filename = local_path
-
-    return True
-
-
-@deprecated(version="1.3.3",
-            reason="Use download_and_extract_antismash_data class instead.")
-def _extract_antismash_zip(antismash_obj: GenomeStatus,
-                           project_file_cache: str | PathLike) -> bool:
-    if antismash_obj.filename is None or len(antismash_obj.filename) == 0:
-        return False
-
-    output_path = os.path.join(project_file_cache, 'antismash',
-                               antismash_obj.resolved_refseq_id)
-    exists_already = os.path.exists(output_path) and os.path.exists(
-        os.path.join(output_path, 'completed'))
-
-    logger.debug(
-        f'Extracting antismash data to {output_path}, exists_already = {exists_already}'
-    )
-    if exists_already:
-        return True
-
-    # create a subfolder for each set of genome data (the zip files used to be
-    # constructed with path info but that seems to have changed recently)
-    if not os.path.exists(output_path):
-        os.makedirs(output_path, exist_ok=True)
-
-    with zipfile.ZipFile(antismash_obj.filename) as antismash_zip:
-        kc_prefix1 = f'{antismash_obj.resolved_refseq_id}/knownclusterblast'
-        kc_prefix2 = 'knownclusterblast'
-        for zip_member in antismash_zip.namelist():
-            # TODO other files here?
-            if zip_member.endswith('.gbk') or zip_member.endswith('.json'):
-                antismash_zip.extract(zip_member, path=output_path)
-            elif zip_member.startswith(kc_prefix1) or zip_member.startswith(
-                    kc_prefix2):
-                if zip_member.endswith(
-                        '.txt') and 'mibig_hits' not in zip_member:
-                    antismash_zip.extract(zip_member, path=output_path)
-
-    with open(os.path.join(output_path, 'completed'), 'w'):
-        pass
-
-    return True
diff --git a/tests/pairedomics/test_podp_antismash_downloader.py b/tests/pairedomics/test_podp_antismash_downloader.py
index 4fb4ad09..ea110f6a 100644
--- a/tests/pairedomics/test_podp_antismash_downloader.py
+++ b/tests/pairedomics/test_podp_antismash_downloader.py
@@ -1,11 +1,9 @@
-import csv
+import json
 from pathlib import Path
 
 import pytest
 
-from nplinker.pairedomics.podp_antismash_downloader import GenomeStatus
-from nplinker.pairedomics.podp_antismash_downloader import \
-    _get_genome_status_log
-from nplinker.pairedomics.podp_antismash_downloader import \
-    podp_download_and_extract_antismash_data
+from nplinker.pairedomics import GENOME_STATUS_FILENAME
+from nplinker.pairedomics import GenomeStatus
+from nplinker.pairedomics import podp_download_and_extract_antismash_data
 from nplinker.utils import list_files
 
 
@@ -21,44 +19,91 @@ def extract_root(tmp_path):
 
 @pytest.fixture
 def genome_status_file(download_root):
-    return Path(download_root, "genome_status.csv")
-
-
-# Test __init__ method of GenomeStatus class
-@pytest.mark.parametrize("params, expected",
-                         [(["GCF_000515175.1", "None", "False", ""
-                            ], ["GCF_000515175.1", "", False, ""]),
-                          (["GCF_000515175.1", "None", "True", ""
-                            ], ["GCF_000515175.1", "", True, ""]),
-                          (["GCF_000515175.1", "GCF_000515175.1", "True", ""],
-                           ["GCF_000515175.1", "GCF_000515175.1", True, ""]),
-                          (["GCF_000515175.1", "None", "False", "filename"
-                            ], ["GCF_000515175.1", "", False, "filename"])])
+    return Path(download_root, GENOME_STATUS_FILENAME)
+
+
+# Test `GenomeStatus` class
+@pytest.mark.parametrize("params, expected", [
+    (["genome1"], ["genome1", "", False, ""]),
+    (["genome1", "refseq1", True, "/path/to/file"
+      ], ["genome1", "refseq1", True, "/path/to/file"]),
+])
 def test_genome_status_init(params, expected):
     gs = GenomeStatus(*params)
-    assert [gs.original_id, gs.resolved_refseq_id, gs.attempted,
-            gs.filename] == expected
-
-
-# Test to_csv method of GenomeStatus class
-def test_genome_status_to_csv(tmp_path):
-    genome_status_file = Path(tmp_path, "genome_status.csv")
-    raw_genome_id1 = "GCF_000515175.1"
-    raw_genome_id2 = "GCF_000514635.1"
-    genome_obj1 = GenomeStatus(raw_genome_id1, "None")
-    genome_obj2 = GenomeStatus(raw_genome_id2, "None")
-    genome_obj1.to_csv(genome_status_file)
-    genome_obj2.to_csv(genome_status_file)
-    assert Path(genome_status_file).exists()
-    genome_status = {}
-    with open(genome_status_file) as f:
-        for line in csv.reader(f):
-            asobj = GenomeStatus(*line)
-            genome_status[asobj.original_id] = asobj
-    assert isinstance(genome_status[raw_genome_id1], GenomeStatus)
-    assert isinstance(genome_status[raw_genome_id2], GenomeStatus)
-    assert genome_status[raw_genome_id1].original_id == raw_genome_id1
-    assert genome_status[raw_genome_id2].original_id == raw_genome_id2
+    assert [
+        gs.original_id, gs.resolved_refseq_id, gs.resolve_attempted,
+        gs.bgc_path
+    ] == expected
+
+
+def test_genome_status_read_json(tmp_path):
+    data = {
+        "genome_status": [{
+            "original_id": "genome1",
+            "resolved_refseq_id": "refseq1",
+            "resolve_attempted": True,
+            "bgc_path": "/path/to/bgc1"
+        }, {
+            "original_id": "genome2",
+            "resolved_refseq_id": "",
+            "resolve_attempted": False,
+            "bgc_path": ""
+        }],
+        "version": "1.0"
+    }
+    file_path = tmp_path / GENOME_STATUS_FILENAME
+    with open(file_path, "w") as f:
+        json.dump(data, f)
+    genome_status_dict = GenomeStatus.read_json(file_path)
+
+    assert len(genome_status_dict) == 2
+    assert genome_status_dict["genome1"].original_id == "genome1"
+    assert genome_status_dict["genome1"].resolved_refseq_id == "refseq1"
+    assert genome_status_dict["genome1"].resolve_attempted is True
+    assert genome_status_dict["genome1"].bgc_path == "/path/to/bgc1"
+    assert genome_status_dict["genome2"].original_id == "genome2"
+    assert genome_status_dict["genome2"].resolved_refseq_id == ""
+    assert genome_status_dict["genome2"].resolve_attempted is False
+    assert genome_status_dict["genome2"].bgc_path == ""
+
+
+def test_genome_status_to_json(tmp_path):
+    genome_status_dict = {
+        "genome1": GenomeStatus("genome1", "refseq1", True, "/path/to/bgc1"),
+        "genome2": GenomeStatus("genome2", "", False, "")
+    }
+    result = GenomeStatus.to_json(genome_status_dict,
+                                  tmp_path / GENOME_STATUS_FILENAME)
+    with open(tmp_path / GENOME_STATUS_FILENAME, "r") as f:
+        loaded_data = json.load(f)
+
+    assert result is None
+    assert loaded_data["version"] == "1.0"
+    assert len(loaded_data["genome_status"]) == 2
+    assert loaded_data["genome_status"][0]["original_id"] == "genome1"
+    assert loaded_data["genome_status"][0]["resolved_refseq_id"] == "refseq1"
+    assert loaded_data["genome_status"][0]["resolve_attempted"] is True
+    assert loaded_data["genome_status"][0]["bgc_path"] == "/path/to/bgc1"
+    assert loaded_data["genome_status"][1]["original_id"] == "genome2"
+    assert loaded_data["genome_status"][1]["resolved_refseq_id"] == ""
+    assert loaded_data["genome_status"][1]["resolve_attempted"] is False
+    assert loaded_data["genome_status"][1]["bgc_path"] == ""
+
+
+def test_genome_status_to_json_nofile():
+    genome_status_dict = {
+        "genome1": GenomeStatus("genome1", "refseq1", True, "/path/to/bgc1"),
+        "genome2": GenomeStatus("genome2", "", False, "")
+    }
+    result = GenomeStatus.to_json(genome_status_dict)
+
+    assert isinstance(result, str)
+    assert result == '{"genome_status": ' \
+        '[{"original_id": "genome1", "resolved_refseq_id": "refseq1", ' \
+        '"resolve_attempted": true, "bgc_path": "/path/to/bgc1"}, ' \
+        '{"original_id": "genome2", "resolved_refseq_id": "", ' \
+        '"resolve_attempted": false, "bgc_path": ""}], "version": "1.0"}'
 
 
 # Test `podp_download_and_extract_antismash_data` function
@@ -91,7 +136,7 @@ def test_multiple_records(download_root, extract_root, genome_status_file):
     archive2 = download_root / "GCF_000514515.1.zip"
     extracted_folder2 = extract_root / "antismash" / "GCF_000514515.1"
     extracted_files2 = list_files(extracted_folder2, keep_parent=True)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
 
     assert archive1.exists()
     assert archive2.exists()
@@ -133,7 +178,7 @@ def test_missing_id(download_root, extract_root, genome_status_file):
     extracted_folder1 = extract_root / "antismash" / "GCF_000514875.1"
     archive2 = download_root / "GCF_000514515.1.zip"
     extracted_folder2 = extract_root / "antismash" / "GCF_000514515.1"
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
 
     assert (not archive1.exists() and archive2.exists())
     assert (not archive1.is_file() and archive2.is_file())
@@ -156,15 +201,15 @@ def test_caching(download_root, extract_root, genome_status_file,
                  caplog):
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status_old = _get_genome_status_log(genome_status_file)
+    genome_status_old = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status_old["GCF_000514875.1"]
-    assert Path(genome_obj.filename).exists()
-    assert genome_obj.attempted
+    assert Path(genome_obj.bgc_path).exists()
+    assert genome_obj.resolve_attempted
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    assert f'Genome ID {genome_obj.original_id} already downloaded to {genome_obj.filename}' in caplog.text
+    assert f'Genome ID {genome_obj.original_id} already downloaded to {genome_obj.bgc_path}' in caplog.text
     assert f'Genome ID {genome_obj.original_id} skipped due to previous failure' not in caplog.text
-    genome_status_new = _get_genome_status_log(genome_status_file)
+    genome_status_new = GenomeStatus.read_json(genome_status_file)
     assert len(genome_status_old) == len(genome_status_new)
@@ -181,9 +226,9 @@ def test_failed_lookup(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
-    assert len(genome_status["non_existing_ID"].filename) == 0
-    assert genome_status["non_existing_ID"].attempted
+    genome_status = GenomeStatus.read_json(genome_status_file)
+    assert len(genome_status["non_existing_ID"].bgc_path) == 0
+    assert genome_status["non_existing_ID"].resolve_attempted
     assert not (download_root / "non_existing_ID.zip").exists()
     assert not (extract_root / "antismash" / "non_existing_ID.zip").exists()
@@ -202,7 +247,7 @@ def test_refseq_id(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status["GCF_000514875.1"]
     archive = download_root / Path(str(genome_obj.resolved_refseq_id) + ".zip")
     extracted_folder = extract_root / "antismash" / genome_obj.resolved_refseq_id
@@ -232,7 +277,7 @@ def test_genbank_id(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status["GCA_004799605.1"]
     archive = download_root / Path(str(genome_obj.resolved_refseq_id) + ".zip")
     extracted_folder = extract_root / "antismash" / genome_obj.resolved_refseq_id
@@ -262,7 +307,7 @@ def test_jgi_id(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status["2506783052"]
     archive = download_root / Path(str(genome_obj.resolved_refseq_id) + ".zip")
     extracted_folder = extract_root / "antismash" / genome_obj.resolved_refseq_id
@@ -293,7 +338,7 @@ def test_refseq_jgi_id(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status["GCF_000514875.1"]
     archive = download_root / Path(str(genome_obj.resolved_refseq_id) + ".zip")
     extracted_folder = extract_root / "antismash" / genome_obj.resolved_refseq_id
@@ -324,7 +369,7 @@ def test_refseq_genbank_id(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status["GCF_000514875.1"]
     archive = download_root / Path(str(genome_obj.resolved_refseq_id) + ".zip")
     extracted_folder = extract_root / "antismash" / genome_obj.resolved_refseq_id
@@ -355,7 +400,7 @@ def test_genbank_jgi_id(download_root, extract_root, genome_status_file):
 
     podp_download_and_extract_antismash_data(genome_records, download_root,
                                              extract_root)
-    genome_status = _get_genome_status_log(genome_status_file)
+    genome_status = GenomeStatus.read_json(genome_status_file)
     genome_obj = genome_status["GCA_004799605.1"]
     archive = download_root / Path(str(genome_obj.resolved_refseq_id) + ".zip")
     extracted_folder = extract_root / "antismash" / genome_obj.resolved_refseq_id
@@ -369,22 +414,3 @@ def test_genbank_jgi_id(download_root, extract_root, genome_status_file):
                for extracted_file in extracted_files)
     assert genome_status_file.is_file()
     assert len(genome_status) == 1
-
-
-# Test `_get_genome_status_log` function
-def test_get_genome_status_log(tmp_path):
-    genome_status_file = Path(tmp_path, "genome_status.csv")
-    raw_genome_id1 = "GCF_000515175.1"
-    raw_genome_id2 = "GCF_000514635.1"
-    genome_obj1 = GenomeStatus(raw_genome_id1, "None")
-    genome_obj2 = GenomeStatus(raw_genome_id2, "None")
-    genome_status = _get_genome_status_log(genome_status_file)
-    assert isinstance(genome_status, dict)
-    assert len(genome_status) == 0
-    genome_obj1.to_csv(genome_status_file)
-    genome_obj2.to_csv(genome_status_file)
-    genome_status = _get_genome_status_log(genome_status_file)
-    assert isinstance(genome_status[raw_genome_id1], GenomeStatus)
-    assert isinstance(genome_status[raw_genome_id2], GenomeStatus)
-    assert genome_status[raw_genome_id1].original_id == raw_genome_id1
-    assert genome_status[raw_genome_id2].original_id == raw_genome_id2
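
A minimal usage sketch of the GenomeStatus JSON API introduced above; the genome
ID and paths are illustrative, not values taken from the patch or its tests:

    from nplinker.pairedomics import GENOME_STATUS_FILENAME
    from nplinker.pairedomics import GenomeStatus

    # record the status of one (hypothetical) genome and persist the whole
    # dict to the genome_status.json file
    gs_dict = {
        "GCF_000000000.1":
            GenomeStatus("GCF_000000000.1",
                         resolved_refseq_id="GCF_000000000.1",
                         resolve_attempted=True,
                         bgc_path="/tmp/GCF_000000000.1.zip")
    }
    GenomeStatus.to_json(gs_dict, GENOME_STATUS_FILENAME)

    # reloading later restores the dict keyed by original genome ID;
    # a missing status file yields an empty dict rather than an error
    reloaded = GenomeStatus.read_json(GENOME_STATUS_FILENAME)
    assert reloaded["GCF_000000000.1"].resolve_attempted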