Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ docs/*
!docs/ui-ux/**
!docs/rfcs/
!docs/rfcs/**
!docs/architecture/
!docs/architecture/**

# Local-only AI assistant context — never committed even under docs/.
docs/AGENTS.md
Expand Down
95 changes: 95 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ RUN apt-get update -y --fix-missing --no-install-recommends \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# Optional GPU user-space acceleration libraries for users who pass through
# host GPU devices. The default image remains CPU-only.
ARG INSTALL_GPU_LIBS=0
RUN if [ "$INSTALL_GPU_LIBS" = "1" ]; then \
apt-get update -y --fix-missing --no-install-recommends \
&& apt-get install -y --no-install-recommends \
libva2 \
vainfo \
mesa-va-drivers \
&& if apt-cache show intel-media-va-driver-non-free >/dev/null 2>&1; then \
apt-get install -y --no-install-recommends intel-media-va-driver-non-free; \
else \
echo "intel-media-va-driver-non-free is not available from the configured Debian repositories; skipping Intel non-free VA-API driver."; \
fi \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*; \
else \
echo "Skipping optional GPU user-space acceleration libraries (INSTALL_GPU_LIBS=0)."; \
fi

# UTF-8
RUN localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
ENV LANG=en_US.utf8
Expand Down
238 changes: 203 additions & 35 deletions api/agent_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ def _as_positive_int(value) -> int:
return 0


def _as_score(*values) -> float:
"""First numerically-coercible value as a float, else 0.0.

Used to score lineage tips by recency. ``last_message_at`` comes from
``MAX(timestamp)`` and is normally a numeric epoch, but older/non-standard
state.db schemas can store an ISO-8601 *text* timestamp. Rather than letting
a non-numeric value raise ValueError (which previously escaped the DB
try-block and dropped all lineage metadata), fall through to the next
candidate (e.g. ``started_at``).
"""
for value in values:
if value in (None, ""):
continue
try:
return float(value)
except (TypeError, ValueError):
continue
return 0.0


def _count_user_turns(row: dict) -> int:
user_turns = row.get("actual_user_message_count")
if user_turns is None:
Expand Down Expand Up @@ -279,23 +299,50 @@ def _project_agent_session_rows(rows: list[dict]) -> list[dict]:
children.sort(key=lambda row: row.get('started_at') or 0, reverse=True)

def compression_tip(row: dict) -> tuple[dict | None, int]:
current = row
seen = {row['id']}
"""Return the freshest importable continuation descendant for ``row``.

Compression parents can have multiple continuation-looking children when
a stale segment is resumed after a newer compressed branch already
exists. Picking the newest *direct* child can hide the branch whose
deeper descendant has the actual latest activity. Walk all reachable
continuation descendants and select by real message activity instead.
"""
latest_importable = row if (row.get('actual_message_count') or 0) > 0 else None
segment_count = 1
for _ in range(len(rows_by_id) + 1):
candidates = [
child for child in children_by_parent.get(current['id'], [])
if child['id'] not in seen and _is_continuation_session(current, child)
]
if not candidates:
return latest_importable, segment_count
current = candidates[0]
seen.add(current['id'])
segment_count = 0
best_depth = 1
best_score = (
_as_score(latest_importable.get('last_activity'), latest_importable.get('started_at'))
if latest_importable
else 0
)
stack: list[tuple[dict, int]] = [(row, 1)]
seen: set[str] = set()

while stack:
current, depth = stack.pop()
current_id = current.get('id')
if not current_id or current_id in seen:
continue
seen.add(current_id)
segment_count += 1
if (current.get('actual_message_count') or 0) > 0:

current_score = _as_score(current.get('last_activity'), current.get('started_at'))
if (
(current.get('actual_message_count') or 0) > 0
and (current_score > best_score or (current_score == best_score and depth >= best_depth))
):
latest_importable = current
return latest_importable, segment_count
best_depth = depth
best_score = current_score
for child in children_by_parent.get(current_id, []):
child_id = child.get('id')
if not child_id or child_id in seen:
continue
if not _is_continuation_session(current, child):
continue
stack.append((child, depth + 1))

return latest_importable, max(segment_count, 1)

projected = []
for row in rows:
Expand Down Expand Up @@ -338,15 +385,15 @@ def compression_tip(row: dict) -> tuple[dict | None, int]:
projected.append(merged)

projected.sort(
key=lambda row: row.get('last_activity') or row.get('started_at') or 0,
key=lambda row: _as_score(row.get('last_activity'), row.get('started_at')),
reverse=True,
)
return projected


def read_importable_agent_session_rows(
db_path: Path,
limit: int = 200,
limit: int | None = 200,
log=None,
exclude_sources: tuple[str, ...] | None = ("cron", "webui"),
) -> list[dict]:
Expand Down Expand Up @@ -684,6 +731,8 @@ def read_session_lineage_metadata(db_path: Path, session_ids: list[str] | set[st
if 'parent_session_id' not in session_cols or 'end_reason' not in session_cols:
return {}
session_source_expr = _optional_col('session_source', session_cols)
source_expr = _optional_col('source', session_cols)
message_count_expr = _optional_col('message_count', session_cols, '0')
# Scoped fetch via PRIMARY KEY + idx_sessions_parent rather than a
# full table scan. The sessions table grows unbounded over time
# (1000+ rows is normal, 10000+ for power users), and this function
Expand All @@ -692,7 +741,9 @@ def read_session_lineage_metadata(db_path: Path, session_ids: list[str] | set[st
#
# Fetch the wanted ids first, then chase parent_session_id chains
# in batches until no new ids appear. Each batch hits PRIMARY KEY
# so it's effectively O(N) lookups.
# so it's effectively O(N) lookups. Then walk continuation children
# from the materialized ancestors so branchy compression lineages can
# mark the real freshest tip, not just the newest direct sibling.
#
# IN-clause is chunked to 500 to stay under SQLITE_MAX_VARIABLE_NUMBER
# on older sqlite (Python 3.9 ships sqlite 3.31 which defaults to 999;
Expand All @@ -717,7 +768,7 @@ def read_session_lineage_metadata(db_path: Path, session_ids: list[str] | set[st
placeholders = ','.join('?' * len(chunk))
cur.execute(
f"""
SELECT s.id, s.source, {session_source_expr}, s.title, s.started_at, s.parent_session_id, s.ended_at, s.end_reason
SELECT s.id, {source_expr}, {session_source_expr}, s.title, s.started_at, s.parent_session_id, s.ended_at, s.end_reason, {message_count_expr}
FROM sessions s
WHERE s.id IN ({placeholders})
""",
Expand All @@ -730,9 +781,137 @@ def read_session_lineage_metadata(db_path: Path, session_ids: list[str] | set[st
parent_id = rows.get(sid, {}).get('parent_session_id')
if parent_id and parent_id not in rows and parent_id not in to_fetch:
to_fetch.add(parent_id)

# Fetch descendants from the discovered ancestors using the parent
# index. This keeps the sidebar read scoped while still giving the
# collapse metadata enough information to choose the active branch.
to_expand = set(rows)
expanded: set[str] = set()
for _hop in range(20):
frontier = [sid for sid in to_expand if sid not in expanded]
if not frontier:
break
to_expand = set()
for i in range(0, len(frontier), IN_CHUNK):
chunk = frontier[i:i + IN_CHUNK]
placeholders = ','.join('?' * len(chunk))
cur.execute(
f"""
SELECT s.id, {source_expr}, {session_source_expr}, s.title, s.started_at, s.parent_session_id, s.ended_at, s.end_reason, {message_count_expr}
FROM sessions s
WHERE s.parent_session_id IN ({placeholders})
""",
chunk,
)
for row in cur.fetchall():
child = dict(row)
rows[child['id']] = child
parent_id = child.get('parent_session_id')
parent = rows.get(str(parent_id)) if parent_id else None
if parent and child['id'] not in expanded and _is_continuation_session(parent, child):
to_expand.add(child['id'])
expanded.update(frontier)

message_stats: dict[str, dict] = {}
cur.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'messages'")
has_messages_table = cur.fetchone() is not None
# Older/minimal state.db schemas can have a `messages` table WITHOUT a
# `timestamp` column (or with a non-numeric one). Detect the columns
# rather than gating on table existence alone: require `session_id`,
# and only select MAX(timestamp) when that column is actually present
# so the query can't raise and collapse the whole lineage metadata.
messages_has_session_id = False
messages_has_timestamp = False
if has_messages_table:
cur.execute("PRAGMA table_info(messages)")
_message_cols = {row[1] for row in cur.fetchall()}
messages_has_session_id = 'session_id' in _message_cols
messages_has_timestamp = 'timestamp' in _message_cols
use_messages_query = has_messages_table and messages_has_session_id
row_ids = list(rows)
if use_messages_query:
last_at_expr = "MAX(timestamp) AS last_message_at" if messages_has_timestamp else "NULL AS last_message_at"
for i in range(0, len(row_ids), IN_CHUNK):
chunk = row_ids[i:i + IN_CHUNK]
placeholders = ','.join('?' * len(chunk))
cur.execute(
f"""
SELECT session_id, COUNT(*) AS actual_message_count, {last_at_expr}
FROM messages
WHERE session_id IN ({placeholders})
GROUP BY session_id
""",
chunk,
)
for row in cur.fetchall():
message_stats[row['session_id']] = dict(row)
for sid, row in rows.items():
stats = message_stats.get(sid) or {}
if use_messages_query:
row['actual_message_count'] = int(stats.get('actual_message_count') or 0)
else:
row['actual_message_count'] = int(row.get('message_count') or 0)
row['last_message_at'] = stats.get('last_message_at')
except Exception:
return {}

children_by_parent: dict[str, list[dict]] = {}
for row in rows.values():
parent_id = row.get('parent_session_id')
if parent_id:
children_by_parent.setdefault(parent_id, []).append(row)

def continuation_root_and_depth(sid: str) -> tuple[str, int]:
root_id = sid
current_id = sid
depth = 1
seen = {sid}
while True:
current = rows.get(current_id)
raw_parent_id = current.get('parent_session_id') if current else None
parent_id = str(raw_parent_id) if raw_parent_id else ''
if not parent_id:
break
parent = rows.get(parent_id)
if not parent or parent_id in seen:
break
if not _is_continuation_session(parent, current):
break
root_id = parent_id
current_id = parent_id
seen.add(parent_id)
depth += 1
return root_id, depth

def freshest_continuation_tip(root_id: str) -> tuple[str, int]:
best_id = root_id
best_depth = 1
segment_count = 0
best_score = _as_score(rows.get(root_id, {}).get('last_message_at'), rows.get(root_id, {}).get('started_at'))
stack: list[tuple[str, int]] = [(root_id, 1)]
seen: set[str] = set()
while stack:
current_id, depth = stack.pop()
if current_id in seen:
continue
seen.add(current_id)
current = rows.get(current_id)
if not current:
continue
segment_count += 1
actual_count = int(current.get('actual_message_count') or 0)
score = _as_score(current.get('last_message_at'), current.get('started_at'))
if actual_count > 0 and (score > best_score or (score == best_score and depth >= best_depth)):
best_id = current_id
best_depth = depth
best_score = score
for child in children_by_parent.get(current_id, []):
if _is_continuation_session(current, child):
stack.append((child['id'], depth + 1))

return best_id, max(segment_count, best_depth)

lineage_tip_cache: dict[str, tuple[str, int]] = {}
metadata: dict[str, dict] = {}
for sid in wanted:
row = rows.get(sid)
Expand Down Expand Up @@ -770,26 +949,15 @@ def read_session_lineage_metadata(db_path: Path, session_ids: list[str] | set[st
entry['_parent_lineage_root_id'] = parent_root
continue

root_id = sid
current_id = sid
segment_count = 1
seen = {sid}
while True:
current = rows.get(current_id)
parent_id = current.get('parent_session_id') if current else None
parent = rows.get(parent_id) if parent_id else None
if not parent or parent_id in seen:
break
if not _is_continuation_session(parent, current):
break
root_id = parent_id
current_id = parent_id
seen.add(parent_id)
segment_count += 1
root_id, segment_count = continuation_root_and_depth(sid)

if root_id != sid:
entry = metadata.setdefault(sid, {})
entry['_lineage_root_id'] = root_id
entry['_compression_segment_count'] = segment_count
if root_id not in lineage_tip_cache:
lineage_tip_cache[root_id] = freshest_continuation_tip(root_id)
tip_id, tip_depth = lineage_tip_cache[root_id]
entry['_lineage_tip_id'] = tip_id
entry['_compression_segment_count'] = max(segment_count, tip_depth)

return metadata
Loading
Loading