Skip to content

Commit

Permalink
fix: corrected bugs regarding inferring nullability wrong; refactored…
Browse files Browse the repository at this point in the history
… tests used to choose best schema
  • Loading branch information
Nicolas ESTRADA committed Jan 28, 2025
1 parent 8f45283 commit 41f8ded
Show file tree
Hide file tree
Showing 4 changed files with 552 additions and 225 deletions.
19 changes: 12 additions & 7 deletions sources/pg_legacy_replication/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ def gen_data_item(
col_name = _actual_column_name(data)
if not included_columns or col_name in included_columns:
data_item[col_name] = _to_dlt_val(
data, column_schema[col_name]["data_type"], for_delete=is_delete
data, column_schema[col_name], for_delete=is_delete
)

return data_item
Expand Down Expand Up @@ -601,19 +601,18 @@ def compare_schemas(last: TTableSchema, new: TTableSchema) -> TTableSchema:
precise one if they are relatively equal or else raises a
AssertionError due to an incompatible schema change
"""
table_name = last["name"]
assert table_name == new["name"], "Table names do not match"
assert last["name"] == new["name"], "Table names do not match"

table_schema = TTableSchema(name=table_name, columns={})
table_schema = TTableSchema(name=last["name"], columns={})
last_cols, new_cols = last["columns"], new["columns"]
assert len(last_cols) == len(
new_cols
), f"Columns mismatch last:{last['columns']} new:{new['columns']}"
), f"Columns mismatch last:{last_cols} new:{new_cols}"

for name, s1 in last_cols.items():
s2 = new_cols.get(name)
assert (
s2 is not None and s1["data_type"] == s2["data_type"]
s2 and s1["data_type"] == s2["data_type"]
), f"Incompatible schema for column '{name}'"

# Ensure new has no fields outside allowed fields
Expand All @@ -623,7 +622,13 @@ def compare_schemas(last: TTableSchema, new: TTableSchema) -> TTableSchema:
# Select the more precise schema by comparing nullable, precision, and scale
col_schema = TColumnSchema(name=name, data_type=s1["data_type"])
if "nullable" in s1 or "nullable" in s2:
col_schema["nullable"] = s1.get("nullable", s2.get("nullable"))
# Get nullable values (could be True, False, or None)
s1_null = s1.get("nullable")
s2_null = s2.get("nullable")
if s1_null is not None and s2_null is not None:
col_schema["nullable"] = s1_null or s2_null # Default is True
else:
col_schema["nullable"] = s1_null if s1_null is not None else s2_null
if "precision" in s1 or "precision" in s2:
col_schema["precision"] = s1.get("precision", s2.get("precision"))
if "scale" in s1 or "scale" in s2:
Expand Down
18 changes: 12 additions & 6 deletions sources/pg_legacy_replication/schema_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ def _to_dlt_column_schema(
# Set nullable attribute if type_info is available
if type_info:
column_schema["nullable"] = type_info.value_optional
elif datum.WhichOneof("datum"): # Or simply guess as this is a very rare case
column_schema["nullable"] = False

return column_schema

Expand All @@ -170,13 +168,16 @@ def _epoch_days_to_date(epoch_days: int) -> pendulum.Date:


def _to_dlt_val(
val: DatumMessage, data_type: TDataType, *, for_delete: bool = False
val: DatumMessage, col_schema: TColumnSchema, *, for_delete: bool = False
) -> Any:
"""Converts decoderbuf's datum value into dlt-compatible data value."""
datum = val.WhichOneof("datum")
data_type = col_schema["data_type"]
assert data_type is not None
datum = _get_datum_attr(val)
if datum is None:
return _DUMMY_VALS[data_type] if for_delete else None
if datum == "datum_missing":
nullable = col_schema.get("nullable", False)
if for_delete and not nullable:
return _DUMMY_VALS[data_type]
return None

raw_value = getattr(val, datum)
Expand Down Expand Up @@ -212,3 +213,8 @@ def safe_load(x: str) -> Any:
return x

return [safe_load(x) for x in without_braces.split(",")]


def _get_datum_attr(val: DatumMessage) -> Optional[str]:
datum = val.WhichOneof("datum")
return None if datum is None and datum == "datum_missing" else datum
Loading

0 comments on commit 41f8ded

Please sign in to comment.