Skip to content
7 changes: 6 additions & 1 deletion orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,17 @@ def REDIS_URL(self) -> str:
MEMORY_SESSION_CONSOLIDATION_TTL_SECONDS: int = int(os.getenv("MEMORY_SESSION_CONSOLIDATION_TTL_SECONDS", "3600"))
# L3 Cache: TTL for Mem0 search result caching in Redis
MEMORY_CACHE_TTL_SECONDS: int = int(os.getenv("MEMORY_CACHE_TTL_SECONDS", "300"))
# Context Router: per-source sub-budgets (tokens)
# Context Router: per-source sub-budgets (tokens).
# Fallback only — used when the model context window is unknown. When the
# window is known, ContextRouter._compute_budgets derives budgets as a
# proportion of the usable window instead (PRD-141 US-011).
CONTEXT_BUDGET_SESSION: int = int(os.getenv("CONTEXT_BUDGET_SESSION", "500"))
CONTEXT_BUDGET_LONG_TERM: int = int(os.getenv("CONTEXT_BUDGET_LONG_TERM", "800"))
CONTEXT_BUDGET_TEMPORAL: int = int(os.getenv("CONTEXT_BUDGET_TEMPORAL", "600"))
CONTEXT_BUDGET_DAILY: int = int(os.getenv("CONTEXT_BUDGET_DAILY", "400"))
CONTEXT_BUDGET_AWARENESS: int = int(os.getenv("CONTEXT_BUDGET_AWARENESS", "200"))
CONTEXT_BUDGET_TOOLS: int = int(os.getenv("CONTEXT_BUDGET_TOOLS", "1000"))
CONTEXT_BUDGET_SYSTEM_PROMPT: int = int(os.getenv("CONTEXT_BUDGET_SYSTEM_PROMPT", "600"))
# Knowledge awareness: TTL for per-workspace capability map cached in Redis
MEMORY_AWARENESS_CACHE_TTL_SECONDS: int = int(os.getenv("MEMORY_AWARENESS_CACHE_TTL_SECONDS", "600"))
# L2 Decay: Ebbinghaus decay rate (higher = faster forgetting)
Expand Down
10 changes: 10 additions & 0 deletions orchestrator/consumers/chatbot/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,16 @@ async def _run_tool_loop(
# Max iterations reached
if iteration >= max_iterations:
logger.warning(f"Max tool iterations ({max_iterations}) reached. Forcing final response.")
yield self.streaming_handler.format_aisdk_limit_reached(
limit="max_tool_iterations",
value=max_iterations,
message=(
f"I reached the maximum of {max_iterations} tool steps for a "
"single response, so I'm answering with what I have so far. "
"An admin can raise this via the CHATBOT_MAX_TOOL_ITERATIONS "
"setting (or the workspace power-mode caps)."
),
)
final = await agent_runtime.llm_manager.generate_response(
messages=llm_messages, tools=None,
)
Expand Down
349 changes: 112 additions & 237 deletions orchestrator/consumers/chatbot/smart_tool_router.py

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions orchestrator/consumers/chatbot/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ def format_aisdk_data(self, event_type: str, data: Dict[str, Any] = None) -> str
payload["data"] = data
return f'd:{json.dumps(payload)}\n'

def format_aisdk_limit_reached(self, limit: str, value: int, message: str) -> str:
    """Build the ``limit_reached`` data event for the AI SDK stream.

    Emitted when processing stops because a cap was hit, so the client can
    tell the user instead of the response ending without explanation.

    Args:
        limit: Identifier of the cap that was reached.
        value: The cap's numeric value.
        message: Human-readable explanation to surface to the user.

    Returns:
        The result of ``format_aisdk_data`` for a ``limit_reached`` event
        whose data carries ``limit``, ``value`` and ``message``.
    """
    details = {"limit": limit, "value": value, "message": message}
    return self.format_aisdk_data("limit_reached", details)

def format_aisdk_chat_id(self, chat_id: str) -> str:
    """Format the chat-id data event.

    The payload is serialised with ``json.dumps`` rather than spliced into a
    hand-written JSON template, so a ``chat_id`` containing quotes,
    backslashes or newlines still yields valid JSON on a single line.
    Compact separators keep the output byte-identical to the previous
    template for ordinary ids.

    Args:
        chat_id: Identifier of the chat to announce to the client.

    Returns:
        A ``d:``-prefixed, newline-terminated line whose JSON body is
        ``{"type":"chat-id","chatId":<chat_id>}``.
    """
    payload = json.dumps(
        {"type": "chat-id", "chatId": chat_id},
        separators=(",", ":"),  # no spaces: matches the previous wire format
        ensure_ascii=False,  # keep non-ASCII ids verbatim, as before
    )
    return f"d:{payload}\n"
Expand Down
55 changes: 46 additions & 9 deletions orchestrator/core/context_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@

Strategy:
1. Count tokens in the full message payload (system + user + assistant + tool)
2. If below 80% of model context → pass through unchanged
3. If above 80% → compact: summarize older turns, keep recent context
2. Resolve a model-aware compaction threshold + kept-turns from the context
window (_thresholds_for_model): bigger windows compact later and keep more
turns; small windows compact earlier and keep fewer
3. If below the threshold → pass through unchanged; above → compact: summarize
older turns, keep recent context
4. Flush key facts to Mem0 before discarding messages

This prevents context_length_exceeded errors and keeps conversations going
Expand Down Expand Up @@ -137,11 +140,42 @@ def get_context_window(model_name: str, db_session=None) -> int:
# Context Guard
# ---------------------------------------------------------------------------

# Thresholds
# Thresholds — static fallbacks only. The model-aware values come from
# _thresholds_for_model(); these are used when the context window is unknown.
COMPACT_THRESHOLD = 0.80 # Compact when >80% of context used
KEEP_RECENT_TURNS = 6 # Always keep the last N user+assistant messages
SUMMARY_MAX_TOKENS = 500 # Max tokens for the compaction summary


def _thresholds_for_model(context_window: int) -> Tuple[float, int]:
"""Resolve (compact_threshold, keep_recent_turns) for a model's window.

Large-context models can safely fill a higher fraction of the window and
keep more recent turns; small-context models must compact earlier and keep
fewer turns to avoid context_length_exceeded (provider 400s).

Tiers (PRD-141 US-012):
>=200K -> (0.90, 12)
>=100K -> (0.85, 8)
>= 32K -> (0.80, 6)
>= 8K -> (0.75, 4)
else -> (0.70, 3)

An unknown / non-positive window falls back to the static COMPACT_THRESHOLD
and KEEP_RECENT_TURNS constants.
"""
if not context_window or context_window <= 0:
return COMPACT_THRESHOLD, KEEP_RECENT_TURNS
if context_window >= 200_000:
return 0.90, 12
if context_window >= 100_000:
return 0.85, 8
if context_window >= 32_000:
return 0.80, 6
if context_window >= 8_000:
return 0.75, 4
return 0.70, 3

# PRD-123 Pattern #7: Proactive compaction thresholds
PROACTIVE_COMPACT_AFTER_TURNS = int(
__import__("os").getenv("PROACTIVE_COMPACT_AFTER_TURNS", "8")
Expand Down Expand Up @@ -182,10 +216,11 @@ async def check_and_compact(
(messages, was_compacted, tools) — tools may be None if they don't fit
"""
context_window = get_context_window(model_name, db_session)
compact_threshold, keep_recent_turns = _thresholds_for_model(context_window)
tool_tokens = count_tool_tokens(tools)
current_tokens = count_message_tokens(messages)
total_tokens = current_tokens + tool_tokens
threshold = int(context_window * COMPACT_THRESHOLD)
threshold = int(context_window * compact_threshold)

logger.debug(
"[ContextGuard] tokens=%d (msgs=%d tools=%d) / %d (%.0f%% of %d window)",
Expand Down Expand Up @@ -217,6 +252,7 @@ async def check_and_compact(
llm_manager=llm_manager,
workspace_id=workspace_id,
agent_id=agent_id,
keep_recent_turns=keep_recent_turns,
)

new_tokens = count_message_tokens(compacted)
Expand All @@ -233,6 +269,7 @@ async def _compact(
llm_manager: Any,
workspace_id: Optional[str] = None,
agent_id: Optional[int] = None,
keep_recent_turns: int = KEEP_RECENT_TURNS,
) -> List[Dict[str, Any]]:
"""
Compact messages by summarizing older turns.
Expand All @@ -253,12 +290,12 @@ async def _compact(
conversation.append(msg)

# If conversation is short enough, keep everything
if len(conversation) <= KEEP_RECENT_TURNS:
if len(conversation) <= keep_recent_turns:
return messages

# Split: old turns (to summarize) | recent turns (to keep)
old_turns = conversation[:-KEEP_RECENT_TURNS]
recent_turns = conversation[-KEEP_RECENT_TURNS:]
old_turns = conversation[:-keep_recent_turns]
recent_turns = conversation[-keep_recent_turns:]

# Build text from old turns for summarization
old_text = self._turns_to_text(old_turns)
Expand All @@ -283,8 +320,8 @@ async def _compact(
"_compact_tombstone": {
"compacted_count": len(old_turns),
"compacted_roles": [m.get("role") for m in old_turns],
"summary_token_est": self.count_tokens(summary),
"original_token_est": sum(self.count_message_tokens(m) for m in old_turns),
"summary_token_est": count_tokens(summary),
"original_token_est": count_message_tokens(old_turns),
},
}

Expand Down
1 change: 1 addition & 0 deletions orchestrator/modules/context/sections/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ async def _load_filtered(
available_tools=all_tools,
conversation_context=conversation_context,
tool_hints=tool_hints,
agent_id=agent_id,
)

if not result.should_include_tools:
Expand Down
69 changes: 63 additions & 6 deletions orchestrator/modules/memory/context_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Context budget weights (PRD-141 US-011)
# ---------------------------------------------------------------------------
# Each section gets a fixed proportion of the *usable* context window
# (usable = 80% of the raw window, reserving 20% for the model's response).
# Weights sum to 0.80 of the usable window — the remaining 0.20 is slack for
# estimator error and untracked overhead. ``tools`` and ``system_prompt`` are
# reserved headroom the router does not fill itself — they keep the memory
# sections from claiming space the prompt assembler needs.
# Section name -> share of the usable window. The shares total 0.80.
_CONTEXT_BUDGET_WEIGHTS: Dict[str, float] = {
"session": 0.10,
"long_term": 0.15,
"temporal": 0.10,
"daily": 0.08,
"awareness": 0.05,
"tools": 0.20,  # reserved headroom, not filled by the router
"system_prompt": 0.12,  # reserved headroom, not filled by the router
}
# Fraction of the raw context window treated as usable; the rest is kept
# free for the model's response.
_USABLE_WINDOW_FRACTION = 0.80


# ---------------------------------------------------------------------------
# ContextSignals — output of query analysis
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -345,6 +366,41 @@ def _estimate_tokens(text: str) -> int:
"""Cheap token estimate: ~4 chars per token."""
return len(text) // 4 if text else 0

@staticmethod
def _compute_budgets(context_window: Optional[int]) -> Dict[str, int]:
    """Work out the per-section token budgets used for context assembly.

    With a known window (a positive int) every section receives its weight
    from ``_CONTEXT_BUDGET_WEIGHTS`` applied to the usable window, where
    ``usable = int(context_window * _USABLE_WINDOW_FRACTION)``. Budgets
    therefore grow and shrink with the model's window.

    With an unknown window (``None`` or non-positive) the static
    ``CONTEXT_BUDGET_*`` config values are returned instead; they act purely
    as a fallback.

    Args:
        context_window: Model context window in tokens, or ``None``.

    Returns:
        Mapping of section name to token budget.
    """
    from config import config

    window_unknown = not context_window or context_window <= 0
    if not window_unknown:
        usable_tokens = int(context_window * _USABLE_WINDOW_FRACTION)
        scaled: Dict[str, int] = {}
        for section, share in _CONTEXT_BUDGET_WEIGHTS.items():
            scaled[section] = int(usable_tokens * share)
        return scaled

    # No usable window size: fall back to the configured static budgets.
    return dict(
        session=config.CONTEXT_BUDGET_SESSION,
        long_term=config.CONTEXT_BUDGET_LONG_TERM,
        temporal=config.CONTEXT_BUDGET_TEMPORAL,
        daily=config.CONTEXT_BUDGET_DAILY,
        awareness=config.CONTEXT_BUDGET_AWARENESS,
        tools=config.CONTEXT_BUDGET_TOOLS,
        system_prompt=config.CONTEXT_BUDGET_SYSTEM_PROMPT,
    )

@staticmethod
def _truncate_to_budget(text: str, token_budget: int) -> str:
"""Truncate *text* so its estimated token count fits within *token_budget*."""
Expand Down Expand Up @@ -384,6 +440,7 @@ async def retrieve_context(
agent_id: int,
query: str,
conversation_id: Optional[str] = None,
context_window: Optional[int] = None,
) -> ContextBundle:
"""
Assemble a budget-constrained context bundle by fetching from L1/L2/L3
Expand All @@ -400,17 +457,17 @@ async def retrieve_context(
All layer fetches are concurrent via ``asyncio.gather``.
Any single-layer failure is logged and skipped — never breaks the bundle.
"""
from config import config
from modules.memory.unified_memory_service import get_unified_memory_service

service = get_unified_memory_service()
signals = self.analyze_query(query)

budget_session = config.CONTEXT_BUDGET_SESSION
budget_long_term = config.CONTEXT_BUDGET_LONG_TERM
budget_temporal = config.CONTEXT_BUDGET_TEMPORAL
budget_daily = config.CONTEXT_BUDGET_DAILY
budget_awareness = config.CONTEXT_BUDGET_AWARENESS
budgets = self._compute_budgets(context_window)
budget_session = budgets["session"]
budget_long_term = budgets["long_term"]
budget_temporal = budgets["temporal"]
budget_daily = budgets["daily"]
budget_awareness = budgets["awareness"]

# ----- Determine which fetches to launch -----
fetch_session = (
Expand Down
2 changes: 2 additions & 0 deletions orchestrator/modules/memory/unified_memory_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,7 @@ async def retrieve_context(
agent_id: int,
query: str,
conversation_id: Optional[str] = None,
context_window: Optional[int] = None,
) -> "ContextBundle":
"""
Assemble a budget-constrained context bundle across all memory layers.
Expand All @@ -1683,6 +1684,7 @@ async def retrieve_context(
agent_id=agent_id,
query=query,
conversation_id=conversation_id,
context_window=context_window,
)
except Exception:
logger.error(
Expand Down
6 changes: 6 additions & 0 deletions orchestrator/modules/tools/discovery/action_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def get_by_category(self, category: str) -> List[ActionDefinition]:
self._ensure_initialized()
return [a for a in self._actions.values() if a.category == category]

def get_by_tags(self, tags: List[str]) -> List[ActionDefinition]:
    """Return the registered actions that carry at least one of *tags*.

    Matching uses OR semantics: an action is included as soon as any one of
    its tags appears in *tags*.

    Args:
        tags: Tag names to look for.

    Returns:
        Matching actions, in the registry's iteration order.
    """
    self._ensure_initialized()
    requested = set(tags)
    return [
        action
        for action in self._actions.values()
        if not requested.isdisjoint(action.tags)
    ]

def get_by_permission(self, level: str) -> List[ActionDefinition]:
"""Get actions filtered by permission level."""
self._ensure_initialized()
Expand Down
Loading
Loading