diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 9c7af83673abe..7eaa0bed63168 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -453,11 +453,21 @@ def _refresh_dag_bundles(self): elapsed_time_since_refresh = ( now - (bundle_model.last_refreshed or timezone.utc_epoch()) ).total_seconds() - pre_refresh_version = bundle.get_current_version() + if bundle.supports_versioning: + # we will also check the version of the bundle to see if another DAG processor has seen + # a new version + pre_refresh_version = ( + self._bundle_versions.get(bundle.name) or bundle.get_current_version() + ) + current_version_matches_db = pre_refresh_version == bundle_model.version + else: + # With no versioning, it always "matches" + current_version_matches_db = True + previously_seen = bundle.name in self._bundle_versions if ( elapsed_time_since_refresh < bundle.refresh_interval - and bundle_model.version == pre_refresh_version + and current_version_matches_db and previously_seen ): self.log.info("Not time to refresh %s", bundle.name) @@ -471,13 +481,17 @@ def _refresh_dag_bundles(self): bundle_model.last_refreshed = now - version_after_refresh = bundle.get_current_version() if bundle.supports_versioning: # We can short-circuit the rest of this if (1) bundle was seen before by # this dag processor and (2) the version of the bundle did not change # after refreshing it + version_after_refresh = bundle.get_current_version() if previously_seen and pre_refresh_version == version_after_refresh: - self.log.debug("Bundle %s version not changed after refresh", bundle.name) + self.log.debug( + "Bundle %s version not changed after refresh: %s", + bundle.name, + version_after_refresh, + ) continue bundle_model.version = version_after_refresh @@ -485,6 +499,10 @@ def _refresh_dag_bundles(self): self.log.info( "Version changed for %s, new version: %s", bundle.name, version_after_refresh ) + else: + version_after_refresh = None + + self._bundle_versions[bundle.name] = version_after_refresh found_files = [ DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path) @@ -503,8 +521,6 @@ def _refresh_dag_bundles(self): self.deactivate_deleted_dags(active_files=found_files) self.clear_nonexistent_import_errors() - self._bundle_versions[bundle.name] = bundle.get_current_version() - def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]: """Get relative paths for dag files from bundle dir.""" # Build up a list of Python files that could contain DAGs diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index d2dacd3587087..ebc3536c83f23 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -62,6 +62,7 @@ from tests_common.test_utils.db import ( clear_db_assets, clear_db_callbacks, + clear_db_dag_bundles, clear_db_dags, clear_db_import_errors, clear_db_runs, @@ -93,6 +94,7 @@ def setup_method(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def teardown_class(self): clear_db_assets() @@ -101,6 +103,7 @@ def teardown_class(self): clear_db_dags() clear_db_callbacks() clear_db_import_errors() + clear_db_dag_bundles() def mock_processor(self) -> DagFileProcessorProcess: proc = MagicMock() @@ -575,7 +578,9 @@ def test_send_file_processing_statsd_timing( any_order=True, ) - def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path, configure_testing_dag_bundle): + def test_refresh_dags_dir_doesnt_delete_zipped_dags( + self, tmp_path, testing_dag_bundle, configure_testing_dag_bundle + ): """Test DagFileProcessorManager._refresh_dag_dir method""" dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -869,14 +874,14 @@ def _update_bundletwo_version(): def test_bundles_versions_are_stored(self, session): config = [ { - "name": "mybundle", + "name": "bundleone", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/dev/null", "refresh_interval": 0}, }, ] mybundle = MagicMock() - mybundle.name = "mybundle" + mybundle.name = "bundleone" mybundle.path = "/dev/null" mybundle.refresh_interval = 0 mybundle.supports_versioning = True @@ -885,11 +890,70 @@ def test_bundles_versions_are_stored(self, session): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: - mock_bundle_manager.return_value._bundle_config = {"mybundle": None} + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] manager = DagFileProcessorManager(max_runs=1) manager.run() with create_session() as session: - model = session.get(DagBundleModel, "mybundle") + model = session.get(DagBundleModel, "bundleone") assert model.version == "123" + + def test_non_versioned_bundle_get_version_not_called(self): + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + bundleone = MagicMock() + bundleone.name = "bundleone" + bundleone.refresh_interval = 0 + bundleone.supports_versioning = False + bundleone.path = Path("/dev/null") + + with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] + manager = DagFileProcessorManager(max_runs=1) + manager.run() + + bundleone.refresh.assert_called_once() + bundleone.get_current_version.assert_not_called() + + def test_versioned_bundle_get_version_called_once(self): + """Make sure in a normal "warm" loop, get_current_version is called just once after refresha""" + + config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + }, + ] + + bundleone = MagicMock() + bundleone.name = "bundleone" + bundleone.refresh_interval = 0 + bundleone.supports_versioning = True + bundleone.get_current_version.return_value = "123" + bundleone.path = Path("/dev/null") + + with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): + DagBundlesManager().sync_bundles_to_db() + with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: + mock_bundle_manager.return_value._bundle_config = {"bundleone": None} + mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] + manager = DagFileProcessorManager(max_runs=1) + manager.run() # run it once to warm up + + # now run it again so we can check we only call get_current_version once + bundleone.refresh.reset_mock() + bundleone.get_current_version.reset_mock() + manager.run() + bundleone.refresh.assert_called_once() + bundleone.get_current_version.assert_called_once()