Skip to content

Commit af4856b

Browse files
Reduce memory footprint of state messages (#212)
Co-authored-by: Eric Boucher <[email protected]>
1 parent e0004e5 commit af4856b

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

target_postgres/stream_tracker.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ def flush_streams(self, force=False):
4848

4949
self._emit_safe_queued_states(force=force)
5050

51-
def handle_state_message(self, line_data):
51+
def handle_state_message(self, line):
5252
if self.emit_states:
53-
self.state_queue.append({'state': line_data['value'], 'watermark': self.message_counter})
53+
self.state_queue.append({'state': line, 'watermark': self.message_counter})
5454
self._emit_safe_queued_states()
5555

5656
def handle_record_message(self, stream, line_data):
@@ -80,9 +80,14 @@ def _emit_safe_queued_states(self, force=False):
8080
valid_flush_watermarks.append(watermark)
8181
safe_flush_threshold = min(valid_flush_watermarks, default=0)
8282

83+
# the STATE message that the target forwards
8384
emittable_state = None
85+
emittable_state_str = None
8486
while len(self.state_queue) > 0 and (force or self.state_queue[0]['watermark'] <= safe_flush_threshold):
85-
emittable_state = self.state_queue.popleft()['state']
87+
emittable_state_str = self.state_queue.popleft()['state']
88+
89+
if emittable_state_str is not None:
90+
emittable_state = json.loads(emittable_state_str)['value']
8691

8792
if emittable_state:
8893
if len(statediff.diff(emittable_state, self.last_emitted_state or {})) > 0:

target_postgres/target_tools.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ def _line_handler(state_tracker, target, invalid_records_detect, invalid_records
152152
state_tracker.flush_stream(line_data['stream'])
153153
target.activate_version(stream_buffer, line_data['version'])
154154
elif line_data['type'] == 'STATE':
155-
state_tracker.handle_state_message(line_data)
155+
# pass the string instead of the deserialized object to save memory in the deque
156+
state_tracker.handle_state_message(line)
156157
else:
157158
raise TargetError('Unknown message type {} in message {}'.format(
158159
line_data['type'],

0 commit comments

Comments
 (0)