Skip to content

Commit 6e51add

Browse files
authored
Merge pull request #66 from launchdarkly/dr/heartbeats
Reconnect when no data is received for 5 minutes.
2 parents cf12723 + 3e34e3c commit 6e51add

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

ldclient/sse_client.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717

1818
class SSEClient(object):
19-
def __init__(self, url, last_id=None, retry=3000, session=None, **kwargs):
19+
def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, session=None, **kwargs):
2020
self.url = url
2121
self.last_id = last_id
2222
self.retry = retry
23+
self._connect_timeout = connect_timeout
24+
self._read_timeout = read_timeout
2325

2426
# Optional support for passing in a requests.Session()
2527
self.session = session
@@ -46,7 +48,12 @@ def _connect(self):
4648

4749
# Use session if set. Otherwise fall back to requests module.
4850
requester = self.session or requests
49-
self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
51+
self.resp = requester.get(
52+
self.url,
53+
stream=True,
54+
timeout=(self._connect_timeout, self._read_timeout),
55+
**self.requests_kwargs)
56+
5057
self.resp_file = self.resp.raw
5158

5259
# TODO: Ensure we're handling redirects. Might also stick the 'origin'

ldclient/streaming.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
from threading import Thread
55

66
import backoff
7-
import requests
7+
88
from ldclient.interfaces import UpdateProcessor
99
from ldclient.sse_client import SSEClient
1010
from ldclient.util import _stream_headers, log
1111

12+
# allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the
13+
# stream will keep this from triggering
14+
stream_read_timeout = 5 * 60
15+
1216

1317
class StreamingUpdateProcessor(Thread, UpdateProcessor):
1418
def __init__(self, config, requester, store, ready):
@@ -30,9 +34,14 @@ def run(self):
3034
def _backoff_expo():
3135
return backoff.expo(max_value=30)
3236

33-
@backoff.on_exception(_backoff_expo, requests.exceptions.RequestException, max_tries=None, jitter=backoff.full_jitter)
37+
@backoff.on_exception(_backoff_expo, BaseException, max_tries=None, jitter=backoff.full_jitter)
3438
def _connect(self):
35-
messages = SSEClient(self._uri, verify=self._config.verify_ssl, headers=_stream_headers(self._config.sdk_key))
39+
messages = SSEClient(
40+
self._uri,
41+
verify=self._config.verify_ssl,
42+
headers=_stream_headers(self._config.sdk_key),
43+
connect_timeout=self._config.connect_timeout,
44+
read_timeout=stream_read_timeout)
3645
for msg in messages:
3746
if not self._running:
3847
break

0 commit comments

Comments
 (0)