From 37f9c770d705bfa15856e8f7650eef8802b65591 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 23 Jan 2025 20:01:02 -0700 Subject: [PATCH 1/4] Call `get_current_version` less often in bundle refresh loop In the bundle refresh loop, we can call `get_current_version` a lot less often, as 1) we can skip it for bundles that do not support versioning and 2) for those that do, we already know the version from the last time we refreshed! Since this is a local call, this isn't a huge gain. But every little bit helps. --- airflow/dag_processing/manager.py | 28 +++++++++--- tests/dag_processing/test_manager.py | 67 +++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 9c7af83673abe..7eaa0bed63168 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -453,11 +453,21 @@ def _refresh_dag_bundles(self): elapsed_time_since_refresh = ( now - (bundle_model.last_refreshed or timezone.utc_epoch()) ).total_seconds() - pre_refresh_version = bundle.get_current_version() + if bundle.supports_versioning: + # we will also check the version of the bundle to see if another DAG processor has seen + # a new version + pre_refresh_version = ( + self._bundle_versions.get(bundle.name) or bundle.get_current_version() + ) + current_version_matches_db = pre_refresh_version == bundle_model.version + else: + # With no versioning, it always "matches" + current_version_matches_db = True + previously_seen = bundle.name in self._bundle_versions if ( elapsed_time_since_refresh < bundle.refresh_interval - and bundle_model.version == pre_refresh_version + and current_version_matches_db and previously_seen ): self.log.info("Not time to refresh %s", bundle.name) @@ -471,13 +481,17 @@ def _refresh_dag_bundles(self): bundle_model.last_refreshed = now - version_after_refresh = bundle.get_current_version() if bundle.supports_versioning: # We can short-circuit the rest of this if (1) bundle was seen before by # this dag processor and (2) the version of the bundle did not change # after refreshing it + version_after_refresh = bundle.get_current_version() if previously_seen and pre_refresh_version == version_after_refresh: - self.log.debug("Bundle %s version not changed after refresh", bundle.name) + self.log.debug( + "Bundle %s version not changed after refresh: %s", + bundle.name, + version_after_refresh, + ) continue bundle_model.version = version_after_refresh @@ -485,6 +499,10 @@ def _refresh_dag_bundles(self): self.log.info( "Version changed for %s, new version: %s", bundle.name, version_after_refresh ) + else: + version_after_refresh = None + + self._bundle_versions[bundle.name] = version_after_refresh found_files = [ DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path) @@ -503,8 +521,6 @@ def _refresh_dag_bundles(self): self.deactivate_deleted_dags(active_files=found_files) self.clear_nonexistent_import_errors() - self._bundle_versions[bundle.name] = bundle.get_current_version() - def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]: """Get relative paths for dag files from bundle dir.""" # Build up a list of Python files that could contain DAGs diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index d2dacd3587087..43a4299b4495d 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -62,6 +62,7 @@ from tests_common.test_utils.db import ( clear_db_assets, clear_db_callbacks, + clear_db_dag_bundles, clear_db_dags, clear_db_import_errors, clear_db_runs, @@ -93,6 +94,7 @@ def setup_method(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def teardown_class(self): clear_db_assets() @@ -101,6 +103,7 @@ def teardown_class(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def mock_processor(self) -> DagFileProcessorProcess: proc = MagicMock() @@ -575,7 +578,9 @@ def test_send_file_processing_statsd_timing( any_order=True, ) - def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle): + def test_refresh_dags_dir_doesnt_delete_zipped_dags( + self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle + ): """Test DagFileProcessorManager._refresh_dag_dir method""" dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -869,7 +874,7 @@ def _update_bundletwo_version(): def test_bundles_versions_are_stored(self, session): config = [ { - "name": "mybundle", + "name": "bundleone", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/dev/null", "refresh_interval": 0}, }, @@ -893,3 +898,61 @@ def test_bundles_versions_are_stored(self, session): with create_session() as session: model = session.get(DagBundleModel, "mybundle") assert model.version == "123" + + def test_non_versioned_bundle_get_version_not_called(self): + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + mybundle = MagicMock() + mybundle.name = "bundleone" + mybundle.refresh_interval = 0 + mybundle.supports_versioning = False + + with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + manager = DagFileProcessorManager(max_runs=1) + manager.run() + + mybundle.get_current_version.assert_not_called() + + def test_versioned_bundle_get_version_called_once(self): + """Make sure in a normal "warm" loop, get_current_version is called just once after refresha""" + + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + mybundle = MagicMock() + mybundle.name = "bundleone" + mybundle.refresh_interval = 0 + mybundle.supports_versioning = True + mybundle.get_current_version.return_value = "123" + + with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + manager = DagFileProcessorManager(max_runs=1) + manager.run() # run it once to warm up + + # now run it again so we can check we only call get_current_version once + mybundle.get_current_version.reset_mock() + manager.run() + mybundle.get_current_version.assert_called_once() From 81661ccd9b0d11b8c0069446ea1892418c207ca8 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Wed, 29 Jan 2025 15:30:24 -0700 Subject: [PATCH 2/4] Fix config mocks --- tests/dag_processing/test_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 43a4299b4495d..67dbd6e36628e 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -913,7 +913,7 @@ def test_non_versioned_bundle_get_version_not_called(self): mybundle.refresh_interval = 0 mybundle.supports_versioning = False - with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() with mock.patch( "airflow.dag_processing.bundles.manager.DagBundlesManager" @@ -942,7 +942,7 @@ def test_versioned_bundle_get_version_called_once(self): mybundle.supports_versioning = True mybundle.get_current_version.return_value = "123" - with conf_vars({("dag_bundles", "config_list"): json.dumps(config)}): + with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() with mock.patch( "airflow.dag_processing.bundles.manager.DagBundlesManager" From 0d6046d5a7d41da86d2cee7c5e9cb9f4b21c947d Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 30 Jan 2025 22:28:27 -0700 Subject: [PATCH 3/4] More test fixes --- tests/dag_processing/test_manager.py | 54 +++++++++++++++++++++------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 67dbd6e36628e..0496101db187f 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -881,7 +881,7 @@ def test_bundles_versions_are_stored(self, session): ] mybundle = MagicMock() - mybundle.name = "mybundle" + mybundle.name = "bundleone" mybundle.path = "/dev/null" mybundle.refresh_interval = 0 mybundle.supports_versioning = True @@ -890,13 +890,13 @@ def test_bundles_versions_are_stored(self, session): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: - mock_bundle_manager.return_value._bundle_config = {"mybundle": None} + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] manager = DagFileProcessorManager(max_runs=1) manager.run() with create_session() as session: - model = session.get(DagBundleModel, "mybundle") + model = session.get(DagBundleModel, "bundleone") assert model.version == "123" def test_non_versioned_bundle_get_version_not_called(self): @@ -923,7 +923,36 @@ def test_non_versioned_bundle_get_version_not_called(self): manager = DagFileProcessorManager(max_runs=1) manager.run() - mybundle.get_current_version.assert_not_called() + with create_session() as session: + model = session.get(DagBundleModel, "bundleone") + assert model.version == "123" + + def test_non_versioned_bundle_get_version_not_called(self): + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + bundleone = MagicMock() + bundleone.name = "bundleone" + bundleone.refresh_interval = 0 + bundleone.supports_versioning = False + bundleone.path = Path("/dev/null") + + with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch( + "airflow.dag_processing.bundles.manager.DagBundlesManager" + ) as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] + manager = DagFileProcessorManager(max_runs=1) + manager.run() + + bundleone.get_current_version.assert_not_called() def test_versioned_bundle_get_version_called_once(self): """Make sure in a normal "warm" loop, get_current_version is called just once after refresha""" @@ -936,11 +965,12 @@ def test_versioned_bundle_get_version_called_once(self): }, ] - mybundle = MagicMock() - mybundle.name = "bundleone" - mybundle.refresh_interval = 0 - mybundle.supports_versioning = True - mybundle.get_current_version.return_value = "123" + bundleone = MagicMock() + bundleone.name = "bundleone" + bundleone.refresh_interval = 0 + bundleone.supports_versioning = True + bundleone.get_current_version.return_value = "123" + bundleone.path = Path("/dev/null") with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() @@ -948,11 +978,11 @@ def test_versioned_bundle_get_version_called_once(self): "airflow.dag_processing.bundles.manager.DagBundlesManager" ) as mock_bundle_manager: mock_bundle_manager.return_value._bundle_config = {"bundleone": None} - mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] manager = DagFileProcessorManager(max_runs=1) manager.run() # run it once to warm up # now run it again so we can check we only call get_current_version once - mybundle.get_current_version.reset_mock() + bundleone.get_current_version.reset_mock() manager.run() - mybundle.get_current_version.assert_called_once() + bundleone.get_current_version.assert_called_once() From b6142c2103325a17f9e225bc73c161b6fac602f8 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Mon, 3 Feb 2025 14:23:42 -0700 Subject: [PATCH 4/4] Fix a bad rebase / couple more asserts :) --- tests/dag_processing/test_manager.py | 39 ++++------------------------ 1 file changed, 5 insertions(+), 34 deletions(-) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 0496101db187f..ebc3536c83f23 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -899,34 +899,6 @@ def test_bundles_versions_are_stored(self, session): model = session.get(DagBundleModel, "bundleone") assert model.version == "123" - def test_non_versioned_bundle_get_version_not_called(self): - config = [ - { - "name": "bundleone", - "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", - "kwargs": {"path": "/dev/null", "refresh_interval": 0}, - }, - ] - - mybundle = MagicMock() - mybundle.name = "bundleone" - mybundle.refresh_interval = 0 - mybundle.supports_versioning = False - - with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): - DagBundlesManager().sync_bundles_to_db() - with mock.patch( - "airflow.dag_processing.bundles.manager.DagBundlesManager" - ) as mock_bundle_manager: - mock_bundle_manager.return_value._bundle_config = {"bundleone": None} - mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] - manager = DagFileProcessorManager(max_runs=1) - manager.run() - - with create_session() as session: - model = session.get(DagBundleModel, "bundleone") - assert model.version == "123" - def test_non_versioned_bundle_get_version_not_called(self): config = [ { @@ -944,14 +916,13 @@ def test_non_versioned_bundle_get_version_not_called(self): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() - with mock.patch( - "airflow.dag_processing.bundles.manager.DagBundlesManager" - ) as mock_bundle_manager: + with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: mock_bundle_manager.return_value._bundle_config = {"bundleone": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] manager = DagFileProcessorManager(max_runs=1) manager.run() + bundleone.refresh.assert_called_once() bundleone.get_current_version.assert_not_called() def test_versioned_bundle_get_version_called_once(self): @@ -974,15 +945,15 @@ def test_versioned_bundle_get_version_called_once(self): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() - with mock.patch( - "airflow.dag_processing.bundles.manager.DagBundlesManager" - ) as mock_bundle_manager: + with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: mock_bundle_manager.return_value._bundle_config = {"bundleone": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] manager = DagFileProcessorManager(max_runs=1) manager.run() # run it once to warm up # now run it again so we can check we only call get_current_version once + bundleone.refresh.reset_mock() bundleone.get_current_version.reset_mock() manager.run() + bundleone.refresh.assert_called_once() bundleone.get_current_version.assert_called_once()