Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call get_current_version less often in bundle refresh loop #46261

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,21 @@ def _refresh_dag_bundles(self):
elapsed_time_since_refresh = (
now - (bundle_model.last_refreshed or timezone.utc_epoch())
).total_seconds()
pre_refresh_version = bundle.get_current_version()
if bundle.supports_versioning:
# we will also check the version of the bundle to see if another DAG processor has seen
# a new version
pre_refresh_version = (
self._bundle_versions.get(bundle.name) or bundle.get_current_version()
)
current_version_matches_db = pre_refresh_version == bundle_model.version
else:
# With no versioning, it always "matches"
current_version_matches_db = True

previously_seen = bundle.name in self._bundle_versions
if (
elapsed_time_since_refresh < bundle.refresh_interval
and bundle_model.version == pre_refresh_version
and current_version_matches_db
and previously_seen
):
self.log.info("Not time to refresh %s", bundle.name)
Expand All @@ -471,20 +481,28 @@ def _refresh_dag_bundles(self):

bundle_model.last_refreshed = now

version_after_refresh = bundle.get_current_version()
if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug("Bundle %s version not changed after refresh", bundle.name)
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
continue

bundle_model.version = version_after_refresh

self.log.info(
"Version changed for %s, new version: %s", bundle.name, version_after_refresh
)
else:
version_after_refresh = None

self._bundle_versions[bundle.name] = version_after_refresh

found_files = [
DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
Expand All @@ -503,8 +521,6 @@ def _refresh_dag_bundles(self):
self.deactivate_deleted_dags(active_files=found_files)
self.clear_nonexistent_import_errors()

self._bundle_versions[bundle.name] = bundle.get_current_version()

def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
# Build up a list of Python files that could contain DAGs
Expand Down
74 changes: 69 additions & 5 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_callbacks,
clear_db_dag_bundles,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
Expand Down Expand Up @@ -93,6 +94,7 @@ def setup_method(self):
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
clear_db_dag_bundles()

def teardown_class(self):
clear_db_assets()
Expand All @@ -101,6 +103,7 @@ def teardown_class(self):
clear_db_dags()
clear_db_callbacks()
clear_db_import_errors()
clear_db_dag_bundles()

def mock_processor(self) -> DagFileProcessorProcess:
proc = MagicMock()
Expand Down Expand Up @@ -575,7 +578,9 @@ def test_send_file_processing_statsd_timing(
any_order=True,
)

def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle):
def test_refresh_dags_dir_doesnt_delete_zipped_dags(
self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle
):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
Expand Down Expand Up @@ -869,14 +874,14 @@ def _update_bundletwo_version():
def test_bundles_versions_are_stored(self, session):
config = [
{
"name": "mybundle",
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
]

mybundle = MagicMock()
mybundle.name = "mybundle"
mybundle.name = "bundleone"
mybundle.path = "/dev/null"
mybundle.refresh_interval = 0
mybundle.supports_versioning = True
Expand All @@ -885,11 +890,70 @@ def test_bundles_versions_are_stored(self, session):
with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"mybundle": None}
mock_bundle_manager.return_value._bundle_config = {"bundleone": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
manager = DagFileProcessorManager(max_runs=1)
manager.run()

with create_session() as session:
model = session.get(DagBundleModel, "mybundle")
model = session.get(DagBundleModel, "bundleone")
assert model.version == "123"

def test_non_versioned_bundle_get_version_not_called(self):
    """A bundle without versioning support is refreshed once and never asked for its version."""
    # NOTE(review): indentation was lost in the scraped diff; the nesting of the
    # ``with`` blocks below is reconstructed -- confirm against the repository.
    bundle_name = "bundleone"
    bundle_config = [
        {
            "name": bundle_name,
            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
            "kwargs": {"path": "/dev/null", "refresh_interval": 0},
        },
    ]

    fake_bundle = MagicMock()
    # ``name`` is special-cased by the MagicMock constructor, so attributes are
    # attached after creation via configure_mock.
    fake_bundle.configure_mock(
        name=bundle_name,
        refresh_interval=0,
        supports_versioning=False,
        path=Path("/dev/null"),
    )

    serialized_config = json.dumps(bundle_config)
    with conf_vars({("dag_processor", "dag_bundle_config_list"): serialized_config}):
        DagBundlesManager().sync_bundles_to_db()
        with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as patched_manager_cls:
            patched_instance = patched_manager_cls.return_value
            patched_instance._bundle_config = {bundle_name: None}
            patched_instance.get_all_dag_bundles.return_value = [fake_bundle]
            DagFileProcessorManager(max_runs=1).run()

    # The bundle must still be refreshed, but its version is never queried.
    fake_bundle.refresh.assert_called_once()
    fake_bundle.get_current_version.assert_not_called()

def test_versioned_bundle_get_version_called_once(self):
    """In a normal "warm" loop, get_current_version is called just once, after the refresh."""
    # NOTE(review): indentation was lost in the scraped diff; the nesting of the
    # ``with`` blocks below is reconstructed -- confirm against the repository.
    bundle_name = "bundleone"
    bundle_config = [
        {
            "name": bundle_name,
            "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
            "kwargs": {"path": "/dev/null", "refresh_interval": 0},
        },
    ]

    fake_bundle = MagicMock()
    # ``name`` is special-cased by the MagicMock constructor, so attributes are
    # attached after creation via configure_mock.
    fake_bundle.configure_mock(
        name=bundle_name,
        refresh_interval=0,
        supports_versioning=True,
        path=Path("/dev/null"),
    )
    fake_bundle.get_current_version.return_value = "123"

    serialized_config = json.dumps(bundle_config)
    with conf_vars({("dag_processor", "dag_bundle_config_list"): serialized_config}):
        DagBundlesManager().sync_bundles_to_db()
        with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as patched_manager_cls:
            patched_instance = patched_manager_cls.return_value
            patched_instance._bundle_config = {bundle_name: None}
            patched_instance.get_all_dag_bundles.return_value = [fake_bundle]

            processor_manager = DagFileProcessorManager(max_runs=1)
            # First pass only warms the manager up so the bundle counts as seen.
            processor_manager.run()

            # Forget the warm-up calls, then count what a second pass does.
            for warmed_up_method in (fake_bundle.refresh, fake_bundle.get_current_version):
                warmed_up_method.reset_mock()
            processor_manager.run()

            fake_bundle.refresh.assert_called_once()
            fake_bundle.get_current_version.assert_called_once()