diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/base.py b/sdks/python/apache_beam/ml/anomaly/univariate/base.py index 9bd6ee45d23e..b0eb2aba1e69 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/base.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/base.py @@ -21,21 +21,49 @@ class BaseTracker(abc.ABC): + """Abstract base class for all univariate trackers.""" @abc.abstractmethod def push(self, x): + """Push a new value to the tracker. + + Args: + x: The value to be pushed. + """ raise NotImplementedError() @abc.abstractmethod def get(self): + """Get the current tracking value. + + Returns: + The current tracked value, the type of which depends on the specific + tracker implementation. + """ raise NotImplementedError() class WindowMode(Enum): + """Enum representing the window mode for windowed trackers.""" + #: operating on all data points from the beginning. LANDMARK = 1 + #: operating on a fixed-size sliding window of recent data points. SLIDING = 2 class WindowedTracker(BaseTracker): + """Abstract base class for trackers that operate on a data window. + + This class provides a foundation for trackers that maintain a window of data, + either as a landmark window or a sliding window. It provides basic push and + pop operations. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments. + For `SLIDING` window mode, `window_size` can be specified to set the + maximum size of the sliding window. Defaults to 100. + """ def __init__(self, window_mode, **kwargs): if window_mode == WindowMode.SLIDING: self._window_size = kwargs.get("window_size", 100) @@ -44,7 +72,17 @@ def __init__(self, window_mode, **kwargs): self._window_mode = window_mode def push(self, x): + """Adds a new value to the data window. + + Args: + x: The value to be added to the window. + """ self._queue.append(x) def pop(self): + """Removes and returns the oldest value from the data window (FIFO). 
+ + Returns: + The oldest value from the window. + """ return self._queue.popleft() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py index 7c2ae0a76820..eeb806e5d378 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -15,6 +15,16 @@ # limitations under the License. # +"""Trackers for calculating mean in windowed fashion. + +This module defines different types of mean trackers that operate on windows +of data. It includes: + + * `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window. + * `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode. + * `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode. +""" + import math import warnings @@ -23,22 +33,36 @@ from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker -__all__ = [ - "SimpleSlidingMeanTracker", - "IncLandmarkMeanTracker", - "IncSlidingMeanTracker" -] - class MeanTracker(WindowedTracker): + """Abstract base class for mean trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for mean trackers. + """ pass class SimpleSlidingMeanTracker(MeanTracker): + """Sliding window mean tracker that calculates mean using NumPy. + + This tracker uses NumPy's `nanmean` function to calculate the mean of the + values currently in the sliding window. It's a simple, non-incremental + approach. + + Args: + window_size: The size of the sliding window. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) def get(self): + """Calculates and returns the mean of the current sliding window. + + Returns: + float: The mean of the values in the current sliding window. + Returns NaN if the window is empty. 
+ """ if len(self._queue) == 0: return float('nan') @@ -48,11 +72,27 @@ def get(self): class IncMeanTracker(MeanTracker): + """Base class for incremental mean trackers. + + This class implements incremental calculation of the mean, which is more + efficient for streaming data as it updates the mean with each new data point + instead of recalculating from scratch. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ def __init__(self, window_mode, **kwargs): super().__init__(window_mode=window_mode, **kwargs) self._mean = 0 def push(self, x): + """Pushes a new value and updates the incremental mean. + + Args: + x: The new value to be pushed. + """ if not math.isnan(x): self._n += 1 delta = x - self._mean @@ -73,6 +113,12 @@ def push(self, x): self._mean = 0 def get(self): + """Returns the current incremental mean. + + Returns: + float: The current incremental mean value. + Returns NaN if no valid (non-NaN) values have been pushed. + """ if self._n < 1: # keep it consistent with numpy return float("nan") @@ -80,10 +126,16 @@ def get(self): class IncLandmarkMeanTracker(IncMeanTracker): + """Landmark window mean tracker using incremental calculation.""" def __init__(self): super().__init__(window_mode=WindowMode.LANDMARK) class IncSlidingMeanTracker(IncMeanTracker): + """Sliding window mean tracker using incremental calculation. + + Args: + window_size: The size of the sliding window. 
+ """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py index 1348f314a0ed..0c8888597ebe 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py @@ -89,10 +89,7 @@ def test_accuracy_fuzz(self): class SlidingMeanTest(unittest.TestCase): - @parameterized.expand([ - # SimpleSlidingMeanTracker, - IncSlidingMeanTracker - ]) + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) def test_without_nan(self, tracker): t = tracker(3) self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py index 114aa1889e83..b7324aa94f70 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py @@ -55,20 +55,24 @@ def test_mean_perf(self): print() print_result(IncLandmarkMeanTracker()) print_result(IncSlidingMeanTracker(100)) + # SimpleSlidingMeanTracker (numpy-based batch approach) is an order of + # magnitude slower than other methods. To prevent excessively long test + # runs, we reduce the number of repetitions. print_result(SimpleSlidingMeanTracker(100), number=1) def test_stdev_perf(self): print() print_result(IncLandmarkStdevTracker()) print_result(IncSlidingStdevTracker(100)) - print_result(SimpleSlidingStdevTracker(100), number=1) + # Same as test_mean_perf, we reduce the number of repetitions here. 
def test_quantile_perf(self): print() with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - print_result(IncLandmarkQuantileTracker(0.5)) - print_result(IncSlidingQuantileTracker(100, 0.5)) + print_result(BufferedLandmarkQuantileTracker(0.5)) + print_result(BufferedSlidingQuantileTracker(100, 0.5)) + # Same as test_mean_perf, we reduce the number of repetitions here. print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py index 632fcc9f6f66..cf7b94c320ad 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -15,6 +15,19 @@ # limitations under the License. # +"""Trackers for calculating quantiles in windowed fashion. + +This module defines different types of quantile trackers that operate on +windows of data. It includes: + + * `SimpleSlidingQuantileTracker`: Calculates quantile using numpy in a sliding + window. + * `BufferedLandmarkQuantileTracker`: Sortedlist based quantile tracker in + landmark window mode. + * `BufferedSlidingQuantileTracker`: Sortedlist based quantile tracker in + sliding window mode. +""" + import math import typing import warnings @@ -25,30 +38,59 @@ from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker -__all__ = [ - "IncLandmarkQuantileTracker", - "SimpleSlidingQuantileTracker", - "IncSlidingQuantileTracker" -] - class QuantileTracker(WindowedTracker): + """Abstract base class for quantile trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for quantile trackers. + """ pass class SimpleSlidingQuantileTracker(QuantileTracker): + """Sliding window quantile tracker using NumPy. 
+ + This tracker uses NumPy's `nanquantile` function to calculate the specified + quantile of the values currently in the sliding window. It's a simple, + non-incremental approach. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ def __init__(self, window_size, q): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) assert 0 <= q <= 1, "quantile argument should be between 0 and 1" self._q = q def get(self): + """Calculates and returns the specified quantile of the current sliding + window. + + Returns: + float: The specified quantile of the values in the current sliding window. + Returns NaN if the window is empty. + """ with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") return np.nanquantile(self._queue, self._q) -class IncQuantileTracker(WindowedTracker): +class BufferedQuantileTracker(WindowedTracker): + """Abstract base class for buffered quantile trackers. + + Warning: + Buffered quantile trackers are NOT truly incremental in the sense that they + don't update the quantile in constant time per new data point. They maintain + a sorted list of all values in the window. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + **kwargs: Keyword arguments passed to the parent class constructor. + """ def __init__(self, window_mode, q, **kwargs): super().__init__(window_mode, **kwargs) assert 0 <= q <= 1, "quantile argument should be between 0 and 1" @@ -56,6 +98,11 @@ def __init__(self, window_mode, q, **kwargs): self._sorted_items = SortedList() def push(self, x): + """Pushes a new value, maintains the sorted list, and manages the window. + + Args: + x: The new value to be pushed. 
+ """ if not math.isnan(x): self._sorted_items.add(x) @@ -67,6 +114,13 @@ def push(self, x): super().push(x) def get(self): + """Returns the current quantile value using the sorted list. + + Calculates the quantile using linear interpolation on the sorted values. + + Returns: + float: The calculated quantile value. NaN if empty or all values are NaN. + """ n = len(self._sorted_items) if n < 1: return float("nan") @@ -81,7 +135,17 @@ def get(self): return lo_value + (hi_value - lo_value) * (pos - lo) -class IncLandmarkQuantileTracker(IncQuantileTracker): +class BufferedLandmarkQuantileTracker(BufferedQuantileTracker): + """Landmark quantile tracker using a sorted list for quantile calculation. + + Warning: + Landmark quantile trackers have unbounded memory consumption as they store + all pushed values in a sorted list. Avoid using in production for + long-running streams. + + Args: + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ def __init__(self, q): warnings.warn( "Quantile trackers should not be used in production due to " @@ -89,7 +153,19 @@ def __init__(self, q): super().__init__(window_mode=WindowMode.LANDMARK, q=q) -class IncSlidingQuantileTracker(IncQuantileTracker): +class BufferedSlidingQuantileTracker(BufferedQuantileTracker): + """Sliding window quantile tracker using a sorted list for quantile + calculation. + + Warning: + Maintains a sorted list of values within the sliding window to calculate + the specified quantile. Memory consumption is bounded by the window size + but can still be significant for large windows. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). 
+ """ def __init__(self, window_size, q): super().__init__( window_mode=WindowMode.SLIDING, q=q, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py index ceed2483e4fa..9f9402505b70 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py @@ -24,8 +24,8 @@ from parameterized import parameterized -from apache_beam.ml.anomaly.univariate.quantile import IncLandmarkQuantileTracker # pylint: disable=line-too-long -from apache_beam.ml.anomaly.univariate.quantile import IncSlidingQuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import BufferedLandmarkQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long @@ -33,7 +33,7 @@ class LandmarkQuantileTest(unittest.TestCase): def test_without_nan(self): with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - t = IncLandmarkQuantileTracker(0.5) + t = BufferedLandmarkQuantileTracker(0.5) self.assertTrue(math.isnan(t.get())) t.push(1) @@ -52,7 +52,7 @@ def test_without_nan(self): def test_with_nan(self): with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - t = IncLandmarkQuantileTracker(0.2) + t = BufferedLandmarkQuantileTracker(0.2) self.assertTrue(math.isnan(t.get())) t.push(1) @@ -78,7 +78,7 @@ def _accuracy_helper(): with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - t1 = IncLandmarkQuantileTracker(0.5) + t1 = BufferedLandmarkQuantileTracker(0.5) t2 = SimpleSlidingQuantileTracker(len(numbers), 0.5) for v in numbers: t1.push(v) @@ -92,7 +92,7 @@ def _accuracy_helper(): class SlidingQuantileTest(unittest.TestCase): @parameterized.expand( [ 
#SimpleSlidingQuantileTracker, - IncSlidingQuantileTracker + BufferedSlidingQuantileTracker ]) def test_without_nan(self, tracker): t = tracker(3, 0.5) @@ -111,7 +111,7 @@ def test_without_nan(self, tracker): self.assertEqual(t.get(), 1.0) @parameterized.expand( - [SimpleSlidingQuantileTracker, IncSlidingQuantileTracker]) + [SimpleSlidingQuantileTracker, BufferedSlidingQuantileTracker]) def test_with_nan(self, tracker): t = tracker(3, 0.8) self.assertTrue(math.isnan(t.get())) @@ -146,7 +146,7 @@ def _accuracy_helper(): for _ in range(5000): numbers.append(random.randint(0, 1000)) - t1 = IncSlidingQuantileTracker(100, 0.1) + t1 = BufferedSlidingQuantileTracker(100, 0.1) t2 = SimpleSlidingQuantileTracker(100, 0.1) for v in numbers: t1.push(v) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py index 1e895669dc85..6868db07b3ac 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -12,6 +12,18 @@ # limitations under the License. # +"""Trackers for calculating standard deviation in windowed fashion. + +This module defines different types of standard deviation trackers that operate +on windows of data. It includes: + + * `SimpleSlidingStdevTracker`: Calculates stdev using numpy in a sliding + window. + * `IncLandmarkStdevTracker`: Incremental stdev tracker in landmark window + mode. + * `IncSlidingStdevTracker`: Incremental stdev tracker in sliding window mode. +""" + import math import warnings @@ -20,22 +32,33 @@ from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker -__all__ = [ - "SimpleSlidingStdevTracker", - "IncLandmarkStdevTracker", - "IncSlidingStdevTracker" -] - class StdevTracker(WindowedTracker): + """Abstract base class for standard deviation trackers. 
+ + Currently, it does not add any specific functionality but provides a type + hierarchy for stdev trackers. + """ pass class SimpleSlidingStdevTracker(StdevTracker): + """Sliding window standard deviation tracker using NumPy. + + This tracker uses NumPy's `nanvar` function to calculate the variance of the + values currently in the sliding window and then takes the square root to get + the standard deviation. It's a simple, non-incremental approach. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) def get(self): + """Calculates and returns the stdev of the current sliding window. + + Returns: + float: The standard deviation of the values in the current sliding window. + Returns NaN if the window contains fewer than 2 elements. + """ with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") # We do not use nanstd, since nanstd([]) gives 0, which is incorrect. @@ -44,12 +67,30 @@ def get(self): class IncStdevTracker(StdevTracker): + """Abstract base class for incremental standard deviation trackers. + + This class implements an online algorithm for calculating standard deviation, + updating the standard deviation incrementally as new data points arrive. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ def __init__(self, window_mode, **kwargs): super().__init__(window_mode, **kwargs) self._mean = 0 self._m2 = 0 def push(self, x): + """Pushes a new value and updates the incremental standard deviation + calculation. + + Implements Welford's online algorithm for variance, then derives stdev. + + Args: + x: The new value to be pushed. + """ if not math.isnan(x): self._n += 1 delta1 = x - self._mean @@ -80,6 +121,12 @@ def push(self, x): self._m2 = 0 def get(self): + """Returns the current incremental standard deviation. 
+ + Returns: + float: The current incremental standard deviation value. + Returns NaN if fewer than 2 valid (non-NaN) values have been pushed. + """ if self._n < 2: # keep it consistent with numpy return float("nan") @@ -88,10 +135,17 @@ def get(self): class IncLandmarkStdevTracker(IncStdevTracker): + """Landmark window standard deviation tracker using incremental calculation.""" # pylint: disable=line-too-long + def __init__(self): super().__init__(window_mode=WindowMode.LANDMARK) class IncSlidingStdevTracker(IncStdevTracker): + """Sliding window standard deviation tracker using incremental calculation. + + Args: + window_size: The size of the sliding window. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)