Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelizes MDAnalysis.analysis.InterRDF and MDAnalysis.analysis.InterRDF_s #4884

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions package/CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Enhancements
* Enable parallelization for analysis.nucleicacids.NucPairDist (Issue #4670)
* Add check and warning for empty (all zero) coordinates in RDKit converter (PR #4824)
* Added `precision` for XYZWriter (Issue #4775, PR #4771)
* Parallelize `analysis.rdf.InterRDF` and `analysis.rdf.InterRDF_s` (Issue #4675)


Changes
Expand Down
86 changes: 85 additions & 1 deletion package/MDAnalysis/analysis/rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import numpy as np

from ..lib import distances
from .base import AnalysisBase
from .base import AnalysisBase, ResultsGroup


class InterRDF(AnalysisBase):
Expand Down Expand Up @@ -216,8 +216,23 @@ class InterRDF(AnalysisBase):
Store results as attributes `bins`, `edges`, `rdf` and `count`
of the `results` attribute of
:class:`~MDAnalysis.analysis.AnalysisBase`.

.. versionchanged:: 2.9.0
Enabled **parallel execution** with the ``multiprocessing`` and ``dask``
backends; use the new method :meth:`get_supported_backends` to see all
supported backends.
"""

@classmethod
def get_supported_backends(cls):
return (
"serial",
"multiprocessing",
"dask",
)

_analysis_algorithm_is_parallelizable = True

def __init__(
self,
g1,
Expand Down Expand Up @@ -273,6 +288,7 @@ def _prepare(self):

if self.norm == "rdf":
# Cumulative volume for rdf normalization
self.results.volume_cum = 0
self.volume_cum = 0
# Set the max range to filter the search radius
self._maxrange = self.rdf_settings["range"][1]
Expand Down Expand Up @@ -302,8 +318,19 @@ def _single_frame(self):
self.results.count += count

if self.norm == "rdf":
self.results.volume_cum += self._ts.volume
self.volume_cum += self._ts.volume

def _get_aggregator(self):
    """Return the :class:`ResultsGroup` used to merge per-worker results.

    ``count`` and ``volume_cum`` accumulate over frames, so partial
    results from each worker are summed.  ``bins`` and ``edges`` are
    computed identically by every worker during ``_prepare``, so they
    are averaged — summing them would scale them by the number of
    workers, corrupting the abscissa of the RDF.  This also matches the
    aggregation used by :class:`InterRDF_s`.
    """
    return ResultsGroup(
        lookup={
            "count": ResultsGroup.ndarray_sum,
            "volume_cum": ResultsGroup.ndarray_sum,
            # bins/edges are per-worker duplicates, not accumulators:
            # average them instead of summing (bug fix: was ndarray_sum).
            "bins": ResultsGroup.ndarray_mean,
            "edges": ResultsGroup.ndarray_mean,
        }
    )

def _conclude(self):
norm = self.n_frames
if self.norm in ["rdf", "density"]:
Expand All @@ -324,6 +351,7 @@ def _conclude(self):
N -= xA * xB * nblocks

# Average number density
self.volume_cum = self.results.volume_cum
box_vol = self.volume_cum / self.n_frames
norm *= N / box_vol

Expand Down Expand Up @@ -561,8 +589,61 @@ class InterRDF_s(AnalysisBase):
Instead of `density=True` use `norm='density'`
.. deprecated:: 2.3.0
The `universe` parameter is superflous.

.. versionchanged:: 2.9.0
Enabled **parallel execution** with the ``multiprocessing`` and ``dask``
backends; use the new method :meth:`get_supported_backends` to see all
supported backends.
"""

@classmethod
def get_supported_backends(cls):
return (
"serial",
"multiprocessing",
"dask",
)

_analysis_algorithm_is_parallelizable = True

@staticmethod
def func(arrs):
r"""Custom aggregator for nested arrays

Parameters
----------
arrs : list
List of arrays or nested lists of arrays

Returns
-------
ndarray
Sums flattened arrays at alternate index
in the list and returns a list of two arrays
"""

def flatten(arr):
if isinstance(arr, (list, tuple)):
return [item for sublist in arr for item in flatten(sublist)]
return [arr]

flat = flatten(arrs)
aggregated_arr = [np.zeros_like(flat[0]), np.zeros_like(flat[1])]
for i in range(len(flat) // 2):
aggregated_arr[0] += flat[2 * i] # 0, 2, 4, ...
aggregated_arr[1] += flat[2 * i + 1] # 1, 3, 5, ...
return aggregated_arr

def _get_aggregator(self):
    """Return the :class:`ResultsGroup` used to merge per-worker results.

    Counts are combined with the custom :meth:`func` aggregator (nested
    lists of arrays), the cumulative volume is summed, and the
    per-worker bins/edges are averaged.
    """
    lookup = {
        "count": self.func,
        "volume_cum": ResultsGroup.ndarray_sum,
        "bins": ResultsGroup.ndarray_mean,
        "edges": ResultsGroup.ndarray_mean,
    }
    return ResultsGroup(lookup=lookup)

def __init__(
self,
u,
Expand Down Expand Up @@ -614,6 +695,7 @@ def _prepare(self):

if self.norm == "rdf":
# Cumulative volume for rdf normalization
self.results.volume_cum = 0
self.volume_cum = 0
self._maxrange = self.rdf_settings["range"][1]

Expand All @@ -631,6 +713,7 @@ def _single_frame(self):
self.results.count[i][idx1, idx2, :] += count

if self.norm == "rdf":
self.results.volume_cum += self._ts.volume
self.volume_cum += self._ts.volume

def _conclude(self):
Expand All @@ -642,6 +725,7 @@ def _conclude(self):

if self.norm == "rdf":
# Average number density
self.volume_cum = self.results.volume_cum
norm *= 1 / (self.volume_cum / self.n_frames)

# Empty lists to restore indices, RDF
Expand Down
28 changes: 22 additions & 6 deletions package/MDAnalysis/core/selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,25 +1106,41 @@ class WaterSelection(Selection):

.. versionadded:: 2.9.0
"""
token = 'water'

token = "water"

# Recognized water resnames
water_res = {
'H2O', 'HOH', 'OH2', 'HHO', 'OHH',
'T3P', 'T4P', 'T5P', 'SOL', 'WAT',
'TIP', 'TIP2', 'TIP3', 'TIP4'
"H2O",
"HOH",
"OH2",
"HHO",
"OHH",
"T3P",
"T4P",
"T5P",
"SOL",
"WAT",
"TIP",
"TIP2",
"TIP3",
"TIP4",
}

def _apply(self, group):
resnames = group.universe._topology.resnames
nmidx = resnames.nmidx[group.resindices]

matches = [ix for (nm, ix) in resnames.namedict.items()
if nm in self.water_res]
matches = [
ix
for (nm, ix) in resnames.namedict.items()
if nm in self.water_res
]
mask = np.isin(nmidx, matches)

return group[mask]


class BackboneSelection(ProteinSelection):
"""A BackboneSelection contains all atoms with name 'N', 'CA', 'C', 'O'.

Expand Down
11 changes: 11 additions & 0 deletions testsuite/MDAnalysisTests/analysis/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from MDAnalysis.analysis.nucleicacids import NucPairDist
from MDAnalysis.analysis.contacts import Contacts
from MDAnalysis.analysis.density import DensityAnalysis
from MDAnalysis.analysis.rdf import InterRDF, InterRDF_s
from MDAnalysis.lib.util import is_installed


Expand Down Expand Up @@ -176,3 +177,13 @@ def client_Contacts(request):
@pytest.fixture(scope="module", params=params_for_cls(DensityAnalysis))
def client_DensityAnalysis(request):
return request.param


# Backend/worker parametrization for InterRDF: every test that takes this
# fixture runs once per supported backend combination.
@pytest.fixture(scope="module", params=params_for_cls(InterRDF))
def client_InterRDF(request):
    # request.param is a set of keyword arguments to pass to
    # AnalysisBase.run() (e.g. backend/n_workers) — TODO confirm against
    # params_for_cls in this conftest.
    return request.param


# Same parametrization, but for the per-selection variant InterRDF_s.
@pytest.fixture(scope="module", params=params_for_cls(InterRDF_s))
def client_InterRDF_s(request):
    return request.param
72 changes: 48 additions & 24 deletions testsuite/MDAnalysisTests/analysis/test_rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,83 +48,85 @@ def sels(u):
return s1, s2


def test_nbins(u):
def test_nbins(u, client_InterRDF):
s1 = u.atoms[:3]
s2 = u.atoms[3:]
rdf = InterRDF(s1, s2, nbins=412).run()
rdf = InterRDF(s1, s2, nbins=412).run(**client_InterRDF)

assert len(rdf.results.bins) == 412


def test_range(u):
def test_range(u, client_InterRDF):
s1 = u.atoms[:3]
s2 = u.atoms[3:]
rmin, rmax = 1.0, 13.0
rdf = InterRDF(s1, s2, range=(rmin, rmax)).run()
rdf = InterRDF(s1, s2, range=(rmin, rmax)).run(**client_InterRDF)

assert rdf.results.edges[0] == rmin
assert rdf.results.edges[-1] == rmax


def test_count_sum(sels):
def test_count_sum(sels, client_InterRDF):
# OW vs HW
# should see 8 comparisons in count
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
assert rdf.results.count.sum() == 8


def test_count(sels):
def test_count(sels, client_InterRDF):
# should see two distances with 4 counts each
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
assert len(rdf.results.count[rdf.results.count == 4]) == 2


def test_double_run(sels):
def test_double_run(sels, client_InterRDF):
# running rdf twice should give the same result
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf.run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
rdf.run(**client_InterRDF)
assert len(rdf.results.count[rdf.results.count == 4]) == 2


def test_exclusion(sels):
def test_exclusion(sels, client_InterRDF):
# should see two distances with 4 counts each
s1, s2 = sels
rdf = InterRDF(s1, s2, exclusion_block=(1, 2)).run()
rdf = InterRDF(s1, s2, exclusion_block=(1, 2)).run(**client_InterRDF)
assert rdf.results.count.sum() == 4


@pytest.mark.parametrize(
"attr, count", [("residue", 8), ("segment", 0), ("chain", 8)]
)
def test_ignore_same_residues(sels, attr, count):
def test_ignore_same_residues(sels, attr, count, client_InterRDF):
# should see two distances with 4 counts each
s1, s2 = sels
rdf = InterRDF(s2, s2, exclude_same=attr).run()
rdf = InterRDF(s2, s2, exclude_same=attr).run(**client_InterRDF)
assert rdf.rdf[0] == 0
assert rdf.results.count.sum() == count


def test_ignore_same_residues_fails(sels):
def test_ignore_same_residues_fails(sels, client_InterRDF):
s1, s2 = sels
with pytest.raises(
ValueError, match="The exclude_same argument to InterRDF must be"
):
InterRDF(s2, s2, exclude_same="unsupported").run()
InterRDF(s2, s2, exclude_same="unsupported").run(**client_InterRDF)

with pytest.raises(
ValueError,
match="The exclude_same argument to InterRDF cannot be used with",
):
InterRDF(s2, s2, exclude_same="residue", exclusion_block=tuple()).run()
InterRDF(s2, s2, exclude_same="residue", exclusion_block=tuple()).run(
**client_InterRDF
)


@pytest.mark.parametrize("attr", ("rdf", "bins", "edges", "count"))
def test_rdf_attr_warning(sels, attr):
def test_rdf_attr_warning(sels, attr, client_InterRDF):
s1, s2 = sels
rdf = InterRDF(s1, s2).run()
rdf = InterRDF(s1, s2).run(**client_InterRDF)
wmsg = f"The `{attr}` attribute was deprecated in MDAnalysis 2.0.0"
with pytest.warns(DeprecationWarning, match=wmsg):
getattr(rdf, attr) is rdf.results[attr]
Expand All @@ -133,22 +135,44 @@ def test_rdf_attr_warning(sels, attr):
@pytest.mark.parametrize(
"norm, value", [("density", 1.956823), ("rdf", 244602.88385), ("none", 4)]
)
def test_norm(sels, norm, value):
def test_norm(sels, norm, value, client_InterRDF):
s1, s2 = sels
rdf = InterRDF(s1, s2, norm=norm).run()
rdf = InterRDF(s1, s2, norm=norm).run(**client_InterRDF)
assert_allclose(max(rdf.results.rdf), value)


@pytest.mark.parametrize(
"norm, norm_required", [("Density", "density"), (None, "none")]
)
def test_norm_values(sels, norm, norm_required):
def test_norm_values(sels, norm, norm_required, client_InterRDF):
s1, s2 = sels
rdf = InterRDF(s1, s2, norm=norm).run()
rdf = InterRDF(s1, s2, norm=norm).run(**client_InterRDF)
assert rdf.norm == norm_required


def test_unknown_norm(sels):
s1, s2 = sels
with pytest.raises(ValueError, match="invalid norm"):
InterRDF(s1, s2, sels, norm="foo")

# tests for parallelization

@pytest.mark.parametrize(
    "classname,is_parallelizable",
    [
        # The PR parallelizes both RDF classes; test both, not just InterRDF.
        (mda.analysis.rdf.InterRDF, True),
        (mda.analysis.rdf.InterRDF_s, True),
    ],
)
def test_class_is_parallelizable(classname, is_parallelizable):
    # The class-level flag advertises parallel support to AnalysisBase.run().
    assert classname._analysis_algorithm_is_parallelizable == is_parallelizable


@pytest.mark.parametrize(
    "classname,backends",
    [
        # Both RDF classes advertise the same backends; cover both.
        (mda.analysis.rdf.InterRDF, ("serial", "multiprocessing", "dask")),
        (mda.analysis.rdf.InterRDF_s, ("serial", "multiprocessing", "dask")),
    ],
)
def test_supported_backends(classname, backends):
    assert classname.get_supported_backends() == backends
Loading
Loading