Skip to content

Commit b156e21

Browse files
(fix): refactor batch_event_processor to reset deadline after it passes. (#227)
* refactor batch_event_processor to reset deadline after it passes. Also, hang on queue with timeout at flush interval * fix lint error * lint * fix lint error * finally got to debug replacing the mock logger * update to take time in float * add unit tests for float flush deadline and flush interval * fix broken test * update method description * added a unit test to make sure processor is called once during flush interval * lint error
1 parent 9fee8ff commit b156e21

File tree

2 files changed

+69
-7
lines changed

2 files changed

+69
-7
lines changed

optimizely/event/event_processor.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,18 +150,18 @@ def _validate_instantiation_props(self, prop, prop_name, default_value):
150150
return is_valid
151151

152152
def _get_time(self, _time=None):
153-
""" Method to return rounded off time as integer in seconds. If _time is None, uses current time.
153+
""" Method to return time as float in seconds. If _time is None, uses current time.
154154
155155
Args:
156-
_time: time in seconds that needs to be rounded off.
156+
_time: time in seconds.
157157
158158
Returns:
159-
Integer time in seconds.
159+
Float time in seconds.
160160
"""
161161
if _time is None:
162-
return int(round(time.time()))
162+
return time.time()
163163

164-
return int(round(_time))
164+
return _time
165165

166166
def start(self):
167167
""" Starts the batch processing thread to batch events. """
@@ -182,12 +182,18 @@ def _run(self):
182182
while True:
183183
if self._get_time() >= self.flushing_interval_deadline:
184184
self._flush_queue()
185+
self.flushing_interval_deadline = self._get_time() + \
186+
self._get_time(self.flush_interval.total_seconds())
187+
self.logger.debug('Flush interval deadline. Flushed queue.')
185188

186189
try:
187-
item = self.event_queue.get(False)
190+
interval = self.flushing_interval_deadline - self._get_time()
191+
item = self.event_queue.get(True, interval)
192+
193+
if item is None:
194+
continue
188195

189196
except queue.Empty:
190-
time.sleep(0.05)
191197
continue
192198

193199
if item == self._SHUTDOWN_SIGNAL:

tests/test_event_processor.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,28 @@ def test_flush_on_max_timeout(self):
173173
self.assertStrictTrue(event_dispatcher.compare_events())
174174
self.assertEqual(0, self.event_processor.event_queue.qsize())
175175

176+
def test_flush_once_max_timeout(self):
177+
event_dispatcher = TestEventDispatcher()
178+
179+
self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG)
180+
181+
with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
182+
self._set_event_processor(event_dispatcher, mock_config_logging)
183+
184+
user_event = self._build_conversion_event(self.event_name)
185+
self.event_processor.process(user_event)
186+
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)
187+
188+
time.sleep(1.75)
189+
190+
self.assertStrictTrue(event_dispatcher.compare_events())
191+
self.assertEqual(0, self.event_processor.event_queue.qsize())
192+
self.assertTrue(mock_config_logging.debug.called)
193+
mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.')
194+
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.')
195+
self.assertTrue(mock_config_logging.debug.call_count == 2)
196+
self.optimizely.logger = SimpleLogger()
197+
176198
def test_flush_max_batch_size(self):
177199
event_dispatcher = TestEventDispatcher()
178200

@@ -339,6 +361,40 @@ def test_init__invalid_flush_interval(self):
339361
self.assertEqual(datetime.timedelta(seconds=30), self.event_processor.flush_interval)
340362
mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.')
341363

364+
def test_init__float_flush_interval(self):
365+
event_dispatcher = TestEventDispatcher()
366+
367+
with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
368+
self.event_processor = BatchEventProcessor(
369+
event_dispatcher,
370+
mock_config_logging,
371+
True,
372+
self.event_queue,
373+
self.MAX_BATCH_SIZE,
374+
0.5,
375+
self.MAX_TIMEOUT_INTERVAL_SEC,
376+
)
377+
378+
# default flush interval is 30s.
379+
self.assertEqual(datetime.timedelta(seconds=0.5), self.event_processor.flush_interval)
380+
381+
def test_init__float_flush_deadline(self):
382+
event_dispatcher = TestEventDispatcher()
383+
384+
with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
385+
self.event_processor = BatchEventProcessor(
386+
event_dispatcher,
387+
mock_config_logging,
388+
True,
389+
self.event_queue,
390+
self.MAX_BATCH_SIZE,
391+
0.5,
392+
self.MAX_TIMEOUT_INTERVAL_SEC,
393+
)
394+
395+
# default flush interval is 30s.
396+
self.assertTrue(isinstance(self.event_processor.flushing_interval_deadline, float))
397+
342398
def test_init__bool_flush_interval(self):
343399
event_dispatcher = TestEventDispatcher()
344400

0 commit comments

Comments
 (0)