diff --git a/cli/CLI_COMMANDS.md b/cli/CLI_COMMANDS.md
new file mode 100644
index 00000000..7316dcec
--- /dev/null
+++ b/cli/CLI_COMMANDS.md
@@ -0,0 +1,137 @@
+# DeepCode CLI Commands
+
+## Overview
+
+The DeepCode CLI has been restructured using Click to support multiple subcommands. This allows for better organization and extensibility.
+
+## Available Commands
+
+### Main Command Group
+
+```bash
+python cli/cli_app.py --help
+```
+
+Shows all available commands and options.
+
+### Run Interactive Session (Default)
+
+```bash
+# Both of these are equivalent:
+python cli/cli_app.py
+python cli/cli_app.py run
+```
+
+Launches the interactive DeepCode CLI session where you can:
+- Process research papers from URLs
+- Upload and process local files
+- Chat with the AI to generate code
+- View processing history
+- Configure settings
+
+### Configuration Management
+
+```bash
+python cli/cli_app.py config
+```
+
+Shows configuration options (placeholder for future implementation).
+
+**Planned features:**
+- View current configuration
+- Set default processing mode (comprehensive/optimized)
+- Configure API keys and endpoints
+- Manage workspace settings
+
+### Cleanup Utility
+
+```bash
+# Clean Python cache files
+python cli/cli_app.py clean --cache
+
+# Clean log files
+python cli/cli_app.py clean --logs
+
+# Clean everything
+python cli/cli_app.py clean --all
+```
+
+Removes temporary files and caches to free up disk space.
+
+## Adding New Subcommands
+
+To add a new subcommand, add a function decorated with `@cli.command()` in `cli/cli_app.py`:
+
+```python
+@cli.command()
+@click.option('--option-name', help='Description')
+def my_command(option_name):
+    """Description of what this command does"""
+    # Your implementation here
+    click.echo("Command executed!")
+```
+
+## Examples
+
+### Basic Usage
+
+```bash
+# Start interactive session
+python cli/cli_app.py
+
+# Or explicitly use the run command
+python cli/cli_app.py run
+```
+
+### Cleanup Examples
+
+```bash
+# Clean only cache files
+python cli/cli_app.py clean --cache
+
+# Clean only logs
+python cli/cli_app.py clean --logs
+
+# Clean everything
+python cli/cli_app.py clean --all
+```
+
+### Getting Help
+
+```bash
+# General help
+python cli/cli_app.py --help
+
+# Help for specific command
+python cli/cli_app.py clean --help
+python cli/cli_app.py config --help
+```
+
+## Version Information
+
+```bash
+python cli/cli_app.py --version
+```
+
+## Migration Notes
+
+### For Developers
+
+The previous `main()` coroutine has been renamed to `run_interactive_cli()` and is now wrapped by Click commands:
+
+- **Old:** `asyncio.run(main())`
+- **New:** `cli()` (Click group) → `run()` command → `run_interactive_cli()`
+
+### Backward Compatibility
+
+The CLI launcher (`cli_launcher.py`) has been updated to use the new Click-based structure, maintaining backward compatibility with existing workflows.
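+
+The resulting wiring looks roughly like the sketch below (illustrative only; the exact decorators, options, and the body of `run_interactive_cli()` live in `cli/cli_app.py` and may differ):
+
+```python
+import asyncio
+
+import click
+
+
+async def run_interactive_cli():
+    """Placeholder for the former main() coroutine."""
+    ...
+
+
+@click.group(invoke_without_command=True)
+@click.version_option(version="1.0.0")
+@click.pass_context
+def cli(ctx):
+    """DeepCode CLI entry point."""
+    # With no subcommand, fall back to the interactive session.
+    if ctx.invoked_subcommand is None:
+        ctx.invoke(run)
+
+
+@cli.command()
+def run():
+    """Launch the interactive DeepCode CLI session."""
+    asyncio.run(run_interactive_cli())
+
+
+if __name__ == "__main__":
+    cli()
+```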
+
+## Future Enhancements
+
+Potential subcommands to add:
+- `deepcode process --file <path>` - Direct file processing
+- `deepcode process --url <url>` - Direct URL processing
+- `deepcode history` - View processing history
+- `deepcode export` - Export results
+- `deepcode doctor` - Check system dependencies and configuration
+
diff --git a/cli/__init__.py b/cli/__init__.py
index 313bd6f3..735b107c 100644
--- a/cli/__init__.py
+++ b/cli/__init__.py
@@ -11,8 +11,8 @@
 __version__ = "1.0.0"
 __author__ = "DeepCode Team - Data Intelligence Lab @ HKU"
 
-from .cli_app import main as cli_main
+from .cli_app import cli, run_interactive_cli
 from .cli_interface import CLIInterface
 from .cli_launcher import main as launcher_main
 
-__all__ = ["cli_main", "CLIInterface", "launcher_main"]
+__all__ = ["cli", "run_interactive_cli", "CLIInterface", "launcher_main"]
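For downstream code, the import surface of the `cli` package changes accordingly. A minimal migration sketch (illustrative; it assumes the repository root is on `sys.path` and that `run_interactive_cli()` is the renamed coroutine described in CLI_COMMANDS.md):

```python
# Migration sketch for callers that previously used `from cli import cli_main`.
import asyncio

from cli import cli, run_interactive_cli


def launch_via_click() -> None:
    # What cli_launcher.py now does: Click parses the arguments and the
    # `run` command drives the interactive session.
    cli(["run"])


def launch_directly() -> None:
    # For callers embedding the CLI in their own asyncio code: skip Click
    # and run the renamed coroutine themselves.
    asyncio.run(run_interactive_cli())


if __name__ == "__main__":
    launch_via_click()
```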
"input_source": input_source, + "input_type": input_type, + "error": True, + } + return error_result def display_results( @@ -143,7 +171,7 @@ def display_results( if len(analysis_result) > 1000 else analysis_result ) - except Exception: + except Exception: # noqa: BLE001 print( analysis_result[:1000] + "..." if len(analysis_result) > 1000 @@ -233,8 +261,43 @@ async def run_interactive_session(self): elif choice in ["t", "chat", "text"]: chat_input = self.cli.get_chat_input() - if chat_input: - await self.process_input(chat_input, "chat") + if not chat_input: + # 用户取消或未提供输入 + continue + + # 处理聊天命令(以 "/" 开头) + if chat_input.strip() == "/retry-last": + last = None + if isinstance(self.context, dict): + last = self.context.get("last_input") + + if not last: + self.cli.print_status( + "No previous run available to retry.", "warning" + ) + elif not last.get("error"): + self.cli.print_status( + "Last run was successful; nothing to retry.", "info" + ) + else: + source = last.get("input_source") + input_type = last.get("input_type", "chat") + if not source: + self.cli.print_status( + "Previous failed run has no input source to retry.", + "error", + ) + else: + self.cli.print_status( + "Retrying last failed input...", "processing" + ) + await self.process_input(source, input_type) + + # 处理完命令后继续主循环 + continue + + # 普通聊天输入 - 直接作为 chat 类型处理 + await self.process_input(chat_input, "chat") elif choice in ["h", "history"]: self.cli.show_history() @@ -255,8 +318,12 @@ async def run_interactive_session(self): self.cli.print_status("Session ended by user", "info") except KeyboardInterrupt: - print(f"\n{Colors.WARNING}⚠️ Process interrupted by user{Colors.ENDC}") - except Exception as e: + if not self._interrupt_handled: + self._interrupt_handled = True + print( + f"\n{Colors.WARNING}⚠️ Process interrupted by user{Colors.ENDC}" + ) + except Exception as e: # noqa: BLE001 print(f"\n{Colors.FAIL}❌ Unexpected error: {str(e)}{Colors.ENDC}") finally: # 清理资源 @@ -266,6 +333,7 @@ async def run_interactive_session(self): async def main(): """主函数""" start_time = time.time() + app = None try: # 创建并运行CLI应用 @@ -273,8 +341,12 @@ async def main(): await app.run_interactive_session() except KeyboardInterrupt: - print(f"\n{Colors.WARNING}⚠️ Application interrupted by user{Colors.ENDC}") - except Exception as e: + # Only print if not already handled by run_interactive_session + if app is None or not app._interrupt_handled: + print( + f"\n{Colors.WARNING}⚠️ Application interrupted by user{Colors.ENDC}" + ) + except Exception as e: # noqa: BLE001 print(f"\n{Colors.FAIL}❌ Application error: {str(e)}{Colors.ENDC}") finally: end_time = time.time() diff --git a/cli/cli_launcher.py b/cli/cli_launcher.py index 255d20f9..4f567551 100644 --- a/cli/cli_launcher.py +++ b/cli/cli_launcher.py @@ -129,14 +129,12 @@ def main(): # 导入并运行CLI应用 if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) # 添加项目根目录到路径 - from cli.cli_app import main as cli_main + from cli.cli_app import cli print("\n🎯 Launching CLI application...") - # 使用asyncio运行主函数 - import asyncio - - asyncio.run(cli_main()) + # 使用Click CLI运行主函数 (默认运行 'run' 命令) + cli(["run"]) except KeyboardInterrupt: print("\n\n🛑 DeepCode CLI stopped by user") diff --git a/cli/workflows/cli_workflow_adapter.py b/cli/workflows/cli_workflow_adapter.py index dda79b3d..1fb654de 100644 --- a/cli/workflows/cli_workflow_adapter.py +++ b/cli/workflows/cli_workflow_adapter.py @@ -7,7 +7,10 @@ """ import os -from typing import Callable, Dict, Any +from datetime import datetime +from typing 
diff --git a/cli/workflows/cli_workflow_adapter.py b/cli/workflows/cli_workflow_adapter.py
index dda79b3d..1fb654de 100644
--- a/cli/workflows/cli_workflow_adapter.py
+++ b/cli/workflows/cli_workflow_adapter.py
@@ -7,7 +7,10 @@
 """
 
 import os
-from typing import Callable, Dict, Any
+from datetime import datetime
+from typing import Any, Callable, Dict
+
+import httpx
 
 from mcp_agent.app import MCPApp
 
@@ -22,6 +25,9 @@ class CLIWorkflowAdapter:
     - Integration with the latest agent orchestration engine
     """
 
+    # Maximum length for result strings to avoid building giant strings
+    MAX_RESULT_LENGTH = 1000
+
     def __init__(self, cli_interface=None):
         """
         Initialize CLI workflow adapter.
@@ -34,6 +40,24 @@ def __init__(self, cli_interface=None):
         self.logger = None
         self.context = None
 
+    @staticmethod
+    def _truncate_result(result: str, max_length: int = MAX_RESULT_LENGTH) -> str:
+        """
+        Truncate result string at source to avoid building giant strings.
+
+        Args:
+            result: Result string to truncate
+            max_length: Maximum length (default: 1000)
+
+        Returns:
+            Truncated string with ellipsis if needed
+        """
+        if not result:
+            return ""
+        if len(result) <= max_length:
+            return result
+        return result[:max_length] + "..."
+
     async def initialize_mcp_app(self) -> Dict[str, Any]:
         """
         Initialize MCP application for CLI usage.
@@ -94,8 +118,7 @@ async def cleanup_mcp_app(self):
             )
 
     def create_cli_progress_callback(self) -> Callable:
-        """
-        Create CLI-optimized progress callback function.
+        """Create CLI-optimized progress callback function.
 
         Returns:
             Callable: Progress callback function
@@ -126,11 +149,46 @@ def progress_callback(progress: int, message: str):
 
         return progress_callback
 
+    async def _send_pipeline_webhook_notification(
+        self,
+        input_source: str,
+        enable_indexing: bool,
+        pipeline_mode: str,
+        result: Dict[str, Any],
+    ) -> None:
+        """Send an optional webhook notification when a pipeline run completes.
+
+        The webhook URL is configured via the ``PIPELINE_WEBHOOK_URL`` environment
+        variable. If it is not set, this method is a no-op.
+        """
+        webhook_url = os.getenv("PIPELINE_WEBHOOK_URL")
+        if not webhook_url:
+            return
+
+        payload: Dict[str, Any] = {
+            "event": "pipeline.completed",
+            "status": result.get("status", "unknown"),
+            "pipeline_mode": pipeline_mode,
+            "input_source": input_source,
+            "enable_indexing": enable_indexing,
+            "timestamp": datetime.utcnow().isoformat() + "Z",
+        }
+        if "error" in result:
+            payload["error"] = result["error"]
+
+        try:
+            async with httpx.AsyncClient(timeout=5.0) as client:
+                await client.post(webhook_url, json=payload)
+        except Exception as exc:  # Webhook failures must not break the CLI flow
+            if self.logger:
+                self.logger.warning(
+                    f"Webhook notification failed for {webhook_url}: {exc}"
+                )
+
     async def execute_full_pipeline(
         self, input_source: str, enable_indexing: bool = True
     ) -> Dict[str, Any]:
-        """
-        Execute the complete intelligent multi-agent research orchestration pipeline.
+        """Execute the complete intelligent multi-agent research orchestration pipeline.
 
         Args:
             input_source: Research input source (file path, URL, or preprocessed analysis)
             enable_indexing: Whether to enable codebase indexing (default: True)
 
         Returns:
             dict: Comprehensive pipeline execution result
         """
+        pipeline_mode = "comprehensive" if enable_indexing else "optimized"
+        response: Dict[str, Any]
         try:
             # Import the latest agent orchestration engine
             from workflows.agent_orchestration_engine import (
@@ -150,9 +210,9 @@
 
             # Display pipeline start
             if self.cli_interface:
-                mode = "comprehensive" if enable_indexing else "optimized"
                 self.cli_interface.print_status(
-                    f"🚀 Starting {mode} agent orchestration pipeline...", "processing"
+                    f"🚀 Starting {pipeline_mode} agent orchestration pipeline...",
+                    "processing",
                 )
                 self.cli_interface.display_processing_stages(0)
 
@@ -172,23 +232,32 @@
                     "complete",
                 )
 
-            return {
+            response = {
                 "status": "success",
                 "result": result,
-                "pipeline_mode": "comprehensive" if enable_indexing else "optimized",
+                "pipeline_mode": pipeline_mode,
             }
-
         except Exception as e:
             error_msg = f"Pipeline execution failed: {str(e)}"
             if self.cli_interface:
                 self.cli_interface.print_status(error_msg, "error")
 
-            return {
+            response = {
                 "status": "error",
                 "error": error_msg,
-                "pipeline_mode": "comprehensive" if enable_indexing else "optimized",
+                "pipeline_mode": pipeline_mode,
             }
+
+        # Notify external systems (if configured) that the pipeline has completed
+        await self._send_pipeline_webhook_notification(
+            input_source=input_source,
+            enable_indexing=enable_indexing,
+            pipeline_mode=pipeline_mode,
+            result=response,
+        )
+
+        return response
+
     async def execute_chat_pipeline(self, user_input: str) -> Dict[str, Any]:
         """
         Execute the chat-based planning and implementation pipeline.
@@ -308,11 +377,15 @@ async def process_input_with_orchestration(
                 input_source, enable_indexing=enable_indexing
             )
 
+            # Truncate repo_result at source to avoid building giant strings
+            raw_result = pipeline_result.get("result", "")
+            truncated_result = self._truncate_result(raw_result)
+
             return {
                 "status": pipeline_result["status"],
                 "analysis_result": "Integrated into agent orchestration pipeline",
                 "download_result": "Integrated into agent orchestration pipeline",
-                "repo_result": pipeline_result.get("result", ""),
+                "repo_result": truncated_result,
                 "pipeline_mode": pipeline_result.get("pipeline_mode", "comprehensive"),
                 "error": pipeline_result.get("error"),
             }
diff --git a/requirements.txt b/requirements.txt
index d185cd68..5cad500e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@ aiofiles>=0.8.0
 aiohttp>=3.8.0
 anthropic
 asyncio-mqtt
+click>=8.0.0
 docling
 mcp-agent
 mcp-server-git
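For completeness, the `PIPELINE_WEBHOOK_URL` notification added in `cli_workflow_adapter.py` posts a small JSON document after every pipeline run. A minimal local receiver for trying the hook out could look like this (stdlib-only sketch; the handler name, host, and port are illustrative):

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class PipelineWebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        # Fields sent by the adapter: event, status, pipeline_mode,
        # input_source, enable_indexing, timestamp, and error (failures only).
        print(payload.get("event"), payload.get("status"), payload.get("pipeline_mode"))
        self.send_response(204)
        self.end_headers()


if __name__ == "__main__":
    # Point the CLI at this receiver with:
    #   export PIPELINE_WEBHOOK_URL=http://127.0.0.1:8085/
    HTTPServer(("127.0.0.1", 8085), PipelineWebhookHandler).serve_forever()
```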