1
+ from __future__ import absolute_import
2
+
1
3
import json
2
4
from threading import Thread
3
5
4
6
import time
5
- from sseclient import SSEClient
6
7
8
+ from requests import HTTPError
9
+ from sseclient import SSEClient
7
10
from ldclient .interfaces import UpdateProcessor
8
11
from ldclient .util import _stream_headers , log
9
12
@@ -30,8 +33,17 @@ def run(self):
30
33
for msg in messages :
31
34
if not self ._running :
32
35
break
33
- if self .process_message (self ._store , self ._requester , msg , self ._ready ) is True :
36
+ message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
37
+ if message_ok is True and self ._ready .is_set () is False :
34
38
self ._ready .set ()
39
+ except HTTPError as e :
40
+ if e .response is not None and e .response .status_code is not None :
41
+ if 400 <= e .response .status_code < 500 :
42
+ log .error ("StreamingUpdateProcessor response: " + str (e ) + ". Retries will not be attempted." )
43
+ if self ._ready .is_set () is False :
44
+ self ._ready .set ()
45
+ self ._running = False
46
+ return
35
47
except Exception as e :
36
48
log .error ("Could not connect to LaunchDarkly stream: " + str (e .message ) +
37
49
" waiting 1 second before trying again." )
@@ -42,15 +54,15 @@ def stop(self):
42
54
self ._running = False
43
55
44
56
def initialized (self ):
45
- return self ._running and self ._ready .is_set () and self ._store .initialized
57
+ return self ._running and self ._ready .is_set () is True and self ._store .initialized is True
46
58
47
59
@staticmethod
48
60
def process_message (store , requester , msg , ready ):
49
61
log .debug ("Received stream event {} with data: {}" .format (msg .event , msg .data ))
50
62
if msg .event == 'put' :
51
63
payload = json .loads (msg .data )
52
64
store .init (payload )
53
- if not ready .is_set () and store .initialized :
65
+ if not ready .is_set () is True and store .initialized is True :
54
66
log .info ("StreamingUpdateProcessor initialized ok" )
55
67
return True
56
68
elif msg .event == 'patch' :
@@ -63,7 +75,7 @@ def process_message(store, requester, msg, ready):
63
75
store .upsert (key , requester .get_one (key ))
64
76
elif msg .event == "indirect/put" :
65
77
store .init (requester .get_all ())
66
- if not ready .is_set () and store .initialized :
78
+ if not ready .is_set () is True and store .initialized is True :
67
79
log .info ("StreamingUpdateProcessor initialized ok" )
68
80
return True
69
81
elif msg .event == 'delete' :
0 commit comments