Skip to content

Commit c182c93

Browse files
fix: Ensure hard-delete mode doesn't delete data it shouldn't when ACTIVATE_VERSION messages are processed (#391)
## What hard delete was previously doing wrong

As discussed in #340, hard delete deleted data that it shouldn't have. This was caused by a `<=` where there should have been a `<`.

## So why were the tests passing?

All ACTIVATE_VERSION messages were run before any records were synced. This is because RECORD messages were being batched until the end of the sync, whereas ACTIVATE_VERSION messages were processed as they arrived.

The intent of `test_activate_version_hard_delete` was to:

1. Sync seven records
2. Check that seven records were synced
3. Add two records "manually"
4. Check that there are nine records in total
5. Sync the same seven records, with ACTIVATE_VERSION removing the two manual records
6. Check that seven records remain

Here's what was really happening:

1. Sync seven records
2. Seven records were synced? ✅
3. Add two records "manually"
4. There are nine records in total? ✅
5. ACTIVATE_VERSION deletes all nine records and then syncs the same seven records again
6. Seven records remain? ⚠️ Yes, but not for the correct reason

## My fix

I don't know if I fully understand the ACTIVATE_VERSION specification, so please correct me, but here's my change.

Drain the sink immediately before an ACTIVATE_VERSION message is processed for a sink. This means:

1. The target still batches records when possible.
2. ACTIVATE_VERSION messages are still processed as they arrive.
3. Importantly, ACTIVATE_VERSION messages correctly apply to all preceding records.

Closes #340

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
1 parent a72feb2 commit c182c93

File tree

4 files changed

+20
-10
lines changed

4 files changed

+20
-10
lines changed

target_postgres/sinks.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,15 @@ def activate_version(self, new_version: int) -> None:
324324
)
325325
return
326326

327+
if self._pending_batch:
328+
self.logger.info(
329+
"An activate version message for '%s' was received. Draining...",
330+
self.stream_name,
331+
)
332+
draining_status = self.start_drain()
333+
self.process_batch(draining_status)
334+
self.mark_drained()
335+
327336
# There's nothing to do if the table doesn't exist yet
328337
# (which it won't the first time the stream is processed)
329338
if not self.connector.table_exists(self.full_table_name):
@@ -370,7 +379,7 @@ def activate_version(self, new_version: int) -> None:
370379
delete_stmt = sa.delete(target_table).where(
371380
sa.or_(
372381
target_table.c[self.version_column_name].is_(None),
373-
target_table.c[self.version_column_name] <= new_version,
382+
target_table.c[self.version_column_name] < new_version,
374383
)
375384
)
376385
connection.execute(delete_stmt)

target_postgres/tests/data_files/test_activate_version_hard.singer

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@
77
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
88
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
99
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
10+
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "XX", "name": "This record should be deleted"}, "version": 1674486431562, "time_extracted": "2023-01-23T15:07:11.563063Z"}
1011
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563}
target_postgres/tests/data_files/test_activate_version_soft.singer

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
22
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564}
3-
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
4-
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
5-
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
6-
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
7-
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
8-
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
3+
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"}
4+
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"}
5+
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"}
6+
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"}
7+
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"}
8+
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431564, "time_extracted": "2023-01-23T15:07:11.563063Z"}
99
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431564}

target_postgres/tests/test_target_postgres.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,9 @@ def test_activate_version_soft_delete(postgres_config_no_ssl):
633633
table_name = "test_activate_version_soft"
634634
file_name = f"{table_name}.singer"
635635
full_table_name = postgres_config_no_ssl["default_target_schema"] + "." + table_name
636-
postgres_config_hard_delete_true = copy.deepcopy(postgres_config_no_ssl)
637-
postgres_config_hard_delete_true["hard_delete"] = False
638-
pg_soft_delete = TargetPostgres(config=postgres_config_hard_delete_true)
636+
postgres_config_hard_delete_false = copy.deepcopy(postgres_config_no_ssl)
637+
postgres_config_hard_delete_false["hard_delete"] = False
638+
pg_soft_delete = TargetPostgres(config=postgres_config_hard_delete_false)
639639
engine = create_engine(pg_soft_delete)
640640
singer_file_to_target(file_name, pg_soft_delete)
641641
with engine.connect() as connection:

0 commit comments

Comments
 (0)