Skip to content

Commit a37e5a3

Browse files
committed
Merge branch 'feat/grace_attempts' into feat/grpc-sync-addition
Signed-off-by: Simon Schrottner <[email protected]>
2 parents c05b07d + 518572a commit a37e5a3

File tree

7 files changed

+91
-19
lines changed

7 files changed

+91
-19
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import dataclasses
12
import os
23
import typing
34
from enum import Enum
@@ -19,10 +20,13 @@ class CacheType(Enum):
1920
DEFAULT_HOST = "localhost"
2021
DEFAULT_KEEP_ALIVE = 0
2122
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
23+
DEFAULT_OFFLINE_POLL_MS = 5000
2224
DEFAULT_PORT_IN_PROCESS = 8015
2325
DEFAULT_PORT_RPC = 8013
2426
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
2527
DEFAULT_RETRY_BACKOFF = 1000
28+
DEFAULT_RETRY_BACKOFF_MAX = 120000
29+
DEFAULT_RETRY_GRACE_ATTEMPTS = 5
2630
DEFAULT_STREAM_DEADLINE = 600000
2731
DEFAULT_TLS = False
2832

@@ -32,9 +36,12 @@ class CacheType(Enum):
3236
ENV_VAR_HOST = "FLAGD_HOST"
3337
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
3438
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
39+
ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS"
3540
ENV_VAR_PORT = "FLAGD_PORT"
3641
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
3742
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
43+
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
44+
ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
3845
ENV_VAR_SELECTOR = "FLAGD_SELECTOR"
3946
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
4047
ENV_VAR_TLS = "FLAGD_TLS"
@@ -63,6 +70,7 @@ def env_or_default(
6370
return val if cast is None else cast(val)
6471

6572

73+
@dataclasses.dataclass
6674
class Config:
6775
def __init__( # noqa: PLR0913
6876
self,
@@ -72,7 +80,10 @@ def __init__( # noqa: PLR0913
7280
selector: typing.Optional[str] = None,
7381
resolver: typing.Optional[ResolverType] = None,
7482
offline_flag_source_path: typing.Optional[str] = None,
83+
offline_poll_interval_ms: typing.Optional[int] = None,
7584
retry_backoff_ms: typing.Optional[int] = None,
85+
retry_backoff_max_ms: typing.Optional[int] = None,
86+
retry_grace_attempts: typing.Optional[int] = None,
7687
deadline_ms: typing.Optional[int] = None,
7788
stream_deadline_ms: typing.Optional[int] = None,
7889
keep_alive_time: typing.Optional[int] = None,
@@ -96,6 +107,25 @@ def __init__( # noqa: PLR0913
96107
if retry_backoff_ms is None
97108
else retry_backoff_ms
98109
)
110+
self.retry_backoff_max_ms: int = (
111+
int(
112+
env_or_default(
113+
ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int
114+
)
115+
)
116+
if retry_backoff_max_ms is None
117+
else retry_backoff_max_ms
118+
)
119+
120+
self.retry_grace_attempts: int = (
121+
int(
122+
env_or_default(
123+
ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
124+
)
125+
)
126+
if retry_grace_attempts is None
127+
else retry_grace_attempts
128+
)
99129

100130
self.resolver = (
101131
env_or_default(
@@ -125,6 +155,16 @@ def __init__( # noqa: PLR0913
125155
else offline_flag_source_path
126156
)
127157

158+
self.offline_poll_interval_ms: int = (
159+
int(
160+
env_or_default(
161+
ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int
162+
)
163+
)
164+
if offline_poll_interval_ms is None
165+
else offline_poll_interval_ms
166+
)
167+
128168
self.deadline_ms: int = (
129169
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
130170
if deadline_ms is None

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ def __init__( # noqa: PLR0913
5454
keep_alive_time: typing.Optional[int] = None,
5555
cache_type: typing.Optional[CacheType] = None,
5656
max_cache_size: typing.Optional[int] = None,
57+
retry_backoff_max_ms: typing.Optional[int] = None,
58+
retry_grace_attempts: typing.Optional[int] = None,
5759
):
5860
"""
5961
Create an instance of the FlagdProvider
@@ -83,6 +85,8 @@ def __init__( # noqa: PLR0913
8385
tls=tls,
8486
deadline_ms=deadline,
8587
retry_backoff_ms=retry_backoff_ms,
88+
retry_backoff_max_ms=retry_backoff_max_ms,
89+
retry_grace_attempts=retry_grace_attempts,
8690
selector=selector,
8791
resolver=resolver_type,
8892
offline_flag_source_path=offline_flag_source_path,
@@ -100,6 +104,7 @@ def setup_resolver(self) -> AbstractResolver:
100104
self.config,
101105
self.emit_provider_ready,
102106
self.emit_provider_error,
107+
self.emit_provider_stale,
103108
self.emit_provider_configuration_changed,
104109
)
105110
elif self.config.resolver == ResolverType.IN_PROCESS:

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
from ..config import CacheType, Config
2929
from ..flag_type import FlagType
30-
from .protocol import AbstractResolver
3130

3231
if typing.TYPE_CHECKING:
3332
from google.protobuf.message import Message
@@ -37,23 +36,21 @@
3736
logger = logging.getLogger("openfeature.contrib")
3837

3938

40-
class GrpcResolver(AbstractResolver):
41-
MAX_BACK_OFF = 120
42-
43-
MAX_BACK_OFF = 120
44-
39+
class GrpcResolver:
4540
def __init__(
4641
self,
4742
config: Config,
4843
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
4944
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
45+
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
5046
emit_provider_configuration_changed: typing.Callable[
5147
[ProviderEventDetails], None
5248
],
5349
):
5450
self.config = config
5551
self.emit_provider_ready = emit_provider_ready
5652
self.emit_provider_error = emit_provider_error
53+
self.emit_provider_stale = emit_provider_stale
5754
self.emit_provider_configuration_changed = emit_provider_configuration_changed
5855
self.cache: typing.Optional[BaseCacheImpl] = (
5956
LRUCache(maxsize=self.config.max_cache_size)
@@ -62,6 +59,8 @@ def __init__(
6259
)
6360
self.stub, self.channel = self._create_stub()
6461
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
62+
self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
63+
self.retry_grace_attempts = config.retry_grace_attempts
6564
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
6665
self.deadline = config.deadline_ms * 0.001
6766
self.connected = False
@@ -77,9 +76,6 @@ def _create_stub(
7776
)
7877
stub = evaluation_pb2_grpc.ServiceStub(channel)
7978

80-
if self.cache:
81-
self.cache.clear()
82-
8379
return stub, channel
8480

8581
def initialize(self, evaluation_context: EvaluationContext) -> None:
@@ -116,8 +112,10 @@ def listen(self) -> None:
116112
if self.streamline_deadline_seconds > 0
117113
else {}
118114
)
115+
retry_counter = 0
119116
while self.active:
120117
request = evaluation_pb2.EventStreamRequest()
118+
121119
try:
122120
logger.debug("Setting up gRPC sync flags connection")
123121
for message in self.stub.EventStream(request, **call_args):
@@ -129,6 +127,7 @@ def listen(self) -> None:
129127
)
130128
)
131129
self.connected = True
130+
retry_counter = 0
132131
# reset retry delay after successsful read
133132
retry_delay = self.retry_backoff_seconds
134133

@@ -149,15 +148,37 @@ def listen(self) -> None:
149148
)
150149

151150
self.connected = False
151+
self.handle_error(retry_counter, retry_delay)
152+
153+
retry_delay = self.handle_retry(retry_counter, retry_delay)
154+
155+
retry_counter = retry_counter + 1
156+
157+
def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
158+
if retry_counter == 0:
159+
logger.info("gRPC sync disconnected, reconnecting immediately")
160+
else:
161+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
162+
time.sleep(retry_delay)
163+
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
164+
return retry_delay
165+
166+
def handle_error(self, retry_counter: int, retry_delay: float) -> None:
167+
if retry_counter == self.retry_grace_attempts:
168+
if self.cache:
169+
self.cache.clear()
152170
self.emit_provider_error(
153171
ProviderEventDetails(
154172
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
155173
error_code=ErrorCode.GENERAL,
156174
)
157175
)
158-
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
159-
time.sleep(retry_delay)
160-
retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF)
176+
elif retry_counter == 1:
177+
self.emit_provider_stale(
178+
ProviderEventDetails(
179+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
180+
)
181+
)
161182

162183
def handle_changed_flags(self, data: typing.Any) -> None:
163184
changed_flags = list(data["flags"].keys())

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from openfeature.evaluation_context import EvaluationContext
1010
from openfeature.event import ProviderEventDetails
1111
from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError
12-
from openfeature.schemas.protobuf.flagd.sync.v1 import ( # type:ignore[import-not-found]
12+
from openfeature.schemas.protobuf.flagd.sync.v1 import (
1313
sync_pb2,
1414
sync_pb2_grpc,
1515
)
@@ -22,8 +22,6 @@
2222

2323

2424
class GrpcWatcher(FlagStateConnector):
25-
MAX_BACK_OFF = 120
26-
2725
def __init__(
2826
self,
2927
config: Config,
@@ -36,6 +34,8 @@ def __init__(
3634

3735
self.stub, self.channel = self.create_stub()
3836
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
37+
self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
38+
self.retry_grace_attempts = config.retry_grace_attempts
3939
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
4040
self.deadline = config.deadline_ms * 0.001
4141
self.selector = config.selector
@@ -85,9 +85,14 @@ def sync_flags(self) -> None:
8585
if self.streamline_deadline_seconds > 0
8686
else {}
8787
)
88+
8889
while self.active:
8990
try:
90-
request = sync_pb2.SyncFlagsRequest(selector=self.selector)
91+
request_args = (
92+
{"selector": self.selector} if self.selector is not None else {}
93+
)
94+
request = sync_pb2.SyncFlagsRequest(**request_args)
95+
9196
logger.debug("Setting up gRPC sync flags connection")
9297
for flag_rsp in self.stub.SyncFlags(request, **call_args):
9398
flag_str = flag_rsp.flag_configuration
@@ -130,4 +135,4 @@ def sync_flags(self) -> None:
130135
)
131136
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
132137
time.sleep(retry_delay)
133-
retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF)
138+
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)

providers/openfeature-provider-flagd/tests/e2e/steps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ def assert_handlers(
597597
)
598598
)
599599
def assert_handler_run(event_type: ProviderEvent, event_handles):
600-
assert_handlers(event_handles, event_type, max_wait=6)
600+
assert_handlers(event_handles, event_type, max_wait=30)
601601

602602

603603
@then(

providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ def image():
2727

2828
scenarios(
2929
f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature",
30+
f"{TEST_HARNESS_PATH}/gherkin/events.feature",
3031
)

0 commit comments

Comments
 (0)