Skip to content

Commit

Permalink
Add docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Feb 15, 2025
1 parent 510754e commit b2ffec8
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 36 deletions.
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/univariate/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,49 @@


class BaseTracker(abc.ABC):
  """Abstract base class for all univariate trackers.

  Concrete trackers must implement both `push` and `get`.
  """
  @abc.abstractmethod
  def push(self, x):
    """Push a new value to the tracker.

    Args:
      x: The value to be pushed.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def get(self):
    """Get the current tracking value.

    Returns:
      The current tracked value, the type of which depends on the specific
      tracker implementation.
    """
    raise NotImplementedError()


class WindowMode(Enum):
  """Enum representing the window mode for windowed trackers."""
  #: operating on all data points from the beginning.
  LANDMARK = 1
  #: operating on a fixed-size sliding window of recent data points.
  SLIDING = 2


class WindowedTracker(BaseTracker):
"""Abstract base class for trackers that operate on a data window.
This class provides a foundation for trackers that maintain a window of data,
either as a landmark window or a sliding window. It provides basic push and
pop operations.
Args:
window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK`
or `SLIDING`.
**kwargs: Keyword arguments.
For `SLIDING` window mode, `window_size` can be specified to set the
maximum size of the sliding window. Defaults to 100.
"""
def __init__(self, window_mode, **kwargs):
if window_mode == WindowMode.SLIDING:
self._window_size = kwargs.get("window_size", 100)
Expand All @@ -44,7 +72,17 @@ def __init__(self, window_mode, **kwargs):
self._window_mode = window_mode

def push(self, x):
  """Adds a new value to the data window.

  The value is appended to the tracker's internal queue.

  Args:
    x: The value to be added to the window.
  """
  self._queue.append(x)

def pop(self):
  """Removes and returns the oldest value from the data window (FIFO).

  Returns:
    The oldest value from the window, taken from the left end of the
    internal queue.
  """
  return self._queue.popleft()
64 changes: 58 additions & 6 deletions sdks/python/apache_beam/ml/anomaly/univariate/mean.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
# limitations under the License.
#

"""Trackers for calculating mean in windowed fashion.
This module defines different types of mean trackers that operate on windows
of data. It includes:
* `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window.
* `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode.
* `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode.
"""

import math
import warnings

Expand All @@ -23,22 +33,36 @@
from apache_beam.ml.anomaly.univariate.base import WindowMode
from apache_beam.ml.anomaly.univariate.base import WindowedTracker

__all__ = [
"SimpleSlidingMeanTracker",
"IncLandmarkMeanTracker",
"IncSlidingMeanTracker"
]


class MeanTracker(WindowedTracker):
  """Abstract base class for mean trackers.

  Currently, it does not add any specific functionality but provides a type
  hierarchy for mean trackers.
  """
  pass


class SimpleSlidingMeanTracker(MeanTracker):
"""Sliding window mean tracker that calculates mean using NumPy.
This tracker uses NumPy's `nanmean` function to calculate the mean of the
values currently in the sliding window. It's a simple, non-incremental
approach.
Args:
window_size: The size of the sliding window.
"""
def __init__(self, window_size):
super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)

def get(self):
"""Calculates and returns the mean of the current sliding window.
Returns:
float: The mean of the values in the current sliding window.
Returns NaN if the window is empty.
"""
if len(self._queue) == 0:
return float('nan')

Expand All @@ -48,11 +72,27 @@ def get(self):


class IncMeanTracker(MeanTracker):
"""Base class for incremental mean trackers.
This class implements incremental calculation of the mean, which is more
efficient for streaming data as it updates the mean with each new data point
instead of recalculating from scratch.
Args:
window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK`
or `SLIDING`.
**kwargs: Keyword arguments passed to the parent class constructor.
"""
def __init__(self, window_mode, **kwargs):
super().__init__(window_mode=window_mode, **kwargs)
self._mean = 0

def push(self, x):
"""Pushes a new value and updates the incremental mean.
Args:
x: The new value to be pushed.
"""
if not math.isnan(x):
self._n += 1
delta = x - self._mean
Expand All @@ -73,17 +113,29 @@ def push(self, x):
self._mean = 0

def get(self):
  """Returns the current incremental mean.

  Returns:
    float: The current incremental mean value.
      Returns NaN if no valid (non-NaN) values have been pushed.
  """
  if self._n < 1:
    # keep it consistent with numpy
    return float("nan")
  return self._mean


class IncLandmarkMeanTracker(IncMeanTracker):
  """Landmark window mean tracker using incremental calculation."""
  def __init__(self):
    # Landmark mode takes no window size.
    super().__init__(window_mode=WindowMode.LANDMARK)


class IncSlidingMeanTracker(IncMeanTracker):
  """Sliding window mean tracker using incremental calculation.

  Args:
    window_size: The size of the sliding window.
  """
  def __init__(self, window_size):
    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)
5 changes: 1 addition & 4 deletions sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,7 @@ def test_accuracy_fuzz(self):


class SlidingMeanTest(unittest.TestCase):
@parameterized.expand([
# SimpleSlidingMeanTracker,
IncSlidingMeanTracker
])
@parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker])
def test_without_nan(self, tracker):
t = tracker(3)
self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty
Expand Down
10 changes: 7 additions & 3 deletions sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,24 @@ def test_mean_perf(self):
print()
print_result(IncLandmarkMeanTracker())
print_result(IncSlidingMeanTracker(100))
# SimpleSlidingMeanTracker (numpy-based batch approach) is an order of
# magnitude slower than other methods. To prevent excessively long test
# runs, we reduce the number of repetitions.
print_result(SimpleSlidingMeanTracker(100), number=1)

def test_stdev_perf(self):
  """Runs `print_result` on each standard deviation tracker."""
  print()
  print_result(IncLandmarkStdevTracker())
  print_result(IncSlidingStdevTracker(100))
  # Same as test_mean_perf, we reduce the number of repetitions here.
  print_result(SimpleSlidingStdevTracker(100), number=1)

def test_quantile_perf(self):
print()
with warnings.catch_warnings(record=False):
warnings.simplefilter("ignore")
print_result(IncLandmarkQuantileTracker(0.5))
print_result(IncSlidingQuantileTracker(100, 0.5))
print_result(BufferedLandmarkQuantileTracker(0.5))
print_result(BufferedSlidingQuantileTracker(100, 0.5))
# Same as test_mean_perf, we reduce the number of repetitions here.
print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1)


Expand Down
94 changes: 85 additions & 9 deletions sdks/python/apache_beam/ml/anomaly/univariate/quantile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
# limitations under the License.
#

"""Trackers for calculating quantiles in windowed fashion.
This module defines different types of quantile trackers that operate on
windows of data. It includes:
* `SimpleSlidingQuantileTracker`: Calculates quantile using numpy in a sliding
window.
* `BufferedLandmarkQuantileTracker`: Sortedlist based quantile tracker in
landmark window mode.
* `BufferedSlidingQuantileTracker`: Sortedlist based quantile tracker in
sliding window mode.
"""

import math
import typing
import warnings
Expand All @@ -25,37 +38,71 @@
from apache_beam.ml.anomaly.univariate.base import WindowMode
from apache_beam.ml.anomaly.univariate.base import WindowedTracker

# Public API of this module. The names must match the classes actually
# defined here; a stale entry makes `from ... import *` raise AttributeError.
__all__ = [
    "BufferedLandmarkQuantileTracker",
    "SimpleSlidingQuantileTracker",
    "BufferedSlidingQuantileTracker"
]


class QuantileTracker(WindowedTracker):
  """Abstract base class for quantile trackers.

  Currently, it does not add any specific functionality but provides a type
  hierarchy for quantile trackers.
  """
  pass


class SimpleSlidingQuantileTracker(QuantileTracker):
  """Sliding window quantile tracker using NumPy.

  This tracker uses NumPy's `nanquantile` function to calculate the specified
  quantile of the values currently in the sliding window. It's a simple,
  non-incremental approach.

  Args:
    window_size: The size of the sliding window.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, window_size, q):
    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)
    assert 0 <= q <= 1, "quantile argument should be between 0 and 1"
    self._q = q

  def get(self):
    """Calculates and returns the specified quantile of the current sliding
    window.

    Returns:
      float: The specified quantile of the values in the current sliding
        window. Returns NaN if the window is empty.
    """
    # Silence NumPy warnings raised while computing the quantile so callers
    # just receive the result.
    with warnings.catch_warnings(record=False):
      warnings.simplefilter("ignore")
      return np.nanquantile(self._queue, self._q)


class IncQuantileTracker(WindowedTracker):
class BufferedQuantileTracker(WindowedTracker):
"""Abstract base class for buffered quantile trackers.
Warning:
Buffered quantile trackers are NOT truly incremental in the sense that they
don't update the quantile in constant time per new data point. They maintain
a sorted list of all values in the window.
Args:
window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK`
or `SLIDING`.
q: The quantile to calculate, a float between 0 and 1 (inclusive).
**kwargs: Keyword arguments passed to the parent class constructor.
"""
def __init__(self, window_mode, q, **kwargs):
super().__init__(window_mode, **kwargs)
assert 0 <= q <= 1, "quantile argument should be between 0 and 1"
self._q = q
self._sorted_items = SortedList()

def push(self, x):
"""Pushes a new value, maintains the sorted list, and manages the window.
Args:
x: The new value to be pushed.
"""
if not math.isnan(x):
self._sorted_items.add(x)

Expand All @@ -67,6 +114,13 @@ def push(self, x):
super().push(x)

def get(self):
"""Returns the current quantile value using the sorted list.
Calculates the quantile using linear interpolation on the sorted values.
Returns:
float: The calculated quantile value. Returns NaN if the window is empty.
"""
n = len(self._sorted_items)
if n < 1:
return float("nan")
Expand All @@ -81,15 +135,37 @@ def get(self):
return lo_value + (hi_value - lo_value) * (pos - lo)


class IncLandmarkQuantileTracker(IncQuantileTracker):
class BufferedLandmarkQuantileTracker(BufferedQuantileTracker):
  """Landmark quantile tracker using a sorted list for quantile calculation.

  Warning:
    Landmark quantile trackers have unbounded memory consumption as they store
    all pushed values in a sorted list. Avoid using in production for
    long-running streams.

  Args:
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, q):
    # Warn on every construction so the memory caveat is visible at runtime.
    warnings.warn(
        "Quantile trackers should not be used in production due to "
        "the unbounded memory consumption.")
    super().__init__(window_mode=WindowMode.LANDMARK, q=q)


class IncSlidingQuantileTracker(IncQuantileTracker):
class BufferedSlidingQuantileTracker(BufferedQuantileTracker):
  """Sliding window quantile tracker using a sorted list for quantile
  calculation.

  Warning:
    Maintains a sorted list of values within the sliding window to calculate
    the specified quantile. Memory consumption is bounded by the window size
    but can still be significant for large windows.

  Args:
    window_size: The size of the sliding window.
    q: The quantile to calculate, a float between 0 and 1 (inclusive).
  """
  def __init__(self, window_size, q):
    super().__init__(
        window_mode=WindowMode.SLIDING, q=q, window_size=window_size)
Loading

0 comments on commit b2ffec8

Please sign in to comment.