Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.defaultEnvManager": "ms-python.python:venv",
"python-envs.pythonProjects": [],
"python.analysis.typeCheckingMode": "strict",
"python.analysis.generateWithTypeAnnotation": true,
Expand Down
4 changes: 3 additions & 1 deletion base120.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ LICENSE
README.md
pyproject.toml
base120/__init__.py
base120/observability.py
base120.egg-info/PKG-INFO
base120.egg-info/SOURCES.txt
base120.egg-info/dependency_links.txt
base120.egg-info/requires.txt
base120.egg-info/top_level.txt
tests/test_corpus.py
tests/test_corpus.py
tests/test_observability.py
25 changes: 14 additions & 11 deletions base120/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
Uses standard library only - no runtime dependencies.
"""

from typing import Any, Callable, Iterable, Mapping, Optional, TextIO, cast

import json
import sys
from datetime import datetime, timezone


def create_event_sink(output=None):
def create_event_sink(output: Optional[TextIO] = None) -> Callable[[Mapping[str, Any]], None]:
"""
Create a standard event sink that logs structured JSON events.

Expand All @@ -26,8 +28,9 @@ def create_event_sink(output=None):
"""
if output is None:
output = sys.stdout
output = cast(TextIO, output)

def sink(event):
def sink(event: Mapping[str, Any]) -> None:
try:
json.dump(event, output)
output.write('\n')
Expand All @@ -41,13 +44,13 @@ def sink(event):


def create_validator_event(
artifact_id,
schema_version,
result,
error_codes,
failure_mode_ids,
correlation_id=None
):
artifact_id: str,
schema_version: str,
result: str,
error_codes: Iterable[str],
failure_mode_ids: Iterable[str],
correlation_id: Optional[str] = None,
) -> Mapping[str, Any]:
"""
Create a validator_result event conforming to the observability schema.

Expand All @@ -62,12 +65,12 @@ def create_validator_event(
Returns:
Dict conforming to validator_result event schema
"""
event = {
event: dict[str, Any] = {
"event_type": "validator_result",
"artifact_id": artifact_id,
"schema_version": schema_version,
"result": result,
"error_codes": error_codes,
"error_codes": list(error_codes),
"failure_mode_ids": sorted(failure_mode_ids), # Ensure sorted for consistency
"timestamp": datetime.now(timezone.utc).isoformat()
}
Expand Down
16 changes: 10 additions & 6 deletions base120/validators/errors.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
def resolve_errors(fms: list[str], err_registry: list[dict]) -> list[str]:
from typing import Any, Mapping, Sequence


def resolve_errors(fms: list[str], err_registry: Sequence[Mapping[str, Any]]) -> list[str]:
# FM30 dominance: escalation suppresses all other errors
if "FM30" in fms:
return sorted(
entry["id"]
str(entry.get("id", ""))
for entry in err_registry
if "FM30" in entry["fm"]
if "FM30" in entry.get("fm", [])
)

errs = []
errs: list[str] = []
for entry in err_registry:
if any(fm in fms for fm in entry["fm"]):
errs.append(entry["id"])
fm_list = entry.get("fm", [])
if any(fm in fms for fm in fm_list):
errs.append(str(entry.get("id", "")))
return sorted(set(errs))
7 changes: 5 additions & 2 deletions base120/validators/mappings.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
def resolve_failure_modes(subclass: str, mappings: dict) -> list[str]:
return mappings.get("mappings", {}).get(subclass, [])
from typing import Mapping, Sequence


def resolve_failure_modes(subclass: str, mappings: Mapping[str, Mapping[str, Sequence[str]]]) -> list[str]:
return list(mappings.get("mappings", {}).get(subclass, []))
8 changes: 6 additions & 2 deletions base120/validators/schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Any, Mapping

# pyright: reportMissingModuleSource=false
from jsonschema import Draft202012Validator

def validate_schema(artifact: dict, schema: dict) -> list[str]:

def validate_schema(artifact: Mapping[str, Any], schema: Mapping[str, Any]) -> list[str]:
validator = Draft202012Validator(schema)
errors = list(validator.iter_errors(artifact))
errors = list(validator.iter_errors(artifact)) # type: ignore[call-overload]
return ["ERR-SCHEMA-001"] if errors else []
27 changes: 17 additions & 10 deletions base120/validators/validate.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from typing import Any, Callable, Mapping, MutableSequence, Optional, Sequence

from base120.validators.schema import validate_schema
from base120.validators.mappings import resolve_failure_modes
from base120.validators.errors import resolve_errors

def validate_artifact(
artifact: dict,
schema: dict,
mappings: dict,
err_registry: list[dict],
event_sink=None
artifact: Mapping[str, Any],
schema: Mapping[str, Any],
mappings: Mapping[str, Any],
err_registry: Sequence[Mapping[str, Any]],
event_sink: Optional[Callable[[Mapping[str, Any]], None]] = None,
) -> list[str]:

errs = []
fms = []
errs: MutableSequence[str] = []
fms: list[str] = []

# 1. Schema validation
errs.extend(validate_schema(artifact, schema))
Expand All @@ -22,7 +24,7 @@ def validate_artifact(
return sorted(set(errs))

# 2. Subclass → FM
subclass = artifact.get("class")
subclass = str(artifact.get("class", ""))
fms = resolve_failure_modes(subclass, mappings)

# 3. FM → ERR
Expand All @@ -34,7 +36,12 @@ def validate_artifact(
return sorted(set(errs))


def _emit_event(artifact, error_codes, failure_mode_ids, event_sink):
def _emit_event(
artifact: Mapping[str, Any],
error_codes: Sequence[str],
failure_mode_ids: Sequence[str],
event_sink: Optional[Callable[[Mapping[str, Any]], None]],
) -> None:
"""
Emit validator_result event if event_sink is provided.

Expand All @@ -54,7 +61,7 @@ def _emit_event(artifact, error_codes, failure_mode_ids, event_sink):
schema_version="v1.0.0",
result=result,
error_codes=sorted(set(error_codes)),
failure_mode_ids=failure_mode_ids
failure_mode_ids=list(failure_mode_ids),
)

event_sink(event)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
with open(ROOT / "registries" / "err.json") as f:
ERR_REGISTRY = json.load(f)["registry"]

def load_json(path):
def load_json(path: str | Path):
with open(path) as f:
return json.load(f)

Expand Down
10 changes: 7 additions & 3 deletions tests/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
from pathlib import Path
from io import StringIO
from typing import Any, Mapping

from base120.validators.validate import validate_artifact
from base120.observability import create_event_sink, create_validator_event
Expand Down Expand Up @@ -130,7 +131,7 @@ def test_backward_compatibility_without_event_sink():

def test_event_emission_failure_does_not_propagate():
"""Errors during event emission do not affect validation."""
def failing_sink(event):
def failing_sink(event: Mapping[str, Any]) -> None:
raise Exception("Event emission failed!")

artifact = {
Expand Down Expand Up @@ -159,13 +160,16 @@ def test_unknown_artifact_id():
"instance": "test",
"models": ["FM1"]
}

errors = validate_artifact(artifact, SCHEMA, MAPPINGS, ERR_REGISTRY, event_sink=event_sink)

assert errors == ["ERR-SCHEMA-001"], "Missing id should fail schema validation"

output.seek(0)
event = json.loads(output.read().strip())

assert event["artifact_id"] == "unknown"
assert event["result"] == "failure"
assert "ERR-SCHEMA-001" in event["error_codes"]


def test_create_validator_event_with_correlation_id():
Expand Down