-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfull_agent_loop.py
More file actions
298 lines (263 loc) · 11.2 KB
/
full_agent_loop.py
File metadata and controls
298 lines (263 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
"""Reference agent loop example demonstrating all four phases.
This script shows a deterministic, in-memory loop:
1. Route: shortlist candidate tools and show choice cards.
2. Call: inject only the selected tool schema into a call prompt.
3. Interpret: ingest a large tool result and let the firewall summarize it.
4. Answer: build the final response context from the accumulated history.
"""
from __future__ import annotations
import json
from contextweaver.config import ContextBudget
from contextweaver.context.manager import ContextManager
from contextweaver.routing.catalog import Catalog, generate_sample_catalog, load_catalog_dicts
from contextweaver.routing.router import Router
from contextweaver.routing.tree import TreeBuilder
from contextweaver.types import ContextItem, ItemKind, Phase
def _token_count(pack_prompt_tokens: dict[str, int], header_footer_tokens: int) -> int:
"""Compute total tokens represented in BuildStats."""
return sum(pack_prompt_tokens.values()) + header_footer_tokens
def _print_phase(
name: str,
prompt: str,
stats_total: int,
stats_included: int,
stats_dropped: int,
stats_dedup: int,
stats_hf_tokens: int,
stats_tokens_per_section: dict[str, int],
) -> None:
"""Print prompt text and compact diagnostics for one phase."""
print(f"\n{'=' * 80}")
print(f"PHASE: {name}")
print(f"{'=' * 80}")
print(prompt)
print("\n--- BuildStats ---")
print(f"total_candidates: {stats_total}")
print(f"included_count: {stats_included}")
print(f"dropped_count: {stats_dropped}")
print(f"dedup_removed: {stats_dedup}")
print(f"header_footer_tokens: {stats_hf_tokens}")
print(f"tokens_per_section: {stats_tokens_per_section}")
print(f"token_count: {_token_count(stats_tokens_per_section, stats_hf_tokens)}")
def _build_catalog() -> Catalog:
"""Create a deterministic catalog with >50 tools and explicit schemas on target tools."""
raw = generate_sample_catalog(n=60, seed=42)
# Add full schemas for two tools so call-phase hydration is meaningful.
for item in raw:
if item["id"] == "analytics.metrics.query":
item["args_schema"] = {
"type": "object",
"properties": {
"metric": {"type": "string", "description": "Metric identifier"},
"window_days": {"type": "integer", "description": "Trailing time window"},
"group_by": {"type": "string", "description": "Aggregation bucket"},
},
"required": ["metric", "window_days"],
}
item["examples"] = [
"metrics_query(metric='daily_active_users', window_days=30, group_by='day')"
]
item["constraints"] = {"max_window_days": 365, "read_only": True}
if item["id"] == "billing.reports.revenue":
item["args_schema"] = {
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "ISO date"},
"end_date": {"type": "string", "description": "ISO date"},
},
"required": ["start_date", "end_date"],
}
item["examples"] = ["revenue_report(start_date='2026-01-01', end_date='2026-01-31')"]
item["constraints"] = {"read_only": True}
if item["id"] == "billing.subscriptions.list":
item["args_schema"] = {
"type": "object",
"properties": {
"status": {
"type": "string",
"description": "Filter status such as active, trialing, or canceled",
},
"limit": {"type": "integer", "description": "Maximum records to return"},
},
"required": [],
}
item["examples"] = ["subscriptions_list(status='active', limit=50)"]
item["constraints"] = {"read_only": True}
catalog = Catalog()
for item in load_catalog_dicts(raw):
catalog.register(item)
return catalog
def _pick_tool(route_ids: list[str], catalog: Catalog) -> str:
"""Simulate model selection: pick the first routed tool with an explicit schema.
This function acts as a deterministic stand-in for an LLM's tool-selection step.
It returns the first candidate (in routing-rank order) that carries a full argument
schema, so the call-phase prompt can demonstrate schema hydration. If no candidate
has a schema, it falls back to the top-ranked candidate.
Args:
route_ids: Ordered list of candidate tool IDs from the router.
catalog: The catalog used to look up item schemas.
Returns:
The selected tool ID.
Raises:
ValueError: If the router returned no candidate tools.
"""
if not route_ids:
raise ValueError(
"Router returned no candidates. Ensure the catalog contains reachable items."
)
for item_id in route_ids:
if catalog.get(item_id).args_schema:
return item_id
return route_ids[0]
def _simulate_large_result(tool_id: str) -> str:
"""Build a large deterministic JSON payload to trigger firewall summarization.
The payload is generic (not tool-specific) so it remains valid regardless of
which tool _pick_tool() selects.
"""
rows = []
for idx in range(1, 101):
rows.append(
{
"record_id": idx,
"value": 1500 + idx,
"tool": tool_id,
}
)
payload = {
"status": "ok",
"rows": rows,
"summary": {
"count": len(rows),
"max": max(row["value"] for row in rows),
"min": min(row["value"] for row in rows),
},
}
return json.dumps(payload, sort_keys=True)
def main() -> None:
"""Run the full route -> call -> interpret -> answer loop."""
budget = ContextBudget(route=500, call=800, interpret=600, answer=1000)
manager = ContextManager(budget=budget)
catalog = _build_catalog()
items = catalog.all()
graph = TreeBuilder(max_children=12).build(items)
router = Router(graph, items=items, beam_width=3, top_k=6)
user_query = "What is the trend of daily active users over the last 30 days?"
manager.ingest(
ContextItem(
id="u1",
kind=ItemKind.user_turn,
text=user_query,
)
)
# Phase 1: route prompt + choice cards from a large catalog.
route_pack, cards, route_result = manager.build_route_prompt_sync(
goal="Select the single best analytics tool for the user request.",
query=user_query,
router=router,
)
_print_phase(
name="route",
prompt=route_pack.prompt,
stats_total=route_pack.stats.total_candidates,
stats_included=route_pack.stats.included_count,
stats_dropped=route_pack.stats.dropped_count,
stats_dedup=route_pack.stats.dedup_removed,
stats_hf_tokens=route_pack.stats.header_footer_tokens,
stats_tokens_per_section=route_pack.stats.tokens_per_section,
)
print(f"choice_cards: {len(cards)}")
print(f"routed_candidates: {route_result.candidate_ids}")
selected_tool_id = _pick_tool(route_result.candidate_ids, catalog)
# Hydrate the selected tool — returns full schema/examples/constraints as HydrationResult.
# catalog.hydrate() is the idiomatic post-routing call before building the call-phase prompt.
hydrated = catalog.hydrate(selected_tool_id)
print(f"model_selected_tool_id: {selected_tool_id}")
manager.ingest(
ContextItem(
id="a1",
kind=ItemKind.agent_msg,
text=f"I will call {selected_tool_id} to answer the question.",
parent_id="u1",
)
)
# Build a call text that is internally consistent with the selected tool's schema.
# Arguments are derived from the hydrated schema so the call-phase prompt is always valid.
props = hydrated.args_schema.get("properties", {})
required_keys = hydrated.args_schema.get("required") or list(props.keys())
call_args = ", ".join(f"{k}=..." for k in (required_keys or list(props.keys()))[:3])
manager.ingest(
ContextItem(
id="tc1",
kind=ItemKind.tool_call,
text=f"{selected_tool_id}({call_args})",
parent_id="u1",
)
)
# Phase 2: call prompt with only the selected tool schema injected.
call_pack = manager.build_call_prompt_sync(
tool_id=selected_tool_id,
query=user_query,
catalog=catalog,
)
_print_phase(
name="call",
prompt=call_pack.prompt,
stats_total=call_pack.stats.total_candidates,
stats_included=call_pack.stats.included_count,
stats_dropped=call_pack.stats.dropped_count,
stats_dedup=call_pack.stats.dedup_removed,
stats_hf_tokens=call_pack.stats.header_footer_tokens,
stats_tokens_per_section=call_pack.stats.tokens_per_section,
)
print(f"selected_schema_keys: {sorted(hydrated.args_schema.get('properties', {}).keys())}")
# Execute (simulated): create a large tool result that exceeds firewall threshold.
large_result = _simulate_large_result(selected_tool_id)
processed_item, envelope = manager.ingest_tool_result_sync(
tool_call_id="tc1",
raw_output=large_result,
tool_name=selected_tool_id,
media_type="application/json",
firewall_threshold=1200,
)
# Phase 3: interpret prompt includes firewall summary instead of raw payload.
interpret_pack = manager.build_sync(phase=Phase.interpret, query=user_query)
_print_phase(
name="interpret",
prompt=interpret_pack.prompt,
stats_total=interpret_pack.stats.total_candidates,
stats_included=interpret_pack.stats.included_count,
stats_dropped=interpret_pack.stats.dropped_count,
stats_dedup=interpret_pack.stats.dedup_removed,
stats_hf_tokens=interpret_pack.stats.header_footer_tokens,
stats_tokens_per_section=interpret_pack.stats.tokens_per_section,
)
print(f"raw_result_chars: {len(large_result)}")
print(f"summary_chars: {len(processed_item.text)}")
artifact_handle = processed_item.artifact_ref.handle if processed_item.artifact_ref else "none"
print(f"artifact_ref: {artifact_handle}")
print(f"facts_extracted: {len(envelope.facts)}")
manager.ingest(
ContextItem(
id="a2",
kind=ItemKind.agent_msg,
text="I interpreted the metrics and will now draft the final response.",
parent_id="u1",
)
)
# Phase 4: answer prompt composes final context from prior phases.
answer_pack = manager.build_sync(
phase=Phase.answer,
query="Summarize the 30-day active-user trend for the user.",
)
_print_phase(
name="answer",
prompt=answer_pack.prompt,
stats_total=answer_pack.stats.total_candidates,
stats_included=answer_pack.stats.included_count,
stats_dropped=answer_pack.stats.dropped_count,
stats_dedup=answer_pack.stats.dedup_removed,
stats_hf_tokens=answer_pack.stats.header_footer_tokens,
stats_tokens_per_section=answer_pack.stats.tokens_per_section,
)
if __name__ == "__main__":
main()