From 9a6476355330732c478763d609a339fe974255df Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 18 Feb 2025 15:27:08 -0500 Subject: [PATCH] [AnomalyDetection] Add univariate trackers (#33994) * Change prediction in AnomalyResult to predictions which is now an iterable of AnomalyPrediction. * Add mean, stdev and quantile trackers with tests. * Add docstrings * Fix lints * Make trackers specifiable. Also includes minor fixes on Specifiable and univariate perf tests. * Adjust class structures in trackers. Minor fix per feedback. --- .../apache_beam/ml/anomaly/specifiable.py | 5 +- .../ml/anomaly/univariate/__init__.py | 16 ++ .../apache_beam/ml/anomaly/univariate/base.py | 88 +++++++++ .../apache_beam/ml/anomaly/univariate/mean.py | 146 +++++++++++++++ .../ml/anomaly/univariate/mean_test.py | 161 ++++++++++++++++ .../ml/anomaly/univariate/perf_test.py | 82 ++++++++ .../ml/anomaly/univariate/quantile.py | 176 ++++++++++++++++++ .../ml/anomaly/univariate/quantile_test.py | 162 ++++++++++++++++ .../ml/anomaly/univariate/stdev.py | 156 ++++++++++++++++ .../ml/anomaly/univariate/stdev_test.py | 157 ++++++++++++++++ 10 files changed, 1147 insertions(+), 2 deletions(-) create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/__init__.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/base.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/mean.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/quantile.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/stdev.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py b/sdks/python/apache_beam/ml/anomaly/specifiable.py index 
1aedab2e8c21..e0122d41d9d5 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py @@ -42,6 +42,7 @@ _ACCEPTED_SUBSPACES = [ "EnsembleAnomalyDetector", "AnomalyDetector", + "BaseTracker", "ThresholdFn", "AggregationFn", _FALLBACK_SUBSPACE, @@ -80,7 +81,7 @@ def _spec_type_to_subspace(spec_type: str) -> str: if spec_type in _KNOWN_SPECIFIABLE[subspace]: return subspace - raise ValueError(f"subspace for {str} not found.") + raise ValueError(f"subspace for {spec_type} not found.") @dataclasses.dataclass(frozen=True) @@ -309,7 +310,7 @@ def new_getattr(self, name): cls.run_original_init = run_original_init cls.to_spec = Specifiable.to_spec cls._to_spec_helper = staticmethod(Specifiable._to_spec_helper) - cls.from_spec = classmethod(Specifiable.from_spec) + cls.from_spec = Specifiable.from_spec cls._from_spec_helper = staticmethod(Specifiable._from_spec_helper) return cls # end of the function body of _wrapper diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py b/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py new file mode 100644 index 000000000000..cce3acad34a4 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/base.py b/sdks/python/apache_beam/ml/anomaly/univariate/base.py new file mode 100644 index 000000000000..b0eb2aba1e69 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/base.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import abc +from collections import deque +from enum import Enum + + +class BaseTracker(abc.ABC): + """Abstract base class for all univariate trackers.""" + @abc.abstractmethod + def push(self, x): + """Push a new value to the tracker. + + Args: + x: The value to be pushed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def get(self): + """Get the current tracking value. + + Returns: + The current tracked value, the type of which depends on the specific + tracker implementation. + """ + raise NotImplementedError() + + +class WindowMode(Enum): + """Enum representing the window mode for windowed trackers.""" + #: operating on all data points from the beginning. + LANDMARK = 1 + #: operating on a fixed-size sliding window of recent data points. 
+ SLIDING = 2 + + +class WindowedTracker(BaseTracker): + """Abstract base class for trackers that operate on a data window. + + This class provides a foundation for trackers that maintain a window of data, + either as a landmark window or a sliding window. It provides basic push and + pop operations, which are only usable in `SLIDING` mode (no queue otherwise). + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments. + For `SLIDING` window mode, `window_size` can be specified to set the + maximum size of the sliding window. Defaults to 100. + """ + def __init__(self, window_mode, **kwargs): + if window_mode == WindowMode.SLIDING: + self._window_size = kwargs.get("window_size", 100) + self._queue = deque(maxlen=self._window_size) + self._n = 0 + self._window_mode = window_mode + + def push(self, x): + """Adds a new value to the data window. + + Args: + x: The value to be added to the window. + """ + self._queue.append(x) + + def pop(self): + """Removes and returns the oldest value from the data window (FIFO). + + Returns: + The oldest value from the window. + """ + return self._queue.popleft() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py new file mode 100644 index 000000000000..9aec5098bfd4 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -0,0 +1,146 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Trackers for calculating mean in windowed fashion. + +This module defines different types of mean trackers that operate on windows +of data. It includes: + + * `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window. + * `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode. + * `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode. +""" + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.univariate.base import BaseTracker +from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + + +class MeanTracker(BaseTracker): + """Abstract base class for mean trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for mean trackers. + """ + pass + + +@specifiable +class SimpleSlidingMeanTracker(WindowedTracker, MeanTracker): + """Sliding window mean tracker that calculates mean using NumPy. + + This tracker uses NumPy's `nanmean` function to calculate the mean of the + values currently in the sliding window. It's a simple, non-incremental + approach. + + Args: + window_size: The size of the sliding window. + """ + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + """Calculates and returns the mean of the current sliding window. + + Returns: + float: The mean of the values in the current sliding window. 
+ Returns NaN if the window is empty. + """ + if len(self._queue) == 0: + return float('nan') + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanmean(self._queue) + + +class IncMeanTracker(WindowedTracker, MeanTracker): + """Base class for incremental mean trackers. + + This class implements incremental calculation of the mean, which is more + efficient for streaming data as it updates the mean with each new data point + instead of recalculating from scratch. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode=window_mode, **kwargs) + self._mean = 0 + + def push(self, x): + """Pushes a new value and updates the incremental mean. + + Args: + x: The new value to be pushed. + """ + if not math.isnan(x): + self._n += 1 + delta = x - self._mean + else: + delta = 0 + + if self._window_mode == WindowMode.SLIDING: + if len(self._queue) >= self._window_size and \ + not math.isnan(old_x := self.pop()): + self._n -= 1 + delta += (self._mean - old_x) + + super().push(x) + + if self._n > 0: + self._mean += delta / self._n + else: + self._mean = 0 + + def get(self): + """Returns the current incremental mean. + + Returns: + float: The current incremental mean value. + Returns NaN if no valid (non-NaN) values have been pushed. + """ + if self._n < 1: + # keep it consistent with numpy + return float("nan") + return self._mean + + +@specifiable +class IncLandmarkMeanTracker(IncMeanTracker): + """Landmark window mean tracker using incremental calculation.""" + def __init__(self): + super().__init__(window_mode=WindowMode.LANDMARK) + + +@specifiable +class IncSlidingMeanTracker(IncMeanTracker): + """Sliding window mean tracker using incremental calculation. + + Args: + window_size: The size of the sliding window. 
+ """ + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py new file mode 100644 index 000000000000..0c8888597ebe --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py @@ -0,0 +1,161 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import math +import random +import time +import unittest +import warnings + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.mean import IncLandmarkMeanTracker +from apache_beam.ml.anomaly.univariate.mean import IncSlidingMeanTracker +from apache_beam.ml.anomaly.univariate.mean import SimpleSlidingMeanTracker + +FLOAT64_MAX = 1.79769313486231570814527423731704356798070e+308 + + +class LandmarkMeanTest(unittest.TestCase): + def test_without_nan(self): + t = IncLandmarkMeanTracker() + self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty + + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(8) + self.assertEqual(t.get(), 4.0) + t.push(16) + self.assertEqual(t.get(), 7.0) + t.push(-3) + self.assertEqual(t.get(), 5.0) + + def test_with_nan(self): + t = IncLandmarkMeanTracker() + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + + def test_with_float64_max(self): + t = IncLandmarkMeanTracker() + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t1 = IncLandmarkMeanTracker() + t2 = SimpleSlidingMeanTracker(len(numbers)) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + +class SlidingMeanTest(unittest.TestCase): + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) + def 
test_without_nan(self, tracker): + t = tracker(3) + self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty + + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(8) + self.assertEqual(t.get(), 4.0) + t.push(16) + self.assertEqual(t.get(), 9.0) + t.push(-3) + self.assertEqual(t.get(), 7.0) + + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) + def test_with_nan(self, tracker): + t = tracker(3) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + t.push(1) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # All values in the tracker are NaN + t.push(4) + self.assertEqual(t.get(), 4.0) + + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) + def test_with_float64_max(self, tracker): + t = tracker(2) + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + t.push(FLOAT64_MAX) + if tracker is IncSlidingMeanTracker: + self.assertEqual(t.get(), FLOAT64_MAX) + self.assertFalse(math.isinf(t.get())) + else: + # SimpleSlidingMean (using Numpy) returns inf when it computes the + # average of [float64_max, float64_max]. 
+ self.assertTrue(math.isinf(t.get())) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = IncSlidingMeanTracker(100) + t2 = SimpleSlidingMeanTracker(100) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py new file mode 100644 index 000000000000..61067ef38dab --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import random +import statistics +import time +import timeit +import unittest +import warnings + +from apache_beam.ml.anomaly.univariate.mean import * +from apache_beam.ml.anomaly.univariate.quantile import * +from apache_beam.ml.anomaly.univariate.stdev import * + +seed_value_time = int(time.time()) +random.seed(seed_value_time) +print(f"{'Seed value':32s}{seed_value_time}") + +numbers = [] +for _ in range(50000): + numbers.append(random.randint(0, 1000)) + + +def run_tracker(tracker, numbers): + for i in range(len(numbers)): + tracker.push(numbers[i]) + _ = tracker.get() + + +def print_result(tracker, number=10, repeat=5): + runtimes = timeit.repeat( + lambda: run_tracker(tracker, numbers), number=number, repeat=repeat) + mean = statistics.mean(runtimes) + sd = statistics.stdev(runtimes) + print(f"{tracker.__class__.__name__:32s}{mean:.6f} ± {sd:.6f}") + + +class PerfTest(unittest.TestCase): + def test_mean_perf(self): + print() + print_result(IncLandmarkMeanTracker()) + print_result(IncSlidingMeanTracker(100)) + # SimpleSlidingMeanTracker (numpy-based batch approach) is an order of + # magnitude slower than other methods. To prevent excessively long test + # runs, we reduce the number of repetitions. + print_result(SimpleSlidingMeanTracker(100), number=1) + + def test_stdev_perf(self): + print() + print_result(IncLandmarkStdevTracker()) + print_result(IncSlidingStdevTracker(100)) + # Same as test_mean_perf, we reduce the number of repetitions here. + print_result(SimpleSlidingStdevTracker(100), number=1) + + def test_quantile_perf(self): + print() + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + print_result(BufferedLandmarkQuantileTracker(0.5)) + print_result(BufferedSlidingQuantileTracker(100, 0.5)) + # Same as test_mean_perf, we reduce the number of repetitions here. 
+ print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py new file mode 100644 index 000000000000..44d2b1ab2448 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -0,0 +1,176 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Trackers for calculating quantiles in windowed fashion. + +This module defines different types of quantile trackers that operate on +windows of data. It includes: + + * `SimpleSlidingQuantileTracker`: Calculates quantile using numpy in a sliding + window. + * `BufferedLandmarkQuantileTracker`: Sortedlist based quantile tracker in + landmark window mode. + * `BufferedSlidingQuantileTracker`: Sortedlist based quantile tracker in + sliding window mode. 
+""" + +import math +import typing +import warnings + +import numpy as np +from sortedcontainers import SortedList + +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.univariate.base import BaseTracker +from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + + +class QuantileTracker(BaseTracker): + """Abstract base class for quantile trackers. + + Checks (via `assert`) that the quantile `q` lies in [0, 1] and stores it for + use by concrete quantile trackers. + """ + def __init__(self, q): + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q + + +@specifiable +class SimpleSlidingQuantileTracker(WindowedTracker, QuantileTracker): + """Sliding window quantile tracker using NumPy. + + This tracker uses NumPy's `nanquantile` function to calculate the specified + quantile of the values currently in the sliding window. It's a simple, + non-incremental approach. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ + def __init__(self, window_size, q): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + QuantileTracker.__init__(self, q) + + def get(self): + """Calculates and returns the specified quantile of the current sliding + window. + + Returns: + float: The specified quantile of the values in the current sliding window. + Returns NaN if the window is empty. + """ + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanquantile(self._queue, self._q) + + +class BufferedQuantileTracker(WindowedTracker, QuantileTracker): + """Abstract base class for buffered quantile trackers. + + Warning: + Buffered quantile trackers are NOT truly incremental in the sense that they + don't update the quantile in constant time per new data point. They maintain + a sorted list of all values in the window. 
+ + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + **kwargs: Keyword arguments passed to the parent class constructor. + """ + def __init__(self, window_mode, q, **kwargs): + super().__init__(window_mode, **kwargs) + QuantileTracker.__init__(self, q) + self._sorted_items = SortedList() + + def push(self, x): + """Pushes a new value, maintains the sorted list, and manages the window. + + Args: + x: The new value to be pushed. + """ + if not math.isnan(x): + self._sorted_items.add(x) + + if self._window_mode == WindowMode.SLIDING: + if (len(self._queue) >= self._window_size and + not math.isnan(old_x := self.pop())): + self._sorted_items.discard(old_x) + + super().push(x) + + def get(self): + """Returns the current quantile value using the sorted list. + + Calculates the quantile using linear interpolation on the sorted values. + + Returns: + float: The calculated quantile value. Returns NaN if the window is empty. + """ + n = len(self._sorted_items) + if n < 1: + return float("nan") + + pos = self._q * (n - 1) + lo = math.floor(pos) + lo_value = typing.cast(float, self._sorted_items[lo]) + + # Use linear interpolation to yield the requested quantile + hi = min(lo + 1, n - 1) + hi_value: float = typing.cast(float, self._sorted_items[hi]) + return lo_value + (hi_value - lo_value) * (pos - lo) + + +@specifiable +class BufferedLandmarkQuantileTracker(BufferedQuantileTracker): + """Landmark quantile tracker using a sorted list for quantile calculation. + + Warning: + Landmark quantile trackers have unbounded memory consumption as they store + all pushed values in a sorted list. Avoid using in production for + long-running streams. + + Args: + q: The quantile to calculate, a float between 0 and 1 (inclusive). 
+ """ + def __init__(self, q): + warnings.warn( + "Quantile trackers should not be used in production due to " + "the unbounded memory consumption.") + super().__init__(window_mode=WindowMode.LANDMARK, q=q) + + +@specifiable +class BufferedSlidingQuantileTracker(BufferedQuantileTracker): + """Sliding window quantile tracker using a sorted list for quantile + calculation. + + Warning: + Maintains a sorted list of values within the sliding window to calculate + the specified quantile. Memory consumption is bounded by the window size + but can still be significant for large windows. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ + def __init__(self, window_size, q): + super().__init__( + window_mode=WindowMode.SLIDING, q=q, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py new file mode 100644 index 000000000000..9f9402505b70 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py @@ -0,0 +1,162 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import math +import random +import time +import unittest +import warnings + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.quantile import BufferedLandmarkQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long + + +class LandmarkQuantileTest(unittest.TestCase): + def test_without_nan(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t = BufferedLandmarkQuantileTracker(0.5) + + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.5) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 1.5) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(1) + self.assertEqual(t.get(), 1.5) + + def test_with_nan(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t = BufferedLandmarkQuantileTracker(0.2) + + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.2) + t.push(float('nan')) + self.assertEqual(t.get(), 1.2) + t.push(0) + self.assertEqual(t.get(), 0.4) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + def _accuracy_helper(): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t1 = BufferedLandmarkQuantileTracker(0.5) + t2 = SimpleSlidingQuantileTracker(len(numbers), 0.5) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + for _ in range(10): + _accuracy_helper() + + +class 
SlidingQuantileTest(unittest.TestCase): + @parameterized.expand( + [ #SimpleSlidingQuantileTracker, + BufferedSlidingQuantileTracker + ]) + def test_without_nan(self, tracker): + t = tracker(3, 0.5) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.5) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 2.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(1) + self.assertEqual(t.get(), 1.0) + + @parameterized.expand( + [SimpleSlidingQuantileTracker, BufferedSlidingQuantileTracker]) + def test_with_nan(self, tracker): + t = tracker(3, 0.8) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.8) + t.push(float('nan')) + self.assertEqual(t.get(), 1.8) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 1.6) + t.push(float('nan')) + self.assertEqual(t.get(), 1.6) + t.push(float('nan')) + self.assertEqual(t.get(), 0.0) + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + t.push(3) + self.assertEqual(t.get(), 3.0) + t.push(1) + self.assertEqual(t.get(), 2.6) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + def _accuracy_helper(): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = BufferedSlidingQuantileTracker(100, 0.1) + t2 = SimpleSlidingQuantileTracker(100, 0.1) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + for _ in range(10): + _accuracy_helper() + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py new file mode 100644 index 000000000000..cbd5d9f26ebc --- /dev/null +++ 
# b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -0,0 +1,156 @@
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Trackers for calculating standard deviation in windowed fashion.

This module defines different types of standard deviation trackers that operate
on windows of data. It includes:

  * `SimpleSlidingStdevTracker`: Calculates stdev using numpy in a sliding
    window.
  * `IncLandmarkStdevTracker`: Incremental stdev tracker in landmark window
    mode.
  * `IncSlidingStdevTracker`: Incremental stdev tracker in sliding window mode.
"""

import math
import warnings

import numpy as np

from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.anomaly.univariate.base import BaseTracker
from apache_beam.ml.anomaly.univariate.base import WindowedTracker
from apache_beam.ml.anomaly.univariate.base import WindowMode


class StdevTracker(BaseTracker):
  """Abstract base class for standard deviation trackers.

  Currently, it does not add any specific functionality but provides a type
  hierarchy for stdev trackers.
  """


@specifiable
class SimpleSlidingStdevTracker(WindowedTracker, StdevTracker):
  """Sliding window standard deviation tracker using NumPy.

  This tracker uses NumPy's `nanvar` function to calculate the variance of the
  values currently in the sliding window and then takes the square root to get
  the standard deviation. It's a simple, non-incremental approach.

  Args:
    window_size: The size of the sliding window.
  """
  def __init__(self, window_size):
    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)

  def get(self):
    """Calculates and returns the stdev of the current sliding window.

    The sample variance (`ddof=1`) is computed over `self._queue`, with NaN
    entries ignored by `np.nanvar`.

    Returns:
      float: The standard deviation of the values in the current sliding
        window. Returns NaN if the window holds fewer than 2 non-NaN values.
    """
    with warnings.catch_warnings(record=False):
      # nanvar warns on empty / all-NaN / too-few-values input; NaN is the
      # answer we want in those cases, so the warnings are silenced.
      warnings.simplefilter("ignore")
      # We do not use nanstd, since nanstd([]) gives 0, which is incorrect.
      # Use nanvar instead.
      return math.sqrt(np.nanvar(self._queue, ddof=1))


class IncStdevTracker(WindowedTracker, StdevTracker):
  """Abstract base class for incremental standard deviation trackers.

  This class implements an online algorithm for calculating standard deviation,
  updating the standard deviation incrementally as new data points arrive.

  State kept by this class:
    `_mean`: running mean of the non-NaN values currently tracked.
    `_m2`: running sum of squared deviations from `_mean`.
  The count `_n` of non-NaN values is updated in `push`; it is not initialized
  in this class (presumably the parent constructor does so -- confirm in
  `WindowedTracker`).

  Args:
    window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK`
      or `SLIDING`.
    **kwargs: Keyword arguments passed to the parent class constructor.
  """
  def __init__(self, window_mode, **kwargs):
    super().__init__(window_mode, **kwargs)
    self._mean = 0
    self._m2 = 0

  def push(self, x):
    """Pushes a new value and updates the incremental standard deviation
    calculation.

    Implements Welford's online algorithm for variance, then derives stdev.
    NaN inputs do not contribute to the statistics; in sliding mode they are
    still handed to the parent `push`, so they occupy a slot in the window.

    Args:
      x: The new value to be pushed.
    """
    # Contribution of the incoming value; a NaN input adds nothing.
    if not math.isnan(x):
      self._n += 1
      delta1 = x - self._mean
    else:
      delta1 = 0

    if self._window_mode == WindowMode.SLIDING:
      # When the window is already full, the oldest entry is popped and, if it
      # is not NaN, its contribution is removed. `old_x` is bound only when
      # the window is full, and it is read again below only when
      # `delta2 != 0`, which implies this branch was taken.
      if (len(self._queue) >= self._window_size and
          not math.isnan(old_x := self.pop())):
        self._n -= 1
        delta2 = self._mean - old_x
      else:
        delta2 = 0

      super().push(x)
    else:
      delta2 = 0

    if self._n > 0:
      # Apply the add and remove adjustments to the mean in a single step.
      self._mean += (delta1 + delta2) / self._n

      if delta1 != 0:
        self._m2 += delta1 * (x - self._mean)
      if delta2 != 0:
        self._m2 += delta2 * (old_x - self._mean)
    else:
      # No valid values remain: reset so rounding residue does not leak into
      # later windows.
      self._mean = 0
      self._m2 = 0

  def get(self):
    """Returns the current incremental standard deviation.

    Returns:
      float: The current incremental standard deviation value.
        Returns NaN if fewer than 2 valid (non-NaN) values are being tracked.
    """
    if self._n < 2:
      # keep it consistent with numpy
      return float("nan")
    variance = self._m2 / (self._n - 1)
    # The sliding-window update subtracts terms from `_m2`, so floating-point
    # cancellation can leave it marginally below zero when the true variance
    # is (close to) zero; `math.sqrt` would then raise ValueError. Clamp at
    # zero. `variance` is the first argument so that a NaN still propagates.
    return math.sqrt(max(variance, 0.0))


@specifiable
class IncLandmarkStdevTracker(IncStdevTracker):
  """Landmark window standard deviation tracker.

  Uses the incremental calculation implemented in `IncStdevTracker`.
  """
  def __init__(self):
    super().__init__(window_mode=WindowMode.LANDMARK)


@specifiable
class IncSlidingStdevTracker(IncStdevTracker):
  """Sliding window standard deviation tracker using incremental calculation.

  Args:
    window_size: The size of the sliding window.
  """
  def __init__(self, window_size):
    super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)


# diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py
# new file mode 100644
# index 000000000000..c26f16e3a1b7
# --- /dev/null
# +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py
# @@ -0,0 +1,157 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import math
import random
import time
import unittest

from parameterized import parameterized

from apache_beam.ml.anomaly.univariate.stdev import IncLandmarkStdevTracker
from apache_beam.ml.anomaly.univariate.stdev import IncSlidingStdevTracker
from apache_beam.ml.anomaly.univariate.stdev import SimpleSlidingStdevTracker

# Maximum absolute difference tolerated between two trackers in fuzz tests.
_FUZZ_TOLERANCE = 1e-9


def _assert_stdev_match(test_case, actual, expected, seed, step):
  """Asserts that two stdev readings agree, treating NaN as equal to NaN.

  Args:
    test_case: The running `unittest.TestCase`, used to report the failure.
    actual: Reading from the tracker under test.
    expected: Reading from the reference tracker.
    seed: Random seed of the fuzz run, included in the failure message so the
      run can be reproduced.
    step: Index of the pushed value at which the readings were taken.
  """
  if math.isnan(actual) and math.isnan(expected):
    return
  test_case.assertTrue(
      abs(actual - expected) < _FUZZ_TOLERANCE,
      msg=f"stdev mismatch at step {step}: {actual} vs {expected} "
      f"(random seed {seed})")


class LandmarkStdevTest(unittest.TestCase):
  """Tests for the landmark-window incremental stdev tracker."""
  def test_without_nan(self):
    t = IncLandmarkStdevTracker()
    self.assertTrue(math.isnan(t.get()))
    t.push(1)
    self.assertTrue(math.isnan(t.get()))
    t.push(2)
    self.assertEqual(t.get(), 0.7071067811865476)
    t.push(3)
    self.assertEqual(t.get(), 1.0)
    t.push(10)
    self.assertEqual(t.get(), 4.08248290463863)

  def test_with_nan(self):
    t = IncLandmarkStdevTracker()

    t.push(float('nan'))
    self.assertTrue(math.isnan(t.get()))  # NaN is ignored

    t.push(float('nan'))
    self.assertTrue(math.isnan(t.get()))  # NaN is ignored

    t.push(2)
    self.assertTrue(math.isnan(t.get()))
    t.push(1)
    self.assertEqual(t.get(), 0.7071067811865476)
    t.push(3)
    self.assertEqual(t.get(), 1.0)

    # A NaN pushed after valid values leaves the reading unchanged.
    t.push(float('nan'))
    self.assertEqual(t.get(), 1.0)

    t.push(10)
    self.assertEqual(t.get(), 4.08248290463863)

  def test_accuracy_fuzz(self):
    seed = int(time.time())
    # A local generator yields the same sequence as seeding the module-level
    # one, without touching global random state.
    rng = random.Random(seed)
    print("Random seed: %d" % seed)

    for _ in range(10):
      numbers = [rng.randint(0, 1000) for _ in range(5000)]

      t1 = IncLandmarkStdevTracker()
      # A window as large as the input never evicts, so it serves as the
      # landmark reference.
      t2 = SimpleSlidingStdevTracker(len(numbers))
      for step, v in enumerate(numbers):
        t1.push(v)
        t2.push(v)
        _assert_stdev_match(self, t1.get(), t2.get(), seed, step)


class SlidingStdevTest(unittest.TestCase):
  """Tests shared by the simple and incremental sliding-window trackers."""
  @parameterized.expand([SimpleSlidingStdevTracker, IncSlidingStdevTracker])
  def test_without_nan(self, tracker):
    t = tracker(3)
    self.assertTrue(math.isnan(t.get()))
    t.push(2)
    self.assertTrue(math.isnan(t.get()))
    t.push(1)
    self.assertEqual(t.get(), 0.7071067811865476)
    t.push(3)
    self.assertEqual(t.get(), 1.0)
    t.push(10)
    self.assertEqual(t.get(), 4.725815626252609)

  @parameterized.expand([SimpleSlidingStdevTracker, IncSlidingStdevTracker])
  def test_stdev_with_nan(self, tracker):
    t = tracker(3)

    t.push(float('nan'))
    self.assertTrue(math.isnan(t.get()))  # NaN is ignored

    t.push(float('nan'))
    self.assertTrue(math.isnan(t.get()))  # NaN is ignored

    t.push(1)
    self.assertTrue(math.isnan(t.get()))
    t.push(2)
    self.assertEqual(t.get(), 0.7071067811865476)
    t.push(3)
    self.assertEqual(t.get(), 1.0)

    # Each NaN pushed now displaces one of the valid values from the
    # size-3 window; after the first, only 2 and 3 remain.
    t.push(float('nan'))
    self.assertEqual(t.get(), 0.7071067811865476)

    t.push(float('nan'))
    self.assertTrue(math.isnan(t.get()))

    t.push(float('nan'))
    self.assertTrue(math.isnan(t.get()))

    if tracker is IncSlidingStdevTracker:
      # With no valid values left, the incremental state must be reset.
      self.assertEqual(t._m2, 0)
      self.assertEqual(t._mean, 0)

    t.push(4)
    self.assertTrue(math.isnan(t.get()))
    t.push(5)
    self.assertEqual(t.get(), 0.7071067811865476)

  def test_accuracy_fuzz(self):
    seed = int(time.time())
    # A local generator yields the same sequence as seeding the module-level
    # one, without touching global random state.
    rng = random.Random(seed)
    print("Random seed: %d" % seed)

    for _ in range(10):
      numbers = [rng.randint(0, 1000) for _ in range(5000)]

      t1 = IncSlidingStdevTracker(100)
      t2 = SimpleSlidingStdevTracker(100)
      for step, v in enumerate(numbers):
        t1.push(v)
        t2.push(v)
        _assert_stdev_match(self, t1.get(), t2.get(), seed, step)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()