Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 156 additions & 66 deletions api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from api.workspace import get_last_workspace
from api.agent_sessions import read_importable_agent_session_rows, read_session_lineage_metadata
from api import wal as _wal

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -316,11 +317,11 @@ def __init__(self, session_id: str=None, title: str='Untitled',
pending_user_message: str=None,
pending_attachments=None,
pending_started_at=None,
context_length=None, threshold_tokens=None,
last_prompt_tokens=None,
context_messages=None,
compression_anchor_visible_idx=None,
compression_anchor_message_key=None,
context_length=None, threshold_tokens=None,
last_prompt_tokens=None,
parent_session_id: str=None,
enabled_toolsets=None,
**kwargs):
Expand Down Expand Up @@ -375,7 +376,6 @@ def save(self, touch_updated_at: bool = True, skip_index: bool = False) -> None:
'input_tokens', 'output_tokens', 'estimated_cost',
'personality', 'active_stream_id',
'pending_user_message', 'pending_attachments', 'pending_started_at',
'compression_anchor_visible_idx', 'compression_anchor_message_key',
'context_length', 'threshold_tokens', 'last_prompt_tokens',
'parent_session_id',
'is_cli_session', 'source_tag', 'session_source', 'source_label',
Expand Down Expand Up @@ -470,8 +470,6 @@ def compact(self, include_runtime=False, active_stream_ids=None) -> dict:
'output_tokens': self.output_tokens,
'estimated_cost': self.estimated_cost,
'personality': self.personality,
'compression_anchor_visible_idx': self.compression_anchor_visible_idx,
'compression_anchor_message_key': self.compression_anchor_message_key,
'context_length': self.context_length,
'threshold_tokens': self.threshold_tokens,
'last_prompt_tokens': self.last_prompt_tokens,
Expand Down Expand Up @@ -543,19 +541,16 @@ def _apply_core_sync_or_error_marker(
# stuck pending fields MUST still be cleared and an error marker appended
# so the session isn't permanently left in stale-pending state.
if len(session.messages) != 0:
# Messages are non-empty — an assistant partial was already captured
# (e.g. WAL replay or a previous checkpoint). Clear stuck pending fields
# but do NOT append an error marker: the partial is real model output.
session.active_stream_id = None
session.pending_user_message = None
session.pending_attachments = []
session.pending_started_at = None
session.messages.append({
'role': 'assistant',
'content': '**Previous turn did not complete.**',
'timestamp': int(time.time()),
'_error': True,
})
session.save()
logger.info(
"Session %s: pending cleared (messages non-empty), added error marker",
"Session %s: pending cleared (messages non-empty, partial present), no error marker",
sid,
)
return True
Expand Down Expand Up @@ -615,6 +610,106 @@ def _apply_core_sync_or_error_marker(
return True


def _replay_wal_recovery(session) -> None:
    """Replay WAL events into a freshly-loaded session to recover crashed streaming output.

    Recovery is attempted only when all of the following hold:

    - ``session.active_stream_id`` is set (a stream was in flight),
    - that stream id is no longer registered in ``STREAMS``,
    - the last entry of ``session.messages`` has role ``'user'`` (the
      assistant reply for the in-flight turn is missing), and
    - the WAL for this session yields something to restore (visible text,
      reasoning, or tool calls).

    In every other case the function returns without modifying the session,
    leaving its pending state untouched for whatever handling runs afterwards.

    Repeat calls are harmless: a successful recovery clears
    ``active_stream_id``, so the next call returns at the first guard.

    Side effects on successful recovery:
    - Appends one assistant message (flagged ``_wal_recovered``) to
      ``session.messages``.
    - Appends recovered tool call events to ``session.tool_calls``.
    - Sets ``session.active_stream_id = None`` and clears the pending fields.
    - Calls ``session.save()`` and then ``_wal.delete_wal()`` for the session.
    """
    if not session.active_stream_id:
        return

    # Only replay WAL if the stream is no longer alive in STREAMS.
    # If the stream IS still alive, the streaming thread is still running;
    # do not interfere with it.
    try:
        with STREAMS_LOCK:
            if session.active_stream_id in STREAMS:
                return  # stream still active — let it finish normally
    except Exception:
        return  # best-effort check

    # Only replay if the last message in the session is from the user
    # (i.e. the assistant reply is genuinely missing).
    if not session.messages or session.messages[-1].get('role') != 'user':
        return

    events = _wal.read_wal(session.session_id)
    if not events:
        return  # No WAL — nothing to replay

    recovered = _wal.replay_wal(events)
    # ``or`` (rather than a .get default) also covers keys present with None,
    # which would otherwise break re.sub() / len() further down.
    tool_calls = recovered.get('tool_calls') or []
    assistant_content = recovered.get('content') or ''
    if not assistant_content and not tool_calls:
        return  # Nothing substantive to recover

    # Strip a trailing thinking/reasoning block that was still open when the
    # stream died. The tempered ``(?!</think...>)`` lookahead restricts the
    # match to an opening tag with NO closing tag after it, so a completed
    # <think>...</think> block and the answer text following it are kept.
    import re as _re
    assistant_content = _re.sub(
        r'<think(?:ing)?\b[^>]*>(?:(?!</think(?:ing)?\s*>).)*\Z',
        '', assistant_content, flags=_re.DOTALL | _re.IGNORECASE
    ).strip()

    reasoning = recovered.get('reasoning')
    if not assistant_content and not tool_calls and not reasoning:
        # The WAL held only an unfinished thinking block: appending an empty
        # assistant message would hide the interruption, so leave the session
        # as-is, exactly like the "nothing substantive" case above.
        return

    # One timestamp for everything restored by this recovery pass.
    now = int(time.time())

    recovered_msg = {
        'role': 'assistant',
        'content': assistant_content,
        'timestamp': now,
        '_wal_recovered': True,
    }
    # Restore reasoning/thinking text captured during the interrupted stream.
    if reasoning:
        recovered_msg['reasoning'] = reasoning
    # Heuristic for "cut off mid-sentence": the text ends in a letter/digit
    # rather than punctuation.
    if assistant_content and assistant_content[-1].isalnum():
        recovered_msg['_partial'] = True

    session.messages.append(recovered_msg)

    # Reconstruct tool_calls entries from WAL tool events if present. Results
    # are not available from the WAL replay, so they are stored as empty.
    if tool_calls:
        session.tool_calls = session.tool_calls or []
        session.tool_calls.extend(
            {
                'id': tc.get('id', ''),
                'name': tc.get('name', ''),
                'args': tc.get('args', ''),
                'result': '',
                'timestamp': now,
            }
            for tc in tool_calls
        )

    # Clear pending state and persist before removing the WAL, so a failure in
    # save() leaves the WAL available for another attempt.
    session.active_stream_id = None
    session.pending_user_message = None
    session.pending_attachments = []
    session.pending_started_at = None
    session.save()
    _wal.delete_wal(session.session_id)
    # NOTE: the first count is the character length of the recovered text,
    # not a model token count.
    logger.info(
        "Session %s: WAL recovery replayed %d tokens, %d tool calls",
        session.session_id,
        len(assistant_content),
        len(tool_calls),
    )


def _repair_stale_pending(session) -> bool:
"""Recover a sidecar stuck with messages=[] and stale pending state.

Expand Down Expand Up @@ -666,6 +761,40 @@ def _repair_stale_pending(session) -> bool:
return False


# Serializes the find-or-create sequence inside ensure_cron_project().
_CRON_PROJECT_LOCK = threading.Lock()
# Name used by ensure_cron_project() to look up (or create) the system cron project.
CRON_PROJECT_NAME = 'Cron Jobs'


def ensure_cron_project() -> str:
    """Return the project_id of the system "Cron Jobs" project, creating it if needed.

    The lookup and the create-and-save step both run while holding
    ``_CRON_PROJECT_LOCK``, so concurrent callers in this process cannot both
    create the project. The project list is loaded once; that same list is
    searched and, when no match exists, extended and saved.

    Returns:
        The ``project_id`` of the existing project named ``CRON_PROJECT_NAME``,
        or a newly generated 12-character hex id when the project was created.
    """
    with _CRON_PROJECT_LOCK:
        # Single load: the lock is held throughout, so a second read would
        # return the same data and only add I/O.
        projects = load_projects()
        for project in projects:
            if project.get('name') == CRON_PROJECT_NAME:
                return project['project_id']
        project_id = uuid.uuid4().hex[:12]
        projects.append({
            'project_id': project_id,
            'name': CRON_PROJECT_NAME,
            'color': '#6366f1',
            'created_at': time.time(),
        })
        save_projects(projects)
        return project_id


def is_cron_session(session_id: str, source_tag: str = None) -> bool:
    """Tell whether a session was produced by a cron job.

    A session counts as cron-originated when ``source_tag`` is exactly
    ``'cron'``, or otherwise when its id (coerced to text, with falsy ids
    treated as the empty string) begins with ``'cron_'``.
    """
    # An explicit tag wins; the id prefix is only consulted as a fallback.
    return source_tag == 'cron' or str(session_id or '').startswith('cron_')



def get_session(sid, metadata_only=False):
"""Load a session, optionally with metadata only (skipping the messages array).

Expand All @@ -685,6 +814,15 @@ def get_session(sid, metadata_only=False):
else:
s = Session.load(sid)
if s:
# WAL recovery: replay any unflushed streaming output from a crashed
# or killed process before adding the session to the cache. This
# reconstructs partial assistant text (tokens streamed but not yet
# checkpointed) and tool call events so they are not silently lost.
if not metadata_only:
try:
_replay_wal_recovery(s)
except Exception:
pass # WAL replay is best-effort; never block session load
with LOCK:
SESSIONS[sid] = s
SESSIONS.move_to_end(sid)
Expand Down Expand Up @@ -826,16 +964,11 @@ def all_sessions():
# No grace window: a 0-message Untitled session is never shown in the list
# regardless of age. This means page refreshes and accidental New Conversation
# clicks never leave orphan entries in the sidebar.
#
# Exception: sessions with active_stream_id set are actively streaming (#1327).
# #1184 deferred the first save() until the first message, so during the
# initial streaming turn the session still looks like Untitled+0-messages.
# Without this exemption, navigating away during a long first turn causes
# the session to vanish from the sidebar.
result = [s for s in result if not (
s.get('title', 'Untitled') == 'Untitled'
and s.get('message_count', 0) == 0
and not s.get('active_stream_id')
and not s.get('is_streaming')
and not s.get('pending_user_message') # exempt sessions waiting for first response (#1327)
)]
result = [s for s in result if not _hide_from_default_sidebar(s)]
# Backfill: sessions created before Sprint 22 have no profile tag.
Expand All @@ -861,12 +994,12 @@ def all_sessions():
out.sort(key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True)
# Hide empty Untitled sessions from the UI entirely — kept consistent with the
# index-path filter above. No grace window: a 0-message Untitled session is
# never shown regardless of age (#1171). Same streaming exemption as above (#1327).
# never shown regardless of age (#1171).
result = [s.compact(include_runtime=True, active_stream_ids=active_stream_ids) for s in out if not (
s.title == 'Untitled'
and len(s.messages) == 0
and not s.active_stream_id
and not s.pending_user_message
and not _is_streaming_session(s.active_stream_id, active_stream_ids)
and not s.pending_user_message # exempt sessions waiting for first response (#1327)
)]
result = [s for s in result if not _hide_from_default_sidebar(s)]
for s in result:
Expand Down Expand Up @@ -905,40 +1038,6 @@ def save_projects(projects) -> None:
PROJECTS_FILE.write_text(json.dumps(projects, ensure_ascii=False, indent=2), encoding='utf-8')


# Name used by ensure_cron_project() to look up (or create) the system cron project.
CRON_PROJECT_NAME = 'Cron Jobs'
# Serializes the find-or-create sequence inside ensure_cron_project().
_CRON_PROJECT_LOCK = threading.Lock()


def ensure_cron_project() -> str:
    """Return the project_id of the system "Cron Jobs" project, creating it if needed.

    The lookup and the create-and-save step both run while holding
    ``_CRON_PROJECT_LOCK``, so concurrent callers in this process cannot both
    create the project. The project list is loaded once; that same list is
    searched and, when no match exists, extended and saved.

    Returns:
        The ``project_id`` of the existing project named ``CRON_PROJECT_NAME``,
        or a newly generated 12-character hex id when the project was created.
    """
    with _CRON_PROJECT_LOCK:
        # Single load: the lock is held throughout, so a second read would
        # return the same data and only add I/O.
        projects = load_projects()
        for project in projects:
            if project.get('name') == CRON_PROJECT_NAME:
                return project['project_id']
        project_id = uuid.uuid4().hex[:12]
        projects.append({
            'project_id': project_id,
            'name': CRON_PROJECT_NAME,
            'color': '#6366f1',
            'created_at': time.time(),
        })
        save_projects(projects)
        return project_id


def is_cron_session(session_id: str, source_tag: str = None) -> bool:
    """Tell whether a session was produced by a cron job.

    A session counts as cron-originated when ``source_tag`` is exactly
    ``'cron'``, or otherwise when its id (coerced to text, with falsy ids
    treated as the empty string) begins with ``'cron_'``.
    """
    # An explicit tag wins; the id prefix is only consulted as a fallback.
    return source_tag == 'cron' or str(session_id or '').startswith('cron_')



def import_cli_session(
session_id: str,
title: str,
Expand Down Expand Up @@ -1003,15 +1102,6 @@ def get_cli_sessions() -> list:
except ImportError:
_cli_profile = None # older agent -- fall back to no profile

# Memoize the cron project ID for this scan so we don't pay a lock-acquire +
# disk-read of projects.json per cron session in the loop below.
# Resolved lazily on the first cron session we encounter.
_cron_pid_cache = [None] # list-as-cell so the closure can mutate
def _cron_pid():
if _cron_pid_cache[0] is None:
_cron_pid_cache[0] = ensure_cron_project()
return _cron_pid_cache[0]

try:
for row in read_importable_agent_session_rows(db_path, limit=200, log=logger, exclude_sources=None):
sid = row['id']
Expand Down Expand Up @@ -1050,7 +1140,7 @@ def _cron_pid():
'updated_at': raw_ts,
'pinned': False,
'archived': False,
'project_id': _cron_pid() if is_cron_session(sid, _source) else None,
'project_id': None,
'profile': profile,
'source_tag': _source,
'raw_source': row.get('raw_source'),
Expand Down
10 changes: 7 additions & 3 deletions api/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2622,9 +2622,13 @@ def cancel_stream(stream_id: str) -> bool:
and clears session.active_stream_id) so new /api/chat/start requests succeed
immediately after cancel, even if the agent thread is still blocked.

The worker thread's finally block uses .pop(key, None), so the double-pop is
a safe no-op. Session cleanup runs outside STREAMS_LOCK to preserve lock
ordering (streaming thread does LOCK → STREAMS_LOCK; inverting would deadlock).
NOTE: cancel_stream calls get_session() (disk-backed) rather than reading
from the SESSIONS cache. This is intentional — the cached session may
contain stale in-memory state from a prior turn, while the disk copy
reflects the latest persisted checkpoint. On crash recovery we need the
most recently checkpointed state, not a potentially stale in-memory view.
Cache coherence for cancel is not a concern — both the streaming thread's
finally-block and the cancel path write through to disk via session.save().
"""
from api import config as _live_config

Expand Down
Loading
Loading