diff --git a/docs/resources/spo/SPO-closed_task_figure.png b/docs/resources/spo/SPO-closed_task_figure.png
new file mode 100644
index 0000000000..329ff5461e
Binary files /dev/null and b/docs/resources/spo/SPO-closed_task_figure.png differ
diff --git a/docs/resources/spo/SPO-closed_task_table.png b/docs/resources/spo/SPO-closed_task_table.png
new file mode 100644
index 0000000000..f5bc16523d
Binary files /dev/null and b/docs/resources/spo/SPO-closed_task_table.png differ
diff --git a/docs/resources/spo/SPO-logo.png b/docs/resources/spo/SPO-logo.png
new file mode 100644
index 0000000000..717bdcb1b7
Binary files /dev/null and b/docs/resources/spo/SPO-logo.png differ
diff --git a/docs/resources/spo/SPO-method.png b/docs/resources/spo/SPO-method.png
new file mode 100644
index 0000000000..7d73a63c9f
Binary files /dev/null and b/docs/resources/spo/SPO-method.png differ
diff --git a/docs/resources/spo/SPO-open_ended_task_figure.png b/docs/resources/spo/SPO-open_ended_task_figure.png
new file mode 100644
index 0000000000..99d2b733dd
Binary files /dev/null and b/docs/resources/spo/SPO-open_ended_task_figure.png differ
diff --git a/examples/spo/README.md b/examples/spo/README.md
new file mode 100644
index 0000000000..08ed6f804d
--- /dev/null
+++ b/examples/spo/README.md
@@ -0,0 +1,184 @@
+# SPO | Self-Supervised Prompt Optimization
+
+An automated prompt engineering tool for Large Language Models (LLMs), designed for universal domain adaptation.
+
+A next-generation prompt engineering system implementing **Self-Supervised Prompt Optimization (SPO)**. Achieves state-of-the-art performance with 17.8-90.9× higher cost efficiency than conventional methods. 🚀
+
+![Framework of SPO](../../docs/resources/spo/SPO-method.png)
+
+## ✨ Core Advantages
+
+- 💸 **Ultra-Low Cost** - _$0.15 per task optimization_
+- 🏷️ **Zero Supervision** - _No ground truth/human feedback required_
+- ⚡ **Universal Adaptation** - _Closed & open-ended tasks supported_
+- 🔄 **Self-Evolving** - _Auto-optimization via LLM-as-judge mechanism_
+
+[Read our paper on arXiv](https://arxiv.org/pdf/2502.06855)
+
+## 📊 Experiment
+
+### Closed Tasks
+
+![SPO closed task table](../../docs/resources/spo/SPO-closed_task_table.png)
+![SPO closed task figure](../../docs/resources/spo/SPO-closed_task_figure.png)
+
+*SPO demonstrates superior cost efficiency, requiring only 1.1% to 5.6% of the cost of state-of-the-art methods while maintaining competitive performance.*
+
+### Open-ended Tasks
+
+![Open-ended task figure](../../docs/resources/spo/SPO-open_ended_task_figure.png)
+
+*SPO significantly improves model performance across all model configurations in open-ended tasks.*
+
+## 🚀 Quick Start
+
+### 1. Configure Your API Key ⚙️
+
+Configure LLM parameters in `config/config2.yaml` (see `examples/spo/config2.example.yaml` for reference).
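+
+A minimal sketch of what the `models` section can look like (the model names, endpoint URLs, and keys below are placeholders to replace with your own; the empty skeleton ships as `examples/spo/config2.example.yaml`). The keys are the model names you later pass to `SPO_LLM.initialize`:
+
+```yaml
+models:
+  "claude-3-5-sonnet-20240620":   # used below as the optimization model
+    api_type: "openai"            # or azure / ollama / groq etc. (match your provider)
+    base_url: "https://your-endpoint/v1"   # placeholder
+    api_key: "sk-..."                      # placeholder
+    temperature: 0
+  "gpt-4o-mini":                  # used below as the evaluation/execution model
+    api_type: "openai"
+    base_url: "https://your-endpoint/v1"
+    api_key: "sk-..."
+    temperature: 0
+```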
+
+### 2. Define Your Iteration Template 📝
+
+Create an iteration template file `metagpt/ext/spo/settings/task_name.yaml`:
+```yaml
+prompt: |
+  Please solve the following problem.
+
+requirements: |
+  ...
+
+count: None
+
+faq:
+  - question: |
+      ...
+    answer: |
+      ...
+
+  - question: |
+      ...
+    answer: |
+      ...
+```
+
+Notes:
+- `prompt`: Initial prompt for iteration
+- `requirements`: Desired effects/outcomes (e.g., generate more thinking, use more humorous language)
+- `count`: Target word count for the generated prompt (e.g., 50). Set to None for no limit
+- `faq`: QA pairs used for iteration; include an appropriate number of pairs (typically 3)
+  - `question`: Questions from the dataset used for iteration
+  - `answer`: Corresponding answers. Can contain desired thinking patterns or responses instead of actual answers, or can be left empty. See `metagpt/ext/spo/settings/Navigate.yaml` for reference
+
+### 3. Implement the PromptOptimizer 🔧
+
+You have three ways to run the PromptOptimizer:
+
+#### Option 1: Python Script
+
+```python
+from metagpt.ext.spo.components.optimizer import PromptOptimizer
+from metagpt.ext.spo.utils.llm_client import SPO_LLM
+
+if __name__ == "__main__":
+    # Initialize LLM settings
+    SPO_LLM.initialize(
+        optimize_kwargs={"model": "claude-3-5-sonnet-20240620", "temperature": 0.7},
+        evaluate_kwargs={"model": "gpt-4o-mini", "temperature": 0.3},
+        execute_kwargs={"model": "gpt-4o-mini", "temperature": 0}
+    )
+
+    # Create and run optimizer
+    optimizer = PromptOptimizer(
+        optimized_path="workspace",  # Output directory
+        initial_round=1,  # Starting round
+        max_rounds=10,  # Maximum optimization rounds
+        template="Poem.yaml",  # Template file
+        name="Poem",  # Project name
+    )
+
+    optimizer.optimize()
+```
+
+#### Option 2: Command Line Interface
+
+```bash
+python -m examples.spo.optimize
+```
+
+Available command line options:
+```
+--opt-model       Model for optimization (default: claude-3-5-sonnet-20240620)
+--opt-temp        Temperature for optimization (default: 0.7)
+--eval-model      Model for evaluation (default: gpt-4o-mini)
+--eval-temp       Temperature for evaluation (default: 0.3)
+--exec-model      Model for execution (default: gpt-4o-mini)
+--exec-temp       Temperature for execution (default: 0)
+--workspace       Output directory path (default: workspace)
+--initial-round   Initial round number (default: 1)
+--max-rounds      Maximum number of rounds (default: 10)
+--template        Template file name (default: Poem.yaml)
+--name            Project name (default: Poem)
+```
+
+For help:
+```bash
+python -m examples.spo.optimize --help
+```
+
+#### Option 3: Streamlit Web Interface
+
+For a more user-friendly experience, you can use the Streamlit web interface to configure and run the optimizer.
+
+First, install Streamlit:
+```bash
+pip install "streamlit~=1.42.0"
+```
+
+Then run the web interface:
+```bash
+python -m streamlit run metagpt/ext/spo/app.py
+```
+
+### 4. View Results
+```
+workspace
+  └── Project_name
+      └── prompts
+          ├── results.json
+          ├── round_1
+          │   ├── answers.txt
+          │   └── prompt.txt
+          ├── round_2
+          │   ├── answers.txt
+          │   └── prompt.txt
+          ├── round_3
+          │   ├── answers.txt
+          │   └── prompt.txt
+          ├── ...
+          └── round_n
+              ├── answers.txt
+              └── prompt.txt
+```
+
+- `results.json`: Stores whether each iteration round was judged successful, along with other related information
+- `prompt.txt`: The optimized prompt for the corresponding round
+- `answers.txt`: The output generated with the prompt for the corresponding round
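+
+If you prefer to inspect a finished run programmatically rather than browsing the workspace folder, the `DataUtils` helper in `metagpt/ext/spo/utils/data_utils.py` can read `results.json` directly. A minimal sketch, assuming the default `workspace`/`Poem` settings used above:
+
+```python
+from pathlib import Path
+
+from metagpt.ext.spo.utils.data_utils import DataUtils
+
+# Point at the project folder the optimizer created (optimized_path / name).
+data_utils = DataUtils(Path("workspace") / "Poem")
+
+# get_best_round() returns the most recent round judged successful,
+# or None if no successful round has been recorded yet.
+best = data_utils.get_best_round()
+if best:
+    print(f"Best round: {best['round']}")
+    print(best["prompt"])
+else:
+    print("No successful round recorded yet.")
+```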
+ └── round_n + β”œβ”€β”€ answers.txt + └── prompt.txt +``` + +- `results.json`: Stores whether each iteration round was judged successful and other related information +- `prompt.txt`: The optimized prompt for the corresponding round +- `answers.txt`: The output results generated using the prompt for the corresponding round + +## Citation + +If you use SPO in your research, please cite our paper: + +``` +@misc{xiang2025spo, + title={Self-Supervised Prompt Optimization}, + author={Jinyu Xiang and Jiayi Zhang and Zhaoyang Yu and Fengwei Teng and Jinhao Tu and Xinbing Liang and Sirui Hong and Chenglin Wu and Yuyu Luo}, + year={2025}, + eprint={2502.06855}, + archivePrefix={arXiv}, + primaryClass={cs.CL}, + url={https://arxiv.org/abs/2502.06855}, +} +``` \ No newline at end of file diff --git a/examples/spo/config2.example.yaml b/examples/spo/config2.example.yaml new file mode 100644 index 0000000000..3afa5406b5 --- /dev/null +++ b/examples/spo/config2.example.yaml @@ -0,0 +1,12 @@ +models: + "": # model: "gpt-4-turbo" # or gpt-3.5-turbo + api_type: "openai" # or azure / ollama / groq etc. + base_url: "" + api_key: "" + temperature: 0 + "": + api_type: "openai" + base_url: "" + api_key: "" + temperature: 0 + diff --git a/examples/spo/optimize.py b/examples/spo/optimize.py new file mode 100644 index 0000000000..0f11f043ab --- /dev/null +++ b/examples/spo/optimize.py @@ -0,0 +1,49 @@ +import argparse + +from metagpt.ext.spo.components.optimizer import PromptOptimizer +from metagpt.ext.spo.utils.llm_client import SPO_LLM + + +def parse_args(): + parser = argparse.ArgumentParser(description="SPO PromptOptimizer CLI") + + # LLM parameter + parser.add_argument("--opt-model", type=str, default="claude-3-5-sonnet-20240620", help="Model for optimization") + parser.add_argument("--opt-temp", type=float, default=0.7, help="Temperature for optimization") + parser.add_argument("--eval-model", type=str, default="gpt-4o-mini", help="Model for evaluation") + parser.add_argument("--eval-temp", type=float, default=0.3, help="Temperature for evaluation") + parser.add_argument("--exec-model", type=str, default="gpt-4o-mini", help="Model for execution") + parser.add_argument("--exec-temp", type=float, default=0, help="Temperature for execution") + + # PromptOptimizer parameter + parser.add_argument("--workspace", type=str, default="workspace", help="Path for optimized output") + parser.add_argument("--initial-round", type=int, default=1, help="Initial round number") + parser.add_argument("--max-rounds", type=int, default=10, help="Maximum number of rounds") + parser.add_argument("--template", type=str, default="Poem.yaml", help="Template file name") + parser.add_argument("--name", type=str, default="Poem", help="Project name") + + return parser.parse_args() + + +def main(): + args = parse_args() + + SPO_LLM.initialize( + optimize_kwargs={"model": args.opt_model, "temperature": args.opt_temp}, + evaluate_kwargs={"model": args.eval_model, "temperature": args.eval_temp}, + execute_kwargs={"model": args.exec_model, "temperature": args.exec_temp}, + ) + + optimizer = PromptOptimizer( + optimized_path=args.workspace, + initial_round=args.initial_round, + max_rounds=args.max_rounds, + template=args.template, + name=args.name, + ) + + optimizer.optimize() + + +if __name__ == "__main__": + main() diff --git a/metagpt/ext/spo/__init__.py b/metagpt/ext/spo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metagpt/ext/spo/app.py b/metagpt/ext/spo/app.py new file mode 100644 index 
0000000000..963775be4e --- /dev/null +++ b/metagpt/ext/spo/app.py @@ -0,0 +1,283 @@ +import asyncio +from pathlib import Path +from typing import Dict + +import streamlit as st +import yaml +from loguru import logger as _logger + +from metagpt.const import METAGPT_ROOT +from metagpt.ext.spo.components.optimizer import PromptOptimizer +from metagpt.ext.spo.utils.llm_client import SPO_LLM, RequestType + + +def load_yaml_template(template_path: Path) -> Dict: + if template_path.exists(): + with open(template_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + return {"prompt": "", "requirements": "", "count": None, "faq": [{"question": "", "answer": ""}]} + + +def save_yaml_template(template_path: Path, data: Dict) -> None: + template_format = { + "prompt": str(data.get("prompt", "")), + "requirements": str(data.get("requirements", "")), + "count": data.get("count"), + "faq": [ + {"question": str(faq.get("question", "")).strip(), "answer": str(faq.get("answer", "")).strip()} + for faq in data.get("faq", []) + ], + } + + template_path.parent.mkdir(parents=True, exist_ok=True) + + with open(template_path, "w", encoding="utf-8") as f: + yaml.dump(template_format, f, allow_unicode=True, sort_keys=False, default_flow_style=False, indent=2) + + +def display_optimization_results(result_data): + for result in result_data: + round_num = result["round"] + success = result["succeed"] + prompt = result["prompt"] + + with st.expander(f"Round {round_num} {':white_check_mark:' if success else ':x:'}"): + st.markdown("**Prompt:**") + st.code(prompt, language="text") + st.markdown("
", unsafe_allow_html=True) + + col1, col2 = st.columns(2) + with col1: + st.markdown(f"**Status:** {'Success βœ… ' if success else 'Failed ❌ '}") + with col2: + st.markdown(f"**Tokens:** {result['tokens']}") + + st.markdown("**Answers:**") + for idx, answer in enumerate(result["answers"]): + st.markdown(f"**Question {idx + 1}:**") + st.text(answer["question"]) + st.markdown("**Answer:**") + st.text(answer["answer"]) + st.markdown("---") + + # Summary + success_count = sum(1 for r in result_data if r["succeed"]) + total_rounds = len(result_data) + + st.markdown("### Summary") + col1, col2 = st.columns(2) + with col1: + st.metric("Total Rounds", total_rounds) + with col2: + st.metric("Successful Rounds", success_count) + + +def main(): + if "optimization_results" not in st.session_state: + st.session_state.optimization_results = [] + + st.title("SPO | Self-Supervised Prompt Optimization πŸ€–") + + # Sidebar for configurations + with st.sidebar: + st.header("Configuration") + + # Template Selection/Creation + settings_path = Path("metagpt/ext/spo/settings") + existing_templates = [f.stem for f in settings_path.glob("*.yaml")] + + template_mode = st.radio("Template Mode", ["Use Existing", "Create New"]) + + if template_mode == "Use Existing": + template_name = st.selectbox("Select Template", existing_templates) + else: + template_name = st.text_input("New Template Name") + if template_name and not template_name.endswith(".yaml"): + template_name = f"{template_name}" + + # LLM Settings + st.subheader("LLM Settings") + opt_model = st.selectbox( + "Optimization Model", ["claude-3-5-sonnet-20240620", "gpt-4o", "gpt-4o-mini", "deepseek-chat"], index=0 + ) + opt_temp = st.slider("Optimization Temperature", 0.0, 1.0, 0.7) + + eval_model = st.selectbox( + "Evaluation Model", ["gpt-4o-mini", "claude-3-5-sonnet-20240620", "gpt-4o", "deepseek-chat"], index=0 + ) + eval_temp = st.slider("Evaluation Temperature", 0.0, 1.0, 0.3) + + exec_model = st.selectbox( + "Execution Model", ["gpt-4o-mini", "claude-3-5-sonnet-20240620", "gpt-4o", "deepseek-chat"], index=0 + ) + exec_temp = st.slider("Execution Temperature", 0.0, 1.0, 0.0) + + # Optimizer Settings + st.subheader("Optimizer Settings") + initial_round = st.number_input("Initial Round", 1, 100, 1) + max_rounds = st.number_input("Maximum Rounds", 1, 100, 10) + + # Main content area + st.header("Template Configuration") + + if template_name: + template_path = settings_path / f"{template_name}.yaml" + template_data = load_yaml_template(template_path) + + if "current_template" not in st.session_state or st.session_state.current_template != template_name: + st.session_state.current_template = template_name + st.session_state.faqs = template_data.get("faq", []) + + # Edit template sections + prompt = st.text_area("Prompt", template_data.get("prompt", ""), height=100) + requirements = st.text_area("Requirements", template_data.get("requirements", ""), height=100) + + # FAQ section + st.subheader("FAQ Examples") + + # Add new FAQ button + if st.button("Add New FAQ"): + st.session_state.faqs.append({"question": "", "answer": ""}) + + # Edit FAQs + new_faqs = [] + for i in range(len(st.session_state.faqs)): + st.markdown(f"**FAQ #{i + 1}**") + col1, col2, col3 = st.columns([45, 45, 10]) + + with col1: + question = st.text_area( + f"Question {i + 1}", st.session_state.faqs[i].get("question", ""), key=f"q_{i}", height=100 + ) + with col2: + answer = st.text_area( + f"Answer {i + 1}", st.session_state.faqs[i].get("answer", ""), key=f"a_{i}", height=100 + ) + with col3: 
+ if st.button("πŸ—‘οΈ", key=f"delete_{i}"): + st.session_state.faqs.pop(i) + st.rerun() + + new_faqs.append({"question": question, "answer": answer}) + + # Save template button + if st.button("Save Template"): + new_template_data = {"prompt": prompt, "requirements": requirements, "count": None, "faq": new_faqs} + + save_yaml_template(template_path, new_template_data) + + st.session_state.faqs = new_faqs + st.success(f"Template saved to {template_path}") + + st.subheader("Current Template Preview") + preview_data = {"prompt": prompt, "requirements": requirements, "count": None, "faq": new_faqs} + st.code(yaml.dump(preview_data, allow_unicode=True), language="yaml") + + st.subheader("Optimization Logs") + log_container = st.empty() + + class StreamlitSink: + def write(self, message): + current_logs = st.session_state.get("logs", []) + current_logs.append(message.strip()) + st.session_state.logs = current_logs + + log_container.code("\n".join(current_logs), language="plaintext") + + streamlit_sink = StreamlitSink() + _logger.remove() + + def prompt_optimizer_filter(record): + return "optimizer" in record["name"].lower() + + _logger.add( + streamlit_sink.write, + format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}", + filter=prompt_optimizer_filter, + ) + _logger.add(METAGPT_ROOT / "logs/{time:YYYYMMDD}.txt", level="DEBUG") + + # Start optimization button + if st.button("Start Optimization"): + try: + # Initialize LLM + SPO_LLM.initialize( + optimize_kwargs={"model": opt_model, "temperature": opt_temp}, + evaluate_kwargs={"model": eval_model, "temperature": eval_temp}, + execute_kwargs={"model": exec_model, "temperature": exec_temp}, + ) + + # Create optimizer instance + optimizer = PromptOptimizer( + optimized_path="workspace", + initial_round=initial_round, + max_rounds=max_rounds, + template=f"{template_name}.yaml", + name=template_name, + ) + + # Run optimization with progress bar + with st.spinner("Optimizing prompts..."): + optimizer.optimize() + + st.success("Optimization completed!") + + st.header("Optimization Results") + + prompt_path = optimizer.root_path / "prompts" + result_data = optimizer.data_utils.load_results(prompt_path) + + st.session_state.optimization_results = result_data + + except Exception as e: + st.error(f"An error occurred: {str(e)}") + _logger.error(f"Error during optimization: {str(e)}") + + if st.session_state.optimization_results: + st.header("Optimization Results") + display_optimization_results(st.session_state.optimization_results) + + st.markdown("---") + st.subheader("Test Optimized Prompt") + col1, col2 = st.columns(2) + + with col1: + test_prompt = st.text_area("Optimized Prompt", value="", height=200, key="test_prompt") + + with col2: + test_question = st.text_area("Your Question", value="", height=200, key="test_question") + + if st.button("Test Prompt"): + if test_prompt and test_question: + try: + with st.spinner("Generating response..."): + SPO_LLM.initialize( + optimize_kwargs={"model": opt_model, "temperature": opt_temp}, + evaluate_kwargs={"model": eval_model, "temperature": eval_temp}, + execute_kwargs={"model": exec_model, "temperature": exec_temp}, + ) + + llm = SPO_LLM.get_instance() + messages = [{"role": "user", "content": f"{test_prompt}\n\n{test_question}"}] + + async def get_response(): + return await llm.responser(request_type=RequestType.EXECUTE, messages=messages) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + response = loop.run_until_complete(get_response()) + 
finally: + loop.close() + + st.subheader("Response:") + st.markdown(response) + + except Exception as e: + st.error(f"Error generating response: {str(e)}") + else: + st.warning("Please enter both prompt and question.") + + +if __name__ == "__main__": + main() diff --git a/metagpt/ext/spo/components/__init__.py b/metagpt/ext/spo/components/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metagpt/ext/spo/components/evaluator.py b/metagpt/ext/spo/components/evaluator.py new file mode 100644 index 0000000000..952ef211ba --- /dev/null +++ b/metagpt/ext/spo/components/evaluator.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# @Date : 8/23/2024 10:00 AM +# @Author : all +# @Desc : Evaluation for different datasets +import asyncio +import random +from typing import Any, Dict + +from metagpt.ext.spo.prompts.evaluate_prompt import EVALUATE_PROMPT +from metagpt.ext.spo.utils import load +from metagpt.ext.spo.utils.llm_client import SPO_LLM, RequestType, extract_content +from metagpt.logs import logger + + +class QuickExecute: + """ + Execute Prompt + """ + + def __init__(self, prompt: str): + self.prompt = prompt + self.llm = SPO_LLM.get_instance() + + async def prompt_execute(self) -> tuple[Any]: + _, _, qa, _ = load.load_meta_data() + answers = [] + + async def fetch_answer(q: str) -> Dict[str, Any]: + messages = [{"role": "user", "content": f"{self.prompt}\n\n{q}"}] + try: + answer = await self.llm.responser(request_type=RequestType.EXECUTE, messages=messages) + return {"question": q, "answer": answer} + except Exception as e: + return {"question": q, "answer": str(e)} + + tasks = [fetch_answer(item["question"]) for item in qa] + answers = await asyncio.gather(*tasks) + + return answers + + +class QuickEvaluate: + """ + Complete the evaluation for different answers here. 
+ """ + + def __init__(self): + self.llm = SPO_LLM.get_instance() + + async def prompt_evaluate(self, samples: dict, new_samples: dict) -> bool: + _, requirement, qa, _ = load.load_meta_data() + + if random.random() < 0.5: + samples, new_samples = new_samples, samples + is_swapped = True + else: + is_swapped = False + + messages = [ + { + "role": "user", + "content": EVALUATE_PROMPT.format( + requirement=requirement, sample=samples, new_sample=new_samples, answers=str(qa) + ), + } + ] + + try: + response = await self.llm.responser(request_type=RequestType.EVALUATE, messages=messages) + choose = extract_content(response, "choose") + return choose == "A" if is_swapped else choose == "B" + + except Exception as e: + logger.error(e) + return False diff --git a/metagpt/ext/spo/components/optimizer.py b/metagpt/ext/spo/components/optimizer.py new file mode 100644 index 0000000000..0ce588f44b --- /dev/null +++ b/metagpt/ext/spo/components/optimizer.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# @Date : 8/12/2024 22:00 PM +# @Author : issac +# @Desc : optimizer for prompt + +import asyncio +from pathlib import Path +from typing import List + +from metagpt.ext.spo.prompts.optimize_prompt import PROMPT_OPTIMIZE_PROMPT +from metagpt.ext.spo.utils import load +from metagpt.ext.spo.utils.data_utils import DataUtils +from metagpt.ext.spo.utils.evaluation_utils import EvaluationUtils +from metagpt.ext.spo.utils.llm_client import SPO_LLM, RequestType, extract_content +from metagpt.ext.spo.utils.prompt_utils import PromptUtils +from metagpt.logs import logger + + +class PromptOptimizer: + def __init__( + self, + optimized_path: str = None, + initial_round: int = 1, + max_rounds: int = 10, + name: str = "", + template: str = "", + ) -> None: + self.name = name + self.root_path = Path(optimized_path) / self.name + self.top_scores = [] + self.round = initial_round + self.max_rounds = max_rounds + self.template = template + + self.prompt_utils = PromptUtils(self.root_path) + self.data_utils = DataUtils(self.root_path) + self.evaluation_utils = EvaluationUtils(self.root_path) + self.llm = SPO_LLM.get_instance() + + def optimize(self): + for opt_round in range(self.max_rounds): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._optimize_prompt()) + self.round += 1 + + self.show_final_result() + + def show_final_result(self): + best_round = self.data_utils.get_best_round() + + logger.info("\n" + "=" * 50) + logger.info("\nπŸ† OPTIMIZATION COMPLETED - FINAL RESULTS πŸ†\n") + logger.info(f"\nπŸ“Œ Best Performing Round: {best_round['round']}") + logger.info(f"\n🎯 Final Optimized Prompt:\n{best_round['prompt']}") + logger.info("\n" + "=" * 50 + "\n") + + async def _optimize_prompt(self): + prompt_path = self.root_path / "prompts" + load.set_file_name(self.template) + data = self.data_utils.load_results(prompt_path) + + if self.round == 1: + await self._handle_first_round(prompt_path, data) + return + + directory = self.prompt_utils.create_round_directory(prompt_path, self.round) + new_prompt = await self._generate_optimized_prompt() + self.prompt = new_prompt + + logger.info(f"\nRound {self.round} Prompt: {self.prompt}\n") + self.prompt_utils.write_prompt(directory, prompt=self.prompt) + + success, answers = await self._evaluate_new_prompt(prompt_path, data, directory) + self._log_optimization_result(success) + + return self.prompt + + async def _handle_first_round(self, prompt_path: Path, data: List[dict]) -> None: + logger.info("\n⚑ RUNNING Round 1 PROMPT ⚑\n") + 
directory = self.prompt_utils.create_round_directory(prompt_path, self.round) + + prompt, _, _, _ = load.load_meta_data() + self.prompt = prompt + self.prompt_utils.write_prompt(directory, prompt=self.prompt) + + new_samples = await self.evaluation_utils.execute_prompt(self, directory) + _, answers = await self.evaluation_utils.evaluate_prompt( + self, None, new_samples, path=prompt_path, data=data, initial=True + ) + self.prompt_utils.write_answers(directory, answers=answers) + + async def _generate_optimized_prompt(self): + _, requirements, qa, count = load.load_meta_data() + samples = self.data_utils.get_best_round() + + logger.info(f"\nπŸš€Round {self.round} OPTIMIZATION STARTING πŸš€\n") + logger.info(f"\nSelecting prompt for round {samples['round']} and advancing to the iteration phase\n") + + golden_answer = self.data_utils.list_to_markdown(qa) + best_answer = self.data_utils.list_to_markdown(samples["answers"]) + + optimize_prompt = PROMPT_OPTIMIZE_PROMPT.format( + prompt=samples["prompt"], + answers=best_answer, + requirements=requirements, + golden_answers=golden_answer, + count=count, + ) + + response = await self.llm.responser( + request_type=RequestType.OPTIMIZE, messages=[{"role": "user", "content": optimize_prompt}] + ) + + modification = extract_content(response, "modification") + logger.info(f"Modification of {self.round} round: {modification}") + + prompt = extract_content(response, "prompt") + return prompt if prompt else "" + + async def _evaluate_new_prompt(self, prompt_path, data, directory): + logger.info("\n⚑ RUNNING OPTIMIZED PROMPT ⚑\n") + new_samples = await self.evaluation_utils.execute_prompt(self, directory) + + logger.info("\nπŸ“Š EVALUATING OPTIMIZED PROMPT πŸ“Š\n") + samples = self.data_utils.get_best_round() + success, answers = await self.evaluation_utils.evaluate_prompt( + self, samples, new_samples, path=prompt_path, data=data, initial=False + ) + + self.prompt_utils.write_answers(directory, answers=answers) + return success, answers + + def _log_optimization_result(self, success): + logger.info("\n🎯 OPTIMIZATION RESULT 🎯\n") + logger.info(f"\nRound {self.round} Optimization: {'βœ… SUCCESS' if success else '❌ FAILED'}\n") diff --git a/metagpt/ext/spo/prompts/evaluate_prompt.py b/metagpt/ext/spo/prompts/evaluate_prompt.py new file mode 100644 index 0000000000..80a9b093bf --- /dev/null +++ b/metagpt/ext/spo/prompts/evaluate_prompt.py @@ -0,0 +1,20 @@ +EVALUATE_PROMPT = """ +Based on the original requirements, evaluate the two responses, A and B, and determine which one better meets the requirements. If a reference answer is provided, strictly follow the format/content of the reference answer. + +# Requirement +{requirement} + +# A +{sample} + +# B +{new_sample} + +# Golden answer +{answers} + +Provide your analysis and the choice you believe is better, using XML tags to encapsulate your response. + +Some analysis +A/B (the better answer in your opinion) +""" diff --git a/metagpt/ext/spo/prompts/optimize_prompt.py b/metagpt/ext/spo/prompts/optimize_prompt.py new file mode 100644 index 0000000000..f6ca81e334 --- /dev/null +++ b/metagpt/ext/spo/prompts/optimize_prompt.py @@ -0,0 +1,32 @@ +PROMPT_OPTIMIZE_PROMPT = """ +You are building a prompt to address user requirement. Based on the given prompt, +please reconstruct and optimize it. You can add, modify, or delete prompts. Please include a single modification in +XML tags in your reply. During the optimization, you can incorporate any thinking models. 
+This is a prompt that performed excellently in a previous iteration. You must make further optimizations and improvements based on this prompt. The modified prompt must differ from the provided example. + +requirements: +``` +{requirements} +``` + +reference prompt: +``` +{prompt} +``` + +The execution result of this reference prompt is(some cases): +``` +{answers} +``` + +The best answer we expect(some cases): +``` +{golden_answers} +``` + +Provide your analysis, optimization points, and the complete optimized prompt using the following XML format: + +Analyze what drawbacks exist in the results produced by the reference prompt and how to improve them. +Summarize the key points for improvement in one sentence +Provide the complete optimized prompt {count} +""" diff --git a/metagpt/ext/spo/settings/Navigate.yaml b/metagpt/ext/spo/settings/Navigate.yaml new file mode 100644 index 0000000000..a5d8a16510 --- /dev/null +++ b/metagpt/ext/spo/settings/Navigate.yaml @@ -0,0 +1,47 @@ +prompt: | + Please think step by step. + Ensure the response concludes with the answer in the XML format: + [Yes or No]. + +requirements: | + Must put the final answer at the end with XML. ((Yes or No),such as Yes) + The provided prompt needs to adapt to all current types of questions. + +count: None + +faq: + - question: | + If you follow these instructions, do you return to the starting point? Always face forward. Take 7 steps left. Take 2 steps backward. Take 7 steps backward. Take 7 steps backward. Take 3 steps forward. + Options: + - Yes + - No + + answer: | + A lot of thinking and analysis processes. + ... + Final Answer: + (Yes or No) + + - question: | + If you follow these instructions, do you return to the starting point? Always face forward. Take 6 steps backward. Take 8 steps left. Take 3 steps right. Take 7 steps forward. Take 3 steps right. Take 9 steps right. Take 1 step backward. Take 7 steps left. + Options: + - Yes + - No + + answer: | + A lot of thinking and analysis processes. + ... + Final Answer: + (Yes or No) + + - question: | + If you follow these instructions, do you return to the starting point? Turn left. Turn left. Take 6 steps. Take 3 steps. Turn around. Take 1 step. Take 3 steps. Take 5 steps. + Options: + - Yes + - No + + answer: | + A lot of thinking and analysis processes. + ... + Final Answer: + (Yes or No) diff --git a/metagpt/ext/spo/settings/Poem.yaml b/metagpt/ext/spo/settings/Poem.yaml new file mode 100644 index 0000000000..74aa1565f4 --- /dev/null +++ b/metagpt/ext/spo/settings/Poem.yaml @@ -0,0 +1,23 @@ +prompt: | + Create poetry in the requested style and format. 
+ +requirements: | + None + +count: None + +faq: + - question: | + Write a modern sonnet about climate change + answer: | + None + + - question: | + Create a haiku series about New York City + answer: | + None + + - question: | + Write a free verse poem about social media + answer: | + None diff --git a/metagpt/ext/spo/utils/__init__.py b/metagpt/ext/spo/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metagpt/ext/spo/utils/data_utils.py b/metagpt/ext/spo/utils/data_utils.py new file mode 100644 index 0000000000..17771c0213 --- /dev/null +++ b/metagpt/ext/spo/utils/data_utils.py @@ -0,0 +1,106 @@ +import datetime +import json +from pathlib import Path +from typing import Dict, List, Union + +import pandas as pd + +from metagpt.logs import logger + + +class DataUtils: + def __init__(self, root_path: Path): + self.root_path = root_path + self.top_scores = [] + + def load_results(self, path: Path) -> list: + result_path = self.get_results_file_path(path) + if result_path.exists(): + try: + return json.loads(result_path.read_text()) + except json.JSONDecodeError: + return [] + return [] + + def get_best_round(self): + self._load_scores() + + for entry in self.top_scores: + if entry["succeed"]: + return entry + + return None + + def get_results_file_path(self, prompt_path: Path) -> Path: + return prompt_path / "results.json" + + def create_result_data(self, round: int, answers: list[dict], prompt: str, succeed: bool, tokens: int) -> dict: + now = datetime.datetime.now() + return {"round": round, "answers": answers, "prompt": prompt, "succeed": succeed, "tokens": tokens, "time": now} + + def save_results(self, json_file_path: Path, data: Union[List, Dict]): + json_path = json_file_path + json_path.write_text(json.dumps(data, default=str, indent=4)) + + def _load_scores(self): + rounds_dir = self.root_path / "prompts" + result_file = rounds_dir / "results.json" + self.top_scores = [] + + try: + if not result_file.exists(): + logger.warning(f"Results file not found at {result_file}") + return self.top_scores + + data = json.loads(result_file.read_text(encoding="utf-8")) + df = pd.DataFrame(data) + + for index, row in df.iterrows(): + self.top_scores.append( + { + "round": row["round"], + "succeed": row["succeed"], + "prompt": row["prompt"], + "answers": row["answers"], + } + ) + + self.top_scores.sort(key=lambda x: x["round"], reverse=True) + + except FileNotFoundError: + logger.error(f"Could not find results file: {result_file}") + except json.JSONDecodeError: + logger.error(f"Invalid JSON format in file: {result_file}") + except Exception as e: + logger.error(f"Unexpected error loading scores: {str(e)}") + + return self.top_scores + + def list_to_markdown(self, questions_list: list): + """ + Convert a list of question-answer dictionaries to a formatted Markdown string. 
+ + Args: + questions_list (list): List of dictionaries containing 'question' and 'answer' keys + + Returns: + str: Formatted Markdown string + """ + markdown_text = "```\n" + + for i, qa_pair in enumerate(questions_list, 1): + # Add question section + markdown_text += f"Question {i}\n\n" + markdown_text += f"{qa_pair['question']}\n\n" + + # Add answer section + markdown_text += f"Answer {i}\n\n" + markdown_text += f"{qa_pair['answer']}\n\n" + + # Add separator between QA pairs except for the last one + if i < len(questions_list): + markdown_text += "---\n\n" + + markdown_text += "\n```" + + return markdown_text diff --git a/metagpt/ext/spo/utils/evaluation_utils.py b/metagpt/ext/spo/utils/evaluation_utils.py new file mode 100644 index 0000000000..9814a70ba7 --- /dev/null +++ b/metagpt/ext/spo/utils/evaluation_utils.py @@ -0,0 +1,74 @@ +from pathlib import Path +from typing import Any, List, Optional, Tuple + +import tiktoken + +from metagpt.ext.spo.components.evaluator import QuickEvaluate, QuickExecute +from metagpt.logs import logger + +EVALUATION_REPETITION = 4 + + +def count_tokens(sample: dict): + if not sample: + return 0 + else: + encoding = tiktoken.get_encoding("cl100k_base") + return len(encoding.encode(str(sample["answers"]))) + + +class EvaluationUtils: + def __init__(self, root_path: Path) -> None: + self.root_path = root_path + + async def execute_prompt(self, optimizer: Any, prompt_path: Path) -> dict: + optimizer.prompt = optimizer.prompt_utils.load_prompt(optimizer.round, prompt_path) + executor = QuickExecute(prompt=optimizer.prompt) + + answers = await executor.prompt_execute() + + cur_round = optimizer.round + + new_data = {"round": cur_round, "answers": answers, "prompt": optimizer.prompt} + + return new_data + + async def evaluate_prompt( + self, + optimizer: Any, + samples: Optional[dict], + new_samples: dict, + path: Path, + data: List[dict], + initial: bool = False, + ) -> Tuple[bool, dict]: + evaluator = QuickEvaluate() + new_token = count_tokens(new_samples) + + if initial is True: + succeed = True + else: + evaluation_results = [] + for _ in range(EVALUATION_REPETITION): + result = await evaluator.prompt_evaluate(samples=samples, new_samples=new_samples) + evaluation_results.append(result) + + logger.info(f"Evaluation Results {evaluation_results}") + + true_count = evaluation_results.count(True) + false_count = evaluation_results.count(False) + succeed = true_count > false_count + + new_data = optimizer.data_utils.create_result_data( + new_samples["round"], new_samples["answers"], new_samples["prompt"], succeed, new_token + ) + + data.append(new_data) + + result_path = optimizer.data_utils.get_results_file_path(path) + + optimizer.data_utils.save_results(result_path, data) + + answers = new_samples["answers"] + + return succeed, answers diff --git a/metagpt/ext/spo/utils/llm_client.py b/metagpt/ext/spo/utils/llm_client.py new file mode 100644 index 0000000000..81524d3c13 --- /dev/null +++ b/metagpt/ext/spo/utils/llm_client.py @@ -0,0 +1,107 @@ +import asyncio +import re +from enum import Enum +from typing import Any, List, Optional + +from metagpt.configs.models_config import ModelsConfig +from metagpt.llm import LLM +from metagpt.logs import logger + + +class RequestType(Enum): + OPTIMIZE = "optimize" + EVALUATE = "evaluate" + EXECUTE = "execute" + + +class SPO_LLM: + _instance: Optional["SPO_LLM"] = None + + def __init__( + self, + optimize_kwargs: Optional[dict] = None, + evaluate_kwargs: Optional[dict] = None, + execute_kwargs: Optional[dict] = None, + ) -> 
None: + self.evaluate_llm = LLM(llm_config=self._load_llm_config(evaluate_kwargs)) + self.optimize_llm = LLM(llm_config=self._load_llm_config(optimize_kwargs)) + self.execute_llm = LLM(llm_config=self._load_llm_config(execute_kwargs)) + + def _load_llm_config(self, kwargs: dict) -> Any: + model = kwargs.get("model") + if not model: + raise ValueError("'model' parameter is required") + + try: + model_config = ModelsConfig.default().get(model) + if model_config is None: + raise ValueError(f"Model '{model}' not found in configuration") + + config = model_config.model_copy() + + for key, value in kwargs.items(): + if hasattr(config, key): + setattr(config, key, value) + + return config + + except AttributeError: + raise ValueError(f"Model '{model}' not found in configuration") + except Exception as e: + raise ValueError(f"Error loading configuration for model '{model}': {str(e)}") + + async def responser(self, request_type: RequestType, messages: List[dict]) -> str: + llm_mapping = { + RequestType.OPTIMIZE: self.optimize_llm, + RequestType.EVALUATE: self.evaluate_llm, + RequestType.EXECUTE: self.execute_llm, + } + + llm = llm_mapping.get(request_type) + if not llm: + raise ValueError(f"Invalid request type. Valid types: {', '.join([t.value for t in RequestType])}") + + response = await llm.acompletion(messages) + return response.choices[0].message.content + + @classmethod + def initialize(cls, optimize_kwargs: dict, evaluate_kwargs: dict, execute_kwargs: dict) -> None: + """Initialize the global instance""" + cls._instance = cls(optimize_kwargs, evaluate_kwargs, execute_kwargs) + + @classmethod + def get_instance(cls) -> "SPO_LLM": + """Get the global instance""" + if cls._instance is None: + raise RuntimeError("SPO_LLM not initialized. Call initialize() first.") + return cls._instance + + +def extract_content(xml_string: str, tag: str) -> Optional[str]: + pattern = rf"<{tag}>(.*?)" + match = re.search(pattern, xml_string, re.DOTALL) + return match.group(1).strip() if match else None + + +async def main(): + # test LLM + SPO_LLM.initialize( + optimize_kwargs={"model": "gpt-4o", "temperature": 0.7}, + evaluate_kwargs={"model": "gpt-4o-mini", "temperature": 0.3}, + execute_kwargs={"model": "gpt-4o-mini", "temperature": 0.3}, + ) + + llm = SPO_LLM.get_instance() + + # test messages + hello_msg = [{"role": "user", "content": "hello"}] + response = await llm.responser(request_type=RequestType.EXECUTE, messages=hello_msg) + logger(f"AI: {response}") + response = await llm.responser(request_type=RequestType.OPTIMIZE, messages=hello_msg) + logger(f"AI: {response}") + response = await llm.responser(request_type=RequestType.EVALUATE, messages=hello_msg) + logger(f"AI: {response}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/metagpt/ext/spo/utils/load.py b/metagpt/ext/spo/utils/load.py new file mode 100644 index 0000000000..f8c4f53fce --- /dev/null +++ b/metagpt/ext/spo/utils/load.py @@ -0,0 +1,48 @@ +import random +from pathlib import Path + +import yaml + +FILE_NAME = "" +SAMPLE_K = 3 + + +def set_file_name(name: str): + global FILE_NAME + FILE_NAME = name + + +def load_meta_data(k: int = SAMPLE_K): + # load yaml file + config_path = Path(__file__).parent.parent / "settings" / FILE_NAME + + if not config_path.exists(): + raise FileNotFoundError(f"Configuration file '{FILE_NAME}' not found in settings directory") + + try: + with config_path.open("r", encoding="utf-8") as file: + data = yaml.safe_load(file) + except yaml.YAMLError as e: + raise ValueError(f"Error parsing YAML file 
'{FILE_NAME}': {str(e)}") + except Exception as e: + raise Exception(f"Error reading file '{FILE_NAME}': {str(e)}") + + qa = [] + + for item in data["faq"]: + question = item["question"] + answer = item["answer"] + qa.append({"question": question, "answer": answer}) + + prompt = data["prompt"] + requirements = data["requirements"] + count = data["count"] + + if isinstance(count, int): + count = f", within {count} words" + else: + count = "" + + random_qa = random.sample(qa, min(k, len(qa))) + + return prompt, requirements, random_qa, count diff --git a/metagpt/ext/spo/utils/prompt_utils.py b/metagpt/ext/spo/utils/prompt_utils.py new file mode 100644 index 0000000000..c1c960bb70 --- /dev/null +++ b/metagpt/ext/spo/utils/prompt_utils.py @@ -0,0 +1,34 @@ +from pathlib import Path + +from metagpt.logs import logger + + +class PromptUtils: + def __init__(self, root_path: Path): + self.root_path = root_path + + def create_round_directory(self, prompt_path: Path, round_number: int) -> Path: + directory = prompt_path / f"round_{round_number}" + directory.mkdir(parents=True, exist_ok=True) + return directory + + def load_prompt(self, round_number: int, prompts_path: Path): + prompt_file = prompts_path / "prompt.txt" + + try: + return prompt_file.read_text(encoding="utf-8") + except FileNotFoundError as e: + logger.info(f"Error loading prompt for round {round_number}: {e}") + raise + + def write_answers(self, directory: Path, answers: dict, name: str = "answers.txt"): + answers_file = directory / name + with answers_file.open("w", encoding="utf-8") as file: + for item in answers: + file.write(f"Question:\n{item['question']}\n") + file.write(f"Answer:\n{item['answer']}\n") + file.write("\n") + + def write_prompt(self, directory: Path, prompt: str): + prompt_file = directory / "prompt.txt" + prompt_file.write_text(prompt, encoding="utf-8")