diff --git a/spectrum_io/search_result/__init__.py b/spectrum_io/search_result/__init__.py index 1da55b4..7b80ca5 100644 --- a/spectrum_io/search_result/__init__.py +++ b/spectrum_io/search_result/__init__.py @@ -2,6 +2,7 @@ from .mascot import Mascot from .maxquant import MaxQuant +from .msamanda import MSAmanda from .msfragger import MSFragger from .sage import Sage from .xisearch import Xisearch diff --git a/spectrum_io/search_result/msamanda.py b/spectrum_io/search_result/msamanda.py index 4be13e6..0d667f5 100644 --- a/spectrum_io/search_result/msamanda.py +++ b/spectrum_io/search_result/msamanda.py @@ -1,125 +1,102 @@ import logging -from pathlib import Path -from typing import Union +from typing import Dict, Optional import pandas as pd from spectrum_fundamentals.constants import PARTICLE_MASSES -from .search_results import filter_valid_prosit_sequences +from .search_results import SearchResults, filter_valid_prosit_sequences, parse_mods logger = logging.getLogger(__name__) -def _update_columns_for_prosit(df: pd.DataFrame): - """ - Update columns of df to work with Prosit. - - :param df: df to modify - :return: modified df as pd.DataFrame - """ - - def _check_unsupported_mods(modstring: str): - if modstring: - for mod in modstring.split(";"): - if not mod.split("(", 1)[1].startswith(("Oxidation", "Carbamidomethyl")): - raise AssertionError(f"Unsupported modification: {mod}") - - def _make_mod_seq(seq: str): - seq = seq.replace("m", "M[UNIMOD:35]") - seq = seq.replace("c", "C[UNIMOD:4]") - return seq - - df["REVERSE"] = df["Protein Accessions"].str.startswith("REV_") - df["MASS"] = df["m/z"] * df["PRECURSOR_CHARGE"] - (df["PRECURSOR_CHARGE"] * PARTICLE_MASSES["PROTON"]) - df["Modifications"].fillna("").apply(_check_unsupported_mods) - df["MODIFIED_SEQUENCE"] = df["SEQUENCE"].apply(_make_mod_seq) - df["SEQUENCE"] = df["SEQUENCE"].str.upper() - df["PEPTIDE_LENGTH"] = df["SEQUENCE"].str.len() - df["RAW_FILE"] = df["RAW_FILE"].str.replace(r"\.\w+$", "", regex=True) - - return df[ - [ - "SCORE", - "MASS", - "PRECURSOR_CHARGE", - "RAW_FILE", - "SCAN_NUMBER", - "REVERSE", - "MODIFIED_SEQUENCE", - "SEQUENCE", - "PEPTIDE_LENGTH", - ] - ] - - -def _remove_decoys_in_targets(full_df): - duplicated_psms = full_df[["SCAN_NUMBER", "RAW_FILE", "MODIFIED_SEQUENCE"]].duplicated(keep=False) - logger.info(f"Removing {sum(duplicated_psms)} duplicated PSMs...") - full_df = full_df[~(full_df["REVERSE"] & duplicated_psms)] - if any(full_df[["SCAN_NUMBER", "RAW_FILE", "MODIFIED_SEQUENCE"]].duplicated(keep=False)): - raise AssertionError("There are duplicate target PSMs. This is a bug of msamanda!") - logger.info(f"{len(full_df)} remaining PSMs") - - return full_df - - -def read_result(path: Union[str, Path], suffix: str = "output.csv") -> pd.DataFrame: - """ - Function to read a msms txt and perform some basic formatting. - - :param path: path to msms.txt to read - :param suffix: Optional suffix to determine which fileresult files should be taken from the supplied path - :raises FileNotFoundError: If the supplied path is not found - :raises AssertionError: If the supplied path does not contain any files matching the provided suffix. - :return: pd.DataFrame with the formatted data - """ - if isinstance(path, str): - path = Path(path) - if path.is_file(): - pathlist = [path] - elif path.is_dir(): - pathlist = list(path.glob(f"*{suffix}")) - if not pathlist: - raise AssertionError(f"The directory does not contain any files that match the pattern *{suffix}") - else: - raise FileNotFoundError(f"{path} does not exist.") - - df_list = [] - for output_file in pathlist: - logger.info(f"Reading {output_file}...") - df = pd.read_csv( - output_file, - sep="\t", - skiprows=1, - usecols=[ - "Scan Number", - "Sequence", - "Modifications", - "Filename", - "Amanda Score", - "m/z", - "Charge", - "Protein Accessions", - ], +class MSAmanda(SearchResults): + """Handle search results from MSAmanda.""" + + @property + def standard_mods(self): + """Standard modifications that are always applied if not otherwise specified.""" + return {"m": 35, "c": 4} + + def read_result( + self, tmt_label: str = "", custom_mods: Optional[Dict[str, int]] = None, suffix: str = "output.csv" + ) -> pd.DataFrame: + """ + Function to read a msms txt and perform some basic formatting. + + :param tmt_label: optional tmt label as str + :param custom_mods: optional dictionary mapping Sage-specific mod pattern to UNIMOD IDs. + If None, static carbamidomethylation of cytein and variable oxidation of methionine + are mapped automatically. To avoid this, explicitely provide an empty dictionary. + :param suffix: Optional suffix to determine which fileresult files should be taken from the supplied path + :raises FileNotFoundError: If the supplied path is not found + :raises AssertionError: If the supplied path does not contain any files matching the provided suffix. + :raises NotImplementedError: If tmt label was supplied. + :return: pd.DataFrame with the formatted data + """ + parsed_mods = parse_mods(self.standard_mods | (custom_mods or {})) + print(parsed_mods) + if tmt_label: + raise NotImplementedError("TMT is not supported for MSAmanda. Please open an issue on github.") + + if self.path.is_file(): + pathlist = [self.path] + print("hooray") + elif self.path.is_dir(): + pathlist = list(self.path.glob(f"*{suffix}")) + if not pathlist: + raise AssertionError(f"The directory does not contain any files that match the pattern *{suffix}") + else: + raise FileNotFoundError(f"{self.path} does not exist.") + + df_list = [] + for output_file in pathlist: + logger.info(f"Reading {output_file}...") + df_list.append( + pd.read_csv( + output_file, + sep="\t", + skiprows=1, + usecols=[ + "Scan Number", + "Sequence", + # "Modifications", + "Protein Accessions", + "Amanda Score", + "m/z", + "Charge", + "Filename", + ], + ) + ) + logger.info(f"Finished reading {output_file}") + + self.results = pd.concat(df_list) + + self.convert_to_internal(mods=parsed_mods) + return filter_valid_prosit_sequences(self.results) + + def convert_to_internal(self, mods: Dict[str, str]): + """ + Convert all columns in the Sage output to the internal format used by Oktoberfest. + + :param mods: dictionary mapping Sage-specific mod patterns (keys) to ProForma standard (values) + """ + df = self.results + df["REVERSE"] = df["Protein Accessions"].str.startswith("REV_") + df["m/z"] = df["Charge"] * (df["m/z"] - PARTICLE_MASSES["PROTON"]) + df["SEQUENCE"] = df["Sequence"].str.upper() + df.replace({"RAW_FILE": {r"\.\w+$": ""}, "Sequence": mods}, regex=True, inplace=True) + df["PEPTIDE_LENGTH"] = df["SEQUENCE"].str.len() + + df.rename( + columns={ + "Filename": "RAW_FILE", + "Scan Number": "SCAN_NUMBER", + "Sequence": "MODIFIED_SEQUENCE", + "Charge": "PRECURSOR_CHARGE", + "m/z": "MASS", + "Amanda Score": "SCORE", + "Protein Accessions": "PROTEINS", + }, + inplace=True, ) - df.columns = [ - "SCAN_NUMBER", - "SEQUENCE", - "Modifications", - "Protein Accessions", - "SCORE", - "m/z", - "PRECURSOR_CHARGE", - "RAW_FILE", - ] - logger.info(f"Finished reading {output_file}") - - df = _update_columns_for_prosit(df) - df = filter_valid_prosit_sequences(df) - df_list.append(df) - - if len(df_list) == 1: - full_df = df_list[0] - full_df = pd.concat(df_list, axis=0) - full_df = _remove_decoys_in_targets(full_df) - return full_df diff --git a/spectrum_io/search_result/search_results.py b/spectrum_io/search_result/search_results.py index 8d924bd..1a62f92 100644 --- a/spectrum_io/search_result/search_results.py +++ b/spectrum_io/search_result/search_results.py @@ -78,7 +78,7 @@ def parse_mods(mods: Dict[str, int]) -> Dict[str, str]: f"Replacement {k} not understood. Replacements must be strings and follow " f"the pattern {key_pattern}" ) if k[0].isalpha(): - unimod_regex_map[re.escape(k)] = f"{k[0]}[UNIMOD:{v}]" + unimod_regex_map[re.escape(k)] = f"{k[0].upper()}[UNIMOD:{v}]" continue if k[0] == "^": unimod_regex_map[f"^{re.escape(k[1:])}"] = f"[UNIMOD:{v}]-" diff --git a/tests/unit_tests/test_sage.py b/tests/unit_tests/test_sage.py index 679f9da..4ec4a49 100644 --- a/tests/unit_tests/test_sage.py +++ b/tests/unit_tests/test_sage.py @@ -25,11 +25,9 @@ class TestSage(unittest.TestCase): def test_read_sage(self): """Test function for reading sage results and transforming to Prosit format.""" expected_sage_internal_path = Path(__file__).parent / "data" / "sage_output_internal.csv" - internal_search_results_df = Sage(Path(__file__).parent / "data" / "sage_output.tsv").read_result( - tmt_label="tmt" - ) + search_results_df = Sage(Path(__file__).parent / "data" / "sage_output.tsv").read_result(tmt_label="tmt") expected_df = pd.read_csv(expected_sage_internal_path) - pd.testing.assert_frame_equal(internal_search_results_df[COLUMNS], expected_df[COLUMNS]) + pd.testing.assert_frame_equal(search_results_df[COLUMNS], expected_df[COLUMNS]) def test_read_sage_with_custom_mods(self): """Test function for reading sage results with custom mods and transforming to Prosit format ."""