diff --git a/CODEOWNERS b/CODEOWNERS index 9e0880913..76a0bae11 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,4 +1,2 @@ # Ignore changelog from codeowner requests docs/changelog.md - -docs/* @erinecon diff --git a/charmcraft.yaml b/charmcraft.yaml index 65640f83a..66392d364 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -56,6 +56,10 @@ requires: receive-ca-certs: interface: certificate_transfer description: Receive a CA certs for haproxy to trust. + spoe-auth: + interface: spoe-auth + description: Relation to provide authentication proxy to HAProxy. + limit: 1 provides: ingress: diff --git a/docs/changelog.md b/docs/changelog.md index 82a2e1817..69df63597 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -10,6 +10,10 @@ Each revision is versioned by the date of the revision. - Added GitHub workflow that checks whether a pull request contains a change artifact. +## 2025-11-13 + +- Added the `spoe-auth` library and requirer/provider class implementation. + ## 2025-11-12 - Updated the haproxy-route library to add the `allow_http` attribute. diff --git a/docs/release-notes/artifacts/pr0229.yaml b/docs/release-notes/artifacts/pr0229.yaml new file mode 100644 index 000000000..88d521d59 --- /dev/null +++ b/docs/release-notes/artifacts/pr0229.yaml @@ -0,0 +1,15 @@ +# --- Release notes change artifact ---- + +schema_version: 1 +changes: + - title: Add SPOE Auth interface library + author: tphan025 + type: major + description: | + Add the `charms.haproxy.v0.spoe_auth` library to enable SPOE authentication integration. + urls: + pr: https://github.com/canonical/haproxy-operator/pull/229 + related_doc: + related_issue: + visibility: public + highlight: false diff --git a/lib/charms/haproxy/v0/spoe_auth.py b/lib/charms/haproxy/v0/spoe_auth.py new file mode 100644 index 000000000..ba65c9d02 --- /dev/null +++ b/lib/charms/haproxy/v0/spoe_auth.py @@ -0,0 +1,549 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +# pylint: disable=duplicate-code +"""SPOE-auth interface library. + +## Getting Started + +To get started using the library, you need to first declare the library in + the charm-libs section of your `charmcraft.yaml` file: +```yaml +charm-libs: +- lib: haproxy.spoe_auth + version: "0" +``` + +Then, fetch the library using `charmcraft`: + +```shell +cd some-charm +charmcraft fetch-libs +``` + +## Using the library as the Provider + +The provider charm should expose the interface as shown below: + +```yaml +provides: + spoe-auth: + interface: spoe-auth + limit: 1 +``` + +Then, to initialise the library: + +```python +from charms.haproxy.v0.spoe_auth import SpoeAuthProvider, HaproxyEvent + +class SpoeAuthCharm(CharmBase): + def __init__(self, *args): + super().__init__(*args) + self.spoe_auth = SpoeAuthProvider(self, relation_name="spoe-auth") + + self.framework.observe( + self.on.config_changed, self._on_config_changed + ) + + def _on_config_changed(self, event): + # Publish the SPOE auth configuration + self.spoe_auth.provide_spoe_auth_requirements( + spop_port=8081, + oidc_callback_port=5000, + event=HaproxyEvent.ON_HTTP_REQUEST, + var_authenticated="var.sess.is_authenticated", + var_redirect_url="var.sess.redirect_url", + cookie_name="auth_session", + hostname="auth.example.com", + oidc_callback_path="/oauth2/callback", + ) +``` +""" + +import json +import logging +import re +from collections.abc import MutableMapping +from enum import StrEnum +from typing import Annotated, cast + +from ops import CharmBase, RelationBrokenEvent +from ops.charm import CharmEvents +from ops.framework import EventBase, EventSource, Object +from ops.model import Relation +from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, IPvAnyAddress, ValidationError + +logger = logging.getLogger(__name__) +SPOE_AUTH_DEFAULT_RELATION_NAME = "spoe-auth" +HAPROXY_CONFIG_INVALID_CHARACTERS = "\n\t#\\'\"\r$ " +# RFC-1034 and RFC-2181 compliance REGEX for validating FQDNs +HOSTNAME_REGEX = ( + r"^(?=.{1,253})(?!.*--.*)(?:(?!-)(?![0-9])[a-zA-Z0-9-]" + r"{1,63}(? str: + """Validate if value contains invalid haproxy config characters. + + Args: + value: The value to validate. + + Raises: + ValueError: When value contains invalid characters. + + Returns: + The validated value. + """ + if [char for char in value if char in HAPROXY_CONFIG_INVALID_CHARACTERS]: + raise ValueError(f"Relation data contains invalid character(s) {value}") + return value + + +def validate_hostname(value: str) -> str: + """Validate if value is a valid hostname per RFC 1123. + + Args: + value: The value to validate. + + Raises: + ValueError: When value is not a valid hostname. + + Returns: + The validated value. + """ + if not re.match(HOSTNAME_REGEX, value): + raise ValueError(f"Invalid hostname: {value}") + return value + + +VALIDSTR = Annotated[str, BeforeValidator(value_contains_invalid_characters)] + + +class DataValidationError(Exception): + """Raised when data validation fails.""" + + +class SpoeAuthInvalidRelationDataError(Exception): + """Raised when data validation of the spoe-auth relation fails.""" + + +class _DatabagModel(BaseModel): + """Base databag model. + + Attrs: + model_config: pydantic model configuration. + """ + + model_config = ConfigDict( + # tolerate additional keys in databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + ) # type: ignore + """Pydantic config.""" + + @classmethod + def load(cls, databag: MutableMapping[str, str]) -> "_DatabagModel": + """Load this model from a Juju json databag. + + Args: + databag: Databag content. + + Raises: + DataValidationError: When model validation failed. + + Returns: + _DatabagModel: The validated model. + """ + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.model_fields.items()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) + except ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + def dump( + self, databag: MutableMapping[str, str] | None = None, clear: bool = True + ) -> MutableMapping[str, str] | None: + """Write the contents of this model to Juju databag. + + Args: + databag: The databag to write to. + clear: Whether to clear the databag before writing. + + Returns: + MutableMapping: The databag. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + + dct = self.model_dump(mode="json", by_alias=True) + databag.update({k: json.dumps(v) for k, v in dct.items()}) + return databag + + +class HaproxyEvent(StrEnum): + """Enumeration of HAProxy SPOE events. + + Attributes: + ON_FRONTEND_HTTP_REQUEST: Event triggered on frontend HTTP request. + """ + + ON_FRONTEND_HTTP_REQUEST = "on-frontend-http-request" + + +class SpoeAuthProviderAppData(_DatabagModel): + """Configuration model for SPOE authentication provider. + + Attributes: + spop_port: The port on the agent listening for SPOP. + oidc_callback_port: The port on the agent handling OIDC callbacks. + event: The event that triggers SPOE messages (e.g., on-http-request). + var_authenticated: Name of the variable set by the SPOE agent for auth status. + var_redirect_url: Name of the variable set by the SPOE agent for IDP redirect URL. + cookie_name: Name of the authentication cookie used by the SPOE agent. + oidc_callback_path: Path for OIDC callback. + oidc_callback_hostname: The hostname HAProxy should route OIDC callbacks to. + """ + + spop_port: int = Field( + description="The port on the agent listening for SPOP.", + gt=0, + le=65525, + ) + oidc_callback_port: int = Field( + description="The port on the agent handling OIDC callbacks.", + gt=0, + le=65525, + ) + event: HaproxyEvent = Field( + description="The event that triggers SPOE messages (e.g., on-http-request).", + ) + message_name: str = Field( + description="The name of the SPOE message that the provider expects." + ) + var_authenticated: VALIDSTR = Field( + description="Name of the variable set by the SPOE agent for auth status.", + ) + var_redirect_url: VALIDSTR = Field( + description="Name of the variable set by the SPOE agent for IDP redirect URL.", + ) + cookie_name: VALIDSTR = Field( + description="Name of the authentication cookie used by the SPOE agent.", + ) + oidc_callback_path: VALIDSTR = Field( + description="Path for OIDC callback.", default="/oauth2/callback" + ) + hostname: Annotated[str, BeforeValidator(validate_hostname)] = Field( + description="The hostname HAProxy should route OIDC callbacks to.", + ) + + +class SpoeAuthProviderUnitData(_DatabagModel): + """spoe-auth provider unit data. + + Attributes: + address: IP address of the unit. + """ + + address: IPvAnyAddress = Field(description="IP address of the unit.") + + +class SpoeAuthProvider(Object): + """SPOE auth interface provider implementation. + + Attributes: + relations: Related applications. + """ + + def __init__( + self, charm: CharmBase, relation_name: str = SPOE_AUTH_DEFAULT_RELATION_NAME + ) -> None: + """Initialize the SpoeAuthProvider. + + Args: + charm: The charm that is instantiating the library. + relation_name: The name of the relation to bind to. + """ + super().__init__(charm, relation_name) + self.charm = charm + self.relation_name = relation_name + + @property + def relations(self) -> list[Relation]: + """The list of Relation instances associated with this relation_name. + + Returns: + list[Relation]: The list of relations. + """ + return list(self.charm.model.relations[self.relation_name]) + + # pylint: disable=too-many-arguments,too-many-positional-arguments + def provide_spoe_auth_requirements( + self, + relation: Relation, + spop_port: int, + oidc_callback_port: int, + event: HaproxyEvent, + message_name: str, + var_authenticated: str, + var_redirect_url: str, + cookie_name: str, + hostname: str, + oidc_callback_path: str = "/oauth2/callback", + unit_address: str | None = None, + ) -> None: + """Set the SPOE auth configuration in the application databag. + + Args: + relation: The relation instance to set data on. + spop_port: The port on the agent listening for SPOP. + oidc_callback_port: The port on the agent handling OIDC callbacks. + event: The event that triggers SPOE messages. + message_name: The name of the SPOE message that the provider expects. + var_authenticated: Name of the variable for auth status. + var_redirect_url: Name of the variable for IDP redirect URL. + cookie_name: Name of the authentication cookie. + hostname: The hostname HAProxy should route OIDC callbacks to. + oidc_callback_path: Path for OIDC callback. + unit_address: The address of the unit. + + Raises: + DataValidationError: When validation of application data fails. + """ + if not self.charm.unit.is_leader(): + logger.warning("Only the leader unit can set the SPOE auth configuration.") + return + + try: + application_data = SpoeAuthProviderAppData( + spop_port=spop_port, + oidc_callback_port=oidc_callback_port, + event=event, + message_name=message_name, + var_authenticated=var_authenticated, + var_redirect_url=var_redirect_url, + cookie_name=cookie_name, + hostname=hostname, + oidc_callback_path=oidc_callback_path, + ) + unit_data = self._prepare_unit_data(unit_address) + except ValidationError as exc: + logger.error("Validation error when preparing provider relation data.") + raise DataValidationError( + "Validation error when preparing provider relation data." + ) from exc + + if self.charm.unit.is_leader(): + application_data.dump(relation.data[self.charm.app], clear=True) + unit_data.dump(relation.data[self.charm.unit], clear=True) + + def _prepare_unit_data(self, unit_address: str | None) -> SpoeAuthProviderUnitData: + """Prepare and validate unit data. + + Raises: + DataValidationError: When no address or unit IP is available. + + Returns: + RequirerUnitData: The validated unit data model. + """ + if not unit_address: + network_binding = self.charm.model.get_binding(self.relation_name) + if ( + network_binding is not None + and (bind_address := network_binding.network.bind_address) is not None + ): + unit_address = str(bind_address) + else: + logger.error("No unit IP available.") + raise DataValidationError("No unit IP available.") + return SpoeAuthProviderUnitData(address=cast("IPvAnyAddress", unit_address)) + + +class SpoeAuthAvailableEvent(EventBase): + """SpoeAuthAvailableEvent custom event.""" + + +class SpoeAuthRemovedEvent(EventBase): + """SpoeAuthRemovedEvent custom event.""" + + +class SpoeAuthRequirerEvents(CharmEvents): + """List of events that the SPOE auth requirer charm can leverage. + + Attributes: + available: Emitted when provider configuration is available. + removed: Emitted when the provider relation is broken. + """ + + available = EventSource(SpoeAuthAvailableEvent) + removed = EventSource(SpoeAuthRemovedEvent) + + +class SpoeAuthRequirer(Object): + """SPOE auth interface requirer implementation. + + Attributes: + on: Custom events of the requirer. + relation: The related application. + """ + + # Ignore this for pylance + on = SpoeAuthRequirerEvents() # type: ignore + + def __init__( + self, charm: CharmBase, relation_name: str = SPOE_AUTH_DEFAULT_RELATION_NAME + ) -> None: + """Initialize the SpoeAuthRequirer. + + Args: + charm: The charm that is instantiating the library. + relation_name: The name of the relation to bind to. + """ + super().__init__(charm, relation_name) + self.charm = charm + self.relation_name = relation_name + + self.framework.observe(self.charm.on[self.relation_name].relation_created, self._configure) + self.framework.observe(self.charm.on[self.relation_name].relation_changed, self._configure) + self.framework.observe( + self.charm.on[self.relation_name].relation_broken, self._on_relation_broken + ) + + @property + def relation(self) -> Relation | None: + """The relation instance associated with this relation_name. + + Returns: + Optional[Relation]: The relation instance, or None if not available. + """ + relations = self.charm.model.relations[self.relation_name] + return relations[0] if relations else None + + def _configure(self, _: EventBase) -> None: + """Handle relation changed events.""" + if self.is_available(): + self.on.available.emit() + + def _on_relation_broken(self, _: RelationBrokenEvent) -> None: + """Handle relation broken events.""" + self.on.removed.emit() + + def is_available(self) -> bool: + """Check if the SPOE auth configuration is available and valid. + + Returns: + bool: True if configuration is available and valid, False otherwise. + """ + if not self.relation: + return False + + if not self.relation.app: + return False + + try: + databag = self.relation.data[self.relation.app] + if not databag: + return False + SpoeAuthProviderAppData.load(databag) + return True + except (DataValidationError, KeyError): + return False + + def get_data(self) -> SpoeAuthProviderAppData | None: + """Get the SPOE auth configuration from the provider. + + Returns: + Optional[SpoeAuthProviderAppData]: The SPOE auth configuration, + or None if not available. + + Raises: + SpoeAuthInvalidRelationDataError: When configuration data is invalid. + """ + if not self.relation: + return None + + if not self.relation.app: + return None + + try: + databag = self.relation.data[self.relation.app] + if not databag: + return None + return SpoeAuthProviderAppData.load(databag) # type: ignore + except DataValidationError as exc: + logger.error( + "spoe-auth data validation failed for relation %s: %s", + self.relation, + str(exc), + ) + raise SpoeAuthInvalidRelationDataError( + f"spoe-auth data validation failed for relation: {self.relation}" + ) from exc + + def get_provider_unit_data(self, relation: Relation) -> list[SpoeAuthProviderUnitData]: + """Fetch and validate the requirer's units data. + + Args: + relation: The relation to fetch unit data from. + + Raises: + DataValidationError: When unit data validation fails. + + Returns: + list[SpoeAuthProviderUnitData]: List of validated unit data from the provider. + """ + requirer_units_data: list[SpoeAuthProviderUnitData] = [] + + for unit in relation.units: + databag = relation.data.get(unit) + if not databag: + logger.error( + "Requirer unit data does not exist even though the unit is still present." + ) + continue + try: + data = cast("SpoeAuthProviderUnitData", SpoeAuthProviderUnitData.load(databag)) + requirer_units_data.append(data) + except DataValidationError: + logger.error("Invalid requirer application data for %s", unit) + raise + return requirer_units_data + + def get_provider_application_data(self, relation: Relation) -> SpoeAuthProviderAppData: + """Fetch and validate the requirer's application databag. + + Args: + relation: The relation to fetch application data from. + + Raises: + DataValidationError: When requirer application data validation fails. + + Returns: + RequirerApplicationData: Validated application data from the requirer. + """ + try: + return cast( + "SpoeAuthProviderAppData", + SpoeAuthProviderAppData.load(relation.data[relation.app]), + ) + except DataValidationError: + logger.error("Invalid requirer application data for %s", relation.app.name) + raise diff --git a/tests/unit/test_spoe_auth_lib.py b/tests/unit/test_spoe_auth_lib.py new file mode 100644 index 000000000..4ba82de54 --- /dev/null +++ b/tests/unit/test_spoe_auth_lib.py @@ -0,0 +1,406 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit tests for SPOE auth interface library.""" + +import json +import re +from typing import Any, cast + +import pytest +from charms.haproxy.v0.spoe_auth import ( + HOSTNAME_REGEX, + DataValidationError, + HaproxyEvent, + SpoeAuthProviderAppData, + SpoeAuthProviderUnitData, +) +from pydantic import IPvAnyAddress, ValidationError + +PLACEHOLDER_ADDRESS = "10.0.0.1" +PLACEHOLDER_SPOP_PORT = 8081 +PLACEHOLDER_OIDC_CALLBACK_PORT = 5000 +PLACEHOLDER_VAR_AUTHENTICATED = "var.sess.is_authenticated" +PLACEHOLDER_VAR_REDIRECT_URL = "var.sess.redirect_url" +PLACEHOLDER_COOKIE_NAME = "auth_session" +PLACEHOLDER_HOSTNAME = "auth.example.com" +PLACEHOLDER_OIDC_CALLBACK_PATH = "/oauth2/callback" + + +@pytest.fixture(name="mock_provider_app_data_dict") +def mock_provider_app_data_dict_fixture() -> dict[str, Any]: + """Create mock provider application data dictionary.""" + return { + "spop_port": PLACEHOLDER_SPOP_PORT, + "oidc_callback_port": PLACEHOLDER_OIDC_CALLBACK_PORT, + "event": "on-frontend-http-request", + "message_name": "try-auth-oidc", + "var_authenticated": PLACEHOLDER_VAR_AUTHENTICATED, + "var_redirect_url": PLACEHOLDER_VAR_REDIRECT_URL, + "cookie_name": PLACEHOLDER_COOKIE_NAME, + "oidc_callback_path": PLACEHOLDER_OIDC_CALLBACK_PATH, + "hostname": PLACEHOLDER_HOSTNAME, + } + + +@pytest.fixture(name="mock_provider_unit_data_dict") +def mock_provider_unit_data_dict_fixture() -> dict[str, str]: + """Create mock provider unit data dictionary.""" + return {"address": PLACEHOLDER_ADDRESS} + + +def test_spoe_auth_provider_app_data_validation(): + """ + arrange: Create a SpoeAuthProviderAppData model with valid data. + act: Validate the model. + assert: Model validation passes. + """ + data = SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + oidc_callback_path=PLACEHOLDER_OIDC_CALLBACK_PATH, + ) + + assert data.spop_port == PLACEHOLDER_SPOP_PORT + assert data.oidc_callback_port == PLACEHOLDER_OIDC_CALLBACK_PORT + assert data.event == HaproxyEvent.ON_FRONTEND_HTTP_REQUEST + assert data.var_authenticated == PLACEHOLDER_VAR_AUTHENTICATED + assert data.var_redirect_url == PLACEHOLDER_VAR_REDIRECT_URL + assert data.cookie_name == PLACEHOLDER_COOKIE_NAME + assert data.hostname == PLACEHOLDER_HOSTNAME + assert data.oidc_callback_path == PLACEHOLDER_OIDC_CALLBACK_PATH + + +def test_spoe_auth_provider_app_data_default_callback_path(): + """Create SpoeAuthProviderAppData with default callback path. + + arrange: Create a SpoeAuthProviderAppData model without specifying oidc_callback_path. + act: Validate the model. + assert: Model validation passes with default callback path. + """ + data = SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + oidc_callback_path="/oauth2/callback", # Explicitly set to the default value + ) + + assert data.oidc_callback_path == "/oauth2/callback" + + +@pytest.mark.parametrize("port", [0, 65526]) +def test_spoe_auth_provider_app_data_invalid_spop_port(port: int): + """ + arrange: Create a SpoeAuthProviderAppData model with spop_port set to 0. + act: Validate the model. + assert: Validation raises an error. + """ + with pytest.raises(ValidationError): + SpoeAuthProviderAppData( + spop_port=port, # Invalid: port must be > 0 and <= 65525 + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + ) + + +@pytest.mark.parametrize("port", [0, 65526]) +def test_spoe_auth_provider_app_data_invalid_oidc_callback_port(port: int): + """ + arrange: Create a SpoeAuthProviderAppData model with invalid oidc_callback_port. + act: Validate the model. + assert: Validation raises an error. + """ + with pytest.raises(ValidationError): + SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=port, # Invalid: port must be > 0 and <= 65525 + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + ) + + +def test_spoe_auth_provider_app_data_invalid_hostname_format(): + """ + arrange: Create a SpoeAuthProviderAppData model with invalid hostname format. + act: Validate the model. + assert: Validation raises an error. + """ + with pytest.raises(ValidationError): + SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname="invalid-hostname-!@#", # Invalid: contains special chars + ) + + +def test_spoe_auth_provider_app_data_invalid_char_in_var_authenticated(): + """ + arrange: Create a SpoeAuthProviderAppData model with invalid characters in var_authenticated. + act: Validate the model. + assert: Validation raises an error. + """ + with pytest.raises(ValidationError): + SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated="invalid\nvar", # Invalid: newline character + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + ) + + +def test_spoe_auth_provider_unit_data_validation(): + """ + arrange: Create a SpoeAuthProviderUnitData model with valid data. + act: Validate the model. + assert: Model validation passes. + """ + data = SpoeAuthProviderUnitData(address=cast("IPvAnyAddress", PLACEHOLDER_ADDRESS)) + + assert str(data.address) == PLACEHOLDER_ADDRESS + + +def test_spoe_auth_provider_unit_data_ipv6_validation(): + """ + arrange: Create a SpoeAuthProviderUnitData model with IPv6 address. + act: Validate the model. + assert: Model validation passes. + """ + ipv6_address = "2001:db8::1" + data = SpoeAuthProviderUnitData(address=cast("IPvAnyAddress", ipv6_address)) + + assert str(data.address) == ipv6_address + + +def test_spoe_auth_provider_unit_data_invalid_address(): + """ + arrange: Create a SpoeAuthProviderUnitData model with invalid IP address. + act: Validate the model. + assert: Validation raises an error. + """ + with pytest.raises(ValidationError): + SpoeAuthProviderUnitData(address=cast("IPvAnyAddress", "invalid-ip-address")) + + +def test_load_provider_app_data(mock_provider_app_data_dict: dict[str, Any]): + """ + arrange: Create a databag with valid provider application data. + act: Load the data with SpoeAuthProviderAppData.load(). + assert: Data is loaded correctly. + """ + databag = {k: json.dumps(v) for k, v in mock_provider_app_data_dict.items()} + data = cast("SpoeAuthProviderAppData", SpoeAuthProviderAppData.load(databag)) + + assert data.spop_port == PLACEHOLDER_SPOP_PORT + assert data.oidc_callback_port == PLACEHOLDER_OIDC_CALLBACK_PORT + assert data.event == HaproxyEvent.ON_FRONTEND_HTTP_REQUEST + assert data.var_authenticated == PLACEHOLDER_VAR_AUTHENTICATED + assert data.var_redirect_url == PLACEHOLDER_VAR_REDIRECT_URL + assert data.cookie_name == PLACEHOLDER_COOKIE_NAME + assert data.oidc_callback_path == PLACEHOLDER_OIDC_CALLBACK_PATH + assert data.hostname == PLACEHOLDER_HOSTNAME + assert data.message_name == "try-auth-oidc" + + +def test_load_provider_app_data_invalid_databag(): + """ + arrange: Create a databag with invalid JSON. + act: Load the data with SpoeAuthProviderAppData.load(). + assert: DataValidationError is raised. + """ + invalid_databag = { + "spop_port": "not-json", + } + with pytest.raises(DataValidationError): + SpoeAuthProviderAppData.load(invalid_databag) + + +def test_dump_provider_app_data(): + """Dump provider app data to databag. + + arrange: Create a SpoeAuthProviderAppData model with valid data. + act: Dump the model to a databag. + assert: Databag contains correct data. + """ + data = SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + oidc_callback_path=PLACEHOLDER_OIDC_CALLBACK_PATH, + ) + + databag: dict[str, Any] = {} + result = data.dump(databag) + + assert result is not None + assert "spop_port" in databag + assert json.loads(databag["spop_port"]) == PLACEHOLDER_SPOP_PORT + assert json.loads(databag["oidc_callback_port"]) == PLACEHOLDER_OIDC_CALLBACK_PORT + assert json.loads(databag["event"]) == "on-frontend-http-request" + assert json.loads(databag["var_authenticated"]) == PLACEHOLDER_VAR_AUTHENTICATED + assert json.loads(databag["var_redirect_url"]) == PLACEHOLDER_VAR_REDIRECT_URL + assert json.loads(databag["cookie_name"]) == PLACEHOLDER_COOKIE_NAME + # oidc_callback_path should be included when explicitly set + if "oidc_callback_path" in databag: + assert json.loads(databag["oidc_callback_path"]) == PLACEHOLDER_OIDC_CALLBACK_PATH + assert json.loads(databag["hostname"]) == PLACEHOLDER_HOSTNAME + assert json.loads(databag["message_name"]) == "try-auth-oidc" + + +def test_dump_and_load_provider_app_data_roundtrip(): + """ + arrange: Create a SpoeAuthProviderAppData model. + act: Dump and then load it again. + assert: The loaded data matches the original. + """ + original_data = SpoeAuthProviderAppData( + spop_port=PLACEHOLDER_SPOP_PORT, + oidc_callback_port=PLACEHOLDER_OIDC_CALLBACK_PORT, + event=HaproxyEvent.ON_FRONTEND_HTTP_REQUEST, + message_name="try-auth-oidc", + var_authenticated=PLACEHOLDER_VAR_AUTHENTICATED, + var_redirect_url=PLACEHOLDER_VAR_REDIRECT_URL, + cookie_name=PLACEHOLDER_COOKIE_NAME, + hostname=PLACEHOLDER_HOSTNAME, + oidc_callback_path=PLACEHOLDER_OIDC_CALLBACK_PATH, + ) + + # Dump to databag + databag: dict[str, Any] = {} + original_data.dump(databag) + + # Load from databag + loaded_data = cast("SpoeAuthProviderAppData", SpoeAuthProviderAppData.load(databag)) + + assert loaded_data.spop_port == original_data.spop_port + assert loaded_data.oidc_callback_port == original_data.oidc_callback_port + assert loaded_data.event == original_data.event + assert loaded_data.var_authenticated == original_data.var_authenticated + assert loaded_data.var_redirect_url == original_data.var_redirect_url + assert loaded_data.cookie_name == original_data.cookie_name + assert loaded_data.hostname == original_data.hostname + assert loaded_data.oidc_callback_path == original_data.oidc_callback_path + + +def test_load_provider_unit_data(mock_provider_unit_data_dict: dict[str, str]): + """ + arrange: Create a databag with valid unit data. + act: Load the data with SpoeAuthProviderUnitData.load(). + assert: Data is loaded correctly. + """ + databag = {k: json.dumps(v) for k, v in mock_provider_unit_data_dict.items()} + data = cast("SpoeAuthProviderUnitData", SpoeAuthProviderUnitData.load(databag)) + + assert str(data.address) == PLACEHOLDER_ADDRESS + + +def test_dump_provider_unit_data(): + """ + arrange: Create a SpoeAuthProviderUnitData model with valid data. + act: Dump the model to a databag. + assert: Databag contains correct data. + """ + data = SpoeAuthProviderUnitData(address=cast("IPvAnyAddress", PLACEHOLDER_ADDRESS)) + + databag: dict[str, Any] = {} + result = data.dump(databag) + + assert result is not None + assert "address" in databag + assert json.loads(databag["address"]) == PLACEHOLDER_ADDRESS + + +def test_dump_and_load_provider_unit_data_roundtrip(): + """ + arrange: Create a SpoeAuthProviderUnitData model. + act: Dump and then load it again. + assert: The loaded data matches the original. + """ + original_data = SpoeAuthProviderUnitData(address=cast("IPvAnyAddress", PLACEHOLDER_ADDRESS)) + + # Dump to databag + databag: dict[str, Any] = {} + original_data.dump(databag) + + # Load from databag + loaded_data = cast("SpoeAuthProviderUnitData", SpoeAuthProviderUnitData.load(databag)) + + assert str(loaded_data.address) == str(original_data.address) + + +@pytest.mark.parametrize( + "hostname,is_valid", + [ + ("example.com", True), + ("sub.example.com", True), + ("test.sub.example.com", True), + ("a.b.c.d.e.f.g.example.com", True), + ("test-123.example.com", True), + ("a.example.com", True), + ("test.example-with-dash.com", True), + ("very-long-subdomain-name-that-is-still-valid.example.com", True), + ("x.y", True), + ("123test.example.com", False), # Must start with a letter + ("example", False), # No TLD + ("-example.com", False), # Starts with hyphen + ("example-.com", False), # Ends with hyphen + ("ex--ample.com", False), # Double hyphen + ("example..com", False), # Double dots + (".example.com", False), # Starts with dot + ("example.com.", False), # Ends with dot + ("example.", False), # Ends with dot after TLD + ("example..", False), # Multiple dots at end + ("", False), # Empty string + ("a" * 64 + ".com", False), # Label too long (>63 chars) + ("invalid-hostname-!@#.com", False), # Invalid characters + ("example with spaces.com", False), # Spaces not allowed + ("example\nnewline.com", False), # Newline not allowed + ("UPPERCASE.COM", True), # Should be valid (case insensitive) + ("mixed-Case.Example.COM", True), # Mixed case should be valid + ], +) +def test_hostname_regex_validation(hostname: str, is_valid: bool): + """Test HOSTNAME_REGEX validates FQDNs correctly. + + arrange: Test various hostname strings against HOSTNAME_REGEX. + act: Check if the hostname matches the regex pattern. + assert: The result matches the expected validity. + """ + match = re.match(HOSTNAME_REGEX, hostname) + if is_valid: + assert match is not None, f"Expected '{hostname}' to be valid but regex didn't match" + else: + assert match is None, f"Expected '{hostname}' to be invalid but regex matched"