From e25b99db55ed5f79f6b806501666d4daf16ffe75 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Mon, 23 Jan 2023 11:01:17 -0500 Subject: [PATCH 1/8] Activat Version example test --- .../tests/data_files/activate_version.singer | 10 ++++++++++ target_postgres/tests/test_standard_target.py | 6 ++++++ 2 files changed, 16 insertions(+) create mode 100644 target_postgres/tests/data_files/activate_version.singer diff --git a/target_postgres/tests/data_files/activate_version.singer b/target_postgres/tests/data_files/activate_version.singer new file mode 100644 index 00000000..6eb1511d --- /dev/null +++ b/target_postgres/tests/data_files/activate_version.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version", "version": 1674486431563} diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index a2a7dc91..3bb7ceb6 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -232,3 +232,9 @@ def test_new_array_column(postgres_target): """Create a new Array column with an existing table""" file_name = "new_array_column.singer" singer_file_to_target(file_name, postgres_target) + + +def test_activate_version(postgres_target): + """Activate Version Test""" + file_name = "activate_version.singer" + singer_file_to_target(file_name, postgres_target) From 3280b0105d55ff265284f49d4e8aba5862126bbe Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Mon, 23 Jan 2023 16:52:57 -0500 Subject: [PATCH 2/8] Activate version working --- README.md | 30 +++---- target_postgres/sinks.py | 62 ++++++++++++++ target_postgres/target.py | 33 +++++++- .../tests/data_files/activate_version.singer | 10 --- .../data_files/activate_version_hard.singer | 10 +++ .../data_files/activate_version_soft.singer | 10 +++ target_postgres/tests/test_standard_target.py | 80 ++++++++++++++++++- 7 files changed, 206 insertions(+), 29 deletions(-) delete mode 100644 target_postgres/tests/data_files/activate_version.singer create mode 100644 target_postgres/tests/data_files/activate_version_hard.singer create mode 100644 target_postgres/tests/data_files/activate_version_soft.singer diff --git a/README.md b/README.md index 7efaa224..3762a35a 100644 --- a/README.md +++ b/README.md @@ -14,20 +14,22 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target * `schema-flattening` ## Settings - -| Setting | Required | Default | Description | -| :------------------- | :------: | :-----------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. | -| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. | -| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | -| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | -| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | -| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | -| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | -| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | -| stream_map_config | False | None | User-defined config values to be used within map expressions. | -| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | -| flattening_max_depth | False | None | The max depth to flatten schemas. | +| Setting | Required | Default | Description | +| :-------------------- | :------: | :-----------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. | +| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. | +| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | +| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | +| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | +| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must esacpe password specialcharacters properly seehttps://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | +| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | +| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup | +| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. | +| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work!This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. | +| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | +| stream_map_config | False | None | User-defined config values to be used within map expressions. | +| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | +| flattening_max_depth | False | None | The max depth to flatten schemas. | A full list of supported settings and capabilities is available by running: `target-postgres --about` diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 90866d16..50815a5f 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -2,9 +2,12 @@ import uuid from typing import Any, Dict, Iterable, List, Optional, Union +import sqlalchemy +from pendulum import now from singer_sdk.sinks import SQLSink from sqlalchemy import Column, MetaData, Table, insert from sqlalchemy.sql import Executable +from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -252,3 +255,62 @@ def schema_name(self) -> Optional[str]: # Schema name not detected. return None + + def activate_version(self, new_version: int) -> None: + """Bump the active version of the target table. + + Args: + new_version: The version number to activate. + """ + # There's nothing to do if the table doesn't exist yet + # (which it won't the first time the stream is processed) + if not self.connector.table_exists(self.full_table_name): + return + + deleted_at = now() + datetime_type = self.connector.to_sql_type( + {"type": "string", "format": "date-time"} + ) # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + integer_type = self.connector.to_sql_type( + {"type": "integer"} + ) # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + + if not self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.version_column_name, + ): + self.connector.prepare_column( + self.full_table_name, + self.version_column_name, + sql_type=integer_type, # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + ) + + self.logger.info("Hard delete: %s", self.config.get("hard_delete")) + if self.config["hard_delete"] is True: + self.connection.execute( + f"DELETE FROM {self.full_table_name} " + f"WHERE {self.version_column_name} <= {new_version} OR {self.version_column_name} IS NULL" + ) + return + + if not self.connector.column_exists( + full_table_name=self.full_table_name, + column_name=self.soft_delete_column_name, + ): + self.connector.prepare_column( + self.full_table_name, + self.soft_delete_column_name, + sql_type=datetime_type, # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + ) + + query = sqlalchemy.text( + f"UPDATE {self.full_table_name}\n" + f"SET {self.soft_delete_column_name} = :deletedate \n" + f"WHERE {self.version_column_name} < :version OR {self.version_column_name} IS NULL \n" # Need to deal with the case where data doesn't exist for the version column + f" AND {self.soft_delete_column_name} IS NULL\n" + ) + query = query.bindparams( + bindparam("deletedate", value=deleted_at, type_=datetime_type), + bindparam("version", value=new_version, type_=integer_type), + ) + self.connector.connection.execute(query) diff --git a/target_postgres/target.py b/target_postgres/target.py index d7ef2624..36994376 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -71,7 +71,7 @@ def __init__( th.StringType, description=( "User name used to authenticate. " - + "Note if sqlalchemy_url is set this will be ignored.", + + "Note if sqlalchemy_url is set this will be ignored." ), ), th.Property( @@ -117,6 +117,37 @@ def __init__( th.StringType, description="Postgres schema to send data to, example: tap-clickup", ), + th.Property( + "hard_delete", + th.StringType, + default=False, + description=( + "When activate version is sent from a tap this specefies " + + "if we should delete the records that don't match, or mark " + + "them with a date in the `_sdc_deleted_at` column." + ), + ), + th.Property( + "hard_delete", + th.StringType, + default=False, + description=( + "When activate version is sent from a tap this specefies " + + "if we should delete the records that don't match, or mark " + + "them with a date in the `_sdc_deleted_at` column." + ), + ), + th.Property( + "add_record_metadata", + th.StringType, + default=True, + description=( + "Note that this must be enabled for activate_version to work!" + + "This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. " + + "See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " + + "for more information." + ), + ), ).to_dict() default_sink_class = PostgresSink diff --git a/target_postgres/tests/data_files/activate_version.singer b/target_postgres/tests/data_files/activate_version.singer deleted file mode 100644 index 6eb1511d..00000000 --- a/target_postgres/tests/data_files/activate_version.singer +++ /dev/null @@ -1,10 +0,0 @@ -{"type": "SCHEMA", "stream": "test_activate_version", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} -{"type": "ACTIVATE_VERSION", "stream": "test_activate_version", "version": 1674486431563} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "ACTIVATE_VERSION", "stream": "test_activate_version", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/activate_version_hard.singer b/target_postgres/tests/data_files/activate_version_hard.singer new file mode 100644 index 00000000..1d05c3f8 --- /dev/null +++ b/target_postgres/tests/data_files/activate_version_hard.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_hard", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/activate_version_soft.singer b/target_postgres/tests/data_files/activate_version_soft.singer new file mode 100644 index 00000000..991434c5 --- /dev/null +++ b/target_postgres/tests/data_files/activate_version_soft.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563} diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index 3bb7ceb6..fc2cb632 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -1,5 +1,6 @@ """ Attempt at making some standard Target Tests. """ # flake8: noqa +import copy import io import uuid from contextlib import redirect_stdout @@ -7,7 +8,9 @@ import jsonschema import pytest +import sqlalchemy from singer_sdk.testing import sync_end_to_end +from sqlalchemy import create_engine, engine_from_config from target_postgres.target import TargetPostgres from target_postgres.tests.samples.aapl.aapl import Fundamentals @@ -24,6 +27,9 @@ def postgres_config(): "user": "postgres", "password": "postgres", "database": "postgres", + "port": 5432, + "add_record_metadata": True, + "hard_delete": False, } @@ -32,6 +38,12 @@ def postgres_target(postgres_config) -> TargetPostgres: return TargetPostgres(config=postgres_config) +def sqlalchemy_engine(config) -> sqlalchemy.engine.Engine: + return create_engine( + f"{config['dialect+driver']}://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}" + ) + + def singer_file_to_target(file_name, target) -> None: """Singer file to Target, emulates a tap run @@ -234,7 +246,67 @@ def test_new_array_column(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_activate_version(postgres_target): - """Activate Version Test""" - file_name = "activate_version.singer" - singer_file_to_target(file_name, postgres_target) +def test_activate_version_hard_delete(postgres_config): + """Activate Version Hard Delete Test""" + file_name = "activate_version_hard.singer" + postgres_config_hard_delete_true = copy.deepcopy(postgres_config) + postgres_config_hard_delete_true["hard_delete"] = True + pg_hard_delete_true = TargetPostgres(config=postgres_config_hard_delete_true) + singer_file_to_target(file_name, pg_hard_delete_true) + engine = sqlalchemy_engine(postgres_config) + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_hard") + assert result.rowcount == 7 + # Add a record like someone would if they weren't using the tap target combo + result = connection.execute( + "INSERT INTO test_activate_version_hard(code, \"name\") VALUES('Manual1', 'Meltano')" + ) + result = connection.execute( + "INSERT INTO test_activate_version_hard(code, \"name\") VALUES('Manual2', 'Meltano')" + ) + result = connection.execute("SELECT * FROM test_activate_version_hard") + assert result.rowcount == 9 + + singer_file_to_target(file_name, pg_hard_delete_true) + + # Should remove the 2 records we added manually + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_hard") + assert result.rowcount == 7 + + +def test_activate_version_soft_delete(postgres_config): + """Activate Version Soft Delete Test""" + file_name = "activate_version_soft.singer" + engine = sqlalchemy_engine(postgres_config) + with engine.connect() as connection: + result = connection.execute("DROP TABLE IF EXISTS test_activate_version_soft") + postgres_config_soft_delete = copy.deepcopy(postgres_config) + postgres_config_soft_delete["hard_delete"] = False + pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) + singer_file_to_target(file_name, pg_soft_delete) + + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_soft") + assert result.rowcount == 7 + # Add a record like someone would if they weren't using the tap target combo + result = connection.execute( + "INSERT INTO test_activate_version_soft(code, \"name\") VALUES('Manual1', 'Meltano')" + ) + result = connection.execute( + "INSERT INTO test_activate_version_soft(code, \"name\") VALUES('Manual2', 'Meltano')" + ) + result = connection.execute("SELECT * FROM test_activate_version_soft") + assert result.rowcount == 9 + + singer_file_to_target(file_name, pg_soft_delete) + + # Should have all records including the 2 we added manually + with engine.connect() as connection: + result = connection.execute("SELECT * FROM test_activate_version_soft") + assert result.rowcount == 9 + + result = connection.execute( + "SELECT * FROM test_activate_version_soft where _sdc_deleted_at is NOT NULL" + ) + assert result.rowcount == 2 From 43dc67558fcbfdc72d8bf5a43b768f93f8eac252 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Mon, 23 Jan 2023 17:18:18 -0500 Subject: [PATCH 3/8] Lint and Schema fix for config --- target_postgres/sinks.py | 19 +++++++++++-------- target_postgres/target.py | 14 ++------------ 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 50815a5f..6642067d 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -268,12 +268,15 @@ def activate_version(self, new_version: int) -> None: return deleted_at = now() + # Different from SingerSDK as we need to handle types the + # same as SCHEMA messsages datetime_type = self.connector.to_sql_type( {"type": "string", "format": "date-time"} - ) # Different from SingerSDK as we need to handle types the same as SCHEMA messsages - integer_type = self.connector.to_sql_type( - {"type": "integer"} - ) # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + ) + + # Different from SingerSDK as we need to handle types the + # same as SCHEMA messsages + integer_type = self.connector.to_sql_type({"type": "integer"}) if not self.connector.column_exists( full_table_name=self.full_table_name, @@ -282,7 +285,7 @@ def activate_version(self, new_version: int) -> None: self.connector.prepare_column( self.full_table_name, self.version_column_name, - sql_type=integer_type, # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + sql_type=integer_type, ) self.logger.info("Hard delete: %s", self.config.get("hard_delete")) @@ -300,13 +303,13 @@ def activate_version(self, new_version: int) -> None: self.connector.prepare_column( self.full_table_name, self.soft_delete_column_name, - sql_type=datetime_type, # Different from SingerSDK as we need to handle types the same as SCHEMA messsages + sql_type=datetime_type, ) - + # Need to deal with the case where data doesn't exist for the version column query = sqlalchemy.text( f"UPDATE {self.full_table_name}\n" f"SET {self.soft_delete_column_name} = :deletedate \n" - f"WHERE {self.version_column_name} < :version OR {self.version_column_name} IS NULL \n" # Need to deal with the case where data doesn't exist for the version column + f"WHERE {self.version_column_name} < :version OR {self.version_column_name} IS NULL \n" f" AND {self.soft_delete_column_name} IS NULL\n" ) query = query.bindparams( diff --git a/target_postgres/target.py b/target_postgres/target.py index 36994376..abe69f5a 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -119,17 +119,7 @@ def __init__( ), th.Property( "hard_delete", - th.StringType, - default=False, - description=( - "When activate version is sent from a tap this specefies " - + "if we should delete the records that don't match, or mark " - + "them with a date in the `_sdc_deleted_at` column." - ), - ), - th.Property( - "hard_delete", - th.StringType, + th.BooleanType, default=False, description=( "When activate version is sent from a tap this specefies " @@ -139,7 +129,7 @@ def __init__( ), th.Property( "add_record_metadata", - th.StringType, + th.BooleanType, default=True, description=( "Note that this must be enabled for activate_version to work!" From c4336820679f79e87e748d883c9c4162cfa614f4 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Tue, 24 Jan 2023 10:53:34 -0500 Subject: [PATCH 4/8] Activate Version test to show data being deleted --- ...ion_doesnt_delete_before_populating.singer | 10 ++++++++ ...n_doesnt_delete_before_populating_2.singer | 2 ++ target_postgres/tests/test_standard_target.py | 25 +++++++++++++++++++ 3 files changed, 37 insertions(+) create mode 100644 target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer create mode 100644 target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer diff --git a/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer b/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer new file mode 100644 index 00000000..30a45c09 --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_doesnt_delete_before_populating", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_doesnt_delete_before_populating", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_doesnt_delete_before_populating", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer b/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer new file mode 100644 index 00000000..ae6faf12 --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer @@ -0,0 +1,2 @@ +{"type": "SCHEMA", "stream": "test_activate_version_doesnt_delete_before_populating", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_doesnt_delete_before_populating", "version": 1674486431563} diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index fc2cb632..31d8bd3a 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -310,3 +310,28 @@ def test_activate_version_soft_delete(postgres_config): "SELECT * FROM test_activate_version_soft where _sdc_deleted_at is NOT NULL" ) assert result.rowcount == 2 + + +def test_activate_version_doesnt_delete_before_populating(postgres_config): + """Activate Version should never delete records that are valid""" + table_name = "test_activate_version_doesnt_delete_before_populating" + file_name = f"{table_name}.singer" + engine = sqlalchemy_engine(postgres_config) + with engine.connect() as connection: + result = connection.execute(f"DROP TABLE IF EXISTS {table_name}") + + postgres_config_soft_delete = copy.deepcopy(postgres_config) + postgres_config_soft_delete["hard_delete"] = True + pg_hard_delete = TargetPostgres(config=postgres_config_soft_delete) + singer_file_to_target(file_name, pg_hard_delete) + # Will populate us with 7 records + with engine.connect() as connection: + result = connection.execute(f"SELECT * FROM {table_name}") + assert result.rowcount == 7 + + # Only has a schema and one activate record message + file_name = "test_activate_version_doesnt_delete_before_populating_2.singer" + singer_file_to_target(file_name, pg_hard_delete) + with engine.connect() as connection: + result = connection.execute(f"SELECT * FROM {table_name}") + assert result.rowcount == 7 From b3670813536a878e24c65bdb07feb2e8c96be4d2 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Tue, 24 Jan 2023 11:03:20 -0500 Subject: [PATCH 5/8] Lint Fix --- target_postgres/sinks.py | 6 ++++-- target_postgres/target.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 6642067d..3d8dd5b2 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -292,7 +292,8 @@ def activate_version(self, new_version: int) -> None: if self.config["hard_delete"] is True: self.connection.execute( f"DELETE FROM {self.full_table_name} " - f"WHERE {self.version_column_name} <= {new_version} OR {self.version_column_name} IS NULL" + f"WHERE {self.version_column_name} <= {new_version} " + f"OR {self.version_column_name} IS NULL" ) return @@ -309,7 +310,8 @@ def activate_version(self, new_version: int) -> None: query = sqlalchemy.text( f"UPDATE {self.full_table_name}\n" f"SET {self.soft_delete_column_name} = :deletedate \n" - f"WHERE {self.version_column_name} < :version OR {self.version_column_name} IS NULL \n" + f"WHERE {self.version_column_name} < :version " + f"OR {self.version_column_name} IS NULL \n" f" AND {self.soft_delete_column_name} IS NULL\n" ) query = query.bindparams( diff --git a/target_postgres/target.py b/target_postgres/target.py index abe69f5a..1f622016 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -133,8 +133,8 @@ def __init__( default=True, description=( "Note that this must be enabled for activate_version to work!" - + "This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. " - + "See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " + + "This adds _sdc_extracted_at, _sdc_batched_at, and more to every " + + " table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " # noqa: E501 + "for more information." ), ), From 2ba215be09ecd9d30df9fe3856bd0ab4d7580300 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Tue, 24 Jan 2023 16:03:45 -0500 Subject: [PATCH 6/8] Activate version is doing things correctly, existing implementations of ACTIVATE_VERSION sometimes send an ACTIVATE_MESSAGE before records are sent which I think is just a bad tap implemention but it is valid --- ...ivate_version_deletes_data_properly.singer | 10 ++++++++++ ...ate_version_deletes_data_properly_2.singer | 2 ++ ...ion_doesnt_delete_before_populating.singer | 10 ---------- ...n_doesnt_delete_before_populating_2.singer | 2 -- target_postgres/tests/test_standard_target.py | 20 ++++++++++++------- 5 files changed, 25 insertions(+), 19 deletions(-) create mode 100644 target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer create mode 100644 target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer delete mode 100644 target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer delete mode 100644 target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer diff --git a/target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer new file mode 100644 index 00000000..d49c733e --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly.singer @@ -0,0 +1,10 @@ +{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer new file mode 100644 index 00000000..05e5f28b --- /dev/null +++ b/target_postgres/tests/data_files/test_activate_version_deletes_data_properly_2.singer @@ -0,0 +1,2 @@ +{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431564} diff --git a/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer b/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer deleted file mode 100644 index 30a45c09..00000000 --- a/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating.singer +++ /dev/null @@ -1,10 +0,0 @@ -{"type": "SCHEMA", "stream": "test_activate_version_doesnt_delete_before_populating", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} -{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_doesnt_delete_before_populating", "version": 1674486431563} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "RECORD", "stream": "test_activate_version_doesnt_delete_before_populating", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"} -{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_doesnt_delete_before_populating", "version": 1674486431563} diff --git a/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer b/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer deleted file mode 100644 index ae6faf12..00000000 --- a/target_postgres/tests/data_files/test_activate_version_doesnt_delete_before_populating_2.singer +++ /dev/null @@ -1,2 +0,0 @@ -{"type": "SCHEMA", "stream": "test_activate_version_doesnt_delete_before_populating", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []} -{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_doesnt_delete_before_populating", "version": 1674486431563} diff --git a/target_postgres/tests/test_standard_target.py b/target_postgres/tests/test_standard_target.py index 31d8bd3a..4a67eb6f 100644 --- a/target_postgres/tests/test_standard_target.py +++ b/target_postgres/tests/test_standard_target.py @@ -312,9 +312,9 @@ def test_activate_version_soft_delete(postgres_config): assert result.rowcount == 2 -def test_activate_version_doesnt_delete_before_populating(postgres_config): - """Activate Version should never delete records that are valid""" - table_name = "test_activate_version_doesnt_delete_before_populating" +def test_activate_version_deletes_data_properly(postgres_config): + """Activate Version should""" + table_name = "test_activate_version_deletes_data_properly" file_name = f"{table_name}.singer" engine = sqlalchemy_engine(postgres_config) with engine.connect() as connection: @@ -326,12 +326,18 @@ def test_activate_version_doesnt_delete_before_populating(postgres_config): singer_file_to_target(file_name, pg_hard_delete) # Will populate us with 7 records with engine.connect() as connection: + result = connection.execute( + f"INSERT INTO {table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" + ) + result = connection.execute( + f"INSERT INTO {table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" + ) result = connection.execute(f"SELECT * FROM {table_name}") - assert result.rowcount == 7 + assert result.rowcount == 9 - # Only has a schema and one activate record message - file_name = "test_activate_version_doesnt_delete_before_populating_2.singer" + # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table + file_name = f"{table_name}_2.singer" singer_file_to_target(file_name, pg_hard_delete) with engine.connect() as connection: result = connection.execute(f"SELECT * FROM {table_name}") - assert result.rowcount == 7 + assert result.rowcount == 0 From f40ba633c195f90c366667f4ecd9ede0038d5617 Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Tue, 14 Feb 2023 15:35:27 -0500 Subject: [PATCH 7/8] Consistent spacing for docs --- target_postgres/target.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/target_postgres/target.py b/target_postgres/target.py index 1f622016..1b2e02e4 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -95,9 +95,9 @@ def __init__( th.StringType, description=( "SQLAlchemy connection string. " - + "This will override using host, user, password, port," - + "dialect. Note that you must esacpe password special" - + "characters properly see" + + "This will override using host, user, password, port, " + + "dialect. Note that you must esacpe password special " + + "characters properly see " + "https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords" # noqa: E501 ), ), @@ -134,7 +134,7 @@ def __init__( description=( "Note that this must be enabled for activate_version to work!" + "This adds _sdc_extracted_at, _sdc_batched_at, and more to every " - + " table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " # noqa: E501 + + "table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " # noqa: E501 + "for more information." ), ), From aacbed142423d9188e09855a2f3b7198e16bc18c Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Tue, 14 Feb 2023 15:35:48 -0500 Subject: [PATCH 8/8] Update README.md Co-authored-by: Aaron ("AJ") Steers --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3762a35a..b775ffc8 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target | user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | -| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must esacpe password specialcharacters properly seehttps://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | +| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords | | dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. | | default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup | | hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. |