#!/usr/bin/env python3
# 2025 (c) Micha Johannes Birklbauer
# https://github.com/michabirklbauer/
# micha.birklbauer@gmail.com
from __future__ import annotations
import warnings
import numpy as np
from tqdm import tqdm
from ..data._csm import CrosslinkSpectrumMatch
from ..data._crosslink import Crosslink
from ..data._parser_result import ParserResult
from ..data._util import check_input
from ..data._util import check_input_multi
from ..data._parser_result import create_parser_result
from ._util import check_available_keys
from ._util import assert_csms
from ._util import assert_xls
from ._util import assert_csms_or_xls
from ._filter import filter_target_decoy
from ._filter import filter_crosslink_type
from typing import List
# legacy
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
def __verify_fdr_strict(
data: List[CrosslinkSpectrumMatch] | List[Crosslink],
fdr: float,
cutoff: float,
score: Literal["higher_better", "lower_better"],
) -> bool:
r"""Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate using a strict approach.
Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate (FDR) using the
formula (TD+DD)/TT given a score cutoff.
Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.
Parameters
----------
data : list of CrosslinkSpectrumMatch, or list of Crosslink
A list of crosslink-spectrum-matches or crosslinks to validate.
fdr : float
The target FDR, must be given as a real number between 0 and 1.
cutoff : float
Score cutoff that defines which crosslinks or crosslink-spectrum-matches fall within the FDR validated result.
score : str, one of "higher_better" or "lower_better"
If a higher score is considered better, or a lower score is considered better.
Returns
-------
bool
Returns True if the given score cutoff yields only crosslinks or crosslink-spectrum-matches
that are within the target FDR. Returns False if the given score cutoff yields crosslinks or
crosslink-spectrum-matches that produce a higher estimated FDR than the desired target FDR.
Notes
-----
This function should not be called directly, it is called from ``__validate_strict()``.
"""
D = 0
T = 0
for item in data:
if score == "higher_better" and item["score"] >= cutoff:
if not item["alpha_decoy"] and not item["beta_decoy"]:
T += 1
else:
D += 1
elif score == "lower_better" and item["score"] <= cutoff:
if not item["alpha_decoy"] and not item["beta_decoy"]:
T += 1
else:
D += 1
else:
# do nothing
pass
return D / T < fdr
def __validate_strict(
data: List[CrosslinkSpectrumMatch] | List[Crosslink],
fdr: float,
score: Literal["higher_better", "lower_better"],
) -> List[CrosslinkSpectrumMatch] | List[Crosslink]:
r"""Validate a list of crosslinks or crosslink-spectrum-matches by strict false discovery rate estimation.
Validate a list of crosslinks or crosslink-spectrum-matches by strict false discovery rate (FDR) estimation using the
formula (TD+DD)/TT.
Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.
Parameters
----------
data : list of CrosslinkSpectrumMatch, or list of Crosslink
A list of crosslink-spectrum-matches or crosslinks to validate.
fdr : float
The target FDR, must be given as a real number between 0 and 1. The default of 0.01 corresponds to 1% FDR.
score : str, one of "higher_better" or "lower_better"
If a higher score is considered better, or a lower score is considered better.
Returns
-------
list of dict of str, any
A list of validated crosslink-spectrum-matches or crosslinks.
Warns
-----
RuntimeWarning
If none of the data passes the desired FDR threshold.
Notes
-----
This function should not be called directly, it is called from ``validate()``.
"""
scores = list()
td = list()
for item in data:
scores.append(item["score"])
if not item["alpha_decoy"] and not item["beta_decoy"]:
td.append(0)
else:
td.append(1)
scores = np.array(scores)
cutoff = 0
if score == "higher_better":
td = np.array(td)[np.argsort(scores, kind="stable")]
scores = scores[np.argsort(scores, kind="stable")]
cutoff = scores[0] # scores.max()
else:
td = np.array(td)[np.argsort(scores, kind="stable")[::-1]]
scores = scores[np.argsort(scores, kind="stable")[::-1]]
cutoff = scores[0] # scores.min()
nr_items = len(td)
for i in tqdm(
range(nr_items),
total=nr_items,
desc="Iterating over scores for FDR calculation...",
):
if (nr_items - i - td[i:].sum()) <= 0.0:
warnings.warn(
RuntimeWarning(
"None of the data passes the desired FDR threshold! This is usually due to decoys with very good scores."
)
)
return []
if td[i:].sum() / (nr_items - i - td[i:].sum()) < fdr:
# we need to verify in this case because there might be multiple
# items with the same score
if __verify_fdr_strict(data, fdr, scores[i], score):
cutoff = scores[i]
break
validated_items = list()
for item in data:
if score == "higher_better" and item["score"] >= cutoff:
validated_items.append(item)
elif score == "lower_better" and item["score"] <= cutoff:
validated_items.append(item)
else:
# do nothing
pass
return validated_items
def __verify_fdr_relaxed(
data: List[CrosslinkSpectrumMatch] | List[Crosslink],
fdr: float,
cutoff: float,
score: Literal["higher_better", "lower_better"],
) -> bool:
r"""Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate using a relaxed approach.
Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate (FDR) using the
formula (TD-DD)/TT given a score cutoff.
Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.
Parameters
----------
data : list of CrosslinkSpectrumMatch, or list of Crosslink
A list of crosslink-spectrum-matches or crosslinks to validate.
fdr : float
The target FDR, must be given as a real number between 0 and 1.
cutoff : float
Score cutoff that defines which crosslinks or crosslink-spectrum-matches fall within the FDR validated result.
score : str, one of "higher_better" or "lower_better"
If a higher score is considered better, or a lower score is considered better.
Returns
-------
bool
Returns True if the given score cutoff yields only crosslinks or crosslink-spectrum-matches
that are within the target FDR. Returns False if the given score cutoff yields crosslinks or
crosslink-spectrum-matches that produce a higher estimated FDR than the desired target FDR.
Raises
------
RuntimeError
If the number of DD matches exceeds the number of TD matches.
FDR can not be estimated with the formula '(TD-DD)/TT' in these cases.
Notes
-----
This function should not be called directly, it is called from ``__validate_relaxed()``.
"""
D = 0
DT = 0
T = 0
for item in data:
if score == "higher_better" and item["score"] >= cutoff:
if not item["alpha_decoy"] and not item["beta_decoy"]:
T += 1
elif item["alpha_decoy"] and item["beta_decoy"]:
D += 1
else:
DT += 1
elif score == "lower_better" and item["score"] <= cutoff:
if not item["alpha_decoy"] and not item["beta_decoy"]:
T += 1
elif item["alpha_decoy"] and item["beta_decoy"]:
D += 1
else:
DT += 1
else:
# do nothing
pass
if (DT - D) < 0.0:
raise RuntimeError(
f"Number of DD matches is greater than the number of TD matches for score {cutoff}! "
"Invalid FDR estimation! Please use formula 'D/T' instead!"
)
return (DT - D) / T < fdr
def __validate_relaxed(
data: List[CrosslinkSpectrumMatch] | List[Crosslink],
fdr: float,
score: Literal["higher_better", "lower_better"],
) -> List[CrosslinkSpectrumMatch] | List[Crosslink]:
r"""Validate a list of crosslinks or crosslink-spectrum-matches by relaxed false discovery rate estimation.
Validate a list of crosslinks or crosslink-spectrum-matches by relaxed false discovery rate (FDR) estimation using the
formula (TD-DD)/TT.
Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.
Parameters
----------
data : list of CrosslinkSpectrumMatch, or list of Crosslink
A list of crosslink-spectrum-matches or crosslinks to validate.
fdr : float
The target FDR, must be given as a real number between 0 and 1.
score : str, one of "higher_better" or "lower_better"
If a higher score is considered better, or a lower score is considered better.
Returns
-------
list of dict of str, any
A list of validated crosslink-spectrum-matches or crosslinks.
Raises
------
RuntimeError
If the number of DD matches exceeds the number of TD matches.
FDR can not be estimated with the formula '(TD-DD)/TT' in these cases.
Warns
-----
RuntimeWarning
If none of the data passes the desired FDR threshold.
Notes
-----
This function should not be called directly, it is called from ``validate()``.
"""
scores = list()
td = list()
tdd = list()
for item in data:
scores.append(item["score"])
if not item["alpha_decoy"] and not item["beta_decoy"]:
td.append(0)
tdd.append(0)
elif item["alpha_decoy"] and item["beta_decoy"]:
td.append(1)
tdd.append(-1)
else:
td.append(1)
tdd.append(1)
scores = np.array(scores)
cutoff = 0
if score == "higher_better":
td = np.array(td)[np.argsort(scores, kind="stable")]
tdd = np.array(tdd)[np.argsort(scores, kind="stable")]
scores = scores[np.argsort(scores, kind="stable")]
cutoff = scores[0] # scores.max()
else:
td = np.array(td)[np.argsort(scores, kind="stable")[::-1]]
tdd = np.array(tdd)[np.argsort(scores, kind="stable")[::-1]]
scores = scores[np.argsort(scores, kind="stable")[::-1]]
cutoff = scores[0] # scores.min()
nr_items = len(td)
for i in tqdm(
range(nr_items),
total=nr_items,
desc="Iterating over scores for FDR calculation...",
):
if tdd[i:].sum() < 0.0:
raise RuntimeError(
f"Number of DD matches is greater than the number of TD matches for score {scores[i]}! "
"Invalid FDR estimation! Please use formula 'D/T' instead!"
)
if (nr_items - i - td[i:].sum()) <= 0.0:
warnings.warn(
RuntimeWarning(
"None of the data passes the desired FDR threshold! This is usually due to decoys with very good scores."
)
)
return []
if tdd[i:].sum() / (nr_items - i - td[i:].sum()) < fdr:
# we need to verify in this case because there might be multiple
# items with the same score
if __verify_fdr_relaxed(data, fdr, scores[i], score):
cutoff = scores[i]
break
validated_items = list()
for item in data:
if score == "higher_better" and item["score"] >= cutoff:
validated_items.append(item)
elif score == "lower_better" and item["score"] <= cutoff:
validated_items.append(item)
else:
# do nothing
pass
return validated_items
[docs]
def validate(
data: List[CrosslinkSpectrumMatch] | List[Crosslink] | ParserResult,
fdr: float = 0.01,
formula: Literal["D/T", "(TD+DD)/TT", "(TD-DD)/TT"] = "D/T",
score: Literal["higher_better", "lower_better"] = "higher_better",
separate_intra_inter: bool = False,
ignore_missing_labels: bool = False,
) -> List[CrosslinkSpectrumMatch] | List[Crosslink] | ParserResult:
r"""Validate a list of crosslinks or crosslink-spectrum-matches, or a parser_result by estimating false discovery rate.
Validate a list of crosslinks or crosslink-spectrum-matches, or a parser_result by estimating false discovery rate (FDR) using the defined formula.
Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.
Parameters
----------
data : list of CrosslinkSpectrumMatch, list of Crosslink, or ParserResult
A list of crosslink-spectrum-matches or crosslinks to validate, or a parser_result.
fdr : float, default = 0.01
The target FDR, must be given as a real number between 0 and 1. The default of 0.01 corresponds to 1% FDR.
formula : str, one of "D/T", "(TD+DD)/TT", or "(TD-DD)/TT", default = "D/T"
Which formula to use to estimate FDR. 'D' denotes any decoy matches including decoy-decoy, decoy-target and target-decoy matches.
'DD' denotes decoy-decoy matches, 'T' and 'TT' denote target-target matches, and 'TD' denotes target-decoy
and decoy-target matches.
score : str, one of "higher_better" or "lower_better", default = "higher_better"
If a higher score is considered better, or a lower score is considered better.
separate_intra_inter : bool, default = False
If FDR should be estimated separately for intra and inter matches.
ignore_missing_labels : bool, default = False
If crosslinks and crosslink-spectrum-matches should be ignored if they don't have target and decoy labels. By default and error is
thrown if any unlabelled data is encountered.
Returns
-------
list of CrosslinkSpectrumMatch, list of Crosslink, or ParserResult
If a list of crosslink-spectrum-matches or crosslinks was provided, a list of validated
crosslink-spectrum-matches or crosslinks is returned. If a parser_result was provided,
an parser_result with validated crosslink-spectrum-matches and/or validated crosslinks will
be returned.
Raises
------
TypeError
If a wrong data type is provided.
TypeError
If parameter formula is not one of 'D/T', '(TD+DD)/TT', or '(TD-DD)/TT'.
TypeError
If parameter score is not one of 'higher_better' or 'lower_better'.
ValueError
If parameter fdr is outside of the supported range.
ValueError
If the number of DD matches exceeds the number of TD matches for formula '(TD-DD)/TT'.
FDR cannot be estimated with the formula '(TD-DD)/TT' in these cases.
Warnings
--------
Please note that the by default selected FDR formula of 'D/T' actually denotes '(TD+DD)/TT' where 'TD' includes both 'TD' and 'DT' matches
since we consider any peptide pair with at least one decoy hit a decoy. Estimating FDR via 'DD/TT' is not valid for crosslinking mass spectrometry
as it severly underestimated the actual error and is therefore not supported by pyXLMS!
Notes
-----
Please note that progress bars will usually not complete when running this function. This is by design as it is not
necessary to iterate over all scores to estimate FDR.
Examples
--------
>>> from pyXLMS.parser import read
>>> from pyXLMS.transform import validate
>>> pr = read(
... "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_CSMs.xlsx",
... engine="MS Annika",
... crosslinker="DSS",
... )
>>> csms = pr["crosslink-spectrum-matches"]
>>> len(csms)
826
>>> validated = validate(csms)
>>> len(validated)
705
>>> from pyXLMS.parser import read
>>> from pyXLMS.transform import validate
>>> pr = read(
... [
... "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_CSMs.xlsx",
... "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_Crosslinks.xlsx",
... ],
... engine="MS Annika",
... crosslinker="DSS",
... )
>>> len(pr["crosslink-spectrum-matches"])
826
>>> len(pr["crosslinks"])
300
>>> validated = validate(pr)
>>> len(validated["crosslink-spectrum-matches"])
705
>>> len(validated["crosslinks"])
226
>>> from pyXLMS.parser import read
>>> from pyXLMS.transform import validate
>>> pr = read(
... [
... "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_CSMs.xlsx",
... "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_Crosslinks.xlsx",
... ],
... engine="MS Annika",
... crosslinker="DSS",
... )
>>> len(pr["crosslink-spectrum-matches"])
826
>>> len(pr["crosslinks"])
300
>>> validated = validate(pr, fdr=0.05)
>>> len(validated["crosslink-spectrum-matches"])
825
>>> len(validated["crosslinks"])
260
"""
_ok = check_input_multi(data, "data", [ParserResult, list])
_ok = check_input(fdr, "fdr", float)
_ok = check_input(formula, "formula", str)
_ok = check_input(score, "score", str)
_ok = check_input(separate_intra_inter, "separate_intra_inter", bool)
_ok = check_input(ignore_missing_labels, "ignore_missing_labels", bool)
if fdr >= 1.0 or fdr <= 0.0:
raise ValueError(
"FDR must be given as a real number between 0 and 1, e.g. 0.01 corresponds to 1% FDR!"
)
if formula not in ["D/T", "(TD+DD)/TT", "(TD-DD)/TT"]:
raise TypeError(
"Parameter 'formula' has to be one of 'D/T', '(TD+DD)/TT' or '(TD-DD)/TT'! Where D and DD is the number of decoys, T and TT the number of targets, "
"and TD the number of target-decoys!"
)
if score not in ["higher_better", "lower_better"]:
raise TypeError(
"Parameter 'score' has to be one of 'higher_better' or 'lower_better'!"
)
if isinstance(data, list):
csms_or_xls = assert_csms_or_xls(data)
if len(csms_or_xls) == 0:
return csms_or_xls
if ignore_missing_labels:
csms_or_xls = [
item
for item in csms_or_xls
if item["alpha_decoy"] is not None and item["beta_decoy"] is not None
]
csms_or_xls = assert_csms_or_xls(csms_or_xls)
_ok = check_available_keys(["score", "alpha_decoy", "beta_decoy"], csms_or_xls)
if formula == "(TD-DD)/TT":
if len(filter_target_decoy(csms_or_xls)["Target-Decoy"]) == 0:
raise ValueError(
"Can't estimate FDR with formula '(TD-DD)/TT' when there are no TD matches! Please select the default formula instead!"
)
if separate_intra_inter:
separate = filter_crosslink_type(csms_or_xls)
return assert_csms_or_xls(
__validate_relaxed(separate["Intra"], fdr, score)
+ __validate_relaxed(separate["Inter"], fdr, score)
)
return __validate_relaxed(csms_or_xls, fdr, score)
if separate_intra_inter:
separate = filter_crosslink_type(csms_or_xls)
return assert_csms_or_xls(
__validate_strict(separate["Intra"], fdr, score)
+ __validate_strict(separate["Inter"], fdr, score)
)
return __validate_strict(csms_or_xls, fdr, score)
new_csms = (
assert_csms(
validate(
data["crosslink-spectrum-matches"],
fdr,
formula,
score,
separate_intra_inter,
ignore_missing_labels,
)
)
if data["crosslink-spectrum-matches"] is not None
else None
)
new_xls = (
assert_xls(
validate(
data["crosslinks"],
fdr,
formula,
score,
separate_intra_inter,
ignore_missing_labels,
)
)
if data["crosslinks"] is not None
else None
)
return create_parser_result(
search_engine=data["search_engine"],
csms=new_csms,
crosslinks=new_xls,
)