Skip to content

Commit

Permalink
remove deprecated functions of loading genomics data
Browse files Browse the repository at this point in the history
Remove the deprecated functions `load_gcfs` and `filter_mibig_only_gcf`.
  • Loading branch information
CunliangGeng authored Dec 14, 2023
1 parent 4a8755e commit 188d927
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 195 deletions.
4 changes: 0 additions & 4 deletions src/nplinker/genomics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from .abc import BGCLoaderBase
from .bgc import BGC
from .gcf import GCF
from .genomics import filter_mibig_only_gcf
from .genomics import generate_mappings_genome_id_bgc_id
from .genomics import get_bgcs_from_gcfs
from .genomics import get_strains_from_bgcs
from .genomics import load_gcfs
from .genomics import map_bgc_to_gcf
from .genomics import map_strain_to_bgc

Expand All @@ -17,11 +15,9 @@
"BGCLoaderBase",
"BGC",
"GCF",
"filter_mibig_only_gcf",
"generate_mappings_genome_id_bgc_id",
"get_bgcs_from_gcfs",
"get_strains_from_bgcs",
"load_gcfs",
"map_bgc_to_gcf",
"map_strain_to_bgc",
]
181 changes: 0 additions & 181 deletions src/nplinker/genomics/genomics.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import annotations
import csv
import json
from os import PathLike
from pathlib import Path
from deprecated import deprecated
from jsonschema import validate
from nplinker.globals import GENOME_BGC_MAPPINGS_FILENAME
from nplinker.logconfig import LogConfig
Expand Down Expand Up @@ -118,15 +116,6 @@ def map_bgc_to_gcf(bgcs: list[BGC], gcfs: list[GCF]):
gcf.add_bgc(bgc)


def filter_mibig_only_gcf(gcfs: list[GCF]) -> list[GCF]:
    """Return the GCFs that are not made up exclusively of MIBiG BGCs.

    The input list is left untouched; a new list is built, preserving the
    original order.

    Args:
        gcfs: GCF objects to screen.

    Returns:
        A new list holding every GCF whose ``has_mibig_only()`` call returns
        exactly ``False``.
    """
    kept: list[GCF] = []
    for family in gcfs:
        # Identity comparison against False is deliberate: any other return
        # value (True, None, ...) causes the GCF to be dropped.
        if family.has_mibig_only() is False:
            kept.append(family)
    return kept


def get_bgcs_from_gcfs(gcfs: list[GCF]) -> list[BGC]:
"""Get all BGC objects from given GCF objects."""
s = set()
Expand All @@ -144,173 +133,3 @@ def get_strains_from_bgcs(bgcs: list[BGC]) -> StrainCollection:
else:
logger.warning("Strain is None for BGC %s", bgc.bgc_id)
return sc


@deprecated(
    version="1.3.3",
    reason="It is split to separate functions: "
    "map_strain_to_bgc, map_bgc_to_gcf, filter_mibig_only_gcf, "
    "get_bgcs_from_gcfs and get_strains_from_bgcs.",
)
def load_gcfs(
    bigscape_dir: str | PathLike,
    strains: StrainCollection,
    mibig_bgc_dict: dict[str, BGC],
    antismash_bgc_dict: dict[str, BGC],
    antismash_file_dict: dict[str, str],
    bigscape_cutoff: int,
):
    """Build GCF, BGC and strain collections from BiG-SCAPE output files.

    Two tab-separated files are read from ``bigscape_dir``:
    ``Network_Annotations_Full.tsv`` and
    ``mix/mix_clustering_c0.<cutoff>.tsv`` (cutoff formatted with two digits).
    Every row of the clustering file links a BGC name to a family id.

    Args:
        bigscape_dir: Directory holding the BiG-SCAPE output.
        strains: Collection used to resolve a BGC name to a strain through
            ``strains.lookup``.
        mibig_bgc_dict: BGC objects keyed by strain id, consulted when the
            strain id starts with ``"BGC"``.
        antismash_bgc_dict: BGC objects keyed by BGC name, consulted for all
            other strains.
        antismash_file_dict: Value recorded for a BGC name whose strain
            cannot be resolved.
        bigscape_cutoff: Integer inserted into the clustering file name.

    Returns:
        A tuple ``(gcf_list, bgc_list, used_strains, unknown_strains)`` as
        produced after ``_filter_gcfs`` has been applied to the first three.

    Raises:
        KeyError: If a BGC name is absent from the annotation file, from
            ``antismash_file_dict`` (unknown strain case), or from the
            relevant BGC dictionary.
    """
    root = Path(bigscape_dir)
    clustering_path = root / "mix" / f"mix_clustering_c0.{bigscape_cutoff:02d}.tsv"
    annotations_path = root / "Network_Annotations_Full.tsv"

    mibig_count: int = 0
    bgc_list: list[BGC] = []

    gcf_by_family: dict[str, GCF] = {}
    gcf_list: list[GCF] = []

    used_strains: StrainCollection = StrainCollection()
    unknown_strains: dict[str, str] = {}

    # BiG-SCAPE annotation file
    # (<dataset>/bigscape/<cluster_name>/Network_Annotations_<cluster_name>.tsv).
    # Columns, by index:
    #   0 BGC name/ID          1 "Accession ID"      2 Description
    #   3 Product prediction   4 Bigscape product type/class
    #   5 Organism             6 Taxonomy
    with open(annotations_path) as handle:
        rows = csv.reader(handle, delimiter="\t")
        next(rows)  # header row
        annotations = {row[0]: row for row in rows}

    # BiG-SCAPE "cluster files" (<class>_clustering_c0.xx.tsv).
    # Columns, by index: 0 BGC name, 1 cluster (family) ID.
    with open(clustering_path, "rt") as handle:
        rows = csv.reader(handle, delimiter="\t")
        next(rows)  # header row
        for row in rows:
            bgc_name = row[0]
            family_id = row[1]

            # TODO: is it necessary to keep bigscape_class for GCF class?
            # The value is not used afterwards, but the lookup still fails
            # for BGC names missing from the annotation file.
            bigscape_class = annotations[bgc_name][4]

            # Resolve the strain; unresolved names are recorded and skipped.
            try:
                strain = strains.lookup(bgc_name)
            except KeyError:
                logger.warning(f"Unknown strain ID: {bgc_name}")
                unknown_strains[bgc_name] = antismash_file_dict[bgc_name]
                continue

            # Pick the BGC object from the matching source dictionary.
            if not strain.id.startswith("BGC"):
                try:
                    current_bgc = antismash_bgc_dict[bgc_name]
                except KeyError:
                    raise KeyError(f"Unknown AntiSMASH BGC: {bgc_name}")
            else:
                try:
                    current_bgc = mibig_bgc_dict[strain.id]
                except KeyError:
                    raise KeyError(f"Unknown MiBIG: {strain.id}")
                mibig_count += 1

            current_bgc.strain = strain
            bgc_list.append(current_bgc)

            # Create the family on first sight, then attach the BGC to it.
            family = gcf_by_family.get(family_id)
            if family is None:
                family = GCF(family_id)
                gcf_by_family[family_id] = family
                gcf_list.append(family)
            family.add_bgc(current_bgc)

            used_strains.add(strain)

    logger.info(
        "# MiBIG BGCs = {}, non-MiBIG BGCS = {}, total bgcs = {}, GCFs = {}, strains={}".format(
            mibig_count,
            len(bgc_list) - mibig_count,
            len(bgc_list),
            len(gcf_by_family),
            len(strains),
        )
    )

    # Drop MiBIG-only GCFs together with their BGCs and strains.
    gcf_list, bgc_list, used_strains = _filter_gcfs(gcf_list, bgc_list, used_strains)
    logger.info(
        "# after filtering, total bgcs = {}, GCFs = {}, strains={}, unknown_strains={}".format(
            len(bgc_list), len(gcf_list), len(used_strains), len(unknown_strains)
        )
    )

    return gcf_list, bgc_list, used_strains, unknown_strains


@deprecated(
    version="1.3.3",
    reason="It is split to separate functions: "
    "filter_mibig_only_gcf, get_bgcs_from_gcfs and get_strains_from_bgcs.",
)
def _filter_gcfs(
    gcfs: list[GCF], bgcs: list[BGC], strains: StrainCollection
) -> tuple[list[GCF], list[BGC], StrainCollection]:
    """Drop GCFs whose members are all MIBiG BGCs, plus related BGCs/strains.

    A GCF is discarded when none of its BGCs reports ``is_mibig()`` as falsy.
    All BGCs of a discarded GCF are discarded too, as is every BGC in ``bgcs``
    that has no parents. For each discarded BGC with a strain set, that strain
    is removed from ``strains``.

    The three arguments are modified in place and the same objects are
    returned.

    Args:
        gcfs(list[GCF]): list of GCF objects
        bgcs(list[BGC]): list of BGC objects
        strains(StrainCollection): StrainCollection object

    Returns:
        tuple[list[GCF], list[BGC], StrainCollection]: the given list of GCF
            objects, list of BGC objects and StrainCollection object after
            removal.
    """
    doomed_gcfs = set()
    doomed_bgcs = set()

    # Collect MIBiG-only families and every BGC that belongs to them.
    for family in gcfs:
        non_mibig_members = [member for member in family.bgcs if not member.is_mibig()]
        if not non_mibig_members:
            doomed_gcfs.add(family)
            doomed_bgcs.update(family.bgcs)

    # BGCs that are not attached to any family are discarded as well.
    doomed_bgcs.update(member for member in bgcs if len(member.parents) == 0)

    for family in doomed_gcfs:
        gcfs.remove(family)

    for member in doomed_bgcs:
        bgcs.remove(member)
        if member.strain is not None:
            strains.remove(member.strain)

    logger.info(
        "Remove GCFs that has only MIBiG BGCs: removing {} GCFs and {} BGCs".format(
            len(doomed_gcfs), len(doomed_bgcs)
        )
    )

    return gcfs, bgcs, strains
3 changes: 1 addition & 2 deletions src/nplinker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from nplinker.genomics import generate_mappings_genome_id_bgc_id
from nplinker.genomics.antismash import AntismashBGCLoader
from nplinker.genomics.bigscape import BigscapeGCFLoader
from nplinker.genomics.genomics import filter_mibig_only_gcf
from nplinker.genomics.genomics import get_bgcs_from_gcfs
from nplinker.genomics.genomics import get_strains_from_bgcs
from nplinker.genomics.genomics import map_bgc_to_gcf
Expand Down Expand Up @@ -453,7 +452,7 @@ def _load_genomics(self):
map_bgc_to_gcf(raw_bgcs, raw_gcfs)

# Step 5: get clean GCF objects, BGC objects and Strain objects
self.gcfs = filter_mibig_only_gcf(raw_gcfs)
self.gcfs = raw_gcfs
self.bgcs = get_bgcs_from_gcfs(self.gcfs)
self.strains = get_strains_from_bgcs(self.bgcs)

Expand Down
8 changes: 0 additions & 8 deletions tests/genomics/test_genomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pytest
from nplinker.genomics import BGC
from nplinker.genomics import GCF
from nplinker.genomics import filter_mibig_only_gcf
from nplinker.genomics import generate_mappings_genome_id_bgc_id
from nplinker.genomics import get_bgcs_from_gcfs
from nplinker.genomics import get_strains_from_bgcs
Expand Down Expand Up @@ -149,13 +148,6 @@ def test_map_bgc_to_gcf_error(bgc_list, gcf_list_error):
assert "BGC id 'BGC_04' from GCF object '1' not found" in e.value.args[0]


def test_filter_mibig_only_gcf(bgc_list, gcf_list):
    """Only the GCF with id "2" should survive MIBiG-only filtering."""
    # BGCs must be attached to their GCFs before filtering.
    map_bgc_to_gcf(bgc_list, gcf_list)
    surviving_ids = [gcf.gcf_id for gcf in filter_mibig_only_gcf(gcf_list)]
    assert surviving_ids == ["2"]


def test_get_bgcs_from_gcfs(bgc_list, gcf_list):
map_bgc_to_gcf(bgc_list, gcf_list)
bgcs = get_bgcs_from_gcfs(gcf_list)
Expand Down

0 comments on commit 188d927

Please sign in to comment.