#!/usr/bin/env python3
# 2025 (c) Micha Johannes Birklbauer
# https://github.com/michabirklbauer/
# micha.birklbauer@gmail.com
from __future__ import annotations
import warnings
import pandas as pd
from tqdm import tqdm
from ..data._csm import CrosslinkSpectrumMatch
from ..data._crosslink import Crosslink
from ..data._parser_result import ParserResult
from ..data._util import check_input
from ..data._crosslink import create_crosslink
from ..data._csm import create_csm
from ..data._parser_result import create_parser_result
from ..constants import XI_MODIFICATION_MAPPING
from ._util import format_sequence
from ._util import get_bool_from_value
from ._util import __serialize_pandas_series
from typing import Optional
from typing import BinaryIO
from typing import Dict
from typing import Any
from typing import Tuple
from typing import List
# legacy
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
[docs]
def detect_xi_filetype(
data: pd.DataFrame,
) -> Literal["xisearch", "xifdr_csms", "xifdr_crosslinks"]:
r"""Detects the xi-related source (application) of the data.
Detects whether the input data is originating from xiSearch or xiFDR, and if xiFDR which type of data is
being read (crosslink-spectrum-matches or crosslinks).
Parameters
----------
data : pd.DataFrame
The input data originating from xiSearch or xiFDR.
Returns
-------
str
"xisearch" if a xiSearch result file was read, "xifdr_csms" if CSMs from xiFDR were read,
"xifdr_crosslinks" if crosslinks from xiFDR were read.
Raises
------
ValueError
If the data source could not be determined.
Examples
--------
>>> from pyXLMS.parser import detect_xi_filetype
>>> import pandas as pd
>>> df1 = pd.read_csv("data/xi/r1_Xi1.7.6.7.csv")
>>> detect_xi_filetype(df1)
'xisearch'
>>> from pyXLMS.parser import detect_xi_filetype
>>> import pandas as pd
>>> df2 = pd.read_csv("data/xi/1perc_xl_boost_CSM_xiFDR2.2.1.csv")
>>> detect_xi_filetype(df2)
'xifdr_csms'
>>> from pyXLMS.parser import detect_xi_filetype
>>> import pandas as pd
>>> df3 = pd.read_csv("data/xi/1perc_xl_boost_Links_xiFDR2.2.1.csv")
>>> detect_xi_filetype(df3)
'xifdr_crosslinks'
"""
## check input
_ok = check_input(data, "data", pd.DataFrame)
col_names = data.columns.values.tolist()
if "AllScore" in col_names:
return "xisearch"
if "LinkPos1" in col_names:
return "xifdr_csms"
if "ToSite" in col_names:
return "xifdr_crosslinks"
raise ValueError(
"Could not infer data source, are you sure you read a xi result file?"
)
return "err"
[docs]
def parse_peptide(sequence: str, term_char: str = ".") -> str:
r"""Parses the peptide sequence from a sequence string including flanking amino acids.
Parses the peptide sequence from a sequence string including flanking amino acids, for example ``"K.KKMoxKLS.S"``.
The returned peptide sequence for this example would be ``"KKMoxKLS"``.
Parameters
----------
sequence : str
The sequence string containing the peptide sequence and flanking amino acids.
term_char : str (single character), default = "."
The character used to denote N-terminal and C-terminal.
Returns
-------
str
The parsed peptide sequence without flanking amino acids.
Raises
------
RuntimeError
If (one of) the peptide sequence(s) could not be parsed.
Examples
--------
>>> from pyXLMS.parser import parse_peptide
>>> parse_peptide("K.KKMoxKLS.S")
'KKMoxKLS'
>>> from pyXLMS.parser import parse_peptide
>>> parse_peptide("-.CcmCcmPSR.T")
'CcmCcmPSR'
>>> from pyXLMS.parser import parse_peptide
>>> parse_peptide("CCPSR")
'CCPSR'
"""
## check input
_ok = check_input(sequence, "sequence", str)
# PEPTIDE
if term_char not in sequence and len(sequence.strip()) > 1:
return sequence.strip()
if term_char in sequence:
parts = [part.strip() for part in sequence.split(term_char)]
# K.PEPTPIDE.P.EP <- wrong format
if len(parts) > 3:
raise RuntimeError(f"Could not parse peptide from sequence {sequence}!")
# K.PEPTIDE.R
if len(parts) == 3 and len(parts[1]) > 1:
return parts[1]
if len(parts) == 2:
# PEPTIDE.R
if len(parts[0]) > 1 and len(parts[1]) == 1:
return parts[0]
# K.PEPTIDE
if len(parts[1]) > 1 and len(parts[0]) == 1:
return parts[1]
# if none of these cases match, raise error
raise RuntimeError(f"Could not parse peptide from sequence {sequence}!")
return "err"
[docs]
def parse_modifications_from_xi_sequence(sequence: str) -> Dict[int, str]:
r"""Parses all post-translational-modifications from a peptide sequence as reported by xiFDR.
Parses all post-translational-modifications from a peptide sequence as reported by xiFDR. This assumes
that amino acids are given in upper case letters and post-translational-modifications in lower case letters.
The parsed modifications are returned as a dictionary that maps their position in the sequence (1-based) to
their xiFDR annotation (``SYMBOLEXT``), for example ``"cm"`` or ``"ox"``.
Parameters
----------
sequence : str
The peptide sequence as given by xiFDR.
Returns
-------
dict of int, str
Dictionary that maps modifications (values) to their respective positions in the peptide sequence (1-based)
(keys). The modifications are given in xiFDR annotation style (``SYMBOLEXT``) which is the lower letter
modification code, for example ``"cm"`` for carbamidomethylation.
Raises
------
RuntimeError
If multiple modifications on the same residue are parsed.
Examples
--------
>>> from pyXLMS.parser import parse_modifications_from_xi_sequence
>>> seq1 = "KIECcmFDSVEISGVEDR"
>>> parse_modifications_from_xi_sequence(seq1)
{4: 'cm'}
>>> from pyXLMS.parser import parse_modifications_from_xi_sequence
>>> seq2 = "KIECcmFDSVEMoxISGVEDR"
>>> parse_modifications_from_xi_sequence(seq2)
{4: 'cm', 10: 'ox'}
>>> from pyXLMS.parser import parse_modifications_from_xi_sequence
>>> seq3 = "KIECcmFDSVEISGVEDRMox"
>>> parse_modifications_from_xi_sequence(seq3)
{4: 'cm', 17: 'ox'}
>>> from pyXLMS.parser import parse_modifications_from_xi_sequence
>>> seq4 = "CcmKIECcmFDSVEISGVEDRMox"
>>> parse_modifications_from_xi_sequence(seq4)
{1: 'cm', 5: 'cm', 18: 'ox'}
"""
## check input
_ok = check_input(sequence, "sequence", str)
modifications = dict()
pos = 0
current_mod = ""
for i, aa in enumerate(str(sequence).strip()):
if aa.isupper():
pos += 1
current_mod = ""
else:
current_mod += aa
if (i + 1 >= len(sequence)) or (sequence[i + 1].isupper()):
if pos in modifications:
raise RuntimeError(
f"Modification at position {pos} already exists!"
)
modifications[pos] = current_mod
return modifications
def __parse_int(value: Any) -> int:
r"""Parses an integer from the given value.
Parses an integer from the given value. If it is a string it will try to replace any comma
(thousands seperator) with an empty string.
Parameters
----------
value : any
The value to be converted to int.
Returns
-------
int
The converted integer value.
"""
if isinstance(value, str):
return int(value.replace(",", ""))
return int(value)
def __parse_float(value: Any) -> float:
r"""Parses a float from the given value.
Parses a float from the given value. If it is a string it will try to replace any comma
(thousands seperator) with an empty string.
Parameters
----------
value : any
The value to be converted to float.
Returns
-------
float
The converted float value.
"""
if isinstance(value, str):
return float(value.replace(",", ""))
return float(value)
def __parse_xisearch_modifications(
row: pd.Series,
alpha: bool,
modifications: Dict[str, Tuple[str, float]] = XI_MODIFICATION_MAPPING,
ignore_errors: bool = False,
verbose: Literal[0, 1, 2] = 1,
) -> Dict[int, Tuple[str, float]]:
r"""Returns the corresponding modifications object for a crosslink-spectrum-match from xiSearch.
Parameters
----------
row : pandas.Series
One row/crosslink-spectrum-match of the xiSearch result file.
alpha : bool
Whether to parse modifications from the alpha peptide or - if ``False`` - from the beta peptide.
modifications : dict of str, tuple, default = ``constants.XI_MODIFICATION_MAPPING``
Mapping of xi sequence elements (e.g. ``"cm"``) to their modifications (e.g. ``("Carbamidomethyl", 57.021464)``).
ignore_errors : bool, default = False
If modifications that are not given in parameter 'modifications' should raise an error or not. By default an error is
raised if an unknown modification is encountered. If ``True`` modifications that are unknown are encoded with the xi
shortcode (``SYMBOLEXT``) and ``float("nan")`` modification mass.
verbose : 0, 1, or 2, default = 1
- 0: All warnings are ignored.
- 1: Warnings are printed to stdout.
- 2: Warnings are treated as errors.
Returns
-------
dict of int, tuple
The ``pyXLMS`` specific modifications object, a dictionary that maps positions to their corresponding modifications and their
monoisotopic masses.
Raises
------
RuntimeError
If the parsed modifications and positions are not of the same length.
RuntimeError
If multiple modifications on the same residue are parsed (only for ``verbose = 2``).
KeyError
If an unknown modification is encountered.
Notes
-----
This function should not be called directly, it is called from ``__read_xisearch()``.
"""
# EXAMPLE VALUES
# Modifications2 Mox;Mox
# ModificationPositions2 5;7
# helper function that changes ``SYMBOL`` to ``SYMBOLEXT``
def preprocess_mod(mod: str) -> str:
return "".join([c for c in mod if not c.isupper()]).strip()
crosslinker = str(row["Crosslinker"]).strip()
crosslinker_mass = __parse_float(row["CrosslinkerMass"])
parsed_modifications = dict()
# parse from Modifications
if alpha:
parsed_modifications[__parse_int(row["Link1"])] = (
crosslinker,
crosslinker_mass,
)
if not pd.isna(row["Modifications1"]): # pyright: ignore [reportGeneralTypeIssues]
if ";" in str(row["Modifications1"]):
mods = [
preprocess_mod(mod) for mod in str(row["Modifications1"]).split(";")
]
positions = [
__parse_int(pos)
for pos in str(row["ModificationPositions1"]).split(";")
]
if len(mods) != len(positions):
err_str = "Parsed modifications and their positions are not of the same length!\n"
err_str += f"Parsed modifications: {row['Modifications1']}; Parsed positions: {row['ModificationPositions1']}\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise RuntimeError(err_str)
for i in range(len(mods)):
if positions[i] in parsed_modifications:
err_str = (
f"Modification at position {positions[i]} already exists!\n"
)
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
try:
t1 = parsed_modifications[positions[i]][0] + (
"," + modifications[mods[i]][0]
)
t2 = (
parsed_modifications[positions[i]][1]
+ modifications[mods[i]][1]
)
parsed_modifications[positions[i]] = (t1, t2)
except KeyError:
if ignore_errors:
t1 = (
parsed_modifications[positions[i]][0]
+ ","
+ mods[i]
)
t2 = parsed_modifications[positions[i]][1] + float(
"nan"
)
parsed_modifications[positions[i]] = (t1, t2)
else:
err_str = f"Key {mods[i]} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise KeyError(err_str)
else:
try:
parsed_modifications[positions[i]] = (
modifications[mods[i]][0],
modifications[mods[i]][1],
)
except KeyError:
if ignore_errors:
parsed_modifications[positions[i]] = (
mods[i],
float("nan"),
)
else:
err_str = f"Key {mods[i]} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise KeyError(err_str)
else:
mod = preprocess_mod(str(row["Modifications1"]))
pos = __parse_int(row["ModificationPositions1"])
if pos in parsed_modifications:
err_str = f"Modification at position {pos} already exists!\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
try:
t1 = parsed_modifications[pos][0] + "," + modifications[mod][0]
t2 = parsed_modifications[pos][1] + modifications[mod][1]
parsed_modifications[pos] = (t1, t2)
except KeyError:
if ignore_errors:
t1 = parsed_modifications[pos][0] + "," + mod
t2 = parsed_modifications[pos][1] + float("nan")
parsed_modifications[pos] = (t1, t2)
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
else:
try:
parsed_modifications[pos] = (
modifications[mod][0],
modifications[mod][1],
)
except KeyError:
if ignore_errors:
parsed_modifications[pos] = (
mod,
float("nan"),
)
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
else:
parsed_modifications[__parse_int(row["Link2"])] = (
crosslinker,
crosslinker_mass,
)
if not pd.isna(row["Modifications2"]): # pyright: ignore [reportGeneralTypeIssues]
if ";" in str(row["Modifications2"]):
mods = [
preprocess_mod(mod) for mod in str(row["Modifications2"]).split(";")
]
positions = [
__parse_int(pos)
for pos in str(row["ModificationPositions2"]).split(";")
]
if len(mods) != len(positions):
err_str = "Parsed modifications and their positions are not of the same length!\n"
err_str += f"Parsed modifications: {row['Modifications2']}; Parsed positions: {row['ModificationPositions2']}\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise RuntimeError(err_str)
for i in range(len(mods)):
if positions[i] in parsed_modifications:
err_str = (
f"Modification at position {positions[i]} already exists!\n"
)
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
try:
t1 = parsed_modifications[positions[i]][0] + (
"," + modifications[mods[i]][0]
)
t2 = (
parsed_modifications[positions[i]][1]
+ modifications[mods[i]][1]
)
parsed_modifications[positions[i]] = (t1, t2)
except KeyError:
if ignore_errors:
t1 = parsed_modifications[positions[i]][0] + (
"," + mods[i]
)
t2 = parsed_modifications[positions[i]][1] + float(
"nan"
)
parsed_modifications[positions[i]] = (t1, t2)
else:
err_str = f"Key {mods[i]} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise KeyError(err_str)
else:
try:
parsed_modifications[positions[i]] = (
modifications[mods[i]][0],
modifications[mods[i]][1],
)
except KeyError:
if ignore_errors:
parsed_modifications[positions[i]] = (
mods[i],
float("nan"),
)
else:
err_str = f"Key {mods[i]} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise KeyError(err_str)
else:
mod = preprocess_mod(str(row["Modifications2"]))
pos = __parse_int(row["ModificationPositions2"])
if pos in parsed_modifications:
err_str = f"Modification at position {pos} already exists!\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
try:
t1 = parsed_modifications[pos][0] + "," + modifications[mod][0]
t2 = parsed_modifications[pos][1] + modifications[mod][1]
parsed_modifications[pos] = (t1, t2)
except KeyError:
if ignore_errors:
t1 = parsed_modifications[pos][0] + "," + mod
t2 = parsed_modifications[pos][1] + float("nan")
parsed_modifications[pos] = (t1, t2)
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
else:
try:
parsed_modifications[pos] = (
modifications[mod][0],
modifications[mod][1],
)
except KeyError:
if ignore_errors:
parsed_modifications[pos] = (
mod,
float("nan"),
)
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
# parse from sequence (because fixed modifcations are not reported in Modifications)
if alpha:
modified_sequence = parse_peptide(str(row["Peptide1"]).strip())
mods_from_sequence = parse_modifications_from_xi_sequence(modified_sequence)
for pos, mod in mods_from_sequence.items():
if pos in parsed_modifications:
err_str = f"Modification at position {pos} already exists!\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
mod_mapped = None
try:
mod_mapped = modifications[mod]
except KeyError:
if ignore_errors:
mod_mapped = (mod, float("nan"))
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
if mod_mapped is not None and isinstance(mod_mapped, tuple):
if mod_mapped[0] not in parsed_modifications[pos][0]:
parsed_modifications[pos] = (
parsed_modifications[pos][0] + "," + mod_mapped[0],
parsed_modifications[pos][1] + mod_mapped[1],
)
else:
mod_mapped = None
try:
mod_mapped = modifications[mod]
except KeyError:
if ignore_errors:
mod_mapped = (mod, float("nan"))
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
if mod_mapped is not None and isinstance(mod_mapped, tuple):
parsed_modifications[pos] = mod_mapped
else:
modified_sequence = parse_peptide(str(row["Peptide2"]).strip())
mods_from_sequence = parse_modifications_from_xi_sequence(modified_sequence)
for pos, mod in mods_from_sequence.items():
if pos in parsed_modifications:
err_str = f"Modification at position {pos} already exists!\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
mod_mapped = None
try:
mod_mapped = modifications[mod]
except KeyError:
if ignore_errors:
mod_mapped = (mod, float("nan"))
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
if mod_mapped is not None and isinstance(mod_mapped, tuple):
if mod_mapped[0] not in parsed_modifications[pos][0]:
parsed_modifications[pos] = (
parsed_modifications[pos][0] + "," + mod_mapped[0],
parsed_modifications[pos][1] + mod_mapped[1],
)
else:
mod_mapped = None
try:
mod_mapped = modifications[mod]
except KeyError:
if ignore_errors:
mod_mapped = (mod, float("nan"))
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
if mod_mapped is not None and isinstance(mod_mapped, tuple):
parsed_modifications[pos] = mod_mapped
return parsed_modifications
def __read_xisearch(
data: pd.DataFrame,
decoy_prefix: str,
parse_modifications: bool,
modifications: Dict[str, Tuple[str, float]],
ignore_errors: bool,
verbose: Literal[0, 1, 2],
) -> List[CrosslinkSpectrumMatch]:
r"""Reads a xiSearch pandas dataframe and returns a list of crosslink-spectrum-matches.
Parameters
----------
data : pandas.DataFrame
Dataframe of a xiSearch result ``.csv`` file read with pandas.
decoy_prefix : str
The prefix that indicates that a protein is from the decoy database.
parse_modifications : bool
Whether or not post-translational-modifications should be parsed for crosslink-spectrum-matches.
Requires correct specification of the 'modifications' parameter.
modifications : dict of str, tuple
Mapping of xi sequence elements (e.g. ``"cm"``) to their modifications (e.g. ``("Carbamidomethyl", 57.021464)``).
ignore_errors : bool
If modifications that are not given in parameter 'modifications' should raise an error or not. By default an error is
raised if an unknown modification is encountered. If ``True`` modifications that are unknown are encoded with the xi
shortcode (``SYMBOLEXT``) and ``float("nan")`` modification mass.
verbose : 0, 1, or 2
- 0: All warnings are ignored.
- 1: Warnings are printed to stdout.
- 2: Warnings are treated as errors.
Returns
-------
list of CrosslinkSpectrumMatch
The read crosslink-spectrum-matches.
Notes
-----
This function should not be called directly, it is called from ``read_xi()``.
"""
# remove monolinks
xl = data.dropna(axis=0, subset=["BasePeptide2"])
# create csms list
csms = list()
# create csms
for i, row in tqdm(
xl.iterrows(), total=xl.shape[0], desc="Reading xiSearch CSMs..."
):
csm = create_csm(
peptide_a=format_sequence(str(row["BasePeptide1"])),
modifications_a=__parse_xisearch_modifications(
row, True, modifications, ignore_errors, verbose
)
if parse_modifications
else None,
xl_position_peptide_a=__parse_int(row["Link1"]),
proteins_a=[
p.strip()
if p.strip()[: len(decoy_prefix)] != decoy_prefix
else p.strip()[len(decoy_prefix) :]
for p in str(row["Protein1"]).split(";")
],
xl_position_proteins_a=[
__parse_int(__parse_float(p))
for p in str(row["ProteinLink1"]).split(";")
],
pep_position_proteins_a=[
__parse_int(__parse_float(p)) for p in str(row["Start1"]).split(";")
],
score_a=__parse_float(row["Pep1Score"]),
decoy_a=get_bool_from_value(int(row["Protein1decoy"])), # pyright: ignore[reportArgumentType]
peptide_b=format_sequence(str(row["BasePeptide2"])),
modifications_b=__parse_xisearch_modifications(
row, False, modifications, ignore_errors, verbose
)
if parse_modifications
else None,
xl_position_peptide_b=__parse_int(row["Link2"]),
proteins_b=[
p.strip()
if p.strip()[: len(decoy_prefix)] != decoy_prefix
else p.strip()[len(decoy_prefix) :]
for p in str(row["Protein2"]).split(";")
],
xl_position_proteins_b=[
__parse_int(__parse_float(p))
for p in str(row["ProteinLink2"]).split(";")
],
pep_position_proteins_b=[
__parse_int(__parse_float(p)) for p in str(row["Start2"]).split(";")
],
score_b=__parse_float(row["Pep2Score"]),
decoy_b=get_bool_from_value(int(row["Protein2decoy"])), # pyright: ignore[reportArgumentType]
score=__parse_float(row["match score"]),
spectrum_file=str(row["peakListFileName"]).strip(),
scan_nr=__parse_int(row["Scan"]),
charge=__parse_int(row["PrecoursorCharge"]),
rt=None,
im_cv=None,
additional_information={
"source": __serialize_pandas_series(row),
"spectrum quality score": __parse_float(row["spectrum quality score"]),
},
)
csms.append(csm)
return csms
def __parse_xifdr_modifications(
row: pd.Series,
alpha: bool,
modifications: Dict[str, Tuple[str, float]] = XI_MODIFICATION_MAPPING,
ignore_errors: bool = False,
verbose: Literal[0, 1, 2] = 1,
) -> Dict[int, Tuple[str, float]]:
r"""Returns the corresponding modifications object for a crosslink-spectrum-match from xiFDR.
Parameters
----------
row : pandas.Series
One row/crosslink-spectrum-match of the xiFDR CSM result file.
alpha : bool
Whether to parse modifications from the alpha peptide or - if ``False`` - from the beta peptide.
modifications : dict of str, tuple, default = ``constants.XI_MODIFICATION_MAPPING``
Mapping of xi sequence elements (e.g. ``"cm"``) to their modifications (e.g. ``("Carbamidomethyl", 57.021464)``).
ignore_errors : bool, default = False
If modifications that are not given in parameter 'modifications' should raise an error or not. By default an error is
raised if an unknown modification is encountered. If ``True`` modifications that are unknown are encoded with the xi
shortcode (``SYMBOLEXT``) and ``float("nan")`` modification mass.
verbose : 0, 1, or 2, default = 1
- 0: All warnings are ignored.
- 1: Warnings are printed to stdout.
- 2: Warnings are treated as errors.
Returns
-------
dict of int, tuple
The ``pyXLMS`` specific modifications object, a dictionary that maps positions to their corresponding modifications and their
monoisotopic masses.
Raises
------
RuntimeError
If multiple modifications on the same residue are parsed (only if ``verbose = 2``).
KeyError
If an unknown modification is encountered.
Notes
-----
This function should not be called directly, it is called from ``__read_xifdr_csms()``.
"""
crosslinker = str(row["Crosslinker"]).strip()
crosslinker_mass = __parse_float(row["CrosslinkerModMass"])
parsed_modifications = dict()
if alpha:
parsed_modifications[__parse_int(row["LinkPos1"])] = (
crosslinker,
crosslinker_mass,
)
for pos, mod in parse_modifications_from_xi_sequence(
str(row["PepSeq1"]).strip()
).items():
if pos in parsed_modifications:
err_str = f"Modification at position {pos} already exists!\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
try:
t1 = parsed_modifications[pos][0] + "," + modifications[mod][0]
t2 = parsed_modifications[pos][1] + modifications[mod][1]
parsed_modifications[pos] = (t1, t2)
except KeyError:
if ignore_errors:
t1 = parsed_modifications[pos][0] + "," + mod
t2 = parsed_modifications[pos][1] + float("nan")
parsed_modifications[pos] = (t1, t2)
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
try:
parsed_modifications[pos] = (
modifications[mod][0],
modifications[mod][1],
)
except KeyError:
if ignore_errors:
parsed_modifications[pos] = (mod, float("nan"))
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise KeyError(err_str)
else:
parsed_modifications[__parse_int(row["LinkPos2"])] = (
crosslinker,
crosslinker_mass,
)
for pos, mod in parse_modifications_from_xi_sequence(
str(row["PepSeq2"]).strip()
).items():
if pos in parsed_modifications:
err_str = f"Modification at position {pos} already exists!\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
if verbose == 1:
warnings.warn(RuntimeWarning(err_str))
elif verbose == 2:
raise RuntimeError(err_str)
try:
t1 = parsed_modifications[pos][0] + "," + modifications[mod][0]
t2 = parsed_modifications[pos][1] + modifications[mod][1]
parsed_modifications[pos] = (t1, t2)
except KeyError:
if ignore_errors:
t1 = parsed_modifications[pos][0] + "," + mod
t2 = parsed_modifications[pos][1] + float("nan")
parsed_modifications[pos] = (t1, t2)
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += (
f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
)
raise KeyError(err_str)
try:
parsed_modifications[pos] = (
modifications[mod][0],
modifications[mod][1],
)
except KeyError:
if ignore_errors:
parsed_modifications[pos] = (mod, float("nan"))
else:
err_str = f"Key {mod} not found in parameter 'modifications'. Are you missing a modification?\n"
err_str += f"CSM ScanId: {row['ScanId']}; CSM Scan: {row['Scan']}"
raise KeyError(err_str)
return parsed_modifications
def __read_xifdr_csms(
data: pd.DataFrame,
decoy_prefix: str,
parse_modifications: bool,
modifications: Dict[str, Tuple[str, float]],
ignore_errors: bool,
verbose: Literal[0, 1, 2],
) -> List[CrosslinkSpectrumMatch]:
r"""Reads a xiFDR CSM pandas dataframe and returns a list of crosslink-spectrum-matches.
Parameters
----------
data : pandas.DataFrame
Dataframe of a xiFDR CSM result ``.csv`` file read with pandas.
decoy_prefix : str
The prefix that indicates that a protein is from the decoy database.
parse_modifications : bool
Whether or not post-translational-modifications should be parsed for crosslink-spectrum-matches.
Requires correct specification of the 'modifications' parameter.
modifications : dict of str, tuple
Mapping of xi sequence elements (e.g. ``"cm"``) to their modifications (e.g. ``("Carbamidomethyl", 57.021464)``).
ignore_errors : bool
If modifications that are not given in parameter 'modifications' should raise an error or not. By default an error is
raised if an unknown modification is encountered. If ``True`` modifications that are unknown are encoded with the xi
shortcode (``SYMBOLEXT``) and ``float("nan")`` modification mass.
verbose : 0, 1, or 2
- 0: All warnings are ignored.
- 1: Warnings are printed to stdout.
- 2: Warnings are treated as errors.
Returns
-------
list of CrosslinkSpectrumMatch
The read crosslink-spectrum-matches.
Notes
-----
This function should not be called directly, it is called from ``read_xi()``.
"""
# create csms list
csms = list()
# create csms
for i, row in tqdm(
data.iterrows(), total=data.shape[0], desc="Reading xiFDR CSMs..."
):
csm = create_csm(
peptide_a=format_sequence(str(row["PepSeq1"])),
modifications_a=__parse_xifdr_modifications(
row, True, modifications, ignore_errors, verbose
)
if parse_modifications
else None,
xl_position_peptide_a=__parse_int(row["LinkPos1"]),
proteins_a=[
p.strip()
if p.strip()[: len(decoy_prefix)] != decoy_prefix
else p.strip()[len(decoy_prefix) :]
for p in str(row["Protein1"]).split(";")
],
xl_position_proteins_a=[
__parse_int(p) for p in str(row["ProteinLinkPos1"]).split(";")
],
pep_position_proteins_a=[
__parse_int(p) for p in str(row["PepPos1"]).split(";")
],
score_a=None,
decoy_a=get_bool_from_value(row["Decoy1"]),
peptide_b=format_sequence(str(row["PepSeq2"])),
modifications_b=__parse_xifdr_modifications(
row, False, modifications, ignore_errors, verbose
)
if parse_modifications
else None,
xl_position_peptide_b=__parse_int(row["LinkPos2"]),
proteins_b=[
p.strip()
if p.strip()[: len(decoy_prefix)] != decoy_prefix
else p.strip()[len(decoy_prefix) :]
for p in str(row["Protein2"]).split(";")
],
xl_position_proteins_b=[
__parse_int(p) for p in str(row["ProteinLinkPos2"]).split(";")
],
pep_position_proteins_b=[
__parse_int(p) for p in str(row["PepPos2"]).split(";")
],
score_b=None,
decoy_b=get_bool_from_value(row["Decoy2"]),
score=__parse_float(row["Score"]),
spectrum_file=str(row["PeakListFileName"]).strip()
if "PeakListFileName" in row
else str(row["run"]).strip(),
scan_nr=__parse_int(row["scan"]),
charge=__parse_int(row["exp charge"]),
rt=None,
im_cv=None,
additional_information={"source": __serialize_pandas_series(row)},
)
csms.append(csm)
return csms
def __read_xifdr_crosslinks(data: pd.DataFrame, decoy_prefix: str) -> List[Crosslink]:
r"""Reads a xiFDR Links pandas dataframe and returns a list of crosslinks.
Parameters
----------
data : pandas.DataFrame
Dataframe of a xiFDR Links result ``.csv`` file read with pandas.
decoy_prefix : str
The prefix that indicates that a protein is from the decoy database.
Returns
-------
list of Crosslink
The read crosslinks.
Notes
-----
This function should not be called directly, it is called from ``read_xi()``.
"""
# create crosslink list
crosslinks = list()
# create crosslinks
for i, row in tqdm(
data.iterrows(), total=data.shape[0], desc="Reading xiFDR crosslinks..."
):
psmid = str(row["PSMIDs"]).split(";")[0]
s1 = psmid.split("P1_")[1].split(" ")[0]
p1 = parse_peptide(s1)
s2 = psmid.split("P2_")[1].split(" ")[0]
p2 = parse_peptide(s2)
pos1 = __parse_int(psmid.split("P2_")[1].split(" ")[1])
pos2 = __parse_int(psmid.split("P2_")[1].split(" ")[2])
crosslink = create_crosslink(
peptide_a=format_sequence(p1),
xl_position_peptide_a=pos1,
proteins_a=[
p.strip()
if p.strip()[: len(decoy_prefix)] != decoy_prefix
else p.strip()[len(decoy_prefix) :]
for p in str(row["Protein1"]).split(";")
],
xl_position_proteins_a=[
__parse_int(p) for p in str(row["fromSite"]).split(";")
],
decoy_a=get_bool_from_value(row["Decoy1"]),
peptide_b=format_sequence(p2),
xl_position_peptide_b=pos2,
proteins_b=[
p.strip()
if p.strip()[: len(decoy_prefix)] != decoy_prefix
else p.strip()[len(decoy_prefix) :]
for p in str(row["Protein2"]).split(";")
],
xl_position_proteins_b=[
__parse_int(p) for p in str(row["ToSite"]).split(";")
],
decoy_b=get_bool_from_value(row["Decoy2"]),
score=__parse_float(row["Score"]),
additional_information={"source": __serialize_pandas_series(row)},
)
crosslinks.append(crosslink)
return crosslinks
[docs]
def read_xi(
files: str | List[str] | BinaryIO,
decoy_prefix: Optional[str] = "auto",
parse_modifications: bool = True,
modifications: Dict[str, Tuple[str, float]] = XI_MODIFICATION_MAPPING,
sep: str = ",",
decimal: str = ".",
ignore_errors: bool = False,
verbose: Literal[0, 1, 2] = 1,
**kwargs,
) -> ParserResult:
r"""Read a xiSearch/xiFDR result file.
Reads a xiSearch crosslink-spectrum-matches result file or a xiFDR crosslink-spectrum-matches
result file or crosslink result file in ``.csv`` format and returns a ``parser_result``.
Parameters
----------
files : str, list of str, or file stream
The name/path of the xiSearch/xiFDR result file(s) or a file-like object/stream.
decoy_prefix : str, or None, default = "auto"
The prefix that indicates that a protein is from the decoy database.
If "auto" or None it will use the default for each xi file type.
parse_modifications : bool, default = True
Whether or not post-translational-modifications should be parsed for crosslink-spectrum-matches.
Requires correct specification of the 'modifications' parameter.
modifications : dict of str, tuple, default = ``constants.XI_MODIFICATION_MAPPING``
Mapping of xi sequence elements (e.g. ``"cm"``) to their modifications (e.g. ``("Carbamidomethyl", 57.021464)``).
This corresponds to the ``SYMBOLEXT`` field, or the ``SYMBOL`` field minus the amino acid in the xiSearch config.
sep : str, default = ","
Seperator used in the ``.csv`` file.
decimal : str, default = "."
Character to recognize as decimal point.
ignore_errors : bool, default = False
If modifications that are not given in parameter 'modifications' should raise an error or not. By default an error is
raised if an unknown modification is encountered. If ``True`` modifications that are unknown are encoded with the xi
shortcode (``SYMBOLEXT``) and ``float("nan")`` modification mass.
verbose : 0, 1, or 2, default = 1
- 0: All warnings are ignored.
- 1: Warnings are printed to stdout.
- 2: Warnings are treated as errors.
**kwargs
Any additional parameters will be passed to ``pandas.read*``.
Returns
-------
ParserResult
The ``parser_result`` object containing all parsed information.
Raises
------
RuntimeError
If the file(s) contain no crosslinks or crosslink-spectrum-matches.
TypeError
If parameter verbose was not set correctly.
Notes
-----
Uses ``Pep1Score`` as the score for the alpha peptide, ``Pep2Score`` as the score of the
beta peptide, and ``match score`` as the score of the crosslink-spectrum-match for xiSearch
crosslink-spectrum-matches. Uses ``Score`` as the score of the crosslink-spectrum-match for
xiFDR crosslink-spectrum-matches, alpha and beta peptide scores are ``None`` for xiFDR
crosslink-spectrum-matches. Uses ``Score`` as the score of the crosslink for xiFDR crosslinks.
For reference, see here:
`github.com/Rappsilber-Laboratory/XiSearch <https://github.com/Rappsilber-Laboratory/XiSearch/discussions/126>`_.
Examples
--------
>>> from pyXLMS.parser import read_xi
>>> csms_from_xiSearch = read_xi("data/xi/r1_Xi1.7.6.7.csv")
>>> from pyXLMS.parser import read_xi
>>> csms_from_xiFDR = read_xi("data/xi/1perc_xl_boost_CSM_xiFDR2.2.1.csv")
>>> from pyXLMS.parser import read_xi
>>> crosslinks_from_xiFDR = read_xi("data/xi/1perc_xl_boost_Links_xiFDR2.2.1.csv")
"""
## check input
_ok = (
check_input(decoy_prefix, "decoy_prefix", str)
if decoy_prefix is not None
else True
)
_ok = check_input(parse_modifications, "parse_modifications", bool)
_ok = check_input(modifications, "modifications", dict, tuple)
_ok = check_input(sep, "sep", str)
_ok = check_input(decimal, "decimal", str)
_ok = check_input(ignore_errors, "ignore_errors", bool)
_ok = check_input(verbose, "verbose", int)
if verbose not in [0, 1, 2]:
raise TypeError("Verbose level has to be one of 0, 1, or 2!")
## data structures
crosslinks = list()
csms = list()
## handle input
if not isinstance(files, list):
inputs = [files]
else:
inputs = files
for input in inputs:
## reading data
data = pd.read_csv(input, sep=sep, decimal=decimal, low_memory=False, **kwargs) # ty: ignore[no-matching-overload]
## detect input file type
xi_file_type = detect_xi_filetype(data)
## set decoy prefix
if decoy_prefix is None or decoy_prefix == "auto":
decoy_prefix = "REV_" if xi_file_type == "xisearch" else "decoy:"
## process data
if xi_file_type == "xifdr_csms":
csms += __read_xifdr_csms(
data,
decoy_prefix,
parse_modifications,
modifications,
ignore_errors,
verbose,
)
elif xi_file_type == "xifdr_crosslinks":
crosslinks += __read_xifdr_crosslinks(data, decoy_prefix)
else:
csms += __read_xisearch(
data,
decoy_prefix,
parse_modifications,
modifications,
ignore_errors,
verbose,
)
## check results
if len(crosslinks) + len(csms) == 0:
raise RuntimeError(
"No crosslink-spectrum-matches or crosslinks were parsed! If this is unexpected, please file a bug report!"
)
## return parser result
return create_parser_result(
search_engine="xiSearch/xiFDR",
csms=csms if len(csms) > 0 else None,
crosslinks=crosslinks if len(crosslinks) > 0 else None,
)