Skip to content

Commit 2788b86

Browse files
authored
fix(ingest/pipeline): Fix BatchPartitionExecutor Shutdown Race Condition (#14750)
1 parent 92bcccd commit 2788b86

File tree

6 files changed

+47
-14
lines changed

6 files changed

+47
-14
lines changed

metadata-ingestion/src/datahub/ingestion/api/sink.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import datetime
2+
import logging
23
from abc import ABCMeta, abstractmethod
34
from dataclasses import dataclass, field
4-
from typing import Any, Generic, Optional, Type, TypeVar, cast
5+
from typing import Any, Callable, Generic, List, Optional, Type, TypeVar, cast
56

67
from typing_extensions import Self
78

@@ -12,6 +13,8 @@
1213
from datahub.utilities.lossy_collections import LossyList
1314
from datahub.utilities.type_annotations import get_class_from_annotation
1415

16+
logger = logging.getLogger(__name__)
17+
1518

1619
@dataclass
1720
class SinkReport(Report):
@@ -89,6 +92,7 @@ class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta):
8992
ctx: PipelineContext
9093
config: SinkConfig
9194
report: SinkReportType
95+
_pre_shutdown_callbacks: List[Callable[[], None]]
9296

9397
@classmethod
9498
def get_config_class(cls) -> Type[SinkConfig]:
@@ -106,6 +110,7 @@ def __init__(self, ctx: PipelineContext, config: SinkConfig):
106110
self.ctx = ctx
107111
self.config = config
108112
self.report = self.get_report_class()()
113+
self._pre_shutdown_callbacks = []
109114

110115
self.__post_init__()
111116

@@ -144,8 +149,28 @@ def write_record_async(
144149
def get_report(self) -> SinkReportType:
145150
return self.report
146151

152+
def register_pre_shutdown_callback(self, callback: Callable[[], None]) -> None:
153+
"""Register a callback to be executed before the sink shuts down.
154+
155+
This is useful for components that need to send final reports or cleanup
156+
operations before the sink's resources are released.
157+
"""
158+
self._pre_shutdown_callbacks.append(callback)
159+
147160
def close(self) -> None:
148-
pass
161+
"""Close the sink and clean up resources.
162+
163+
This method executes any registered pre-shutdown callbacks before
164+
performing the actual shutdown. Subclasses should override this method
165+
to provide sink-specific cleanup logic while calling super().close()
166+
to ensure callbacks are executed.
167+
"""
168+
# Execute pre-shutdown callbacks before shutdown
169+
for callback in self._pre_shutdown_callbacks:
170+
try:
171+
callback()
172+
except Exception as e:
173+
logger.warning(f"Pre-shutdown callback failed: {e}", exc_info=True)
149174

150175
def configured(self) -> str:
151176
"""Override this method to output a human-readable and scrubbed version of the configured sink"""

metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,4 +268,4 @@ def on_completion(
268268
entity_urn=self.execution_request_input_urn,
269269
aspect_value=execution_result_aspect,
270270
)
271-
self.sink.close()
271+
# Note: sink.close() is handled by the pipeline's context manager

metadata-ingestion/src/datahub/ingestion/run/pipeline.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ def __init__(
265265
with _add_init_error_context("configure transformers"):
266266
self._configure_transforms()
267267

268+
# Register completion callback with sink to handle final reporting
269+
self.sink.register_pre_shutdown_callback(
270+
self._notify_reporters_on_ingestion_completion
271+
)
272+
268273
# If all of the initialization succeeds, we can preserve the exit stack until the pipeline run.
269274
# We need to use an exit stack so that if we have an exception during initialization,
270275
# things that were already initialized are still cleaned up.
@@ -344,8 +349,8 @@ def _notify_reporters_on_ingestion_start(self) -> None:
344349
for reporter in self.reporters:
345350
try:
346351
reporter.on_start(ctx=self.ctx)
347-
except Exception as e:
348-
logger.warning("Reporting failed on start", exc_info=e)
352+
except Exception:
353+
logger.warning("Reporting failed on start", exc_info=True)
349354

350355
def _warn_old_cli_version(self) -> None:
351356
"""
@@ -407,8 +412,8 @@ def _notify_reporters_on_ingestion_completion(self) -> None:
407412
report=self._get_structured_report(),
408413
ctx=self.ctx,
409414
)
410-
except Exception as e:
411-
logger.warning("Reporting failed on completion", exc_info=e)
415+
except Exception:
416+
logger.warning("Reporting failed on completion", exc_info=True)
412417

413418
@classmethod
414419
def create(
@@ -521,10 +526,10 @@ def run(self) -> None:
521526

522527
except (RuntimeError, SystemExit):
523528
raise
524-
except Exception as e:
529+
except Exception:
525530
logger.error(
526531
"Failed to process some records. Continuing.",
527-
exc_info=e,
532+
exc_info=True,
528533
)
529534
# TODO: Transformer errors should be reported more loudly / as part of the pipeline report.
530535

@@ -553,19 +558,16 @@ def run(self) -> None:
553558

554559
self.process_commits()
555560
self.final_status = PipelineStatus.COMPLETED
556-
except (SystemExit, KeyboardInterrupt) as e:
561+
except (SystemExit, KeyboardInterrupt):
557562
self.final_status = PipelineStatus.CANCELLED
558-
logger.error("Caught error", exc_info=e)
563+
logger.error("Caught error", exc_info=True)
559564
raise
560565
except Exception as exc:
561566
self.final_status = PipelineStatus.ERROR
562567
self._handle_uncaught_pipeline_exception(exc)
563568
finally:
564569
clear_global_warnings()
565570

566-
# This can't be in the finally part because this should happen after context manager exists
567-
self._notify_reporters_on_ingestion_completion()
568-
569571
def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]:
570572
"""
571573
Transforms the given sequence of records by passing the records through the transformers

metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,5 @@ def write_record_async(
7474
callback(err, f"Failed to write record: {err}")
7575

7676
def close(self) -> None:
77+
super().close()
7778
self.emitter.flush()

metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,10 @@ def emit_async(
349349
)
350350

351351
def close(self):
352+
# Execute pre-shutdown callbacks first (handled by parent class)
353+
super().close()
354+
355+
# Then perform sink-specific shutdown
352356
with self.report.main_thread_blocking_timer:
353357
self.executor.shutdown()
354358

metadata-ingestion/src/datahub/ingestion/sink/file.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def write_record_async(
7979
write_callback.on_success(record_envelope, {})
8080

8181
def close(self):
82+
super().close()
8283
self.file.write("\n]")
8384
self.file.close()
8485

0 commit comments

Comments
 (0)