Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AnomalyDetection] Add threshold and aggregation functions. #34018

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 324 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/aggregations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import math
import statistics
from typing import Any
from typing import Callable
from typing import Iterable
from typing import Optional

from apache_beam.ml.anomaly.base import AggregationFn
from apache_beam.ml.anomaly.base import AnomalyPrediction
from apache_beam.ml.anomaly.specifiable import specifiable


class _AggModelIdMixin:
def __init__(self, agg_model_id: Optional[str] = None):
self._agg_model_id = agg_model_id

def _set_agg_model_id_if_unset(self, agg_model_id: str) -> None:
if self._agg_model_id is None:
self._agg_model_id = agg_model_id

def add_model_id(self, result_dict):
result_dict["model_id"] = self._agg_model_id


class _SourcePredictionMixin:
def __init__(self, include_source_predictions):
self._include_source_predictions = include_source_predictions

def add_source_predictions(self, result_dict, source_predictions):
if self._include_source_predictions:
result_dict["source_predictions"] = list(source_predictions)


class LabelAggregation(AggregationFn, _AggModelIdMixin, _SourcePredictionMixin):
  """Aggregates anomaly predictions based on their labels.

  This is a base class for `AggregationFn`s that combine multiple
  `AnomalyPrediction` objects into a single `AnomalyPrediction` based on
  the labels of the input predictions.

  Args:
    agg_func (Callable[[Iterable[int]], int]): A function that aggregates
      a collection of anomaly labels (integers) into a single label.
    agg_model_id (Optional[str]): The model id used in aggregated predictions.
      Defaults to None.
    include_source_predictions (bool): If True, include the input predictions in
      the `source_predictions` of the output. Defaults to False.
    missing_label (int): The label value that marks a prediction whose label
      is missing (as opposed to an error label of `None`). Defaults to -2.
  """
  def __init__(
      self,
      agg_func: Callable[[Iterable[int]], int],
      agg_model_id: Optional[str] = None,
      include_source_predictions: bool = False,
      missing_label: int = -2,
  ):
    self._agg = agg_func
    self._missing_label = missing_label
    _AggModelIdMixin.__init__(self, agg_model_id)
    _SourcePredictionMixin.__init__(self, include_source_predictions)

  def apply(
      self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction:
    """Applies the label aggregation function to a list of predictions.

    Args:
      predictions (Iterable[AnomalyPrediction]): A collection of
        `AnomalyPrediction` objects to be aggregated.

    Returns:
      AnomalyPrediction: A single `AnomalyPrediction` object with the
      aggregated label. The aggregated label is determined as follows:

      - If there are any non-missing and non-error labels, the `agg_func` is
        applied to aggregate them.
      - If all labels are error labels (`None`), the aggregated label is also
        `None`.
      - If there are a mix of missing and error labels, the aggregated label
        is the `missing_label`.
    """
    # Materialize once: `predictions` may be a one-shot iterator, and it is
    # traversed up to three times below (source copy, filter, `all` check).
    predictions = list(predictions)

    result_dict: dict[str, Any] = {}
    _AggModelIdMixin.add_model_id(self, result_dict)
    _SourcePredictionMixin.add_source_predictions(
        self, result_dict, predictions)

    labels = [
        prediction.label for prediction in predictions if
        prediction.label is not None and prediction.label != self._missing_label
    ]

    if len(labels) > 0:
      # apply aggregation_fn if there is any non-None and non-missing label
      result_dict["label"] = self._agg(labels)
    elif all(prediction.label is None for prediction in predictions):
      # all are error labels (None) -- all scores are error
      result_dict["label"] = None
    else:
      # some missing labels with some error labels (None)
      result_dict["label"] = self._missing_label

    return AnomalyPrediction(**result_dict)


class ScoreAggregation(AggregationFn, _AggModelIdMixin, _SourcePredictionMixin):
  """Aggregates anomaly predictions based on their scores.

  This is a base class for `AggregationFn`s that combine multiple
  `AnomalyPrediction` objects into a single `AnomalyPrediction` based on
  the scores of the input predictions.

  Args:
    agg_func (Callable[[Iterable[float]], float]): A function that aggregates
      a collection of anomaly scores (floats) into a single score.
    agg_model_id (Optional[str]): The model id used in aggregated predictions.
      Defaults to None.
    include_source_predictions (bool): If True, include the input predictions in
      the `source_predictions` of the output. Defaults to False.
  """
  def __init__(
      self,
      agg_func: Callable[[Iterable[float]], float],
      agg_model_id: Optional[str] = None,
      include_source_predictions: bool = False):
    self._agg = agg_func
    _AggModelIdMixin.__init__(self, agg_model_id)
    _SourcePredictionMixin.__init__(self, include_source_predictions)

  def apply(
      self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction:
    """Applies the score aggregation function to a list of predictions.

    Args:
      predictions (Iterable[AnomalyPrediction]): A collection of
        `AnomalyPrediction` objects to be aggregated.

    Returns:
      AnomalyPrediction: A single `AnomalyPrediction` object with the
      aggregated score. The aggregated score is determined as follows:

      - If there are any non-missing and non-error scores, the `agg_func` is
        applied to aggregate them.
      - If all scores are error scores (`None`), the aggregated score is also
        `None`.
      - If there are a mix of missing (`NaN`) and error scores (`None`), the
        aggregated score is `NaN`.
    """
    # Materialize once: `predictions` may be a one-shot iterator, and it is
    # traversed up to three times below (source copy, filter, `all` check).
    predictions = list(predictions)

    result_dict: dict[str, Any] = {}
    _AggModelIdMixin.add_model_id(self, result_dict)
    _SourcePredictionMixin.add_source_predictions(
        self, result_dict, predictions)

    scores = [
        prediction.score for prediction in predictions
        if prediction.score is not None and not math.isnan(prediction.score)
    ]

    if len(scores) > 0:
      # apply aggregation_fn if there is any non-None and non-NaN score
      result_dict["score"] = self._agg(scores)
    elif all(prediction.score is None for prediction in predictions):
      # all are error scores (None)
      result_dict["score"] = None
    else:
      # some missing scores (NaN) with some error scores (None)
      result_dict["score"] = float("NaN")

    return AnomalyPrediction(**result_dict)


@specifiable
class MajorityVote(LabelAggregation):
  """Aggregates anomaly labels using majority voting.

  This `AggregationFn` combines the labels of multiple `AnomalyPrediction`
  objects by counting how many are normal and how many are outliers, and
  returning whichever label received more votes. A tie is resolved with the
  configured tie-breaker label.

  Example:
    If input labels are [normal, outlier, outlier, normal, outlier], and
    normal_label=0, outlier_label=1, then the aggregated label will be
    outlier (1) because outliers have a majority (3 vs 2).

  Args:
    normal_label (int): The integer label for normal predictions. Defaults to 0.
    outlier_label (int): The integer label for outlier predictions. Defaults to
      1.
    tie_breaker (int): The label to return if there is a tie in votes.
      Defaults to 0 (normal_label).
    **kwargs: Additional keyword arguments to pass to the base
      `LabelAggregation` class.
  """
  def __init__(self, normal_label=0, outlier_label=1, tie_breaker=0, **kwargs):
    self._tie_breaker = tie_breaker
    self._normal_label = normal_label
    self._outlier_label = outlier_label

    def vote_by_majority(labels: Iterable[int]) -> int:
      tally = collections.Counter(labels)
      normal_votes = tally[self._normal_label]
      outlier_votes = tally[self._outlier_label]
      if outlier_votes > normal_votes:
        return self._outlier_label
      if normal_votes > outlier_votes:
        return self._normal_label
      # Equal counts: fall back to the tie-breaker label.
      return self._tie_breaker

    super().__init__(agg_func=vote_by_majority, **kwargs)


# And scheme
@specifiable
class AllVote(LabelAggregation):
  """Aggregates anomaly labels using an "all vote" (AND) scheme.

  This `AggregationFn` implements an "all vote" strategy: the aggregated
  result is the outlier label only when every input `AnomalyPrediction`
  carries the outlier label; otherwise it is the normal label.

  Example:
    If input labels are [outlier, outlier, outlier], and outlier_label=1,
    then the aggregated label will be outlier (1).
    If input labels are [outlier, normal, outlier], and outlier_label=1,
    then the aggregated label will be normal (0).

  Args:
    normal_label (int): The integer label for normal predictions. Defaults to 0.
    outlier_label (int): The integer label for outlier predictions. Defaults to
      1.
    **kwargs: Additional keyword arguments to pass to the base
      `LabelAggregation` class.
  """
  def __init__(self, normal_label=0, outlier_label=1, **kwargs):
    self._normal_label = normal_label
    self._outlier_label = outlier_label

    def unanimous_outlier(labels: Iterable[int]) -> int:
      # Outlier only when every single vote is an outlier vote.
      if all(label == self._outlier_label for label in labels):
        return self._outlier_label
      return self._normal_label

    super().__init__(agg_func=unanimous_outlier, **kwargs)


# Or scheme
@specifiable
class AnyVote(LabelAggregation):
  """Aggregates anomaly labels using an "any vote" (OR) scheme.

  This `AggregationFn` implements an "any vote" strategy: the aggregated
  result is the outlier label when at least one input `AnomalyPrediction`
  carries the outlier label; otherwise it is the normal label.

  Example:
    If input labels are [normal, normal, outlier], and outlier_label=1,
    then the aggregated label will be outlier (1).
    If input labels are [normal, normal, normal], and outlier_label=1,
    then the aggregated label will be normal (0).

  Args:
    normal_label (int): The integer label for normal predictions. Defaults to 0.
    outlier_label (int): The integer label for outlier predictions. Defaults to
      1.
    **kwargs: Additional keyword arguments to pass to the base
      `LabelAggregation` class.
  """
  def __init__(self, normal_label=0, outlier_label=1, **kwargs):
    self._normal_label = normal_label
    self._outlier_label = outlier_label

    def any_outlier(labels: Iterable[int]) -> int:
      # A single outlier vote is enough to flag the aggregate as outlier.
      if any(label == self._outlier_label for label in labels):
        return self._outlier_label
      return self._normal_label

    super().__init__(agg_func=any_outlier, **kwargs)


@specifiable
class AverageScore(ScoreAggregation):
  """Aggregates anomaly scores by taking their arithmetic mean.

  The aggregated score of a collection of `AnomalyPrediction` objects is
  the mean of their individual scores.

  Args:
    **kwargs: Additional keyword arguments forwarded to the base
      `ScoreAggregation` class.
  """
  def __init__(self, **kwargs):
    super().__init__(agg_func=statistics.mean, **kwargs)


@specifiable
class MaxScore(ScoreAggregation):
  """Aggregates anomaly scores by taking the largest one.

  The aggregated score of a collection of `AnomalyPrediction` objects is
  the maximum of their individual scores.

  Args:
    **kwargs: Additional keyword arguments forwarded to the base
      `ScoreAggregation` class.
  """
  def __init__(self, **kwargs):
    super().__init__(agg_func=max, **kwargs)
Loading
Loading