Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion app/src/ai/blocklist/action_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,14 @@ impl BlocklistAIActionModel {
.running_actions
.get(&conversation_id)
.is_some_and(|running| !running.is_empty());
has_pending || has_running
// In-flight preprocessing counts as unfinished: a batch still being preprocessed may
// yet enqueue executable actions, so resolving the conversation now would be premature
// (and would also misfire the orphan-recovery net).
let has_preprocessing = self
.pending_preprocessed_actions
.get(&conversation_id)
.is_some_and(|preprocessing| !preprocessing.is_empty());
has_pending || has_running || has_preprocessing
}

/// Returns finished action results received from the most recent AI output for the active conversation.
Expand All @@ -600,6 +607,21 @@ impl BlocklistAIActionModel {
self.finished_action_results.get(&conversation_id)
}

/// Test-only helper: seeds a finished action result without emitting events or
/// updating conversation status, so tests can reproduce the preprocessing dedup
/// path (where a result already exists for an action that is about to be queued).
#[cfg(test)]
pub(super) fn insert_finished_action_result_for_test(
&mut self,
conversation_id: AIConversationId,
result: Arc<AIAgentActionResult>,
) {
self.finished_action_results
.entry(conversation_id)
.or_default()
.push(result);
}

/// Returns the `AIActionStatus` for the action corresponding to the given `id`, if any.
pub fn get_action_status(&self, id: &AIAgentActionId) -> Option<AIActionStatus> {
for (conversation_id, pending_actions_for_conversation) in &self.pending_actions {
Expand Down Expand Up @@ -957,6 +979,7 @@ impl BlocklistAIActionModel {
.or_default()
.handle_preprocess_actions_result(preprocess_id, actions);

let mut enqueued_executable_action = false;
for action in actions_to_enqueue {
let action_id = action.id.clone();
// Some actions may already have results. This can happen in session sharing when
Expand Down Expand Up @@ -990,8 +1013,34 @@ impl BlocklistAIActionModel {
.or_default()
.push_back(action);
ctx.emit(BlocklistAIActionEvent::QueuedAction(action_id));
enqueued_executable_action = true;
}
self.try_to_execute_available_actions(conversation_id, ctx);

// Orphan guard: if this preprocessing batch enqueued nothing executable and no other
// actions remain unfinished for the conversation (including in-flight preprocessing),
// then no `FinishedAction` will ever be emitted for this drain. Without it, the
// controller's resolver never runs and the conversation can stay `InProgress` forever
// with no in-flight stream. Re-emit `FinishedAction` for an already-settled action so
// the controller resolves the conversation (sends a follow-up or sets a terminal
// status). Reusing a finished action id is semantically valid because that action *is*
// finished, and avoids introducing an event variant that lacks an action id.
if !enqueued_executable_action
&& !self.has_unfinished_actions_for_conversation(conversation_id)
{
if let Some(settled_action_id) = self
.finished_action_results
.get(&conversation_id)
.and_then(|results| results.last())
.map(|result| result.id.clone())
{
ctx.emit(BlocklistAIActionEvent::FinishedAction {
action_id: settled_action_id,
conversation_id,
cancellation_reason: None,
});
}
}
}

/// Apply a finished action result to the conversation.
Expand Down
5 changes: 5 additions & 0 deletions app/src/ai/blocklist/action_model/preprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ impl PendingPreprocessedActions {
self.0.iter().any(|action| action.contains(action_id))
}

/// Returns whether there are no in-flight preprocessing batches.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Returns the actions that are ready to be queued now that the group of actions identified by [`PreprocessId`] have completed.
/// NOTE this may return actions that have been completed earlier to maintain the invariant that actions are returned in the
/// order they are added.
Expand Down
Loading
Loading