diff --git a/.gitignore b/.gitignore
index 56313116..4f8c62d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ smoke-test
 .meltano/**
 .tox/**
 .secrets/**
+.idea
 .vscode/**
 output/**
 .env
diff --git a/target_postgres/connector.py b/target_postgres/connector.py
index d6730539..59314d60 100644
--- a/target_postgres/connector.py
+++ b/target_postgres/connector.py
@@ -180,8 +180,10 @@ def copy_table_structure(
     @contextmanager
     def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]:
-        with self._engine.connect().execution_options() as conn:
+        engine = self._engine
+        with engine.connect().execution_options() as conn:
             yield conn
+        engine.dispose()
 
     def drop_table(
         self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection
@@ -330,7 +332,7 @@ def create_empty_table(  # type: ignore[override]
         """Create an empty target table.
 
         Args:
-            full_table_name: the target table name.
+            table_name: the target table name.
             schema: the JSON schema for the new table.
             primary_keys: list of key properties.
             partition_keys: list of partition keys.
@@ -425,7 +427,7 @@ def _create_empty_column(  # type: ignore[override]
         """Create a new column.
 
         Args:
-            full_table_name: The target table name.
+            table_name: The target table name.
             column_name: The name of the new column.
             sql_type: SQLAlchemy type engine to be used in creating the new column.
@@ -489,7 +491,7 @@ def _adapt_column_type(  # type: ignore[override]
        """Adapt table column type to support the new JSON schema type.
 
         Args:
-            full_table_name: The target table name.
+            table_name: The target table name.
             column_name: The target column name.
             sql_type: The new SQLAlchemy type.
@@ -720,7 +722,7 @@ def _get_column_type(  # type: ignore[override]
         """Get the SQL type of the declared column.
 
         Args:
-            full_table_name: The name of the table.
+            table_name: The name of the table.
             column_name: The name of the column.
 
         Returns:
diff --git a/target_postgres/tests/data_files/array_boolean.singer b/target_postgres/tests/data_files/array_boolean.singer
new file mode 100644
index 00000000..268a64a0
--- /dev/null
+++ b/target_postgres/tests/data_files/array_boolean.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}}
+{"type": "STATE", "value": {"array_boolean": 3}}
diff --git a/target_postgres/tests/data_files/array_data.singer b/target_postgres/tests/data_files/array_data.singer
deleted file mode 100644
index 0d132ac6..00000000
--- a/target_postgres/tests/data_files/array_data.singer
+++ /dev/null
@@ -1,6 +0,0 @@
-{"type": "SCHEMA", "stream": "test_carts", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "fruits": {"type": "array","items": {"type": "string"}}}}}
-{"type": "RECORD", "stream": "test_carts", "record": {"id": 1, "fruits": [ "apple", "orange", "pear" ]}}
-{"type": "RECORD", "stream": "test_carts", "record": {"id": 2, "fruits": [ "banana", "apple" ]}}
-{"type": "RECORD", "stream": "test_carts", "record": {"id": 3, "fruits": [ "pear" ]}}
-{"type": "RECORD", "stream": "test_carts", "record": {"id": 4, "fruits": [ "orange", "banana", "apple", "pear" ]}}
-{"type": "STATE", "value": {"test_carts": 4}}
diff --git a/target_postgres/tests/data_files/array_number.singer b/target_postgres/tests/data_files/array_number.singer
new file mode 100644
index 00000000..4eac276e
--- /dev/null
+++ b/target_postgres/tests/data_files/array_number.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}}
+{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}}
+{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}}
+{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}}
+{"type": "STATE", "value": {"array_number": 3}}
diff --git a/target_postgres/tests/data_files/array_string.singer b/target_postgres/tests/data_files/array_string.singer
new file mode 100644
index 00000000..f14e7870
--- /dev/null
+++ b/target_postgres/tests/data_files/array_string.singer
@@ -0,0 +1,6 @@
+{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}}
+{"type": "STATE", "value": {"array_string": 4}}
diff --git a/target_postgres/tests/data_files/array_timestamp.singer b/target_postgres/tests/data_files/array_timestamp.singer
new file mode 100644
index 00000000..e5192cec
--- /dev/null
+++ b/target_postgres/tests/data_files/array_timestamp.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}}
+{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}}
+{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}}
+{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}}
+{"type": "STATE", "value": {"array_timestamp": 3}}
diff --git a/target_postgres/tests/data_files/object_mixed.singer b/target_postgres/tests/data_files/object_mixed.singer
new file mode 100644
index 00000000..2ed86261
--- /dev/null
+++ b/target_postgres/tests/data_files/object_mixed.singer
@@ -0,0 +1,3 @@
+{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}}
+{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, "float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}}
+{"type": "STATE", "value": {"object_mixed": 1}}
diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py
index 3f95c393..5d4207ad 100644
--- a/target_postgres/tests/test_sdk.py
+++ b/target_postgres/tests/test_sdk.py
@@ -61,7 +61,9 @@ class BasePostgresSDKTests:
     @pytest.fixture()
     def connection(self, runner):
         engine = create_engine(runner)
-        return engine.connect()
+        with engine.connect() as connection:
+            yield connection
+        engine.dispose()
 
 
 SDKTests = get_target_test_class(
diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py
index 1eaa9978..d20fcf0a 100644
--- a/target_postgres/tests/test_target_postgres.py
+++ b/target_postgres/tests/test_target_postgres.py
@@ -11,8 +11,8 @@ import sqlalchemy
 from singer_sdk.exceptions import MissingKeyPropertiesError
 from singer_sdk.testing import get_target_test_class, sync_end_to_end
-from sqlalchemy.dialects.postgresql import ARRAY
-from sqlalchemy.types import TEXT, TIMESTAMP
+from sqlalchemy.dialects.postgresql import ARRAY, JSONB
+from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP
 
 from target_postgres.connector import PostgresConnector
 from target_postgres.target import TargetPostgres
@@ -28,6 +28,8 @@
     postgres_config_ssh_tunnel,
 )
 
+METADATA_COLUMN_PREFIX = "_sdc"
+
 
 # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly
 @pytest.fixture(scope="session", name="postgres_config")
@@ -75,63 +77,114 @@ def singer_file_to_target(file_name, target) -> None:
 
 
 # TODO should set schemas for each tap individually so we don't collide
-def remove_metadata_columns(row: dict) -> dict:
-    new_row = {}
-    for column in row.keys():
-        if not column.startswith("_sdc"):
-            new_row[column] = row[column]
-    return new_row
-
-
-def verify_data(
-    target: TargetPostgres,
-    table_name: str,
-    number_of_rows: int = 1,
-    primary_key: str | None = None,
-    check_data: dict | list[dict] | None = None,
-):
-    """Checks whether the data in a table matches a provided data sample.
-
-    Args:
-        target: The target to obtain a database connection from.
-        full_table_name: The schema and table name of the table to check data for.
-        primary_key: The primary key of the table.
-        number_of_rows: The expected number of rows that should be in the table.
-        check_data: A dictionary representing the full contents of the first row in the
-            table, as determined by lowest primary_key value, or else a list of
-            dictionaries representing every row in the table.
-    """
-    engine = create_engine(target)
-    full_table_name = f"{target.config['default_target_schema']}.{table_name}"
-    with engine.connect() as connection:
-        if primary_key is not None and check_data is not None:
-            if isinstance(check_data, dict):
-                result = connection.execute(
-                    sqlalchemy.text(
-                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
-                    )
-                )
-                assert result.rowcount == number_of_rows
-                result_dict = remove_metadata_columns(result.first()._asdict())
-                assert result_dict == check_data
-            elif isinstance(check_data, list):
-                result = connection.execute(
-                    sqlalchemy.text(
-                        f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
-                    )
-                )
-                assert result.rowcount == number_of_rows
-                result_dict = [
-                    remove_metadata_columns(row._asdict()) for row in result.all()
-                ]
-                assert result_dict == check_data
-            else:
-                raise ValueError("Invalid check_data - not dict or list of dicts")
-        else:
-            result = connection.execute(
-                sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
-            )
-            assert result.first()[0] == number_of_rows
+class AssertionHelper:
+    def __init__(self, target: TargetPostgres, metadata_column_prefix: str):
+        self.target = target
+        self.metadata_column_prefix = metadata_column_prefix
+        self.engine = create_engine(self.target)
+
+    def remove_metadata_columns(self, row: dict) -> dict:
+        new_row = {}
+        for column in row.keys():
+            if not column.startswith(self.metadata_column_prefix):
+                new_row[column] = row[column]
+        return new_row
+
+    def verify_data(
+        self,
+        table_name: str,
+        number_of_rows: int = 1,
+        primary_key: str | None = None,
+        check_data: dict | list[dict] | None = None,
+    ):
+        """Checks whether the data in a table matches a provided data sample.
+
+        Args:
+            table_name: The schema and table name of the table to check data for.
+            primary_key: The primary key of the table.
+            number_of_rows: The expected number of rows that should be in the table.
+            check_data: A dictionary representing the full contents of the first row in the
+                table, as determined by lowest primary_key value, or else a list of
+                dictionaries representing every row in the table.
+ """ + full_table_name = f"{self.target.config['default_target_schema']}.{table_name}" + with self.engine.connect() as connection: + if primary_key is not None and check_data is not None: + if isinstance(check_data, dict): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + assert result.rowcount == number_of_rows + result_dict = self.remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - assert result_dict == check_data + assert result.rowcount == number_of_rows + result_dict = [ + self.remove_metadata_columns(row._asdict()) + for row in result.all() + ] + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: - result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + result = connection.execute( + sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + + def verify_schema( + self, + table_name: str, + check_columns: dict = None, + ): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`. + """ + schema = self.target.config["default_target_schema"] + with self.engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection ) - assert result.first()[0] == number_of_rows + for column in table.c: + # Ignore `_sdc` metadata columns when veriying table schema. 
+                if column.name.startswith(self.metadata_column_prefix):
+                    continue
+                try:
+                    column_type_expected = check_columns[column.name]["type"]
+                except KeyError:
+                    raise ValueError(
+                        f"Invalid check_columns - missing definition for column: {column.name}"
+                    )
+                if not isinstance(column.type, column_type_expected):
+                    raise TypeError(
+                        f"Column '{column.name}' (with type '{column.type}') "
+                        f"does not match expected type: {column_type_expected}"
+                    )
+
+    def __del__(self):
+        self.engine.dispose()
+
+
+@pytest.fixture
+def helper(postgres_target) -> AssertionHelper:
+    return AssertionHelper(
+        target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX
+    )
 
 
 def test_sqlalchemy_url_config(postgres_config_no_ssl):
@@ -248,11 +301,11 @@ def test_special_chars_in_attributes(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
-def test_optional_attributes(postgres_target):
+def test_optional_attributes(postgres_target, helper):
     file_name = "optional_attributes.singer"
     singer_file_to_target(file_name, postgres_target)
     row = {"id": 1, "optional": "This is optional"}
-    verify_data(postgres_target, "test_optional_attributes", 4, "id", row)
+    helper.verify_data("test_optional_attributes", 4, "id", row)
 
 
 def test_schema_no_properties(postgres_target):
@@ -272,7 +325,7 @@ def test_large_numeric_primary_key(postgres_target):
 
 
 # TODO test that data is correct
-def test_schema_updates(postgres_target):
+def test_schema_updates(postgres_target, helper):
     file_name = "schema_updates.singer"
     singer_file_to_target(file_name, postgres_target)
     row = {
@@ -284,16 +337,16 @@
         "a5": None,
         "a6": None,
     }
-    verify_data(postgres_target, "test_schema_updates", 6, "id", row)
+    helper.verify_data("test_schema_updates", 6, "id", row)
 
 
-def test_multiple_state_messages(postgres_target):
+def test_multiple_state_messages(postgres_target, helper):
     file_name = "multiple_state_messages.singer"
     singer_file_to_target(file_name, postgres_target)
     row = {"id": 1, "metric": 100}
-    verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row)
+    helper.verify_data("test_multiple_state_messages_a", 6, "id", row)
     row = {"id": 1, "metric": 110}
-    verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row)
+    helper.verify_data("test_multiple_state_messages_b", 6, "id", row)
 
 
 # TODO test that data is correct
@@ -310,7 +363,7 @@ def test_multiple_schema_messages(postgres_target, caplog):
     assert "Schema has changed for stream" not in caplog.text
 
 
-def test_relational_data(postgres_target):
+def test_relational_data(postgres_target, helper):
     file_name = "user_location_data.singer"
     singer_file_to_target(file_name, postgres_target)
 
@@ -367,12 +420,12 @@
         },
     ]
 
-    verify_data(postgres_target, "test_users", 8, "id", users)
-    verify_data(postgres_target, "test_locations", 5, "id", locations)
-    verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location)
+    helper.verify_data("test_users", 8, "id", users)
+    helper.verify_data("test_locations", 5, "id", locations)
+    helper.verify_data("test_user_in_location", 5, "id", user_in_location)
 
 
-def test_no_primary_keys(postgres_target):
+def test_no_primary_keys(postgres_target, helper):
     """We run both of these tests twice just to ensure that no records are removed and append only works properly"""
     engine = create_engine(postgres_target)
     table_name = "test_no_pk"
@@ -391,7 +444,7 @@
     file_name = f"{table_name}_append.singer"
     singer_file_to_target(file_name, postgres_target)
 
-    verify_data(postgres_target, table_name, 16)
+    helper.verify_data(table_name, 16)
 
 
 def test_no_type(postgres_target):
@@ -399,21 +452,97 @@ def test_no_type(postgres_target):
     file_name = "no_type.singer"
     singer_file_to_target(file_name, postgres_target)
 
 
-def test_duplicate_records(postgres_target):
+def test_duplicate_records(postgres_target, helper):
     file_name = "duplicate_records.singer"
     singer_file_to_target(file_name, postgres_target)
     row = {"id": 1, "metric": 100}
-    verify_data(postgres_target, "test_duplicate_records", 2, "id", row)
+    helper.verify_data("test_duplicate_records", 2, "id", row)
+
+
+def test_array_boolean(postgres_target, helper):
+    file_name = "array_boolean.singer"
+    singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "value": [True, False]}
+    helper.verify_data("array_boolean", 3, "id", row)
+    helper.verify_schema(
+        "array_boolean",
+        check_columns={
+            "id": {"type": BIGINT},
+            "value": {"type": ARRAY},
+        },
+    )
+
+
+def test_array_number(postgres_target, helper):
+    file_name = "array_number.singer"
+    singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]}
+    helper.verify_data("array_number", 3, "id", row)
+    helper.verify_schema(
+        "array_number",
+        check_columns={
+            "id": {"type": BIGINT},
+            "value": {"type": ARRAY},
+        },
+    )
+
+
+def test_array_string(postgres_target, helper):
+    file_name = "array_string.singer"
+    singer_file_to_target(file_name, postgres_target)
+    row = {"id": 1, "value": ["apple", "orange", "pear"]}
+    helper.verify_data("array_string", 4, "id", row)
+    helper.verify_schema(
+        "array_string",
+        check_columns={
+            "id": {"type": BIGINT},
+            "value": {"type": ARRAY},
+        },
+    )
 
 
-def test_array_data(postgres_target):
-    file_name = "array_data.singer"
+def test_array_timestamp(postgres_target, helper):
+    file_name = "array_timestamp.singer"
     singer_file_to_target(file_name, postgres_target)
-    row = {"id": 1, "fruits": ["apple", "orange", "pear"]}
-    verify_data(postgres_target, "test_carts", 4, "id", row)
+    row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]}
+    helper.verify_data("array_timestamp", 3, "id", row)
+    helper.verify_schema(
+        "array_timestamp",
+        check_columns={
+            "id": {"type": BIGINT},
+            "value": {"type": ARRAY},
+        },
+    )
+
+
+def test_object_mixed(postgres_target, helper):
+    file_name = "object_mixed.singer"
+    singer_file_to_target(file_name, postgres_target)
+    row = {
+        "id": 1,
+        "value": {
+            "string": "foo",
+            "integer": 42,
+            "float": Decimal("42.42"),
+            "timestamp": "2023-12-13T01:15:02",
+            "array_boolean": [True, False],
+            "array_float": [Decimal("42.42"), Decimal("84.84")],
+            "array_integer": [42, 84],
+            "array_string": ["foo", "bar"],
+            "nested_object": {"foo": "bar"},
+        },
+    }
+    helper.verify_data("object_mixed", 1, "id", row)
+    helper.verify_schema(
+        "object_mixed",
+        check_columns={
+            "id": {"type": BIGINT},
+            "value": {"type": JSONB},
+        },
+    )
 
 
-def test_encoded_string_data(postgres_target):
+def test_encoded_string_data(postgres_target, helper):
     """
     We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them.
     https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character.
@@ -426,11 +555,11 @@ def test_encoded_string_data(postgres_target):
     file_name = "encoded_strings.singer"
     singer_file_to_target(file_name, postgres_target)
     row = {"id": 1, "info": "simple string 2837"}
-    verify_data(postgres_target, "test_strings", 11, "id", row)
+    helper.verify_data("test_strings", 11, "id", row)
     row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}}
-    verify_data(postgres_target, "test_strings_in_objects", 11, "id", row)
+    helper.verify_data("test_strings_in_objects", 11, "id", row)
     row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]}
-    verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row)
+    helper.verify_data("test_strings_in_arrays", 6, "id", row)
 
 
 def test_tap_appl(postgres_target):
@@ -454,43 +583,33 @@ def test_large_int(postgres_target):
     singer_file_to_target(file_name, postgres_target)
 
 
-def test_anyof(postgres_target):
+def test_anyof(postgres_target, helper):
     """Test that anyOf is handled correctly"""
-    engine = create_engine(postgres_target)
     table_name = "commits"
     file_name = f"{table_name}.singer"
-    schema = postgres_target.config["default_target_schema"]
     singer_file_to_target(file_name, postgres_target)
-    with engine.connect() as connection:
-        meta = sqlalchemy.MetaData()
-        table = sqlalchemy.Table(
-            "commits", meta, schema=schema, autoload_with=connection
-        )
-        for column in table.c:
-            # {"type":"string"}
-            if column.name == "id":
-                assert isinstance(column.type, TEXT)
+    helper.verify_schema(
+        table_name,
+        check_columns={
+            # {"type":"string"}
+            "id": {"type": TEXT},
             # Any of nullable date-time.
             # Note that postgres timestamp is equivalent to jsonschema date-time.
             # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]}
-            if column.name in {"authored_date", "committed_date"}:
-                assert isinstance(column.type, TIMESTAMP)
-
+            "authored_date": {"type": TIMESTAMP},
+            "committed_date": {"type": TIMESTAMP},
             # Any of nullable array of strings or single string.
             # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
-            if column.name == "parent_ids":
-                assert isinstance(column.type, ARRAY)
-
+            "parent_ids": {"type": ARRAY},
             # Any of nullable string.
             # {"anyOf":[{"type":"string"},{"type":"null"}]}
-            if column.name == "commit_message":
-                assert isinstance(column.type, TEXT)
-
+            "commit_message": {"type": TEXT},
             # Any of nullable string or integer.
             # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
-            if column.name == "legacy_id":
-                assert isinstance(column.type, TEXT)
+            "legacy_id": {"type": TEXT},
+        },
+    )
 
 
 def test_new_array_column(postgres_target):
@@ -579,7 +698,7 @@ def test_activate_version_soft_delete(postgres_target):
 
     result = connection.execute(
         sqlalchemy.text(
-            f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL"
+            f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL"
        )
     )
     assert result.rowcount == 2
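
Usage note: the sketch below (not part of the diff) shows how a test exercises the new `helper` fixture, following the pattern of the array tests above. The `array_integer.singer` file, its stream, and its row counts are hypothetical; `singer_file_to_target`, `postgres_target`, and the type imports come from the test module in the diff.

# Hypothetical test sketch against the AssertionHelper API introduced above.
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.types import BIGINT


def test_array_integer(postgres_target, helper):
    # Load a (hypothetical) Singer fixture file into the target.
    singer_file_to_target("array_integer.singer", postgres_target)
    # verify_data compares the first row ordered by primary key, after
    # AssertionHelper.remove_metadata_columns() strips `_sdc`-prefixed columns.
    helper.verify_data("array_integer", 3, "id", {"id": 1, "value": [42, 84]})
    # verify_schema checks column types with isinstance() against SQLAlchemy
    # type classes, so the generic ARRAY class matches any array column.
    helper.verify_schema(
        "array_integer",
        check_columns={
            "id": {"type": BIGINT},
            "value": {"type": ARRAY},
        },
    )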