
[OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator #45257

Open · wants to merge 17 commits into main

Conversation

rahul-madaan (Contributor):


This PR adds OpenLineage support to DatabricksCopyIntoOperator (/providers/databricks/operators/databricks_sql.py), taking the existing OpenLineage implementation in CopyFromExternalStageToSnowflakeOperator as a reference.
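At a high level, the new get_openlineage_facets_on_complete builds input datasets from file_location, an output dataset from table_name, and a SQL job facet from the rendered COPY INTO statement. A simplified sketch of the shape for the S3 example in the test DAG below (illustrative only - hard-coded values and a placeholder workspace host, not the exact code in the diff):

def get_openlineage_facets_on_complete(self, task_instance):
    from airflow.providers.common.compat.openlineage.facet import Dataset, SQLJobFacet
    from airflow.providers.openlineage.extractors import OperatorLineage
    from airflow.providers.openlineage.sqlparser import SQLParser

    # Input: the external file location passed as file_location
    inputs = [Dataset(namespace="s3://kreative360", name="yoyo/sample.csv")]
    # Output: the fully qualified Databricks table, namespaced by the workspace host
    outputs = [Dataset(
        namespace="databricks://<workspace-host>",
        name="wide_world_importers.astronomer_assets.sample",
    )]
    # SQL job facet built from the COPY INTO statement rendered during execute()
    job_facets = {"sql": SQLJobFacet(query=SQLParser.normalize_sql(self._sql))}
    return OperatorLineage(inputs=inputs, outputs=outputs, job_facets=job_facets)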

Tested using the following DAG:
"""
Example DAG demonstrating the usage of DatabricksCopyIntoOperator with OpenLineage support.
"""

import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

# Enable verbose logging from the Databricks SQL connector for debugging
logging.getLogger('databricks.sql').setLevel(logging.DEBUG)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'databricks_copy_into_example',
    default_args=default_args,
    description='Example DAG for DatabricksCopyIntoOperator with OpenLineage',
    schedule=None,
    start_date=datetime(2024, 12, 13),
    catchup=False,
    tags=['example', 'databricks', 'openlineage'],
) as dag:

    # Example with S3
    copy_from_s3 = DatabricksCopyIntoOperator(
        task_id='copy_from_s3',
        databricks_conn_id='databricks_default',
        table_name='wide_world_importers.astronomer_assets.sample',
        file_location='s3a://kreative360/yoyo/sample.csv',
        file_format='CSV',
        format_options={
            "header": "true",
            "inferSchema": "true",
            "delimiter": ","
        },
        copy_options={
            "force": "true",
            "mergeSchema": "true"
        },
        http_path='/sql/1.0/warehouses/ca43e87568a0b22e',
        credential={
            "AWS_ACCESS_KEY": "<redacted>",
            "AWS_SECRET_KEY": "<redacted>",
            "AWS_SESSION_TOKEN": "<redacted>",
            "AWS_REGION": "ap-south-1"
        }
    )

    # Example with Azure Blob Storage using wasbs protocol
    copy_from_azure = DatabricksCopyIntoOperator(
        task_id='copy_from_azure',
        databricks_conn_id='databricks_default',  
        table_name='wide_world_importers.astronomer_assets.sample',
        file_location='wasbs://[email protected]/sample.csv',
        file_format='CSV',
        # Using Azure storage credential
        credential={
            "AZURE_SAS_TOKEN": "<redacted>", # Replace with actual SAS token
        },
        format_options={
            "header": "true",
            "inferSchema": "true"
        },
        copy_options={
            "force": "true",
            "mergeSchema": "true"
        },
        http_path='/sql/1.0/warehouses/ca43e87568a0b22e'
    )

    # Example with GCS
    copy_from_gcs = DatabricksCopyIntoOperator(
        task_id='copy_from_gcs',
        databricks_conn_id='databricks_default',
        table_name='wide_world_importers.astronomer_assets.sample',
        file_location='gs://kreative360/yoyo/sample.csv',
        file_format='CSV',
        format_options={
            "header": "true",
            "inferSchema": "true",
            "delimiter": ","
        },
        copy_options={
            "force": "true",
            "mergeSchema": "true"
        },
        http_path='/sql/1.0/warehouses/ca43e87568a0b22e',
    )

    [copy_from_s3, copy_from_azure, copy_from_gcs]

Note: tests have been performed with an S3 object on AWS only; the other cloud providers (Azure and GCS) have been verified only via FAIL events.

OL events:

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@rahul-madaan rahul-madaan changed the title [OpenLineage] Added Openlineage support to DatabricksCopyIntoOperator [OpenLineage] Added Openlineage support for DatabricksCopyIntoOperator Dec 28, 2024
@rahul-madaan (Contributor Author):

@kacpermuda @potiuk could you please take a look at the PR and approve?

@rahul-madaan rahul-madaan force-pushed the rahul-madaan-databrcik-copyinto-support branch from 935ff33 to cad91c0 on January 2, 2025 06:12
@potiuk potiuk force-pushed the rahul-madaan-databrcik-copyinto-support branch from cad91c0 to 3f40df2 on January 2, 2025 12:26
@potiuk (Member) commented Jan 2, 2025:

@rahul-madaan -> I rebased it. Together with @jscheffl we found an issue with the new caching scheme that would run the "main" version of the tests - fixed in #45347.

@rahul-madaan (Contributor Author):

@kacpermuda @potiuk A gentle reminder, please review the PR whenever you find some time this week.

@kacpermuda (Contributor):

@rahul-madaan Can you please rebase and make sure all the CI is green? I'll try to review the PR this week :)

@rahul-madaan (Contributor Author):

The errors are not getting resolved even after rebasing; they somehow started appearing after Jarek rebased the branch. Should I reset the branch and cherry-pick my commits to resolve this?

@jscheffl (Contributor) commented Jan 6, 2025:

> The errors are not getting resolved even after rebasing; they somehow started appearing after Jarek rebased the branch. Should I reset the branch and cherry-pick my commits to resolve this?

Usually not needed - but I agree, at first glance the errors seem unrelated to your changes.

Contributor:

This is generated code - did you make changes manually here? The source should be the provider.yaml in the databricks provider.

Contributor Author:

I was getting an error in one of the tests. I ran the recommended command and the file was updated automatically; once it was updated, the test started passing.

Contributor:

I don't think any changes here are necessary - you are not adding any new dependencies in the code. Try to submit the PR without them and we'll see what happens.

Contributor Author:

removed the changes

"skip-pre-commits": "check-provider-yaml-valid,flynt,identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk,"
"ts-compile-format-lint-ui,ts-compile-format-lint-www",
"run-kubernetes-tests": "false",
"upgrade-to-newer-dependencies": "false",
"core-test-types-list-as-string": "API Always CLI Core Operators Other Serialization WWW",
"providers-test-types-list-as-string": "Providers[amazon] Providers[common.compat,common.io,common.sql,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino] Providers[google]",
"providers-test-types-list-as-string": "Providers[amazon] Providers[common.compat,common.io,common.sql,databricks,dbt.cloud,ftp,mysql,openlineage,postgres,sftp,snowflake,trino] Providers[google]",
Contributor:

Why do you add a dependency for databricks to AWS?

Contributor Author:

Again, this was done because an assertion in one of the tests was failing. I believe this is not just for the AWS provider; the test ID says "Trigger openlineage and related providers tests when Assets files changed".

Contributor:

I don't think any changes here are necessary - you are not adding any new dependencies in the code. Try to submit the PR without them and we'll see what happens.

Contributor Author:

removed these changes.

@kacpermuda kacpermuda left a comment (Contributor)

I've added some comments. Let's iterate on this again after you make the changes; I think we can make this code easier to read and more maintainable 🚀

Comment on lines 368 to 370
result = hook.run(self._sql, handler=lambda cur: cur.fetchall())
# Convert to list, handling the case where result might be None
self._result = list(result) if result is not None else []
Contributor:

What is the result saved here? Later in the code it appears to be query_ids, but are we sure that is what we are getting? What if somebody submits a query that reads a million rows? I'm asking because this looks like a place with a lot of potential to add a lot of processing even for users who do not use the OpenLineage integration.

Contributor Author:

I updated the code to not save the result; it is not required.
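Roughly, the execute path now just runs the statement and keeps only the rendered SQL (sketch of the intent; _create_sql_query and _get_hook are assumed to be the operator's existing helpers):

def execute(self, context):
    self._sql = self._create_sql_query()
    self.log.info("Executing: %s", self._sql)
    hook = self._get_hook()
    # No handler and no stored result set: users who do not enable the
    # OpenLineage integration pay no extra memory or processing cost.
    hook.run(self._sql)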

# Build SQLJobFacet
try:
normalized_sql = SQLParser.normalize_sql(self._sql)
normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))
Contributor:

I think we usually only use SQLParser.normalize_sql for the SQLJobFacet. What is the reason for these additional replacements? Could you add a comment if they are necessary?

Contributor Author:

This is done in the CopyFromExternalStageToSnowflakeOperator OL implementation here:

query = re.sub(r"\n+", "\n", re.sub(r" +", " ", query))
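For reference, the extra substitutions only collapse runs of spaces and blank lines so the query in the SQLJobFacet stays compact. A standalone illustration:

import re

def collapse_whitespace(query: str) -> str:
    # Squash repeated spaces, then repeated newlines, mirroring the Snowflake implementation.
    return re.sub(r"\n+", "\n", re.sub(r" +", " ", query))

sql = "COPY   INTO schema.table\n\n    FROM 's3://bucket/dir1'\n\nFILEFORMAT = CSV"
print(collapse_whitespace(sql))
# COPY INTO schema.table
#  FROM 's3://bucket/dir1'
# FILEFORMAT = CSV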

Comment on lines 470 to 475
# Combine schema/table with optional catalog for final dataset name
fq_name = table
if schema:
fq_name = f"{schema}.{fq_name}"
if catalog:
fq_name = f"{catalog}.{fq_name}"
Contributor:

We are not replacing None values with anything here, so can we end up with None.None.table_name?

Contributor Author:

No, we will end up with only the table name if both schema and catalog are None.
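A minimal standalone illustration of that behaviour (the helper name is made up for this example):

def build_fq_name(catalog, schema, table):
    # None (or empty) parts are skipped, so the name degrades gracefully to
    # "schema.table" or just "table" instead of "None.None.table".
    return ".".join(part for part in (catalog, schema, table) if part)

assert build_fq_name(None, None, "sample") == "sample"
assert build_fq_name(None, "astronomer_assets", "sample") == "astronomer_assets.sample"
assert build_fq_name("wide_world_importers", "astronomer_assets", "sample") == (
    "wide_world_importers.astronomer_assets.sample"
)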

Comment on lines 480 to 487
extraction_errors.append(
Error(
errorMessage=str(e),
stackTrace=None,
task="output_dataset_construction",
taskNumber=None,
)
)
Contributor:

We are not using the extraction_errors later in the code, so there is no point in appending here. Maybe the ExtractionErrorFacet should be created at the very end?

Contributor Author:

I now use it later in the code; if it is available, it is added to the OL event.

)

@staticmethod
def _extract_openlineage_unique_dataset_paths(
Contributor:

Where is this method used? I don't see it.

Contributor Author:

Junk method, I forgot to remove it. Apologies 😅


def on_kill(self) -> None:
# NB: on_kill isn't required for this operator since query cancelling gets
# handled in `DatabricksSqlHook.run()` method which is called in `execute()`
...

def get_openlineage_facets_on_complete(self, task_instance):
Contributor:

Overall, this is a really long method. Maybe we can split it into smaller, logical parts if possible? If not, maybe refactor it some other way? I think the indentation makes it harder to read when there is a lot of logic inside a single if. Maybe those code chunks should be separate methods?
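For example, something along these lines (helper names are purely illustrative, not part of the PR):

def get_openlineage_facets_on_complete(self, task_instance):
    # Hypothetical decomposition: each helper builds one piece and reports any
    # extraction errors; this method only assembles the OperatorLineage object.
    inputs, input_errors = self._build_openlineage_inputs()
    outputs, output_errors = self._build_openlineage_outputs()
    job_facets = self._build_openlineage_job_facets()
    run_facets = self._build_openlineage_run_facets(input_errors + output_errors)
    return OperatorLineage(
        inputs=inputs, outputs=outputs, job_facets=job_facets, run_facets=run_facets
    )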

)

# Add external query facet if we have run results
if hasattr(self, "_result") and self._result:
Contributor:

I think this hasattr is redundant, since we set the attribute ourselves in __init__.

Contributor Author:

This is not required anymore; I have removed the ExternalQueryRunFacet.

# Add external query facet if we have run results
if hasattr(self, "_result") and self._result:
run_facets["externalQuery"] = ExternalQueryRunFacet(
externalQueryId=str(id(self._result)),
Contributor:

We are saving it as a list in execute, and here we are converting it to a string. Why is that? Is it a single query_id or multiple ones?

Contributor Author:

This is not required anymore; I have removed the ExternalQueryRunFacet.


@rahul-madaan rahul-madaan marked this pull request as draft January 10, 2025 13:24
@rahul-madaan rahul-madaan marked this pull request as ready for review January 24, 2025 22:24
@rahul-madaan (Contributor Author):

@kacpermuda I have addressed all the comments, please take a look. I have tested it on S3 and it is working perfectly.

@rahul-madaan rahul-madaan marked this pull request as draft January 28, 2025 07:30
@rahul-madaan rahul-madaan marked this pull request as ready for review January 28, 2025 08:16
@kacpermuda kacpermuda left a comment (Contributor)

I did not test it manually; leaving some more comments. Mostly, I think there are some leftovers in the operator and the tests from the previous version (like the usage of self._result). Apart from that, it gets the job done 😄

@@ -273,7 +273,12 @@ def __init__(
if force_copy is not None:
self._copy_options["force"] = "true" if force_copy else "false"

# These will be used by OpenLineage
self._sql: str | None = None
self._result: list[Any] = []
Contributor:

I think this is no longer needed?

normalized_sql = re.sub(r"\n+", "\n", re.sub(r" +", " ", normalized_sql))
job_facets["sql"] = SQLJobFacet(query=normalized_sql)
except Exception as e:
self.log.error("Failed creating SQL job facet: %s", str(e))
Contributor:

I think we usually try not to log at error level unless absolutely necessary. Could you review the code and adjust it in other places as well? Maybe warning is enough? WDYT?
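For the snippet above, that would look roughly like this (assuming warning level is acceptable for lineage-only failures):

try:
    normalized_sql = SQLParser.normalize_sql(self._sql)
    job_facets["sql"] = SQLJobFacet(query=normalized_sql)
except Exception as e:
    # A failure to build lineage facets should not read like a task failure
    # in the logs, so warning (or even debug) is usually enough.
    self.log.warning("Failed creating SQL job facet: %s", e)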

file_format="CSV",
)
op._sql = "COPY INTO schema.table FROM 's3://bucket/dir1'"
op._result = mock_hook().run.return_value
Contributor:

I think this is already gone from the operator, so the tests should be adjusted?

Comment on lines +292 to +299
def test_get_openlineage_facets_on_complete_with_errors(mock_hook):
"""Test OpenLineage facets generation with extraction errors."""
mock_hook().run.return_value = [
{"file": "s3://bucket/dir1/file1.csv"},
{"file": "invalid://location/file.csv"}, # Invalid URI
{"file": "azure://account.invalid.windows.net/container/file.csv"}, # Invalid Azure URI
]
mock_hook().get_connection().host = "databricks.com"
Contributor:

So we are passing invalid URIs and then checking that there are no extraction errors? Is this test valid?
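If the goal is to cover extraction errors, the test could assert on the facet explicitly, e.g. (sketch; facet and field names as used elsewhere in this PR, the expected count is an assumption):

lineage = op.get_openlineage_facets_on_complete(None)
error_facet = lineage.run_facets["extractionError"]
# Two of the three file URIs above are not parseable, so both should be reported.
assert error_facet.failedTasks == 2
assert all(error.errorMessage for error in error_facet.errors)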



@mock.patch("airflow.providers.databricks.operators.databricks_sql.DatabricksSqlHook")
def test_get_openlineage_facets_on_complete_no_sql(mock_hook):
Contributor:

Maybe we should run execute and then explicitly overwrite self._sql? Or at least manually make sure _sql is None. This test assumes self._sql is initialized as None, but we don't check it.
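i.e. something like (sketch; the constructor arguments are illustrative):

op = DatabricksCopyIntoOperator(
    task_id="test",
    databricks_conn_id="databricks_default",
    table_name="schema.table",
    file_location="s3://bucket/dir1",
    file_format="CSV",
)
# Make the "no SQL was rendered" precondition explicit instead of relying on
# the __init__ default staying None.
op._sql = None
result = op.get_openlineage_facets_on_complete(None)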

Comment on lines +376 to +377
assert "COPY INTO catalog.schema.table" in result.job_facets["sql"].query
assert "FILEFORMAT = CSV" in result.job_facets["sql"].query
Contributor:

Maybe we should check the whole query, or at least also check that the GCS path is there? WDYT?
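e.g. (sketch; the exact GCS path depends on what the test passes as file_location):

query = result.job_facets["sql"].query
assert "COPY INTO catalog.schema.table" in query
assert "FROM 'gs://bucket/dir1'" in query  # assumed test value for file_location
assert "FILEFORMAT = CSV" in query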

Comment on lines +391 to +392
op._sql = "COPY INTO schema.table FROM 'invalid://location'"
op._result = [{"file": "s3://bucket/file.csv"}]
Contributor:

Why not actually execute the operator instead?

Comment on lines +496 to +508
if extraction_errors:
run_facets["extractionError"] = ExtractionErrorRunFacet(
totalTasks=1,
failedTasks=len(extraction_errors),
errors=extraction_errors,
)
# Return only error facets for invalid URIs
return OperatorLineage(
inputs=[],
outputs=[],
job_facets=job_facets,
run_facets=run_facets,
)
Contributor:

Shouldn't we try to return the output dataset even if the inputs are incorrect?
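i.e. roughly (sketch; variable names assumed from the surrounding code):

if extraction_errors:
    run_facets["extractionError"] = ExtractionErrorRunFacet(
        totalTasks=1,
        failedTasks=len(extraction_errors),
        errors=extraction_errors,
    )
# Still emit the output dataset - it is known from table_name even when some
# (or all) of the input URIs could not be parsed.
return OperatorLineage(
    inputs=input_datasets,  # whatever could be resolved, possibly empty
    outputs=[output_dataset] if output_dataset else [],
    job_facets=job_facets,
    run_facets=run_facets,
)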

4 participants