Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion interfaces/otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `gRPC` is favoured over `HTTP` when multiple endpoints are available
- Support for publishing LogQL and PromQL rules via `OtlpRequirer.publish()`
- LZMA+base64 compression of rules in the requirer databag to avoid Juju databag size limits
- `OtlpProvider.rules()` for fetching rules from all requirer relations with injected Juju topology and validation
- `OtlpProvider.rules()` for fetching rules from all requirer relations with injected Juju topology and validation
- Generic aggregator rules automatically included in every requirer's published rule set
- Python 3.10+ compatibility

Expand All @@ -26,3 +26,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Updated

- Replace the requirer's rule path interface with an interface accepting an object containing rules
- Generic PromQL alert rules for requirer charms that are of the aggregator or application type
7 changes: 7 additions & 0 deletions interfaces/otlp/src/charmlibs/interfaces/otlp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ def _publish_rules(self, _: ops.EventBase):
)
OtlpRequirer(self, rules=rules).publish()

Generic rules are sourced from `cosl.rules.generic_alert_groups <https://github.com/canonical/cos-lib/blob/main/src/cosl/rules.py>`_.
If the charm is an aggregator (e.g., opentelemetry-collector), the generic rules
injected into the charm's RuleStore should reflect that. This is configured by setting
``aggregator_peer_relation_name`` to the name of the charm's peer relation::

OtlpRequirer(..., aggregator_peer_relation_name="my-peers").publish()

Relation Data Format
====================

Expand Down
138 changes: 129 additions & 9 deletions interfaces/otlp/src/charmlibs/interfaces/otlp/_otlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@
import copy
import json
import logging
import re
from collections import OrderedDict
from collections.abc import Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Final, Literal

from cosl.juju_topology import JujuTopology
from cosl.rules import InjectResult, Rules, generic_alert_groups
from cosl.types import OfficialRuleFileFormat
from cosl.rules import (
HOST_METRICS_MISSING_RULE_NAME,
InjectResult,
Rules,
generic_alert_groups,
)
from cosl.types import OfficialRuleFileFormat, SingleRuleFormat
from cosl.utils import LZMABase64
from ops import CharmBase
from pydantic import (
Expand Down Expand Up @@ -64,29 +70,63 @@ def __post_init__(self):

def add_logql(
self,
rule_dict: dict[str, Any],
rule_dict: OfficialRuleFileFormat | SingleRuleFormat,
*,
group_name: str | None = None,
group_name_prefix: str | None = None,
) -> 'RuleStore':
"""Add rules from dict to the existing LogQL ruleset.

Args:
rule_dict: a single-rule or official-rule YAML dict
group_name: a custom group name, used only if the new rule is of single-rule format
group_name_prefix: a custom group name prefix, used only if the new rule is of
single-rule format
"""
self.logql.add(rule_dict, group_name=group_name, group_name_prefix=group_name_prefix)
return self

def add_logql_path(self, dir_path: str | Path, *, recursive: bool = False) -> 'RuleStore':
"""Add LogQL rules from a dir path.

All rules from files are aggregated into a data structure representing a single rule file.
All group names are augmented with juju topology.

Args:
dir_path: either a rules file or a dir of rules files.
recursive: whether to read files recursively or not (no impact if `path` is a file).
"""
self.logql.add_path(dir_path, recursive=recursive)
return self

def add_promql(
self,
rule_dict: dict[str, Any],
rule_dict: OfficialRuleFileFormat | SingleRuleFormat,
*,
group_name: str | None = None,
group_name_prefix: str | None = None,
) -> 'RuleStore':
"""Add rules from dict to the existing PromQL ruleset.

Args:
rule_dict: a single-rule or official-rule YAML dict
group_name: a custom group name, used only if the new rule is of single-rule format
group_name_prefix: a custom group name prefix, used only if the new rule is of
single-rule format
"""
self.promql.add(rule_dict, group_name=group_name, group_name_prefix=group_name_prefix)
return self

def add_promql_path(self, dir_path: str | Path, *, recursive: bool = False) -> 'RuleStore':
"""Add PromQL rules from a dir path.

All rules from files are aggregated into a data structure representing a single rule file.
All group names are augmented with juju topology.

Args:
dir_path: either a rules file or a dir of rules files.
recursive: whether to read files recursively or not (no impact if `path` is a file).
"""
self.promql.add_path(dir_path, recursive=recursive)
return self

Expand Down Expand Up @@ -174,6 +214,10 @@ class OtlpRequirer:
endpoints.
telemetries: The telemetries to filter for in the provider's OTLP
endpoints.
aggregator_peer_relation_name: Name of the peers relation of this
charm. This should only be set IFF the charm is an aggregator AND
it has a peer relation with this name. When provided, generic
aggregator rules are used instead of application-level rules.
rules: Rules of different types e.g., logql or promql, that the
requirer will publish for the provider.
"""
Expand All @@ -185,6 +229,7 @@ def __init__(
protocols: Sequence[Literal['http', 'grpc']] | None = None,
telemetries: Sequence[Literal['logs', 'metrics', 'traces']] | None = None,
*,
aggregator_peer_relation_name: str | None = None,
rules: RuleStore | None = None,
):
self._charm = charm
Expand All @@ -196,6 +241,7 @@ def __init__(
self._telemetries: list[Literal['logs', 'metrics', 'traces']] = (
list(telemetries) if telemetries is not None else []
)
self._aggregator_peer_relation_name = aggregator_peer_relation_name
self._rules = rules if rules is not None else RuleStore(self._topology)

def _filter_endpoints(self, endpoints: list[_OtlpEndpoint]) -> list[_OtlpEndpoint]:
Expand Down Expand Up @@ -235,6 +281,80 @@ def _favor_modern_endpoints(self, endpoints: list[_OtlpEndpoint]) -> _OtlpEndpoi
modern_score: Final = {'grpc': 2, 'http': 1}
return max(endpoints, key=lambda e: modern_score.get(e.protocol, 0))

def _duplicate_rules_per_unit(
    self,
    alert_rules: OfficialRuleFileFormat,
    rule_names_to_duplicate: list[str],
    peer_unit_names: set[str],
    is_subordinate: bool = False,
) -> OfficialRuleFileFormat:
    """Duplicate alert rules per unit in peer_unit_names.

    Each rule whose ``alert`` name appears in ``rule_names_to_duplicate`` is
    replaced by one copy per unit: a ``juju_unit`` label is added and the same
    label matcher is injected into the rule expression. All other rules are
    kept unchanged. The input is never mutated; a deep copy is returned.

    Args:
        alert_rules: A dictionary of rules in OfficialRuleFileFormat.
        rule_names_to_duplicate: A list of rule names to be duplicated.
        peer_unit_names: A set of charm unit names to duplicate rules for.
        is_subordinate: A boolean denoting whether the charm duplicating
            alert rules is a subordinate or not. If yes, the severity of
            the duplicated alerts is set to critical; otherwise warning.

    Returns:
        The updated rules with those specified in rule_names_to_duplicate,
        duplicated per unit in OfficialRuleFileFormat.
    """
    updated_alert_rules = copy.deepcopy(alert_rules)
    for group in updated_alert_rules.get('groups', []):
        new_rules: list[SingleRuleFormat] = []
        for rule in group.get('rules', []):
            if rule.get('alert', '') not in rule_names_to_duplicate:
                new_rules.append(rule)
                continue
            # Iterate units in sorted order so the output is deterministic.
            for juju_unit in sorted(peer_unit_names):
                rule_copy = copy.deepcopy(rule)
                # setdefault ensures the labels mapping is attached to the rule
                # even when it was absent; `rule_copy.get('labels', {})[...]`
                # would write into a throwaway dict and lose the label.
                labels = rule_copy.setdefault('labels', {})
                labels['juju_unit'] = juju_unit
                # Strip the '%%juju_unit%%' placeholder (and a trailing comma,
                # if any) before injecting the per-unit label matcher.
                rule_copy['expr'] = self._rules.promql.tool.inject_label_matchers(
                    expression=re.sub(r'%%juju_unit%%,?', '', rule_copy['expr']),
                    topology={'juju_unit': juju_unit},
                )
                # If the charm is a subordinate, the severity of the alerts
                # needs to be bumped to critical.
                labels['severity'] = 'critical' if is_subordinate else 'warning'
                new_rules.append(rule_copy)
        group['rules'] = new_rules
    return updated_alert_rules

def _inject_generic_rules(self):
    """Inject generic rules into the charm's RuleStore."""
    if not self._aggregator_peer_relation_name:
        # Not an aggregator: publish the application-level generic rules.
        self._rules.add_promql(
            generic_alert_groups.application_rules,
            group_name_prefix=self._topology.identifier,
        )
        return
    # Aggregator case: duplicate the host-metrics rule for every peer unit.
    peer_relation = self._charm.model.get_relation(self._aggregator_peer_relation_name)
    if not peer_relation:
        logger.warning(
            'Generic aggregator rules were requested, but no peer relation was found. '
            'Ensure this charm has a peer relation named "%s" to use generic aggregator '
            'rules.',
            self._aggregator_peer_relation_name,
        )
    # Always include this unit; add peers when the relation exists.
    unit_names: set[str] = {self._charm.unit.name}
    if peer_relation:
        unit_names |= {unit.name for unit in peer_relation.units}
    agg_rules = self._duplicate_rules_per_unit(
        generic_alert_groups.aggregator_rules,
        rule_names_to_duplicate=[HOST_METRICS_MISSING_RULE_NAME],
        peer_unit_names=unit_names,
        is_subordinate=self._charm.meta.subordinate,
    )
    self._rules.add_promql(agg_rules, group_name_prefix=self._topology.identifier)

def publish(self):
"""Triggers programmatically the update of the relation data.

Expand All @@ -246,10 +366,8 @@ def publish(self):
# Only the leader unit can write to app data.
return

self._rules.add_promql(
copy.deepcopy(generic_alert_groups.aggregator_rules),
group_name_prefix=self._topology.identifier,
)
# Add generic rules
self._inject_generic_rules()

# Publish to databag
databag = _OtlpRequirerAppData.model_validate({
Expand Down Expand Up @@ -363,7 +481,9 @@ def rules(self, query_type: Literal['logql', 'promql']) -> dict[str, OfficialRul
continue

# Get rules for the desired query type
rules_for_type: dict[str, Any] | None = getattr(requirer.rules, query_type, None)
rules_for_type: OfficialRuleFileFormat | None = getattr(
requirer.rules, query_type, None
)
if not rules_for_type:
continue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = '0.1.0'
__version__ = '0.2.0'
88 changes: 54 additions & 34 deletions interfaces/otlp/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
import logging
import socket
from copy import deepcopy
from typing import Final, Literal
from typing import Literal
from unittest.mock import patch

import ops
import pytest
from cosl.juju_topology import JujuTopology
from cosl.types import AlertingRuleFormat, OfficialRuleFileFormat, RecordingRuleFormat
from ops import testing
from ops.charm import CharmBase

Expand All @@ -33,53 +34,56 @@

logger = logging.getLogger(__name__)

PEERS_ENDPOINT = 'my-peers'
LOKI_RULES_DEST_PATH = 'loki_alert_rules'
METRICS_RULES_DEST_PATH = 'prometheus_alert_rules'
SINGLE_LOGQL_ALERT: Final = {
'alert': 'HighLogVolume',
'expr': 'count_over_time({job=~".+"}[30s]) > 100',
'labels': {'severity': 'high'},
}
SINGLE_LOGQL_RECORD: Final = {
'record': 'log:error_rate:rate5m',
'expr': 'sum by (service) (rate({job=~".+"} | json | level="error" [5m]))',
'labels': {'severity': 'high'},
}
SINGLE_PROMQL_ALERT: Final = {
'alert': 'Workload Missing',
'expr': 'up{job=~".+"} == 0',
'for': '0m',
'labels': {'severity': 'critical'},
}
SINGLE_PROMQL_RECORD: Final = {
'record': 'code:prometheus_http_requests_total:sum',
'expr': 'sum by (code) (prometheus_http_requests_total{job=~".+"})',
'labels': {'severity': 'high'},
}
OFFICIAL_LOGQL_RULES: Final = {
'groups': [
SINGLE_LOGQL_ALERT = AlertingRuleFormat(
alert='HighLogVolume',
expr='count_over_time({job=~".+"}[30s]) > 100',
labels={'severity': 'high'},
)
SINGLE_LOGQL_RECORD = RecordingRuleFormat(
record='log:error_rate:rate5m',
expr='sum by (service) (rate({job=~".+"} | json | level="error" [5m]))',
labels={'severity': 'high'},
)
SINGLE_PROMQL_ALERT = AlertingRuleFormat(
alert='Workload Missing',
expr='up{job=~".+"} == 0',
for_='0m',
labels={'severity': 'critical'},
)
SINGLE_PROMQL_RECORD = RecordingRuleFormat(
record='code:prometheus_http_requests_total:sum',
expr='sum by (code) (prometheus_http_requests_total{job=~".+"})',
labels={'severity': 'high'},
)
OFFICIAL_LOGQL_RULES = OfficialRuleFileFormat(
groups=[
{
'name': 'test_logql',
'rules': [SINGLE_LOGQL_ALERT, SINGLE_LOGQL_RECORD],
},
]
}
OFFICIAL_PROMQL_RULES: Final = {
'groups': [
)
OFFICIAL_PROMQL_RULES = OfficialRuleFileFormat(
groups=[
{
'name': 'test_promql',
'rules': [SINGLE_PROMQL_ALERT, SINGLE_PROMQL_RECORD],
},
]
}
ALL_PROTOCOLS: Final[list[Literal['grpc', 'http']]] = ['grpc', 'http']
ALL_TELEMETRIES: Final[list[Literal['logs', 'metrics', 'traces']]] = ['logs', 'metrics', 'traces']
)
ALL_PROTOCOLS: list[Literal['grpc', 'http']] = ['grpc', 'http']
ALL_TELEMETRIES: list[Literal['logs', 'metrics', 'traces']] = ['logs', 'metrics', 'traces']


# --- Tester charms ---


class OtlpRequirerCharm(CharmBase):
_aggregator_peer_relation_name: str | None = None

def __init__(self, framework: ops.Framework):
super().__init__(framework)
self.framework.observe(self.on.update_status, self._publish_rules)
Expand All @@ -96,7 +100,11 @@ def _publish_rules(self, _: ops.EventBase) -> None:
.add_promql(deepcopy(OFFICIAL_PROMQL_RULES))
)
OtlpRequirer(
self, protocols=ALL_PROTOCOLS, telemetries=ALL_TELEMETRIES, rules=rules
self,
protocols=ALL_PROTOCOLS,
telemetries=ALL_TELEMETRIES,
aggregator_peer_relation_name=self._aggregator_peer_relation_name,
rules=rules,
).publish()


Expand All @@ -121,9 +129,21 @@ def mock_hostname():


@pytest.fixture
def otlp_requirer_ctx() -> testing.Context[OtlpRequirerCharm]:
meta = {'name': 'otlp-requirer', 'requires': {'send-otlp': {'interface': 'otlp'}}}
return testing.Context(OtlpRequirerCharm, meta=meta)
def otlp_requirer_ctx(request: pytest.FixtureRequest) -> testing.Context[OtlpRequirerCharm]:
meta = {
'name': 'otlp-requirer',
'requires': {'send-otlp': {'interface': 'otlp'}},
'peers': {PEERS_ENDPOINT: {'interface': 'aggregator_peers'}},
}
# We want to be able to test generic aggregator rules injection and the application rules
# injection case, which is toggled by an aggregator peer relation name input.
generic_aggregator_rules: bool = getattr(request, 'param', False)
charm_cls = type(
'OtlpRequirerCharm',
(OtlpRequirerCharm,),
{'_aggregator_peer_relation_name': PEERS_ENDPOINT if generic_aggregator_rules else None},
)
return testing.Context(charm_cls, meta=meta)


@pytest.fixture
Expand Down
Loading
Loading