diff --git a/providers/openfeature-provider-flagd/README.md b/providers/openfeature-provider-flagd/README.md index 8463700a..60d27fcf 100644 --- a/providers/openfeature-provider-flagd/README.md +++ b/providers/openfeature-provider-flagd/README.md @@ -1,6 +1,6 @@ # flagd Provider for OpenFeature -This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto). +This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto), or locally evaluate flags defined in a flagd [flag definition](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json) via the OpenFeature Python SDK. ## Installation @@ -29,7 +29,9 @@ api.set_provider(FlagdProvider()) ### In-process resolver -This mode performs flag evaluations locally (in-process). +This mode performs flag evaluations locally (in-process). Flag configurations for evaluation are obtained via gRPC protocol using [sync protobuf schema](https://buf.build/open-feature/flagd/file/main:sync/v1/sync_service.proto) service definition. + +Consider the following example to create a `FlagdProvider` with in-process evaluations, ```python from openfeature import api @@ -38,10 +40,39 @@ from openfeature.contrib.provider.flagd.config import ResolverType api.set_provider(FlagdProvider( resolver_type=ResolverType.IN_PROCESS, +)) +``` + +In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json). + + +### File mode + +In-process resolvers can also work in an offline mode. +To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`. + +```python +from openfeature import api +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + +api.set_provider(FlagdProvider( + resolver_type=ResolverType.FILE, offline_flag_source_path="my-flag.json", )) ``` +Provider will attempt to detect file changes using polling. +Polling happens at 5 second intervals and this is currently unconfigurable. +This mode is useful for local development, tests and offline applications. + ### Configuration options The default options can be defined in the FlagdProvider constructor. diff --git a/providers/openfeature-provider-flagd/openfeature/spec b/providers/openfeature-provider-flagd/openfeature/spec index a69f748d..95fe981d 160000 --- a/providers/openfeature-provider-flagd/openfeature/spec +++ b/providers/openfeature-provider-flagd/openfeature/spec @@ -1 +1 @@ -Subproject commit a69f748db2edfec7015ca6bb702ca22fd8c5ef30 +Subproject commit 95fe981d9e4c96f43ee57a3e7ca9e94b653a249e diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index d5829c2a..ac6134a7 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -44,6 +44,7 @@ class CacheType(Enum): ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS" ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD" +ENV_VAR_SELECTOR = "FLAGD_SOURCE_SELECTOR" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" ENV_VAR_TLS_CERT = "FLAGD_SERVER_CERT_PATH" @@ -79,6 +80,7 @@ def __init__( # noqa: PLR0913 host: typing.Optional[str] = None, port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, + selector: typing.Optional[str] = None, resolver: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, offline_poll_interval_ms: typing.Optional[int] = None, @@ -221,3 +223,7 @@ def __init__( # noqa: PLR0913 if cert_path is None else cert_path ) + + self.selector = ( + env_or_default(ENV_VAR_SELECTOR, None) if selector is None else selector + ) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 732bd413..83b03897 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -46,6 +46,7 @@ def __init__( # noqa: PLR0913 deadline_ms: typing.Optional[int] = None, timeout: typing.Optional[int] = None, retry_backoff_ms: typing.Optional[int] = None, + selector: typing.Optional[str] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, stream_deadline_ms: typing.Optional[int] = None, @@ -86,6 +87,7 @@ def __init__( # noqa: PLR0913 retry_backoff_ms=retry_backoff_ms, retry_backoff_max_ms=retry_backoff_max_ms, retry_grace_period=retry_grace_period, + selector=selector, resolver=resolver_type, offline_flag_source_path=offline_flag_source_path, stream_deadline_ms=stream_deadline_ms, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py index 73923abb..f539de8f 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py @@ -1,51 +1,5 @@ -import typing - -from openfeature.evaluation_context import EvaluationContext -from openfeature.flag_evaluation import FlagResolutionDetails - from .grpc import GrpcResolver from .in_process import InProcessResolver - - -class AbstractResolver(typing.Protocol): - def initialize(self, evaluation_context: EvaluationContext) -> None: ... - - def shutdown(self) -> None: ... - - def resolve_boolean_details( - self, - key: str, - default_value: bool, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[bool]: ... - - def resolve_string_details( - self, - key: str, - default_value: str, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[str]: ... - - def resolve_float_details( - self, - key: str, - default_value: float, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[float]: ... - - def resolve_integer_details( - self, - key: str, - default_value: int, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[int]: ... - - def resolve_object_details( - self, - key: str, - default_value: typing.Union[dict, list], - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[typing.Union[dict, list]]: ... - +from .protocol import AbstractResolver __all__ = ["AbstractResolver", "GrpcResolver", "InProcessResolver"] diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index baa3a9a3..cacf37ec 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -10,6 +10,7 @@ from ..config import Config from .process.connector import FlagStateConnector +from .process.connector.grpc_watcher import GrpcWatcher from .process.flags import FlagStore from .process.targeting import targeting @@ -28,13 +29,19 @@ def __init__( ], ): self.config = config - if not self.config.offline_flag_source_path: - raise ValueError( - "offline_flag_source_path must be provided when using in-process resolver" - ) self.flag_store = FlagStore(emit_provider_configuration_changed) - self.connector: FlagStateConnector = FileWatcher( - self.config, self.flag_store, emit_provider_ready, emit_provider_error + self.connector: FlagStateConnector = ( + FileWatcher( + self.config, self.flag_store, emit_provider_ready, emit_provider_error + ) + if self.config.offline_flag_source_path + else GrpcWatcher( + self.config, + self.flag_store, + emit_provider_ready, + emit_provider_error, + emit_provider_stale, + ) ) def initialize(self, evaluation_context: EvaluationContext) -> None: @@ -112,6 +119,7 @@ def _resolve( raise ParseError( "Parsed JSONLogic targeting did not return a string or bool" ) + variant, value = flag.get_variant(variant) if not value: raise ParseError(f"Resolved variant {variant} not in variants config.") diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py new file mode 100644 index 00000000..138a5ddb --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -0,0 +1,192 @@ +import json +import logging +import threading +import time +import typing + +import grpc + +from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails +from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError +from openfeature.schemas.protobuf.flagd.sync.v1 import ( + sync_pb2, + sync_pb2_grpc, +) + +from ....config import Config +from ..connector import FlagStateConnector +from ..flags import FlagStore + +logger = logging.getLogger("openfeature.contrib") + + +class GrpcWatcher(FlagStateConnector): + def __init__( + self, + config: Config, + flag_store: FlagStore, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_stale: typing.Callable[[ProviderEventDetails], None], + ): + self.flag_store = flag_store + self.config = config + + self.channel = self._generate_channel(config) + self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel) + self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 + self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001 + self.retry_grace_period = config.retry_grace_period + self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 + self.deadline = config.deadline_ms * 0.001 + self.selector = config.selector + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error + self.emit_provider_stale = emit_provider_stale + + self.connected = False + self.thread: typing.Optional[threading.Thread] = None + self.timer: typing.Optional[threading.Timer] = None + + self.start_time = time.time() + + def _generate_channel(self, config: Config) -> grpc.Channel: + target = f"{config.host}:{config.port}" + # Create the channel with the service config + options = [ + ("grpc.keepalive_time_ms", config.keep_alive_time), + ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms), + ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), + ("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms), + ] + if config.tls: + channel_args = { + "options": options, + "credentials": grpc.ssl_channel_credentials(), + } + if config.cert_path: + with open(config.cert_path, "rb") as f: + channel_args["credentials"] = grpc.ssl_channel_credentials(f.read()) + + channel = grpc.secure_channel(target, **channel_args) + + else: + channel = grpc.insecure_channel( + target, + options=options, + ) + + return channel + + def initialize(self, context: EvaluationContext) -> None: + self.connect() + + def connect(self) -> None: + self.active = True + + # Run monitoring in a separate thread + self.monitor_thread = threading.Thread( + target=self.monitor, daemon=True, name="FlagdGrpcSyncServiceMonitorThread" + ) + self.monitor_thread.start() + ## block until ready or deadline reached + timeout = self.deadline + time.time() + while not self.connected and time.time() < timeout: + time.sleep(0.05) + logger.debug("Finished blocking gRPC state initialization") + + if not self.connected: + raise ProviderNotReadyError( + "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." + ) + + def monitor(self) -> None: + self.channel.subscribe(self._state_change_callback, try_to_connect=True) + + def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None: + logger.debug(f"gRPC state change: {new_state}") + if new_state == grpc.ChannelConnectivity.READY: + if not self.thread or not self.thread.is_alive(): + self.thread = threading.Thread( + target=self.listen, + daemon=True, + name="FlagdGrpcSyncWorkerThread", + ) + self.thread.start() + + if self.timer and self.timer.is_alive(): + logger.debug("gRPC error timer expired") + self.timer.cancel() + + elif new_state == grpc.ChannelConnectivity.TRANSIENT_FAILURE: + # this is the failed reconnect attempt so we are going into stale + self.emit_provider_stale( + ProviderEventDetails( + message="gRPC sync disconnected, reconnecting", + ) + ) + self.start_time = time.time() + # adding a timer, so we can emit the error event after time + self.timer = threading.Timer(self.retry_grace_period, self.emit_error) + + logger.debug("gRPC error timer started") + self.timer.start() + self.connected = False + + def emit_error(self) -> None: + logger.debug("gRPC error emitted") + self.emit_provider_error( + ProviderEventDetails( + message="gRPC sync disconnected, reconnecting", + error_code=ErrorCode.GENERAL, + ) + ) + + def shutdown(self) -> None: + self.active = False + self.channel.close() + + def listen(self) -> None: + call_args = ( + {"timeout": self.streamline_deadline_seconds} + if self.streamline_deadline_seconds > 0 + else {} + ) + request_args = {"selector": self.selector} if self.selector is not None else {} + + while self.active: + try: + request = sync_pb2.SyncFlagsRequest(**request_args) + + logger.debug("Setting up gRPC sync flags connection") + for flag_rsp in self.stub.SyncFlags( + request, wait_for_ready=True, **call_args + ): + flag_str = flag_rsp.flag_configuration + logger.debug( + f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}" + ) + self.flag_store.update(json.loads(flag_str)) + + if not self.connected: + self.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" + ) + ) + self.connected = True + + if not self.active: + logger.debug("Terminating gRPC sync thread") + return + except grpc.RpcError as e: # noqa: PERF203 + logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") + except json.JSONDecodeError: + logger.exception( + f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}" + ) + except ParseError: + logger.exception( + f"Could not parse flag data using flagd syntax: {flag_str=}" + ) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py index 919fa8f2..d8f93b36 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py @@ -68,11 +68,19 @@ def __post_init__(self) -> None: @classmethod def from_dict(cls, key: str, data: dict) -> "Flag": - data["default_variant"] = data["defaultVariant"] - del data["defaultVariant"] - flag = cls(key=key, **data) - - return flag + if "defaultVariant" in data: + data["default_variant"] = data["defaultVariant"] + del data["defaultVariant"] + + if "source" in data: + del data["source"] + if "selector" in data: + del data["selector"] + try: + flag = cls(key=key, **data) + return flag + except Exception as err: + raise ParseError from err @property def default(self) -> tuple[str, typing.Any]: diff --git a/providers/openfeature-provider-flagd/tests/conftest.py b/providers/openfeature-provider-flagd/tests/conftest.py index 287f5240..692b84b2 100644 --- a/providers/openfeature-provider-flagd/tests/conftest.py +++ b/providers/openfeature-provider-flagd/tests/conftest.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import pytest @@ -8,13 +9,14 @@ @pytest.fixture() def flagd_provider_client(): - api.set_provider(FlagdProvider()) - return api.get_client() + provider = FlagdProvider() + api.set_provider(provider) + yield api.get_client() + provider.shutdown() def setup_flag_file(base_dir: str, flag_file: str) -> str: - with open(f"test-harness/flags/{flag_file}") as src_file: - contents = src_file.read() + contents = (Path(__file__).parent / "../test-harness/flags" / flag_file).read_text() dst_path = os.path.join(base_dir, flag_file) with open(dst_path, "w") as dst_file: dst_file.write(contents) diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/__init__.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py new file mode 100644 index 00000000..d16bf517 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py @@ -0,0 +1,19 @@ +import pytest + +from openfeature.contrib.provider.flagd.config import ResolverType +from tests.e2e.testfilter import TestFilter + +resolver = ResolverType.IN_PROCESS +feature_list = ["~targetURI", "~unixsocket"] + + +def pytest_collection_modifyitems(config, items): + test_filter = TestFilter( + config, feature_list=feature_list, resolver=resolver.value, base_path=__file__ + ) + test_filter.filter_items(items) + + +@pytest.fixture() +def resolver_type() -> ResolverType: + return resolver diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/test_flaqd.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/test_flaqd.py new file mode 100644 index 00000000..3feb4c52 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/test_flaqd.py @@ -0,0 +1,5 @@ +from pytest_bdd import scenarios + +from tests.e2e.paths import TEST_HARNESS_PATH + +scenarios(f"{TEST_HARNESS_PATH}/gherkin") diff --git a/providers/openfeature-provider-flagd/tests/test_errors.py b/providers/openfeature-provider-flagd/tests/test_errors.py index cc053788..15ea2d8c 100644 --- a/providers/openfeature-provider-flagd/tests/test_errors.py +++ b/providers/openfeature-provider-flagd/tests/test_errors.py @@ -1,4 +1,5 @@ import os +import time import pytest @@ -6,6 +7,7 @@ from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEvent from openfeature.exception import ErrorCode from openfeature.flag_evaluation import Reason @@ -84,3 +86,27 @@ def test_flag_disabled(): assert res.value == "fallback" assert res.reason == Reason.DISABLED + + +@pytest.mark.parametrize("wait", (500, 250)) +def test_grpc_sync_fail_deadline(wait: int): + init_failed = False + + def fail(*args, **kwargs): + nonlocal init_failed + init_failed = True + + api.get_client().add_handler(ProviderEvent.PROVIDER_ERROR, fail) + + t = time.time() + api.set_provider( + FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + port=99999, # dead port to test failure + deadline_ms=wait, + ) + ) + + elapsed = time.time() - t + assert abs(elapsed - wait * 0.001) < 0.11 + assert init_failed