diff --git a/src/agentscope_runtime/engine/app/agent_app.py b/src/agentscope_runtime/engine/app/agent_app.py index a9e619671..0f5b8f59a 100644 --- a/src/agentscope_runtime/engine/app/agent_app.py +++ b/src/agentscope_runtime/engine/app/agent_app.py @@ -16,6 +16,7 @@ import uvicorn from a2a.types import A2ARequest from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.params import Depends as DependsClass from fastapi.responses import StreamingResponse @@ -604,13 +605,35 @@ async def get_stream_query_task_status(task_id: str): """Get stream query task status and result""" return self.get_task_status(task_id) + def _check_admin_auth(self, request: Request) -> bool: + """ + Check admin authorization for process-control endpoints. + + When AGENTSCOPE_ADMIN_SECRET is set, require a matching + Authorization: Bearer header from all callers. + When unset, restrict to localhost (127.0.0.1 / ::1) only. + """ + secret = os.getenv("AGENTSCOPE_ADMIN_SECRET") + client_host = (request.client.host if request.client else None) or "" + is_localhost = client_host in ("127.0.0.1", "::1", "localhost") + + if secret: + auth_header = request.headers.get("Authorization", "") + return auth_header == f"Bearer {secret}" + return is_localhost + def _add_process_control_endpoints(self): """Add process control endpoints for detached mode.""" @self.post("/shutdown") @UnifiedRoutingMixin.internal_route - async def shutdown_process_simple(): + async def shutdown_process_simple(request: Request): """Gracefully shutdown the process (simple endpoint).""" + if not self._check_admin_auth(request): + return JSONResponse( + status_code=403, + content={"error": "Forbidden: admin access required"}, + ) import signal async def delayed_shutdown(): @@ -622,8 +645,13 @@ async def delayed_shutdown(): @self.post("/admin/shutdown") @UnifiedRoutingMixin.internal_route - async def shutdown_process(): + async def shutdown_process(request: Request): """Gracefully shutdown the process.""" + if not self._check_admin_auth(request): + return JSONResponse( + status_code=403, + content={"error": "Forbidden: admin access required"}, + ) import signal # Schedule shutdown after response @@ -636,8 +664,13 @@ async def delayed_shutdown(): @self.get("/admin/status") @UnifiedRoutingMixin.internal_route - async def get_process_status(): + async def get_process_status(request: Request): """Get process status information.""" + if not self._check_admin_auth(request): + return JSONResponse( + status_code=403, + content={"error": "Forbidden: admin access required"}, + ) import psutil proc = psutil.Process(os.getpid())