diff --git a/test/core/pipeline/features/pipeline_run.feature b/test/core/pipeline/features/pipeline_run.feature index db064ea2b1..2e93fbc8a7 100644 --- a/test/core/pipeline/features/pipeline_run.feature +++ b/test/core/pipeline/features/pipeline_run.feature @@ -39,11 +39,17 @@ Feature: Pipeline running | that is linear with conditional branching and multiple joins | | that is a simple agent | | that has a variadic component that receives partial inputs | + | that has a variadic component that receives partial inputs in a different order | | that has an answer joiner variadic component | | that is linear and a component in the middle receives optional input from other components and input from the user | | that has a loop in the middle | | that has variadic component that receives a conditional input | | that has a string variadic component | + | that is an agent that can use RAG | + | that has a feedback loop | + | created in a non-standard order that has a loop | + | that has an agent with a feedback cycle | + | that is a file conversion pipeline with two joiners | Scenario Outline: Running a bad Pipeline Given a pipeline diff --git a/test/core/pipeline/features/test_run.py b/test/core/pipeline/features/test_run.py index 652fea7c30..1c1a9e7db7 100644 --- a/test/core/pipeline/features/test_run.py +++ b/test/core/pipeline/features/test_run.py @@ -7,9 +7,10 @@ from haystack import Pipeline, Document, component from haystack.document_stores.types import DuplicatePolicy -from haystack.dataclasses import ChatMessage, GeneratedAnswer -from haystack.components.routers import ConditionalRouter +from haystack.dataclasses import ChatMessage, GeneratedAnswer, ByteStream +from haystack.components.routers import ConditionalRouter, FileTypeRouter from haystack.components.builders import PromptBuilder, AnswerBuilder, ChatPromptBuilder +from haystack.components.converters import OutputAdapter, TextFileToDocument, CSVToDocument, JSONConverter from haystack.components.preprocessors 
import DocumentCleaner, DocumentSplitter from haystack.components.retrievers.in_memory import InMemoryBM25Retriever from haystack.document_stores.in_memory import InMemoryDocumentStore @@ -31,6 +32,7 @@ StringListJoiner, ) from haystack.testing.factory import component_class +from test.components.converters.test_csv_to_document import csv_converter from test.core.pipeline.features.conftest import PipelineRunData @@ -719,6 +721,7 @@ def pipeline_that_has_a_component_that_doesnt_return_a_dictionary(): return pipe, [PipelineRunData({"comp": {"a": 1}})] +# Run again @given( "a pipeline that has components added in a different order from the order of execution", target_fixture="pipeline_data", @@ -1809,6 +1812,65 @@ def run(self, create_document: bool = False): ) +@given( + "a pipeline that has a variadic component that receives partial inputs in a different order", + target_fixture="pipeline_data", +) +def that_has_a_variadic_component_that_receives_partial_inputs_different_order(): + @component + class ConditionalDocumentCreator: + def __init__(self, content: str): + self._content = content + + @component.output_types(documents=List[Document], noop=None) + def run(self, create_document: bool = False): + if create_document: + return {"documents": [Document(id=self._content, content=self._content)]} + return {"noop": None} + + pipeline = Pipeline(max_runs_per_component=1) + pipeline.add_component("third_creator", ConditionalDocumentCreator(content="Third document")) + pipeline.add_component("first_creator", ConditionalDocumentCreator(content="First document")) + pipeline.add_component("second_creator", ConditionalDocumentCreator(content="Second document")) + pipeline.add_component("documents_joiner", DocumentJoiner()) + + pipeline.connect("first_creator.documents", "documents_joiner.documents") + pipeline.connect("second_creator.documents", "documents_joiner.documents") + pipeline.connect("third_creator.documents", "documents_joiner.documents") + + return ( + pipeline, 
+ [ + PipelineRunData( + inputs={"first_creator": {"create_document": True}, "third_creator": {"create_document": True}}, + expected_outputs={ + "second_creator": {"noop": None}, + "documents_joiner": { + "documents": [ + Document(id="First document", content="First document"), + Document(id="Third document", content="Third document"), + ] + }, + }, + expected_run_order=["first_creator", "second_creator", "third_creator", "documents_joiner"], + ), + PipelineRunData( + inputs={"first_creator": {"create_document": True}, "second_creator": {"create_document": True}}, + expected_outputs={ + "third_creator": {"noop": None}, + "documents_joiner": { + "documents": [ + Document(id="First document", content="First document"), + Document(id="Second document", content="Second document"), + ] + }, + }, + expected_run_order=["first_creator", "second_creator", "third_creator", "documents_joiner"], + ), + ], + ) + + @given("a pipeline that has an answer joiner variadic component", target_fixture="pipeline_data") def that_has_an_answer_joiner_variadic_component(): query = "What's Natural Language Processing?" 
@component
class _FixedReplyGenerator:
    """Stand-in for an LLM that replays a scripted list of replies, one reply per call.

    Once the last scripted reply has been returned, the next call starts again from the first reply.
    """

    def __init__(self, replies):
        self.replies = replies
        self.idx = 0

    @component.output_types(replies=List[str])
    def run(self, prompt: str):
        # Wrap around to the first reply once the script is exhausted.
        if self.idx >= len(self.replies):
            self.idx = 0
        reply = self.replies[self.idx]
        self.idx += 1
        return {"replies": [reply]}


def _feedback_loop_parts():
    """Create fresh component instances for the code-generation feedback loop, keyed by component name.

    Both feedback-loop fixtures use the same components; they differ only in the order in which the
    components are registered and connected.
    """
    code_prompt_template = """
Generate code to solve the task: {{ task }}

{% if feedback %}
Here is your initial attempt and some feedback:
{{ feedback }}
{% endif %}
    """

    feedback_prompt_template = """
Check if this code is valid and can run: {{ code[0] }}
Return "PASS" if it passes and "FAIL" if it fails.
Provide additional feedback on why it fails.
    """

    routes = [
        {
            "condition": "{{ 'FAIL' in replies[0] }}",
            "output": "{{ replies[0] }}",
            "output_name": "fail",
            "output_type": str,
        },
        {
            "condition": "{{ 'PASS' in replies[0] }}",
            "output": "{{ code }}",
            "output_name": "pass",
            "output_type": List[str],
        },
    ]

    return {
        # The first attempt is scripted to be rejected, the second one to be accepted.
        "code_llm": _FixedReplyGenerator(replies=["invalid code", "valid code"]),
        "code_prompt": PromptBuilder(template=code_prompt_template),
        "feedback_llm": _FixedReplyGenerator(replies=["FAIL", "PASS"]),
        "feedback_prompt": PromptBuilder(template=feedback_prompt_template),
        "router": ConditionalRouter(routes=routes),
        "concatenator": OutputAdapter(template="{{current_prompt[0] + '\n' + feedback[0]}}", output_type=str),
        "answer_builder": AnswerBuilder(),
    }


def _feedback_loop_run_data(task):
    """Return the run data shared by the feedback-loop fixtures: one failed attempt, then one accepted."""
    attempt = ["code_prompt", "code_llm", "feedback_prompt", "feedback_llm", "router"]
    return [
        PipelineRunData(
            inputs={"code_prompt": {"task": task}, "answer_builder": {"query": task}},
            expected_outputs={
                "answer_builder": {"answers": [GeneratedAnswer(data="valid code", query=task, documents=[])]}
            },
            # A failed attempt is routed through the concatenator back to the prompt;
            # the accepted attempt is routed to the answer builder.
            expected_run_order=[*attempt, "concatenator", *attempt, "answer_builder"],
        )
    ]


@given("a pipeline that is an agent that can use RAG", target_fixture="pipeline_data")
def an_agent_that_can_use_RAG():
    """Build an agent pipeline whose scripted LLM first requests a search and then gives an answer.

    Returns a tuple of the pipeline and the list of `PipelineRunData` cases to run against it.
    """

    @component
    class FakeRetriever:
        """Retriever stub that returns the same single document for every query."""

        @component.output_types(documents=List[Document])
        def run(self, query: str):
            return {
                "documents": [
                    Document(content="This is a document potentially answering the question.", meta={"access_group": 1})
                ]
            }

    agent_prompt_template = """
Your task is to answer the user's question.
You can use a RAG system to find information.
Use the RAG system until you have sufficient information to answer the question.
To use the RAG system, output "search:" followed by your question.
Once you have an answer, output "answer:" followed by your answer.

Here is the question: {{query}}
    """

    rag_prompt_template = """
Answer the question based on the provided documents.
Question: {{ query }}
Documents:
{% for document in documents %}
{{ document.content }}
{% endfor %}
    """

    joiner = BranchJoiner(type_=str)

    agent_llm = _FixedReplyGenerator(replies=["search: Can you help me?", "answer: here is my answer"])
    agent_prompt = PromptBuilder(template=agent_prompt_template)

    rag_llm = _FixedReplyGenerator(replies=["This is all the information I found!"])
    rag_prompt = PromptBuilder(template=rag_prompt_template)

    retriever = FakeRetriever()

    routes = [
        {
            "condition": "{{ 'search:' in replies[0] }}",
            "output": "{{ replies[0] }}",
            "output_name": "search",
            "output_type": str,
        },
        {
            "condition": "{{ 'answer:' in replies[0] }}",
            "output": "{{ replies }}",
            "output_name": "answer",
            "output_type": List[str],
        },
    ]
    router = ConditionalRouter(routes=routes)

    concatenator = OutputAdapter(template="{{current_prompt + '\n' + rag_answer[0]}}", output_type=str)

    answer_builder = AnswerBuilder()

    pp = Pipeline(max_runs_per_component=2)

    # Registration order is part of the scenario; keep it as is.
    pp.add_component("joiner", joiner)
    pp.add_component("rag_llm", rag_llm)
    pp.add_component("rag_prompt", rag_prompt)
    pp.add_component("agent_prompt", agent_prompt)
    pp.add_component("agent_llm", agent_llm)
    pp.add_component("router", router)
    pp.add_component("concatenator", concatenator)
    pp.add_component("retriever", retriever)
    pp.add_component("answer_builder", answer_builder)

    pp.connect("agent_prompt.prompt", "joiner.value")
    pp.connect("joiner.value", "agent_llm.prompt")
    pp.connect("agent_llm.replies", "router.replies")
    pp.connect("router.search", "retriever.query")
    pp.connect("router.answer", "answer_builder.replies")
    pp.connect("retriever.documents", "rag_prompt.documents")
    pp.connect("rag_prompt.prompt", "rag_llm.prompt")
    pp.connect("rag_llm.replies", "concatenator.rag_answer")
    pp.connect("joiner.value", "concatenator.current_prompt")
    # Closes the loop: the extended prompt is fed back to the agent LLM through the joiner.
    pp.connect("concatenator.output", "joiner.value")

    query = "Does this run reliably?"

    return (
        pp,
        [
            PipelineRunData(
                inputs={
                    "agent_prompt": {"query": query},
                    "rag_prompt": {"query": query},
                    "answer_builder": {"query": query},
                },
                expected_outputs={
                    "answer_builder": {
                        "answers": [GeneratedAnswer(data="answer: here is my answer", query=query, documents=[])]
                    }
                },
                expected_run_order=[
                    "agent_prompt",
                    "joiner",
                    "agent_llm",
                    "router",
                    "retriever",
                    "rag_prompt",
                    "rag_llm",
                    "concatenator",
                    "joiner",
                    "agent_llm",
                    "router",
                    "answer_builder",
                ],
            )
        ],
    )


@given("a pipeline that has a feedback loop", target_fixture="pipeline_data")
def has_feedback_loop():
    """Build the code-generation feedback loop with components registered roughly in execution order.

    Returns a tuple of the pipeline and the list of `PipelineRunData` cases to run against it.
    """
    parts = _feedback_loop_parts()

    pp = Pipeline(max_runs_per_component=100)

    registration_order = (
        "code_llm",
        "code_prompt",
        "feedback_prompt",
        "feedback_llm",
        "router",
        "concatenator",
        "answer_builder",
    )
    for name in registration_order:
        pp.add_component(name, parts[name])

    connections = (
        ("code_prompt.prompt", "code_llm.prompt"),
        ("code_llm.replies", "feedback_prompt.code"),
        ("feedback_llm.replies", "router.replies"),
        ("router.fail", "concatenator.feedback"),
        ("router.pass", "answer_builder.replies"),
        ("code_llm.replies", "router.code"),
        ("feedback_prompt.prompt", "feedback_llm.prompt"),
        ("code_llm.replies", "concatenator.current_prompt"),
        ("concatenator.output", "code_prompt.feedback"),
    )
    for sender, receiver in connections:
        pp.connect(sender, receiver)

    task = "Generate code to generate christmas ascii-art"

    return (pp, _feedback_loop_run_data(task))


@given("a pipeline created in a non-standard order that has a loop", target_fixture="pipeline_data")
def has_non_standard_order_loop():
    """Build the same feedback loop as `has_feedback_loop`, registered and connected in a different order.

    The concatenator is registered first and the loop-closing connection is made first; the expected
    outputs and run order are the same as for `has_feedback_loop`.

    Returns a tuple of the pipeline and the list of `PipelineRunData` cases to run against it.
    """
    parts = _feedback_loop_parts()

    pp = Pipeline(max_runs_per_component=100)

    registration_order = (
        "concatenator",
        "code_llm",
        "code_prompt",
        "feedback_prompt",
        "feedback_llm",
        "router",
        "answer_builder",
    )
    for name in registration_order:
        pp.add_component(name, parts[name])

    connections = (
        ("concatenator.output", "code_prompt.feedback"),
        ("code_prompt.prompt", "code_llm.prompt"),
        ("code_llm.replies", "feedback_prompt.code"),
        ("feedback_llm.replies", "router.replies"),
        ("router.fail", "concatenator.feedback"),
        ("feedback_prompt.prompt", "feedback_llm.prompt"),
        ("router.pass", "answer_builder.replies"),
        ("code_llm.replies", "router.code"),
        ("code_llm.replies", "concatenator.current_prompt"),
    )
    for sender, receiver in connections:
        pp.connect(sender, receiver)

    task = "Generate code to generate christmas ascii-art"

    return (pp, _feedback_loop_run_data(task))


@given("a pipeline that has an agent with a feedback cycle", target_fixture="pipeline_data")
def agent_with_feedback_cycle():
    """Build a tool-using agent loop nested inside a feedback loop.

    The scripted code LLM edits three files and then reports that it is finished; the scripted feedback
    LLM rejects the first finished attempt and accepts the second one.

    Returns a tuple of the pipeline and the list of `PipelineRunData` cases to run against it.
    """

    @component
    class FakeFileEditor:
        """File editor stub that ignores its input and returns fixed file content."""

        @component.output_types(files=str)
        def run(self, replies: List[str]):
            return {"files": "This is the edited file content."}

    code_prompt_template = """
Generate code to solve the task: {{ task }}

You can edit files by returning:
Edit: file_name

Once you solved the task, respond with:
Task finished!

{% if feedback %}
Here is your initial attempt and some feedback:
{{ feedback }}
{% endif %}
    """

    feedback_prompt_template = """
{% if task_finished %}
Check if this code is valid and can run: {{ code }}
Return "PASS" if it passes and "FAIL" if it fails.
Provide additional feedback on why it fails.
{% endif %}
    """

    # Four scripted replies; the generator wraps around, so the second attempt replays the same script.
    code_llm = _FixedReplyGenerator(
        replies=["Edit: file_1.py", "Edit: file_2.py", "Edit: file_3.py", "Task finished!"]
    )
    code_prompt = PromptBuilder(template=code_prompt_template)
    file_editor = FakeFileEditor()

    feedback_llm = _FixedReplyGenerator(replies=["FAIL", "PASS"])
    feedback_prompt = PromptBuilder(template=feedback_prompt_template, required_variables=["task_finished"])

    routes = [
        {
            "condition": "{{ 'FAIL' in replies[0] }}",
            "output": "{{ current_prompt + '\n' + replies[0] }}",
            "output_name": "fail",
            "output_type": str,
        },
        {
            "condition": "{{ 'PASS' in replies[0] }}",
            "output": "{{ replies }}",
            "output_name": "pass",
            "output_type": List[str],
        },
    ]
    feedback_router = ConditionalRouter(routes=routes)

    tool_use_routes = [
        {
            "condition": "{{ 'Edit:' in replies[0] }}",
            "output": "{{ replies }}",
            "output_name": "edit",
            "output_type": List[str],
        },
        {
            "condition": "{{ 'Task finished!' in replies[0] }}",
            "output": "{{ replies }}",
            "output_name": "done",
            "output_type": List[str],
        },
    ]
    tool_use_router = ConditionalRouter(routes=tool_use_routes)

    joiner = BranchJoiner(type_=str)
    agent_concatenator = OutputAdapter(template="{{current_prompt + '\n' + files}}", output_type=str)

    pp = Pipeline(max_runs_per_component=100)

    pp.add_component("code_prompt", code_prompt)
    pp.add_component("joiner", joiner)
    pp.add_component("code_llm", code_llm)
    pp.add_component("tool_use_router", tool_use_router)
    pp.add_component("file_editor", file_editor)
    pp.add_component("agent_concatenator", agent_concatenator)
    pp.add_component("feedback_prompt", feedback_prompt)
    pp.add_component("feedback_llm", feedback_llm)
    pp.add_component("feedback_router", feedback_router)

    # Main Agent
    pp.connect("code_prompt.prompt", "joiner.value")
    pp.connect("joiner.value", "code_llm.prompt")
    pp.connect("code_llm.replies", "tool_use_router.replies")
    pp.connect("tool_use_router.edit", "file_editor.replies")
    pp.connect("file_editor.files", "agent_concatenator.files")
    pp.connect("joiner.value", "agent_concatenator.current_prompt")
    pp.connect("agent_concatenator.output", "joiner.value")

    # Feedback Cycle
    pp.connect("tool_use_router.done", "feedback_prompt.task_finished")
    pp.connect("agent_concatenator.output", "feedback_prompt.code")
    pp.connect("feedback_prompt.prompt", "feedback_llm.prompt")
    pp.connect("feedback_llm.replies", "feedback_router.replies")
    pp.connect("agent_concatenator.output", "feedback_router.current_prompt")
    pp.connect("feedback_router.fail", "joiner.value")

    task = "Generate code to generate christmas ascii-art"

    # One agent step that edits a file, and the closing step that hands over to the feedback cycle.
    edit_round = ["joiner", "code_llm", "tool_use_router", "file_editor", "agent_concatenator"]
    review_round = ["joiner", "code_llm", "tool_use_router", "feedback_prompt", "feedback_llm", "feedback_router"]
    # Per attempt: three "Edit:" replies followed by "Task finished!".
    attempt = edit_round * 3 + review_round

    return (
        pp,
        [
            PipelineRunData(
                inputs={"code_prompt": {"task": task}},
                expected_outputs={"feedback_router": {"pass": ["PASS"]}},
                # The first attempt gets "FAIL" and is repeated; the second attempt gets "PASS".
                expected_run_order=["code_prompt", *attempt, *attempt],
            )
        ],
    )


@given("a pipeline that is a file conversion pipeline with two joiners", target_fixture="pipeline_data")
def pipeline_that_converts_files():
    """Build a pipeline that routes CSV, text and JSON sources to converters and joins the results.

    Text and JSON documents are joined by `b_joiner` and split before reaching `a_joiner`; CSV documents
    are connected to `a_joiner` directly.

    Returns a tuple of the pipeline and the list of `PipelineRunData` cases to run against it.
    """
    csv_data = """
some,header,row
0,1,0
    """

    txt_data = "Text file content for testing this."

    json_data = '{"content": "Some test content"}'

    sources = [
        ByteStream.from_string(text=csv_data, mime_type="text/csv", meta={"file_type": "csv"}),
        ByteStream.from_string(text=txt_data, mime_type="text/plain", meta={"file_type": "txt"}),
        ByteStream.from_string(text=json_data, mime_type="application/json", meta={"file_type": "json"}),
    ]

    router = FileTypeRouter(mime_types=["text/csv", "text/plain", "application/json"])
    splitter = DocumentSplitter(split_by="word", split_length=3, split_overlap=0)
    txt_converter = TextFileToDocument()
    # Not named `csv_converter`, so it does not shadow the `csv_converter` name imported at module level.
    csv_to_document = CSVToDocument()
    json_converter = JSONConverter(content_key="content")

    b_joiner = DocumentJoiner()
    a_joiner = DocumentJoiner()

    pp = Pipeline(max_runs_per_component=1)

    pp.add_component("router", router)
    pp.add_component("splitter", splitter)
    pp.add_component("txt_converter", txt_converter)
    pp.add_component("csv_converter", csv_to_document)
    pp.add_component("json_converter", json_converter)
    pp.add_component("b_joiner", b_joiner)
    pp.add_component("a_joiner", a_joiner)

    pp.connect("router.text/plain", "txt_converter.sources")
    pp.connect("router.application/json", "json_converter.sources")
    pp.connect("router.text/csv", "csv_converter.sources")
    pp.connect("txt_converter.documents", "b_joiner.documents")
    pp.connect("json_converter.documents", "b_joiner.documents")
    pp.connect("csv_converter.documents", "a_joiner.documents")
    pp.connect("b_joiner.documents", "splitter.documents")
    pp.connect("splitter.documents", "a_joiner.documents")

    txt_source_id = "41cb91740f6e64ab542122936ea746c238ae0a92fd29b698efabbe23d0ba4c42"
    json_source_id = "0c6c5951d18da2935c7af3e24d417a9f94ca85403866dcfee1de93922504e1e5"

    return (
        pp,
        [
            PipelineRunData(
                inputs={"router": {"sources": sources}},
                expected_outputs={
                    "a_joiner": {
                        "documents": [
                            Document(content=csv_data, meta={"file_type": "csv"}),
                            Document(
                                id="191726aa5b9aac93f376cc9b364cd07ee5c2a957383ed6d9b901cfc61d0a2b0f",
                                content="Text file content ",
                                meta={
                                    "file_type": "txt",
                                    "source_id": txt_source_id,
                                    "page_number": 1,
                                    "split_id": 0,
                                    "split_idx_start": 0,
                                },
                            ),
                            Document(
                                id="424e39c85775b9dc32d61406cbe9b19ba234a4a2f26f59a1285120f10bb38ca1",
                                content="for testing this.",
                                meta={
                                    "file_type": "txt",
                                    "source_id": txt_source_id,
                                    "page_number": 1,
                                    "split_id": 1,
                                    "split_idx_start": 18,
                                },
                            ),
                            Document(
                                id="d6145ad0bf5454b0e0c56f14b1da11c95076fb7961a2df1631d191ba8e174cf6",
                                content="Some test content",
                                meta={
                                    "file_type": "json",
                                    "source_id": json_source_id,
                                    "page_number": 1,
                                    "split_id": 0,
                                    "split_idx_start": 0,
                                },
                            ),
                        ]
                    }
                },
                # NOTE(review): the run order is left empty here, unlike the other fixtures - confirm
                # whether the test harness treats an empty list as "do not check the order".
                expected_run_order=[],
            )
        ],
    )