Skip to content
24 changes: 20 additions & 4 deletions lib/crewai/src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,10 +1017,26 @@ def _handle_crew_planning(self) -> None:
tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning()

for task, step_plan in zip(
self.tasks, result.list_of_plans_per_task, strict=False
):
task.description += step_plan.plan
plan_map: dict[int, str] = {}
for step_plan in result.list_of_plans_per_task:
if step_plan.task_number in plan_map:
self._logger.log(
"warning",
f"Duplicate plan for Task Number {step_plan.task_number}, "
"using the first plan",
)
else:
plan_map[step_plan.task_number] = step_plan.plan

for idx, task in enumerate(self.tasks):
task_number = idx + 1
if task_number in plan_map:
task.description += plan_map[task_number]
else:
self._logger.log(
"warning",
f"No plan found for Task Number {task_number}",
)

def _store_execution_log(
self,
Expand Down
7 changes: 5 additions & 2 deletions lib/crewai/src/crewai/utilities/planning_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
class PlanPerTask(BaseModel):
"""Represents a plan for a specific task."""

task: str = Field(..., description="The task for which the plan is created")
task_number: int = Field(
description="The 1-indexed task number this plan corresponds to",
ge=1,
)
task: str = Field(description="The task for which the plan is created")
plan: str = Field(
...,
description="The step by step plan on how the agents can execute their tasks using the available tools with mastery",
)

Expand Down
206 changes: 195 additions & 11 deletions lib/crewai/tests/utilities/test_planning_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from unittest.mock import patch
"""Tests for the planning handler module."""

from unittest.mock import MagicMock, patch

import pytest

from crewai.agent import Agent
from crewai.crew import Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
Expand All @@ -13,7 +17,7 @@
)


class InternalCrewPlanner:
class TestInternalCrewPlanner:
@pytest.fixture
def crew_planner(self):
tasks = [
Expand Down Expand Up @@ -49,9 +53,9 @@ def crew_planner_different_llm(self):

def test_handle_crew_planning(self, crew_planner):
list_of_plans_per_task = [
PlanPerTask(task="Task1", plan="Plan 1"),
PlanPerTask(task="Task2", plan="Plan 2"),
PlanPerTask(task="Task3", plan="Plan 3"),
PlanPerTask(task_number=1, task="Task1", plan="Plan 1"),
PlanPerTask(task_number=2, task="Task2", plan="Plan 2"),
PlanPerTask(task_number=3, task="Task3", plan="Plan 3"),
]
with patch.object(Task, "execute_sync") as execute:
execute.return_value = TaskOutput(
Expand Down Expand Up @@ -97,12 +101,12 @@ def test_create_tasks_summary(self, crew_planner):
# Knowledge field should not be present when empty
assert '"agent_knowledge"' not in tasks_summary

@patch("crewai.knowledge.storage.knowledge_storage.chromadb")
def test_create_tasks_summary_with_knowledge_and_tools(self, mock_chroma):
@patch("crewai.knowledge.knowledge.Knowledge.add_sources")
@patch("crewai.knowledge.storage.knowledge_storage.KnowledgeStorage")
def test_create_tasks_summary_with_knowledge_and_tools(
self, mock_storage, mock_add_sources
):
"""Test task summary generation with both knowledge and tools present."""
# Mock ChromaDB collection
mock_collection = mock_chroma.return_value.get_or_create_collection.return_value
mock_collection.add.return_value = None

# Create mock tools with proper string descriptions and structured tool support
class MockTool(BaseTool):
Expand Down Expand Up @@ -166,7 +170,9 @@ def test_handle_crew_planning_different_llm(self, crew_planner_different_llm):
description="Description",
agent="agent",
pydantic=PlannerTaskPydanticOutput(
list_of_plans_per_task=[PlanPerTask(task="Task1", plan="Plan 1")]
list_of_plans_per_task=[
PlanPerTask(task_number=1, task="Task1", plan="Plan 1")
]
),
)
result = crew_planner_different_llm._handle_crew_planning()
Expand All @@ -177,3 +183,181 @@ def test_handle_crew_planning_different_llm(self, crew_planner_different_llm):
crew_planner_different_llm.tasks
)
execute.assert_called_once()

def test_plan_per_task_requires_task_number(self):
"""Test that PlanPerTask model requires task_number field."""
with pytest.raises(ValueError):
PlanPerTask(task="Task1", plan="Plan 1")

def test_plan_per_task_with_task_number(self):
"""Test PlanPerTask model with task_number field."""
plan = PlanPerTask(task_number=5, task="Task5", plan="Plan for task 5")
assert plan.task_number == 5
assert plan.task == "Task5"
assert plan.plan == "Plan for task 5"


class TestCrewPlanningIntegration:
"""Tests for Crew._handle_crew_planning integration with task_number matching."""

def test_crew_planning_with_out_of_order_plans(self):
"""Test that plans are correctly matched to tasks even when returned out of order.

This test verifies the fix for issue #3953 where plans returned by the LLM
in a different order than the tasks would be incorrectly assigned.
"""
agent1 = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
agent2 = Agent(role="Agent 2", goal="Goal 2", backstory="Backstory 2")
agent3 = Agent(role="Agent 3", goal="Goal 3", backstory="Backstory 3")

task1 = Task(
description="First task description",
expected_output="Output 1",
agent=agent1,
)
task2 = Task(
description="Second task description",
expected_output="Output 2",
agent=agent2,
)
task3 = Task(
description="Third task description",
expected_output="Output 3",
agent=agent3,
)

crew = Crew(
agents=[agent1, agent2, agent3],
tasks=[task1, task2, task3],
planning=True,
)

out_of_order_plans = [
PlanPerTask(task_number=3, task="Task 3", plan=" [PLAN FOR TASK 3]"),
PlanPerTask(task_number=1, task="Task 1", plan=" [PLAN FOR TASK 1]"),
PlanPerTask(task_number=2, task="Task 2", plan=" [PLAN FOR TASK 2]"),
]

mock_planner_result = PlannerTaskPydanticOutput(
list_of_plans_per_task=out_of_order_plans
)

with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()

assert "[PLAN FOR TASK 1]" in task1.description
assert "[PLAN FOR TASK 2]" in task2.description
assert "[PLAN FOR TASK 3]" in task3.description

assert "[PLAN FOR TASK 3]" not in task1.description
assert "[PLAN FOR TASK 1]" not in task2.description
assert "[PLAN FOR TASK 2]" not in task3.description

def test_crew_planning_with_missing_plan(self):
"""Test that missing plans are handled gracefully with a warning."""
agent1 = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
agent2 = Agent(role="Agent 2", goal="Goal 2", backstory="Backstory 2")

task1 = Task(
description="First task description",
expected_output="Output 1",
agent=agent1,
)
task2 = Task(
description="Second task description",
expected_output="Output 2",
agent=agent2,
)

crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
planning=True,
)

original_task1_desc = task1.description
original_task2_desc = task2.description

incomplete_plans = [
PlanPerTask(task_number=1, task="Task 1", plan=" [PLAN FOR TASK 1]"),
]

mock_planner_result = PlannerTaskPydanticOutput(
list_of_plans_per_task=incomplete_plans
)

with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()

assert "[PLAN FOR TASK 1]" in task1.description

assert task2.description == original_task2_desc

def test_crew_planning_preserves_original_description(self):
"""Test that planning appends to the original task description."""
agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")

task = Task(
description="Original task description",
expected_output="Output 1",
agent=agent,
)

crew = Crew(
agents=[agent],
tasks=[task],
planning=True,
)

plans = [
PlanPerTask(task_number=1, task="Task 1", plan=" - Additional plan steps"),
]

mock_planner_result = PlannerTaskPydanticOutput(list_of_plans_per_task=plans)

with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()

assert "Original task description" in task.description
assert "Additional plan steps" in task.description

def test_crew_planning_with_duplicate_task_numbers(self):
"""Test that duplicate task numbers use the first plan and log a warning."""
agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")

task = Task(
description="Task description",
expected_output="Output 1",
agent=agent,
)

crew = Crew(
agents=[agent],
tasks=[task],
planning=True,
)

# Two plans with the same task_number - should use the first one
duplicate_plans = [
PlanPerTask(task_number=1, task="Task 1", plan=" [FIRST PLAN]"),
PlanPerTask(task_number=1, task="Task 1", plan=" [SECOND PLAN]"),
]

mock_planner_result = PlannerTaskPydanticOutput(
list_of_plans_per_task=duplicate_plans
)

with patch.object(
CrewPlanner, "_handle_crew_planning", return_value=mock_planner_result
):
crew._handle_crew_planning()

# Should use the first plan, not the second
assert "[FIRST PLAN]" in task.description
assert "[SECOND PLAN]" not in task.description
Loading