Skip to content

Commit 41d0ad8

Browse files
authored
feat(flagd-rpc): adding grace attempts (#117)
* feat(flagd-rpc): add caching with tests Signed-off-by: Simon Schrottner <[email protected]>
1 parent 16179e3 commit 41d0ad8

File tree

9 files changed

+137
-67
lines changed

9 files changed

+137
-67
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import dataclasses
12
import os
23
import typing
34
from enum import Enum
@@ -19,10 +20,13 @@ class CacheType(Enum):
1920
DEFAULT_HOST = "localhost"
2021
DEFAULT_KEEP_ALIVE = 0
2122
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
23+
DEFAULT_OFFLINE_POLL_MS = 5000
2224
DEFAULT_PORT_IN_PROCESS = 8015
2325
DEFAULT_PORT_RPC = 8013
2426
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
2527
DEFAULT_RETRY_BACKOFF = 1000
28+
DEFAULT_RETRY_BACKOFF_MAX = 120000
29+
DEFAULT_RETRY_GRACE_ATTEMPTS = 5
2630
DEFAULT_STREAM_DEADLINE = 600000
2731
DEFAULT_TLS = False
2832

@@ -32,9 +36,12 @@ class CacheType(Enum):
3236
ENV_VAR_HOST = "FLAGD_HOST"
3337
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
3438
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
39+
ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS"
3540
ENV_VAR_PORT = "FLAGD_PORT"
3641
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
3742
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
43+
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
44+
ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
3845
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
3946
ENV_VAR_TLS = "FLAGD_TLS"
4047

@@ -62,19 +69,23 @@ def env_or_default(
6269
return val if cast is None else cast(val)
6370

6471

72+
@dataclasses.dataclass
6573
class Config:
6674
def __init__( # noqa: PLR0913
6775
self,
6876
host: typing.Optional[str] = None,
6977
port: typing.Optional[int] = None,
7078
tls: typing.Optional[bool] = None,
71-
resolver_type: typing.Optional[ResolverType] = None,
79+
resolver: typing.Optional[ResolverType] = None,
7280
offline_flag_source_path: typing.Optional[str] = None,
81+
offline_poll_interval_ms: typing.Optional[int] = None,
7382
retry_backoff_ms: typing.Optional[int] = None,
74-
deadline: typing.Optional[int] = None,
83+
retry_backoff_max_ms: typing.Optional[int] = None,
84+
retry_grace_attempts: typing.Optional[int] = None,
85+
deadline_ms: typing.Optional[int] = None,
7586
stream_deadline_ms: typing.Optional[int] = None,
76-
keep_alive: typing.Optional[int] = None,
77-
cache_type: typing.Optional[CacheType] = None,
87+
keep_alive_time: typing.Optional[int] = None,
88+
cache: typing.Optional[CacheType] = None,
7889
max_cache_size: typing.Optional[int] = None,
7990
):
8091
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host
@@ -94,18 +105,37 @@ def __init__( # noqa: PLR0913
94105
if retry_backoff_ms is None
95106
else retry_backoff_ms
96107
)
108+
self.retry_backoff_max_ms: int = (
109+
int(
110+
env_or_default(
111+
ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int
112+
)
113+
)
114+
if retry_backoff_max_ms is None
115+
else retry_backoff_max_ms
116+
)
97117

98-
self.resolver_type = (
118+
self.retry_grace_attempts: int = (
119+
int(
120+
env_or_default(
121+
ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
122+
)
123+
)
124+
if retry_grace_attempts is None
125+
else retry_grace_attempts
126+
)
127+
128+
self.resolver = (
99129
env_or_default(
100130
ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE, cast=convert_resolver_type
101131
)
102-
if resolver_type is None
103-
else resolver_type
132+
if resolver is None
133+
else resolver
104134
)
105135

106136
default_port = (
107137
DEFAULT_PORT_RPC
108-
if self.resolver_type is ResolverType.RPC
138+
if self.resolver is ResolverType.RPC
109139
else DEFAULT_PORT_IN_PROCESS
110140
)
111141

@@ -123,10 +153,20 @@ def __init__( # noqa: PLR0913
123153
else offline_flag_source_path
124154
)
125155

126-
self.deadline: int = (
156+
self.offline_poll_interval_ms: int = (
157+
int(
158+
env_or_default(
159+
ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int
160+
)
161+
)
162+
if offline_poll_interval_ms is None
163+
else offline_poll_interval_ms
164+
)
165+
166+
self.deadline_ms: int = (
127167
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
128-
if deadline is None
129-
else deadline
168+
if deadline_ms is None
169+
else deadline_ms
130170
)
131171

132172
self.stream_deadline_ms: int = (
@@ -139,18 +179,18 @@ def __init__( # noqa: PLR0913
139179
else stream_deadline_ms
140180
)
141181

142-
self.keep_alive: int = (
182+
self.keep_alive_time: int = (
143183
int(
144184
env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int)
145185
)
146-
if keep_alive is None
147-
else keep_alive
186+
if keep_alive_time is None
187+
else keep_alive_time
148188
)
149189

150-
self.cache_type = (
190+
self.cache = (
151191
CacheType(env_or_default(ENV_VAR_CACHE_TYPE, DEFAULT_CACHE))
152-
if cache_type is None
153-
else cache_type
192+
if cache is None
193+
else cache
154194
)
155195

156196
self.max_cache_size: int = (

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ def __init__( # noqa: PLR0913
5252
keep_alive_time: typing.Optional[int] = None,
5353
cache_type: typing.Optional[CacheType] = None,
5454
max_cache_size: typing.Optional[int] = None,
55+
retry_backoff_max_ms: typing.Optional[int] = None,
56+
retry_grace_attempts: typing.Optional[int] = None,
5557
):
5658
"""
5759
Create an instance of the FlagdProvider
@@ -79,31 +81,34 @@ def __init__( # noqa: PLR0913
7981
host=host,
8082
port=port,
8183
tls=tls,
82-
deadline=deadline,
84+
deadline_ms=deadline,
8385
retry_backoff_ms=retry_backoff_ms,
84-
resolver_type=resolver_type,
86+
retry_backoff_max_ms=retry_backoff_max_ms,
87+
retry_grace_attempts=retry_grace_attempts,
88+
resolver=resolver_type,
8589
offline_flag_source_path=offline_flag_source_path,
8690
stream_deadline_ms=stream_deadline_ms,
87-
keep_alive=keep_alive_time,
88-
cache_type=cache_type,
91+
keep_alive_time=keep_alive_time,
92+
cache=cache_type,
8993
max_cache_size=max_cache_size,
9094
)
9195

9296
self.resolver = self.setup_resolver()
9397

9498
def setup_resolver(self) -> AbstractResolver:
95-
if self.config.resolver_type == ResolverType.RPC:
99+
if self.config.resolver == ResolverType.RPC:
96100
return GrpcResolver(
97101
self.config,
98102
self.emit_provider_ready,
99103
self.emit_provider_error,
104+
self.emit_provider_stale,
100105
self.emit_provider_configuration_changed,
101106
)
102-
elif self.config.resolver_type == ResolverType.IN_PROCESS:
107+
elif self.config.resolver == ResolverType.IN_PROCESS:
103108
return InProcessResolver(self.config, self)
104109
else:
105110
raise ValueError(
106-
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
111+
f"`resolver_type` parameter invalid: {self.config.resolver}"
107112
)
108113

109114
def initialize(self, evaluation_context: EvaluationContext) -> None:

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,30 +37,32 @@
3737

3838

3939
class GrpcResolver:
40-
MAX_BACK_OFF = 120
41-
4240
def __init__(
4341
self,
4442
config: Config,
4543
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
4644
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
45+
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
4746
emit_provider_configuration_changed: typing.Callable[
4847
[ProviderEventDetails], None
4948
],
5049
):
5150
self.config = config
5251
self.emit_provider_ready = emit_provider_ready
5352
self.emit_provider_error = emit_provider_error
53+
self.emit_provider_stale = emit_provider_stale
5454
self.emit_provider_configuration_changed = emit_provider_configuration_changed
5555
self.cache: typing.Optional[BaseCacheImpl] = (
5656
LRUCache(maxsize=self.config.max_cache_size)
57-
if self.config.cache_type == CacheType.LRU
57+
if self.config.cache == CacheType.LRU
5858
else None
5959
)
6060
self.stub, self.channel = self._create_stub()
6161
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
62+
self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
63+
self.retry_grace_attempts = config.retry_grace_attempts
6264
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
63-
self.deadline = config.deadline * 0.001
65+
self.deadline = config.deadline_ms * 0.001
6466
self.connected = False
6567

6668
def _create_stub(
@@ -70,13 +72,10 @@ def _create_stub(
7072
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
7173
channel = channel_factory(
7274
f"{config.host}:{config.port}",
73-
options=(("grpc.keepalive_time_ms", config.keep_alive),),
75+
options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
7476
)
7577
stub = evaluation_pb2_grpc.ServiceStub(channel)
7678

77-
if self.cache:
78-
self.cache.clear()
79-
8079
return stub, channel
8180

8281
def initialize(self, evaluation_context: EvaluationContext) -> None:
@@ -113,8 +112,10 @@ def listen(self) -> None:
113112
if self.streamline_deadline_seconds > 0
114113
else {}
115114
)
115+
retry_counter = 0
116116
while self.active:
117117
request = evaluation_pb2.EventStreamRequest()
118+
118119
try:
119120
logger.debug("Setting up gRPC sync flags connection")
120121
for message in self.stub.EventStream(request, **call_args):
@@ -126,6 +127,7 @@ def listen(self) -> None:
126127
)
127128
)
128129
self.connected = True
130+
retry_counter = 0
129131
# reset retry delay after successsful read
130132
retry_delay = self.retry_backoff_seconds
131133

@@ -146,15 +148,37 @@ def listen(self) -> None:
146148
)
147149

148150
self.connected = False
151+
self.on_connection_error(retry_counter, retry_delay)
152+
153+
retry_delay = self.handle_retry(retry_counter, retry_delay)
154+
155+
retry_counter = retry_counter + 1
156+
157+
def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
158+
if retry_counter == 0:
159+
logger.info("gRPC sync disconnected, reconnecting immediately")
160+
else:
161+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
162+
time.sleep(retry_delay)
163+
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
164+
return retry_delay
165+
166+
def on_connection_error(self, retry_counter: int, retry_delay: float) -> None:
167+
if retry_counter == self.retry_grace_attempts:
168+
if self.cache:
169+
self.cache.clear()
149170
self.emit_provider_error(
150171
ProviderEventDetails(
151172
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
152173
error_code=ErrorCode.GENERAL,
153174
)
154175
)
155-
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
156-
time.sleep(retry_delay)
157-
retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF)
176+
elif retry_counter == 1:
177+
self.emit_provider_stale(
178+
ProviderEventDetails(
179+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
180+
)
181+
)
158182

159183
def handle_changed_flags(self, data: typing.Any) -> None:
160184
changed_flags = list(data["flags"].keys())

providers/openfeature-provider-flagd/tests/e2e/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
# running all gherkin tests, except the ones, not implemented
1515
def pytest_collection_modifyitems(config):
16-
marker = "not customCert and not unixsocket and not sync"
16+
marker = "not customCert and not unixsocket and not sync and not targetURI"
1717

1818
# this seems to not work with python 3.8
1919
if hasattr(config.option, "markexpr") and config.option.markexpr == "":

providers/openfeature-provider-flagd/tests/e2e/steps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ def assert_handlers(
591591
)
592592
)
593593
def assert_handler_run(event_type: ProviderEvent, event_handles):
594-
assert_handlers(event_handles, event_type, max_wait=6)
594+
assert_handlers(event_handles, event_type, max_wait=30)
595595

596596

597597
@then(

providers/openfeature-provider-flagd/tests/e2e/test_config.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import pytest
66
from asserts import assert_equal
7-
from pytest_bdd import parsers, scenarios, then, when
7+
from pytest_bdd import given, parsers, scenarios, then, when
88
from tests.e2e.conftest import TEST_HARNESS_PATH
99

1010
from openfeature.contrib.provider.flagd.config import CacheType, Config, ResolverType
@@ -47,19 +47,19 @@ def option_values() -> dict:
4747
return {}
4848

4949

50-
@when(
50+
@given(
5151
parsers.cfparse(
52-
'we have an option "{option}" of type "{type_info}" with value "{value}"',
52+
'an option "{option}" of type "{type_info}" with value "{value}"',
5353
),
5454
)
5555
def option_with_value(option: str, value: str, type_info: str, option_values: dict):
5656
value = type_cast[type_info](value)
5757
option_values[camel_to_snake(option)] = value
5858

5959

60-
@when(
60+
@given(
6161
parsers.cfparse(
62-
'we have an environment variable "{env}" with value "{value}"',
62+
'an environment variable "{env}" with value "{value}"',
6363
),
6464
)
6565
def env_with_value(monkeypatch, env: str, value: str):
@@ -68,7 +68,7 @@ def env_with_value(monkeypatch, env: str, value: str):
6868

6969
@when(
7070
parsers.cfparse(
71-
"we initialize a config",
71+
"a config was initialized",
7272
),
7373
target_fixture="config",
7474
)
@@ -78,12 +78,12 @@ def initialize_config(option_values):
7878

7979
@when(
8080
parsers.cfparse(
81-
'we initialize a config for "{resolver_type}"',
81+
'a config was initialized for "{resolver_type}"',
8282
),
8383
target_fixture="config",
8484
)
8585
def initialize_config_for(resolver_type: str, option_values: dict):
86-
return Config(resolver_type=ResolverType(resolver_type), **option_values)
86+
return Config(resolver=ResolverType(resolver_type), **option_values)
8787

8888

8989
@then(

providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ def image():
2727

2828
scenarios(
2929
f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature",
30+
f"{TEST_HARNESS_PATH}/gherkin/events.feature",
3031
)

0 commit comments

Comments
 (0)