Skip to content

Support for Custom Modifications in Multiprocessing #303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,4 @@ _docs*
.idea

test_data
*/integration/input_data
49 changes: 49 additions & 0 deletions alphabase/constants/modification.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,55 @@ def has_custom_mods():
return len(MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]) > 0


def get_custom_mods():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added two new functions: get_custom_mods() to extract custom modifications as a serializable dictionary, and init_custom_mods() to initialize these modifications in child processes. This enables transferring custom modification data to worker processes during multiprocessing.

def get_custom_mods():
    "
    Returns a dictionary of user-defined modifications that can be serialized and passed to child processes.

    Returns
    -------
    dict
        Dictionary with modification names as keys and modification details as values
    "
    if not has_custom_mods():
        return {}

    custom_mods = MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]
    result = {}

    for mod_name, row in custom_mods.iterrows():
        result[mod_name] = {
            "composition": row["composition"],
            "modloss_composition": row["modloss_composition"],
            "smiles": row["smiles"],
        }

    return result


def init_custom_mods(custom_mods_dict):
    "
    Initialize custom modifications in a child process from a dictionary.

    Parameters
    ----------
    custom_mods_dict : dict
        Dictionary of custom modifications as returned by get_custom_mods()
    "
    if not custom_mods_dict:
        return

    for mod_name, mod_info in custom_mods_dict.items():
        _add_a_new_modification(
            mod_name=mod_name,
            composition=mod_info["composition"],
            modloss_composition=mod_info["modloss_composition"],
            smiles=mod_info["smiles"],
        )

    # Update all dictionaries after adding modifications
    update_all_by_MOD_DF()

"""
Returns a dictionary of user-defined modifications that can be serialized and passed to child processes.

Returns
-------
dict
Dictionary with modification names as keys and modification details as values
"""
if not has_custom_mods():
return {}

custom_mods = MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main drawback I see here is that this code needs to be adapted whenever something is added to the MOD_DF .. this would be easily missed and then the multiprocessing part of the code would behave differently

result = {}

for mod_name, row in custom_mods.iterrows():
result[mod_name] = {
"composition": row["composition"],
"modloss_composition": row["modloss_composition"],
"smiles": row["smiles"],
}

return result


def init_custom_mods(custom_mods_dict):
"""
Initialize custom modifications in a child process from a dictionary.

Parameters
----------
custom_mods_dict : dict
Dictionary of custom modifications as returned by get_custom_mods()
"""
if not custom_mods_dict:
return

for mod_name, mod_info in custom_mods_dict.items():
_add_a_new_modification(
mod_name=mod_name,
composition=mod_info["composition"],
modloss_composition=mod_info["modloss_composition"],
smiles=mod_info["smiles"],
)

# Update all dictionaries after adding modifications
update_all_by_MOD_DF()


def add_new_modifications(new_mods: Union[list, dict]):
"""Add new modifications into :data:`MOD_DF`.

Expand Down
98 changes: 71 additions & 27 deletions alphabase/peptide/precursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,14 +483,27 @@ def _batchify_df(df_group, mp_batch_size):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved the _count_batchify_df function by using integer division to calculate the number of batches more efficiently. This avoids creating unnecessary range objects just to count iterations.

def _batchify_df(df_group, mp_batch_size):
    "Internal funciton for multiprocessing"
    for _, df in df_group:
        for i in range(0, len(df), mp_batch_size):
            yield df.iloc[i : i + mp_batch_size, :]


def _count_batchify_df(df_group, mp_batch_size):
    "Internal function"
    count = 0
    for _, group in df_group:
        count += (len(group) + mp_batch_size - 1) // mp_batch_size
    return count


def _count_batchify_df(df_group, mp_batch_size):
"""Internal funciton for multiprocessing"""
"""Internal function"""
count = 0
for _, df in df_group:
for _ in range(0, len(df), mp_batch_size):
count += 1
for _, group in df_group:
count += (len(group) + mp_batch_size - 1) // mp_batch_size
return count


def _init_worker(custom_mods_dict):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a new _init_worker function that serves as an initializer for worker processes, handling the initialization of custom modifications in each worker process.

def _init_worker(custom_mods_dict):
    "
    Initialize a worker process with custom modifications.

    Parameters
    ----------
    custom_mods_dict : dict
        Dictionary of custom modifications as returned by get_custom_mods()
    "
    from alphabase.constants.modification import init_custom_mods

    init_custom_mods(custom_mods_dict)

"""
Initialize a worker process with custom modifications.

Parameters
----------
custom_mods_dict : dict
Dictionary of custom modifications as returned by get_custom_mods()
"""
from alphabase.constants.modification import init_custom_mods

init_custom_mods(custom_mods_dict)


# `progress_bar` should be replaced by more advanced tqdm wrappers created by Sander
# I will leave it to alphabase.utils
def calc_precursor_isotope_info_mp(
Expand All @@ -501,56 +514,74 @@ def calc_precursor_isotope_info_mp(
min_right_most_intensity: float = 0.2,
min_precursor_num_to_run_mp: int = 10000,
) -> pd.DataFrame:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely refactored the calc_precursor_isotope_info_mp function to add support for custom modifications. Key improvements: 1) Added proper initialization of worker processes with custom mods, 2) Enhanced documentation with detailed parameter descriptions, 3) Better progress bar handling, and 4) Simplified the early return condition.

# `progress_bar` should be replaced by more advanced tqdm wrappers created by Sander
# I will leave it to alphabase.utils
def calc_precursor_isotope_info_mp(
    precursor_df: pd.DataFrame,
    processes: int = 8,
    mp_batch_size: int = 10000,
    progress_bar=None,
    min_right_most_intensity: float = 0.2,
    min_precursor_num_to_run_mp: int = 10000,
) -> pd.DataFrame:
    "Calculate isotope info for precursor_df using multiprocessing.

    Parameters
    ----------

    precursor_df : pd.DataFrame
        Precursor_df to calculate isotope info

    processes : int
        Process number. Optional, by default 8

    mp_batch_size : int
        Multiprocessing batch size. Optional, by default 10000.

    progress_bar : tqdm.tqdm
        Progress bar. Optional, by default None

    min_right_most_intensity : float
        The minimal intensity value of the right-most peak relative to apex peak.

    min_precursor_num_to_run_mp : int
        The minimal number of precursors to run multiprocessing. Optional, by default 10000.

    Returns
    -------

    pd.DataFrame
        precursor_df with additional columns:
        - isotope_apex_offset
        - isotope_apex_mz
        - isotope_apex_intensity
        - isotope_right_most_offset
        - isotope_right_most_mz
        - isotope_right_most_intensity
        - isotope_m1_mz
        - isotope_m1_intensity
        - mono_isotope_idx
    "
    if processes <= 1 or len(precursor_df) < min_precursor_num_to_run_mp:
        return calc_precursor_isotope_info(precursor_df)

    # Get custom modifications to pass to worker processes
    from alphabase.constants.modification import get_custom_mods

    custom_mods_dict = get_custom_mods()

    df_list = []
    df_group = precursor_df.groupby("nAA")

    with mp.get_context("spawn").Pool(
        processes, initializer=_init_worker, initargs=(custom_mods_dict,)
    ) as p:
        processing = p.imap(
            partial(
                calc_precursor_isotope_info,
                min_right_most_intensity=min_right_most_intensity,
            ),
            _batchify_df(df_group, mp_batch_size),
        )

        if progress_bar:
            for df in progress_bar(
                processing, _count_batchify_df(df_group, mp_batch_size)
            ):
                df_list.append(df)
        else:
            for df in processing:
                df_list.append(df)
    return pd.concat(df_list)

"""`calc_precursor_isotope` is not that fast for large dataframes,
so here we use multiprocessing for faster isotope pattern calculation.
The speed is acceptable with multiprocessing (3.8 min for 21M precursors, 8 processes).
"""Calculate isotope info for precursor_df using multiprocessing.

Parameters
----------

precursor_df : pd.DataFrame
Precursor_df to calculate
Precursor_df to calculate isotope info

processes : int
Process number. Optional, by default 8

mp_batch_size : int
Multiprocessing batch size. Optional, by default 100000.
Multiprocessing batch size. Optional, by default 10000.

progress_bar : Callable
The tqdm-based callback function
to check multiprocessing. Defaults to None.
progress_bar : tqdm.tqdm
Progress bar. Optional, by default None

min_right_most_intensity : float
The minimal intensity value of the right-most peak relative to apex peak.
Optional, by default 0.2

min_precursor_num_to_run_mp : int
The minimal number of precursors to run multiprocessing. Optional, by default 10000.

Returns
-------

pd.DataFrame
DataFrame with `isotope_*` columns,
see :meth:'calc_precursor_isotope()'.
precursor_df with additional columns:
- isotope_apex_offset
- isotope_apex_mz
- isotope_apex_intensity
- isotope_right_most_offset
- isotope_right_most_mz
- isotope_right_most_intensity
- isotope_m1_mz
- isotope_m1_intensity
- mono_isotope_idx
"""
if len(precursor_df) < min_precursor_num_to_run_mp or processes <= 1:
return calc_precursor_isotope_info(
precursor_df=precursor_df,
min_right_most_intensity=min_right_most_intensity,
)
if processes <= 1 or len(precursor_df) < min_precursor_num_to_run_mp:
return calc_precursor_isotope_info(precursor_df)

# Get custom modifications to pass to worker processes
from alphabase.constants.modification import get_custom_mods

custom_mods_dict = get_custom_mods()

df_list = []
df_group = precursor_df.groupby("nAA")
with mp.get_context("spawn").Pool(processes) as p:

with mp.get_context("spawn").Pool(
processes, initializer=_init_worker, initargs=(custom_mods_dict,)
) as p:
processing = p.imap(
partial(
calc_precursor_isotope_info,
min_right_most_intensity=min_right_most_intensity,
),
_batchify_df(df_group, mp_batch_size),
)

if progress_bar:
processing = progress_bar(
for df in progress_bar(
processing, _count_batchify_df(df_group, mp_batch_size)
)
for df in processing:
df_list.append(df)
):
df_list.append(df)
else:
for df in processing:
df_list.append(df)
return pd.concat(df_list)


Expand Down Expand Up @@ -653,6 +684,12 @@ def calc_precursor_isotope_intensity_mp(
min_right_most_intensity : float

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhanced the calc_precursor_isotope_intensity_mp function to support custom modifications in multiprocessing. Added worker initialization, improved documentation with better parameter descriptions (especially for the normalize parameter), and made the function's interface more consistent with other multiprocessing functions.

def calc_precursor_isotope_intensity_mp(
    precursor_df,
    max_isotope=6,
    min_right_most_intensity=0.001,
    normalize: typing.Literal["mono", "sum"] = "sum",
    mp_batch_size=1000,
    mp_process_num=8,
    progress_bar=True,
) -> pd.DataFrame:
    "Calculate isotope intensity values for precursor_df using multiprocessing.

    Parameters
    ----------

    precursor_df : pd.DataFrame
        Precursor_df to calculate isotope intensity

    max_isotope : int
        Max isotope number to calculate. Optional, by default 6

    min_right_most_intensity : float
        The minimal intensity value of the right-most peak relative to apex peak.

    normalize : typing.Literal["mono", "sum"]
        How to normalize the isotope intensities.
        "mono": normalize to monoisotopic peak
        "sum": normalize to sum of all peaks
        Optional, by default "sum"

    mp_batch_size : int
        Multiprocessing batch size. Optional, by default 1000.

    mp_process_num : int
        Process number. Optional, by default 8

    progress_bar : bool
        Whether to show progress bar. Optional, by default True

    Returns
    -------

    pd.DataFrame
        precursor_df with additional columns i_0, i_1, i_2, ... i_{max_isotope-1}

    "

    if mp_process_num <= 1:
        return calc_precursor_isotope_intensity(
            precursor_df=precursor_df,
            max_isotope=max_isotope,
            min_right_most_intensity=min_right_most_intensity,
            normalize=normalize,
        )

    # Get custom modifications to pass to worker processes
    from alphabase.constants.modification import get_custom_mods

    custom_mods_dict = get_custom_mods()

    df_list = []
    df_group = precursor_df.groupby("nAA")

    with mp.get_context("spawn").Pool(
        mp_process_num, initializer=_init_worker, initargs=(custom_mods_dict,)
    ) as p:
        processing = p.imap(
            partial(
                calc_precursor_isotope_intensity,
                max_isotope=max_isotope,
                min_right_most_intensity=min_right_most_intensity,
                normalize=normalize,
            ),
            _batchify_df(df_group, mp_batch_size),
        )

        if progress_bar:
            df_list = list(
                tqdm(processing, total=_count_batchify_df(df_group, mp_batch_size))
            )
        else:
            df_list = list(processing)

    return pd.concat(df_list)

The minimal intensity value of the right-most peak relative to apex peak.

normalize : typing.Literal["mono", "sum"]
How to normalize the isotope intensities.
"mono": normalize to monoisotopic peak
"sum": normalize to sum of all peaks
Optional, by default "sum"

mp_batch_size : int
Multiprocessing batch size. Optional, by default 1000.

Expand All @@ -678,10 +715,17 @@ def calc_precursor_isotope_intensity_mp(
normalize=normalize,
)

# Get custom modifications to pass to worker processes
from alphabase.constants.modification import get_custom_mods

custom_mods_dict = get_custom_mods()

df_list = []
df_group = precursor_df.groupby("nAA")

with mp.get_context("spawn").Pool(mp_process_num) as p:
with mp.get_context("spawn").Pool(
mp_process_num, initializer=_init_worker, initargs=(custom_mods_dict,)
) as p:
processing = p.imap(
partial(
calc_precursor_isotope_intensity,
Expand All @@ -699,7 +743,7 @@ def calc_precursor_isotope_intensity_mp(
else:
df_list = list(processing)

return pd.concat(df_list, ignore_index=True)
return pd.concat(df_list)


calc_precursor_isotope = calc_precursor_isotope_intensity
Expand Down
34 changes: 23 additions & 11 deletions alphabase/spectral_library/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import numpy as np
import pandas as pd

from alphabase.constants.modification import has_custom_mods
from alphabase.io.hdf import HDF_File
from alphabase.peptide.fragment import (
calc_fragment_count,
Expand Down Expand Up @@ -417,9 +416,14 @@ def calc_precursor_isotope_intensity(
mp_batch_size : int, optional
The batch size for multiprocessing.

mp_processes : int, optional
mp_process_num : int, optional
The number of processes for multiprocessing.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved documentation for the calc_precursor_isotope_intensity method, especially clarifying the normalize parameter. Removed the check against custom modifications since they are now supported in multiprocessing.

        """Calculate and append the isotope intensity columns into self.precursor_df.
        See `alphabase.peptide.calc_precursor_isotope_intensity` for details.

        Parameters
        ----------

        max_isotope : int, optional
            The maximum isotope to calculate.

        min_right_most_intensity : float, optional
            The minimum intensity of the right most isotope.

        mp_batch_size : int, optional
            The batch size for multiprocessing.

        mp_process_num : int, optional
            The number of processes for multiprocessing.

        normalize : typing.Literal["mono", "sum"], optional
            How to normalize the isotope intensities.
            "mono": normalize to monoisotopic peak
            "sum": normalize to sum of all peaks
            Defaults to "sum".
        """

        if "precursor_mz" not in self._precursor_df.columns:
            self.calc_and_clip_precursor_mz()

        do_multiprocessing = (
            mp_process_num > 1 and len(self.precursor_df) > mp_batch_size
        )

        # Custom modifications are now supported in multiprocessing


normalize : typing.Literal["mono", "sum"], optional
How to normalize the isotope intensities.
"mono": normalize to monoisotopic peak
"sum": normalize to sum of all peaks
Defaults to "sum".
"""

if "precursor_mz" not in self._precursor_df.columns:
Expand All @@ -429,13 +433,7 @@ def calc_precursor_isotope_intensity(
mp_process_num > 1 and len(self.precursor_df) > mp_batch_size
)

if do_multiprocessing and has_custom_mods():
logging.warning(
"Multiprocessing not compatible with custom modifications yet, falling back to single process."
)
do_multiprocessing = False
# TODO enable multiprocessing also in this case

# Custom modifications are now supported in multiprocessing
if do_multiprocessing:
(self._precursor_df) = calc_precursor_isotope_intensity_mp(
self.precursor_df,
Expand Down Expand Up @@ -477,15 +475,29 @@ def calc_precursor_isotope_info(
):
"""
Append isotope columns into self.precursor_df.
See `alphabase.peptide.calc_precursor_isotope` for details.
See `alphabase.peptide.calc_precursor_isotope_info` for details.

Parameters

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved documentation for the calc_precursor_isotope_info method with better parameter descriptions. Removed the check against custom modifications since that limitation no longer exists.

    def calc_precursor_isotope_info(
        self,
        mp_process_num: int = 8,
        mp_process_bar=None,
        mp_batch_size=10000,
    ):
        """
        Append isotope columns into self.precursor_df.
        See `alphabase.peptide.calc_precursor_isotope_info` for details.

        Parameters
        ----------
        mp_process_num : int, optional
            The number of processes for multiprocessing. Defaults to 8.

        mp_process_bar : tqdm.tqdm, optional
            Progress bar. Defaults to None.

        mp_batch_size : int, optional
            The batch size for multiprocessing. Defaults to 10000.
        """
        if "precursor_mz" not in self._precursor_df.columns:
            self.calc_and_clip_precursor_mz()

        # Custom modifications are now supported in multiprocessing

----------
mp_process_num : int, optional
The number of processes for multiprocessing. Defaults to 8.

mp_process_bar : tqdm.tqdm, optional
Progress bar. Defaults to None.

mp_batch_size : int, optional
The batch size for multiprocessing. Defaults to 10000.
"""
if "precursor_mz" not in self._precursor_df.columns:
self.calc_and_clip_precursor_mz()

# Custom modifications are now supported in multiprocessing
if mp_process_num > 1 and len(self.precursor_df) > mp_batch_size:
(self._precursor_df) = calc_precursor_isotope_info_mp(
self.precursor_df,
processes=mp_process_num,
process_bar=mp_process_bar,
progress_bar=mp_process_bar,
mp_batch_size=mp_batch_size,
)
else:
(self._precursor_df) = calc_precursor_isotope_info(self.precursor_df)
Expand Down
Loading
Loading