Source code for pyXLMS.transform_validate

#!/usr/bin/env python3

# 2025 (c) Micha Johannes Birklbauer
# https://github.com/michabirklbauer/
# micha.birklbauer@gmail.com

from __future__ import annotations

import warnings
import numpy as np
from tqdm import tqdm

from .data import check_input
from .data import check_input_multi
from .data import create_parser_result
from .transform_util import get_available_keys
from .transform_filter import filter_target_decoy
from .transform_filter import filter_crosslink_type

from typing import Dict
from typing import List
from typing import Any

# legacy
try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal


def __verify_fdr_strict(
    data: List[Dict[str, Any]],
    fdr: float,
    cutoff: float,
    score: Literal["higher_better", "lower_better"],
) -> bool:
    r"""Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate using a strict approach.

    Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate (FDR) using the
    formula (TD+DD)/TT given a score cutoff.
    Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.

    Parameters
    ----------
    data : list of dict of str, any
        A list of crosslink-spectrum-matches or crosslinks to validate.
    fdr : float
        The target FDR, must be given as a real number between 0 and 1.
    cutoff : float
        Score cutoff that defines which crosslinks or crosslink-spectrum-matches fall within the FDR validated result.
    score: str, one of "higher_better" or "lower_better"
        If a higher score is considered better, or a lower score is considered better.

    Returns
    -------
    bool
        Returns True if the given score cutoff yields only crosslinks or crosslink-spectrum-matches
        that are within the target FDR. Returns False if the given score cutoff yields crosslinks or
        crosslink-spectrum-matches that produce a higher estimated FDR than the desired target FDR.

    Notes
    -----
    This function should not be called directly, it is called from ``__validate_strict()``.
    """
    D = 0
    T = 0
    for item in data:
        if score == "higher_better" and item["score"] >= cutoff:
            if not item["alpha_decoy"] and not item["beta_decoy"]:
                T += 1
            else:
                D += 1
        elif score == "lower_better" and item["score"] <= cutoff:
            if not item["alpha_decoy"] and not item["beta_decoy"]:
                T += 1
            else:
                D += 1
        else:
            # do nothing
            pass
    return D / T < fdr


def __validate_strict(
    data: List[Dict[str, Any]],
    fdr: float,
    score: Literal["higher_better", "lower_better"],
) -> List[Dict[str, Any]]:
    r"""Validate a list of crosslinks or crosslink-spectrum-matches by strict false discovery rate estimation.

    Validate a list of crosslinks or crosslink-spectrum-matches by strict false discovery rate (FDR) estimation using the
    formula (TD+DD)/TT.
    Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.

    Parameters
    ----------
    data : list of dict of str, any
        A list of crosslink-spectrum-matches or crosslinks to validate.
    fdr : float
        The target FDR, must be given as a real number between 0 and 1. The default of 0.01 corresponds to 1% FDR.
    score: str, one of "higher_better" or "lower_better"
        If a higher score is considered better, or a lower score is considered better.

    Returns
    -------
    list of dict of str, any
        A list of validated crosslink-spectrum-matches or crosslinks.

    Warns
    -----
    RuntimeWarning
        If none of the data passes the desired FDR threshold.

    Notes
    -----
    This function should not be called directly, it is called from ``validate()``.
    """
    scores = list()
    td = list()
    for item in data:
        scores.append(item["score"])
        if not item["alpha_decoy"] and not item["beta_decoy"]:
            td.append(0)
        else:
            td.append(1)
    scores = np.array(scores)
    cutoff = 0
    if score == "higher_better":
        td = np.array(td)[np.argsort(scores, kind="stable")]
        scores = scores[np.argsort(scores, kind="stable")]
        cutoff = scores[0]  # scores.max()
    else:
        td = np.array(td)[np.argsort(scores, kind="stable")[::-1]]
        scores = scores[np.argsort(scores, kind="stable")[::-1]]
        cutoff = scores[0]  # scores.min()
    nr_items = len(td)
    for i in tqdm(
        range(nr_items),
        total=nr_items,
        desc="Iterating over scores for FDR calculation...",
    ):
        if (nr_items - i - td[i:].sum()) <= 0.0:
            warnings.warn(
                RuntimeWarning(
                    "None of the data passes the desired FDR threshold! This is usually due to decoys with very good scores."
                )
            )
            return []
        if td[i:].sum() / (nr_items - i - td[i:].sum()) < fdr:
            # we need to verify in this case because there might be multiple
            # items with the same score
            if __verify_fdr_strict(data, fdr, scores[i], score):
                cutoff = scores[i]
                break
    validated_items = list()
    for item in data:
        if score == "higher_better" and item["score"] >= cutoff:
            validated_items.append(item)
        elif score == "lower_better" and item["score"] <= cutoff:
            validated_items.append(item)
        else:
            # do nothing
            pass
    return validated_items


def __verify_fdr_relaxed(
    data: List[Dict[str, Any]],
    fdr: float,
    cutoff: float,
    score: Literal["higher_better", "lower_better"],
) -> bool:
    r"""Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate using a relaxed approach.

    Verifies that a list of crosslinks or crosslink-spectrum-matches has the estimated false discovery rate (FDR) using the
    formula (TD-DD)/TT given a score cutoff.
    Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.

    Parameters
    ----------
    data : list of dict of str, any
        A list of crosslink-spectrum-matches or crosslinks to validate.
    fdr : float
        The target FDR, must be given as a real number between 0 and 1.
    cutoff : float
        Score cutoff that defines which crosslinks or crosslink-spectrum-matches fall within the FDR validated result.
    score: str, one of "higher_better" or "lower_better"
        If a higher score is considered better, or a lower score is considered better.

    Returns
    -------
    bool
        Returns True if the given score cutoff yields only crosslinks or crosslink-spectrum-matches
        that are within the target FDR. Returns False if the given score cutoff yields crosslinks or
        crosslink-spectrum-matches that produce a higher estimated FDR than the desired target FDR.

    Raises
    ------
    RuntimeError
        If the number of DD matches exceeds the number of TD matches.
        FDR can not be estimated with the formula '(TD-DD)/TT' in these cases.

    Notes
    -----
    This function should not be called directly, it is called from ``__validate_relaxed()``.
    """
    D = 0
    DT = 0
    T = 0
    for item in data:
        if score == "higher_better" and item["score"] >= cutoff:
            if not item["alpha_decoy"] and not item["beta_decoy"]:
                T += 1
            elif item["alpha_decoy"] and item["beta_decoy"]:
                D += 1
            else:
                DT += 1
        elif score == "lower_better" and item["score"] <= cutoff:
            if not item["alpha_decoy"] and not item["beta_decoy"]:
                T += 1
            elif item["alpha_decoy"] and item["beta_decoy"]:
                D += 1
            else:
                DT += 1
        else:
            # do nothing
            pass
    if (DT - D) < 0.0:
        raise RuntimeError(
            f"Number of DD matches is greater than the number of TD matches for score {cutoff}! "
            "Invalid FDR estimation! Please use formula 'D/T' instead!"
        )
    return (DT - D) / T < fdr


def __validate_relaxed(
    data: List[Dict[str, Any]],
    fdr: float,
    score: Literal["higher_better", "lower_better"],
) -> List[Dict[str, Any]]:
    r"""Validate a list of crosslinks or crosslink-spectrum-matches by relaxed false discovery rate estimation.

    Validate a list of crosslinks or crosslink-spectrum-matches by relaxed false discovery rate (FDR) estimation using the
    formula (TD-DD)/TT.
    Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches.

    Parameters
    ----------
    data : list of dict of str, any
        A list of crosslink-spectrum-matches or crosslinks to validate.
    fdr : float
        The target FDR, must be given as a real number between 0 and 1.
    score: str, one of "higher_better" or "lower_better"
        If a higher score is considered better, or a lower score is considered better.

    Returns
    -------
    list of dict of str, any
        A list of validated crosslink-spectrum-matches or crosslinks.

    Raises
    ------
    RuntimeError
        If the number of DD matches exceeds the number of TD matches.
        FDR can not be estimated with the formula '(TD-DD)/TT' in these cases.

    Warns
    -----
    RuntimeWarning
        If none of the data passes the desired FDR threshold.

    Notes
    -----
    This function should not be called directly, it is called from ``validate()``.
    """
    scores = list()
    td = list()
    tdd = list()
    for item in data:
        scores.append(item["score"])
        if not item["alpha_decoy"] and not item["beta_decoy"]:
            td.append(0)
            tdd.append(0)
        elif item["alpha_decoy"] and item["beta_decoy"]:
            td.append(1)
            tdd.append(-1)
        else:
            td.append(1)
            tdd.append(1)
    scores = np.array(scores)
    cutoff = 0
    if score == "higher_better":
        td = np.array(td)[np.argsort(scores, kind="stable")]
        tdd = np.array(tdd)[np.argsort(scores, kind="stable")]
        scores = scores[np.argsort(scores, kind="stable")]
        cutoff = scores[0]  # scores.max()
    else:
        td = np.array(td)[np.argsort(scores, kind="stable")[::-1]]
        tdd = np.array(tdd)[np.argsort(scores, kind="stable")[::-1]]
        scores = scores[np.argsort(scores, kind="stable")[::-1]]
        cutoff = scores[0]  # scores.min()
    nr_items = len(td)
    for i in tqdm(
        range(nr_items),
        total=nr_items,
        desc="Iterating over scores for FDR calculation...",
    ):
        if tdd[i:].sum() < 0.0:
            raise RuntimeError(
                f"Number of DD matches is greater than the number of TD matches for score {scores[i]}! "
                "Invalid FDR estimation! Please use formula 'D/T' instead!"
            )
        if (nr_items - i - td[i:].sum()) <= 0.0:
            warnings.warn(
                RuntimeWarning(
                    "None of the data passes the desired FDR threshold! This is usually due to decoys with very good scores."
                )
            )
            return []
        if tdd[i:].sum() / (nr_items - i - td[i:].sum()) < fdr:
            # we need to verify in this case because there might be multiple
            # items with the same score
            if __verify_fdr_relaxed(data, fdr, scores[i], score):
                cutoff = scores[i]
                break
    validated_items = list()
    for item in data:
        if score == "higher_better" and item["score"] >= cutoff:
            validated_items.append(item)
        elif score == "lower_better" and item["score"] <= cutoff:
            validated_items.append(item)
        else:
            # do nothing
            pass
    return validated_items


[docs] def validate( data: List[Dict[str, Any]] | Dict[str, Any], fdr: float = 0.01, formula: Literal["D/T", "(TD+DD)/TT", "(TD-DD)/TT"] = "D/T", score: Literal["higher_better", "lower_better"] = "higher_better", separate_intra_inter: bool = False, ignore_missing_labels: bool = False, ) -> List[Dict[str, Any]] | Dict[str, Any]: r"""Validate a list of crosslinks or crosslink-spectrum-matches, or a parser_result by estimating false discovery rate. Validate a list of crosslinks or crosslink-spectrum-matches, or a parser_result by estimating false discovery rate (FDR) using the defined formula. Requires that "score", "alpha_decoy" and "beta_decoy" fields are set for crosslinks and crosslink-spectrum-matches. Parameters ---------- data : list of dict of str, any, or dict of str, any A list of crosslink-spectrum-matches or crosslinks to validate, or a parser_result. fdr : float, default = 0.01 The target FDR, must be given as a real number between 0 and 1. The default of 0.01 corresponds to 1% FDR. formula : str, one of "D/T", "(TD+DD)/TT", or "(TD-DD)/TT", default = "D/T" Which formula to use to estimate FDR. D and DD denote decoy matches, T and TT denote target matches, and TD denotes target-decoy and decoy-target matches. score: str, one of "higher_better" or "lower_better", default = "higher_better" If a higher score is considered better, or a lower score is considered better. separate_intra_inter : bool, default = False If FDR should be estimated separately for intra and inter matches. ignore_missing_labels : bool, default = False If crosslinks and crosslink-spectrum-matches should be ignored if they don't have target and decoy labels. By default and error is thrown if any unlabelled data is encountered. Returns ------- list of dict of str, any, or dict of str, any If a list of crosslink-spectrum-matches or crosslinks was provided, a list of validated crosslink-spectrum-matches or crosslinks is returned. If a parser_result was provided, an parser_result with validated crosslink-spectrum-matches and/or validated crosslinks will be returned. Raises ------ TypeError If a wrong data type is provided. TypeError If parameter formula is not one of 'D/T', '(TD+DD)/TT', or '(TD-DD)/TT'. TypeError If parameter score is not one of 'higher_better' or 'lower_better'. ValueError If parameter fdr is outside of the supported range. ValueError If attribute 'score' is not available for any of the data. ValueError If attribute 'alpha_decoy' or 'beta_decoy' is not available for any of the data and parameter ignore_missing_labels is set to False. ValueError If the number of DD matches exceeds the number of TD matches for formula '(TD-DD)/TT'. FDR can not be estimated with the formula '(TD-DD)/TT' in these cases. Notes ----- Please note that progress bars will usually not complete when running this function. This is by design as it is not necessary to iterate over all scores to estimate FDR. Examples -------- >>> from pyXLMS.parser import read >>> from pyXLMS.transform import validate >>> pr = read("data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_CSMs.xlsx", engine="MS Annika", crosslinker="DSS") >>> csms = pr["crosslink-spectrum-matches"] >>> len(csms) 826 >>> validated = validate(csms) >>> len(validated) 705 >>> from pyXLMS.parser import read >>> from pyXLMS.transform import validate >>> pr = read(["data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_CSMs.xlsx", "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_Crosslinks.xlsx"], engine="MS Annika", crosslinker="DSS") >>> len(pr["crosslink-spectrum-matches"]) 826 >>> len(pr["crosslinks"]) 300 >>> validated = validate(pr) >>> len(validated["crosslink-spectrum-matches"]) 705 >> len(validated["crosslinks"]) 226 >>> from pyXLMS.parser import read >>> from pyXLMS.transform import validate >>> pr = read(["data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_CSMs.xlsx", "data/ms_annika/XLpeplib_Beveridge_QEx-HFX_DSS_R1_Crosslinks.xlsx"], engine="MS Annika", crosslinker="DSS") >>> len(pr["crosslink-spectrum-matches"]) 826 >>> len(pr["crosslinks"]) 300 >>> validated = validate(pr, fdr=0.05) >>> len(validated["crosslink-spectrum-matches"]) 825 >> len(validated["crosslinks"]) 260 """ _ok = check_input_multi(data, "data", [dict, list]) _ok = check_input(fdr, "fdr", float) _ok = check_input(formula, "formula", str) _ok = check_input(score, "score", str) _ok = check_input(separate_intra_inter, "separate_intra_inter", bool) _ok = check_input(ignore_missing_labels, "ignore_missing_labels", bool) if fdr >= 1.0 or fdr <= 0.0: raise ValueError( "FDR must be given as a real number between 0 and 1, e.g. 0.01 corresponds to 1% FDR!" ) if formula not in ["D/T", "(TD+DD)/TT", "(TD-DD)/TT"]: raise TypeError( "Parameter 'formula' has to be one of 'D/T', '(TD+DD)/TT' or '(TD-DD)/TT'! Where D and DD is the number of decoys, T and TT the number of targets, " "and TD the number of target-decoys!" ) if score not in ["higher_better", "lower_better"]: raise TypeError( "Parameter 'score' has to be one of 'higher_better' or 'lower_better'! If two identical crosslinks or crosslink-spectrum" "-matches are found, the one with the higher score is kept if 'higher_better' is selected, and vice versa." ) if isinstance(data, list): _ok = check_input(data, "data", list, dict) if len(data) == 0: return data if "data_type" not in data[0] or data[0]["data_type"] not in [ "crosslink", "crosslink-spectrum-match", ]: raise TypeError( "Unsupported data type for input data! Parameter data has to be a list of crosslink or crosslink-spectrum-match, " "or a parser_result!" ) if ignore_missing_labels: data = [ item for item in data if item["alpha_decoy"] is not None and item["beta_decoy"] is not None ] available_keys = get_available_keys(data) if ( not available_keys["score"] or not available_keys["alpha_decoy"] or not available_keys["beta_decoy"] ): raise ValueError( "Can't validate data if 'score' or target/decoy labels are missing! Selecting 'ignore_missing_labels = True' will ignore crosslinks and crosslink-spectrum-matches " "that don't have a valid target/decoy label and filter them out!" ) if formula == "(TD-DD)/TT": if len(filter_target_decoy(data)["Target-Decoy"]) == 0: raise ValueError( "Can't estimate FDR with formula '(TD-DD)/TT' when there are not TD matches! Please select the default formula instead!" ) if separate_intra_inter: separate = filter_crosslink_type(data) return __validate_relaxed( separate["Intra"], fdr, score ) + __validate_relaxed(separate["Inter"], fdr, score) return __validate_relaxed(data, fdr, score) if separate_intra_inter: separate = filter_crosslink_type(data) return __validate_strict(separate["Intra"], fdr, score) + __validate_strict( separate["Inter"], fdr, score ) return __validate_strict(data, fdr, score) if "data_type" not in data or data["data_type"] != "parser_result": raise TypeError("Can't validate dict. Dict has to be a valid 'parser_result'!") new_csms = ( validate( data["crosslink-spectrum-matches"], fdr, formula, score, separate_intra_inter, ignore_missing_labels, ) if data["crosslink-spectrum-matches"] is not None else None ) new_xls = ( validate( data["crosslinks"], fdr, formula, score, separate_intra_inter, ignore_missing_labels, ) if data["crosslinks"] is not None else None ) if new_csms is not None: if not isinstance(new_csms, list): raise RuntimeError( "Something went wrong while validating crosslink-spectrum-matches.\n" f"Expected data type: list. Got: {type(new_csms)}." ) if new_xls is not None: if not isinstance(new_xls, list): raise RuntimeError( "Something went wrong while validating crosslinks.\n" f"Expected data type: list. Got: {type(new_xls)}." ) return create_parser_result( search_engine=data["search_engine"], csms=new_csms, crosslinks=new_xls )