Source code for pyXLMS.parser._parser_xldbse_plink

#!/usr/bin/env python3

# 2025 (c) Micha Johannes Birklbauer
# https://github.com/michabirklbauer/
# micha.birklbauer@gmail.com

from __future__ import annotations

import io
import warnings
import pandas as pd
from tqdm import tqdm

from ..data._parser_result import ParserResult
from ..data._util import check_input
from ..data._csm import create_csm
from ..data._crosslink import create_crosslink
from ..data._parser_result import create_parser_result
from ..constants import MODIFICATIONS
from ._util import format_sequence
from ._util import __serialize_pandas_series
from ._util import __parse_int, __parse_float

from typing import Optional
from typing import BinaryIO
from typing import Dict
from typing import Any
from typing import Tuple
from typing import List
from typing import Callable

# legacy
try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal


def __parse_modifications_from_plink_modifications_str(
    seq: str,
    mod_str: Optional[str | float],
    crosslinker: str,
    modifications: Dict[str, float] = MODIFICATIONS,
    verbose: Literal[0, 1, 2] = 1,
) -> Tuple[Dict[int, Tuple[str, float]], Dict[int, Tuple[str, float]]]:
    r"""Parse post-translational-modifications from a pLink modification string.

    Parses post-translational-modifications (PTMs) from a pLink modification string,
    for example "Carbamidomethyl[C](4);Oxidation[M](23)". The crosslinker itself is
    always registered as a modification at the crosslink position of both peptides.

    Parameters
    ----------
    seq : str
        The pLink crosslink sequence string. It is split on "-" into two peptide
        tokens, each of which carries its crosslink position in parentheses.
    mod_str : str, float, or None
        The pLink modification value, as string or float. ``None``, NaN and the
        string "nan" are treated as "no modifications". Empty entries (an empty
        string or a stray ";") are skipped.
    crosslinker : str
        Name of the used cross-linking reagent, for example "DSSO".
    modifications: dict of str, float, default = ``constants.MODIFICATIONS``
        Mapping of modification names to modification masses.
    verbose : 0, 1, or 2, default = 1
        - 0: All warnings are ignored.
        - 1: Warnings are emitted via ``warnings.warn``.
        - 2: Warnings are treated as errors.

    Returns
    -------
    tuple of dict of int, tuple
        The ``pyXLMS`` specific modification objects, dictionaries that map positions to their corresponding modifications and their
        monoisotopic masses. The first object (index 0) corresponds to the modifications of the first peptide, the second object (index 1)
        corresponds to the modifications of the second peptide. If several modifications fall on the same position their names are
        joined with "," and their masses are summed.

    Raises
    ------
    RuntimeError
        If multiple modifications on the same residue are parsed (only for ``verbose = 2``).
    KeyError
        If an unknown modification (or crosslinker) is encountered.

    Notes
    -----
    This function should not be called directly, it is called from ``read_plink()``.

    Positions greater than the length of the first peptide token are assigned to the
    second peptide after subtracting that length. The length is taken from the full
    token, i.e. it includes the "(position)" suffix of the first peptide.
    """
    modifications_a = dict()
    modifications_b = dict()
    peptide_tokens = seq.split("-")
    first_token = peptide_tokens[0]
    xl_pos_a = __parse_int(first_token.split("(")[1].split(")")[0])
    xl_pos_b = __parse_int(peptide_tokens[1].split("(")[1].split(")")[0])
    if crosslinker in modifications:
        modifications_a[xl_pos_a] = (crosslinker, modifications[crosslinker])
        modifications_b[xl_pos_b] = (crosslinker, modifications[crosslinker])
    else:
        raise KeyError(
            f"Key {crosslinker} not found in parameter 'modifications'. Are you missing a modification?"
        )
    if mod_str is None:
        return (modifications_a, modifications_b)
    if isinstance(mod_str, float) and pd.isna(mod_str):
        return (modifications_a, modifications_b)
    mod_str = str(mod_str).strip()
    if mod_str == "nan":
        return (modifications_a, modifications_b)

    def _register(target: Dict[int, Tuple[str, float]], position: int, name: str):
        # Adds one modification to ``target``; merges with an existing entry
        # at the same position (after warning/raising according to ``verbose``).
        if position not in target:
            target[position] = (name, modifications[name])
            return
        message = f"Modification at position {position} already exists!"
        if verbose == 2:
            raise RuntimeError(message)
        if verbose == 1:
            warnings.warn(RuntimeWarning(message))
        existing_name, existing_mass = target[position]
        target[position] = (
            existing_name + "," + name,
            existing_mass + modifications[name],
        )

    # loop invariant: offset that separates first-peptide from second-peptide positions
    offset = len(first_token)
    for mod in mod_str.split(";"):
        if not mod.strip():
            # empty modification string or stray ";" -> nothing to parse
            continue
        mod_desc = mod.split("[")[0].strip()
        if mod_desc not in modifications:
            raise KeyError(
                f"Key {mod_desc} not found in parameter 'modifications'. Are you missing a modification?"
            )
        mod_pos = __parse_int(mod.split("(")[1].split(")")[0])
        if mod_pos > offset:
            _register(modifications_b, mod_pos - offset, mod_desc)
        else:
            _register(modifications_a, mod_pos, mod_desc)
    return (modifications_a, modifications_b)


def __parse_proteins_and_position_from_plink(
    seq: str,
    proteins: str,
) -> Dict[str, Any]:
    r"""Parses proteins and positions from pLink results.

    Parses proteins, as well as peptide and crosslink positions from a pLink crosslink sequence
    and protein string.

    Parameters
    ----------
    seq : str
        The pLink crosslink sequence string. It is split on "-" into two peptide
        tokens, each of which carries its crosslink position in parentheses.
    proteins : str
        The pLink proteins string: "/"-separated protein pairs, each pair being two
        "accession(position)" entries joined by "-". Accessions may themselves
        contain "-" or parentheses (for example isoform identifiers).

    Returns
    -------
    dict of str, Any
        A dictionary with the following keys and information:
        ``xl_pos_a``, ``proteins_a``, ``proteins_a_xl_positions``, ``proteins_a_pep_positions``,
        ``xl_pos_b``, ``proteins_b``, ``proteins_b_xl_positions``, ``proteins_b_pep_positions``.
        Protein entries are de-duplicated and sorted; the peptide position of a protein is
        its crosslink position minus the crosslink position in the peptide plus one.

    Raises
    ------
    IndexError
        If ``seq`` or a protein pair does not have the expected structure.

    Notes
    -----
    This function should not be called directly, it is called from ``read_plink()``.
    """
    peptide_tokens = seq.split("-")
    xl_pos_a = __parse_int(peptide_tokens[0].split("(")[1].split(")")[0])
    xl_pos_b = __parse_int(peptide_tokens[1].split("(")[1].split(")")[0])

    def _split_pair(protein_pair: str) -> Tuple[str, str]:
        # Splits "accA(posA)-accB(posB)" at the ")-" boundary so that hyphens
        # inside accessions do not break the pair apart.
        left, boundary, right = protein_pair.partition(")-")
        if boundary:
            return left + ")", right
        # no ")-" boundary (e.g. whitespace around "-"): plain split as fallback
        parts = protein_pair.split("-")
        return parts[0], parts[1]

    def _collect(entries: set, xl_pos: int) -> Tuple[list, list, list]:
        # Turns unique "accession(position)" entries into parallel lists of
        # accessions, crosslink positions and peptide start positions.
        accessions = list()
        xl_positions = list()
        pep_positions = list()
        for entry in sorted(entries):
            # the position is the last parenthesised group of the entry
            pieces = entry.rsplit("(", 1)
            acc = pieces[0]
            pos = __parse_int(pieces[1].split(")")[0])
            accessions.append(acc)
            xl_positions.append(pos)
            pep_positions.append(pos - xl_pos + 1)
        return accessions, xl_positions, pep_positions

    # find unique
    proteins_set_a = set()
    proteins_set_b = set()
    proteins = proteins.strip().rstrip("/")
    for protein_pair in proteins.split("/"):
        protein_a, protein_b = _split_pair(protein_pair)
        proteins_set_a.add(protein_a.strip())
        proteins_set_b.add(protein_b.strip())
    proteins_a, proteins_a_xl_positions, proteins_a_pep_positions = _collect(
        proteins_set_a, xl_pos_a
    )
    proteins_b, proteins_b_xl_positions, proteins_b_pep_positions = _collect(
        proteins_set_b, xl_pos_b
    )
    return {
        "xl_pos_a": xl_pos_a,
        "proteins_a": proteins_a,
        "proteins_a_xl_positions": proteins_a_xl_positions,
        "proteins_a_pep_positions": proteins_a_pep_positions,
        "xl_pos_b": xl_pos_b,
        "proteins_b": proteins_b,
        "proteins_b_xl_positions": proteins_b_xl_positions,
        "proteins_b_pep_positions": proteins_b_pep_positions,
    }


def __read_plink_cross_linked_peptides_file(
    file: str | BinaryIO, sep: str = ",", decimal: str = ".", **kwargs
) -> pd.DataFrame:
    r"""Reads a pLink cross-linked peptides file into a pandas DataFrame.

    Reads a pLink cross-linked peptides file into a pandas DataFrame. Reading
    skips all lines starting with ``sep``.

    Parameters
    ----------
    file : str, or BinaryIO
        The name/path of the pLink result file or a file-like object/stream.
        Paths are opened as UTF-8 text. Streams are rewound before and after
        reading; lines returned as ``bytes`` are decoded as UTF-8.
    sep : str, default = ","
        Separator used in the ``.csv`` file.
    decimal : str, default = "."
        Character to recognize as decimal point.
    **kwargs
        Any additional parameters will be passed to ``pandas.read_csv``.

    Returns
    -------
    pd.DataFrame
        The parsed cross-linked peptides.

    Raises
    ------
    RuntimeError
        If the file could not be parsed.
    ValueError
        If the file is not a pLink cross-linked peptides file.

    Notes
    -----
    This function should not be called directly, it is called from ``read_plink()`` and ``detect_plink_filetype()``.
    """
    parsed_lines = None
    if isinstance(file, str):
        # the context manager closes the file, no explicit close needed
        with open(file, "r", encoding="utf-8") as f:
            parsed_lines = f.readlines()
    else:
        file.seek(0)
        parsed_lines = file.readlines()
        file.seek(0)
    if parsed_lines is None:
        raise RuntimeError("Something went wrong while parsing the file!")
    # A whitespace separator (e.g. "\t") would be removed by a plain ``strip()``,
    # which would hide the leading separator of the lines that must be skipped.
    whitespace_sep = sep.isspace()
    lines = list()
    for line in parsed_lines:
        if isinstance(line, (bytes, bytearray)):
            # ``str(bytes)`` would yield the "b'...'" repr instead of the text
            text = line.decode("utf-8")
        else:
            text = str(line)
        if whitespace_sep:
            preprocessed_line = text.rstrip("\r\n")
        else:
            preprocessed_line = text.strip()
        if not preprocessed_line.startswith(sep):
            lines.append(preprocessed_line)
    df = pd.read_csv(
        io.StringIO("\n".join(lines)),
        sep=sep,
        decimal=decimal,
        low_memory=False,
        **kwargs,
    )
    colnames = df.columns.values.tolist()
    required_columns = (
        "Peptide_Order",
        "Peptide",
        "Peptide_Mass",
        "Modifications",
        "Proteins",
        "Protein_Type",
    )
    if not all(column in colnames for column in required_columns):
        raise ValueError(
            "The provided file seems not to be a pLink cross-linked peptides file!"
        )
    return df