From 580675b291e915e3ed21d94e19e4e53bc7ff444c Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 14 Jun 2025 15:27:18 +0000 Subject: [PATCH] feat: Implement interactive agent feedback and confirmation This commit introduces two major interactive features to the DeepResearchAgent: 1. **Confirmation Before Synthesis**: I now pause before generating the final report and await your confirmation via the UI. This allows you to review the process before committing to the final step. 2. **Error Feedback Mechanism**: When I encounter critical errors during task execution (e.g., in `research_execution_node`), I pause and request your feedback. You can choose to: * Retry the failing task. * Skip the current task. * Abort the entire research process. These features enhance your control and allow for better error recovery and guided research. Changes include: - Modifications to `DeepResearchAgent` to manage new states (`awaiting_final_confirmation`, `awaiting_error_feedback`), handle pause/resume logic using LangGraph's checkpointing, and process your feedback. - New graph nodes (`request_final_confirmation_node`, `request_error_feedback_node`, `process_error_feedback_node`) and updated routing logic. - New methods in `DeepResearchAgent` (`provide_final_confirmation`, `provide_error_feedback`) to receive your input. - Updates to `WebuiManager` to manage my lifecycle across paused states and relay UI commands. - UI enhancements in `deep_research_agent_tab.py` to display status, error details, and provide interactive elements (buttons, radio choices) for your confirmation and feedback. - Improved handling of my state and resource cleanup in `DeepResearchAgent` and `WebuiManager`. --- .../deep_research/deep_research_agent.py | 590 +++++++++++++++-- .../components/deep_research_agent_tab.py | 607 ++++++++++++------ src/webui/webui_manager.py | 63 +- 3 files changed, 1022 insertions(+), 238 deletions(-) diff --git a/src/agent/deep_research/deep_research_agent.py b/src/agent/deep_research/deep_research_agent.py index 86be3016..eb32d381 100644 --- a/src/agent/deep_research/deep_research_agent.py +++ b/src/agent/deep_research/deep_research_agent.py @@ -330,11 +330,112 @@ class DeepResearchState(TypedDict): stop_requested: bool error_message: Optional[str] messages: List[BaseMessage] + awaiting_final_confirmation: bool = False + user_confirmed_synthesis: bool = False + awaiting_error_feedback: bool = False + error_details: Optional[str] = None + user_feedback_on_error: Optional[str] = None + current_error_node_origin: Optional[str] = None + _routing_target: Optional[str] = None # Internal temporary field for routing # --- Langgraph Nodes --- +async def request_final_confirmation_node(state: DeepResearchState) -> Dict[str, Any]: + """Node to request final confirmation from the user before synthesis.""" + logger.info("--- Entering Request Final Confirmation Node ---") + # The UI would observe awaiting_final_confirmation and prompt the user. + # The actual message to the user is handled by the UI. + # This node just updates the state to signify it's waiting. + return { + "awaiting_final_confirmation": True, + "messages": state["messages"] + [SystemMessage(content="Requesting final confirmation before proceeding to synthesis.")] + } + + +async def request_error_feedback_node(state: DeepResearchState) -> Dict[str, Any]: + """Node to request error feedback from the user.""" + logger.info(f"--- Entering Request Error Feedback Node ---") + error_details = state.get("error_details", "Unknown error") + origin_node = state.get("current_error_node_origin", "Unknown origin") + logger.info(f"Requesting user feedback for error from '{origin_node}': {error_details}") + return { + "awaiting_error_feedback": True, + "messages": state["messages"] + [SystemMessage(content=f"Agent paused due to error in {origin_node}: {error_details}. Awaiting user feedback (e.g., 'retry', 'skip', 'abort').")] + } + + +async def process_error_feedback_node(state: DeepResearchState) -> Dict[str, Any]: + """Processes user feedback on an error and determines next step.""" + logger.info(f"--- Entering Process Error Feedback Node ---") + feedback = state.get("user_feedback_on_error") + origin_node = state.get("current_error_node_origin") # Should be set + + updated_plan = state["research_plan"] + cat_idx = state["current_category_index"] + task_idx = state["current_task_index_in_category"] + + new_messages = state["messages"] + [SystemMessage(content=f"User provided feedback on error: '{feedback}'.")] + + next_node_target: Optional[str] = None + updates_for_state = { + "user_feedback_on_error": None, # Clear feedback + "awaiting_error_feedback": False, + # Keep error_details and current_error_node_origin for now, clear if successfully retrying/skipping + "_routing_target": None, # Clear any previous routing target + } + + if feedback == "retry": + logger.info(f"Retrying node '{origin_node}' based on user feedback.") + new_messages.append(SystemMessage(content=f"Retrying task in {origin_node} based on user feedback.")) + next_node_target = origin_node + # Clear error details for the retry attempt + updates_for_state["error_details"] = None + updates_for_state["current_error_node_origin"] = None + elif feedback == "skip": + logger.info(f"Skipping current task in '{origin_node}' based on user feedback.") + new_messages.append(SystemMessage(content=f"Skipping current task based on user feedback.")) + if updated_plan and cat_idx < len(updated_plan) and task_idx < len(updated_plan[cat_idx]["tasks"]): + updated_plan[cat_idx]["tasks"][task_idx]["status"] = "skipped_by_user" + updated_plan[cat_idx]["tasks"][task_idx]["result_summary"] = "Task skipped due to error, per user feedback." + _save_plan_to_md(updated_plan, str(state["output_dir"])) + + # Advance task/category index + task_idx += 1 + if task_idx >= len(updated_plan[cat_idx]["tasks"]): + cat_idx += 1 + task_idx = 0 + updates_for_state["current_category_index"] = cat_idx + updates_for_state["current_task_index_in_category"] = task_idx + updates_for_state["research_plan"] = updated_plan + else: + logger.warning("Could not mark task as skipped: Plan/indices out of bounds.") + + next_node_target = origin_node # Let it go through should_continue with updated indices + # Clear error details as we are moving on + updates_for_state["error_details"] = None + updates_for_state["current_error_node_origin"] = None + elif feedback == "abort": + logger.info(f"Aborting research based on user feedback on error.") + new_messages.append(SystemMessage(content=f"Aborting research based on user feedback.")) + updates_for_state["stop_requested"] = True + # Clear all error related fields + updates_for_state["error_details"] = None + updates_for_state["current_error_node_origin"] = None + next_node_target = "end_run" # Explicitly route to end + else: + logger.warning(f"Unknown feedback type: '{feedback}'. Defaulting to re-request feedback.") + new_messages.append(SystemMessage(content=f"Unknown feedback '{feedback}'. Please provide 'retry', 'skip', or 'abort'.")) + updates_for_state["awaiting_error_feedback"] = True # Re-request feedback + updates_for_state["user_feedback_on_error"] = None # Clear invalid feedback + next_node_target = "request_error_feedback" # Go back to request feedback node + + updates_for_state["messages"] = new_messages + updates_for_state["_routing_target"] = next_node_target + return updates_for_state + + def _load_previous_state(task_id: str, output_dir: str) -> Dict[str, Any]: state_updates = {} plan_file = os.path.join(output_dir, PLAN_FILENAME) @@ -799,12 +900,23 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: if next_task_idx >= len(current_category["tasks"]): next_cat_idx += 1 next_task_idx = 0 + logger.error(f"Unhandled error during research execution for task '{current_task['task_description']}': {e}", + exc_info=True) + # Preserve existing messages up to the point of error + error_messages = state.get("messages", []) + current_task_message_history + [ + SystemMessage(content=f"Encountered error during task execution: {e}") + ] + # Don't mark task as failed in plan yet, user might retry + # _save_plan_to_md(plan, output_dir) # Plan not changed here return { + "error_details": str(e), + "current_error_node_origin": "execute_research", # This node's name + "awaiting_error_feedback": True, # Signal to pause and request feedback + "messages": error_messages, + # Keep current plan and indices, user might retry current task "research_plan": plan, - "current_category_index": next_cat_idx, - "current_task_index_in_category": next_task_idx, - "error_message": f"Core Execution Error on task '{current_task['task_description']}': {e}", - "messages": state["messages"] + current_task_message_history # Preserve messages up to error + "current_category_index": cat_idx, + "current_task_index_in_category": task_idx, } @@ -940,9 +1052,15 @@ def should_continue(state: DeepResearchState) -> str: if state.get("stop_requested"): logger.info("Stop requested, routing to END.") return "end_run" - if state.get("error_message") and "Core Execution Error" in state["error_message"]: # Critical error in node - logger.warning(f"Critical error detected: {state['error_message']}. Routing to END.") - return "end_run" + if state.get("error_message") and "Core Execution Error" in state["error_message"]: + logger.warning(f"Core execution error detected: {state['error_message']}. Routing to request_error_feedback.") + # This specific error_message is from non-feedback path, but could be consolidated + # For now, let's assume Core Execution Error means we want user feedback + return "request_error_feedback" # Route to get feedback for this specific error type + if state.get("awaiting_error_feedback"): # If a node explicitly set this, go ask for feedback + logger.info("State indicates awaiting_error_feedback. Routing to request_error_feedback_node.") + return "request_error_feedback" + plan = state.get("research_plan") cat_idx = state.get("current_category_index", 0) @@ -972,8 +1090,64 @@ def should_continue(state: DeepResearchState) -> str: return "execute_research" # If we've gone through all categories and tasks (cat_idx >= len(plan)) - logger.info("All plan categories and tasks processed or current indices are out of bounds. Routing to Synthesis.") - return "synthesize_report" + logger.info("All plan categories and tasks processed or current indices are out of bounds.") + if state.get("user_confirmed_synthesis"): + logger.info("User has confirmed. Routing to Synthesis.") + return "synthesize_report" + else: + logger.info("User has not confirmed. Routing to Request Final Confirmation.") + return "request_final_confirmation" + + +def did_user_confirm_synthesis(state: DeepResearchState) -> str: + """Checks if the user has confirmed synthesis.""" + logger.info("--- Evaluating Condition: Did User Confirm Synthesis? ---") + if state.get("user_confirmed_synthesis"): + logger.info("User confirmed synthesis. Routing to 'synthesize_report'.") + return "synthesize_report" + else: + logger.info("User has not confirmed synthesis. Routing to 'wait_for_confirmation' (back to request_final_confirmation node).") + return "wait_for_confirmation" + +def did_user_provide_error_feedback(state: DeepResearchState) -> str: + """Checks if the user has provided feedback on an error.""" + logger.info("--- Evaluating Condition: Did User Provide Error Feedback? ---") + if state.get("user_feedback_on_error"): + logger.info("User provided error feedback. Routing to 'process_error_feedback'.") + return "process_error_feedback" + else: + logger.info("User has not provided error feedback. Routing to 'wait_for_error_feedback' (back to request_error_feedback node).") + return "wait_for_error_feedback" # Loops back to request_error_feedback + +def route_after_error_processing(state: DeepResearchState) -> str: + """Determines the next node after error feedback has been processed.""" + logger.info("--- Evaluating Condition: Route After Error Processing ---") + if state.get("stop_requested"): + logger.info("Stop requested after error feedback. Routing to 'end_run'.") + return "end_run" + + routing_target = state.get("_routing_target") + # Important: Clear the routing target from state after reading it to prevent stale routing. + # This is tricky as the state update from this function isn't immediate. + # Best handled by the node that sets _routing_target also clearing it, or by ensuring process_error_feedback_node clears it before returning. + # For now, we assume process_error_feedback_node handles clearing it from its returned dict. + # However, a direct state modification like `state["_routing_target"] = None` is not possible here as it's a TypedDict. + # The node process_error_feedback_node should return the state *without* _routing_target if it's consumed. + # Let's refine process_error_feedback_node to set _routing_target to None in its return if consumed. + + if routing_target == "execute_research": + logger.info("Routing to 'execute_research' after error processing.") + return "execute_research" + elif routing_target == "request_error_feedback": # e.g. invalid feedback from user + logger.info("Routing back to 'request_error_feedback' after error processing.") + return "request_error_feedback" + elif routing_target == "end_run": # Explicitly told to end by process_error_feedback_node (e.g. abort) + logger.info("Routing to 'end_run' after error processing.") + return "end_run" + else: + # Default fallback or if routing_target is None (should not happen if logic is correct) + logger.warning(f"Unknown or missing routing target: '{routing_target}'. Defaulting to 'end_run'.") + return "end_run" # --- DeepSearchAgent Class --- @@ -1055,30 +1229,67 @@ def _compile_graph(self) -> StateGraph: workflow.add_node("plan_research", planning_node) workflow.add_node("execute_research", research_execution_node) workflow.add_node("synthesize_report", synthesis_node) + workflow.add_node("request_final_confirmation", request_final_confirmation_node) + workflow.add_node("request_error_feedback", request_error_feedback_node) # New error node + workflow.add_node("process_error_feedback", process_error_feedback_node) # New error processing node workflow.add_node( "end_run", lambda state: logger.info("--- Reached End Run Node ---") or {} - ) # Simple end node + ) - # Define edges workflow.set_entry_point("plan_research") + workflow.add_edge("plan_research", "execute_research") - workflow.add_edge( - "plan_research", "execute_research" - ) # Always execute after planning + # Conditional edge from execute_research + # It now needs to check for awaiting_error_feedback first + def route_from_execute_research(state: DeepResearchState) -> str: + if state.get("awaiting_error_feedback"): + return "request_error_feedback" # Error occurred, go to feedback + return should_continue(state) # Normal continuation logic - # Conditional edge after execution workflow.add_conditional_edges( "execute_research", - should_continue, + route_from_execute_research, # Use the new routing function { - "execute_research": "execute_research", # Loop back if more steps - "synthesize_report": "synthesize_report", # Move to synthesis if done - "end_run": "end_run", # End if stop requested or error + "request_error_feedback": "request_error_feedback", # Route to request error feedback + "execute_research": "execute_research", + "synthesize_report": "synthesize_report", + "request_final_confirmation": "request_final_confirmation", + "end_run": "end_run", }, ) - workflow.add_edge("synthesize_report", "end_run") # End after synthesis + # Conditional edge from request_final_confirmation (unchanged from previous) + workflow.add_conditional_edges( + "request_final_confirmation", + did_user_confirm_synthesis, + { + "synthesize_report": "synthesize_report", + "wait_for_confirmation": "request_final_confirmation", + } + ) + + # Edges for error handling flow + workflow.add_conditional_edges( + "request_error_feedback", + did_user_provide_error_feedback, + { + "process_error_feedback": "process_error_feedback", + "wait_for_error_feedback": "request_error_feedback", # Loop back to wait + } + ) + workflow.add_conditional_edges( + "process_error_feedback", + route_after_error_processing, + { + "execute_research": "execute_research", # Retry + "request_error_feedback": "request_error_feedback", # e.g. invalid user input + # If "skip" was chosen, process_error_feedback updates indices and routes to "execute_research", + # which then calls should_continue to determine the true next step (next task or next cat or synth). + "end_run": "end_run", # Abort + } + ) + workflow.add_edge("synthesize_report", "end_run") app = workflow.compile() return app @@ -1141,9 +1352,17 @@ async def run( "browser_config": self.browser_config, "final_report": None, "current_category_index": 0, + "current_category_index": 0, "current_task_index_in_category": 0, "stop_requested": False, - "error_message": None, + "error_message": None, # This is for general, non-feedback errors + "awaiting_final_confirmation": False, + "user_confirmed_synthesis": False, + "awaiting_error_feedback": False, # Init new error fields + "error_details": None, + "user_feedback_on_error": None, + "current_error_node_origin": None, + "_routing_target": None, } if task_id: @@ -1169,12 +1388,35 @@ async def run( status = "unknown" message = None try: - logger.info(f"Invoking graph execution for task {self.current_task_id}...") - self.runner = asyncio.create_task(self.graph.ainvoke(initial_state)) + logger.info(f"Invoking graph execution for task {self.current_task_id} with checkpointing.") + # Use thread_id for checkpointing + config = {"configurable": {"thread_id": self.current_task_id}} + self.runner = asyncio.create_task(self.graph.ainvoke(initial_state, config=config)) final_state = await self.runner - logger.info(f"Graph execution finished for task {self.current_task_id}.") - - # Determine status based on final state + logger.info(f"Graph execution attempt finished for task {self.current_task_id}.") + + if final_state and final_state.get("awaiting_final_confirmation"): + logger.info(f"Task {self.current_task_id} is awaiting final confirmation.") + # Return a specific status indicating pause, do not clean up yet. + return { + "status": "awaiting_confirmation", + "message": "Research process paused, awaiting final confirmation before synthesis.", + "task_id": self.current_task_id, + "current_state": final_state, + } + + if final_state and final_state.get("awaiting_error_feedback"): + logger.info(f"Task {self.current_task_id} is awaiting error feedback.") + return { + "status": "awaiting_error_feedback", + "message": f"Research process paused due to error: {final_state.get('error_details')}. Awaiting user feedback.", + "task_id": self.current_task_id, + "error_details": final_state.get("error_details"), + "current_error_node_origin": final_state.get("current_error_node_origin"), + "current_state": final_state, + } + + # If not awaiting confirmation or error feedback, proceed to determine final status if self.stop_event and self.stop_event.is_set(): status = "stopped" message = "Research process was stopped by request." @@ -1182,46 +1424,60 @@ async def run( elif final_state and final_state.get("error_message"): status = "error" message = final_state["error_message"] - logger.error(f"Graph execution completed with error: {message}") + logger.error(f"Graph execution completed with error for task {self.current_task_id}: {message}") elif final_state and final_state.get("final_report"): status = "completed" message = "Research process completed successfully." logger.info(message) else: - # If it ends without error/report (e.g., empty plan, stopped before synthesis) status = "finished_incomplete" - message = "Research process finished, but may be incomplete (no final report generated)." - logger.warning(message) + message = "Research process finished, but may be incomplete or in an unexpected state." + logger.warning(f"{message} for task {self.current_task_id}. Final state: {final_state}") except asyncio.CancelledError: status = "cancelled" message = f"Agent run task cancelled for {self.current_task_id}." logger.info(message) - # final_state will remain None or the state before cancellation if checkpointing was used + final_state = None # No meaningful final state on cancellation except Exception as e: status = "error" message = f"Unhandled error during graph execution for {self.current_task_id}: {e}" logger.error(message, exc_info=True) - # final_state will remain None or the state before the error + final_state = None # No meaningful final state on such error finally: - logger.info(f"Cleaning up resources for task {self.current_task_id}") - task_id_to_clean = self.current_task_id + # Cleanup only if the task is not paused (either for confirmation or error feedback) + is_paused_for_confirmation = final_state and final_state.get("awaiting_final_confirmation") + is_paused_for_error = final_state and final_state.get("awaiting_error_feedback") + + if not (is_paused_for_confirmation or is_paused_for_error): + logger.info(f"Performing final cleanup for task {self.current_task_id}") + task_id_to_clean = self.current_task_id # Capture before clearing + if task_id_to_clean and task_id_to_clean in _AGENT_STOP_FLAGS: # Check if task_id_to_clean is not None + del _AGENT_STOP_FLAGS[task_id_to_clean] + + self.stop_event = None + self.current_task_id = None + self.runner = None + if self.mcp_client: + await self.mcp_client.__aexit__(None, None, None) + self.mcp_client = None + else: + if is_paused_for_confirmation: + logger.info(f"Task {self.current_task_id} is paused for final confirmation. Skipping full cleanup.") + if is_paused_for_error: + logger.info(f"Task {self.current_task_id} is paused for error feedback. Skipping full cleanup.") - self.stop_event = None - self.current_task_id = None - self.runner = None # Mark runner as finished - if self.mcp_client: - await self.mcp_client.__aexit__(None, None, None) + # This return is for terminal states (completed, stopped, error, etc.) or if somehow final_state was None + # If paused, the return happened inside the try block. + # Need to ensure task_id_to_clean is defined if self.current_task_id was cleared. + task_id_for_return = self.current_task_id if self.current_task_id else (task_id_to_clean if 'task_id_to_clean' in locals() else None) - # Return a result dictionary including the status and the final state if available - return { - "status": status, - "message": message, - "task_id": task_id_to_clean, # Use the stored task_id - "final_state": final_state - if final_state - else {}, # Return the final state dict - } + return { + "status": status, + "message": message, + "task_id": task_id_for_return, + "final_state": final_state if final_state else {}, + } async def _stop_lingering_browsers(self, task_id): """Attempts to stop any BrowserUseAgent instances associated with the task_id.""" @@ -1259,3 +1515,241 @@ async def stop(self): def close(self): self.stopped = False + + async def provide_final_confirmation(self): + """ + External method to signal that the user has confirmed synthesis and resume graph execution. + """ + if not self.current_task_id or not self.graph: + logger.warning("Cannot provide final confirmation: No active task_id or graph.") + return {"status": "error", "message": "No active task or graph to confirm."} + + if self.stop_event and self.stop_event.is_set(): + logger.warning(f"Stop event is set for task {self.current_task_id}. Cannot provide confirmation.") + # Perform cleanup as if it was stopped. + # This logic might be redundant if stop() is comprehensive. + if self.current_task_id in _AGENT_STOP_FLAGS: + del _AGENT_STOP_FLAGS[self.current_task_id] + self.stop_event = None + cleared_task_id = self.current_task_id + self.current_task_id = None + self.runner = None + return { + "status": "stopped", + "message": "Task was stopped before confirmation could be processed.", + "task_id": cleared_task_id, + } + + logger.info(f"User confirmation received for task {self.current_task_id}. Attempting to resume.") + config = {"configurable": {"thread_id": self.current_task_id}} + + try: + # Fetch current state to correctly update messages + # Note: get_state might return the full state including config, need to access actual values. + # LangGraph's get_state can be complex; assuming it returns a snapshot we can work with. + # If a Checkpointer is configured, get_state on the graph object itself is the right way. + graph_snapshot = self.graph.get_state(config) # Get current state using thread_id + current_messages = graph_snapshot.values.get("messages", []) if graph_snapshot else [] + + updated_messages = current_messages + [SystemMessage(content="User confirmed. Proceeding to synthesis.")] + + logger.info(f"Updating state for task {self.current_task_id} with user confirmation.") + await self.graph.update_state( + config, # Pass the config with thread_id + { + "user_confirmed_synthesis": True, + "awaiting_final_confirmation": False, + "messages": updated_messages, + "stop_requested": False # Ensure stop_requested is False if we are resuming + } + ) + logger.info(f"State updated. Re-invoking graph for task {self.current_task_id} to continue.") + # Re-invoke the graph. Pass None as input to continue from the checkpoint. + continued_state = await self.graph.ainvoke(None, config=config) + logger.info(f"Graph re-invocation finished for task {self.current_task_id}.") + + # Process the continued_state to determine the final outcome + status = "unknown" + message = "" + if self.stop_event and self.stop_event.is_set(): # Check if stop was called during re-invocation + status = "stopped" + message = f"Research process for task {self.current_task_id} was stopped during continuation." + elif continued_state and continued_state.get("error_message"): + status = "error" + message = continued_state["error_message"] + elif continued_state and continued_state.get("final_report"): + status = "completed" + message = "Research process completed successfully after confirmation." + elif continued_state and continued_state.get("awaiting_final_confirmation"): + status = "awaiting_confirmation" # It might have gone to error, then retry, then finished tasks and now waits for final confirm + message = f"Task {self.current_task_id} is now awaiting final confirmation after previous interaction." + logger.info(message) + # Do not clean up fully if it's now paused for confirmation + return { + "status": status, + "message": message, + "task_id": self.current_task_id, + "current_state": continued_state, + } + elif continued_state and continued_state.get("awaiting_error_feedback"): + status = "awaiting_error_feedback" # Retry might have failed again + message = f"Task {self.current_task_id} is again awaiting error feedback: {continued_state.get('error_details')}." + logger.info(message) + # Do not clean up fully if it's now paused for error + return { + "status": status, + "message": message, + "task_id": self.current_task_id, + "error_details": continued_state.get("error_details"), + "current_error_node_origin": continued_state.get("current_error_node_origin"), + "current_state": continued_state, + } + else: # Truly terminal states + status = "finished_incomplete" + message = f"Processing continued for task {self.current_task_id} but did not produce a final report as expected." + logger.warning(f"{message} - Continued state: {continued_state}") + + # Final cleanup only for truly terminal states from this method + logger.info(f"Performing final cleanup for task {self.current_task_id} after provide_final_confirmation resolution.") + task_id_to_clean = self.current_task_id # Capture before clearing + if task_id_to_clean and task_id_to_clean in _AGENT_STOP_FLAGS: + del _AGENT_STOP_FLAGS[task_id_to_clean] + self.stop_event = None + self.current_task_id = None + self.runner = None + if self.mcp_client: + await self.mcp_client.__aexit__(None, None, None) + self.mcp_client = None + + return { + "status": status, + "message": message, + "task_id": task_id_to_clean, + "final_state": continued_state if continued_state else {} + } + + except Exception as e: + logger.error(f"Error during final confirmation processing for task {self.current_task_id}: {e}", exc_info=True) + task_id_on_error = self.current_task_id # Capture before potentially clearing + # Attempt cleanup even on error + if task_id_on_error and task_id_on_error in _AGENT_STOP_FLAGS: + del _AGENT_STOP_FLAGS[task_id_on_error] + self.stop_event = None + self.current_task_id = None + self.runner = None + if self.mcp_client: # Ensure mcp_client is cleaned up on exception too + await self.mcp_client.__aexit__(None, None, None) + self.mcp_client = None + return {"status": "error", "message": f"Failed to process confirmation and continue: {e}", "task_id": task_id_on_error} + + + async def provide_error_feedback(self, feedback: str) -> Dict[str, Any]: + """ + External method for user to provide feedback on an error and resume graph execution. + """ + if not self.current_task_id or not self.graph: + logger.warning("Cannot provide error feedback: No active task_id or graph.") + return {"status": "error", "message": "No active task or graph for error feedback."} + + if self.stop_event and self.stop_event.is_set(): + logger.warning(f"Stop event is set for task {self.current_task_id}. Cannot provide error feedback.") + task_id_on_stop = self.current_task_id + # Perform cleanup as if it was stopped. + if task_id_on_stop in _AGENT_STOP_FLAGS: del _AGENT_STOP_FLAGS[task_id_on_stop] + self.stop_event = None + self.current_task_id = None + self.runner = None + return { + "status": "stopped", + "message": "Task was stopped before error feedback could be processed.", + "task_id": task_id_on_stop, + } + + logger.info(f"User error feedback '{feedback}' received for task {self.current_task_id}. Attempting to resume.") + config = {"configurable": {"thread_id": self.current_task_id}} + + try: + graph_snapshot = self.graph.get_state(config) + current_messages = graph_snapshot.values.get("messages", []) if graph_snapshot else [] + updated_messages = current_messages + [SystemMessage(content=f"User provided error feedback: '{feedback}'.")] + + logger.info(f"Updating state for task {self.current_task_id} with error feedback.") + await self.graph.update_state( + config, + { + "user_feedback_on_error": feedback, + "awaiting_error_feedback": False, # We are processing it now + "messages": updated_messages, + "stop_requested": False + } + ) + logger.info(f"State updated. Re-invoking graph for task {self.current_task_id} to process feedback.") + continued_state = await self.graph.ainvoke(None, config=config) # Input is None to resume + logger.info(f"Graph re-invocation after error feedback finished for task {self.current_task_id}.") + + status = "unknown" + message = "" + + if self.stop_event and self.stop_event.is_set(): + status = "stopped" + message = f"Research process for task {self.current_task_id} was stopped during error feedback processing." + elif continued_state and continued_state.get("awaiting_error_feedback"): + status = "awaiting_error_feedback" # e.g. retry failed, or feedback was invalid. + message = f"Task {self.current_task_id} is again awaiting error feedback: {continued_state.get('error_details')}." + logger.info(message) + # Do not clean up fully if it's paused again + return { + "status": status, "message": message, "task_id": self.current_task_id, + "error_details": continued_state.get("error_details"), + "current_error_node_origin": continued_state.get("current_error_node_origin"), + "current_state": continued_state, + } + elif continued_state and continued_state.get("awaiting_final_confirmation"): + status = "awaiting_confirmation" # e.g. skip error, finished all tasks, now needs final confirmation + message = f"Task {self.current_task_id} is now awaiting final confirmation after error feedback processing." + logger.info(message) + # Do not clean up fully if it's paused for confirmation + return { + "status": status, "message": message, "task_id": self.current_task_id, + "current_state": continued_state, + } + elif continued_state and continued_state.get("error_message"): # General error from a node after resuming + status = "error" + message = continued_state["error_message"] + elif continued_state and continued_state.get("final_report"): + status = "completed" + message = "Research process completed successfully after error feedback." + else: + status = "finished_incomplete" + message = f"Processing continued for task {self.current_task_id} after error feedback, but result is indeterminate." + logger.warning(f"{message} - Continued state: {continued_state}") + + # Final cleanup for terminal states from this method + logger.info(f"Performing final cleanup for task {self.current_task_id} after provide_error_feedback resolution.") + task_id_to_clean = self.current_task_id + if task_id_to_clean and task_id_to_clean in _AGENT_STOP_FLAGS: + del _AGENT_STOP_FLAGS[task_id_to_clean] + self.stop_event = None + self.current_task_id = None + self.runner = None + if self.mcp_client: + await self.mcp_client.__aexit__(None, None, None) + self.mcp_client = None + + return { + "status": status, "message": message, "task_id": task_id_to_clean, + "final_state": continued_state if continued_state else {} + } + + except Exception as e: + logger.error(f"Error during error feedback processing for task {self.current_task_id}: {e}", exc_info=True) + task_id_on_error = self.current_task_id + if task_id_on_error and task_id_on_error in _AGENT_STOP_FLAGS: + del _AGENT_STOP_FLAGS[task_id_on_error] + self.stop_event = None + self.current_task_id = None + self.runner = None + if self.mcp_client: + await self.mcp_client.__aexit__(None, None, None) + self.mcp_client = None + return {"status": "error", "message": f"Failed to process error feedback and continue: {e}", "task_id": task_id_on_error} diff --git a/src/webui/components/deep_research_agent_tab.py b/src/webui/components/deep_research_agent_tab.py index 88faea09..43273af4 100644 --- a/src/webui/components/deep_research_agent_tab.py +++ b/src/webui/components/deep_research_agent_tab.py @@ -70,6 +70,16 @@ async def run_deep_research(webui_manager: WebuiManager, components: Dict[Compon markdown_download_comp = webui_manager.get_component_by_id("deep_research_agent.markdown_download") mcp_server_config_comp = webui_manager.get_component_by_id("deep_research_agent.mcp_server_config") + # New UI components for interaction + status_display_comp = webui_manager.get_component_by_id("deep_research_agent.status_display") + confirmation_group_comp = webui_manager.get_component_by_id("deep_research_agent.confirmation_group") + # confirm_synthesis_button_comp = webui_manager.get_component_by_id("deep_research_agent.confirm_synthesis_button") # Already part of all_managed_inputs if named correctly + error_feedback_group_comp = webui_manager.get_component_by_id("deep_research_agent.error_feedback_group") + error_details_display_comp = webui_manager.get_component_by_id("deep_research_agent.error_details_display") + # error_feedback_choice_comp = webui_manager.get_component_by_id("deep_research_agent.error_feedback_choice") # Already part of all_managed_inputs + # submit_error_feedback_button_comp = webui_manager.get_component_by_id("deep_research_agent.submit_error_feedback_button") # Already part of all_managed_inputs + + # --- 1. Get Task and Settings --- task_topic = components.get(research_task_comp, "").strip() task_id_to_resume = components.get(resume_task_id_comp, "").strip() or None @@ -101,11 +111,15 @@ async def run_deep_research(webui_manager: WebuiManager, components: Dict[Compon resume_task_id_comp: gr.update(interactive=False), parallel_num_comp: gr.update(interactive=False), save_dir_comp: gr.update(interactive=False), - markdown_display_comp: gr.update(value="Starting research..."), - markdown_download_comp: gr.update(value=None, interactive=False) + markdown_display_comp: gr.update(value=""), # Clear previous report + markdown_download_comp: gr.update(value=None, interactive=False), + status_display_comp: gr.update(value="Starting research...", visible=True), + confirmation_group_comp: gr.update(visible=False), + error_feedback_group_comp: gr.update(visible=False), } - agent_task = None + # agent_task renamed to agent_run_task_async for clarity + agent_run_task_async = None running_task_id = None plan_file_path = None report_file_path = None @@ -154,205 +168,276 @@ def get_setting(tab: str, key: str, default: Any = None): mcp_server_config=mcp_config ) logger.info("DeepResearchAgent initialized.") + # Persist the agent instance in webui_manager if it's a new run or resuming an existing one + # This is now implicitly handled by webui_manager.dr_agent not being cleared if paused. # --- 5. Start Agent Run --- + # The agent instance (webui_manager.dr_agent) is now managed across calls if paused. + # Task ID for resume is handled by the agent itself if passed in initial_state. + # We use webui_manager.dr_task_id to track the *current* or *resumed* task. + + current_agent_task_id = task_id_to_resume if task_id_to_resume else None + + # Store agent and task_id in webui_manager so other handlers can access it + webui_manager.dr_task_id = current_agent_task_id # May be updated by agent's run result + agent_run_coro = webui_manager.dr_agent.run( - topic=task_topic, - task_id=task_id_to_resume, + topic=task_topic, # Topic might be ignored if resuming with a plan + task_id=current_agent_task_id, save_dir=base_save_dir, max_parallel_browsers=max_parallel_agents ) - agent_task = asyncio.create_task(agent_run_coro) - webui_manager.dr_current_task = agent_task - - # Wait briefly for the agent to start and potentially create the task ID/folder - await asyncio.sleep(1.0) - - # Determine the actual task ID being used (agent sets this) - running_task_id = webui_manager.dr_agent.current_task_id - if not running_task_id: - # Agent might not have set it yet, try to get from result later? Risky. - # Or derive from resume_task_id if provided? - running_task_id = task_id_to_resume - if not running_task_id: - logger.warning("Could not determine running task ID immediately.") - # We can still monitor, but might miss initial plan if ID needed for path - else: - logger.info(f"Assuming task ID based on resume ID: {running_task_id}") - else: - logger.info(f"Agent started with Task ID: {running_task_id}") - - webui_manager.dr_task_id = running_task_id # Store for stop handler - - # --- 6. Monitor Progress via research_plan.md --- - if running_task_id: - task_specific_dir = os.path.join(base_save_dir, str(running_task_id)) - plan_file_path = os.path.join(task_specific_dir, "research_plan.md") - report_file_path = os.path.join(task_specific_dir, "report.md") - logger.info(f"Monitoring plan file: {plan_file_path}") - else: - logger.warning("Cannot monitor plan file: Task ID unknown.") - plan_file_path = None - last_plan_content = None - while not agent_task.done(): - update_dict = {} - update_dict[resume_task_id_comp] = gr.update(value=running_task_id) - agent_stopped = getattr(webui_manager.dr_agent, 'stopped', False) - if agent_stopped: - logger.info("Stop signal detected from agent state.") - break # Exit monitoring loop - - # Check and update research plan display - if plan_file_path: - try: - current_mtime = os.path.getmtime(plan_file_path) if os.path.exists(plan_file_path) else 0 - if current_mtime > last_plan_mtime: - logger.info(f"Detected change in {plan_file_path}") - plan_content = _read_file_safe(plan_file_path) - if last_plan_content is None or ( - plan_content is not None and plan_content != last_plan_content): - update_dict[markdown_display_comp] = gr.update(value=plan_content) - last_plan_content = plan_content - last_plan_mtime = current_mtime - elif plan_content is None: - # File might have been deleted or became unreadable - last_plan_mtime = 0 # Reset to force re-read attempt later - except Exception as e: - logger.warning(f"Error checking/reading plan file {plan_file_path}: {e}") - # Avoid continuous logging for the same error - await asyncio.sleep(2.0) - - # Yield updates if any - if update_dict: - yield update_dict - - await asyncio.sleep(1.0) # Check file changes every second - - # --- 7. Task Finalization --- - logger.info("Agent task processing finished. Awaiting final result...") - final_result_dict = await agent_task # Get result or raise exception - logger.info(f"Agent run completed. Result keys: {final_result_dict.keys() if final_result_dict else 'None'}") - - # Try to get task ID from result if not known before - if not running_task_id and final_result_dict and 'task_id' in final_result_dict: - running_task_id = final_result_dict['task_id'] - webui_manager.dr_task_id = running_task_id - task_specific_dir = os.path.join(base_save_dir, str(running_task_id)) - report_file_path = os.path.join(task_specific_dir, "report.md") - logger.info(f"Task ID confirmed from result: {running_task_id}") - - final_ui_update = {} - if report_file_path and os.path.exists(report_file_path): - logger.info(f"Loading final report from: {report_file_path}") - report_content = _read_file_safe(report_file_path) - if report_content: - final_ui_update[markdown_display_comp] = gr.update(value=report_content) - final_ui_update[markdown_download_comp] = gr.File(value=report_file_path, - label=f"Report ({running_task_id}.md)", - interactive=True) - else: - final_ui_update[markdown_display_comp] = gr.update( - value="# Research Complete\n\n*Error reading final report file.*") - elif final_result_dict and 'report' in final_result_dict: - logger.info("Using report content directly from agent result.") - # If agent directly returns report content - final_ui_update[markdown_display_comp] = gr.update(value=final_result_dict['report']) - # Cannot offer download if only content is available - final_ui_update[markdown_download_comp] = gr.update(value=None, label="Download Research Report", - interactive=False) - else: - logger.warning("Final report file not found and not in result dict.") - final_ui_update[markdown_display_comp] = gr.update(value="# Research Complete\n\n*Final report not found.*") - - yield final_ui_update + agent_run_task_async = asyncio.create_task(agent_run_coro) + webui_manager.dr_current_task_async = agent_run_task_async + + # Initial running_task_id, might be updated by the agent's run method's result + running_task_id = webui_manager.dr_agent.current_task_id # Agent sets this internally on run + if not running_task_id and current_agent_task_id: # If resuming, agent.current_task_id might not be set until run + running_task_id = current_agent_task_id + + webui_manager.dr_task_id = running_task_id # Update manager with potentially new task_id from a fresh run + + # --- 6. Monitor Progress & Handle Intermediate States --- + # This loop replaces the old file monitoring. It now processes agent's dictionary results. + last_plan_content = "" # To avoid re-rendering identical plan + + while not agent_run_task_async.done(): + # Try to get intermediate results or updates if agent supports streaming them. + # For now, we'll primarily rely on the final_result_dict after the task is done, + # but this structure allows for future enhancements if agent.run() becomes a generator. + # The main loop will break when agent_run_task_async is done. + + # Update plan display if running_task_id is known + if running_task_id: # Ensure running_task_id is set (agent sets this) + task_specific_dir = os.path.join(base_save_dir, str(running_task_id)) + plan_file_path = os.path.join(task_specific_dir, "research_plan.md") + if os.path.exists(plan_file_path): + try: + current_plan_content = _read_file_safe(plan_file_path) + if current_plan_content and current_plan_content != last_plan_content: + yield {markdown_display_comp: gr.update(value=current_plan_content)} + last_plan_content = current_plan_content + except Exception as e: + logger.warning(f"Could not read plan file for update: {e}") + + agent_stopped_flag = getattr(webui_manager.dr_agent, 'stopped', False) + if agent_stopped_flag: + logger.info("Stop signal detected from agent state during run.") + break # Exit monitoring loop, let finalization handle result. + + yield {resume_task_id_comp: gr.update(value=running_task_id if running_task_id else "")} + await asyncio.sleep(1.0) + + logger.info("Agent task processing finished. Awaiting final result from agent.run()...") + final_result_dict = await agent_run_task_async + webui_manager.dr_current_task_async = None # Clear finished asyncio task + + running_task_id = final_result_dict.get("task_id", running_task_id) # Update task_id from final result + webui_manager.dr_task_id = running_task_id # Ensure manager has the definitive task_id + + logger.info(f"Agent run completed. Status: {final_result_dict.get('status')}, Message: {final_result_dict.get('message')}") + + ui_updates = { + stop_button_comp: gr.update(interactive=False), + # Default to re-enabling inputs, specific states below will override + start_button_comp: gr.update(value="▶️ Run", interactive=True), + research_task_comp: gr.update(interactive=True), + resume_task_id_comp: gr.update(interactive=True, value=running_task_id), # Show final task ID + parallel_num_comp: gr.update(interactive=True), + save_dir_comp: gr.update(interactive=True), + } + status = final_result_dict.get("status") + + if status == "awaiting_confirmation": + ui_updates[status_display_comp] = gr.update(value=final_result_dict.get("message", "Awaiting final confirmation from user."), visible=True) + ui_updates[confirmation_group_comp] = gr.update(visible=True) + ui_updates[error_feedback_group_comp] = gr.update(visible=False) + ui_updates[start_button_comp] = gr.update(interactive=False) # Keep run disabled + ui_updates[research_task_comp] = gr.update(interactive=False) + ui_updates[resume_task_id_comp] = gr.update(interactive=False) + # webui_manager.dr_agent should NOT be cleared here by run_deep_research + elif status == "awaiting_error_feedback": + ui_updates[status_display_comp] = gr.update(value=final_result_dict.get("message", "Paused due to an error."), visible=True) + ui_updates[error_details_display_comp] = gr.update(value=f"**Error in {final_result_dict.get('current_error_node_origin', 'N/A')}:**\n\n```\n{final_result_dict.get('error_details', 'No details provided.')}\n```") + ui_updates[error_feedback_group_comp] = gr.update(visible=True) + ui_updates[confirmation_group_comp] = gr.update(visible=False) + ui_updates[start_button_comp] = gr.update(interactive=False) # Keep run disabled + ui_updates[research_task_comp] = gr.update(interactive=False) + ui_updates[resume_task_id_comp] = gr.update(interactive=False) + # webui_manager.dr_agent should NOT be cleared here + else: # Terminal states: "completed", "stopped", "error", "finished_incomplete" + ui_updates[status_display_comp] = gr.update(value=f"Status: {status}. {final_result_dict.get('message', '')}", visible=True) + ui_updates[confirmation_group_comp] = gr.update(visible=False) + ui_updates[error_feedback_group_comp] = gr.update(visible=False) + + if running_task_id: # Try to load report for terminal states + task_specific_dir = os.path.join(base_save_dir, str(running_task_id)) + report_file_path = os.path.join(task_specific_dir, "report.md") + if os.path.exists(report_file_path): + report_content = _read_file_safe(report_file_path) + ui_updates[markdown_display_comp] = gr.update(value=report_content if report_content else "Report file found but empty.") + ui_updates[markdown_download_comp] = gr.File(value=report_file_path, label=f"Report ({running_task_id}.md)", interactive=True) + elif final_result_dict.get("final_report"): # If report is in dict (e.g. error before save) + ui_updates[markdown_display_comp] = gr.update(value=final_result_dict["final_report"]) + ui_updates[markdown_download_comp] = gr.update(interactive=False, value=None) + else: + ui_updates[markdown_display_comp] = gr.update(value=f"# Research Ended\n\nStatus: {status}\nMessage: {final_result_dict.get('message', 'No further details.')}") + ui_updates[markdown_download_comp] = gr.update(interactive=False, value=None) + else: # No running_task_id, show message from dict + ui_updates[markdown_display_comp] = gr.update(value=f"# Research Ended\n\nStatus: {status}\nMessage: {final_result_dict.get('message', 'No task ID was available to load report.')}") + + # Clear agent from manager ONLY if it's a terminal state from the initial run + webui_manager.dr_agent = None + webui_manager.dr_task_id = None + logger.info(f"Agent run concluded with terminal status '{status}'. Cleared active agent from WebuiManager.") + + yield ui_updates except Exception as e: - logger.error(f"Error during Deep Research Agent execution: {e}", exc_info=True) + logger.error(f"Error in run_deep_research: {e}", exc_info=True) gr.Error(f"Research failed: {e}") - yield {markdown_display_comp: gr.update(value=f"# Research Failed\n\n**Error:**\n```\n{e}\n```")} - - finally: - # --- 8. Final UI Reset --- - webui_manager.dr_current_task = None # Clear task reference - webui_manager.dr_task_id = None # Clear running task ID - yield { - start_button_comp: gr.update(value="▶️ Run", interactive=True), + markdown_display_comp: gr.update(value=f"# Research Failed\n\n**Error:**\n```\n{e}\n```"), + status_display_comp: gr.update(value=f"Critical error: {e}", visible=True), + start_button_comp: gr.update(interactive=True), # Re-enable on critical failure stop_button_comp: gr.update(interactive=False), - research_task_comp: gr.update(interactive=True), - resume_task_id_comp: gr.update(value="", interactive=True), - parallel_num_comp: gr.update(interactive=True), - save_dir_comp: gr.update(interactive=True), - # Keep download button enabled if file exists - markdown_download_comp: gr.update() if report_file_path and os.path.exists(report_file_path) else gr.update( - interactive=False) + confirmation_group_comp: gr.update(visible=False), + error_feedback_group_comp: gr.update(visible=False), } + # Ensure agent is cleared on such an error + webui_manager.dr_agent = None + webui_manager.dr_task_id = None + webui_manager.dr_current_task_async = None + finally: + # This finally block now only handles UI elements that need resetting if not handled by status logic + # The agent instance (webui_manager.dr_agent) is managed based on returned status. + # dr_current_task_async should be None if task finished or errored out of the main try. + if webui_manager.dr_current_task_async and not webui_manager.dr_current_task_async.done(): + logger.info("run_deep_research finally block: Task was still running, implies an issue if not paused.") + + # If not paused, ensure buttons are reset. If paused, they are managed by the status block. + current_status_is_paused = False + if 'status' in locals() and status in ["awaiting_confirmation", "awaiting_error_feedback"]: + current_status_is_paused = True + + if not current_status_is_paused: + final_reset_ui = { + start_button_comp: gr.update(value="▶️ Run", interactive=True), + stop_button_comp: gr.update(interactive=False), + research_task_comp: gr.update(interactive=True), + resume_task_id_comp: gr.update(interactive=True), # Value already set by status block or remains empty + parallel_num_comp: gr.update(interactive=True), + save_dir_comp: gr.update(interactive=True), + } + # Only yield if there's something to update and not already covered by a specific status yield + # This avoids a redundant yield if the last status yield already handled button states. + if status not in ["awaiting_confirmation", "awaiting_error_feedback"]: + yield final_reset_ui + # Ensure dr_current_task_async is cleared if it's done or was never set properly + webui_manager.dr_current_task_async = None async def stop_deep_research(webui_manager: WebuiManager) -> Dict[Component, Any]: + """Handles the Stop button click.""" + logger.info("Stop button clicked for Deep Research.") """Handles the Stop button click.""" logger.info("Stop button clicked for Deep Research.") agent = webui_manager.dr_agent - task = webui_manager.dr_current_task + async_task = webui_manager.dr_current_task_async # Changed from dr_current_task task_id = webui_manager.dr_task_id - base_save_dir = webui_manager.dr_save_dir + base_save_dir = webui_manager.dr_save_dir # Used for report loading + # Get components for UI updates stop_button_comp = webui_manager.get_component_by_id("deep_research_agent.stop_button") start_button_comp = webui_manager.get_component_by_id("deep_research_agent.start_button") markdown_display_comp = webui_manager.get_component_by_id("deep_research_agent.markdown_display") markdown_download_comp = webui_manager.get_component_by_id("deep_research_agent.markdown_download") - - final_update = { - stop_button_comp: gr.update(interactive=False, value="⏹️ Stopping...") + status_display_comp = webui_manager.get_component_by_id("deep_research_agent.status_display") + confirmation_group_comp = webui_manager.get_component_by_id("deep_research_agent.confirmation_group") + error_feedback_group_comp = webui_manager.get_component_by_id("deep_research_agent.error_feedback_group") + + ui_updates = { + stop_button_comp: gr.update(interactive=False, value="⏹️ Stopping..."), + start_button_comp: gr.update(interactive=False), # Keep start disabled until fully stopped + confirmation_group_comp: gr.update(visible=False), # Hide interactive groups + error_feedback_group_comp: gr.update(visible=False), + status_display_comp: gr.update(value="Stopping research...", visible=True) } - if agent and task and not task.done(): - logger.info("Signalling DeepResearchAgent to stop.") + if agent: # Agent might exist even if task is None (e.g. if run failed before task creation) + logger.info(f"Signalling DeepResearchAgent (task_id: {task_id}) to stop.") try: - # Assuming stop is synchronous or sets a flag quickly - await agent.stop() + await agent.stop() # This should set the agent's internal stop_event except Exception as e: - logger.error(f"Error calling agent.stop(): {e}") - - # The run_deep_research loop should detect the stop and exit. - # We yield an intermediate "Stopping..." state. The final reset is done by run_deep_research. - - # Try to show the final report if available after stopping - await asyncio.sleep(1.5) # Give agent a moment to write final files potentially - report_file_path = None - if task_id and base_save_dir: - report_file_path = os.path.join(base_save_dir, str(task_id), "report.md") - - if report_file_path and os.path.exists(report_file_path): - report_content = _read_file_safe(report_file_path) - if report_content: - final_update[markdown_display_comp] = gr.update( - value=report_content + "\n\n---\n*Research stopped by user.*") - final_update[markdown_download_comp] = gr.File(value=report_file_path, label=f"Report ({task_id}.md)", - interactive=True) + logger.error(f"Error calling agent.stop(): {e}", exc_info=True) + gr.Warning(f"Error trying to stop the agent: {e}") + else: + logger.warning("Stop clicked but no active DeepResearchAgent instance found.") + # If no agent, just reset UI + ui_updates[start_button_comp] = gr.update(value="▶️ Run", interactive=True) + ui_updates[stop_button_comp] = gr.update(interactive=False) + ui_updates[status_display_comp] = gr.update(value="No active research to stop.", visible=True) + # Clean up manager state if no agent was found, ensuring clean state + webui_manager.dr_agent = None + webui_manager.dr_task_id = None + webui_manager.dr_current_task_async = None + return ui_updates # Early exit if no agent + + # If there was an asyncio task for the agent's run method + if async_task and not async_task.done(): + logger.info("Waiting for agent's run task to complete after signalling stop...") + try: + # Wait for a short period for the task to acknowledge stop and finish + # The agent's run method's finally block should handle its cleanup. + await asyncio.wait_for(async_task, timeout=10.0) # Give it time to process stop + except asyncio.TimeoutError: + logger.warning("Timeout waiting for agent task to finish after stop. It might be stuck.") + ui_updates[status_display_comp] = gr.update(value="Agent stop timed out. Task might be stuck.", visible=True) + # Attempt to cancel the task if it's truly stuck + async_task.cancel() + except Exception as e: # Other errors during await + logger.error(f"Error while waiting for agent task to finish: {e}", exc_info=True) + ui_updates[status_display_comp] = gr.update(value=f"Error waiting for agent task: {e}", visible=True) + + # Agent's run method's finally block should have executed and returned a result. + # The result processing (including UI updates for report) is primarily in run_deep_research's main flow. + # Here, we ensure UI is reset to a stoppable state. + + # Try to load report, as agent might have finished one upon stopping + report_content_on_stop = "## Research Stopped by User\n\n" + report_file_path = None + if task_id and base_save_dir: + task_specific_dir = os.path.join(base_save_dir, str(task_id)) + report_file_path = os.path.join(task_specific_dir, "report.md") + if os.path.exists(report_file_path): + content = _read_file_safe(report_file_path) + if content: + report_content_on_stop += content + ui_updates[markdown_download_comp] = gr.File(value=report_file_path, label=f"Report ({task_id}.md)", interactive=True) else: - final_update[markdown_display_comp] = gr.update( - value="# Research Stopped\n\n*Error reading final report file after stop.*") + report_content_on_stop += "*Final report file was empty or unreadable.*" else: - final_update[markdown_display_comp] = gr.update(value="# Research Stopped by User") + report_content_on_stop += "*Final report file not found.*" + else: + report_content_on_stop += "*Task ID or save directory not available to load report.*" - # Keep start button disabled, run_deep_research finally block will re-enable it. - final_update[start_button_comp] = gr.update(interactive=False) + ui_updates[markdown_display_comp] = gr.update(value=report_content_on_stop) + ui_updates[status_display_comp] = gr.update(value="Research stopped.", visible=True) + ui_updates[start_button_comp] = gr.update(value="▶️ Run", interactive=True) # Re-enable run + ui_updates[stop_button_comp] = gr.update(value="⏹️ Stop", interactive=False) # Disable stop + ui_updates[research_task_comp] = gr.update(interactive=True) + ui_updates[resume_task_id_comp] = gr.update(interactive=True) # Allow new task or resume + ui_updates[parallel_num_comp] = gr.update(interactive=True) + ui_updates[save_dir_comp] = gr.update(interactive=True) - else: - logger.warning("Stop clicked but no active research task found.") - # Reset UI state just in case - final_update = { - start_button_comp: gr.update(interactive=True), - stop_button_comp: gr.update(interactive=False), - webui_manager.get_component_by_id("deep_research_agent.research_task"): gr.update(interactive=True), - webui_manager.get_component_by_id("deep_research_agent.resume_task_id"): gr.update(interactive=True), - webui_manager.get_component_by_id("deep_research_agent.max_iteration"): gr.update(interactive=True), - webui_manager.get_component_by_id("deep_research_agent.max_query"): gr.update(interactive=True), - } + # Crucially, clear the agent from WebuiManager as the task is now considered terminal. + webui_manager.dr_agent = None + webui_manager.dr_task_id = None + webui_manager.dr_current_task_async = None + logger.info("DeepResearchAgent and its task have been stopped and cleared from WebuiManager.") - return final_update + return ui_updates async def update_mcp_server(mcp_file: str, webui_manager: WebuiManager): @@ -407,7 +492,7 @@ def create_deep_research_agent_tab(webui_manager: WebuiManager): dict( research_task=research_task, parallel_num=parallel_num, - max_query=max_query, + max_query=max_query, # This is actually save_dir start_button=start_button, stop_button=stop_button, markdown_display=markdown_display, @@ -417,41 +502,189 @@ def create_deep_research_agent_tab(webui_manager: WebuiManager): mcp_server_config=mcp_server_config, ) ) + # Add new UI components to webui_manager and tab_components + status_display = gr.Markdown(value="Status: Idle", visible=True) # Initially visible + with gr.Group(visible=False) as confirmation_group: + confirm_synthesis_button = gr.Button("Confirm and Proceed to Synthesis") + with gr.Group(visible=False) as error_feedback_group: + error_details_display = gr.Markdown() + error_feedback_choice = gr.Radio(choices=["retry", "skip", "abort"], label="Choose action for error", value="retry") + submit_error_feedback_button = gr.Button("Submit Feedback") + + tab_components.update({ + "status_display": status_display, + "confirmation_group": confirmation_group, + "confirm_synthesis_button": confirm_synthesis_button, + "error_feedback_group": error_feedback_group, + "error_details_display": error_details_display, + "error_feedback_choice": error_feedback_choice, + "submit_error_feedback_button": submit_error_feedback_button, + }) + webui_manager.add_components("deep_research_agent", tab_components) - webui_manager.init_deep_research_agent() + webui_manager.init_deep_research_agent() # Initializes self.dr_agent etc. + + async def update_mcp_wrapper(mcp_file): + # This function seems to be misnamed in the original, it's for MCP server update + # It might be better named handle_mcp_upload or similar + config_str, visibility_update = await update_mcp_server(mcp_file, webui_manager) + return {mcp_server_config: config_str, mcp_server_config: visibility_update} - async def update_wrapper(mcp_file): - """Wrapper for handle_pause_resume.""" - update_dict = await update_mcp_server(mcp_file, webui_manager) - yield update_dict mcp_json_file.change( - update_wrapper, + fn=update_mcp_wrapper, # Corrected name if it's for MCP inputs=[mcp_json_file], - outputs=[mcp_server_config, mcp_server_config] + outputs=[mcp_server_config, mcp_server_config] # Ensure this matches what update_mcp_server returns ) - dr_tab_outputs = list(tab_components.values()) - all_managed_inputs = set(webui_manager.get_components()) + # Consolidate all components that can be updated by handlers + # Order matters for Gradio output mapping if not returning a dict keyed by component + # It's safer to return dicts from handlers: {component_to_update: gr.update(...)} + # For now, assuming dr_tab_outputs includes all relevant components for updates. + # We need to add the new components to this list if not using dict returns. + + # Get all components managed by WebuiManager that might need updating + # This includes components from other tabs if they are in webui_manager.id_to_component + all_managed_components_list = list(webui_manager.id_to_component.values()) + # --- Define Event Handler Wrappers --- - async def start_wrapper(comps: Dict[Component, Any]) -> AsyncGenerator[Dict[Component, Any], None]: - async for update in run_deep_research(webui_manager, comps): - yield update + # These wrappers will now collect all inputs from the UI that might be needed + # and pass them to the core logic. They then yield dictionaries of component updates. - async def stop_wrapper() -> AsyncGenerator[Dict[Component, Any], None]: + async def handle_run_button_click(comps: Dict[Component, Any]) -> AsyncGenerator[Dict[Component, Any], None]: + # `comps` is already a dictionary of all managed components from `all_managed_inputs` + async for update_dict in run_deep_research(webui_manager, comps): + yield update_dict + + async def handle_stop_button_click() -> AsyncGenerator[Dict[Component, Any], None]: update_dict = await stop_deep_research(webui_manager) yield update_dict + async def handle_confirm_synthesis_click() -> AsyncGenerator[Dict[Component, Any], None]: + logger.info("Confirm Synthesis button clicked.") + # Disable interaction groups immediately + yield { + confirmation_group_comp: gr.update(visible=False), + status_display_comp: gr.update(value="Confirmation received. Resuming for synthesis..."), + } + result = await webui_manager.provide_final_confirmation_to_agent() + + # Process result and update UI (similar to terminal states in run_deep_research) + ui_updates = { + start_button_comp: gr.update(interactive=True), # Re-enable run button + stop_button_comp: gr.update(interactive=False), + research_task_comp: gr.update(interactive=True), + resume_task_id_comp: gr.update(interactive=True, value=result.get("task_id","")), + status_display_comp: gr.update(value=f"Status: {result.get('status')}. {result.get('message', '')}", visible=True), + confirmation_group_comp: gr.update(visible=False), + error_feedback_group_comp: gr.update(visible=False), + } + if result.get("status") == "completed" and result.get("final_state", {}).get("final_report"): + ui_updates[markdown_display_comp] = gr.update(value=result["final_state"]["final_report"]) + if result.get("task_id") and webui_manager.dr_save_dir: + report_file = os.path.join(webui_manager.dr_save_dir, str(result["task_id"]), "report.md") + if os.path.exists(report_file): + ui_updates[markdown_download_comp] = gr.File(value=report_file, label=f"Report ({result['task_id']}.md)", interactive=True) + elif result.get("status") == "error": + ui_updates[markdown_display_comp] = gr.update(value=f"# Synthesis Failed\n\nError: {result.get('message')}") + yield ui_updates + + async def handle_submit_error_feedback_click(feedback_choice: str) -> AsyncGenerator[Dict[Component, Any], None]: + logger.info(f"Submit Error Feedback button clicked. Choice: {feedback_choice}") + # Disable interaction groups immediately + yield { + error_feedback_group_comp: gr.update(visible=False), + status_display_comp: gr.update(value=f"Feedback '{feedback_choice}' received. Resuming..."), + } + result = await webui_manager.provide_error_feedback_to_agent(feedback_choice) + + ui_updates = { # Default to re-enabling main controls if terminal + start_button_comp: gr.update(interactive=True), + stop_button_comp: gr.update(interactive=False), + research_task_comp: gr.update(interactive=True), + resume_task_id_comp: gr.update(interactive=True, value=result.get("task_id","")), + } + + status = result.get("status") + if status == "awaiting_confirmation": + ui_updates[status_display_comp] = gr.update(value=result.get("message", "Awaiting final confirmation."), visible=True) + ui_updates[confirmation_group_comp] = gr.update(visible=True) + ui_updates[error_feedback_group_comp] = gr.update(visible=False) + ui_updates[start_button_comp] = gr.update(interactive=False) # Keep run disabled + ui_updates[research_task_comp] = gr.update(interactive=False) + ui_updates[resume_task_id_comp] = gr.update(interactive=False) + elif status == "awaiting_error_feedback": + ui_updates[status_display_comp] = gr.update(value=result.get("message", "Paused due to an error."), visible=True) + ui_updates[error_details_display_comp] = gr.update(value=f"**Error in {result.get('current_error_node_origin', 'N/A')}:**\n\n```\n{result.get('error_details', 'No details provided.')}\n```") + ui_updates[error_feedback_group_comp] = gr.update(visible=True) + ui_updates[confirmation_group_comp] = gr.update(visible=False) + ui_updates[start_button_comp] = gr.update(interactive=False) # Keep run disabled + ui_updates[research_task_comp] = gr.update(interactive=False) + ui_updates[resume_task_id_comp] = gr.update(interactive=False) + else: # Terminal states + ui_updates[status_display_comp] = gr.update(value=f"Status: {status}. {result.get('message', '')}", visible=True) + ui_updates[confirmation_group_comp] = gr.update(visible=False) + ui_updates[error_feedback_group_comp] = gr.update(visible=False) + if status == "completed" and result.get("final_state", {}).get("final_report"): + ui_updates[markdown_display_comp] = gr.update(value=result["final_state"]["final_report"]) + if result.get("task_id") and webui_manager.dr_save_dir: + report_file = os.path.join(webui_manager.dr_save_dir, str(result["task_id"]), "report.md") + if os.path.exists(report_file): + ui_updates[markdown_download_comp] = gr.File(value=report_file, label=f"Report ({result['task_id']}.md)", interactive=True) + elif status == "error": + ui_updates[markdown_display_comp] = gr.update(value=f"# Task Failed after Feedback\n\nError: {result.get('message')}") + elif result.get("final_state", {}).get("final_report"): # E.g. for skipped then completed + ui_updates[markdown_display_comp] = gr.update(value=result["final_state"]["final_report"]) + if result.get("task_id") and webui_manager.dr_save_dir: + report_file = os.path.join(webui_manager.dr_save_dir, str(result["task_id"]), "report.md") + if os.path.exists(report_file): + ui_updates[markdown_download_comp] = gr.File(value=report_file, label=f"Report ({result['task_id']}.md)", interactive=True) + yield ui_updates + # --- Connect Handlers --- + # Inputs for start_button are all components webui_manager is aware of. + # Outputs are all components in the current tab that might be updated. + # Using dicts for outputs is more robust. + + # The `inputs` for `start_button.click` should be `webui_manager.get_components()` + # to ensure all settings from other tabs are correctly passed. + # The `outputs` should be a list of all components that can be affected by `run_deep_research`. + # This includes the new UI elements. + + # Define the list of ALL components that any of these handlers might update. + # This is important for Gradio's .click() method when not returning a dict. + # However, it's better practice for handlers to yield dicts of {component: gr.update()}, + # then the `outputs` list in .click() can be minimal or just those components. + # For safety, list all potentially affected components if not using dict returns consistently. + + output_components_for_handlers = [ + start_button, stop_button, research_task, resume_task_id, parallel_num, max_query, # max_query is save_dir + markdown_display, markdown_download, status_display, + confirmation_group, error_feedback_group, error_details_display + ] + + start_button.click( - fn=start_wrapper, - inputs=all_managed_inputs, - outputs=dr_tab_outputs + fn=handle_run_button_click, + inputs=webui_manager.get_components(), # Pass all managed components as dict + outputs=output_components_for_handlers # Explicitly list outputs ) stop_button.click( - fn=stop_wrapper, + fn=handle_stop_button_click, inputs=None, - outputs=dr_tab_outputs + outputs=output_components_for_handlers + ) + + confirm_synthesis_button.click( + fn=handle_confirm_synthesis_click, + inputs=None, # Task_id will be taken from webui_manager + outputs=output_components_for_handlers + ) + + submit_error_feedback_button.click( + fn=handle_submit_error_feedback_click, + inputs=[error_feedback_choice], # Pass the choice from radio + outputs=output_components_for_handlers ) diff --git a/src/webui/webui_manager.py b/src/webui/webui_manager.py index 0a9d5e16..573e6874 100644 --- a/src/webui/webui_manager.py +++ b/src/webui/webui_manager.py @@ -45,10 +45,11 @@ def init_deep_research_agent(self) -> None: """ init deep research agent """ - self.dr_agent: Optional[DeepResearchAgent] = None - self.dr_current_task = None - self.dr_agent_task_id: Optional[str] = None + self.dr_agent: Optional[DeepResearchAgent] = None # This will be our active agent + self.dr_current_task_async: Optional[asyncio.Task] = None # Corresponds to dr_current_task, renamed for clarity + self.dr_task_id: Optional[str] = None # Corresponds to dr_agent_task_id, used for UI and agent interaction self.dr_save_dir: Optional[str] = None + # active_deep_research_agent and active_deep_research_task_id are effectively self.dr_agent and self.dr_task_id def add_components(self, tab_name: str, components_dict: dict[str, "Component"]) -> None: """ @@ -120,3 +121,59 @@ def load_config(self, config_path: str): } ) yield update_components + + async def provide_final_confirmation_to_agent(self) -> Dict[str, Any]: + """ + Provides final confirmation to the currently active DeepResearchAgent. + """ + logger.info(f"Attempting to provide final confirmation for task_id: {self.dr_task_id}") + if not self.dr_agent or not self.dr_task_id: + logger.warning("No active DeepResearchAgent or task_id to provide confirmation to.") + return {"status": "error", "message": "No active research task found for confirmation."} + + try: + result = await self.dr_agent.provide_final_confirmation() + logger.info(f"Confirmation result for task {self.dr_task_id}: {result.get('status')}") + + # If the agent is no longer awaiting confirmation or error, and is in a terminal state + if result.get("status") not in ["awaiting_confirmation", "awaiting_error_feedback"]: + logger.info(f"Task {self.dr_task_id} reached terminal state after confirmation. Clearing active agent.") + self.dr_agent = None + self.dr_task_id = None + self.dr_current_task_async = None + return result + except Exception as e: + logger.error(f"Error providing final confirmation to agent for task {self.dr_task_id}: {e}", exc_info=True) + # Consider clearing agent if it's in an unrecoverable state + self.dr_agent = None + self.dr_task_id = None + self.dr_current_task_async = None + return {"status": "error", "message": f"Failed to provide final confirmation: {e}"} + + async def provide_error_feedback_to_agent(self, feedback: str) -> Dict[str, Any]: + """ + Provides error feedback to the currently active DeepResearchAgent. + """ + logger.info(f"Attempting to provide error feedback '{feedback}' for task_id: {self.dr_task_id}") + if not self.dr_agent or not self.dr_task_id: + logger.warning("No active DeepResearchAgent or task_id to provide error feedback to.") + return {"status": "error", "message": "No active research task found for error feedback."} + + try: + result = await self.dr_agent.provide_error_feedback(feedback) + logger.info(f"Error feedback result for task {self.dr_task_id} with feedback '{feedback}': {result.get('status')}") + + # If the agent is no longer awaiting confirmation or error, and is in a terminal state + if result.get("status") not in ["awaiting_confirmation", "awaiting_error_feedback"]: + logger.info(f"Task {self.dr_task_id} reached terminal state after error feedback. Clearing active agent.") + self.dr_agent = None + self.dr_task_id = None + self.dr_current_task_async = None + return result + except Exception as e: + logger.error(f"Error providing error feedback to agent for task {self.dr_task_id}: {e}", exc_info=True) + # Consider clearing agent if it's in an unrecoverable state + self.dr_agent = None + self.dr_task_id = None + self.dr_current_task_async = None + return {"status": "error", "message": f"Failed to provide error feedback: {e}"}