From c59d8b70e100b3500285e8443e57ca2848ad7162 Mon Sep 17 00:00:00 2001 From: estelle Date: Tue, 9 Dec 2025 14:13:20 +0100 Subject: [PATCH 1/8] Send pipeline failed events to the event callback --- .../experimental/pipeline/notification.py | 17 ++++++++ .../experimental/pipeline/orchestrator.py | 6 +-- .../experimental/pipeline/pipeline.py | 39 +++++++++++-------- .../experimental/pipeline/test_pipeline.py | 11 +++--- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/src/neo4j_graphrag/experimental/pipeline/notification.py b/src/neo4j_graphrag/experimental/pipeline/notification.py index 94665fe2c..8a0763dac 100644 --- a/src/neo4j_graphrag/experimental/pipeline/notification.py +++ b/src/neo4j_graphrag/experimental/pipeline/notification.py @@ -86,6 +86,12 @@ class EventNotifier: def __init__(self, callbacks: list[EventCallbackProtocol]) -> None: self.callbacks = callbacks + def add_callback(self, callback: EventCallbackProtocol) -> None: + self.callbacks.append(callback) + + def remove_callback(self, callback: EventCallbackProtocol) -> None: + self.callbacks.remove(callback) + async def notify(self, event: Event) -> None: await asyncio.gather( *[c(event) for c in self.callbacks], @@ -114,6 +120,17 @@ async def notify_pipeline_finished( ) await self.notify(event) + async def notify_pipeline_failed( + self, run_id: str, message: Optional[str] = None + ) -> None: + event = PipelineEvent( + event_type=EventType.PIPELINE_FAILED, + run_id=run_id, + message=message, + payload=None, + ) + await self.notify(event) + async def notify_task_started( self, run_id: str, diff --git a/src/neo4j_graphrag/experimental/pipeline/orchestrator.py b/src/neo4j_graphrag/experimental/pipeline/orchestrator.py index b5536b537..5a50b7855 100644 --- a/src/neo4j_graphrag/experimental/pipeline/orchestrator.py +++ b/src/neo4j_graphrag/experimental/pipeline/orchestrator.py @@ -52,10 +52,10 @@ class Orchestrator: (checking that all dependencies are met), and run them. """ - def __init__(self, pipeline: Pipeline): + def __init__(self, pipeline: Pipeline, run_id: Optional[str] = None): self.pipeline = pipeline - self.event_notifier = EventNotifier(pipeline.callbacks) - self.run_id = str(uuid.uuid4()) + self.event_notifier = self.pipeline.event_notifier + self.run_id = run_id or str(uuid.uuid4()) async def run_task(self, task: TaskPipelineNode, data: dict[str, Any]) -> None: """Get inputs and run a specific task. Once the task is done, diff --git a/src/neo4j_graphrag/experimental/pipeline/pipeline.py b/src/neo4j_graphrag/experimental/pipeline/pipeline.py index f6ede6b51..5fda60767 100644 --- a/src/neo4j_graphrag/experimental/pipeline/pipeline.py +++ b/src/neo4j_graphrag/experimental/pipeline/pipeline.py @@ -21,6 +21,8 @@ from timeit import default_timer from typing import Any, AsyncGenerator, Optional +import uuid + from neo4j_graphrag.utils.logging import prettify try: @@ -41,6 +43,7 @@ EventCallbackProtocol, EventType, PipelineEvent, + EventNotifier, ) from neo4j_graphrag.experimental.pipeline.orchestrator import Orchestrator from neo4j_graphrag.experimental.pipeline.pipeline_graph import ( @@ -124,7 +127,6 @@ def __init__( ) -> None: super().__init__() self.store = store or InMemoryStore() - self.callbacks = [callback] if callback else [] self.final_results = InMemoryStore() self.is_validated = False self.param_mapping: dict[str, dict[str, dict[str, str]]] = defaultdict(dict) @@ -139,6 +141,7 @@ def __init__( } """ self.missing_inputs: dict[str, list[str]] = defaultdict() + self.event_notifier = EventNotifier([callback] if callback else []) @classmethod def from_template( @@ -514,7 +517,7 @@ async def event_stream(event: Event) -> None: await event_queue.put(event) # Add event streaming callback - self.callbacks.append(event_stream) + self.event_notifier.add_callback(event_stream) event_queue_getter_task = None try: @@ -546,34 +549,36 @@ async def event_stream(event: Event) -> None: yield event # type: ignore if exc := run_task.exception(): - yield PipelineEvent( - event_type=EventType.PIPELINE_FAILED, - # run_id is null if pipeline fails before even starting - # ie during pipeline validation - run_id=run_id or "", - message=str(exc), - ) if raise_exception: raise exc finally: # Restore original callback - self.callbacks.remove(event_stream) + self.event_notifier.remove_callback(event_stream) if event_queue_getter_task and not event_queue_getter_task.done(): event_queue_getter_task.cancel() async def run(self, data: dict[str, Any]) -> PipelineResult: - logger.debug("PIPELINE START") start_time = default_timer() + run_id = str(uuid.uuid4()) + logger.debug(f"PIPELINE START with {run_id=}") + try: + res = await self._run(run_id, data) + except Exception as e: + await self.event_notifier.notify_pipeline_failed( + run_id, + message=f"Pipeline failed with error {e}", + ) + raise e + end_time = default_timer() + logger.debug(f"PIPELINE FINISHED {run_id} in {end_time - start_time}s") + return res + + async def _run(self, run_id: str, data: dict[str, Any]) -> PipelineResult: self.invalidate() self.validate_input_data(data) - orchestrator = Orchestrator(self) - logger.debug(f"PIPELINE ORCHESTRATOR: {orchestrator.run_id}") + orchestrator = Orchestrator(self, run_id) await orchestrator.run(data) - end_time = default_timer() - logger.debug( - f"PIPELINE FINISHED {orchestrator.run_id} in {end_time - start_time}s" - ) return PipelineResult( run_id=orchestrator.run_id, result=await self.get_final_results(orchestrator.run_id), diff --git a/tests/unit/experimental/pipeline/test_pipeline.py b/tests/unit/experimental/pipeline/test_pipeline.py index d37fc65fc..d648df3e5 100644 --- a/tests/unit/experimental/pipeline/test_pipeline.py +++ b/tests/unit/experimental/pipeline/test_pipeline.py @@ -503,7 +503,7 @@ async def test_pipeline_streaming_no_user_callback_happy_path() -> None: assert len(events) == 2 assert events[0].event_type == EventType.PIPELINE_STARTED assert events[1].event_type == EventType.PIPELINE_FINISHED - assert len(pipe.callbacks) == 0 + assert len(pipe.event_notifier.callbacks) == 0 @pytest.mark.asyncio @@ -515,7 +515,7 @@ async def test_pipeline_streaming_with_user_callback_happy_path() -> None: events.append(e) assert len(events) == 2 assert len(callback.call_args_list) == 2 - assert len(pipe.callbacks) == 1 + assert len(pipe.event_notifier.callbacks) == 1 @pytest.mark.asyncio @@ -528,7 +528,7 @@ async def callback(event: Event) -> None: async for e in pipe.stream({}): events.append(e) assert len(events) == 2 - assert len(pipe.callbacks) == 1 + assert len(pipe.event_notifier.callbacks) == 1 @pytest.mark.asyncio @@ -559,9 +559,9 @@ async def test_pipeline_streaming_error_in_pipeline_definition() -> None: events.append(e) # validation happens before pipeline run actually starts # but we have the PIPELINE_FAILED event + print(events) assert len(events) == 1 assert events[0].event_type == EventType.PIPELINE_FAILED - assert events[0].run_id == "" @pytest.mark.asyncio @@ -573,6 +573,7 @@ async def test_pipeline_streaming_error_in_component() -> None: with pytest.raises(TypeError): async for e in pipe.stream({"component": {"number1": None, "number2": 2}}): events.append(e) + print(events) assert len(events) == 3 assert events[0].event_type == EventType.PIPELINE_STARTED assert events[1].event_type == EventType.TASK_STARTED @@ -589,4 +590,4 @@ async def callback(event: Event) -> None: async for e in pipe.stream({}): events.append(e) assert len(events) == 2 - assert len(pipe.callbacks) == 1 + assert len(pipe.event_notifier.callbacks) == 1 From 7b3a17ee156666f4b9e97a73346267f2c8e5eaa2 Mon Sep 17 00:00:00 2001 From: estelle Date: Tue, 9 Dec 2025 14:27:44 +0100 Subject: [PATCH 2/8] Ruff --- src/neo4j_graphrag/experimental/pipeline/orchestrator.py | 1 - src/neo4j_graphrag/experimental/pipeline/pipeline.py | 4 ---- 2 files changed, 5 deletions(-) diff --git a/src/neo4j_graphrag/experimental/pipeline/orchestrator.py b/src/neo4j_graphrag/experimental/pipeline/orchestrator.py index 5a50b7855..5bf61c7b9 100644 --- a/src/neo4j_graphrag/experimental/pipeline/orchestrator.py +++ b/src/neo4j_graphrag/experimental/pipeline/orchestrator.py @@ -26,7 +26,6 @@ PipelineMissingDependencyError, PipelineStatusUpdateError, ) -from neo4j_graphrag.experimental.pipeline.notification import EventNotifier from neo4j_graphrag.experimental.pipeline.types.context import RunContext from neo4j_graphrag.experimental.pipeline.types.orchestration import ( RunResult, diff --git a/src/neo4j_graphrag/experimental/pipeline/pipeline.py b/src/neo4j_graphrag/experimental/pipeline/pipeline.py index 5fda60767..6f95dc6d7 100644 --- a/src/neo4j_graphrag/experimental/pipeline/pipeline.py +++ b/src/neo4j_graphrag/experimental/pipeline/pipeline.py @@ -41,8 +41,6 @@ from neo4j_graphrag.experimental.pipeline.notification import ( Event, EventCallbackProtocol, - EventType, - PipelineEvent, EventNotifier, ) from neo4j_graphrag.experimental.pipeline.orchestrator import Orchestrator @@ -510,7 +508,6 @@ async def stream( """ # Create queue for events event_queue: asyncio.Queue[Event] = asyncio.Queue() - run_id = None async def event_stream(event: Event) -> None: # Put event in queue for streaming @@ -545,7 +542,6 @@ async def event_stream(event: Event) -> None: # we are sure to get an Event here, since this is the only # thing we put in the queue, but mypy still complains event = event_future.result() - run_id = getattr(event, "run_id", None) yield event # type: ignore if exc := run_task.exception(): From c5b204f978d9900bb08bb7a632960c8c79269de3 Mon Sep 17 00:00:00 2001 From: estelle Date: Thu, 11 Dec 2025 10:24:49 +0100 Subject: [PATCH 3/8] Remove prints --- tests/unit/experimental/pipeline/test_pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/unit/experimental/pipeline/test_pipeline.py b/tests/unit/experimental/pipeline/test_pipeline.py index d648df3e5..e7df7403b 100644 --- a/tests/unit/experimental/pipeline/test_pipeline.py +++ b/tests/unit/experimental/pipeline/test_pipeline.py @@ -559,7 +559,6 @@ async def test_pipeline_streaming_error_in_pipeline_definition() -> None: events.append(e) # validation happens before pipeline run actually starts # but we have the PIPELINE_FAILED event - print(events) assert len(events) == 1 assert events[0].event_type == EventType.PIPELINE_FAILED @@ -573,7 +572,6 @@ async def test_pipeline_streaming_error_in_component() -> None: with pytest.raises(TypeError): async for e in pipe.stream({"component": {"number1": None, "number2": 2}}): events.append(e) - print(events) assert len(events) == 3 assert events[0].event_type == EventType.PIPELINE_STARTED assert events[1].event_type == EventType.TASK_STARTED From 209454533e796911949fe870cfc13ec2ca930ccb Mon Sep 17 00:00:00 2001 From: estelle Date: Thu, 11 Dec 2025 10:35:36 +0100 Subject: [PATCH 4/8] Move pipeline started event notification so that we have both started and failure event - add unit test --- .../experimental/pipeline/orchestrator.py | 4 ---- .../experimental/pipeline/pipeline.py | 7 +++++- .../experimental/pipeline/test_pipeline.py | 24 +++++++++++++++---- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/neo4j_graphrag/experimental/pipeline/orchestrator.py b/src/neo4j_graphrag/experimental/pipeline/orchestrator.py index 5bf61c7b9..bacfdf89a 100644 --- a/src/neo4j_graphrag/experimental/pipeline/orchestrator.py +++ b/src/neo4j_graphrag/experimental/pipeline/orchestrator.py @@ -264,9 +264,5 @@ async def run(self, data: dict[str, Any]) -> None: (node without any parent). Then the callback on_task_complete will handle the task dependencies. """ - await self.event_notifier.notify_pipeline_started(self.run_id, data) tasks = [self.run_task(root, data) for root in self.pipeline.roots()] await asyncio.gather(*tasks) - await self.event_notifier.notify_pipeline_finished( - self.run_id, await self.pipeline.get_final_results(self.run_id) - ) diff --git a/src/neo4j_graphrag/experimental/pipeline/pipeline.py b/src/neo4j_graphrag/experimental/pipeline/pipeline.py index 6f95dc6d7..7cf0c6f46 100644 --- a/src/neo4j_graphrag/experimental/pipeline/pipeline.py +++ b/src/neo4j_graphrag/experimental/pipeline/pipeline.py @@ -571,11 +571,16 @@ async def run(self, data: dict[str, Any]) -> PipelineResult: return res async def _run(self, run_id: str, data: dict[str, Any]) -> PipelineResult: + await self.event_notifier.notify_pipeline_started(run_id, data) self.invalidate() self.validate_input_data(data) orchestrator = Orchestrator(self, run_id) await orchestrator.run(data) - return PipelineResult( + result = PipelineResult( run_id=orchestrator.run_id, result=await self.get_final_results(orchestrator.run_id), ) + await self.event_notifier.notify_pipeline_finished( + run_id, await self.get_final_results(run_id), + ) + return result diff --git a/tests/unit/experimental/pipeline/test_pipeline.py b/tests/unit/experimental/pipeline/test_pipeline.py index e7df7403b..16e23cd27 100644 --- a/tests/unit/experimental/pipeline/test_pipeline.py +++ b/tests/unit/experimental/pipeline/test_pipeline.py @@ -483,6 +483,23 @@ async def test_pipeline_event_notification() -> None: previous_ts = actual_event.timestamp +@pytest.mark.asyncio +async def test_pipeline_event_notification_error_in_pipeline_run() -> None: + callback = AsyncMock(spec=EventCallbackProtocol) + pipe = Pipeline(callback=callback) + component_a = ComponentAdd() + component_b = ComponentAdd() + pipe.add_component(component_a, "a") + pipe.add_component(component_b, "b") + pipe.connect("a", "b", {"number1": "a.result"}) + + with pytest.raises(PipelineDefinitionError): + await pipe.run({"a": {"number1": 1, "number2": 2}}) + assert len(callback.await_args_list) == 2 + assert callback.await_args_list[0][0][0].event_type == EventType.PIPELINE_STARTED + assert callback.await_args_list[1][0][0].event_type == EventType.PIPELINE_FAILED + + def test_event_model_no_warning(recwarn: Sized) -> None: event = Event( event_type=EventType.PIPELINE_STARTED, @@ -557,10 +574,9 @@ async def test_pipeline_streaming_error_in_pipeline_definition() -> None: with pytest.raises(PipelineDefinitionError): async for e in pipe.stream({"a": {"number1": 1, "number2": 2}}): events.append(e) - # validation happens before pipeline run actually starts - # but we have the PIPELINE_FAILED event - assert len(events) == 1 - assert events[0].event_type == EventType.PIPELINE_FAILED + assert len(events) == 2 + assert events[0].event_type == EventType.PIPELINE_STARTED + assert events[1].event_type == EventType.PIPELINE_FAILED @pytest.mark.asyncio From e48269cc61f97181a63379449ff90376cf719b78 Mon Sep 17 00:00:00 2001 From: estelle Date: Thu, 11 Dec 2025 10:39:55 +0100 Subject: [PATCH 5/8] ruff --- src/neo4j_graphrag/experimental/pipeline/pipeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/neo4j_graphrag/experimental/pipeline/pipeline.py b/src/neo4j_graphrag/experimental/pipeline/pipeline.py index 7cf0c6f46..2743107c5 100644 --- a/src/neo4j_graphrag/experimental/pipeline/pipeline.py +++ b/src/neo4j_graphrag/experimental/pipeline/pipeline.py @@ -581,6 +581,7 @@ async def _run(self, run_id: str, data: dict[str, Any]) -> PipelineResult: result=await self.get_final_results(orchestrator.run_id), ) await self.event_notifier.notify_pipeline_finished( - run_id, await self.get_final_results(run_id), + run_id, + await self.get_final_results(run_id), ) return result From 60aca0acbb3740b3520e91ad8f7a630bab1755cd Mon Sep 17 00:00:00 2001 From: estelle Date: Thu, 11 Dec 2025 13:32:12 +0100 Subject: [PATCH 6/8] Round time in debug log --- examples/build_graph/simple_kg_builder_from_pdf.py | 3 +++ src/neo4j_graphrag/experimental/pipeline/pipeline.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/build_graph/simple_kg_builder_from_pdf.py b/examples/build_graph/simple_kg_builder_from_pdf.py index d3b2948f8..9a62870c4 100644 --- a/examples/build_graph/simple_kg_builder_from_pdf.py +++ b/examples/build_graph/simple_kg_builder_from_pdf.py @@ -22,6 +22,9 @@ AUTH = ("neo4j", "password") DATABASE = "neo4j" +import logging + +logging.basicConfig(level=logging.DEBUG) root_dir = Path(__file__).parents[1] file_path = root_dir / "data" / "Harry Potter and the Chamber of Secrets Summary.pdf" diff --git a/src/neo4j_graphrag/experimental/pipeline/pipeline.py b/src/neo4j_graphrag/experimental/pipeline/pipeline.py index 2743107c5..5cb8602eb 100644 --- a/src/neo4j_graphrag/experimental/pipeline/pipeline.py +++ b/src/neo4j_graphrag/experimental/pipeline/pipeline.py @@ -104,7 +104,7 @@ async def run( res = await self.execute(context, inputs) end_time = default_timer() logger.debug( - f"TASK FINISHED {self.name} in {end_time - start_time} res={prettify(res)}" + f"TASK FINISHED {self.name} in {round(end_time - start_time)}s res={prettify(res)}" ) return res @@ -567,7 +567,7 @@ async def run(self, data: dict[str, Any]) -> PipelineResult: ) raise e end_time = default_timer() - logger.debug(f"PIPELINE FINISHED {run_id} in {end_time - start_time}s") + logger.debug(f"PIPELINE FINISHED {run_id} in {round(end_time - start_time)}s") return res async def _run(self, run_id: str, data: dict[str, Any]) -> PipelineResult: From c418f08fe1bd3c20cb091f62d45ce2c4ef43df44 Mon Sep 17 00:00:00 2001 From: estelle Date: Thu, 11 Dec 2025 13:35:36 +0100 Subject: [PATCH 7/8] Rm test code --- examples/build_graph/simple_kg_builder_from_pdf.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/examples/build_graph/simple_kg_builder_from_pdf.py b/examples/build_graph/simple_kg_builder_from_pdf.py index 9a62870c4..d3b2948f8 100644 --- a/examples/build_graph/simple_kg_builder_from_pdf.py +++ b/examples/build_graph/simple_kg_builder_from_pdf.py @@ -22,9 +22,6 @@ AUTH = ("neo4j", "password") DATABASE = "neo4j" -import logging - -logging.basicConfig(level=logging.DEBUG) root_dir = Path(__file__).parents[1] file_path = root_dir / "data" / "Harry Potter and the Chamber of Secrets Summary.pdf" From c3643a4fe97e9bfe4e8dd36e0c728e4fcb14b7f9 Mon Sep 17 00:00:00 2001 From: estelle Date: Thu, 11 Dec 2025 13:45:12 +0100 Subject: [PATCH 8/8] round with 2 digits --- src/neo4j_graphrag/experimental/pipeline/pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/neo4j_graphrag/experimental/pipeline/pipeline.py b/src/neo4j_graphrag/experimental/pipeline/pipeline.py index 5cb8602eb..e0687c613 100644 --- a/src/neo4j_graphrag/experimental/pipeline/pipeline.py +++ b/src/neo4j_graphrag/experimental/pipeline/pipeline.py @@ -104,7 +104,7 @@ async def run( res = await self.execute(context, inputs) end_time = default_timer() logger.debug( - f"TASK FINISHED {self.name} in {round(end_time - start_time)}s res={prettify(res)}" + f"TASK FINISHED {self.name} in {round(end_time - start_time, 2)}s res={prettify(res)}" ) return res @@ -567,7 +567,9 @@ async def run(self, data: dict[str, Any]) -> PipelineResult: ) raise e end_time = default_timer() - logger.debug(f"PIPELINE FINISHED {run_id} in {round(end_time - start_time)}s") + logger.debug( + f"PIPELINE FINISHED {run_id} in {round(end_time - start_time, 2)}s" + ) return res async def _run(self, run_id: str, data: dict[str, Any]) -> PipelineResult: