From fe92e4567c19dc9ce6e94b2d92b2557c619c89e9 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Wed, 13 Dec 2023 02:08:25 +0100
Subject: [PATCH 1/7] chore: Add `.idea` to `.gitignore`

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index 56313116..4f8c62d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ smoke-test
 .meltano/**
 .tox/**
 .secrets/**
+.idea
 .vscode/**
 output/**
 .env

From c0d4a8901c2d71fbeb2bd8cd888113df409f74ca Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Wed, 13 Dec 2023 02:07:57 +0100
Subject: [PATCH 2/7] test: Add test cases for arrays and objects

In PostgreSQL, it all boils down to the `jsonb[]` type, but arrays are
reflected as `sqlalchemy.dialects.postgresql.ARRAY` instead of
`sqlalchemy.dialects.postgresql.JSONB`.

In order to prepare for more advanced type mangling & validation, and to
better support databases pretending to be compatible with PostgreSQL,
the new test cases exercise arrays with different kinds of inner values,
because ARRAYs on other databases may need to have uniform content.

Along the way, it adds a `verify_schema` utility function in the spirit
of the `verify_data` function, refactored and generalized from the
`test_anyof` test case.
---
 .../tests/data_files/array_boolean.singer     |   5 +
 .../tests/data_files/array_data.singer        |   6 -
 .../tests/data_files/array_number.singer      |   5 +
 .../tests/data_files/array_string.singer      |   6 +
 .../tests/data_files/array_timestamp.singer   |   5 +
 .../tests/data_files/object_mixed.singer      |   3 +
 target_postgres/tests/test_target_postgres.py | 167 +++++++++++++++---
 7 files changed, 162 insertions(+), 35 deletions(-)
 create mode 100644 target_postgres/tests/data_files/array_boolean.singer
 delete mode 100644 target_postgres/tests/data_files/array_data.singer
 create mode 100644 target_postgres/tests/data_files/array_number.singer
 create mode 100644 target_postgres/tests/data_files/array_string.singer
 create mode 100644 target_postgres/tests/data_files/array_timestamp.singer
 create mode 100644 target_postgres/tests/data_files/object_mixed.singer

diff --git a/target_postgres/tests/data_files/array_boolean.singer b/target_postgres/tests/data_files/array_boolean.singer
new file mode 100644
index 00000000..268a64a0
--- /dev/null
+++ b/target_postgres/tests/data_files/array_boolean.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}}
+{"type": "STATE", "value": {"array_boolean": 3}}
diff --git a/target_postgres/tests/data_files/array_data.singer b/target_postgres/tests/data_files/array_data.singer
deleted file mode 100644
index 0d132ac6..00000000
--- a/target_postgres/tests/data_files/array_data.singer
+++ /dev/null
@@ -1,6 +0,0 @@
-{"type": "SCHEMA", "stream": "test_carts", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "fruits": {"type": "array","items": {"type": "string"}}}}}
-{"type": "RECORD", "stream": "test_carts", "record": {"id": 1, "fruits": [ "apple", "orange", "pear" ]}}
-{"type": "RECORD", "stream": "test_carts", "record": {"id": 
2, "fruits": [ "banana", "apple" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 3, "fruits": [ "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 4, "fruits": [ "orange", "banana", "apple", "pear" ]}} -{"type": "STATE", "value": {"test_carts": 4}} diff --git a/target_postgres/tests/data_files/array_number.singer b/target_postgres/tests/data_files/array_number.singer new file mode 100644 index 00000000..4eac276e --- /dev/null +++ b/target_postgres/tests/data_files/array_number.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}} +{"type": "STATE", "value": {"array_number": 3}} diff --git a/target_postgres/tests/data_files/array_string.singer b/target_postgres/tests/data_files/array_string.singer new file mode 100644 index 00000000..f14e7870 --- /dev/null +++ b/target_postgres/tests/data_files/array_string.singer @@ -0,0 +1,6 @@ +{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}} +{"type": "STATE", "value": {"array_string": 4}} diff --git a/target_postgres/tests/data_files/array_timestamp.singer b/target_postgres/tests/data_files/array_timestamp.singer new file mode 100644 index 00000000..e5192cec --- /dev/null +++ b/target_postgres/tests/data_files/array_timestamp.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}} +{"type": "STATE", "value": {"array_timestamp": 3}} diff --git a/target_postgres/tests/data_files/object_mixed.singer b/target_postgres/tests/data_files/object_mixed.singer new file mode 100644 index 00000000..2ed86261 --- /dev/null +++ b/target_postgres/tests/data_files/object_mixed.singer @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}} +{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, 
"float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}} +{"type": "STATE", "value": {"object_mixed": 1}} diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 1eaa9978..ab4cd11d 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -11,8 +11,8 @@ import sqlalchemy from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import get_target_test_class, sync_end_to_end -from sqlalchemy.dialects.postgresql import ARRAY -from sqlalchemy.types import TEXT, TIMESTAMP +from sqlalchemy.dialects.postgresql import ARRAY, JSONB +from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP from target_postgres.connector import PostgresConnector from target_postgres.target import TargetPostgres @@ -94,7 +94,7 @@ def verify_data( Args: target: The target to obtain a database connection from. - full_table_name: The schema and table name of the table to check data for. + table_name: The schema and table name of the table to check data for. primary_key: The primary key of the table. number_of_rows: The expected number of rows that should be in the table. check_data: A dictionary representing the full contents of the first row in the @@ -134,6 +134,43 @@ def verify_data( assert result.first()[0] == number_of_rows +def verify_schema( + target: TargetPostgres, + table_name: str, + check_columns: dict = None, +): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + """ + engine = create_engine(target) + schema = target.config["default_target_schema"] + with engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection + ) + for column in table.c: + # Ignore `_sdc` columns for now. 
+ if column.name.startswith("_sdc"): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + + def test_sqlalchemy_url_config(postgres_config_no_ssl): """Be sure that passing a sqlalchemy_url works @@ -406,11 +443,92 @@ def test_duplicate_records(postgres_target): verify_data(postgres_target, "test_duplicate_records", 2, "id", row) -def test_array_data(postgres_target): - file_name = "array_data.singer" +def test_array_boolean(postgres_target): + file_name = "array_boolean.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [True, False]} + verify_data(postgres_target, "array_boolean", 3, "id", row) + verify_schema( + postgres_target, + "array_boolean", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_number(postgres_target): + file_name = "array_number.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} + verify_data(postgres_target, "array_number", 3, "id", row) + verify_schema( + postgres_target, + "array_number", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_string(postgres_target): + file_name = "array_string.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["apple", "orange", "pear"]} + verify_data(postgres_target, "array_string", 4, "id", row) + verify_schema( + postgres_target, + "array_string", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_timestamp(postgres_target): + file_name = "array_timestamp.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} + verify_data(postgres_target, "array_timestamp", 3, "id", row) + verify_schema( + postgres_target, + "array_timestamp", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_object_mixed(postgres_target): + file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) - row = {"id": 1, "fruits": ["apple", "orange", "pear"]} - verify_data(postgres_target, "test_carts", 4, "id", row) + row = { + "id": 1, + "value": { + "string": "foo", + "integer": 42, + "float": Decimal("42.42"), + "timestamp": "2023-12-13T01:15:02", + "array_boolean": [True, False], + "array_float": [Decimal("42.42"), Decimal("84.84")], + "array_integer": [42, 84], + "array_string": ["foo", "bar"], + "nested_object": {"foo": "bar"}, + }, + } + verify_data(postgres_target, "object_mixed", 1, "id", row) + verify_schema( + postgres_target, + "object_mixed", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": JSONB}, + }, + ) def test_encoded_string_data(postgres_target): @@ -456,41 +574,32 @@ def test_large_int(postgres_target): def test_anyof(postgres_target): """Test that anyOf is handled correctly""" - engine = create_engine(postgres_target) table_name = "commits" file_name = f"{table_name}.singer" - schema = postgres_target.config["default_target_schema"] singer_file_to_target(file_name, postgres_target) - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - 
table = sqlalchemy.Table(
-            "commits", meta, schema=schema, autoload_with=connection
-        )
-        for column in table.c:
-            # {"type":"string"}
-            if column.name == "id":
-                assert isinstance(column.type, TEXT)
+    verify_schema(
+        postgres_target,
+        table_name,
+        check_columns={
+            # {"type":"string"}
+            "id": {"type": TEXT},
             # Any of nullable date-time.
             # Note that postgres timestamp is equivalent to jsonschema date-time.
             # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]}
-            if column.name in {"authored_date", "committed_date"}:
-                assert isinstance(column.type, TIMESTAMP)
-
+            "authored_date": {"type": TIMESTAMP},
+            "committed_date": {"type": TIMESTAMP},
             # Any of nullable array of strings or single string.
             # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
-            if column.name == "parent_ids":
-                assert isinstance(column.type, ARRAY)
-
+            "parent_ids": {"type": ARRAY},
             # Any of nullable string.
             # {"anyOf":[{"type":"string"},{"type":"null"}]}
-            if column.name == "commit_message":
-                assert isinstance(column.type, TEXT)
-
+            "commit_message": {"type": TEXT},
             # Any of nullable string or integer.
             # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
-            if column.name == "legacy_id":
-                assert isinstance(column.type, TEXT)
+            "legacy_id": {"type": TEXT},
+        },
+    )
 
 
 def test_new_array_column(postgres_target):

From 8b3ea4f55c3d3e69df5f7db7c9d95fce95317308 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Tue, 19 Dec 2023 19:05:45 +0100
Subject: [PATCH 3/7] test: Fix `FATAL: sorry, too many clients already`

Dispose the SQLAlchemy engine object after use within test utility
functions.
---
 target_postgres/tests/test_target_postgres.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py
index ab4cd11d..f676f8f1 100644
--- a/target_postgres/tests/test_target_postgres.py
+++ b/target_postgres/tests/test_target_postgres.py
@@ -132,6 +132,7 @@ def verify_data(
             sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
         )
         assert result.first()[0] == number_of_rows
+    engine.dispose()
 
 
 def verify_schema(
@@ -169,6 +170,7 @@ def verify_schema(
                 f"Column '{column.name}' (with type '{column.type}') "
                 f"does not match expected type: {column_type_expected}"
             )
+    engine.dispose()
 
 
 def test_sqlalchemy_url_config(postgres_config_no_ssl):

From a9d179694aa8b51290928865da7f267ed27e44cc Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Tue, 19 Dec 2023 20:10:36 +0100
Subject: [PATCH 4/7] test: Fix `FATAL: sorry, too many clients already`

Within `BasePostgresSDKTests`, new database connections via SQLAlchemy
were not being closed, and kept filling up the connection pool,
eventually saturating it.
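
For reference, the fix below applies the standard pytest teardown
pattern: yielding from the fixture lets the cleanup code run after each
test completes. A minimal self-contained sketch, using an in-memory
SQLite engine purely for illustration (the real fixture builds its
engine from the target configuration):

    import pytest
    from sqlalchemy import create_engine

    @pytest.fixture()
    def connection():
        engine = create_engine("sqlite://")  # illustration only
        with engine.connect() as conn:
            yield conn  # the test runs here; the connection closes afterwards
        engine.dispose()  # close the pool's remaining connections

Returning the connection directly, as before, left nobody responsible
for closing it.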
--- target_postgres/tests/test_sdk.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py index 3f95c393..5d4207ad 100644 --- a/target_postgres/tests/test_sdk.py +++ b/target_postgres/tests/test_sdk.py @@ -61,7 +61,9 @@ class BasePostgresSDKTests: @pytest.fixture() def connection(self, runner): engine = create_engine(runner) - return engine.connect() + with engine.connect() as connection: + yield connection + engine.dispose() SDKTests = get_target_test_class( From 723e1fa699b188a3d8e3ccd5589e6f64cfacdea3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 21:31:43 +0100 Subject: [PATCH 5/7] test: Fix `FATAL: sorry, too many clients already` Dispose the SQLAlchemy engine object after use within `PostgresConnector`. --- target_postgres/connector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index d6730539..369eb462 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -180,8 +180,10 @@ def copy_table_structure( @contextmanager def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]: - with self._engine.connect().execution_options() as conn: + engine = self._engine + with engine.connect().execution_options() as conn: yield conn + engine.dispose() def drop_table( self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection From cf732e1d905bd49deeb98313bb40f8c3d7af38a3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:23:20 +0100 Subject: [PATCH 6/7] chore: Fix parameter names in docstrings --- target_postgres/connector.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 369eb462..59314d60 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -332,7 +332,7 @@ def create_empty_table( # type: ignore[override] """Create an empty target table. Args: - full_table_name: the target table name. + table_name: the target table name. schema: the JSON schema for the new table. primary_keys: list of key properties. partition_keys: list of partition keys. @@ -427,7 +427,7 @@ def _create_empty_column( # type: ignore[override] """Create a new column. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The name of the new column. sql_type: SQLAlchemy type engine to be used in creating the new column. @@ -491,7 +491,7 @@ def _adapt_column_type( # type: ignore[override] """Adapt table column type to support the new JSON schema type. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The target column name. sql_type: The new SQLAlchemy type. @@ -722,7 +722,7 @@ def _get_column_type( # type: ignore[override] """Get the SQL type of the declared column. Args: - full_table_name: The name of the table. + table_name: The name of the table. column_name: The name of the column. Returns: From 860baf44ae72a58d1165c313de0515e2ff3a1f4e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:27:48 +0100 Subject: [PATCH 7/7] test: Refactor utility functions `verify_data` and `verify_schema` By wrapping them into a container class `AssertionHelper`, it is easy to parameterize them, and to provide them to the test functions using a pytest fixture. This way, they are reusable from database adapter implementations which derive from PostgreSQL. 
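
For example, the test suite of a database adapter deriving from this
target can configure the helper through its own fixture; a minimal
sketch, where the `cratedb_target` fixture is hypothetical:

    import pytest

    from target_postgres.tests.test_target_postgres import AssertionHelper

    @pytest.fixture
    def helper(cratedb_target) -> AssertionHelper:
        # CrateDB reserves `_`-prefixed column names for system purposes,
        # hence the adjusted metadata column prefix.
        return AssertionHelper(target=cratedb_target, metadata_column_prefix="__sdc")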
The motivation for this is that the metadata column prefix `_sdc` needs
to be adjusted for other database systems, as they reject such column
names, which are reserved for system purposes. In the specific case of
CrateDB, it is enough to rename it to `__sdc`. Sad but true.
---
 target_postgres/tests/test_target_postgres.py | 272 +++++++++---------
 1 file changed, 140 insertions(+), 132 deletions(-)

diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py
index f676f8f1..d20fcf0a 100644
--- a/target_postgres/tests/test_target_postgres.py
+++ b/target_postgres/tests/test_target_postgres.py
@@ -28,6 +28,8 @@
     postgres_config_ssh_tunnel,
 )
 
+METADATA_COLUMN_PREFIX = "_sdc"
+
 
 # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly
 @pytest.fixture(scope="session", name="postgres_config")
@@ -75,102 +77,114 @@ def singer_file_to_target(file_name, target) -> None:
 
 
 # TODO should set schemas for each tap individually so we don't collide
-def remove_metadata_columns(row: dict) -> dict:
-    new_row = {}
-    for column in row.keys():
-        if not column.startswith("_sdc"):
-            new_row[column] = row[column]
-    return new_row
-
-
-def verify_data(
-    target: TargetPostgres,
-    table_name: str,
-    number_of_rows: int = 1,
-    primary_key: str | None = None,
-    check_data: dict | list[dict] | None = None,
-):
-    """Checks whether the data in a table matches a provided data sample.
-
-    Args:
-        target: The target to obtain a database connection from.
-        table_name: The schema and table name of the table to check data for.
-        primary_key: The primary key of the table.
-        number_of_rows: The expected number of rows that should be in the table.
-        check_data: A dictionary representing the full contents of the first row in the
-            table, as determined by lowest primary_key value, or else a list of
-            dictionaries representing every row in the table.
-    """
-    engine = create_engine(target)
-    full_table_name = f"{target.config['default_target_schema']}.{table_name}"
-    with engine.connect() as connection:
-        if primary_key is not None and check_data is not None:
-            if isinstance(check_data, dict):
-                result = connection.execute(
-                    sqlalchemy.text(
-                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
+class AssertionHelper:
+    def __init__(self, target: TargetPostgres, metadata_column_prefix: str):
+        self.target = target
+        self.metadata_column_prefix = metadata_column_prefix
+        self.engine = create_engine(self.target)
+
+    def remove_metadata_columns(self, row: dict) -> dict:
+        new_row = {}
+        for column in row.keys():
+            if not column.startswith(self.metadata_column_prefix):
+                new_row[column] = row[column]
+        return new_row
+
+    def verify_data(
+        self,
+        table_name: str,
+        number_of_rows: int = 1,
+        primary_key: str | None = None,
+        check_data: dict | list[dict] | None = None,
+    ):
+        """Checks whether the data in a table matches a provided data sample.
+
+        Args:
+            target: The target to obtain a database connection from.
+            table_name: The schema and table name of the table to check data for.
+            primary_key: The primary key of the table.
+            number_of_rows: The expected number of rows that should be in the table.
+            check_data: A dictionary representing the full contents of the first row in the
+                table, as determined by lowest primary_key value, or else a list of
+                dictionaries representing every row in the table.
+        """
+        full_table_name = f"{self.target.config['default_target_schema']}.{table_name}"
+        with self.engine.connect() as connection:
+            if primary_key is not None and check_data is not None:
+                if isinstance(check_data, dict):
+                    result = connection.execute(
+                        sqlalchemy.text(
+                            f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
+                        )
                     )
-                )
-                assert result.rowcount == number_of_rows
-                result_dict = remove_metadata_columns(result.first()._asdict())
-                assert result_dict == check_data
-            elif isinstance(check_data, list):
-                result = connection.execute(
-                    sqlalchemy.text(
-                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
+                    assert result.rowcount == number_of_rows
+                    result_dict = self.remove_metadata_columns(result.first()._asdict())
+                    assert result_dict == check_data
+                elif isinstance(check_data, list):
+                    result = connection.execute(
+                        sqlalchemy.text(
+                            f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
+                        )
                     )
-                )
-                assert result.rowcount == number_of_rows
-                result_dict = [
-                    remove_metadata_columns(row._asdict()) for row in result.all()
-                ]
-                assert result_dict == check_data
+                    assert result.rowcount == number_of_rows
+                    result_dict = [
+                        self.remove_metadata_columns(row._asdict())
+                        for row in result.all()
+                    ]
+                    assert result_dict == check_data
+                else:
+                    raise ValueError("Invalid check_data - not dict or list of dicts")
             else:
-                raise ValueError("Invalid check_data - not dict or list of dicts")
-        else:
-            result = connection.execute(
-                sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
+                result = connection.execute(
+                    sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
+                )
+                assert result.first()[0] == number_of_rows
+
+    def verify_schema(
+        self,
+        table_name: str,
+        check_columns: dict = None,
+    ):
+        """Checks whether the schema of a database table matches the provided column definitions.
+
+        Args:
+            target: The target to obtain a database connection from.
+            table_name: The schema and table name of the table to check data for.
+            check_columns: A dictionary mapping column names to their definitions. Currently,
+                it is all about the `type` attribute which is compared.
+            metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`.
+        """
+        schema = self.target.config["default_target_schema"]
+        with self.engine.connect() as connection:
+            meta = sqlalchemy.MetaData()
+            table = sqlalchemy.Table(
+                table_name, meta, schema=schema, autoload_with=connection
             )
-            assert result.first()[0] == number_of_rows
-    engine.dispose()
+            for column in table.c:
+                # Ignore `_sdc` metadata columns when verifying table schema.
+                if column.name.startswith(self.metadata_column_prefix):
+                    continue
+                try:
+                    column_type_expected = check_columns[column.name]["type"]
+                except KeyError:
+                    raise ValueError(
+                        f"Invalid check_columns - missing definition for column: {column.name}"
+                    )
+                if not isinstance(column.type, column_type_expected):
+                    raise TypeError(
+                        f"Column '{column.name}' (with type '{column.type}') "
+                        f"does not match expected type: {column_type_expected}"
+                    )
+
+    def __del__(self):
+        self.engine.dispose()
 
 
-def verify_schema(
-    target: TargetPostgres,
-    table_name: str,
-    check_columns: dict = None,
-):
-    """Checks whether the schema of a database table matches the provided column definitions.
 
-    Args:
-        target: The target to obtain a database connection from.
-        table_name: The schema and table name of the table to check data for.
-        check_columns: A dictionary mapping column names to their definitions. Currently,
-            it is all about the `type` attribute which is compared.
- """ - engine = create_engine(target) - schema = target.config["default_target_schema"] - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - table_name, meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # Ignore `_sdc` columns for now. - if column.name.startswith("_sdc"): - continue - try: - column_type_expected = check_columns[column.name]["type"] - except KeyError: - raise ValueError( - f"Invalid check_columns - missing definition for column: {column.name}" - ) - if not isinstance(column.type, column_type_expected): - raise TypeError( - f"Column '{column.name}' (with type '{column.type}') " - f"does not match expected type: {column_type_expected}" - ) - engine.dispose() +@pytest.fixture +def helper(postgres_target) -> AssertionHelper: + return AssertionHelper( + target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX + ) def test_sqlalchemy_url_config(postgres_config_no_ssl): @@ -287,11 +301,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_optional_attributes(postgres_target): +def test_optional_attributes(postgres_target, helper): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "optional": "This is optional"} - verify_data(postgres_target, "test_optional_attributes", 4, "id", row) + helper.verify_data("test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -311,7 +325,7 @@ def test_large_numeric_primary_key(postgres_target): # TODO test that data is correct -def test_schema_updates(postgres_target): +def test_schema_updates(postgres_target, helper): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -323,16 +337,16 @@ def test_schema_updates(postgres_target): "a5": None, "a6": None, } - verify_data(postgres_target, "test_schema_updates", 6, "id", row) + helper.verify_data("test_schema_updates", 6, "id", row) -def test_multiple_state_messages(postgres_target): +def test_multiple_state_messages(postgres_target, helper): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + helper.verify_data("test_multiple_state_messages_a", 6, "id", row) row = {"id": 1, "metric": 110} - verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) + helper.verify_data("test_multiple_state_messages_b", 6, "id", row) # TODO test that data is correct @@ -349,7 +363,7 @@ def test_multiple_schema_messages(postgres_target, caplog): assert "Schema has changed for stream" not in caplog.text -def test_relational_data(postgres_target): +def test_relational_data(postgres_target, helper): file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) @@ -406,12 +420,12 @@ def test_relational_data(postgres_target): }, ] - verify_data(postgres_target, "test_users", 8, "id", users) - verify_data(postgres_target, "test_locations", 5, "id", locations) - verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) + helper.verify_data("test_users", 8, "id", users) + helper.verify_data("test_locations", 5, "id", locations) + helper.verify_data("test_user_in_location", 5, "id", user_in_location) -def test_no_primary_keys(postgres_target): +def test_no_primary_keys(postgres_target, helper): """We run both of these 
tests twice just to ensure that no records are removed and append only works properly""" engine = create_engine(postgres_target) table_name = "test_no_pk" @@ -430,7 +444,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - verify_data(postgres_target, table_name, 16) + helper.verify_data(table_name, 16) def test_no_type(postgres_target): @@ -438,20 +452,19 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_duplicate_records(postgres_target): +def test_duplicate_records(postgres_target, helper): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_duplicate_records", 2, "id", row) + helper.verify_data("test_duplicate_records", 2, "id", row) -def test_array_boolean(postgres_target): +def test_array_boolean(postgres_target, helper): file_name = "array_boolean.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [True, False]} - verify_data(postgres_target, "array_boolean", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_boolean", 3, "id", row) + helper.verify_schema( "array_boolean", check_columns={ "id": {"type": BIGINT}, @@ -460,13 +473,12 @@ def test_array_boolean(postgres_target): ) -def test_array_number(postgres_target): +def test_array_number(postgres_target, helper): file_name = "array_number.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} - verify_data(postgres_target, "array_number", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_number", 3, "id", row) + helper.verify_schema( "array_number", check_columns={ "id": {"type": BIGINT}, @@ -475,13 +487,12 @@ def test_array_number(postgres_target): ) -def test_array_string(postgres_target): +def test_array_string(postgres_target, helper): file_name = "array_string.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["apple", "orange", "pear"]} - verify_data(postgres_target, "array_string", 4, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_string", 4, "id", row) + helper.verify_schema( "array_string", check_columns={ "id": {"type": BIGINT}, @@ -490,13 +501,12 @@ def test_array_string(postgres_target): ) -def test_array_timestamp(postgres_target): +def test_array_timestamp(postgres_target, helper): file_name = "array_timestamp.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} - verify_data(postgres_target, "array_timestamp", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_timestamp", 3, "id", row) + helper.verify_schema( "array_timestamp", check_columns={ "id": {"type": BIGINT}, @@ -505,7 +515,7 @@ def test_array_timestamp(postgres_target): ) -def test_object_mixed(postgres_target): +def test_object_mixed(postgres_target, helper): file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -522,9 +532,8 @@ def test_object_mixed(postgres_target): "nested_object": {"foo": "bar"}, }, } - verify_data(postgres_target, "object_mixed", 1, "id", row) - verify_schema( - postgres_target, + helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( "object_mixed", check_columns={ "id": {"type": BIGINT}, @@ -533,7 +542,7 @@ def 
test_object_mixed(postgres_target): ) -def test_encoded_string_data(postgres_target): +def test_encoded_string_data(postgres_target, helper): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. @@ -546,11 +555,11 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "info": "simple string 2837"} - verify_data(postgres_target, "test_strings", 11, "id", row) + helper.verify_data("test_strings", 11, "id", row) row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} - verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + helper.verify_data("test_strings_in_objects", 11, "id", row) row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} - verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) + helper.verify_data("test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target): @@ -574,14 +583,13 @@ def test_large_int(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_anyof(postgres_target): +def test_anyof(postgres_target, helper): """Test that anyOf is handled correctly""" table_name = "commits" file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) - verify_schema( - postgres_target, + helper.verify_schema( table_name, check_columns={ # {"type":"string"} @@ -690,7 +698,7 @@ def test_activate_version_soft_delete(postgres_target): result = connection.execute( sqlalchemy.text( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) assert result.rowcount == 2