Triggerer code does not seem to be able to use an Airflow connection & variable #48183

Closed
1 of 2 tasks
pankajkoti opened this issue Mar 24, 2025 · 7 comments · Fixed by #48239
Labels
affected_version:main_branch Issues Reported for main branch affected_version:3.0.0beta For all 3.0.0 beta releases area:core area:Triggerer kind:bug This is a clearly a bug priority:critical Showstopper bug that should be patched immediately provider:databricks

Comments

@pankajkoti
Member

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I am building a new deferrable operator in the databricks provider. The synchronous version of the operator works fine, but with deferrable=True it fails, complaining that the Airflow connection isn't defined. It appears that the Triggerer is either not fetching the Airflow connection or failing to fetch it.

What you think should happen instead?

Trigger code running in the Triggerer should be able to use Airflow connections, so that deferrable operators work.

How to reproduce

Run an example DAG with an operator set to deferrable=True whose trigger uses an Airflow connection.

Operating System

Darwin

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

Locally using latest code from main using Breeze

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@pankajkoti pankajkoti added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet affected_version:main_branch Issues Reported for main branch labels Mar 24, 2025
@pankajkoti
Member Author

This is the traceback I see:

```
Trigger failed:
Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 764, in cleanup_finished_triggers
    result = details["task"].result()
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 864, in run_trigger
    async for event in trigger.run():
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 178, in run
    statement_state = await self.hook.a_get_sql_statement_state(self.statement_id)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 790, in a_get_sql_statement_state
    response = await self._a_do_api_call(get_sql_statement_endpoint)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 707, in _a_do_api_call
    url = self._endpoint_url(endpoint)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 619, in _endpoint_url
    port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
  File "/usr/local/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 138, in databricks_conn
    return self.get_connection(self.databricks_conn_id)
  File "/opt/airflow/airflow-core/src/airflow/hooks/base.py", line 64, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 496, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `databricks_default` isn't defined
```
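The last frames show `Connection.get_connection_from_secrets` raising after no configured secrets backend returned `databricks_default`. As a rough illustration only, here is a simplified, pure-Python model of that ordered lookup; it is not Airflow's code. The backend classes and the `db_available` flag are hypothetical stand-ins, and the idea that the metadata-DB lookup yields nothing inside the trigger process is an assumption made to mirror the error above. The `AIRFLOW_CONN_<CONN_ID>` environment-variable naming follows Airflow's documented convention.

```python
import os


class AirflowNotFoundException(Exception):
    """Stand-in for airflow.exceptions.AirflowNotFoundException."""


class EnvironmentVariablesBackend:
    """Reads AIRFLOW_CONN_<CONN_ID>, following Airflow's env-var naming convention."""

    def get_conn_value(self, conn_id):
        return os.environ.get(f"AIRFLOW_CONN_{conn_id.upper()}")


class MetastoreBackend:
    """Hypothetical stand-in for the metadata-DB backend."""

    def __init__(self, rows, db_available):
        self.rows = rows
        # Assumption: in the trigger process the DB lookup yields nothing,
        # which reproduces the "isn't defined" error above.
        self.db_available = db_available

    def get_conn_value(self, conn_id):
        if not self.db_available:
            return None
        return self.rows.get(conn_id)


def get_connection_from_secrets(conn_id, backends):
    # Try each backend in order; the first non-None value wins.
    for backend in backends:
        value = backend.get_conn_value(conn_id)
        if value is not None:
            return value
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
```

In this model a connection supplied through the environment still resolves, while one stored only in the metadata DB does not; whether the real triggerer behaves exactly the same way is not established in this thread.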

@vatsrahul1001 vatsrahul1001 added affected_version:3.0.0beta For all 3.0.0 beta releases priority:high High priority bug that should be patched quickly but does not require immediate new release and removed needs-triage label for new issues that we didn't triage yet labels Mar 24, 2025
@kaxil
Member

kaxil commented Mar 24, 2025

It is the same for XCom, Variable, or anything else that requires a DB connection in the actual trigger code.

@vatsrahul1001
Collaborator

@kaxil for XCom, Variable are we tracking on SDK board?

@jroachgolf84
Contributor

Just curious, what would a fix for this look like?

@ashb
Member

ashb commented Mar 24, 2025

@jroachgolf84 Adding something like this, from the DAG processor:

```python
@functools.cached_property
def client(self) -> Client:
    from airflow.sdk.api.client import Client

    # Execution API client routed to an in-process API server rather than over the network
    client = Client(base_url=None, token="", dry_run=True, transport=in_process_api_server().transport)
    # Mypy is wrong -- the setter accepts a string on the property setter! `URLType = URL | str`
    client.base_url = "http://in-process.invalid./"  # type: ignore[assignment]
    return client

def _handle_request(self, msg: ToManager, log: FilteringBoundLogger) -> None:  # type: ignore[override]
    from airflow.sdk.api.datamodels._generated import ConnectionResponse, VariableResponse

    resp = None
    if isinstance(msg, DagFileParsingResult):
        self.parsing_result = msg
        return
    elif isinstance(msg, GetConnection):
        conn = self.client.connections.get(msg.conn_id)
        if isinstance(conn, ConnectionResponse):
            conn_result = ConnectionResult.from_conn_response(conn)
            resp = conn_result.model_dump_json(exclude_unset=True, by_alias=True).encode()
        else:
            # Anything else (e.g. an error response) is forwarded unchanged
            resp = conn.model_dump_json().encode()
    elif isinstance(msg, GetVariable):
        var = self.client.variables.get(msg.key)
        if isinstance(var, VariableResponse):
            var_result = VariableResult.from_variable_response(var)
            resp = var_result.model_dump_json(exclude_unset=True).encode()
        else:
            resp = var.model_dump_json().encode()
    else:
        log.error("Unhandled request", msg=msg)
        return

    # Reply to the child process with one newline-terminated JSON document
    if resp:
        self.stdin.write(resp + b"\n")
```

to the Triggerer supervisor's `_handle_request` here:

```python
def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger) -> None:  # type: ignore[override]
    if isinstance(msg, messages.TriggerStateChanges):
        log.debug("State change from async process", state=msg)
        if msg.events:
            self.events.extend(msg.events)
        if msg.failures:
            self.failed_triggers.extend(msg.failures)
        for id in msg.finished or ():
            self.running_triggers.discard(id)
            self.cancelling_triggers.discard(id)
            # Remove logger from the cache, and since structlog doesn't have an explicit close method, we
            # only need to remove the last reference to it to close the open FH
            if factory := self.logger_cache.pop(id, None):
                factory.upload_to_remote()
        return

    raise ValueError(f"Unknown message type {type(msg)}")
```
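A toy model may help show the shape of the fix being discussed: the Triggerer supervisor keeps its `TriggerStateChanges` branch and gains `GetConnection`/`GetVariable` branches like the DAG processor's, writing one newline-terminated JSON reply back to the child. Everything below is a hypothetical pure-Python stand-in (dataclasses instead of the real message models, dicts instead of the Execution API client, a `BytesIO` instead of the child's stdin); the reply field names and the `ErrorResponse` shape are assumptions.

```python
import io
import json
from dataclasses import dataclass, field


# Toy message types, named after the ones in the snippets above.
@dataclass
class TriggerStateChanges:
    events: list = field(default_factory=list)
    failures: list = field(default_factory=list)
    finished: list = field(default_factory=list)


@dataclass
class GetConnection:
    conn_id: str


@dataclass
class GetVariable:
    key: str


class ToyTriggerSupervisor:
    def __init__(self, connections, variables):
        # Dicts stand in for the in-process API client used by the DAG processor.
        self.connections = connections
        self.variables = variables
        self.events = []
        self.failed_triggers = []
        self.running_triggers = set()
        # Stand-in for the pipe the supervisor writes replies to.
        self.stdin = io.BytesIO()

    def _handle_request(self, msg):
        if isinstance(msg, TriggerStateChanges):
            # Existing branch: record state, send nothing back.
            self.events.extend(msg.events)
            self.failed_triggers.extend(msg.failures)
            for trigger_id in msg.finished:
                self.running_triggers.discard(trigger_id)
            return
        if isinstance(msg, GetConnection):
            conn = self.connections.get(msg.conn_id)
            if conn is None:
                resp = {"type": "ErrorResponse", "error": "CONNECTION_NOT_FOUND"}
            else:
                resp = {"type": "ConnectionResult", "conn_id": msg.conn_id, **conn}
        elif isinstance(msg, GetVariable):
            if msg.key in self.variables:
                resp = {"type": "VariableResult", "key": msg.key, "value": self.variables[msg.key]}
            else:
                resp = {"type": "ErrorResponse", "error": "VARIABLE_NOT_FOUND"}
        else:
            raise ValueError(f"Unknown message type {type(msg)}")
        # New branches: answer the child with exactly one JSON line.
        self.stdin.write(json.dumps(resp).encode() + b"\n")
```

The property borrowed from the DAG processor is that state-change messages get no reply, while every lookup request gets exactly one reply line, so the child can simply block until it arrives.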

@kaxil
Member

kaxil commented Mar 24, 2025

@kaxil for XCom, Variable are we tracking on SDK board?

No, but the same code should fix both: the code in Ash's example for the DAG processor.

@kaxil kaxil changed the title Triggerer code does not seem to be able to use an Airflow connection Triggerer code does not seem to be able to use an Airflow connection & variable Mar 24, 2025
@kaxil kaxil added priority:critical Showstopper bug that should be patched immediately and removed priority:high High priority bug that should be patched quickly but does not require immediate new release labels Mar 24, 2025
@pierrejeambrun
Member

@jroachgolf84 Thanks for the PR. I didn't know you were working on that.

I also have a draft here:
#48239

I think you are missing some pieces (extending ToTriggerRunner and ToTriggerSupervisor, socket initialization, the first-message check, etc.).
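The missing pieces listed above can be pictured with a small, self-contained model of the runner/supervisor conversation. This is not Airflow's implementation: the message type names, the JSON-lines framing, the `StartTriggerer` first message and every helper below are hypothetical, chosen only for illustration. It shows the allowed message types on each side (standing in for extending `ToTriggerRunner`/`ToTriggerSupervisor`), a connected socket pair as the channel, and a check that the first message the runner receives is the startup message.

```python
import json
import socket
import threading

# Hypothetical stand-ins for the ToTriggerSupervisor / ToTriggerRunner unions.
# "Extending the union" is modelled as adding a type name to the allowed set.
TO_TRIGGER_SUPERVISOR = {"TriggerStateChanges", "GetConnection", "GetVariable"}
TO_TRIGGER_RUNNER = {"StartTriggerer", "ConnectionResult", "VariableResult", "ErrorResponse"}


def send(stream, msg):
    # One JSON document per line, flushed immediately so the peer can read it.
    stream.write(json.dumps(msg).encode() + b"\n")
    stream.flush()


def recv(stream, allowed):
    msg = json.loads(stream.readline())
    if msg.get("type") not in allowed:
        raise ValueError(f"Unexpected message type {msg.get('type')!r}")
    return msg


class ToyTriggerRunner:
    """Child side: blocks on the supervisor for every connection lookup."""

    def __init__(self, sock):
        self.stream = sock.makefile("rwb")
        # First-message check: refuse to start unless the supervisor opens
        # the conversation with the startup message.
        first = recv(self.stream, TO_TRIGGER_RUNNER)
        if first["type"] != "StartTriggerer":
            raise RuntimeError(f"Expected StartTriggerer, got {first['type']}")

    def get_connection(self, conn_id):
        send(self.stream, {"type": "GetConnection", "conn_id": conn_id})
        return recv(self.stream, TO_TRIGGER_RUNNER)


def serve_one(stream, connections):
    """Supervisor side: answer a single request from the runner."""
    req = recv(stream, TO_TRIGGER_SUPERVISOR)
    if req["type"] == "GetConnection" and req["conn_id"] in connections:
        send(stream, {"type": "ConnectionResult", "conn_id": req["conn_id"], **connections[req["conn_id"]]})
    else:
        send(stream, {"type": "ErrorResponse", "error": "NOT_FOUND"})


def run_round_trip(conn_id, connections):
    # Socket initialization: one connected pair, one end per side (threads here, processes in reality).
    sup_sock, child_sock = socket.socketpair()
    try:
        sup_stream = sup_sock.makefile("rwb")
        send(sup_stream, {"type": "StartTriggerer"})
        runner = ToyTriggerRunner(child_sock)
        server = threading.Thread(target=serve_one, args=(sup_stream, connections))
        server.start()
        result = runner.get_connection(conn_id)
        server.join()
        return result
    finally:
        sup_sock.close()
        child_sock.close()
```

For example, `run_round_trip("databricks_default", {"databricks_default": {"host": "placeholder-host"}})` returns a `ConnectionResult` dict, while an unknown `conn_id` yields an `ErrorResponse`. The request/reply pairing is strictly sequential, which keeps the sketch simple but would not cover concurrent lookups from many triggers.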

6 participants