-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update the logic for loading metabolomics data
Major changes: - update loading process in `DatasetLoader._load_metabolomics`. - add utility functions for metabolomics loading
- Loading branch information
1 parent
5e97659
commit 6d012f5
Showing
4 changed files
with
247 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,18 @@ | ||
import logging | ||
from .molecular_family import MolecularFamily | ||
from .spectrum import Spectrum | ||
from .utils import add_annotation_to_spectrum | ||
from .utils import add_spectrum_to_mf | ||
from .utils import add_strains_to_spectrum | ||
|
||
|
||
# Attach a no-op handler to this package's logger; handler configuration is
# left to whichever application imports the package.
logging.getLogger(__name__).addHandler(logging.NullHandler())

# Public API of the package. This is the single definition: the shorter list
# that used to precede it was overwritten immediately and has been dropped.
__all__ = [
    "MolecularFamily",
    "Spectrum",
    "add_annotation_to_spectrum",
    "add_spectrum_to_mf",
    "add_strains_to_spectrum",
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
from nplinker.logconfig import LogConfig | ||
from nplinker.strain_collection import StrainCollection | ||
from .molecular_family import MolecularFamily | ||
from .spectrum import Spectrum | ||
|
||
|
||
logger = LogConfig.getLogger(__name__) | ||
|
||
|
||
def add_annotation_to_spectrum(annotations: dict[str, dict], spectra: list[Spectrum]) -> None:
    """Attach GNPS annotations to the `gnps_annotations` attribute of spectra.

    Each spectrum whose `spectrum_id` is a key of `annotations` gets the
    corresponding value assigned to `Spectrum.gnps_annotations`. Spectra
    whose id is not a key are left untouched, so some spectra may end up
    without annotations.

    Note that the objects in the input `spectra` list are changed in place.

    Args:
        annotations(dict[str, dict]): A dictionary of GNPS annotations, where
            the keys are spectrum ids and the values are GNPS annotations.
        spectra(list[Spectrum]): A list of Spectrum objects.
    """
    for spectrum in spectra:
        spectrum_id = spectrum.spectrum_id
        if spectrum_id not in annotations:
            # No annotation available for this spectrum: leave it as it is.
            continue
        spectrum.gnps_annotations = annotations[spectrum_id]
|
||
|
||
def add_strains_to_spectrum(
    strains: StrainCollection, spectra: list[Spectrum]
) -> tuple[list[Spectrum], list[Spectrum]]:
    """Add `Strain` objects to the `Spectrum.strains` attribute of spectra.

    For every spectrum, `strains.lookup` is called with the spectrum id. All
    strains it returns are added to `Spectrum.strains`. When the lookup raises
    `ValueError`, the spectrum is treated as having no matching strain.

    Note that the objects in the input `spectra` list are changed in place.

    Args:
        strains(StrainCollection): A collection of strain objects.
        spectra(list[Spectrum]): A list of Spectrum objects.

    Returns:
        tuple(list[Spectrum], list[Spectrum]): A tuple of two lists of Spectrum
            objects. The first list contains the Spectrum objects that were
            updated with Strain objects; the second list contains the Spectrum
            objects that were not updated because the lookup found no Strain
            objects for them. Both lists keep the input order.
    """
    matched = []
    unmatched = []
    for spectrum in spectra:
        try:
            found = strains.lookup(spectrum.spectrum_id)
        except ValueError:
            unmatched.append(spectrum)
        else:
            # Only the lookup is guarded; errors while adding strains propagate.
            for strain in found:
                spectrum.strains.add(strain)
            matched.append(spectrum)

    logger.info(
        f"{len(matched)} Spectrum objects updated with Strain objects.\n"
        f"{len(unmatched)} Spectrum objects not updated with Strain objects."
    )

    return matched, unmatched
|
||
|
||
def add_spectrum_to_mf(
    spectra: list[Spectrum], mfs: list[MolecularFamily]
) -> tuple[list[MolecularFamily], list[MolecularFamily], dict[MolecularFamily, set[str]]]:
    """Add Spectrum objects to MolecularFamily objects.

    The `spectra_ids` attribute of each MolecularFamily holds the ids of its
    Spectrum objects. Each id is looked up among the input `spectra`; every
    Spectrum that is found is passed to `MolecularFamily.add_spectrum`. Ids
    that cannot be found are recorded per family, so a family may end up with
    only part (or none) of its Spectrum objects.

    Note that the objects in the input `mfs` list are changed in place.

    Args:
        spectra(list[Spectrum]): A list of Spectrum objects.
        mfs(list[MolecularFamily]): A list of MolecularFamily objects.

    Returns:
        tuple(list[MolecularFamily], list[MolecularFamily], dict[MolecularFamily, set[str]]):
            The first list contains the MolecularFamily objects whose `spectra`
            attribute is non-empty after processing.
            The second list contains the MolecularFamily objects whose `spectra`
            attribute is still empty (all Spectrum objects are missing).
            The dictionary maps each MolecularFamily that has at least one
            missing Spectrum object to the set of missing spectrum ids.
    """
    spectrum_by_id = {spectrum.spectrum_id: spectrum for spectrum in spectra}
    updated = []
    not_updated = []
    missing: dict[MolecularFamily, set[str]] = {}

    for family in mfs:
        for spectrum_id in family.spectra_ids:
            if spectrum_id in spectrum_by_id:
                family.add_spectrum(spectrum_by_id[spectrum_id])
            else:
                # Unknown id: remember it under its family.
                missing.setdefault(family, set()).add(spectrum_id)

        # A family counts as updated as soon as it holds any spectrum.
        bucket = updated if family.spectra else not_updated
        bucket.append(family)

    logger.info(
        f"{len(updated)} MolecularFamily objects updated with Spectrum objects.\n"
        f"{len(not_updated)} MolecularFamily objects not updated with Spectrum objects.\n"
        f"{len(missing)} MolecularFamily objects have missing Spectrum objects."
    )
    return updated, not_updated, missing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
import pytest | ||
from nplinker.metabolomics import MolecularFamily | ||
from nplinker.metabolomics import Spectrum | ||
from nplinker.metabolomics import add_annotation_to_spectrum | ||
from nplinker.metabolomics import add_spectrum_to_mf | ||
from nplinker.metabolomics import add_strains_to_spectrum | ||
from nplinker.strain import Strain | ||
from nplinker.strain_collection import StrainCollection | ||
|
||
|
||
@pytest.fixture
def spectra():
    """Provide three Spectrum objects with ids "spec0", "spec1" and "spec2"."""
    # The tests index into this list, so the id order must stay spec0..spec2.
    return [Spectrum(f"spec{index}", [100, 200], [0.1, 0.2], 150) for index in range(3)]
|
||
|
||
def test_add_annotation_to_spectrum(spectra):
    """Check that only spectra with a matching annotation key are annotated."""
    # "spec3" has no matching spectrum and "spec2" has no annotation.
    annotations = {f"spec{i}": {"annotation": f"annotation_{i}"} for i in (0, 1, 3)}

    add_annotation_to_spectrum(annotations, spectra)

    annotated, unannotated = spectra[:2], spectra[2:]
    for index, spectrum in enumerate(annotated):
        assert spectrum.gnps_annotations == {"annotation": f"annotation_{index}"}
    for spectrum in unannotated:
        assert spectrum.gnps_annotations == {}
|
||
|
||
def test_add_strains_to_spectrum(spectra):
    """Check that spectra are matched to strains by strain id or strain alias."""
    id_strain = Strain("spec0")  # spectrum id used as the strain id
    alias_strain = Strain("strain1")
    alias_strain.add_alias("spec1")  # spectrum id used as a strain alias
    collection = StrainCollection()
    for strain in (id_strain, alias_strain):
        collection.add(strain)

    with_strains, without_strains = add_strains_to_spectrum(collection, spectra)

    assert len(with_strains) == 2
    assert len(without_strains) == 1
    assert with_strains == [spectra[0], spectra[1]]
    assert without_strains == [spectra[2]]
    assert id_strain in with_strains[0].strains
    assert alias_strain in with_strains[1].strains
    assert without_strains[0].strains == StrainCollection()
|
||
|
||
def test_add_spectrum_to_mf(spectra):
    """Check families with all, some and none of their spectra available."""
    # mf0: every spectrum present; mf1: one present, one missing; mf2: all missing.
    ids_by_family = {
        "mf0": {"spec0", "spec1"},
        "mf1": {"spec2", "spec-missing-1"},
        "mf2": {"spec-missing-2", "spec-missing-3"},
    }
    families = []
    for family_id, spectrum_ids in ids_by_family.items():
        family = MolecularFamily(family_id)
        family.spectra_ids = spectrum_ids
        families.append(family)
    mf0, mf1, mf2 = families

    updated, not_updated, missing = add_spectrum_to_mf(spectra, families)

    assert len(updated) == 2
    assert len(not_updated) == 1
    assert len(missing) == 2
    assert updated == [mf0, mf1]
    assert not_updated == [mf2]
    assert missing == {mf1: {"spec-missing-1"}, mf2: {"spec-missing-2", "spec-missing-3"}}
    assert mf0.spectra == {spectra[0], spectra[1]}
    assert mf1.spectra == {spectra[2]}
    assert mf2.spectra == set()