Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python variable reloading #1637

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
60 changes: 60 additions & 0 deletions metagpt/actions/di/execute_py_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# -*- encoding: utf-8 -*-
"""
@Date : 2024/11/14 22:48:29
@Author : orange-crow
@File : execute_py_code.py
"""

import re
import textwrap
from collections import OrderedDict
from typing import Literal

from metagpt.actions import Action
from metagpt.tools.code_executor.display import print_text_live
from metagpt.tools.code_executor.pyexe import AsyncPyExecutor


class ExecutePyCode(Action):
"""execute python code, return result to llm, and display it."""

is_save_obj: bool
work_dir: str
executor: AsyncPyExecutor

def __init__(self, is_save_obj: bool = False, work_dir: str = ""):
super().__init__(
is_save_obj=is_save_obj, # to save python object in code.
work_dir=work_dir, # python object saved dir.
executor=AsyncPyExecutor(work_dir, is_save_obj),
)

async def build(self):
self.code_gen = self.executor.run()
await self.code_gen.asend(None)

async def terminate(self):
await self.executor.terminate()

async def reset(self):
await self.terminate()
await self.build()
self.executor._cmd_space = OrderedDict()

async def run(self, code: str, language: Literal["python", "markdown"] = "python"):
if language == "python":
await self.build()
# 定义正则表达式模式,匹配以 '\n ' 开头的行
pattern = r"^\n "
# 使用 re.search 检测字符串中是否包含匹配的模式
match = re.search(pattern, code, re.MULTILINE)
if match:
code = textwrap.dedent(code)
await self.code_gen.asend(code)
res = self.executor._cmd_space[str(len(self.executor._cmd_space) - 1)]
if res["stderr"]:
return "\n".join(res["stdout"]) + "\n".join(res["stderr"]), False
return "\n".join(res["stdout"]) + "\n".join(res["stderr"]), True
else:
print_text_live(code)
return code, True
225 changes: 225 additions & 0 deletions metagpt/actions/di/use_experience.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import json

import chromadb
from pydantic import BaseModel

from metagpt.actions import Action
from metagpt.const import SERDESER_PATH
from metagpt.logs import logger
from metagpt.prompts.di.get_task_summary import TASK_CODE_DESCRIPTION_PROMPT
from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import ChromaRetrieverConfig
from metagpt.schema import Task
from metagpt.strategy.planner import Planner


class Trajectory(BaseModel):
user_requirement: str = ""
task_map: dict[str, Task] = {}
task: Task = None
is_used: bool = False

def rag_key(self) -> str:
"""For search"""
return self.task.instruction


class Experience(BaseModel):
code_summary: str = ""
trajectory: Trajectory = None

def rag_key(self) -> str:
"""For search"""
return self.code_summary


EXPERIENCE_COLLECTION_NAME = "di_experience_0"
TRAJECTORY_COLLECTION_NAME = "di_trajectory_0"
PERSIST_PATH = SERDESER_PATH / "data_interpreter/chroma"


class AddNewTrajectories(Action):
"""Record the execution status of each task as a trajectory and store it."""

name: str = "AddNewTrajectories"

def _init_engine(self, collection_name: str):
"""Initialize a collection for storing code experiences."""

engine = SimpleEngine.from_objs(
retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)],
)
return engine

async def run(self, planner: Planner, trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME):
"""Initiate a collection and add new trajectories to the collection."""

engine = self._init_engine(trajectory_collection_name)

if not planner.plan.tasks:
return

user_requirement = planner.plan.goal
task_map = planner.plan.task_map
trajectories = [
Trajectory(user_requirement=user_requirement, task_map=task_map, task=task, is_used=False)
for task in planner.plan.tasks
]

engine.add_objs(trajectories)


class AddNewExperiences(Action):
"""Retrieve the trajectories from the vector database where trajectories are stored,
compare and summarize them to form experiences, and then store these experiences in the vector database.
"""

name: str = "AddNewTaskExperiences"

def _init_engine(self, collection_name: str):
"""Initialize a collection for storing code experiences."""

engine = SimpleEngine.from_objs(
retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)],
)
return engine

async def _single_task_summary(self, trajectory_collection_name: str, experience_collection_name: str):
trajectory_engine = self._init_engine(collection_name=trajectory_collection_name)
experience_engine = self._init_engine(collection_name=experience_collection_name)

db = chromadb.PersistentClient(path=str(PERSIST_PATH))
collection = db.get_or_create_collection(trajectory_collection_name)

# get the ids of all trajectories where the is_used attribute is false.
unused_ids = [
id
for id in collection.get()["ids"] # collection.get()["ids"] will get all the ids in the collection
if json.loads(collection.get([id])["metadatas"][0]["obj_json"])["is_used"]
== False # Check if the is_used attribute of the trajectory corresponding to the given id is false.
]

trajectory_dicts = [
json.loads(metadata["obj_json"]) for metadata in collection.get(unused_ids)["metadatas"]
] # get the trajectory in dictionary format
trajectories = []
experiences = []

for trajectory_dict in trajectory_dicts:
# set the is_used attribute of the trajectory to true and create a new trajectory (the old trajectory will be deleted below).
trajectory_dict["is_used"] = True
trajectory = Trajectory(**trajectory_dict)
trajectories.append(trajectory)

# summarize the trajectory using LLM and assemble it into a single experience
code_summary = await self.task_code_sumarization(trajectory)
experience = Experience(code_summary=code_summary, trajectory=trajectory)
experiences.append(experience)

collection.delete(unused_ids) # delete the old trajectories
trajectory_engine.add_objs(trajectories)
experience_engine.add_objs(experiences)

async def task_code_sumarization(self, trajectory: Trajectory):
"""use LLM to summarize the task code.
Args:
trajectory: The trajectory to be summarized.
Returns:
A summary of the trajectory's code.
"""
task = trajectory.task
prompt = TASK_CODE_DESCRIPTION_PROMPT.format(
code_snippet=task.code, code_result=task.result, code_success="Success" if task.is_success else "Failure"
)
resp = await self._aask(prompt=prompt)
return resp

async def run(
self,
trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME,
experience_collection_name: str = EXPERIENCE_COLLECTION_NAME,
mode: str = "single_task_summary",
):
"""Initiate a collection and Add a new task experience to the collection.

Args:
trajectory_collection_name(str): the trajectory collection_name to be used for geting experiences.
experience_collection_name(str): the experience collection_name to be used for saving experiences.
mode(str): how to generate experiences.

"""
if mode == "single_task_summary":
await self._single_task_summary(
trajectory_collection_name=trajectory_collection_name,
experience_collection_name=experience_collection_name,
)
else:
pass # TODO:add other methods to generate experiences from trajectories.


class RetrieveExperiences(Action):
"""Retrieve the most relevant experience from the vector database based on the input task."""

name: str = "RetrieveExperiences"

def _init_engine(self, collection_name: str, top_k: int):
"""Initialize a SimpleEngine for retrieving experiences.

Args:
query (str): The chromadb collectin_name.
top_k (int): The number of eperiences to be retrieved.
"""

engine = SimpleEngine.from_objs(
retriever_configs=[
ChromaRetrieverConfig(
persist_path=PERSIST_PATH, collection_name=collection_name, similarity_top_k=top_k
)
],
)
return engine

async def run(
self, query: str, experience_collection_name: str = EXPERIENCE_COLLECTION_NAME, top_k: int = 5
) -> str:
"""Retrieve past attempted tasks

Args:
query (str): The task instruction to be used for retrieval.
experience_collection_name(str): the collextion_name for retrieving experiences.
top_k (int, optional): The number of experiences to be retrieved. Defaults to 5.

Returns:
_type_: _description_
"""
engine = self._init_engine(collection_name=experience_collection_name, top_k=top_k)

if len(query) <= 2: # not "" or not '""'
return ""

nodes = await engine.aretrieve(query)
new_experiences = []
for i, node in enumerate(nodes):
try:
code_summary = node.node.metadata["obj"].code_summary
trajectory = node.node.metadata["obj"].trajectory

except:
continue

# Create the experience dictionary with placeholder keys
experience = {
"Reference __i__": trajectory.task.instruction,
"Task code": trajectory.task.code,
"Code summary": code_summary,
"Task result": trajectory.task.result,
"Task outcome": "Success" if trajectory.task.is_success else "Failure",
"Task ownership's requirement": "This task is part of " + trajectory.user_requirement,
}

# Replace the placeholder in the keys
experience = {k.replace("__i__", str(i)): v for k, v in experience.items()}
new_experiences.append(experience)

logger.info("retrieval done")
return json.dumps(new_experiences, indent=4)
2 changes: 2 additions & 0 deletions metagpt/actions/di/write_analysis_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ async def run(
tool_info: str = "",
working_memory: list[Message] = None,
use_reflection: bool = False,
experiences: str = "",
**kwargs,
) -> str:
structual_prompt = STRUCTUAL_PROMPT.format(
user_requirement=user_requirement,
plan_status=plan_status,
experiences=experiences,
tool_info=tool_info,
)

Expand Down
10 changes: 10 additions & 0 deletions metagpt/prompts/di/get_task_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
TASK_CODE_DESCRIPTION_PROMPT = """
Please explain in a paragraph what the following code snippet does. Only the function of the code snippet needs to be explained, no variable names need to be explained.

Code snippet:
{code_snippet}
Code Execution Result:
{code_result}
Code Success or Failure:
{code_success}
"""
4 changes: 4 additions & 0 deletions metagpt/prompts/di/write_analysis_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
# Plan Status
{plan_status}

# Reference experience (can be empty):
This is some previous coding experience that is similar to the current task. You can learn from the successful code and avoid the mistakes from the failed code. If there are other codes you don't know about in the experience, please don't refer to it.
{experiences}

# Tool Info
{tool_info}

Expand Down
23 changes: 14 additions & 9 deletions metagpt/roles/di/data_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from metagpt.actions.di.ask_review import ReviewConst
from metagpt.actions.di.execute_nb_code import ExecuteNbCode
from metagpt.actions.di.use_experience import AddNewTrajectories, RetrieveExperiences
from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode
from metagpt.logs import logger
from metagpt.prompts.di.write_analysis_code import DATA_INFO
Expand Down Expand Up @@ -38,6 +39,7 @@ class DataInterpreter(Role):
auto_run: bool = True
use_plan: bool = True
use_reflection: bool = False
use_experience: bool = False
execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)
tools: list[str] = [] # Use special symbol ["<all>"] to indicate use of all registered tools
tool_recommender: ToolRecommender = None
Expand Down Expand Up @@ -88,6 +90,9 @@ async def _act(self) -> Message:
async def _plan_and_act(self) -> Message:
try:
rsp = await super()._plan_and_act()
await AddNewTrajectories().run(
self.planner
) # extract trajectories based on the execution status of each task in the planner
await self.execute_code.terminate()
return rsp
except Exception as e:
Expand All @@ -96,11 +101,13 @@ async def _plan_and_act(self) -> Message:

async def _act_on_task(self, current_task: Task) -> TaskResult:
"""Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation."""
code, result, is_success = await self._write_and_exec_code()
# retrieve past tasks for this task
experiences = await RetrieveExperiences().run(query=current_task.instruction) if self.use_experience else ""
code, result, is_success = await self._write_and_exec_code(experiences=experiences)
task_result = TaskResult(code=code, result=result, is_success=is_success)
return task_result

async def _write_and_exec_code(self, max_retry: int = 3):
async def _write_and_exec_code(self, max_retry: int = 3, experiences: str = ""):
counter = 0
success = False

Expand All @@ -122,7 +129,9 @@ async def _write_and_exec_code(self, max_retry: int = 3):

while not success and counter < max_retry:
### write code ###
code, cause_by = await self._write_code(counter, plan_status, tool_info)
code, cause_by = await self._write_code(
counter, plan_status, tool_info, experiences=experiences if counter == 0 else ""
)

self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by))

Expand All @@ -143,12 +152,7 @@ async def _write_and_exec_code(self, max_retry: int = 3):

return code, result, success

async def _write_code(
self,
counter: int,
plan_status: str = "",
tool_info: str = "",
):
async def _write_code(self, counter: int, plan_status: str = "", tool_info: str = "", experiences: str = ""):
todo = self.rc.todo # todo is WriteAnalysisCode
logger.info(f"ready to {todo.name}")
use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial
Expand All @@ -161,6 +165,7 @@ async def _write_code(
tool_info=tool_info,
working_memory=self.working_memory.get(),
use_reflection=use_reflection,
experiences=experiences,
)

return code, todo
Expand Down
Loading
Loading