1 change: 1 addition & 0 deletions docs/lineage/airflow.md
@@ -94,6 +94,7 @@ Supported operators:
- `RedshiftSQLOperator`
- `SnowflakeOperator` and `SnowflakeOperatorAsync`
- `SqliteOperator`
- `TeradataOperator` (_Note: Teradata uses two-tier `database.table` naming without a schema level; see the sketch after this diff_)
- `TrinoOperator`

<!--
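A minimal sketch (not part of this PR) of how the two-tier naming surfaces when parsing Teradata SQL with sqlglot, the parser DataHub's SQL lineage builds on; the query and table names here are invented:

import sqlglot
import sqlglot.expressions as exp

sql = "INSERT INTO sales_db.processed_costs SELECT * FROM sales_db.costs"
for table in sqlglot.parse_one(sql, dialect="teradata").find_all(exp.Table):
    # Teradata qualifies tables as database.table: the single qualifier lands
    # in table.db, and table.catalog stays empty since there is no schema level.
    print(table.catalog, table.db, table.name)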
1 change: 1 addition & 0 deletions metadata-ingestion-modules/airflow-plugin/setup.py
@@ -98,6 +98,7 @@ def get_long_description():
"snowflake-connector-python>=2.7.10",
"virtualenv", # needed by PythonVirtualenvOperator
"apache-airflow-providers-sqlite",
"apache-airflow-providers-teradata",
}


@@ -67,6 +67,10 @@ def __init__(self):
BigQueryInsertJobOperatorExtractor
)

self.task_to_extractor.extractors["TeradataOperator"] = (
TeradataOperatorExtractor
)

self._graph: Optional["DataHubGraph"] = None

@contextlib.contextmanager
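For context, a hedged sketch of how a registry like this is typically consulted; the `get_extractor` helper below is an assumption for illustration, not code from this module:

def get_extractor(self, task):
    # Extractors are keyed by operator class name, so any task built on
    # TeradataOperator now resolves to TeradataOperatorExtractor.
    extractor_cls = self.task_to_extractor.extractors.get(type(task).__name__)
    return extractor_cls() if extractor_cls else None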
@@ -333,4 +337,29 @@ def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
or self.conn.schema
)
# TODO: Should we try a fallback of:
# execute_query_on_hook(self.hook, "SELECT current_schema();")[0][0]
# execute_query_on_hook(self.hook, "SELECT current_schema();")


class TeradataOperatorExtractor(BaseExtractor):
"""Extractor for Teradata SQL operations.

Extracts lineage from TeradataOperator tasks by parsing the SQL queries
and understanding Teradata's two-tier database.table naming convention.
"""

def extract(self) -> Optional[TaskMetadata]:
from airflow.providers.teradata.operators.teradata import TeradataOperator

operator: "TeradataOperator" = self.operator
sql = operator.sql
if not sql:
self.log.warning("No query found in TeradataOperator")
return None

return _parse_sql_into_task_metadata(
self,
sql,
platform="teradata",
default_database=None,
default_schema=None,
)
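Because `default_database` and `default_schema` are both `None`, table names are kept exactly as written in the SQL. A hedged illustration of the dataset URN a fully qualified Teradata table maps to, using DataHub's `make_dataset_urn` builder (the table name is invented):

from datahub.emitter.mce_builder import make_dataset_urn

# The two-tier database.table string becomes the dataset name as-is;
# no schema segment is inserted between the two parts.
urn = make_dataset_urn(platform="teradata", name="sales_db.costs", env="PROD")
print(urn)  # urn:li:dataset:(urn:li:dataPlatform:teradata,sales_db.costs,PROD)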
Contributor Author:
Added an integration test and generated the golden file.
The connection uses the extra param set to ANSI, as declared in the docs.

Contributor Author:
Integration test results:
pytest 'tests/integration/test_plugin.py::test_airflow_plugin[v2_teradata_operator]' -v
[screenshot: passing test run]

@@ -0,0 +1,39 @@
from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator

TERADATA_COST_TABLE = "costs"
TERADATA_PROCESSED_TABLE = "processed_costs"


def _fake_teradata_execute(*args, **kwargs):
pass


with DAG(
"teradata_operator",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
# Stub out execute so the DAG parses and runs without a live Teradata connection.
TeradataOperator.execute = _fake_teradata_execute

transform_cost_table = TeradataOperator(
teradata_conn_id="my_teradata",
task_id="transform_cost_table",
sql="""
CREATE OR REPLACE TABLE {{ params.out_table_name }} AS
SELECT
id,
month,
total_cost,
area,
total_cost / area as cost_per_area
FROM {{ params.in_table_name }}
""",
params={
"in_table_name": TERADATA_COST_TABLE,
"out_table_name": TERADATA_PROCESSED_TABLE,
},
)
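To inspect the statement the extractor will actually parse, the templated SQL can be rendered outside Airflow. A minimal sketch with plain Jinja, which mirrors (but does not reuse) Airflow's own template rendering:

from jinja2 import Template

rendered = Template(
    """
    CREATE OR REPLACE TABLE {{ params.out_table_name }} AS
    SELECT id, month, total_cost, area, total_cost / area as cost_per_area
    FROM {{ params.in_table_name }}
    """
).render(params={"in_table_name": "costs", "out_table_name": "processed_costs"})
# The unqualified table names here resolve against the Teradata connection's
# default database at lineage time.
print(rendered)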