|
11 | 11 | import sqlalchemy
|
12 | 12 | from singer_sdk.exceptions import MissingKeyPropertiesError
|
13 | 13 | from singer_sdk.testing import get_target_test_class, sync_end_to_end
|
14 |
| -from sqlalchemy.dialects.postgresql import ARRAY |
15 |
| -from sqlalchemy.types import TEXT, TIMESTAMP |
| 14 | +from sqlalchemy.dialects.postgresql import ARRAY, JSONB |
| 15 | +from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP |
16 | 16 |
|
17 | 17 | from target_postgres.connector import PostgresConnector
|
18 | 18 | from target_postgres.target import TargetPostgres
|
@@ -94,7 +94,7 @@ def verify_data(
|
94 | 94 |
|
95 | 95 | Args:
|
96 | 96 | target: The target to obtain a database connection from.
|
97 |
| - full_table_name: The schema and table name of the table to check data for. |
| 97 | + table_name: The schema and table name of the table to check data for. |
98 | 98 | primary_key: The primary key of the table.
|
99 | 99 | number_of_rows: The expected number of rows that should be in the table.
|
100 | 100 | check_data: A dictionary representing the full contents of the first row in the
|
@@ -134,6 +134,43 @@ def verify_data(
|
134 | 134 | assert result.first()[0] == number_of_rows
|
135 | 135 |
|
136 | 136 |
|
def verify_schema(
    target: TargetPostgres,
    table_name: str,
    check_columns: dict = None,
):
    """Checks whether the schema of a database table matches the provided column definitions.

    Args:
        target: The target to obtain a database connection from.
        table_name: The schema and table name of the table to check data for.
        check_columns: A dictionary mapping column names to their definitions. Currently,
            it is all about the `type` attribute which is compared.

    Raises:
        ValueError: If `check_columns` is missing, or lacks a definition for a
            column reflected from the database table.
        TypeError: If a reflected column's type does not match the expected type.
    """
    # Fail fast with a clear message instead of an opaque TypeError on first lookup.
    if check_columns is None:
        raise ValueError("check_columns must provide a definition for every column")
    engine = create_engine(target)
    schema = target.config["default_target_schema"]
    with engine.connect() as connection:
        meta = sqlalchemy.MetaData()
        # Reflect the live table so we compare against what was actually created.
        table = sqlalchemy.Table(
            table_name, meta, schema=schema, autoload_with=connection
        )
        for column in table.c:
            # Ignore `_sdc` columns for now.
            if column.name.startswith("_sdc"):
                continue
            try:
                column_type_expected = check_columns[column.name]["type"]
            except KeyError as exc:
                raise ValueError(
                    f"Invalid check_columns - missing definition for column: {column.name}"
                ) from exc
            if not isinstance(column.type, column_type_expected):
                raise TypeError(
                    f"Column '{column.name}' (with type '{column.type}') "
                    f"does not match expected type: {column_type_expected}"
                )
| 172 | + |
| 173 | + |
137 | 174 | def test_sqlalchemy_url_config(postgres_config_no_ssl):
|
138 | 175 | """Be sure that passing a sqlalchemy_url works
|
139 | 176 |
|
@@ -406,11 +443,92 @@ def test_duplicate_records(postgres_target):
|
406 | 443 | verify_data(postgres_target, "test_duplicate_records", 2, "id", row)
|
407 | 444 |
|
408 | 445 |
|
409 |
| -def test_array_data(postgres_target): |
410 |
| - file_name = "array_data.singer" |
def test_array_boolean(postgres_target):
    """Load an array-of-booleans stream and validate both data and column types."""
    singer_file_to_target("array_boolean.singer", postgres_target)
    expected_row = {"id": 1, "value": [True, False]}
    verify_data(postgres_target, "array_boolean", 3, "id", expected_row)
    expected_columns = {
        "id": {"type": BIGINT},
        "value": {"type": ARRAY},
    }
    verify_schema(postgres_target, "array_boolean", check_columns=expected_columns)
| 459 | + |
| 460 | + |
def test_array_number(postgres_target):
    """Load an array-of-numbers stream and validate both data and column types."""
    singer_file_to_target("array_number.singer", postgres_target)
    expected_row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]}
    verify_data(postgres_target, "array_number", 3, "id", expected_row)
    expected_columns = {
        "id": {"type": BIGINT},
        "value": {"type": ARRAY},
    }
    verify_schema(postgres_target, "array_number", check_columns=expected_columns)
| 474 | + |
| 475 | + |
def test_array_string(postgres_target):
    """Load an array-of-strings stream and validate both data and column types."""
    singer_file_to_target("array_string.singer", postgres_target)
    expected_row = {"id": 1, "value": ["apple", "orange", "pear"]}
    verify_data(postgres_target, "array_string", 4, "id", expected_row)
    expected_columns = {
        "id": {"type": BIGINT},
        "value": {"type": ARRAY},
    }
    verify_schema(postgres_target, "array_string", check_columns=expected_columns)
| 489 | + |
| 490 | + |
def test_array_timestamp(postgres_target):
    """Load an array-of-timestamps stream and validate both data and column types."""
    singer_file_to_target("array_timestamp.singer", postgres_target)
    expected_row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]}
    verify_data(postgres_target, "array_timestamp", 3, "id", expected_row)
    expected_columns = {
        "id": {"type": BIGINT},
        "value": {"type": ARRAY},
    }
    verify_schema(postgres_target, "array_timestamp", check_columns=expected_columns)
| 504 | + |
| 505 | + |
def test_object_mixed(postgres_target):
    """Load a stream with a mixed-type object column; expect a single JSONB value."""
    singer_file_to_target("object_mixed.singer", postgres_target)
    expected_value = {
        "string": "foo",
        "integer": 42,
        "float": Decimal("42.42"),
        "timestamp": "2023-12-13T01:15:02",
        "array_boolean": [True, False],
        "array_float": [Decimal("42.42"), Decimal("84.84")],
        "array_integer": [42, 84],
        "array_string": ["foo", "bar"],
        "nested_object": {"foo": "bar"},
    }
    verify_data(postgres_target, "object_mixed", 1, "id", {"id": 1, "value": expected_value})
    expected_columns = {
        "id": {"type": BIGINT},
        "value": {"type": JSONB},
    }
    verify_schema(postgres_target, "object_mixed", check_columns=expected_columns)
414 | 532 |
|
415 | 533 |
|
416 | 534 | def test_encoded_string_data(postgres_target):
|
@@ -456,41 +574,32 @@ def test_large_int(postgres_target):
|
456 | 574 |
|
def test_anyof(postgres_target):
    """Test that anyOf is handled correctly"""
    table_name = "commits"
    singer_file_to_target(f"{table_name}.singer", postgres_target)

    expected_columns = {
        # {"type":"string"}
        "id": {"type": TEXT},
        # Any of nullable date-time.
        # Note that postgres timestamp is equivalent to jsonschema date-time.
        # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]}
        "authored_date": {"type": TIMESTAMP},
        "committed_date": {"type": TIMESTAMP},
        # Any of nullable array of strings or single string.
        # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
        "parent_ids": {"type": ARRAY},
        # Any of nullable string.
        # {"anyOf":[{"type":"string"},{"type":"null"}]}
        "commit_message": {"type": TEXT},
        # Any of nullable string or integer.
        # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
        "legacy_id": {"type": TEXT},
    }
    verify_schema(postgres_target, table_name, check_columns=expected_columns)
494 | 603 |
|
495 | 604 |
|
496 | 605 | def test_new_array_column(postgres_target):
|
|
0 commit comments