From 26394f1f9a2056e51d53a752eb31eb4352ee0069 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Thu, 9 Jan 2025 14:48:43 +0100 Subject: [PATCH 1/8] chore: add pipeline run tests showing bugs --- .../pipeline/features/pipeline_run.feature | 5 + test/core/pipeline/features/test_run.py | 553 +++++++++++++++++- 2 files changed, 557 insertions(+), 1 deletion(-) diff --git a/test/core/pipeline/features/pipeline_run.feature b/test/core/pipeline/features/pipeline_run.feature index db064ea2b1..398c4f3c2c 100644 --- a/test/core/pipeline/features/pipeline_run.feature +++ b/test/core/pipeline/features/pipeline_run.feature @@ -39,11 +39,16 @@ Feature: Pipeline running | that is linear with conditional branching and multiple joins | | that is a simple agent | | that has a variadic component that receives partial inputs | + | that has a variadic component that receives partial inputs in a different order | | that has an answer joiner variadic component | | that is linear and a component in the middle receives optional input from other components and input from the user | | that has a loop in the middle | | that has variadic component that receives a conditional input | | that has a string variadic component | + | that is an agent that can use RAG | + | that has a feedback loop | + | created in a non-standard order that has a loop | + | that has an agent with a feedback cycle | Scenario Outline: Running a bad Pipeline Given a pipeline diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 8f07dfec99..6f10320b8a 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -10,7 +10,8 @@ from haystack.dataclasses import ChatMessage, GeneratedAnswer from haystack.components.routers import ConditionalRouter from haystack.components.builders import PromptBuilder, AnswerBuilder, ChatPromptBuilder -from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter +from haystack.components.converters import OutputAdapter +from haystack.components.preprocessors import DocumentCleaner from haystack.components.retrievers.in_memory import InMemoryBM25Retriever from haystack.document_stores.in_memory import InMemoryDocumentStore from haystack.components.joiners import BranchJoiner, DocumentJoiner, AnswerJoiner, StringJoiner @@ -1808,6 +1809,61 @@ def run(self, create_document: bool = False): ], ) +@given("a pipeline that has a variadic component that receives partial inputs in a different order", target_fixture="pipeline_data") +def that_has_a_variadic_component_that_receives_partial_inputs_different_order(): + @component + class ConditionalDocumentCreator: + def __init__(self, content: str): + self._content = content + + @component.output_types(documents=List[Document], noop=None) + def run(self, create_document: bool = False): + if create_document: + return {"documents": [Document(id=self._content, content=self._content)]} + return {"noop": None} + + pipeline = Pipeline(max_runs_per_component=1) + pipeline.add_component("third_creator", ConditionalDocumentCreator(content="Third document")) + pipeline.add_component("first_creator", ConditionalDocumentCreator(content="First document")) + pipeline.add_component("second_creator", ConditionalDocumentCreator(content="Second document")) + pipeline.add_component("documents_joiner", DocumentJoiner()) + + pipeline.connect("first_creator.documents", "documents_joiner.documents") + pipeline.connect("second_creator.documents", "documents_joiner.documents") + 
pipeline.connect("third_creator.documents", "documents_joiner.documents") + + return ( + pipeline, + [ + PipelineRunData( + inputs={"first_creator": {"create_document": True}, "third_creator": {"create_document": True}}, + expected_outputs={ + "second_creator": {"noop": None}, + "documents_joiner": { + "documents": [ + Document(id="First document", content="First document"), + Document(id="Third document", content="Third document"), + ] + }, + }, + expected_run_order=["first_creator", "second_creator", "third_creator", "documents_joiner"], + ), + PipelineRunData( + inputs={"first_creator": {"create_document": True}, "second_creator": {"create_document": True}}, + expected_outputs={ + "third_creator": {"noop": None}, + "documents_joiner": { + "documents": [ + Document(id="First document", content="First document"), + Document(id="Second document", content="Second document"), + ] + }, + }, + expected_run_order=["first_creator", "second_creator", "third_creator", "documents_joiner"], + ), + ], + ) + @given("a pipeline that has an answer joiner variadic component", target_fixture="pipeline_data") def that_has_an_answer_joiner_variadic_component(): @@ -2224,3 +2280,498 @@ def that_has_a_string_variadic_component(): ) ], ) + +@given("a pipeline that is an agent that can use RAG", target_fixture="pipeline_data") +def an_agent_that_can_use_RAG(): + @component + class FixedGenerator: + def __init__(self, replies): + self.replies = replies + self.idx = 0 + @component.output_types(replies=List[str]) + def run(self, prompt: str): + if self.idx < len(self.replies): + replies = [self.replies[self.idx]] + self.idx += 1 + else: + self.idx = 0 + replies = [self.replies[self.idx]] + self.idx += 1 + + return {"replies": replies} + + @component + class FakeRetriever: + @component.output_types(documents=List[Document]) + def run(self, query: str): + return {"documents": [Document(content="This is a document potentially answering the question.", meta={"access_group": 1})]} + + agent_prompt_template = """ +Your task is to answer the user's question. +You can use a RAG system to find information. +Use the RAG system until you have sufficient information to answer the question. +To use the RAG system, output "search:" followed by your question. +Once you have an answer, output "answer:" followed by your answer. + +Here is the question: {{query}} + """ + + rag_prompt_template = """ +Answer the question based on the provided documents. 
+Question: {{ query }} +Documents: +{% for document in documents %} +{{ document.content }} +{% endfor %} + """ + + joiner = BranchJoiner(type_=str) + + agent_llm = FixedGenerator(replies=["search: Can you help me?", "answer: here is my answer"]) + agent_prompt = PromptBuilder(template=agent_prompt_template) + + rag_llm = FixedGenerator(replies=["This is all the information I found!"]) + rag_prompt = PromptBuilder(template=rag_prompt_template) + + retriever = FakeRetriever() + + routes = [ + { + "condition": "{{ 'search:' in replies[0] }}", + "output": "{{ replies[0] }}", + "output_name": "search", + "output_type": str, + }, + { + "condition": "{{ 'answer:' in replies[0] }}", + "output": "{{ replies }}", + "output_name": "answer", + "output_type": List[str], + }, + ] + + router = ConditionalRouter(routes=routes) + + concatenator = OutputAdapter(template="{{current_prompt + '\n' + rag_answer[0]}}", output_type=str) + + answer_builder = AnswerBuilder() + + pp = Pipeline(max_runs_per_component=2) + + pp.add_component("joiner", joiner) + pp.add_component("rag_llm", rag_llm) + pp.add_component("rag_prompt", rag_prompt) + pp.add_component("agent_prompt", agent_prompt) + pp.add_component("agent_llm", agent_llm) + pp.add_component("router", router) + pp.add_component("concatenator", concatenator) + pp.add_component("retriever", retriever) + pp.add_component("answer_builder", answer_builder) + + pp.connect("agent_prompt.prompt", "joiner.value") + pp.connect("joiner.value", "agent_llm.prompt") + pp.connect("agent_llm.replies", "router.replies") + pp.connect("router.search", "retriever.query") + pp.connect("router.answer", "answer_builder.replies") + pp.connect("retriever.documents", "rag_prompt.documents") + pp.connect("rag_prompt.prompt", "rag_llm.prompt") + pp.connect("rag_llm.replies", "concatenator.rag_answer") + pp.connect("joiner.value", "concatenator.current_prompt") + pp.connect("concatenator.output", "joiner.value") + + + + + query = "Does this run reliably?" + + return ( + pp, + [ + PipelineRunData( + inputs={"agent_prompt": {"query": query}, "rag_prompt": {"query": query}, "answer_builder": {"query": query}}, + expected_outputs={ + "answer_builder": { + "answers": [GeneratedAnswer(data="answer: here is my answer", query=query, documents=[])] + } + }, + expected_run_order=[ + "agent_prompt", + "joiner", + "agent_llm", + "router", + "retriever", + "rag_prompt", + "rag_llm", + "concatenator", + "joiner", + "agent_llm", + "router", + "answer_builder" + ], + ) + ], + ) + +@given("a pipeline that has a feedback loop", target_fixture="pipeline_data") +def has_feedback_loop(): + @component + class FixedGenerator: + def __init__(self, replies): + self.replies = replies + self.idx = 0 + @component.output_types(replies=List[str]) + def run(self, prompt: str): + if self.idx < len(self.replies): + replies = [self.replies[self.idx]] + self.idx += 1 + else: + self.idx = 0 + replies = [self.replies[self.idx]] + self.idx += 1 + + return {"replies": replies} + + + code_prompt_template = """ +Generate code to solve the task: {{ task }} + +{% if feedback %} +Here is your initial attempt and some feedback: +{{ feedback }} +{% endif %} + """ + + feedback_prompt_template = """ +Check if this code is valid and can run: {{ code[0] }} +Return "PASS" if it passes and "FAIL" if it fails. +Provide additional feedback on why it fails. 
+ """ + + code_llm = FixedGenerator(replies=["invalid code", "valid code"]) + code_prompt = PromptBuilder(template=code_prompt_template) + + feedback_llm = FixedGenerator(replies=["FAIL", "PASS"]) + feedback_prompt = PromptBuilder(template=feedback_prompt_template) + + routes = [ + { + "condition": "{{ 'FAIL' in replies[0] }}", + "output": "{{ replies[0] }}", + "output_name": "fail", + "output_type": str, + }, + { + "condition": "{{ 'PASS' in replies[0] }}", + "output": "{{ code }}", + "output_name": "pass", + "output_type": List[str], + }, + ] + + router = ConditionalRouter(routes=routes) + + concatenator = OutputAdapter(template="{{current_prompt[0] + '\n' + feedback[0]}}", output_type=str) + + answer_builder = AnswerBuilder() + + pp = Pipeline(max_runs_per_component=100) + + pp.add_component("code_llm", code_llm) + pp.add_component("code_prompt", code_prompt) + pp.add_component("feedback_prompt", feedback_prompt) + pp.add_component("feedback_llm", feedback_llm) + pp.add_component("router", router) + pp.add_component("concatenator", concatenator) + pp.add_component("answer_builder", answer_builder) + + pp.connect("code_prompt.prompt", "code_llm.prompt") + pp.connect("code_llm.replies", "feedback_prompt.code") + pp.connect("feedback_llm.replies", "router.replies") + pp.connect("router.fail", "concatenator.feedback") + pp.connect("router.pass", "answer_builder.replies") + pp.connect("code_llm.replies", "router.code") + pp.connect("feedback_prompt.prompt", "feedback_llm.prompt") + pp.connect("code_llm.replies", "concatenator.current_prompt") + pp.connect("concatenator.output", "code_prompt.feedback") + + + + + task = "Generate code to generate christmas ascii-art" + + return ( + pp, + [ + PipelineRunData( + inputs={"code_prompt": {"task": task}, "answer_builder": {"query": task}}, + expected_outputs={ + "answer_builder": { + "answers": [GeneratedAnswer(data="valid code", query=task, documents=[])] + } + }, + expected_run_order=[ + 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'concatenator', + 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'answer_builder'], + ) + ], + ) + +@given("a pipeline created in a non-standard order that has a loop", target_fixture="pipeline_data") +def has_non_standard_order_loop(): + @component + class FixedGenerator: + def __init__(self, replies): + self.replies = replies + self.idx = 0 + @component.output_types(replies=List[str]) + def run(self, prompt: str): + if self.idx < len(self.replies): + replies = [self.replies[self.idx]] + self.idx += 1 + else: + self.idx = 0 + replies = [self.replies[self.idx]] + self.idx += 1 + + return {"replies": replies} + + + code_prompt_template = """ +Generate code to solve the task: {{ task }} + +{% if feedback %} +Here is your initial attempt and some feedback: +{{ feedback }} +{% endif %} + """ + + feedback_prompt_template = """ +Check if this code is valid and can run: {{ code[0] }} +Return "PASS" if it passes and "FAIL" if it fails. +Provide additional feedback on why it fails. 
+ """ + + + + + code_llm = FixedGenerator(replies=["invalid code", "valid code"]) + code_prompt = PromptBuilder(template=code_prompt_template) + + feedback_llm = FixedGenerator(replies=["FAIL", "PASS"]) + feedback_prompt = PromptBuilder(template=feedback_prompt_template) + + routes = [ + { + "condition": "{{ 'FAIL' in replies[0] }}", + "output": "{{ replies[0] }}", + "output_name": "fail", + "output_type": str, + }, + { + "condition": "{{ 'PASS' in replies[0] }}", + "output": "{{ code }}", + "output_name": "pass", + "output_type": List[str], + }, + ] + + router = ConditionalRouter(routes=routes) + + concatenator = OutputAdapter(template="{{current_prompt[0] + '\n' + feedback[0]}}", output_type=str) + + answer_builder = AnswerBuilder() + + pp = Pipeline(max_runs_per_component=100) + + pp.add_component("concatenator", concatenator) + pp.add_component("code_llm", code_llm) + pp.add_component("code_prompt", code_prompt) + pp.add_component("feedback_prompt", feedback_prompt) + pp.add_component("feedback_llm", feedback_llm) + pp.add_component("router", router) + + pp.add_component("answer_builder", answer_builder) + + + pp.connect("concatenator.output", "code_prompt.feedback") + pp.connect("code_prompt.prompt", "code_llm.prompt") + pp.connect("code_llm.replies", "feedback_prompt.code") + pp.connect("feedback_llm.replies", "router.replies") + pp.connect("router.fail", "concatenator.feedback") + pp.connect("feedback_prompt.prompt", "feedback_llm.prompt") + pp.connect("router.pass", "answer_builder.replies") + pp.connect("code_llm.replies", "router.code") + pp.connect("code_llm.replies", "concatenator.current_prompt") + + + task = "Generate code to generate christmas ascii-art" + + return ( + pp, + [ + PipelineRunData( + inputs={"code_prompt": {"task": task}, "answer_builder": {"query": task}}, + expected_outputs={ + "answer_builder": { + "answers": [GeneratedAnswer(data="valid code", query=task, documents=[])] + } + }, + expected_run_order=[ + 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'concatenator', + 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'answer_builder'], + ) + ], + ) + +@given("a pipeline that has an agent with a feedback cycle", target_fixture="pipeline_data") +def agent_with_feedback_cycle(): + @component + class FixedGenerator: + def __init__(self, replies): + self.replies = replies + self.idx = 0 + @component.output_types(replies=List[str]) + def run(self, prompt: str): + if self.idx < len(self.replies): + replies = [self.replies[self.idx]] + self.idx += 1 + else: + self.idx = 1 + replies = [self.replies[self.idx]] + + return {"replies": replies} + + @component + class FakeFileEditor: + @component.output_types(files=str) + def run(self, replies: List[str]): + return {"files": "This is the edited file content."} + + + code_prompt_template = """ +Generate code to solve the task: {{ task }} + +You can edit files by returning: +Edit: file_name + +Once you solved the task, respond with: +Task finished! + +{% if feedback %} +Here is your initial attempt and some feedback: +{{ feedback }} +{% endif %} + """ + + feedback_prompt_template = """ +{% if task_finished %} +Check if this code is valid and can run: {{ code }} +Return "PASS" if it passes and "FAIL" if it fails. +Provide additional feedback on why it fails. 
+{% endif %} + """ + + code_llm = FixedGenerator(replies=["Edit: file_1.py", "Edit: file_2.py", "Edit: file_3.py", "Task finished!"]) + code_prompt = PromptBuilder(template=code_prompt_template) + file_editor = FakeFileEditor() + + feedback_llm = FixedGenerator(replies=["FAIL", "PASS"]) + feedback_prompt = PromptBuilder(template=feedback_prompt_template, required_variables=["task_finished"]) + + routes = [ + { + "condition": "{{ 'FAIL' in replies[0] }}", + "output": "{{ current_prompt + '\n' + replies[0] }}", + "output_name": "fail", + "output_type": str, + }, + { + "condition": "{{ 'PASS' in replies[0] }}", + "output": "{{ replies }}", + "output_name": "pass", + "output_type": List[str], + }, + ] + feedback_router = ConditionalRouter(routes=routes) + + + tool_use_routes = [ + { + "condition": "{{ 'Edit:' in replies[0] }}", + "output": "{{ replies }}", + "output_name": "edit", + "output_type": List[str], + }, + { + "condition": "{{ 'Task finished!' in replies[0] }}", + "output": "{{ replies }}", + "output_name": "done", + "output_type": List[str], + }, + ] + tool_use_router = ConditionalRouter(routes=tool_use_routes) + + + joiner = BranchJoiner(type_=str) + agent_concatenator = OutputAdapter(template="{{current_prompt + '\n' + files}}", output_type=str) + + + pp = Pipeline(max_runs_per_component=100) + + pp.add_component("code_prompt", code_prompt) + pp.add_component("joiner", joiner) + pp.add_component("code_llm", code_llm) + pp.add_component("tool_use_router", tool_use_router) + pp.add_component("file_editor", file_editor) + pp.add_component("agent_concatenator", agent_concatenator) + pp.add_component("feedback_prompt", feedback_prompt) + pp.add_component("feedback_llm", feedback_llm) + pp.add_component("feedback_router", feedback_router) + + + # Main Agent + pp.connect("code_prompt.prompt", "joiner.value") + pp.connect("joiner.value", "code_llm.prompt") + pp.connect("code_llm.replies", "tool_use_router.replies") + pp.connect("tool_use_router.edit", "file_editor.replies") + pp.connect("file_editor.files", "agent_concatenator.files") + pp.connect("joiner.value", "agent_concatenator.current_prompt") + pp.connect("agent_concatenator.output", "joiner.value") + + # Feedback Cycle + pp.connect("tool_use_router.done", "feedback_prompt.task_finished") + pp.connect("agent_concatenator.output", "feedback_prompt.code") + pp.connect("feedback_prompt.prompt", "feedback_llm.prompt") + pp.connect("feedback_llm.replies", "feedback_router.replies") + pp.connect("agent_concatenator.output", "feedback_router.current_prompt") + pp.connect("feedback_router.fail", "joiner.value") + + + task = "Generate code to generate christmas ascii-art" + + return ( + pp, + [ + PipelineRunData( + inputs={"code_prompt": {"task": task}}, + expected_outputs={ + "feedback_router": { + "pass": ["PASS"] + } + }, + expected_run_order=[ + 'code_prompt', + + "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", + "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", + "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", + "joiner", "code_llm", "tool_use_router", "feedback_prompt", "feedback_llm", "feedback_router", + + "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", + "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", + "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", + "joiner", "code_llm", "tool_use_router", "feedback_prompt", "feedback_llm", "feedback_router" + ], + ) + ], 
+ ) \ No newline at end of file From ab03473eb2c71969242105ba9d094fe7a5af5be6 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Thu, 9 Jan 2025 15:56:13 +0100 Subject: [PATCH 2/8] chore: format --- test/core/pipeline/features/test_run.py | 154 ++++++++++++++++-------- 1 file changed, 101 insertions(+), 53 deletions(-) diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 6f10320b8a..673b06b92c 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -824,7 +824,7 @@ def pipeline_that_has_a_component_with_only_default_inputs(): "answers": [ GeneratedAnswer( data="Paris", - query="What " "is " "the " "capital " "of " "France?", + query="What is the capital of France?", documents=[ Document( id="413dccdf51a54cca75b7ed2eddac04e6e58560bd2f0caf4106a3efc023fe3651", @@ -917,7 +917,7 @@ def fake_generator_run(self, generation_kwargs: Optional[Dict[str, Any]] = None, pipe, [ PipelineRunData( - inputs={"prompt_builder": {"query": "What is the capital of " "Italy?"}}, + inputs={"prompt_builder": {"query": "What is the capital of Italy?"}}, expected_outputs={"router": {"correct_replies": ["Rome"]}}, expected_run_order=["prompt_builder", "generator", "router", "prompt_builder", "generator", "router"], ) @@ -1809,7 +1809,11 @@ def run(self, create_document: bool = False): ], ) -@given("a pipeline that has a variadic component that receives partial inputs in a different order", target_fixture="pipeline_data") + +@given( + "a pipeline that has a variadic component that receives partial inputs in a different order", + target_fixture="pipeline_data", +) def that_has_a_variadic_component_that_receives_partial_inputs_different_order(): @component class ConditionalDocumentCreator: @@ -2281,6 +2285,7 @@ def that_has_a_string_variadic_component(): ], ) + @given("a pipeline that is an agent that can use RAG", target_fixture="pipeline_data") def an_agent_that_can_use_RAG(): @component @@ -2288,6 +2293,7 @@ class FixedGenerator: def __init__(self, replies): self.replies = replies self.idx = 0 + @component.output_types(replies=List[str]) def run(self, prompt: str): if self.idx < len(self.replies): @@ -2304,7 +2310,11 @@ def run(self, prompt: str): class FakeRetriever: @component.output_types(documents=List[Document]) def run(self, query: str): - return {"documents": [Document(content="This is a document potentially answering the question.", meta={"access_group": 1})]} + return { + "documents": [ + Document(content="This is a document potentially answering the question.", meta={"access_group": 1}) + ] + } agent_prompt_template = """ Your task is to answer the user's question. @@ -2379,16 +2389,17 @@ def run(self, query: str): pp.connect("joiner.value", "concatenator.current_prompt") pp.connect("concatenator.output", "joiner.value") - - - query = "Does this run reliably?" 
return ( pp, [ PipelineRunData( - inputs={"agent_prompt": {"query": query}, "rag_prompt": {"query": query}, "answer_builder": {"query": query}}, + inputs={ + "agent_prompt": {"query": query}, + "rag_prompt": {"query": query}, + "answer_builder": {"query": query}, + }, expected_outputs={ "answer_builder": { "answers": [GeneratedAnswer(data="answer: here is my answer", query=query, documents=[])] @@ -2406,12 +2417,13 @@ def run(self, query: str): "joiner", "agent_llm", "router", - "answer_builder" + "answer_builder", ], ) ], ) + @given("a pipeline that has a feedback loop", target_fixture="pipeline_data") def has_feedback_loop(): @component @@ -2419,6 +2431,7 @@ class FixedGenerator: def __init__(self, replies): self.replies = replies self.idx = 0 + @component.output_types(replies=List[str]) def run(self, prompt: str): if self.idx < len(self.replies): @@ -2431,7 +2444,6 @@ def run(self, prompt: str): return {"replies": replies} - code_prompt_template = """ Generate code to solve the task: {{ task }} @@ -2494,9 +2506,6 @@ def run(self, prompt: str): pp.connect("code_llm.replies", "concatenator.current_prompt") pp.connect("concatenator.output", "code_prompt.feedback") - - - task = "Generate code to generate christmas ascii-art" return ( @@ -2505,17 +2514,27 @@ def run(self, prompt: str): PipelineRunData( inputs={"code_prompt": {"task": task}, "answer_builder": {"query": task}}, expected_outputs={ - "answer_builder": { - "answers": [GeneratedAnswer(data="valid code", query=task, documents=[])] - } + "answer_builder": {"answers": [GeneratedAnswer(data="valid code", query=task, documents=[])]} }, expected_run_order=[ - 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'concatenator', - 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'answer_builder'], + "code_prompt", + "code_llm", + "feedback_prompt", + "feedback_llm", + "router", + "concatenator", + "code_prompt", + "code_llm", + "feedback_prompt", + "feedback_llm", + "router", + "answer_builder", + ], ) ], ) + @given("a pipeline created in a non-standard order that has a loop", target_fixture="pipeline_data") def has_non_standard_order_loop(): @component @@ -2523,6 +2542,7 @@ class FixedGenerator: def __init__(self, replies): self.replies = replies self.idx = 0 + @component.output_types(replies=List[str]) def run(self, prompt: str): if self.idx < len(self.replies): @@ -2535,7 +2555,6 @@ def run(self, prompt: str): return {"replies": replies} - code_prompt_template = """ Generate code to solve the task: {{ task }} @@ -2551,9 +2570,6 @@ def run(self, prompt: str): Provide additional feedback on why it fails. 
""" - - - code_llm = FixedGenerator(replies=["invalid code", "valid code"]) code_prompt = PromptBuilder(template=code_prompt_template) @@ -2592,7 +2608,6 @@ def run(self, prompt: str): pp.add_component("answer_builder", answer_builder) - pp.connect("concatenator.output", "code_prompt.feedback") pp.connect("code_prompt.prompt", "code_llm.prompt") pp.connect("code_llm.replies", "feedback_prompt.code") @@ -2603,7 +2618,6 @@ def run(self, prompt: str): pp.connect("code_llm.replies", "router.code") pp.connect("code_llm.replies", "concatenator.current_prompt") - task = "Generate code to generate christmas ascii-art" return ( @@ -2612,17 +2626,27 @@ def run(self, prompt: str): PipelineRunData( inputs={"code_prompt": {"task": task}, "answer_builder": {"query": task}}, expected_outputs={ - "answer_builder": { - "answers": [GeneratedAnswer(data="valid code", query=task, documents=[])] - } + "answer_builder": {"answers": [GeneratedAnswer(data="valid code", query=task, documents=[])]} }, expected_run_order=[ - 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'concatenator', - 'code_prompt', 'code_llm', 'feedback_prompt', 'feedback_llm', 'router', 'answer_builder'], + "code_prompt", + "code_llm", + "feedback_prompt", + "feedback_llm", + "router", + "concatenator", + "code_prompt", + "code_llm", + "feedback_prompt", + "feedback_llm", + "router", + "answer_builder", + ], ) ], ) + @given("a pipeline that has an agent with a feedback cycle", target_fixture="pipeline_data") def agent_with_feedback_cycle(): @component @@ -2630,14 +2654,16 @@ class FixedGenerator: def __init__(self, replies): self.replies = replies self.idx = 0 + @component.output_types(replies=List[str]) def run(self, prompt: str): if self.idx < len(self.replies): replies = [self.replies[self.idx]] self.idx += 1 else: - self.idx = 1 + self.idx = 0 replies = [self.replies[self.idx]] + self.idx += 1 return {"replies": replies} @@ -2647,7 +2673,6 @@ class FakeFileEditor: def run(self, replies: List[str]): return {"files": "This is the edited file content."} - code_prompt_template = """ Generate code to solve the task: {{ task }} @@ -2694,7 +2719,6 @@ def run(self, replies: List[str]): ] feedback_router = ConditionalRouter(routes=routes) - tool_use_routes = [ { "condition": "{{ 'Edit:' in replies[0] }}", @@ -2711,11 +2735,9 @@ def run(self, replies: List[str]): ] tool_use_router = ConditionalRouter(routes=tool_use_routes) - joiner = BranchJoiner(type_=str) agent_concatenator = OutputAdapter(template="{{current_prompt + '\n' + files}}", output_type=str) - pp = Pipeline(max_runs_per_component=100) pp.add_component("code_prompt", code_prompt) @@ -2728,7 +2750,6 @@ def run(self, replies: List[str]): pp.add_component("feedback_llm", feedback_llm) pp.add_component("feedback_router", feedback_router) - # Main Agent pp.connect("code_prompt.prompt", "joiner.value") pp.connect("joiner.value", "code_llm.prompt") @@ -2746,7 +2767,6 @@ def run(self, replies: List[str]): pp.connect("agent_concatenator.output", "feedback_router.current_prompt") pp.connect("feedback_router.fail", "joiner.value") - task = "Generate code to generate christmas ascii-art" return ( @@ -2754,24 +2774,52 @@ def run(self, replies: List[str]): [ PipelineRunData( inputs={"code_prompt": {"task": task}}, - expected_outputs={ - "feedback_router": { - "pass": ["PASS"] - } - }, + expected_outputs={"feedback_router": {"pass": ["PASS"]}}, expected_run_order=[ - 'code_prompt', - - "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", - "joiner", 
"code_llm", "tool_use_router", "file_editor", "agent_concatenator", - "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", - "joiner", "code_llm", "tool_use_router", "feedback_prompt", "feedback_llm", "feedback_router", - - "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", - "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", - "joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator", - "joiner", "code_llm", "tool_use_router", "feedback_prompt", "feedback_llm", "feedback_router" + "code_prompt", + "joiner", + "code_llm", + "tool_use_router", + "file_editor", + "agent_concatenator", + "joiner", + "code_llm", + "tool_use_router", + "file_editor", + "agent_concatenator", + "joiner", + "code_llm", + "tool_use_router", + "file_editor", + "agent_concatenator", + "joiner", + "code_llm", + "tool_use_router", + "feedback_prompt", + "feedback_llm", + "feedback_router", + "joiner", + "code_llm", + "tool_use_router", + "file_editor", + "agent_concatenator", + "joiner", + "code_llm", + "tool_use_router", + "file_editor", + "agent_concatenator", + "joiner", + "code_llm", + "tool_use_router", + "file_editor", + "agent_concatenator", + "joiner", + "code_llm", + "tool_use_router", + "feedback_prompt", + "feedback_llm", + "feedback_router", ], ) ], - ) \ No newline at end of file + ) From 438ffaaadbf325f689bf421d1072ae191df9b250 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Thu, 9 Jan 2025 16:01:50 +0100 Subject: [PATCH 3/8] chore: format --- haystack/components/audio/whisper_local.py | 2 +- haystack/components/converters/openapi_functions.py | 3 +-- .../components/generators/chat/hugging_face_local.py | 2 +- haystack/components/rankers/lost_in_the_middle.py | 4 ++-- haystack/core/component/component.py | 10 +++++----- haystack/core/pipeline/draw.py | 6 +++--- haystack/document_stores/in_memory/document_store.py | 9 +++------ haystack/marshal/yaml.py | 3 +-- haystack/utils/filters.py | 3 +-- haystack/utils/hf.py | 2 +- test/components/audio/test_whisper_local.py | 12 ++++++------ .../converters/test_docx_file_to_document.py | 6 +++--- .../embedders/test_openai_document_embedder.py | 6 +++--- .../embedders/test_openai_text_embedder.py | 6 +++--- test/components/joiners/test_document_joiner.py | 6 +++--- .../preprocessors/test_document_cleaner.py | 8 ++------ test/components/routers/test_conditional_router.py | 6 +++--- 17 files changed, 42 insertions(+), 52 deletions(-) diff --git a/haystack/components/audio/whisper_local.py b/haystack/components/audio/whisper_local.py index 79ac83b144..54ec15c6f8 100644 --- a/haystack/components/audio/whisper_local.py +++ b/haystack/components/audio/whisper_local.py @@ -72,7 +72,7 @@ def __init__( whisper_import.check() if model not in get_args(WhisperLocalModel): raise ValueError( - f"Model name '{model}' not recognized. Choose one among: " f"{', '.join(get_args(WhisperLocalModel))}." + f"Model name '{model}' not recognized. Choose one among: {', '.join(get_args(WhisperLocalModel))}." 
) self.model = model self.whisper_params = whisper_params or {} diff --git a/haystack/components/converters/openapi_functions.py b/haystack/components/converters/openapi_functions.py index acc5d2a232..0d13b9c59b 100644 --- a/haystack/components/converters/openapi_functions.py +++ b/haystack/components/converters/openapi_functions.py @@ -249,8 +249,7 @@ def _parse_openapi_spec(self, content: str) -> Dict[str, Any]: open_api_spec_content = yaml.safe_load(content) except yaml.YAMLError: error_message = ( - "Failed to parse the OpenAPI specification. " - "The content does not appear to be valid JSON or YAML.\n\n" + "Failed to parse the OpenAPI specification. The content does not appear to be valid JSON or YAML.\n\n" ) raise RuntimeError(error_message, content) diff --git a/haystack/components/generators/chat/hugging_face_local.py b/haystack/components/generators/chat/hugging_face_local.py index 1ad152f1e3..a79a6dcfa8 100644 --- a/haystack/components/generators/chat/hugging_face_local.py +++ b/haystack/components/generators/chat/hugging_face_local.py @@ -149,7 +149,7 @@ def __init__( # pylint: disable=too-many-positional-arguments if task not in PIPELINE_SUPPORTED_TASKS: raise ValueError( - f"Task '{task}' is not supported. " f"The supported tasks are: {', '.join(PIPELINE_SUPPORTED_TASKS)}." + f"Task '{task}' is not supported. The supported tasks are: {', '.join(PIPELINE_SUPPORTED_TASKS)}." ) huggingface_pipeline_kwargs["task"] = task diff --git a/haystack/components/rankers/lost_in_the_middle.py b/haystack/components/rankers/lost_in_the_middle.py index f757fadddc..01df8fde30 100644 --- a/haystack/components/rankers/lost_in_the_middle.py +++ b/haystack/components/rankers/lost_in_the_middle.py @@ -51,7 +51,7 @@ def __init__(self, word_count_threshold: Optional[int] = None, top_k: Optional[i """ if isinstance(word_count_threshold, int) and word_count_threshold <= 0: raise ValueError( - f"Invalid value for word_count_threshold: {word_count_threshold}. " f"word_count_threshold must be > 0." + f"Invalid value for word_count_threshold: {word_count_threshold}. word_count_threshold must be > 0." ) if isinstance(top_k, int) and top_k <= 0: raise ValueError(f"top_k must be > 0, but got {top_k}") @@ -78,7 +78,7 @@ def run( """ if isinstance(word_count_threshold, int) and word_count_threshold <= 0: raise ValueError( - f"Invalid value for word_count_threshold: {word_count_threshold}. " f"word_count_threshold must be > 0." + f"Invalid value for word_count_threshold: {word_count_threshold}. word_count_threshold must be > 0." 
) if isinstance(top_k, int) and top_k <= 0: raise ValueError(f"top_k must be > 0, but got {top_k}") diff --git a/haystack/core/component/component.py b/haystack/core/component/component.py index 567faa4871..d77fd77593 100644 --- a/haystack/core/component/component.py +++ b/haystack/core/component/component.py @@ -268,9 +268,9 @@ def __call__(cls, *args, **kwargs): try: pre_init_hook.in_progress = True named_positional_args = ComponentMeta._positional_to_kwargs(cls, args) - assert ( - set(named_positional_args.keys()).intersection(kwargs.keys()) == set() - ), "positional and keyword arguments overlap" + assert set(named_positional_args.keys()).intersection(kwargs.keys()) == set(), ( + "positional and keyword arguments overlap" + ) kwargs.update(named_positional_args) pre_init_hook.callback(cls, kwargs) instance = super().__call__(**kwargs) @@ -309,8 +309,8 @@ def _component_repr(component: Component) -> str: # We're explicitly ignoring the type here because we're sure that the component # has the __haystack_input__ and __haystack_output__ attributes at this point return ( - f'{result}\n{getattr(component, "__haystack_input__", "")}' - f'\n{getattr(component, "__haystack_output__", "")}' + f"{result}\n{getattr(component, '__haystack_input__', '')}" + f"\n{getattr(component, '__haystack_output__', '')}" ) diff --git a/haystack/core/pipeline/draw.py b/haystack/core/pipeline/draw.py index 83df791515..2e24bf9acd 100644 --- a/haystack/core/pipeline/draw.py +++ b/haystack/core/pipeline/draw.py @@ -124,7 +124,7 @@ def _to_mermaid_text(graph: networkx.MultiDiGraph) -> str: } states = { - comp: f"{comp}[\"{comp}
{type(data['instance']).__name__}{optional_inputs[comp]}\"]:::component" # noqa + comp: f'{comp}["{comp}
{type(data["instance"]).__name__}{optional_inputs[comp]}"]:::component' # noqa for comp, data in graph.nodes(data=True) if comp not in ["input", "output"] } @@ -139,11 +139,11 @@ def _to_mermaid_text(graph: networkx.MultiDiGraph) -> str: connections_list.append(conn_string) input_connections = [ - f"i{{*}}--\"{conn_data['label']}
{conn_data['conn_type']}\"--> {states[to_comp]}" + f'i{{*}}--"{conn_data["label"]}
{conn_data["conn_type"]}"--> {states[to_comp]}' for _, to_comp, conn_data in graph.out_edges("input", data=True) ] output_connections = [ - f"{states[from_comp]}--\"{conn_data['label']}
{conn_data['conn_type']}\"--> o{{*}}" + f'{states[from_comp]}--"{conn_data["label"]}
{conn_data["conn_type"]}"--> o{{*}}' for from_comp, _, conn_data in graph.in_edges("output", data=True) ] connections = "\n".join(connections_list + input_connections + output_connections) diff --git a/haystack/document_stores/in_memory/document_store.py b/haystack/document_stores/in_memory/document_store.py index 1aea1e50c3..ad469a002c 100644 --- a/haystack/document_stores/in_memory/document_store.py +++ b/haystack/document_stores/in_memory/document_store.py @@ -396,8 +396,7 @@ def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Doc if filters: if "operator" not in filters and "conditions" not in filters: raise ValueError( - "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering " - "for details." + "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details." ) return [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)] return list(self.storage.values()) @@ -506,8 +505,7 @@ def bm25_retrieval( if filters: if "operator" not in filters: raise ValueError( - "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering " - "for details." + "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details." ) filters = {"operator": "AND", "conditions": [content_type_filter, filters]} else: @@ -574,8 +572,7 @@ def embedding_retrieval( # pylint: disable=too-many-positional-arguments return [] elif len(documents_with_embeddings) < len(all_documents): logger.info( - "Skipping some Documents that don't have an embedding. " - "To generate embeddings, use a DocumentEmbedder." + "Skipping some Documents that don't have an embedding. To generate embeddings, use a DocumentEmbedder." 
) scores = self._compute_query_embedding_similarity_scores( diff --git a/haystack/marshal/yaml.py b/haystack/marshal/yaml.py index 615cd1916a..b9c5ffdf41 100644 --- a/haystack/marshal/yaml.py +++ b/haystack/marshal/yaml.py @@ -31,8 +31,7 @@ def marshal(self, dict_: Dict[str, Any]) -> str: return yaml.dump(dict_, Dumper=YamlDumper) except yaml.representer.RepresenterError as e: raise TypeError( - "Error dumping pipeline to YAML - Ensure that all pipeline " - "components only serialize basic Python types" + "Error dumping pipeline to YAML - Ensure that all pipeline components only serialize basic Python types" ) from e def unmarshal(self, data_: Union[str, bytes, bytearray]) -> Dict[str, Any]: diff --git a/haystack/utils/filters.py b/haystack/utils/filters.py index c8a3133e3c..bddb422efe 100644 --- a/haystack/utils/filters.py +++ b/haystack/utils/filters.py @@ -112,8 +112,7 @@ def _less_than_equal(document_value: Any, filter_value: Any) -> bool: def _in(document_value: Any, filter_value: Any) -> bool: if not isinstance(filter_value, list): msg = ( - f"Filter value must be a `list` when using operator 'in' or 'not in', " - f"received type '{type(filter_value)}'" + f"Filter value must be a `list` when using operator 'in' or 'not in', received type '{type(filter_value)}'" ) raise FilterError(msg) return any(_equal(e, document_value) for e in filter_value) diff --git a/haystack/utils/hf.py b/haystack/utils/hf.py index 6a83594ada..7ddca03046 100644 --- a/haystack/utils/hf.py +++ b/haystack/utils/hf.py @@ -205,7 +205,7 @@ def resolve_hf_pipeline_kwargs( # pylint: disable=too-many-positional-arguments task = model_info(huggingface_pipeline_kwargs["model"], token=huggingface_pipeline_kwargs["token"]).pipeline_tag if task not in supported_tasks: - raise ValueError(f"Task '{task}' is not supported. " f"The supported tasks are: {', '.join(supported_tasks)}.") + raise ValueError(f"Task '{task}' is not supported. 
The supported tasks are: {', '.join(supported_tasks)}.") huggingface_pipeline_kwargs["task"] = task return huggingface_pipeline_kwargs diff --git a/test/components/audio/test_whisper_local.py b/test/components/audio/test_whisper_local.py index 28463c4ce6..394a9c4000 100644 --- a/test/components/audio/test_whisper_local.py +++ b/test/components/audio/test_whisper_local.py @@ -190,14 +190,14 @@ def test_whisper_local_transcriber(self, test_files_path): docs = output["documents"] assert len(docs) == 3 - assert all( - word in docs[0].content.strip().lower() for word in {"content", "the", "document"} - ), f"Expected words not found in: {docs[0].content.strip().lower()}" + assert all(word in docs[0].content.strip().lower() for word in {"content", "the", "document"}), ( + f"Expected words not found in: {docs[0].content.strip().lower()}" + ) assert test_files_path / "audio" / "this is the content of the document.wav" == docs[0].meta["audio_file"] - assert all( - word in docs[1].content.strip().lower() for word in {"context", "answer"} - ), f"Expected words not found in: {docs[1].content.strip().lower()}" + assert all(word in docs[1].content.strip().lower() for word in {"context", "answer"}), ( + f"Expected words not found in: {docs[1].content.strip().lower()}" + ) path = test_files_path / "audio" / "the context for this answer is here.wav" assert path.absolute() == docs[1].meta["audio_file"] diff --git a/test/components/converters/test_docx_file_to_document.py b/test/components/converters/test_docx_file_to_document.py index 9b4ee3fe60..c013759938 100644 --- a/test/components/converters/test_docx_file_to_document.py +++ b/test/components/converters/test_docx_file_to_document.py @@ -176,9 +176,9 @@ def test_run_with_table(self, test_files_path): table_index = next(i for i, part in enumerate(content_parts) if "| This | Is | Just a |" in part) # check that natural order of the document is preserved assert any("Donald Trump" in part for part in content_parts[:table_index]), "Text before table not found" - assert any( - "Now we are in Page 2" in part for part in content_parts[table_index + 1 :] - ), "Text after table not found" + assert any("Now we are in Page 2" in part for part in content_parts[table_index + 1 :]), ( + "Text after table not found" + ) def test_run_with_store_full_path_false(self, test_files_path): """ diff --git a/test/components/embedders/test_openai_document_embedder.py b/test/components/embedders/test_openai_document_embedder.py index 87ed6afbb6..7d43bcfa83 100644 --- a/test/components/embedders/test_openai_document_embedder.py +++ b/test/components/embedders/test_openai_document_embedder.py @@ -251,8 +251,8 @@ def test_run(self): assert len(doc.embedding) == 1536 assert all(isinstance(x, float) for x in doc.embedding) - assert ( - "text" in result["meta"]["model"] and "ada" in result["meta"]["model"] - ), "The model name does not contain 'text' and 'ada'" + assert "text" in result["meta"]["model"] and "ada" in result["meta"]["model"], ( + "The model name does not contain 'text' and 'ada'" + ) assert result["meta"]["usage"] == {"prompt_tokens": 15, "total_tokens": 15}, "Usage information does not match" diff --git a/test/components/embedders/test_openai_text_embedder.py b/test/components/embedders/test_openai_text_embedder.py index 31a0360555..695e6351f0 100644 --- a/test/components/embedders/test_openai_text_embedder.py +++ b/test/components/embedders/test_openai_text_embedder.py @@ -130,8 +130,8 @@ def test_run(self): assert len(result["embedding"]) == 1536 assert 
all(isinstance(x, float) for x in result["embedding"]) - assert ( - "text" in result["meta"]["model"] and "ada" in result["meta"]["model"] - ), "The model name does not contain 'text' and 'ada'" + assert "text" in result["meta"]["model"] and "ada" in result["meta"]["model"], ( + "The model name does not contain 'text' and 'ada'" + ) assert result["meta"]["usage"] == {"prompt_tokens": 6, "total_tokens": 6}, "Usage information does not match" diff --git a/test/components/joiners/test_document_joiner.py b/test/components/joiners/test_document_joiner.py index 6cc4f5f9e0..8160fdc48a 100644 --- a/test/components/joiners/test_document_joiner.py +++ b/test/components/joiners/test_document_joiner.py @@ -302,6 +302,6 @@ def test_test_score_norm_with_rrf(self): for i in range(len(join_results["documents"]) - 1) ) - assert ( - is_sorted - ), "Documents are not sorted in descending order by score, there is an issue with rff ranking" + assert is_sorted, ( + "Documents are not sorted in descending order by score, there is an issue with rff ranking" + ) diff --git a/test/components/preprocessors/test_document_cleaner.py b/test/components/preprocessors/test_document_cleaner.py index 5f5633a2c4..0cc929e059 100644 --- a/test/components/preprocessors/test_document_cleaner.py +++ b/test/components/preprocessors/test_document_cleaner.py @@ -71,7 +71,7 @@ def test_remove_whitespaces(self): ) assert len(result["documents"]) == 1 assert result["documents"][0].content == ( - "This is a text with some words. " "" "There is a second sentence. " "" "And there is a third sentence.\f" + "This is a text with some words. There is a second sentence. And there is a third sentence.\f" ) def test_remove_substrings(self): @@ -210,11 +210,7 @@ def test_ascii_only(self): def test_other_document_fields_are_not_lost(self): cleaner = DocumentCleaner(keep_id=True) document = Document( - content="This is a text with some words. \n" - "" - "There is a second sentence. \n" - "" - "And there is a third sentence.\n", + content="This is a text with some words. \nThere is a second sentence. \nAnd there is a third sentence.\n", dataframe=DataFrame({"col1": [1], "col2": [2]}), blob=ByteStream.from_string("some_data"), meta={"data": 1}, diff --git a/test/components/routers/test_conditional_router.py b/test/components/routers/test_conditional_router.py index 66d941b645..478e62d5bf 100644 --- a/test/components/routers/test_conditional_router.py +++ b/test/components/routers/test_conditional_router.py @@ -436,9 +436,9 @@ def test_router_with_optional_parameters(self): # Test pipeline without path parameter result = pipe.run(data={"router": {"question": "What?"}}) - assert result["router"] == { - "fallback": "What?" 
- }, "Default route should work in pipeline when 'path' is not provided" + assert result["router"] == {"fallback": "What?"}, ( + "Default route should work in pipeline when 'path' is not provided" + ) # Test pipeline with path parameter result = pipe.run(data={"router": {"question": "What?", "path": "followup_short"}}) From 7461a3998d35e5cb76cddc1630b3ea8283c08e28 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Thu, 9 Jan 2025 17:04:46 +0100 Subject: [PATCH 4/8] chore: trigger CI --- test/core/pipeline/features/test_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 673b06b92c..500f8f84d1 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -719,7 +719,7 @@ def pipeline_that_has_a_component_that_doesnt_return_a_dictionary(): pipe.add_component("comp", BrokenComponent()) return pipe, [PipelineRunData({"comp": {"a": 1}})] - +# Run again @given( "a pipeline that has components added in a different order from the order of execution", target_fixture="pipeline_data", From 2f525a8292fcc33d87dc20e0bdcc4a472a1ab255 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Thu, 9 Jan 2025 17:08:30 +0100 Subject: [PATCH 5/8] chore: format --- test/core/pipeline/features/test_run.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 500f8f84d1..5aa8ffc7f4 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -719,6 +719,7 @@ def pipeline_that_has_a_component_that_doesnt_return_a_dictionary(): pipe.add_component("comp", BrokenComponent()) return pipe, [PipelineRunData({"comp": {"a": 1}})] + # Run again @given( "a pipeline that has components added in a different order from the order of execution", From 80e8ab62e4a7576f8cb8089043b1c918e82f5d95 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Thu, 9 Jan 2025 22:29:28 +0100 Subject: [PATCH 6/8] WIP: cycles don't execute twice if we empty the waiting queue --- haystack/core/pipeline/pipeline.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index 622c4ef6d7..a31ec6c7d0 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -449,6 +449,12 @@ def run( # noqa: PLR0915, PLR0912 # So we reset it given the output returned by the subgraph. run_queue = [] + # This prevents a cycle from running twice. + # It's not a full fix for all the issues, it's only here to explain what is happening. + for component_name, data in self.graph.nodes.items(): + if component_name in cycles[0]: + _dequeue_component((component_name, data["instance"]), run_queue=run_queue, waiting_queue=waiting_queue) + # Reset the waiting for input previous states, we managed to run at least one component before_last_waiting_queue = None last_waiting_queue = None From 4d23551fff82937e7307b1314b6c596ae78808c6 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Tue, 4 Feb 2025 14:57:52 +0100 Subject: [PATCH 7/8] Revert "WIP: cycles don't execute twice if we empty the waiting queue" This reverts commit 80e8ab62e4a7576f8cb8089043b1c918e82f5d95. 
--- haystack/core/pipeline/pipeline.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index a31ec6c7d0..622c4ef6d7 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -449,12 +449,6 @@ def run( # noqa: PLR0915, PLR0912 # So we reset it given the output returned by the subgraph. run_queue = [] - # This prevents a cycle from running twice. - # It's not a full fix for all the issues, it's only here to explain what is happening. - for component_name, data in self.graph.nodes.items(): - if component_name in cycles[0]: - _dequeue_component((component_name, data["instance"]), run_queue=run_queue, waiting_queue=waiting_queue) - # Reset the waiting for input previous states, we managed to run at least one component before_last_waiting_queue = None last_waiting_queue = None From ed5f5b77ee56b8b0de3484501b72d0e89c958d28 Mon Sep 17 00:00:00 2001 From: mathislucka Date: Tue, 4 Feb 2025 15:25:15 +0100 Subject: [PATCH 8/8] add test for file conversion failure --- .../pipeline/features/pipeline_run.feature | 1 + test/core/pipeline/features/test_run.py | 105 +++++++++++++++++- 2 files changed, 102 insertions(+), 4 deletions(-) diff --git a/test/core/pipeline/features/pipeline_run.feature b/test/core/pipeline/features/pipeline_run.feature index 398c4f3c2c..2e93fbc8a7 100644 --- a/test/core/pipeline/features/pipeline_run.feature +++ b/test/core/pipeline/features/pipeline_run.feature @@ -49,6 +49,7 @@ Feature: Pipeline running | that has a feedback loop | | created in a non-standard order that has a loop | | that has an agent with a feedback cycle | + | that is a file conversion pipeline with two joiners | Scenario Outline: Running a bad Pipeline Given a pipeline diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 5aa8ffc7f4..1c1a9e7db7 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -7,11 +7,11 @@ from haystack import Pipeline, Document, component from haystack.document_stores.types import DuplicatePolicy -from haystack.dataclasses import ChatMessage, GeneratedAnswer -from haystack.components.routers import ConditionalRouter +from haystack.dataclasses import ChatMessage, GeneratedAnswer, ByteStream +from haystack.components.routers import ConditionalRouter, FileTypeRouter from haystack.components.builders import PromptBuilder, AnswerBuilder, ChatPromptBuilder -from haystack.components.converters import OutputAdapter -from haystack.components.preprocessors import DocumentCleaner +from haystack.components.converters import OutputAdapter, TextFileToDocument, CSVToDocument, JSONConverter +from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter from haystack.components.retrievers.in_memory import InMemoryBM25Retriever from haystack.document_stores.in_memory import InMemoryDocumentStore from haystack.components.joiners import BranchJoiner, DocumentJoiner, AnswerJoiner, StringJoiner @@ -32,6 +32,7 @@ StringListJoiner, ) from haystack.testing.factory import component_class +from test.components.converters.test_csv_to_document import csv_converter from test.core.pipeline.features.conftest import PipelineRunData @@ -2824,3 +2825,99 @@ def run(self, replies: List[str]): ) ], ) + + +@given("a pipeline that is a file conversion pipeline with two joiners", target_fixture="pipeline_data") +def pipeline_that_converts_files(): + csv_data = """ +some,header,row +0,1,0 + """ + + 
txt_data = "Text file content for testing this." + + json_data = '{"content": "Some test content"}' + + sources = [ + ByteStream.from_string(text=csv_data, mime_type="text/csv", meta={"file_type": "csv"}), + ByteStream.from_string(text=txt_data, mime_type="text/plain", meta={"file_type": "txt"}), + ByteStream.from_string(text=json_data, mime_type="application/json", meta={"file_type": "json"}), + ] + + router = FileTypeRouter(mime_types=["text/csv", "text/plain", "application/json"]) + splitter = DocumentSplitter(split_by="word", split_length=3, split_overlap=0) + txt_converter = TextFileToDocument() + csv_converter = CSVToDocument() + json_converter = JSONConverter(content_key="content") + + b_joiner = DocumentJoiner() + a_joiner = DocumentJoiner() + + pp = Pipeline(max_runs_per_component=1) + + pp.add_component("router", router) + pp.add_component("splitter", splitter) + pp.add_component("txt_converter", txt_converter) + pp.add_component("csv_converter", csv_converter) + pp.add_component("json_converter", json_converter) + pp.add_component("b_joiner", b_joiner) + pp.add_component("a_joiner", a_joiner) + + pp.connect("router.text/plain", "txt_converter.sources") + pp.connect("router.application/json", "json_converter.sources") + pp.connect("router.text/csv", "csv_converter.sources") + pp.connect("txt_converter.documents", "b_joiner.documents") + pp.connect("json_converter.documents", "b_joiner.documents") + pp.connect("csv_converter.documents", "a_joiner.documents") + pp.connect("b_joiner.documents", "splitter.documents") + pp.connect("splitter.documents", "a_joiner.documents") + + return ( + pp, + [ + PipelineRunData( + inputs={"router": {"sources": sources}}, + expected_outputs={ + "a_joiner": { + "documents": [ + Document(content=csv_data, meta={"file_type": "csv"}), + Document( + id="191726aa5b9aac93f376cc9b364cd07ee5c2a957383ed6d9b901cfc61d0a2b0f", + content="Text file content ", + meta={ + "file_type": "txt", + "source_id": "41cb91740f6e64ab542122936ea746c238ae0a92fd29b698efabbe23d0ba4c42", + "page_number": 1, + "split_id": 0, + "split_idx_start": 0, + }, + ), + Document( + id="424e39c85775b9dc32d61406cbe9b19ba234a4a2f26f59a1285120f10bb38ca1", + content="for testing this.", + meta={ + "file_type": "txt", + "source_id": "41cb91740f6e64ab542122936ea746c238ae0a92fd29b698efabbe23d0ba4c42", + "page_number": 1, + "split_id": 1, + "split_idx_start": 18, + }, + ), + Document( + id="d6145ad0bf5454b0e0c56f14b1da11c95076fb7961a2df1631d191ba8e174cf6", + content="Some test content", + meta={ + "file_type": "json", + "source_id": "0c6c5951d18da2935c7af3e24d417a9f94ca85403866dcfee1de93922504e1e5", + "page_number": 1, + "split_id": 0, + "split_idx_start": 0, + }, + ), + ] + } + }, + expected_run_order=[], + ) + ], + )
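For quick local reproduction of what this series is exercising, the first new scenario from PATCH 1/8 ("a variadic component that receives partial inputs in a different order") can be distilled into the minimal standalone sketch below. It reuses only names and APIs that appear in the patch itself (Pipeline, DocumentJoiner, the ConditionalDocumentCreator test component); the final print call is illustrative and is not part of the test suite.

from typing import List

from haystack import Document, Pipeline, component
from haystack.components.joiners import DocumentJoiner


@component
class ConditionalDocumentCreator:
    """Emits a single Document when asked to, otherwise emits a no-op marker."""

    def __init__(self, content: str):
        self._content = content

    @component.output_types(documents=List[Document], noop=None)
    def run(self, create_document: bool = False):
        if create_document:
            return {"documents": [Document(id=self._content, content=self._content)]}
        return {"noop": None}


# Components are added in a different order than their names suggest,
# mirroring the "partial inputs in a different order" scenario from the patch.
pipeline = Pipeline(max_runs_per_component=1)
pipeline.add_component("third_creator", ConditionalDocumentCreator(content="Third document"))
pipeline.add_component("first_creator", ConditionalDocumentCreator(content="First document"))
pipeline.add_component("second_creator", ConditionalDocumentCreator(content="Second document"))
pipeline.add_component("documents_joiner", DocumentJoiner())

pipeline.connect("first_creator.documents", "documents_joiner.documents")
pipeline.connect("second_creator.documents", "documents_joiner.documents")
pipeline.connect("third_creator.documents", "documents_joiner.documents")

# Only two of the three creators produce a Document; the joiner is expected
# to run exactly once with those two partial inputs, regardless of the order
# in which the creator components were added to the pipeline.
result = pipeline.run({"first_creator": {"create_document": True}, "third_creator": {"create_document": True}})
print(result["documents_joiner"]["documents"])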