diff --git a/bkbit/data_translators/error_classes.py b/bkbit/data_translators/error_classes.py
new file mode 100644
index 0000000..a653bf5
--- /dev/null
+++ b/bkbit/data_translators/error_classes.py
@@ -0,0 +1,5 @@
+class MissingModelError(Exception):
+    """
+    Custom error raised when a required model file is missing.
+    """
+    pass
diff --git a/bkbit/data_translators/genome_annotation_translator.py b/bkbit/data_translators/genome_annotation_translator.py
index 4c47be9..3893c4b 100644
--- a/bkbit/data_translators/genome_annotation_translator.py
+++ b/bkbit/data_translators/genome_annotation_translator.py
@@ -65,10 +65,12 @@
 from tqdm import tqdm
 import click
 import pkg_resources
+import pathlib

 from bkbit.models import genome_annotation as ga
 from bkbit.utils.setup_logger import setup_logger
 from bkbit.utils.load_json import load_json
+import bkbit.data_translators.error_classes as error_classes


 ## CONSTANTS ##
@@ -112,9 +114,6 @@ class Gff3:
     __init__(content_url, assembly_accession=None, assembly_strain=None, log_level="WARNING", log_to_file=False):
         Initializes the Gff3 class with the provided parameters.

-    parse_url():
-        Parses the content URL and extracts information about the genome annotation.
-
     __download_gff_file():
         Downloads a GFF file from a given URL and calculates the MD5, SHA256, and SHA1 hashes.

@@ -161,35 +160,35 @@ class Gff3:
     def __init__(
         self,
         content_url,
+        url_metadata,
         assembly_accession=None,
         assembly_strain=None,
         log_level="WARNING",
         log_to_file=False,
+        use_tqdm=True,
+        tmp_dir=None
     ):
         """
         Initializes an instance of the GFFTranslator class.

         Parameters:
         - content_url (str): The URL of the GFF file.
+        - url_metadata (dict): Metadata about the genome annotation, as returned by parse_url().
         - assembly_id (str): The ID of the genome assembly.
         - assembly_strain (str, optional): The strain of the genome assembly. Defaults to None.
         - hash_functions (tuple[str]): A tuple of hash functions to use for generating checksums. Defaults to ('MD5').
         """
+        self._tmp_dir = tmp_dir
+        self.use_tqdm = use_tqdm
         self.logger = setup_logger(LOG_FILE_NAME, log_level, log_to_file)
-        try:
-            self.scientific_name_to_taxonid = load_json(SCIENTIFIC_NAME_TO_TAXONID_PATH)
-            self.taxon_scientific_name = load_json(TAXON_SCIENTIFIC_NAME_PATH)
-            self.taxon_common_name = load_json(TAXON_COMMON_NAME_PATH)
-        except FileNotFoundError as e:
-            self.logger.critical("NCBI Taxonomy not downloaded. Run 'bkbit download-ncbi-taxonomy' command first." )
-            print(e)
-            sys.exit(2)
+        detect_model_presence(logger=self.logger)
+        self.taxon_scientific_name = load_json(TAXON_SCIENTIFIC_NAME_PATH)
+        self.taxon_common_name = load_json(TAXON_COMMON_NAME_PATH)

         self.content_url = content_url

         ## STEP 1: Parse the content URL to get metadata
         # Parse content_url to get metadata
-        url_metadata = self.parse_url()
         if url_metadata is None:
             self.logger.critical(
                 "The provided content URL is not supported. Please provide a valid URL."
@@ -212,13 +211,10 @@ def __init__(
         self.authority = url_metadata.get("authority")

         # Assign the taxon_id and assembly_id based on the authority
+        taxon_id = url_metadata.get("taxonid")
         if self.authority.value == ga.AuthorityType.NCBI.value:
-            taxon_id = url_metadata.get("taxonid")
             assembly_id = url_metadata.get("assembly_accession")
         elif self.authority.value == ga.AuthorityType.ENSEMBL.value:
-            taxon_id = self.scientific_name_to_taxonid.get(
-                url_metadata.get("scientific_name").replace("_", " ")
-            )
             if assembly_accession is None:
                 self.logger.critical(
                     "The assembly ID is required for Ensembl URLs. 
Please provide the assembly ID."
@@ -253,60 +249,40 @@ def __init__(
         self.gene_annotations = {}

-    def parse_url(self):
+    @classmethod
+    def from_url(
+        cls,
+        content_url,
+        assembly_accession=None,
+        assembly_strain=None,
+        log_level="WARNING",
+        log_to_file=False,
+        use_tqdm=True,
+        tmp_dir=None
+    ):
         """
-        Parses the content URL and extracts information about the genome annotation.
+        Initializes an instance of the Gff3 class, deriving
+        metadata about the annotation from the URL of the GFF file.

-        Returns:
-        A dictionary containing the following information:
-            - 'authority': The authority type (NCBI or ENSEMBL).
-            - 'taxonid': The taxon ID of the genome.
-            - 'release_version': The release version of the genome annotation.
-            - 'assembly_accession': The assembly accession of the genome.
-            - 'assembly_name': The name of the assembly.
-            - 'species': The species name (only for ENSEMBL URLs).
-        """
-        # Define regex patterns for NCBI and Ensembl URLs
-        # NCBI : [assembly accession.version]_[assembly name]_[content type].[optional format]
-        # ENSEMBL : <organism>.<assembly>.<version>.gff3.gz -> organism full name, assembly name, genome version
-        ncbi_pattern = r"/genomes/all/annotation_releases/(\d+)(?:/(\d+))?/(GCF_\d+\.\d+)[_-]([^/]+)/(GCF_\d+\.\d+)[_-]([^/]+)_genomic\.gff\.gz"
-        ensembl_pattern = (
-            r"/pub/release-(\d+)/gff3/([^/]+)/([^/.]+)\.([^/.]+)\.([^/.]+)\.gff3\.gz"
+        Parameters:
+        - content_url (str): The URL of the GFF file.
+        - assembly_accession (str, optional): The accession ID of the genome assembly. Required for Ensembl URLs.
+        - assembly_strain (str, optional): The strain of the genome assembly. Defaults to None.
+        - log_level, log_to_file, use_tqdm, tmp_dir: Logging and runtime options, forwarded unchanged to __init__.
+        """
+        detect_model_presence()
+        url_metadata = parse_url(content_url=content_url)
+        return cls(
+            content_url=content_url,
+            url_metadata=url_metadata,
+            assembly_accession=assembly_accession,
+            assembly_strain=assembly_strain,
+            log_level=log_level,
+            log_to_file=log_to_file,
+            use_tqdm=use_tqdm,
+            tmp_dir=tmp_dir
         )
-        # Parse the URL to get the path
-        parsed_url = urlparse(self.content_url)
-        path = parsed_url.path
-
-        # Determine if the URL is from NCBI or Ensembl and extract information
-        if "ncbi" in parsed_url.netloc:
-            ncbi_match = re.search(ncbi_pattern, path)
-            if ncbi_match:
-                return {
-                    "authority": ga.AuthorityType.NCBI,
-                    "taxonid": ncbi_match.group(1),
-                    "release_version": (
-                        ncbi_match.group(2)
-                        if ncbi_match.group(2)
-                        else ncbi_match.group(4)
-                    ),
-                    "assembly_accession": ncbi_match.group(3),
-                    "assembly_name": ncbi_match.group(6),
-                }
-
-        elif "ensembl" in parsed_url.netloc:
-            ensembl_match = re.search(ensembl_pattern, path)
-            if ensembl_match:
-                return {
-                    "authority": ga.AuthorityType.ENSEMBL,
-                    "release_version": ensembl_match.group(1),
-                    "scientific_name": ensembl_match.group(3),
-                    "assembly_name": ensembl_match.group(4),
-                }
-
-        # If no match is found, return None
-        return None

     def __download_gff_file(self):
         """
         Downloads a GFF file from a given URL and calculates the MD5, SHA256, and SHA1 hashes. 
@@ -325,16 +301,23 @@ def __download_gff_file(self):
         sha1_hash = hashlib.sha1()

         # Create a temporary file for the gzip data
-        with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as f_gzip:
+        with tempfile.NamedTemporaryFile(
+            suffix=".gz",
+            delete=False,
+            dir=self._tmp_dir) as f_gzip:
+
             gzip_file_path = f_gzip.name
             # Create a progress bar
-            progress_bar = tqdm(
-                total=total_size,
-                unit="iB",
-                unit_scale=True,
-                desc="Downloading GFF file",
-            )
+            if self.use_tqdm:
+                progress_bar = tqdm(
+                    total=total_size,
+                    unit="iB",
+                    unit_scale=True,
+                    desc="Downloading GFF file",
+                )
+            else:
+                progress_bar = None

             # Read the file in chunks, write to the temporary file, and update the hash
             while True:
@@ -345,9 +328,11 @@ def __download_gff_file(self):
                 md5_hash.update(data)
                 sha256_hash.update(data)
                 sha1_hash.update(data)
-                progress_bar.update(len(data))
+                if progress_bar is not None:
+                    progress_bar.update(len(data))

-            progress_bar.close()
+            if progress_bar is not None:
+                progress_bar.close()

         # Return the path to the temporary file and the md5 hash
         return gzip_file_path, {
@@ -548,7 +533,7 @@ def parse(self, feature_filter: tuple[str] = DEFAULT_FEATURE_FILTER):
         # Decompress the gzip file
         with gzip.open(self.gff_file, "rb") as f_in:
             # Create a temporary file to save the decompressed data
-            with tempfile.NamedTemporaryFile(delete=False) as f_out:
+            with tempfile.NamedTemporaryFile(delete=False, dir=self._tmp_dir) as f_out:
                 # Copy the decompressed data to the temporary file
                 f_out.write(f_in.read())
                 gff_file = f_out.name
@@ -558,9 +543,14 @@ def parse(self, feature_filter: tuple[str] = DEFAULT_FEATURE_FILTER):
         with open(gff_file, "r", encoding="utf-8") as file:
             curr_line_num = 1
-            progress_bar = tqdm(
-                total=self.__get_line_count(gff_file), desc="Parsing GFF3 File"
-            )
+
+            if self.use_tqdm:
+                progress_bar = tqdm(
+                    total=self.__get_line_count(gff_file), desc="Parsing GFF3 File"
+                )
+            else:
+                progress_bar = None
+
             for line_raw in file:
                 line_strip = line_raw.strip()
                 if curr_line_num == 1 and not line_strip.startswith("##gff-version 3"):
@@ -602,9 +592,13 @@ def parse(self, feature_filter: tuple[str] = DEFAULT_FEATURE_FILTER):
                         self.gene_annotations[gene_annotation.id] = (
                             gene_annotation
                         )
-                progress_bar.update(1)
+                if progress_bar is not None:
+                    progress_bar.update(1)
+
                 curr_line_num += 1
-            progress_bar.close()
+
+            if progress_bar is not None:
+                progress_bar.close()

     def generate_ensembl_gene_annotation(self, attributes, curr_line_num):
         """
@@ -862,14 +856,22 @@ def serialize_to_jsonld(
     def serialize_to_jsonld(
         self, exclude_none: bool = True, exclude_unset: bool = False
     ):
         """
-        Serialize the object and either write it to the specified output file or print it to the CLI.
+        Serialize the object and return the resulting
+        JSON-LD document as a dict.

         Parameters:
-        exclude_none (bool): Whether to exclude None values in the output.
-        exclude_unset (bool): Whether to exclude unset values in the output.
+            exclude_none (bool):
+                Whether to exclude None values in the output.
+            exclude_unset (bool):
+                Whether to exclude unset values in the output.

        Returns:
-            None
+            Dict
+
+        Notes
+        -----
+        Overrides the default implementation so that the dict
+        containing the JSON-LD graph is returned rather than printed.
         """

         data = [
@@ -884,16 +886,115 @@ def serialize_to_jsonld(
             ),
         ]
         for ck in self.checksums:
-            data.append(ck.dict(exclude_none=exclude_none, exclude_unset=exclude_unset))
+            data.append(
+                ck.dict(
+                    exclude_none=exclude_none,
+                    exclude_unset=exclude_unset
+                )
+            )
         for ga in self.gene_annotations.values():
-            data.append(ga.dict(exclude_none=exclude_none, exclude_unset=exclude_unset))
+            data.append(
+                ga.dict(
+                    exclude_none=exclude_none,
+                    exclude_unset=exclude_unset
+                )
+            )
         output_data = {
-            "@context": "https://raw.githubusercontent.com/brain-bican/models/main/jsonld-context-autogen/genome_annotation.context.jsonld",
+            "@context": (
+                "https://raw.githubusercontent.com/brain-bican/"
+                "models/main/jsonld-context-autogen/"
+                "genome_annotation.context.jsonld"
+            ),
             "@graph": data,
         }
+        return output_data

-        print(json.dumps(output_data, indent=2))
+
+def parse_url(content_url):
+    """
+    Parses the content URL and extracts information about the genome annotation.
+
+    Parameters:
+    content_url (str): The URL of the GFF file to parse for metadata.
+
+    Returns:
+    A dictionary containing the following information:
+        - 'authority': The authority type (NCBI or ENSEMBL).
+        - 'taxonid': The taxon ID of the genome.
+        - 'release_version': The release version of the genome annotation.
+        - 'assembly_accession': The assembly accession of the genome (only for NCBI URLs).
+        - 'assembly_name': The name of the assembly.
+        - 'scientific_name': The scientific name of the species (only for ENSEMBL URLs).
+    """
+    # Define the regex pattern for NCBI URLs; Ensembl URLs are parsed from the file name instead
+    # NCBI : [assembly accession.version]_[assembly name]_[content type].[optional format]
+    # ENSEMBL : <organism>.<assembly>.<version>.gff3.gz -> organism full name, assembly name, genome version
+    ncbi_pattern = r"/genomes/all/annotation_releases/(\d+)(?:/(\d+))?/(GCF_\d+\.\d+)[_-]([^/]+)/(GCF_\d+\.\d+)[_-]([^/]+)_genomic\.gff\.gz"
+
+    # Parse the URL to get the path
+    parsed_url = urlparse(content_url)
+    path = parsed_url.path
+
+    # Determine if the URL is from NCBI or Ensembl and extract information
+    if "ncbi" in parsed_url.netloc:
+        ncbi_match = re.search(ncbi_pattern, path)
+        if ncbi_match:
+            return {
+                "authority": ga.AuthorityType.NCBI,
+                "taxonid": ncbi_match.group(1),
+                "release_version": (
+                    ncbi_match.group(2)
+                    if ncbi_match.group(2)
+                    else ncbi_match.group(4)
+                ),
+                "assembly_accession": ncbi_match.group(3),
+                "assembly_name": ncbi_match.group(6),
+            }
+
+    elif "ensembl" in parsed_url.netloc:
+        file_path = pathlib.Path(parsed_url.path).name
+        file_path = file_path.replace('.gff3.gz', '')
+
+        scientific_name = file_path.split('.')[0]
+        file_path = file_path.replace(scientific_name+'.', '')
+        release_version = file_path.split('.')[-1]
+        file_path = file_path.replace('.' + release_version, '')
+        assembly_name = file_path
+
+        scientific_name_to_taxonid = load_json(SCIENTIFIC_NAME_TO_TAXONID_PATH)
+        result = {
+            "authority": ga.AuthorityType.ENSEMBL,
+            "release_version": release_version,
+            "scientific_name": scientific_name,
+            "assembly_name": assembly_name,
+        }
+        taxon_id = scientific_name_to_taxonid.get(
+            result.get("scientific_name").replace("_", " ")
+        )
+        result['taxonid'] = taxon_id
+        return result
+
+    # If no match is found, return None
+    return None
+
+
+def detect_model_presence(logger=None):
+    """
+    Detect whether the NCBI taxonomy model is present. If not, raise a custom error. 
+
+    Parameters
+    ----------
+    logger:
+        Optional custom logger to which the error message is written.
+    """
+    scientific_name_path = pathlib.Path(TAXON_SCIENTIFIC_NAME_PATH)
+    taxon_common_name_path = pathlib.Path(TAXON_COMMON_NAME_PATH)
+    if not scientific_name_path.is_file() or not taxon_common_name_path.is_file():
+        msg = "NCBI Taxonomy not downloaded. Run 'bkbit download-ncbi-taxonomy' command first."
+        if logger is not None:
+            logger.critical(msg)
+        raise error_classes.MissingModelError(msg)


 @click.command()
@@ -932,11 +1033,16 @@ def gff2jsonld(content_url, assembly_accession, assembly_strain, log_level, log_
     '''
     Creates GeneAnnotation objects from a GFF3 file and serializes them to JSON-LD format.
     '''
-    gff3 = Gff3(
-        content_url, assembly_accession, assembly_strain, log_level, log_to_file
+    gff3 = Gff3.from_url(
+        content_url=content_url,
+        assembly_accession=assembly_accession,
+        assembly_strain=assembly_strain,
+        log_level=log_level,
+        log_to_file=log_to_file
     )
     gff3.parse()
-    gff3.serialize_to_jsonld()
+    result = gff3.serialize_to_jsonld()
+    print(json.dumps(result, indent=2))


 if __name__ == "__main__":
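
For reviewers, a minimal sketch of how the refactored module-level helpers behave. This is not part of the patch: the NCBI URL below is a hypothetical example shaped like the paths the ncbi_pattern regex targets, and the commented output is what parse_url() would produce for it under this patch.

    # Hedged sketch (not in the patch): exercising parse_url() and
    # detect_model_presence(), which now live at module level.
    from bkbit.data_translators.genome_annotation_translator import (
        parse_url,
        detect_model_presence,
    )
    from bkbit.data_translators.error_classes import MissingModelError

    # Hypothetical NCBI URL matching the ncbi_pattern regex.
    url = (
        "https://ftp.ncbi.nlm.nih.gov/genomes/all/annotation_releases/"
        "9606/110/GCF_000001405.40_GRCh38.p14/"
        "GCF_000001405.40_GRCh38.p14_genomic.gff.gz"
    )

    # Metadata can now be inspected (or overridden) before a Gff3 is built.
    metadata = parse_url(content_url=url)
    # -> {"authority": AuthorityType.NCBI, "taxonid": "9606",
    #     "release_version": "110", "assembly_accession": "GCF_000001405.40",
    #     "assembly_name": "GRCh38.p14"}

    # detect_model_presence() replaces the old try/except around load_json():
    # instead of logging and calling sys.exit(2), it raises MissingModelError,
    # which library callers can catch.
    try:
        detect_model_presence()
    except MissingModelError:
        print("Run 'bkbit download-ncbi-taxonomy' first.")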
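And the end-to-end flow the reworked gff2jsonld command now follows, showing the new from_url() knobs; again a sketch under the same assumptions, with the tmp_dir value an arbitrary example:

    # Hedged sketch (not in the patch): the programmatic equivalent of the
    # gff2jsonld CLI after this change.
    import json
    from bkbit.data_translators.genome_annotation_translator import Gff3

    gff3 = Gff3.from_url(
        content_url=url,          # the example URL from the sketch above
        assembly_accession=None,  # only required for Ensembl URLs
        use_tqdm=False,           # suppress progress bars, e.g. in batch jobs
        tmp_dir="/tmp/bkbit",     # example scratch dir for downloaded/decompressed files
    )
    gff3.parse()

    # serialize_to_jsonld() now returns the JSON-LD dict instead of printing it;
    # the CLI prints, while other callers can consume the dict directly.
    jsonld = gff3.serialize_to_jsonld()
    print(json.dumps(jsonld, indent=2))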