diff --git a/app/src/ai/blocklist/action_model.rs b/app/src/ai/blocklist/action_model.rs index 81f61fb846..9d65b67786 100644 --- a/app/src/ai/blocklist/action_model.rs +++ b/app/src/ai/blocklist/action_model.rs @@ -589,7 +589,14 @@ impl BlocklistAIActionModel { .running_actions .get(&conversation_id) .is_some_and(|running| !running.is_empty()); - has_pending || has_running + // In-flight preprocessing counts as unfinished: a batch still being preprocessed may + // yet enqueue executable actions, so resolving the conversation now would be premature + // (and would also misfire the orphan-recovery net). + let has_preprocessing = self + .pending_preprocessed_actions + .get(&conversation_id) + .is_some_and(|preprocessing| !preprocessing.is_empty()); + has_pending || has_running || has_preprocessing } /// Returns finished action results received from the most recent AI output for the active conversation. @@ -600,6 +607,21 @@ impl BlocklistAIActionModel { self.finished_action_results.get(&conversation_id) } + /// Test-only helper: seeds a finished action result without emitting events or + /// updating conversation status, so tests can reproduce the preprocessing dedup + /// path (where a result already exists for an action that is about to be queued). + #[cfg(test)] + pub(super) fn insert_finished_action_result_for_test( + &mut self, + conversation_id: AIConversationId, + result: Arc, + ) { + self.finished_action_results + .entry(conversation_id) + .or_default() + .push(result); + } + /// Returns the `AIActionStatus` for the action corresponding to the given `id`, if any. pub fn get_action_status(&self, id: &AIAgentActionId) -> Option { for (conversation_id, pending_actions_for_conversation) in &self.pending_actions { @@ -957,6 +979,7 @@ impl BlocklistAIActionModel { .or_default() .handle_preprocess_actions_result(preprocess_id, actions); + let mut enqueued_executable_action = false; for action in actions_to_enqueue { let action_id = action.id.clone(); // Some actions may already have results. This can happen in session sharing when @@ -990,8 +1013,34 @@ impl BlocklistAIActionModel { .or_default() .push_back(action); ctx.emit(BlocklistAIActionEvent::QueuedAction(action_id)); + enqueued_executable_action = true; } self.try_to_execute_available_actions(conversation_id, ctx); + + // Orphan guard: if this preprocessing batch enqueued nothing executable and no other + // actions remain unfinished for the conversation (including in-flight preprocessing), + // then no `FinishedAction` will ever be emitted for this drain. Without it, the + // controller's resolver never runs and the conversation can stay `InProgress` forever + // with no in-flight stream. Re-emit `FinishedAction` for an already-settled action so + // the controller resolves the conversation (sends a follow-up or sets a terminal + // status). Reusing a finished action id is semantically valid because that action *is* + // finished, and avoids introducing an event variant that lacks an action id. + if !enqueued_executable_action + && !self.has_unfinished_actions_for_conversation(conversation_id) + { + if let Some(settled_action_id) = self + .finished_action_results + .get(&conversation_id) + .and_then(|results| results.last()) + .map(|result| result.id.clone()) + { + ctx.emit(BlocklistAIActionEvent::FinishedAction { + action_id: settled_action_id, + conversation_id, + cancellation_reason: None, + }); + } + } } /// Apply a finished action result to the conversation. diff --git a/app/src/ai/blocklist/action_model/preprocess.rs b/app/src/ai/blocklist/action_model/preprocess.rs index e8d2807f9b..991e81301a 100644 --- a/app/src/ai/blocklist/action_model/preprocess.rs +++ b/app/src/ai/blocklist/action_model/preprocess.rs @@ -27,6 +27,11 @@ impl PendingPreprocessedActions { self.0.iter().any(|action| action.contains(action_id)) } + /// Returns whether there are no in-flight preprocessing batches. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + /// Returns the actions that are ready to be queued now that the group of actions identified by [`PreprocessId`] have completed. /// NOTE this may return actions that have been completed earlier to maintain the invariant that actions are returned in the /// order they are added. diff --git a/app/src/ai/blocklist/controller.rs b/app/src/ai/blocklist/controller.rs index 3ce921c0c3..ea2994de8e 100644 --- a/app/src/ai/blocklist/controller.rs +++ b/app/src/ai/blocklist/controller.rs @@ -448,113 +448,11 @@ impl BlocklistAIController { else { return; }; - let action_model = me.action_model.as_ref(ctx); - if action_model.has_unfinished_actions_for_conversation(*conversation_id) { - return; - } - - let history_model = BlocklistAIHistoryModel::handle(ctx); - let Some((is_viewing_shared_session, is_entirely_passive_code_diff)) = history_model - .as_ref(ctx) - .conversation(conversation_id) - .map(|conversation| { - ( - conversation.is_viewing_shared_session(), - conversation.is_entirely_passive_code_diff(), - ) - }) - else { - return; - }; - - // Viewer sessions should not send follow-ups. - // They only act as passive viewers of the action stream. - if is_viewing_shared_session { - return; - } - - let Some(finished_action_results) = - action_model.get_finished_action_results(*conversation_id) - else { - return; - }; - let is_passive_code_diff = is_entirely_passive_code_diff - && finished_action_results.last().is_some_and(|result| { - matches!(result.result, AIAgentActionResultType::RequestFileEdits(_)) - }); - let has_manual_follow_up = me.pending_passive_follow_ups.contains(conversation_id); - - let is_lrc_command_completed = - cancellation_reason.is_some_and(|reason| reason.is_lrc_command_completed()); - let should_preserve_in_progress_status = cancellation_reason - .is_some_and(|reason| reason.should_preserve_in_progress_status()); - let should_trigger_follow_up_request = (!is_passive_code_diff - && !is_lrc_command_completed - && finished_action_results - .iter() - .any(|result| result.result.should_trigger_request_upon_completion())) - || has_manual_follow_up; - if !should_trigger_follow_up_request { - if should_preserve_in_progress_status { - return; - } - // We also check if there's an in-flight req, because it's possible that this - // subscription callback was queued in response to auto-cancelling pending actions - // in the process of constructing a request. In such cases, we don't want to update - // conversation status to Cancelled/Success. - if !me - .in_flight_response_streams - .has_active_stream_for_conversation(*conversation_id, ctx) - { - // If the completed actions do not trigger a follow-up request, update conversation - // status based on the outcome of the actions. - // - // (It would otherwise remain `InProgress`, which would be correct, since we'd be - // immediately triggering a follow-up request). - // - // In practice, the only time where this codepath gets triggered is upon completion - // of a passive code diff action, where we don't autosend the next request. - // - // With passive code diffs, its most appropriate to mark the conversation - // successful if the passive diff was accepted. In practice, there's only ever - // one RequestFileEdits action, so `finished_action_results` at this point - // should only have a single element. - // - // If the user does end up following up on the passive diff-originated conversation, - // the status will once again be updated to `InProgress`. - let updated_conversation_status = if finished_action_results - .iter() - .all(|result| result.result.is_successful()) - || is_lrc_command_completed - { - ConversationStatus::Success - } else { - // This is an imperfect heuristic that practically speaking should have no effect. - // - // If we actually need to differentiate between the state of a conversation - // where actions completed with mixed result statuses (e.g. a mix of - // cancelled, error, and success) _and_ we don't automatically send back action - // results to the agent, then it'd be worth considering adding a new status - // variant. - ConversationStatus::Cancelled - }; - history_model.update(ctx, |history_model, ctx| { - history_model.update_conversation_status( - me.terminal_view_id, - *conversation_id, - updated_conversation_status, - ctx, - ); - }); - } - return; - } - let trigger = if has_manual_follow_up { - FollowUpTrigger::UserRequested - } else { - FollowUpTrigger::Auto - }; - me.send_follow_up_for_conversation(*conversation_id, trigger, ctx); + me.resolve_conversation_after_actions_settled( + *conversation_id, + *cancellation_reason, + ctx, + ); }); ctx.subscribe_to_model(&agent_view_controller, |me, _, event, ctx| { @@ -1624,6 +1522,133 @@ impl BlocklistAIController { self.pending_passive_follow_ups.remove(&conversation_id); } + /// Resolves a conversation once all of its actions have settled (no pending, + /// running, or in-flight preprocessing actions). + /// + /// This enforces the invariant that a conversation must never remain `InProgress` + /// with no in-flight stream and nothing left to do: it either sends a follow-up + /// request with the finished action results, or transitions the conversation to a + /// terminal status. + /// + /// This is the shared resolution path invoked both by the action model's + /// `FinishedAction` event and by the orphan-recovery safety net. + fn resolve_conversation_after_actions_settled( + &mut self, + conversation_id: AIConversationId, + cancellation_reason: Option, + ctx: &mut ModelContext, + ) { + let action_model = self.action_model.as_ref(ctx); + if action_model.has_unfinished_actions_for_conversation(conversation_id) { + return; + } + // Treat missing finished results as empty so an orphaned conversation still resolves + // to a terminal status instead of early-returning. In the normal `FinishedAction` + // flow results are always present, so this is equivalent to the prior behavior there. + let finished_action_results = action_model + .get_finished_action_results(conversation_id) + .cloned() + .unwrap_or_default(); + + let history_model = BlocklistAIHistoryModel::handle(ctx); + let Some((is_viewing_shared_session, is_entirely_passive_code_diff)) = history_model + .as_ref(ctx) + .conversation(&conversation_id) + .map(|conversation| { + ( + conversation.is_viewing_shared_session(), + conversation.is_entirely_passive_code_diff(), + ) + }) + else { + return; + }; + + // Viewer sessions should not send follow-ups. + // They only act as passive viewers of the action stream. + if is_viewing_shared_session { + return; + } + + let is_passive_code_diff = is_entirely_passive_code_diff + && finished_action_results.last().is_some_and(|result| { + matches!(result.result, AIAgentActionResultType::RequestFileEdits(_)) + }); + let has_manual_follow_up = self.pending_passive_follow_ups.contains(&conversation_id); + + let is_lrc_command_completed = + cancellation_reason.is_some_and(|reason| reason.is_lrc_command_completed()); + let should_preserve_in_progress_status = + cancellation_reason.is_some_and(|reason| reason.should_preserve_in_progress_status()); + let should_trigger_follow_up_request = (!is_passive_code_diff + && !is_lrc_command_completed + && finished_action_results + .iter() + .any(|result| result.result.should_trigger_request_upon_completion())) + || has_manual_follow_up; + if !should_trigger_follow_up_request { + if should_preserve_in_progress_status { + return; + } + // We also check if there's an in-flight req, because it's possible that this + // resolution was triggered in response to auto-cancelling pending actions + // in the process of constructing a request. In such cases, we don't want to update + // conversation status to Cancelled/Success. + if !self + .in_flight_response_streams + .has_active_stream_for_conversation(conversation_id, ctx) + { + // If the completed actions do not trigger a follow-up request, update conversation + // status based on the outcome of the actions. + // + // (It would otherwise remain `InProgress`, which would be correct, since we'd be + // immediately triggering a follow-up request). + // + // In practice, the only time where this codepath gets triggered is upon completion + // of a passive code diff action, where we don't autosend the next request. + // + // With passive code diffs, its most appropriate to mark the conversation + // successful if the passive diff was accepted. In practice, there's only ever + // one RequestFileEdits action, so `finished_action_results` at this point + // should only have a single element. + // + // If the user does end up following up on the passive diff-originated conversation, + // the status will once again be updated to `InProgress`. + let updated_conversation_status = if finished_action_results + .iter() + .all(|result| result.result.is_successful()) + || is_lrc_command_completed + { + ConversationStatus::Success + } else { + // This is an imperfect heuristic that practically speaking should have no effect. + // + // If we actually need to differentiate between the state of a conversation + // where actions completed with mixed result statuses (e.g. a mix of + // cancelled, error, and success) _and_ we don't automatically send back action + // results to the agent, then it'd be worth considering adding a new status + // variant. + ConversationStatus::Cancelled + }; + history_model.update(ctx, |history_model, ctx| { + history_model.update_conversation_status( + self.terminal_view_id, + conversation_id, + updated_conversation_status, + ctx, + ); + }); + } + return; + } + let trigger = if has_manual_follow_up { + FollowUpTrigger::UserRequested + } else { + FollowUpTrigger::Auto + }; + self.send_follow_up_for_conversation(conversation_id, trigger, ctx); + } + fn conversation_ready_for_pending_events( &self, conversation_id: AIConversationId, @@ -1909,6 +1934,12 @@ impl BlocklistAIController { ctx: &mut ModelContext, ) { if !self.conversation_ready_for_pending_events(conversation_id, ctx) { + // Safety net: this is one of the few hooks that fires after stream cleanup, so it's + // a convenient place to catch a conversation that has been left orphaned — + // `InProgress` with no in-flight stream, no pending/running/preprocessing actions, + // and no scheduled resume. Nothing else would ever move such a conversation out of + // `InProgress`, so recover it here. Readiness behavior is otherwise unchanged. + self.recover_orphaned_conversation_if_needed(conversation_id, ctx); return; } @@ -1923,6 +1954,55 @@ impl BlocklistAIController { self.inject_pending_events_for_request(conversation_id, ctx); } + /// Detects and recovers an orphaned conversation: one left `InProgress` with no + /// in-flight stream, no pending/running/preprocessing actions, and no scheduled + /// auto-resume. Such a conversation would otherwise hang indefinitely. Recovery + /// routes through the shared `resolve_conversation_after_actions_settled` path so + /// the conversation reaches a terminal status (or sends a follow-up). + fn recover_orphaned_conversation_if_needed( + &mut self, + conversation_id: AIConversationId, + ctx: &mut ModelContext, + ) { + let owns_conversation = BlocklistAIHistoryModel::as_ref(ctx) + .all_live_conversations_for_terminal_view(self.terminal_view_id) + .any(|conversation| conversation.id() == conversation_id); + if !owns_conversation { + return; + } + let is_in_progress = BlocklistAIHistoryModel::as_ref(ctx) + .conversation(&conversation_id) + .is_some_and(|conversation| conversation.status().is_in_progress()); + if !is_in_progress { + return; + } + if self + .in_flight_response_streams + .has_active_stream_for_conversation(conversation_id, ctx) + { + return; + } + if self + .action_model + .as_ref(ctx) + .has_unfinished_actions_for_conversation(conversation_id) + { + return; + } + // A scheduled auto-resume will move the conversation out of `InProgress` on its own. + if self + .pending_auto_resume_handles + .contains_key(&conversation_id) + { + return; + } + + log::warn!( + "Recovering orphaned conversation stuck InProgress with no stream or pending work: conversation_id={conversation_id:?}" + ); + self.resolve_conversation_after_actions_settled(conversation_id, None, ctx); + } + fn handle_dormant_claude_wake_ready( &mut self, conversation_id: AIConversationId, diff --git a/app/src/ai/blocklist/controller_tests.rs b/app/src/ai/blocklist/controller_tests.rs index 101a00f7b0..4188aef00f 100644 --- a/app/src/ai/blocklist/controller_tests.rs +++ b/app/src/ai/blocklist/controller_tests.rs @@ -1,16 +1,19 @@ use std::collections::HashMap; +use std::sync::Arc; use uuid::Uuid; use warpui::{App, SingletonEntity}; -use crate::ai::agent::conversation::AIConversationId; +use crate::ai::agent::conversation::{AIConversationId, ConversationStatus}; use crate::ai::agent::task::TaskId; use crate::ai::agent::{ - AIAgentAttachment, AIAgentContext, AIAgentInput, CancellationReason, ImageContext, - PassiveSuggestionTrigger, UserQueryMode, + AIAgentAction, AIAgentActionId, AIAgentActionResult, AIAgentActionResultType, + AIAgentActionType, AIAgentAttachment, AIAgentContext, AIAgentInput, CancellationReason, + ImageContext, PassiveSuggestionTrigger, RequestCommandOutputResult, UserQueryMode, }; use crate::ai::ambient_agents::AmbientAgentTaskId; use crate::ai::blocklist::{BlocklistAIHistoryModel, PendingAttachment, PendingFile}; +use crate::test_util::assert_eventually; use crate::test_util::terminal::{add_window_with_terminal, initialize_app_for_terminal_view}; fn new_ambient_agent_task_id() -> AmbientAgentTaskId { @@ -203,3 +206,88 @@ fn cancelling_conversation_aborts_pending_auto_resume() { }); }); } + +/// Regression test for orphaned conversations: when an action's preprocessing +/// completes but enqueues nothing (here, because a result for the action already +/// exists and the dedup guard drops it), the conversation must not be left stuck +/// `InProgress` with no in-flight stream and no pending/running actions. It should +/// resolve to a terminal status (or trigger a follow-up). +#[test] +fn success_with_actions_that_drop_in_preprocessing_does_not_orphan_conversation() { + App::test((), |mut app| async move { + initialize_app_for_terminal_view(&mut app); + let terminal = add_window_with_terminal(&mut app, None); + + let action_id = AIAgentActionId::from("orphan-action".to_owned()); + let task_id = TaskId::new("orphan-task".to_owned()); + + let conversation_id = terminal.update(&mut app, |terminal, ctx| { + let conversation_id = + BlocklistAIHistoryModel::handle(ctx).update(ctx, |history_model, ctx| { + history_model.start_new_conversation(terminal.id(), false, false, false, ctx) + }); + + let action_model = terminal.ai_controller().as_ref(ctx).action_model.clone(); + + // Seed a finished result for `action_id` so the preprocessing dedup guard drops + // the action when it is queued below, enqueuing nothing executable. A cancelled + // result keeps resolution offline (it won't trigger a follow-up request). + action_model.update(ctx, |action_model, _| { + action_model.insert_finished_action_result_for_test( + conversation_id, + Arc::new(AIAgentActionResult { + id: action_id.clone(), + task_id: task_id.clone(), + result: AIAgentActionResultType::RequestCommandOutput( + RequestCommandOutputResult::CancelledBeforeExecution, + ), + }), + ); + }); + + // Queue an action sharing that id. Preprocessing is spawned but has not yet run. + action_model.update(ctx, |action_model, ctx| { + action_model.queue_actions( + vec![AIAgentAction { + id: action_id.clone(), + task_id: task_id.clone(), + action: AIAgentActionType::InitProject, + requires_result: true, + }], + conversation_id, + ctx, + ); + }); + + conversation_id + }); + + // Until preprocessing drains, the conversation is still in progress. + terminal.update(&mut app, |_terminal, ctx| { + assert!( + BlocklistAIHistoryModel::as_ref(ctx) + .conversation(&conversation_id) + .expect("conversation exists") + .status() + .is_in_progress(), + "conversation should be InProgress before preprocessing drains" + ); + }); + + // Drive the spawned preprocessing future and resulting action-finished event to + // completion. On slower CI machines, a fixed number of yields can race this path. + assert_eventually!( + 200 => !matches!( + terminal.read(&app, |_terminal, ctx| { + BlocklistAIHistoryModel::as_ref(ctx) + .conversation(&conversation_id) + .expect("conversation exists") + .status() + .clone() + }), + ConversationStatus::InProgress + ), + "conversation whose only action dropped during preprocessing must not remain InProgress" + ); + }); +} diff --git a/app/src/terminal/view_tests.rs b/app/src/terminal/view_tests.rs index ab0696e0df..0689ea4b0d 100644 --- a/app/src/terminal/view_tests.rs +++ b/app/src/terminal/view_tests.rs @@ -6047,6 +6047,7 @@ fn submit_cli_agent_rich_input_opencode_defers_enter_and_close() { // Wait for the delayed \r to arrive. assert_eventually!( + 200 => pty_writes.borrow().len() == 2, "carriage return should be written after delay" ); @@ -6421,6 +6422,7 @@ fn submit_with_plugin_but_auto_toggle_off_respects_auto_dismiss() { // auto_toggle is off, so auto_dismiss closes rich input. // Claude uses DelayedEnter, so the close happens after a timer. assert_eventually!( + 200 => terminal.read(&app, |view, ctx| !view .has_active_cli_agent_input_session(ctx)), "Rich input should be closed after submit with auto_dismiss"