Skip to content

feat(flagd): Add in-process evaluator #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# flagd Provider for OpenFeature

This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto).
This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto), or locally evaluate flags defined in a flagd [flag definition](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json) via the OpenFeature Python SDK.

## Installation

Expand Down Expand Up @@ -29,7 +29,9 @@ api.set_provider(FlagdProvider())

### In-process resolver

This mode performs flag evaluations locally (in-process).
This mode performs flag evaluations locally (in-process). Flag configurations for evaluation are obtained via gRPC protocol using [sync protobuf schema](https://buf.build/open-feature/flagd/file/main:sync/v1/sync_service.proto) service definition.

Consider the following example to create a `FlagdProvider` with in-process evaluations,

```python
from openfeature import api
Expand All @@ -38,10 +40,39 @@ from openfeature.contrib.provider.flagd.config import ResolverType

api.set_provider(FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
))
```

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).

<!--
#### Sync-metadata

To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata).
The value is updated with every (re)connection to the sync implementation.
This can be used to enrich evaluations with such data.
If the `in-process` mode is not used, and before the provider is ready, the `getSyncMetadata` returns an empty map.
-->
### File mode

In-process resolvers can also work in an offline mode.
To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`.

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType

api.set_provider(FlagdProvider(
resolver_type=ResolverType.FILE,
offline_flag_source_path="my-flag.json",
))
```

Provider will attempt to detect file changes using polling.
Polling happens at 5 second intervals and this is currently unconfigurable.
This mode is useful for local development, tests and offline applications.

### Configuration options

The default options can be defined in the FlagdProvider constructor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CacheType(Enum):
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
ENV_VAR_SELECTOR = "FLAGD_SOURCE_SELECTOR"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"
ENV_VAR_TLS_CERT = "FLAGD_SERVER_CERT_PATH"
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__( # noqa: PLR0913
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
selector: typing.Optional[str] = None,
resolver: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_ms: typing.Optional[int] = None,
Expand Down Expand Up @@ -221,3 +223,7 @@ def __init__( # noqa: PLR0913
if cert_path is None
else cert_path
)

self.selector = (
env_or_default(ENV_VAR_SELECTOR, None) if selector is None else selector
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__( # noqa: PLR0913
deadline_ms: typing.Optional[int] = None,
timeout: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
selector: typing.Optional[str] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
stream_deadline_ms: typing.Optional[int] = None,
Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__( # noqa: PLR0913
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_period=retry_grace_period,
selector=selector,
resolver=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,5 @@
import typing

from openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagResolutionDetails

from .grpc import GrpcResolver
from .in_process import InProcessResolver


class AbstractResolver(typing.Protocol):
def initialize(self, evaluation_context: EvaluationContext) -> None: ...

def shutdown(self) -> None: ...

def resolve_boolean_details(
self,
key: str,
default_value: bool,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[bool]: ...

def resolve_string_details(
self,
key: str,
default_value: str,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[str]: ...

def resolve_float_details(
self,
key: str,
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]: ...

def resolve_integer_details(
self,
key: str,
default_value: int,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]: ...

def resolve_object_details(
self,
key: str,
default_value: typing.Union[dict, list],
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[typing.Union[dict, list]]: ...

from .protocol import AbstractResolver

__all__ = ["AbstractResolver", "GrpcResolver", "InProcessResolver"]
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ..config import Config
from .process.connector import FlagStateConnector
from .process.connector.grpc_watcher import GrpcWatcher
from .process.flags import FlagStore
from .process.targeting import targeting

Expand All @@ -28,13 +29,19 @@ def __init__(
],
):
self.config = config
if not self.config.offline_flag_source_path:
raise ValueError(
"offline_flag_source_path must be provided when using in-process resolver"
)
self.flag_store = FlagStore(emit_provider_configuration_changed)
self.connector: FlagStateConnector = FileWatcher(
self.config, self.flag_store, emit_provider_ready, emit_provider_error
self.connector: FlagStateConnector = (
FileWatcher(
self.config, self.flag_store, emit_provider_ready, emit_provider_error
)
if self.config.offline_flag_source_path
else GrpcWatcher(
self.config,
self.flag_store,
emit_provider_ready,
emit_provider_error,
emit_provider_stale,
)
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
Expand Down Expand Up @@ -112,6 +119,7 @@ def _resolve(
raise ParseError(
"Parsed JSONLogic targeting did not return a string or bool"
)

variant, value = flag.get_variant(variant)
if not value:
raise ParseError(f"Resolved variant {variant} not in variants config.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import json
import logging
import threading
import time
import typing

import grpc

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError
from openfeature.schemas.protobuf.flagd.sync.v1 import (
sync_pb2,
sync_pb2_grpc,
)

from ....config import Config
from ..connector import FlagStateConnector
from ..flags import FlagStore

logger = logging.getLogger("openfeature.contrib")


class GrpcWatcher(FlagStateConnector):
def __init__(
self,
config: Config,
flag_store: FlagStore,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
):
self.flag_store = flag_store
self.config = config

self.channel = self._generate_channel(config)
self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel)
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
self.retry_grace_period = config.retry_grace_period
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.selector = config.selector
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
self.emit_provider_stale = emit_provider_stale

self.connected = False
self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()

def _generate_channel(self, config: Config) -> grpc.Channel:
target = f"{config.host}:{config.port}"
# Create the channel with the service config
options = [
("grpc.keepalive_time_ms", config.keep_alive_time),
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms),
]
if config.tls:
channel_args = {
"options": options,
"credentials": grpc.ssl_channel_credentials(),
}
if config.cert_path:
with open(config.cert_path, "rb") as f:
channel_args["credentials"] = grpc.ssl_channel_credentials(f.read())

channel = grpc.secure_channel(target, **channel_args)

else:
channel = grpc.insecure_channel(
target,
options=options,
)

return channel

def initialize(self, context: EvaluationContext) -> None:
self.connect()

def connect(self) -> None:
self.active = True

# Run monitoring in a separate thread
self.monitor_thread = threading.Thread(
target=self.monitor, daemon=True, name="FlagdGrpcSyncServiceMonitorThread"
)
self.monitor_thread.start()
## block until ready or deadline reached
timeout = self.deadline + time.time()
while not self.connected and time.time() < timeout:
time.sleep(0.05)
logger.debug("Finished blocking gRPC state initialization")

if not self.connected:
raise ProviderNotReadyError(
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
)

def monitor(self) -> None:
self.channel.subscribe(self._state_change_callback, try_to_connect=True)

def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if new_state == grpc.ChannelConnectivity.READY:
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(
target=self.listen,
daemon=True,
name="FlagdGrpcSyncWorkerThread",
)
self.thread.start()

if self.timer and self.timer.is_alive():
logger.debug("gRPC error timer expired")
self.timer.cancel()

elif new_state == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
# this is the failed reconnect attempt so we are going into stale
self.emit_provider_stale(
ProviderEventDetails(
message="gRPC sync disconnected, reconnecting",
)
)
self.start_time = time.time()
# adding a timer, so we can emit the error event after time
self.timer = threading.Timer(self.retry_grace_period, self.emit_error)

logger.debug("gRPC error timer started")
self.timer.start()
self.connected = False

def emit_error(self) -> None:
logger.debug("gRPC error emitted")
self.emit_provider_error(
ProviderEventDetails(
message="gRPC sync disconnected, reconnecting",
error_code=ErrorCode.GENERAL,
)
)

def shutdown(self) -> None:
self.active = False
self.channel.close()

def listen(self) -> None:
call_args = (
{"timeout": self.streamline_deadline_seconds}
if self.streamline_deadline_seconds > 0
else {}
)
request_args = {"selector": self.selector} if self.selector is not None else {}

while self.active:
try:
request = sync_pb2.SyncFlagsRequest(**request_args)

logger.debug("Setting up gRPC sync flags connection")
for flag_rsp in self.stub.SyncFlags(
request, wait_for_ready=True, **call_args
):
flag_str = flag_rsp.flag_configuration
logger.debug(
f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}"
)
self.flag_store.update(json.loads(flag_str))

if not self.connected:
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
)
self.connected = True

if not self.active:
logger.debug("Terminating gRPC sync thread")
return

Check warning on line 182 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

View check run for this annotation

Codecov / codecov/patch

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py#L181-L182

Added lines #L181 - L182 were not covered by tests
except grpc.RpcError as e: # noqa: PERF203
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
except json.JSONDecodeError:
logger.exception(

Check warning on line 186 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

View check run for this annotation

Codecov / codecov/patch

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py#L185-L186

Added lines #L185 - L186 were not covered by tests
f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}"
)
except ParseError:
logger.exception(

Check warning on line 190 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

View check run for this annotation

Codecov / codecov/patch

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py#L189-L190

Added lines #L189 - L190 were not covered by tests
f"Could not parse flag data using flagd syntax: {flag_str=}"
)
Loading