diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index b849268067b6..6a717cf5db16 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -61,9 +61,8 @@ class AnomalyResult(): """A dataclass for the anomaly detection results""" #: The original input data. example: beam.Row - #: The iterable of `AnomalyPrediction` objects containing the predictions. - #: Expect length 1 if an aggregation strategy is applied. - predictions: Iterable[AnomalyPrediction] + #: The `AnomalyPrediction` object containing the prediction. + prediction: AnomalyPrediction class ThresholdFn(abc.ABC): diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py b/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py new file mode 100644 index 000000000000..cce3acad34a4 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/base.py b/sdks/python/apache_beam/ml/anomaly/univariate/base.py new file mode 100644 index 000000000000..f604611edef3 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/base.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import abc +from enum import Enum +from collections import deque + + +class BaseTracker(abc.ABC): + @abc.abstractmethod + def push(self, x): + raise NotImplementedError() + + @abc.abstractmethod + def get(self, **kwargs): + raise NotImplementedError() + + +class WindowMode(Enum): + LANDMARK = 1 + SLIDING = 2 + + +class IncrementalTracker(BaseTracker): + def __init__(self, window_mode, **kwargs): + if window_mode == WindowMode.SLIDING: + self._window_size = kwargs.get("window_size", 100) + self._queue = deque(maxlen=self._window_size) + self._n = 0 + self._window_mode = window_mode + + def push(self, x): + self._queue.append(x) + + def pop(self): + return self._queue.popleft() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py new file mode 100644 index 000000000000..ff3df1ad1211 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.univariate.base import IncrementalTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + +__all__ = [ + "LandmarkMeanTracker", "SimpleSlidingMeanTracker", "SlidingMeanTracker" +] + + +class SimpleSlidingMeanTracker(IncrementalTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + if len(self._queue) == 0: + return float('nan') + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanmean(self._queue) + + +class IncrementalMeanTracker(IncrementalTracker): + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode, **kwargs) + self._mean = 0 + + def push(self, x): + if not math.isnan(x): + self._n += 1 + delta = x - self._mean + else: + delta = 0 + + if self._window_mode == WindowMode.SLIDING: + if len(self._queue) >= self._window_size and \ + not math.isnan(old_x := self.pop()): + self._n -= 1 + delta += (self._mean - old_x) + + super().push(x) + + if self._n > 0: + self._mean += delta / self._n + else: + self._mean = 0 + + def get(self): + if self._n < 1: + # keep it consistent with numpy + return float("nan") + return self._mean + + +class LandmarkMeanTracker(IncrementalMeanTracker): + def __init__(self): + super().__init__(window_mode=WindowMode.LANDMARK) + + +class SlidingMeanTracker(IncrementalMeanTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py new file mode 100644 index 000000000000..b1d12fc7041c --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py @@ -0,0 +1,161 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import random +import time +import unittest +import warnings + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.mean import LandmarkMeanTracker +from apache_beam.ml.anomaly.univariate.mean import SimpleSlidingMeanTracker +from apache_beam.ml.anomaly.univariate.mean import SlidingMeanTracker + +FLOAT64_MAX = 1.79769313486231570814527423731704356798070e+308 + + +class LandmarkMeanTest(unittest.TestCase): + def test_without_nan(self): + t = LandmarkMeanTracker() + self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty + + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(8) + self.assertEqual(t.get(), 4.0) + t.push(16) + self.assertEqual(t.get(), 7.0) + t.push(-3) + self.assertEqual(t.get(), 5.0) + + def test_with_nan(self): + t = LandmarkMeanTracker() + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + + def test_with_float64_max(self): + t = LandmarkMeanTracker() + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t1 = LandmarkMeanTracker() + t2 = SimpleSlidingMeanTracker(len(numbers)) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + +class SlidingMeanTest(unittest.TestCase): + @parameterized.expand([SimpleSlidingMeanTracker, SlidingMeanTracker]) + def test_without_nan(self, tracker): + t = tracker(3) + self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty + + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(8) + self.assertEqual(t.get(), 4.0) + t.push(16) + self.assertEqual(t.get(), 9.0) + t.push(-3) + self.assertEqual(t.get(), 7.0) + + @parameterized.expand([SimpleSlidingMeanTracker, SlidingMeanTracker]) + def test_with_nan(self, tracker): + t = tracker(3) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + t.push(1) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # All values in the tracker are NaN + t.push(4) + self.assertEqual(t.get(), 4.0) + + @parameterized.expand([SimpleSlidingMeanTracker, SlidingMeanTracker]) + def test_with_float64_max(self, tracker): + t = tracker(2) + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + t.push(FLOAT64_MAX) + if tracker is SlidingMeanTracker: + self.assertEqual(t.get(), FLOAT64_MAX) + self.assertFalse(math.isinf(t.get())) + else: + # SimpleSlidingMean (using Numpy) returns inf when it computes the + # average of [float64_max, float64_max]. + self.assertTrue(math.isinf(t.get())) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = SlidingMeanTracker(100) + t2 = SimpleSlidingMeanTracker(100) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py new file mode 100644 index 000000000000..15a2f0f9663d --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import random +import time +import timeit +import statistics +import unittest +import warnings + +from apache_beam.ml.anomaly.univariate.mean import * +from apache_beam.ml.anomaly.univariate.stdev import * +from apache_beam.ml.anomaly.univariate.quantile import * + +seed_value_time = int(time.time()) +random.seed(seed_value_time) +print(f"{'Seed value':30s}{seed_value_time}") + +numbers = [] +for _ in range(50000): + numbers.append(random.randint(0, 1000)) + + +def run_tracker(tracker, numbers): + for i in range(len(numbers)): + tracker.push(numbers[i]) + _ = tracker.get() + + +def print_result(tracker, number=10, repeat=5): + runtimes = timeit.repeat( + lambda: run_tracker(tracker, numbers), number=number, repeat=repeat) + mean = statistics.mean(runtimes) + sd = statistics.stdev(runtimes) + print(f"{tracker.__class__.__name__:30s}{mean:.6f} ± {sd:.6f}") + + +class PerfTest(unittest.TestCase): + def test_mean_perf(self): + print() + print_result(LandmarkMeanTracker()) + print_result(SlidingMeanTracker(100)) + print_result(SimpleSlidingMeanTracker(100), number=1) + + def test_stdev_perf(self): + print() + print_result(LandmarkStdevTracker()) + print_result(SlidingStdevTracker(100)) + print_result(SimpleSlidingStdevTracker(100), number=1) + + def test_quantile_perf(self): + print() + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + print_result(LandmarkQuantileTracker(0.5)) + print_result(SlidingQuantileTracker(100, 0.5)) + print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py new file mode 100644 index 000000000000..5a6b4716fff1 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import typing +import warnings + +import numpy as np +from sortedcontainers import SortedList + +from apache_beam.ml.anomaly.univariate.base import IncrementalTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + +__all__ = [ + "LandmarkQuantileTracker", + "SimpleSlidingQuantileTracker", + "SlidingQuantileTracker" +] + + +class SimpleSlidingQuantileTracker(IncrementalTracker): + def __init__(self, window_size, q): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q + + def get(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanquantile(self._queue, self._q) + + +class IncrementalQuantileTracker(IncrementalTracker): + def __init__(self, window_mode, q, **kwargs): + super().__init__(window_mode, **kwargs) + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q + self._sorted_items = SortedList() + + def push(self, x): + if not math.isnan(x): + self._sorted_items.add(x) + + if self._window_mode == WindowMode.SLIDING: + if (len(self._queue) >= self._window_size and + not math.isnan(old_x := self.pop())): + self._sorted_items.discard(old_x) + + super().push(x) + + def get(self): + n = len(self._sorted_items) + if n < 1: + return float("nan") + + pos = self._q * (n - 1) + lo = math.floor(pos) + lo_value = typing.cast(float, self._sorted_items[lo]) + + # Use linear interpolation to yield the requested quantile + hi = min(lo + 1, n - 1) + hi_value: float = typing.cast(float, self._sorted_items[hi]) + return lo_value + (hi_value - lo_value) * (pos - lo) + + +class LandmarkQuantileTracker(IncrementalQuantileTracker): + def __init__(self, q): + warnings.warn( + "Quantile trackers should not be used in production due to " + "the unbounded memory consumption.") + super().__init__(window_mode=WindowMode.LANDMARK, q=q) + + +class SlidingQuantileTracker(IncrementalQuantileTracker): + def __init__(self, window_size, q): + super().__init__( + window_mode=WindowMode.SLIDING, q=q, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py new file mode 100644 index 000000000000..71de237ce8dd --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py @@ -0,0 +1,161 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import random +import time +import unittest +import warnings + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.quantile import LandmarkQuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import SlidingQuantileTracker + + +class LandmarkQuantileTest(unittest.TestCase): + def test_without_nan(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t = LandmarkQuantileTracker(0.5) + + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.5) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 1.5) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(1) + self.assertEqual(t.get(), 1.5) + + def test_with_nan(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t = LandmarkQuantileTracker(0.2) + + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.2) + t.push(float('nan')) + self.assertEqual(t.get(), 1.2) + t.push(0) + self.assertEqual(t.get(), 0.4) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + def _accuracy_helper(): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t1 = LandmarkQuantileTracker(0.5) + t2 = SimpleSlidingQuantileTracker(len(numbers), 0.5) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + for _ in range(10): + _accuracy_helper() + + +class SlidingQuantileTest(unittest.TestCase): + @parameterized.expand( + [ #SimpleSlidingQuantileTracker, + SlidingQuantileTracker + ]) + def test_without_nan(self, tracker): + t = tracker(3, 0.5) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.5) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 2.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(1) + self.assertEqual(t.get(), 1.0) + + @parameterized.expand([SimpleSlidingQuantileTracker, SlidingQuantileTracker]) + def test_with_nan(self, tracker): + t = tracker(3, 0.8) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.8) + t.push(float('nan')) + self.assertEqual(t.get(), 1.8) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 1.6) + t.push(float('nan')) + self.assertEqual(t.get(), 1.6) + t.push(float('nan')) + self.assertEqual(t.get(), 0.0) + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + t.push(3) + self.assertEqual(t.get(), 3.0) + t.push(1) + self.assertEqual(t.get(), 2.6) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + def _accuracy_helper(): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = SlidingQuantileTracker(100, 0.1) + t2 = SimpleSlidingQuantileTracker(100, 0.1) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + for _ in range(10): + _accuracy_helper() + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py new file mode 100644 index 000000000000..19fbc7f2410c --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -0,0 +1,91 @@ +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.univariate.base import IncrementalTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + +__all__ = [ + "LandmarkStdevTracker", "SimpleSlidingStdevTracker", "SlidingStdevTracker" +] + + +class SimpleSlidingStdevTracker(IncrementalTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + # We do not use nanstd, since nanstd([]) gives 0, which is incorrect. + # Use nanvar instead. + return math.sqrt(np.nanvar(self._queue, ddof=1)) + + +class IncrementalStdevTracker(IncrementalTracker): + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode, **kwargs) + self._mean = 0 + self._m2 = 0 + + def push(self, x): + if not math.isnan(x): + self._n += 1 + delta1 = x - self._mean + else: + delta1 = 0 + + if self._window_mode == WindowMode.SLIDING: + if (len(self._queue) >= self._window_size and + not math.isnan(old_x := self.pop())): + self._n -= 1 + delta2 = self._mean - old_x + else: + delta2 = 0 + + super().push(x) + else: + delta2 = 0 + + if self._n > 0: + self._mean += (delta1 + delta2) / self._n + + if delta1 != 0: + self._m2 += delta1 * (x - self._mean) + if delta2 != 0: + self._m2 += delta2 * (old_x - self._mean) + else: + self._mean = 0 + self._m2 = 0 + + def get(self): + if self._n < 2: + # keep it consistent with numpy + return float("nan") + dof = self._n - 1 + return math.sqrt(self._m2 / dof) + + +class LandmarkStdevTracker(IncrementalStdevTracker): + def __init__(self): + super().__init__(window_mode=WindowMode.LANDMARK) + + +class SlidingStdevTracker(IncrementalStdevTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py new file mode 100644 index 000000000000..a70ba73405ec --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import random +import time +import unittest + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.stdev import LandmarkStdevTracker +from apache_beam.ml.anomaly.univariate.stdev import SimpleSlidingStdevTracker +from apache_beam.ml.anomaly.univariate.stdev import SlidingStdevTracker + + +class LandmarkStdevTest(unittest.TestCase): + def test_without_nan(self): + t = LandmarkStdevTracker() + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertTrue(math.isnan(t.get())) + t.push(2) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + t.push(10) + self.assertEqual(t.get(), 4.08248290463863) + + def test_with_nan(self): + t = LandmarkStdevTracker() + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(2) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + + t.push(10) + self.assertEqual(t.get(), 4.08248290463863) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = LandmarkStdevTracker() + t2 = SimpleSlidingStdevTracker(len(numbers)) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue((math.isnan(t1.get()) and math.isnan(t2.get())) or + abs(t1.get() - t2.get()) < 1e-9) + + +class SlidingStdevTest(unittest.TestCase): + @parameterized.expand([SimpleSlidingStdevTracker, SlidingStdevTracker]) + def test_without_nan(self, tracker): + t = tracker(3) + self.assertTrue(math.isnan(t.get())) + t.push(2) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + t.push(10) + self.assertEqual(t.get(), 4.725815626252609) + + @parameterized.expand([SimpleSlidingStdevTracker, SlidingStdevTracker]) + def test_stdev_with_nan(self, tracker): + t = tracker(3) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(1) + self.assertTrue(math.isnan(t.get())) + t.push(2) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 0.7071067811865476) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + + if tracker is SlidingStdevTracker: + self.assertEqual(t._m2, 0) + self.assertEqual(t._mean, 0) + + t.push(4) + self.assertTrue(math.isnan(t.get())) + t.push(5) + self.assertEqual(t.get(), 0.7071067811865476) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = SlidingStdevTracker(100) + t2 = SimpleSlidingStdevTracker(100) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue((math.isnan(t1.get()) and math.isnan(t2.get())) or + abs(t1.get() - t2.get()) < 1e-9) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main()