-
Notifications
You must be signed in to change notification settings - Fork 7.9k
s11是存在问题的 ,没有创建任务的工具。 #164
Copy link
Copy link
Open
Description
需要结合s07的代码,进行修改。
docs/zh/s11-autonomous-agents.md中测试样例:Create 3 tasks on the board, then spawn alice and bob. Watch them auto-claim.
由于没有任务管理类,因此不能创建task_1.json task_2.json ...
所以代码不能运行。
粗略解决
1.需要增加任务管理类。
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1 # 启动时自动计算下一个可用的自增 ID
def _max_id(self) -> int:
"""扫描本地硬盘,找出当前最大的任务 ID"""
ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
return max(ids) if ids else 0
def _load(self, task_id: int) -> dict:
"""根据 ID 物理读取 JSON 状态机"""
path = self.dir / f"task_{task_id}.json"
if not path.exists():
raise ValueError(f"Task {task_id} not found")
return json.loads(path.read_text())
def _save(self, task: dict):
"""原子级落地:将任务状态写回硬盘。这里保留了 ensure_ascii=False 防乱码"""
path = self.dir / f"task_{task['id']}.json"
path.write_text(json.dumps(task, indent=2, ensure_ascii=False))
def create(self, subject: str, description: str = "") -> str:
"""创建一个新任务,初始状态为 pending (待办)"""
task = {
"id": self._next_id, "subject": subject, "description": description,
"status": "pending", "blockedBy": [], "owner": "",
}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2, ensure_ascii=False)
def get(self, task_id: int) -> str:
return json.dumps(self._load(task_id), indent=2, ensure_ascii=False)
def update(self, task_id: int, status: str = None,
add_blocked_by: list = None, remove_blocked_by: list = None) -> str:
"""更新任务的核心逻辑"""
task = self._load(task_id)
if status:
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"Invalid status: {status}")
task["status"] = status
# 🎯 图联动的魔法:如果大模型把一个任务标为“已完成”,立刻触发全盘的依赖解除!
if status == "completed":
self._clear_dependency(task_id)
if add_blocked_by:
# 使用 set 去重,防止同一个前置任务被重复添加
task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
if remove_blocked_by:
task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by]
self._save(task)
return json.dumps(task, indent=2, ensure_ascii=False)
def _clear_dependency(self, completed_id: int):
"""
依赖解绑器 (Dependency resolution):
遍历硬盘里的每一个任务,如果它们正在等这个刚完成的 completed_id,就把它从阻塞名单里踢掉。
"""
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
def list_all(self) -> str:
"""
生成给大模型看的宏观看板(把底层 JSON 渲染成 Markdown 风格的列表)。
"""
tasks = []
files = sorted(
self.dir.glob("task_*.json"),
key=lambda f: int(f.stem.split("_")[1])
)
for f in files:
tasks.append(json.loads(f.read_text()))
if not tasks:
return "No tasks."
lines = []
for t in tasks:
# 视觉化标识,方便大模型(和人类)一眼看清状态
marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
return "\n".join(lines)
# 实例化全局任务管理器
TASKS = TaskManager(TASKS_DIR)
2.给模型增加任务工具。
# 路由表增加 4 个任务相关的方法
"task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
"task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("removeBlockedBy")),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels