Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 65 additions & 47 deletions UnleashClient/connectors/streaming_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,61 +65,79 @@ def stop(self):
if self._thread:
self._thread.join(timeout=5)

def _create_client(self) -> SSEClient:
connect_strategy = ConnectStrategy.http(
self._base_url,
headers=self._headers,
urllib3_request_options=self._custom_options,
)
retry_strategy = RetryDelayStrategy.default(
max_delay=self._backoff_max,
backoff_multiplier=self._backoff_multiplier,
jitter_multiplier=self._backoff_jitter,
)

self._client = SSEClient(
connect=connect_strategy,
initial_retry_delay=self._backoff_initial,
retry_delay_strategy=retry_strategy,
retry_delay_reset_threshold=60.0,
error_strategy=ErrorStrategy.always_continue(),
logger=LOGGER,
)

return self._client

def _process_event(self, item) -> None:
if not item.event:
return

if item.event in ("unleash-connected", "unleash-updated"):
try:
self.engine.take_state(item.data)
self.cache.set(FEATURES_URL, self.engine.get_state())

if item.event == "unleash-connected" and self.ready_callback:
try:
self.ready_callback()
except Exception:
LOGGER.debug("Ready callback failed", exc_info=True)
except Exception:
LOGGER.error("Error applying streaming state", exc_info=True)
self.load_features()
else:
LOGGER.warning("Ignoring SSE event type: %s", item.event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if for the unknown events we want warnings or maybe debug instead

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may add new event types over time and it's not sth to warn about and pollute logs at this level


def _process_error(self, error) -> None:
if error is None:
if not self._stop.is_set():
LOGGER.info("SSE stream ended - server closed connection gracefully")
elif not self._stop.is_set():
LOGGER.warning("SSE stream error: %s - will retry", error)

def _close_client(self) -> None:
try:
if self._client is not None:
self._client.close()
except Exception:
pass

def _run(self):
try:
LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url)
client = self._create_client()

connect_strategy = ConnectStrategy.http(
self._base_url,
headers=self._headers,
urllib3_request_options=self._custom_options,
)

retry_strategy = RetryDelayStrategy.default(
max_delay=self._backoff_max,
backoff_multiplier=self._backoff_multiplier,
jitter_multiplier=self._backoff_jitter,
)

self._client = SSEClient(
connect=connect_strategy,
initial_retry_delay=self._backoff_initial,
retry_delay_strategy=retry_strategy,
retry_delay_reset_threshold=60.0,
error_strategy=ErrorStrategy.always_continue(),
logger=LOGGER,
)

# Initial hydration happens in the stream.
for event in self._client.events:
for item in client.all:
if self._stop.is_set():
LOGGER.debug("SSE stream stopped by user request")
break
if not event.event:
continue

if event.event in ("unleash-connected", "unleash-updated"):
try:
self.engine.take_state(event.data)
self.cache.set(FEATURES_URL, self.engine.get_state())

if event.event == "unleash-connected" and self.ready_callback:
try:
self.ready_callback()
except Exception:
LOGGER.debug("Ready callback failed", exc_info=True)
except Exception:
LOGGER.error("Error applying streaming state", exc_info=True)
self.load_features()
else:
LOGGER.debug("Ignoring SSE event type: %s", event.event)

LOGGER.debug("SSE stream ended")
if hasattr(item, "event") and hasattr(item, "data"):
self._process_event(item)
elif hasattr(item, "error"):
self._process_error(item.error)
except Exception as exc:
LOGGER.warning("Streaming connection failed: %s", exc)
self.load_features()
finally:
try:
if self._client is not None:
self._client.close()
except Exception:
pass
self._close_client()
Loading