Merge branch 'apache:main' into main
ambika-garg authored Dec 20, 2024
2 parents 4336b66 + f10f552 commit 9569894
Showing 83 changed files with 2,770 additions and 2,999 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.10
ARG AIRFLOW_UV_VERSION=0.5.11
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
2 changes: 1 addition & 1 deletion Dockerfile.ci
@@ -1365,7 +1365,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.10
ARG AIRFLOW_UV_VERSION=0.5.11
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
3 changes: 0 additions & 3 deletions airflow/callbacks/callback_requests.py
@@ -32,13 +32,10 @@ class CallbackRequest(BaseModel):
Base Class with information about the callback to be executed.
:param msg: Additional Message that can be used for logging
:param processor_subdir: Directory used by Dag Processor when parsed the dag.
"""

full_filepath: str
"""File Path to use to run the callback"""
processor_subdir: str | None = None
"""Directory used by Dag Processor when parsed the dag"""
msg: str | None = None
"""Additional Message that can be used for logging to determine failure/zombie"""

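With processor_subdir removed from CallbackRequest, a callback request now carries only the file path plus optional context. A minimal construction sketch under that assumption (the path and message values below are illustrative, not taken from this commit):

    from airflow.callbacks.callback_requests import CallbackRequest

    # full_filepath is the only required field; msg is optional logging context.
    request = CallbackRequest(
        full_filepath="/opt/airflow/dags/example_dag.py",  # illustrative path
        msg="zombie task detected",
    )
    print(request.full_filepath, request.msg)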
21 changes: 6 additions & 15 deletions airflow/dag_processing/collection.py
@@ -169,7 +169,7 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
)


def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session, processor_subdir: str | None):
def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session):
"""
Try to serialize the dag to the DB, but make a note of any errors.
@@ -186,7 +186,6 @@ def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session, p
dag,
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
processor_subdir=processor_subdir,
)
if dag_was_updated:
_sync_dag_perms(dag, session=session)
@@ -234,18 +233,15 @@ def _update_dag_warnings(
session.merge(warning_to_add)


def _update_import_errors(
files_parsed: set[str], import_errors: dict[str, str], processor_subdir: str | None, session: Session
):
def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str], session: Session):
from airflow.listeners.listener import get_listener_manager

# We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old
# errors (i.e. from files that are removed) separately

session.execute(delete(ParseImportError).where(ParseImportError.filename.in_(list(files_parsed))))

query = select(ParseImportError.filename).where(ParseImportError.processor_subdir == processor_subdir)
existing_import_error_files = set(session.scalars(query))
existing_import_error_files = set(session.scalars(select(ParseImportError.filename)))

# Add the errors of the processed files
for filename, stacktrace in import_errors.items():
@@ -261,7 +257,6 @@ def _update_import_errors(
filename=filename,
timestamp=utcnow(),
stacktrace=stacktrace,
processor_subdir=processor_subdir,
)
)
# sending notification when a new dag import error occurs
@@ -272,7 +267,6 @@ def _update_import_errors(
def update_dag_parsing_results_in_db(
dags: Collection[MaybeSerializedDAG],
import_errors: dict[str, str],
processor_subdir: str | None,
warnings: set[DagWarning],
session: Session,
*,
@@ -308,11 +302,11 @@ def update_dag_parsing_results_in_db(
)
log.debug("Calling the DAG.bulk_sync_to_db method")
try:
DAG.bulk_write_to_db(dags, processor_subdir=processor_subdir, session=session)
DAG.bulk_write_to_db(dags, session=session)
# Write Serialized DAGs to DB, capturing errors
for dag in dags:
serialize_errors.extend(_serialize_dag_capturing_errors(dag, session, processor_subdir))
serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))
except OperationalError:
session.rollback()
raise
@@ -329,7 +323,6 @@ update_dag_parsing_results_in_db(
_update_import_errors(
files_parsed=good_dag_filelocs,
import_errors=import_errors,
processor_subdir=processor_subdir,
session=session,
)
except Exception:
@@ -377,7 +370,6 @@ update_dags(
self,
orm_dags: dict[str, DagModel],
*,
processor_subdir: str | None = None,
session: Session,
) -> None:
from airflow.configuration import conf
@@ -391,7 +383,7 @@ def update_dags(
for dag_id, dm in sorted(orm_dags.items()):
dag = self.dags[dag_id]
dm.fileloc = dag.fileloc
dm.owners = dag.owner
dm.owners = dag.owner or conf.get("operators", "default_owner")
dm.is_active = True
dm.has_import_errors = False
dm.last_parsed_time = utcnow()
@@ -433,7 +425,6 @@ def update_dags(
dm.timetable_summary = dag.timetable.summary
dm.timetable_description = dag.timetable.description
dm.asset_expression = dag.timetable.asset_condition.as_expression()
dm.processor_subdir = processor_subdir

last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
if last_automated_run is None:
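After this change, callers of update_dag_parsing_results_in_db no longer pass a processor_subdir. A rough sketch of the new call shape, written as a thin wrapper so the required arguments are visible (the wrapper name is illustrative):

    from __future__ import annotations

    from sqlalchemy.orm import Session

    from airflow.dag_processing.collection import update_dag_parsing_results_in_db

    def persist_parse_results(dags, import_errors: dict[str, str], warnings, session: Session) -> None:
        # Mirrors how the DAG processor manager calls it after this commit:
        # no processor_subdir keyword anywhere.
        update_dag_parsing_results_in_db(
            dags=dags,
            import_errors=import_errors or {},
            warnings=set(warnings or []),
            session=session,
        )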
18 changes: 2 additions & 16 deletions airflow/dag_processing/manager.py
@@ -429,8 +429,6 @@ def deactivate_stale_dags(
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active)
if self.standalone_dag_processor:
query = query.where(DagModel.processor_subdir == dag_directory)
dags_parsed = session.execute(query)

for dag in dags_parsed:
@@ -595,13 +593,8 @@ def _fetch_callbacks(
self.log.debug("Fetching callbacks from the database.")

callback_queue: list[CallbackRequest] = []
dag_directory = self.get_dag_directory()
with prohibit_commit(session) as guard:
query = select(DbCallbackRequest)
if self.standalone_dag_processor:
query = query.where(
DbCallbackRequest.processor_subdir == dag_directory,
)
query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(self.max_callbacks_per_loop)
query = with_row_locks(query, of=DbCallbackRequest, session=session, skip_locked=True)
callbacks = session.scalars(query)
@@ -677,10 +670,7 @@ def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:

dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}

DagModel.deactivate_deleted_dags(
dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
DagModel.deactivate_deleted_dags(dag_filelocs)

return True

@@ -700,12 +690,11 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION):
:param session: session for ORM operations
"""
self.log.debug("Removing old import errors")
query = delete(ParseImportError).where(ParseImportError.processor_subdir == self.get_dag_directory())
query = delete(ParseImportError)

if self._file_paths:
query = query.where(
ParseImportError.filename.notin_(self._file_paths),
ParseImportError.processor_subdir == self.get_dag_directory(),
)

session.execute(query.execution_options(synchronize_session="fetch"))
@@ -862,7 +851,6 @@ def _collect_results(self, session: Session = NEW_SESSION):
run_count=self._file_stats[path].run_count,
parsing_result=proc.parsing_result,
path=path,
processor_subdir=self.get_dag_directory(),
session=session,
)

@@ -1105,7 +1093,6 @@ def process_parse_results(
run_count: int,
path: str,
parsing_result: DagFileParsingResult | None,
processor_subdir: str | None,
session: Session,
) -> DagFileStat:
"""Take the parsing result and stats about the parser process and convert it into a DagFileState."""
@@ -1127,7 +1114,6 @@ def process_parse_results(
dags=parsing_result.serialized_dags,
import_errors=parsing_result.import_errors or {},
warnings=set(parsing_result.warnings or []),
processor_subdir=processor_subdir,
session=session,
)
stat.num_dags = len(parsing_result.serialized_dags)
11 changes: 4 additions & 7 deletions airflow/jobs/scheduler_job_runner.py
@@ -878,7 +878,6 @@ def process_executor_events(
full_filepath=ti.dag_model.fileloc,
ti=ti,
msg=msg,
processor_subdir=ti.dag_model.processor_subdir,
)
executor.send_callback(request)
else:
@@ -1707,7 +1706,6 @@ def _schedule_dag_run(
dag_id=dag.dag_id,
run_id=dag_run.run_id,
is_failure_callback=True,
processor_subdir=dag_model.processor_subdir,
msg="timed_out",
)

@@ -2066,11 +2064,11 @@ def _find_and_purge_zombies(self) -> None:
if zombies := self._find_zombies(session=session):
self._purge_zombies(zombies, session=session)

def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
def _find_zombies(self, *, session: Session) -> list[tuple[TI, str]]:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
zombies = session.execute(
select(TI, DM.fileloc, DM.processor_subdir)
select(TI, DM.fileloc)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(DM, TI.dag_id == DM.dag_id)
.where(
@@ -2083,12 +2081,11 @@ def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
return zombies

def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: Session) -> None:
for ti, file_loc, processor_subdir in zombies:
def _purge_zombies(self, zombies: list[tuple[TI, str]], *, session: Session) -> None:
for ti, file_loc in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
ti=ti,
msg=str(zombie_message_details),
)
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add deadline alerts table.
Revision ID: 237cef8dfea1
Revises: 038dc8bc6284
Create Date: 2024-12-05 22:08:38.997054
"""

from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op
from sqlalchemy_utils import UUIDType

from airflow.migrations.db_types import StringID
from airflow.models import ID_LEN

# revision identifiers, used by Alembic.
revision = "237cef8dfea1"
down_revision = "038dc8bc6284"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
op.create_table(
"deadline",
sa.Column("id", UUIDType(binary=False), nullable=False),
sa.Column("dag_id", StringID(length=ID_LEN), nullable=True),
sa.Column("dagrun_id", sa.Integer(), nullable=True),
sa.Column("deadline", sa.DateTime(), nullable=False),
sa.Column("callback", sa.String(length=500), nullable=False),
sa.Column("callback_kwargs", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=True),
sa.PrimaryKeyConstraint("id", name=op.f("deadline_pkey")),
sa.ForeignKeyConstraint(columns=("dagrun_id",), refcolumns=["dag_run.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(columns=("dag_id",), refcolumns=["dag.dag_id"], ondelete="CASCADE"),
sa.Index("deadline_idx", "deadline", unique=False),
)


def downgrade():
op.drop_table("deadline")
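A quick way to confirm what the upgrade created is to inspect the live schema. A minimal sketch, assuming a reachable Airflow metadata database (the connection URL is a placeholder):

    import sqlalchemy as sa

    engine = sa.create_engine("postgresql+psycopg2://airflow:airflow@localhost/airflow")  # placeholder URL
    inspector = sa.inspect(engine)

    # Expected columns: id, dag_id, dagrun_id, deadline, callback, callback_kwargs
    print([col["name"] for col in inspector.get_columns("deadline")])
    # Both foreign keys are created with ON DELETE CASCADE in the migration above.
    print(inspector.get_foreign_keys("deadline"))
    # The non-unique "deadline_idx" index on the deadline column.
    print(inspector.get_indexes("deadline"))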
74 changes: 74 additions & 0 deletions airflow/migrations/versions/0053_3_0_0_remove_processor_subdir.py
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Remove processor_subdir.
Revision ID: 5c9c0231baa2
Revises: 237cef8dfea1
Create Date: 2024-12-18 19:10:26.962464
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

revision = "5c9c0231baa2"
down_revision = "237cef8dfea1"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Remove processor_subdir."""
with op.batch_alter_table("callback_request", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")

with op.batch_alter_table("import_error", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")

with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")


def downgrade():
"""Unapply Remove processor_subdir."""
with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)

with op.batch_alter_table("import_error", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)

with op.batch_alter_table("callback_request", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)
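The drops use op.batch_alter_table so the migration also works on SQLite, where Alembic handles column drops by recreating the table in batch mode; the downgrade re-adds processor_subdir as a nullable VARCHAR(2000) but cannot restore the dropped values. A small post-upgrade check sketch (the connection URL is a placeholder):

    import sqlalchemy as sa

    engine = sa.create_engine("sqlite:////tmp/airflow.db")  # placeholder metadata DB URL
    inspector = sa.inspect(engine)

    for table in ("callback_request", "dag", "import_error", "serialized_dag"):
        columns = {col["name"] for col in inspector.get_columns(table)}
        # After upgrade() the column must be gone from every affected table.
        assert "processor_subdir" not in columns, f"{table} still has processor_subdir"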