From 2c126e4cc3c2f66374673b29f64700e8e29ebcbc Mon Sep 17 00:00:00 2001 From: ycc <957421025@qq.com> Date: Wed, 25 Mar 2026 20:19:57 +0800 Subject: [PATCH] fix(join): make self-leave use stable agent identity --- backend/app.py | 22 +++-- backend/tests/test_leave_agent_auth.py | 121 +++++++++++++++++++++++++ frontend/join.html | 46 +++++++++- 3 files changed, 180 insertions(+), 9 deletions(-) create mode 100644 backend/tests/test_leave_agent_auth.py diff --git a/backend/app.py b/backend/app.py index f64c6757..42d65359 100644 --- a/backend/app.py +++ b/backend/app.py @@ -1108,28 +1108,38 @@ def leave_agent(): agent_id = (data.get("agentId") or "").strip() name = (data.get("name") or "").strip() - if not agent_id and not name: - return jsonify({"ok": False, "msg": "请提供 agentId 或名字"}), 400 + join_key = (data.get("joinKey") or "").strip() + owner_authed = _is_asset_editor_authed() + + if owner_authed: + if not agent_id and not name: + return jsonify({"ok": False, "msg": "请提供 agentId 或名字"}), 400 + else: + if not agent_id or not join_key: + return jsonify({"ok": False, "msg": "请提供 agentId 和 joinKey"}), 400 agents = load_agents_state() target = None if agent_id: target = next((a for a in agents if a.get("agentId") == agent_id and not a.get("isMain")), None) - if (not target) and name: + if owner_authed and (not target) and name: # fallback: remove by name only if agentId not provided target = next((a for a in agents if a.get("name") == name and not a.get("isMain")), None) if not target: return jsonify({"ok": False, "msg": "没有找到要离开的 agent"}), 404 - join_key = target.get("joinKey") + if not owner_authed and target.get("joinKey") != join_key: + return jsonify({"ok": False, "msg": "joinKey 不匹配"}), 403 + + target_join_key = target.get("joinKey") new_agents = [a for a in agents if a.get("isMain") or a.get("agentId") != target.get("agentId")] # Optional: free key back to unused after leave keys_data = load_join_keys() - if join_key: - key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == join_key), None) + if target_join_key: + key_item = next((k for k in keys_data.get("keys", []) if k.get("key") == target_join_key), None) if key_item: key_item["used"] = False key_item["usedBy"] = None diff --git a/backend/tests/test_leave_agent_auth.py b/backend/tests/test_leave_agent_auth.py new file mode 100644 index 00000000..b6226023 --- /dev/null +++ b/backend/tests/test_leave_agent_auth.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import json +import sys +import tempfile +import unittest +from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parents[2] +BACKEND_DIR = ROOT_DIR / "backend" +STATE_FILE = ROOT_DIR / "state.json" +STATE_FILE_EXISTED = STATE_FILE.exists() + +if str(BACKEND_DIR) not in sys.path: + sys.path.insert(0, str(BACKEND_DIR)) + +import app as backend_app # noqa: E402 + + +class LeaveAgentAuthTestCase(unittest.TestCase): + def setUp(self) -> None: + self.tempdir = tempfile.TemporaryDirectory() + tmp_root = Path(self.tempdir.name) + self.agents_file = tmp_root / "agents-state.json" + self.join_keys_file = tmp_root / "join-keys.json" + + self._orig_agents_file = backend_app.AGENTS_STATE_FILE + self._orig_join_keys_file = backend_app.JOIN_KEYS_FILE + + backend_app.AGENTS_STATE_FILE = str(self.agents_file) + backend_app.JOIN_KEYS_FILE = str(self.join_keys_file) + + self.client = backend_app.app.test_client() + self._write_fixture_files() + + def tearDown(self) -> None: + backend_app.AGENTS_STATE_FILE = self._orig_agents_file + backend_app.JOIN_KEYS_FILE = self._orig_join_keys_file + self.tempdir.cleanup() + + def _write_fixture_files(self) -> None: + self.agents_file.write_text( + json.dumps( + [ + {"agentId": "star", "isMain": True}, + { + "agentId": "agent_1", + "isMain": False, + "name": "guest-1", + "joinKey": "ocj_demo", + "authStatus": "approved", + }, + ], + ensure_ascii=False, + ), + encoding="utf-8", + ) + self.join_keys_file.write_text( + json.dumps( + { + "keys": [ + { + "key": "ocj_demo", + "used": True, + "usedBy": "guest-1", + "usedByAgentId": "agent_1", + "usedAt": "2026-03-25T00:00:00", + } + ] + }, + ensure_ascii=False, + ), + encoding="utf-8", + ) + + def _login_asset_editor(self) -> None: + response = self.client.post("/assets/auth", json={"password": "1234"}) + self.assertEqual(response.status_code, 200) + self.assertTrue(response.get_json()["ok"]) + + def test_leave_agent_rejects_name_only_without_auth(self) -> None: + response = self.client.post("/leave-agent", json={"name": "guest-1"}) + self.assertEqual(response.status_code, 400) + self.assertIn("agentId 和 joinKey", response.get_json()["msg"]) + + def test_leave_agent_requires_matching_join_key_for_self_service(self) -> None: + response = self.client.post( + "/leave-agent", + json={"agentId": "agent_1", "joinKey": "wrong-key", "name": "guest-1"}, + ) + self.assertEqual(response.status_code, 403) + + response = self.client.post( + "/leave-agent", + json={"agentId": "agent_1", "joinKey": "ocj_demo", "name": "guest-1"}, + ) + self.assertEqual(response.status_code, 200) + self.assertTrue(response.get_json()["ok"]) + + saved_agents = json.loads(self.agents_file.read_text(encoding="utf-8")) + self.assertEqual(saved_agents, [{"agentId": "star", "isMain": True}]) + + saved_keys = json.loads(self.join_keys_file.read_text(encoding="utf-8")) + self.assertFalse(saved_keys["keys"][0]["used"]) + self.assertIsNone(saved_keys["keys"][0]["usedBy"]) + + def test_leave_agent_by_name_still_works_for_owner_session(self) -> None: + self._login_asset_editor() + + response = self.client.post("/leave-agent", json={"name": "guest-1"}) + self.assertEqual(response.status_code, 200) + self.assertTrue(response.get_json()["ok"]) + + +def tearDownModule() -> None: + if not STATE_FILE_EXISTED and STATE_FILE.exists(): + STATE_FILE.unlink() + + +if __name__ == "__main__": + unittest.main() diff --git a/frontend/join.html b/frontend/join.html index d2d3ab20..2b5e80b4 100644 --- a/frontend/join.html +++ b/frontend/join.html @@ -124,6 +124,7 @@