
Commit

added pfam, tests ok
Robaina committed Oct 2, 2023
1 parent b472abb commit 48e3417
Showing 6 changed files with 99 additions and 70 deletions.
31 changes: 30 additions & 1 deletion src/pynteny/filter.py
@@ -16,7 +16,7 @@

import pynteny.parsers.labelparser as labelparser
import pynteny.parsers.syntenyparser as syntenyparser
from pynteny.hmm import HMMER, PGAP
from pynteny.hmm import HMMER, PFAM, PGAP
from pynteny.preprocessing import FASTA

logger = logging.getLogger(__name__)
@@ -418,6 +418,35 @@ def add_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits:
return self._synteny_hits
pgap = PGAP(hmm_meta)
self._synteny_hits[fields] = ""
for row in self._synteny_hits.itertuples():
meta_values = [
[
str(v).replace("nan", "")
for k, v in pgap.get_meta_info_for_HMM(hmm).items()
if k != "accession"
]
for hmm in row.hmm.split("|")
]
self._synteny_hits.loc[row.Index, fields] = [
"|".join(v) for v in zip(*meta_values)
]
return SyntenyHits(self._synteny_hits)

def add_PFAM_HMM_meta_info_to_hits(self, hmm_meta: Path) -> SyntenyHits:
"""Add molecular metadata to synteny hits.
Args:
hmm_meta (Path): path to PFAM metadata file.
Returns:
SyntenyHits: an instance of class SyntenyHits.
"""
hmm_meta = Path(hmm_meta)
fields = ["gene_symbol", "label", "product", "ec_number"]
if all([f in self._synteny_hits.columns for f in fields]):
return self._synteny_hits
pgap = PFAM(hmm_meta)
self._synteny_hits[fields] = ""
for row in self._synteny_hits.itertuples():
meta_values = [
[
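
For context, the snippet below is a self-contained sketch of the metadata-joining pattern used by add_HMM_meta_info_to_hits and the new add_PFAM_HMM_meta_info_to_hits: when a hit matched several HMMs ("|"-separated), the per-HMM metadata values are zipped field-wise and re-joined with "|" in each metadata column. The accessions and field values are made up for illustration and are not taken from the PGAP or PFAM tables.

import pandas as pd

# Toy metadata lookup standing in for the database metadata (illustrative values only).
meta = {
    "PF00001": {"gene_symbol": "abcA", "label": "A", "product": "protein A", "ec_number": ""},
    "PF00002": {"gene_symbol": "abcB", "label": "B", "product": "protein B", "ec_number": "1.1.1.1"},
}
fields = ["gene_symbol", "label", "product", "ec_number"]

# One synteny hit that matched two HMMs.
hits = pd.DataFrame({"hmm": ["PF00001|PF00002"]})
hits[fields] = ""

for row in hits.itertuples():
    # One list of field values per matched HMM.
    meta_values = [[meta[hmm][f] for f in fields] for hmm in row.hmm.split("|")]
    # Transpose and join so each metadata column holds "|"-separated values.
    hits.loc[row.Index, fields] = ["|".join(v) for v in zip(*meta_values)]

print(hits.loc[0, "gene_symbol"])  # abcA|abcB
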
79 changes: 18 additions & 61 deletions src/pynteny/hmm.py
@@ -10,21 +10,20 @@
import logging
import os
import sys
import tempfile
from collections import defaultdict
from pathlib import Path
import tempfile

import pandas as pd
from Bio import SearchIO

import pynteny.wrappers as wrappers
from pynteny.utils import (
extract_tar_file,
flatten_directory,
is_tar_file,
list_tar_dir,
download_file,
extract_gz_file,
extract_to_directory,
is_tar_file,
list_tar_dir,
split_hmms,
)

@@ -133,20 +132,16 @@ class HMMDatabase:

def __init__(
self,
database_directory: Path,
metadata_file: Path,
metadata_columns: list[str] = None,
) -> None:
"""Initialize class HMMDatabase
Args:
database_directory (Path): path to directory containing HMM database
with individual files for each HMM.
metadata_file (Path): path to metadata file.
metadata_columns (list[str], optional): list of metadata columns to be
used. Defaults to None (all columns).
"""
self._data_dir = Path(database_directory)
self._metadata_file = Path(metadata_file)
self._meta = pd.read_csv(
self._metadata_file, sep="\t", usecols=metadata_columns
@@ -170,15 +165,6 @@ def meta_file(self) -> Path:
"""
return self._metadata_file

@property
def data_dir(self) -> Path:
"""Return data directory
Returns:
Path: data directory
"""
return self._data_dir

def get_HMM_names_by_gene_symbol(self, gene_symbol: str) -> list[str]:
"""Try to retrieve HMM by its gene symbol, more
than one HMM may map to a single gene symbol
@@ -255,16 +241,18 @@ def __init__(self, *args, **kwargs):
self._meta = self._meta.rename(columns={"#ncbi_accession": "accession"})
# self._meta = self.remove_missing_HMMs_from_metadata(meta_outfile=None)

@staticmethod
def remove_missing_HMMs_from_metadata(self, meta_outfile: Path = None) -> None:
def remove_missing_HMMs_from_metadata(
self, hmm_database: Path, meta_outfile: Path = None
) -> None:
"""Remove HMMs from metadata that are not in HMM directory
Args:
hmm_database (Path): path to HMM database.
hmm_dir (Path): path to directory containing PGAP database.
meta_outfile (Path, optional): path to output file. Defaults to None.
"""
logger.info("Removing missing HMMs from PGAP metadata")
hmm_dir = self._database_dir
hmm_dir = hmm_database
if meta_outfile is None:
meta_outfile = (
self._metadata_file.parent
@@ -314,21 +302,24 @@ def from_gz_file(
extract_gz_file(hmm_gz_file, temp.name)
split_hmms(temp.name, hmm_outdir)
cls.construct_meta_file(hmm_outdir, meta_outfile)
return cls(hmm_outdir, meta_outfile)
return cls(meta_outfile)

def construct_meta_file(self, meta_outfile: Path = None) -> None:
def construct_meta_file(
self, hmm_database: Path, meta_outfile: Path = None
) -> None:
"""Construct metadata file from individual HMM files.
Args:
hmm_database (Path): path to HMM database.
meta_outfile (Path): path to metadata file.
"""
logger.info("Constructing metadata file for PFAM-A database")
if meta_outfile is None:
meta_outfile = self._database_dir / "PFAM_meta.tsv"
meta_outfile = hmm_database / "PFAM_meta.tsv"
else:
meta_outfile = Path(meta_outfile)
hmm_meta_lines = ["accession\tgene_symbol\tdescription\tlength\n"]
for hmm_file in self._database_dir.glob("*.hmm"):
for hmm_file in hmm_database.glob("*.hmm"):
with open(hmm_file, "r") as f:
hmm_text = f.read()

@@ -375,7 +366,7 @@ def download_pgap(download_dir: Path, unpack: bool = False) -> tuple[Path, Path]
download_file(meta_url, meta_file)
if unpack:
destination_path = download_dir / "pgap_hmms"
extract_pgap_to_directory(PGAP_file, destination_dir=destination_path)
extract_to_directory(PGAP_file, destination_dir=destination_path)
return destination_path, meta_file
else:
return PGAP_file, meta_file
@@ -397,41 +388,7 @@ def download_pfam(download_dir: Path, unpack: bool = False) -> Path:
download_file(url, PFAM_file)
if unpack:
destination_path = download_dir / "pfam_hmms"
extract_pfam_to_directory(PFAM_file, destination_dir=destination_path)
extract_to_directory(PFAM_file, destination_dir=destination_path)
return destination_path
else:
return PFAM_file


def extract_pgap_to_directory(pgap_tar: Path, destination_dir: Path) -> None:
"""Extract PGAP hmm database (tar.gz) to downlaod directory
Args:
pgap_tar (Path): path to compressed PGAP database.
"""
pgap_tar = Path(pgap_tar)
if not is_tar_file(pgap_tar):
logger.warning(f"{pgap_tar} is not a tar file. Skipping extraction")
return
logger.info("Extracting hmm files to target directory")
extract_tar_file(pgap_tar, destination_dir)
flatten_directory(destination_dir)
os.remove(pgap_tar)
logger.info("PGAP database unpacked successfully")


def extract_pfam_to_directory(pfam_gz: Path, destination_dir: Path) -> None:
"""Extract PFAM hmm database (gz) to downlaod directory
Args:
pfam_gz (Path): path to compressed PFAM database.
"""
pfam_gz = Path(pfam_gz)
if not pfam_gz.is_file():
logger.warning(f"{pfam_gz} is not a file. Skipping extraction")
return
logger.info("Extracting hmm files to target directory")
extract_gz_file(pfam_gz, destination_dir)
flatten_directory(destination_dir)
os.remove(pfam_gz)
logger.info("PGAP database unpacked successfully")
2 changes: 1 addition & 1 deletion src/pynteny/parsers/syntenyparser.py
@@ -181,7 +181,7 @@ def parse_genes_in_synteny_structure(
tuple[str,dict]: parsed synteny structure where gene symbols are replaced
by HMM names.
"""
pgap = PGAP(Path(hmm_meta))
pgap = PGAP(hmm_meta)
gene_symbols = get_gene_symbols_in_structure(synteny_structure)
strand_locs = get_strands_in_structure(synteny_structure, parsed_symbol=False)
gene_dists = get_maximum_distances_in_structure(synteny_structure)
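
This call-site simplification works because HMMDatabase.__init__ (see hmm.py above) wraps the metadata file in Path() itself, so either a str or a Path can be passed. A small sketch; the metadata path and gene symbol are placeholders:

from pynteny.hmm import PGAP

pgap = PGAP("hmm_data/hmm_PGAP.tsv")  # plain string accepted; coerced to Path internally
hmm_names = pgap.get_HMM_names_by_gene_symbol("leuD")  # all HMM names mapped to the gene symbol
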
19 changes: 13 additions & 6 deletions src/pynteny/subcommands.py
@@ -17,11 +17,18 @@

import pynteny.parsers.syntenyparser as syntenyparser
from pynteny.filter import SyntenyHits, filter_FASTA_by_synteny_structure
from pynteny.hmm import PGAP, PFAM, download_pgap, download_pfam
from pynteny.hmm import (
PFAM,
PGAP,
download_pfam,
download_pgap,
)
from pynteny.preprocessing import Database
from pynteny.utils import (
CommandArgs,
ConfigParser,
extract_to_directory,
is_gz_file,
is_tar_file,
)

@@ -109,8 +116,8 @@ def synteny_search(args: Union[CommandArgs, ArgumentParser]) -> SyntenyHits:
)

temp_hmm_dir = Path(args.hmm_dir.parent) / "temp_hmm_dir"
if is_tar_file(args.hmm_dir):
PGAP.extract_PGAP_to_directory(args.hmm_dir, temp_hmm_dir)
if is_tar_file(args.hmm_dir) or is_gz_file(args.hmm_dir):
extract_to_directory(args.hmm_dir, temp_hmm_dir)
hmm_dir = temp_hmm_dir
else:
hmm_dir = args.hmm_dir
@@ -266,8 +273,8 @@ def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None:
logger.info("Downloading PGAP database")
try:
PGAP_path, PGAP_meta_file = download_pgap(download_dir, unpack=args.unpack)
PGAP(PGAP_path, PGAP_meta_file).remove_missing_HMMs_from_metadata(
PGAP_meta_file
PGAP(PGAP_meta_file).remove_missing_HMMs_from_metadata(
PGAP_path, PGAP_meta_file
)
config.update_config("unpack_PGAP_database", args.unpack)
logger.info("PGAP database downloaded successfully\n")
@@ -286,7 +293,7 @@ def download_hmms(args: Union[CommandArgs, ArgumentParser]) -> None:
PFAM_meta_file = download_dir / "hmm_PFAM.tsv"
PFAM_path = download_dir / "PFAM_hmms"
PFAM_gz_file = download_pfam(download_dir, unpack=True)
pfam = PFAM.from_gz_file(
PFAM.from_gz_file(
PFAM_gz_file,
hmm_outdir=PFAM_path,
meta_outfile=PFAM_meta_file,
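
A sketch of the new PFAM branch of download_hmms in isolation: download the compressed PFAM-A database and let PFAM.from_gz_file split it into per-HMM files while building the metadata table. The directory and file names are placeholders, and here the file is downloaded without unpacking so that from_gz_file receives the gz archive directly:

from pathlib import Path

from pynteny.hmm import PFAM, download_pfam

download_dir = Path("hmm_data")
pfam_meta_file = download_dir / "hmm_PFAM.tsv"
pfam_hmm_dir = download_dir / "PFAM_hmms"

pfam_gz_file = download_pfam(download_dir, unpack=False)  # path to the downloaded gz file
PFAM.from_gz_file(pfam_gz_file, hmm_outdir=pfam_hmm_dir, meta_outfile=pfam_meta_file)
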
34 changes: 34 additions & 0 deletions src/pynteny/utils.py
@@ -229,6 +229,19 @@ def is_tar_file(tar_file: Path) -> bool:
return tar_file.is_file() and tarfile.is_tarfile(tar_file.as_posix())


def is_gz_file(gz_file: Path) -> bool:
"""Check whether file is gz-compressed.
Args:
gz_file (Path): path to file.
Returns:
bool: whether file is compressed or not.
"""
gz_file = Path(gz_file)
return gz_file.is_file() and gz_file.as_posix().endswith("gz")


def extract_tar_file(tar_file: Path, dest_dir: Path = None) -> None:
"""Extract tar or tar.gz files to dest_dir.
@@ -275,6 +288,27 @@ def extract_gz_file(gz_file: Path, dest_dir: Path = None) -> None:
sys.exit(1)


def extract_to_directory(file: Path, destination_dir: Path) -> None:
"""Extract HMM database (tar.gz or gz) to destination directory.
Args:
file (Path): path to compressed database.
destination_dir (Path): directory to extract files into.
"""
file = Path(file)
if (not is_tar_file(file)) and (not is_gz_file(file)):
logger.warning(f"{file} is not a tar or gz file. Skipping extraction")
return
else:
logger.info("Extracting files to target directory")
if is_tar_file(file):
extract_tar_file(file, destination_dir)
elif is_gz_file(file):
extract_gz_file(file, destination_dir)
flatten_directory(destination_dir)
os.remove(file)
logger.info("Database unpacked successfully")


def list_tar_dir(tar_dir: Path) -> list:
"""List files within tar or tar.gz directory.
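
A quick illustration of the new gz check next to the existing tar check; the archive names below are placeholders, and both helpers return False when the file does not exist on disk:

from pathlib import Path

from pynteny.utils import is_gz_file, is_tar_file

pgap_tar = Path("downloads/hmm_PGAP.HMM.tgz")
pfam_gz = Path("downloads/Pfam-A.hmm.gz")

print(is_tar_file(pgap_tar))  # True only if the file exists and tarfile recognizes it
print(is_gz_file(pfam_gz))    # True only if the file exists and its name ends in "gz"
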
4 changes: 3 additions & 1 deletion tests/test_hmm.py
@@ -18,7 +18,9 @@
class TestPGAP(unittest.TestCase):
def test_remove_missing_HMMs_from_metadata(self):
with tempfile.NamedTemporaryFile() as tmp:
pgap = PGAP(this_file_dir / "test_data/hmm_meta.tsv")
pgap = PGAP(
this_file_dir / "test_data/hmm_meta.tsv",
)
pgap.remove_missing_HMMs_from_metadata(
this_file_dir / "test_data/hmms", tmp.name
)
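
A possible companion test for the PFAM side, written in the same style as TestPGAP above. This is only a sketch: the test_data fixtures it references (a PFAM metadata TSV and a directory of individual .hmm files) are assumptions, not files known to exist in the repository.

import tempfile
import unittest
from pathlib import Path

from pynteny.hmm import PFAM

this_file_dir = Path(__file__).parent


class TestPFAM(unittest.TestCase):
    def test_construct_meta_file(self):
        with tempfile.NamedTemporaryFile(suffix=".tsv") as tmp:
            pfam = PFAM(this_file_dir / "test_data/pfam_meta.tsv")  # assumed fixture
            pfam.construct_meta_file(
                this_file_dir / "test_data/pfam_hmms",  # assumed directory of .hmm files
                Path(tmp.name),
            )
            # The metadata file should at least contain the header line.
            self.assertGreater(Path(tmp.name).stat().st_size, 0)
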
