Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/decepticon/decepticon/tools/web/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ def parse_token(token: str) -> JWTToken:
tok.findings.append("alg=HS256 with jku header — key confusion candidate")
if header.kid and ("../" in header.kid or "%2f" in header.kid.lower()):
tok.findings.append("kid contains path traversal — file read / SQLi candidate")
if header.jku and not header.jku.startswith("https://"):
tok.findings.append("jku over non-HTTPS or attacker-controlled host — key confusion")
if header.jku:
tok.findings.append("jku points at an attacker-influenced host — key confusion")
if header.x5u:
tok.findings.append("x5u points at an attacker-influenced host — key confusion")
if claims.expired:
tok.findings.append("token already expired — test whether server enforces exp")
if claims.exp is None:
Expand Down
29 changes: 25 additions & 4 deletions packages/decepticon/decepticon/tools/web/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

from __future__ import annotations

import asyncio
import json
import os
from typing import Any
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Coroutine, TypeVar

from langchain_core.tools import tool

Expand All @@ -24,6 +26,27 @@ def _json(data: Any) -> str:
return json.dumps(data, indent=2, default=str, ensure_ascii=False)


_T = TypeVar("_T")


def _run_coro(coro: Coroutine[Any, Any, _T]) -> _T:
"""Run ``coro`` to completion regardless of the calling context.

These sync ``@tool`` wrappers are invoked both from plain sync code
and from inside the agent's running event loop. ``asyncio.run`` is the
correct primitive but raises if a loop is already running, so when one
is detected the coroutine is driven on a fresh thread that owns its own
loop. On Python 3.13 ``get_event_loop().run_until_complete`` no longer
works in either context, hence this helper.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coro)
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(lambda: asyncio.run(coro)).result()


@tool
def jwt_parse(token: str) -> str:
"""Parse a JWT and surface known header / claim findings.
Expand Down Expand Up @@ -186,8 +209,6 @@ def http_request(
Returns:
JSON with status, headers, body (truncated), elapsed_ms, request_id
"""
import asyncio

try:
headers = json.loads(headers_json) if headers_json else {}
except json.JSONDecodeError:
Expand All @@ -205,7 +226,7 @@ async def _do():
return resp

try:
resp = asyncio.get_event_loop().run_until_complete(_do())
resp = _run_coro(_do())
return _json(
{
"status": resp.status,
Expand Down
20 changes: 20 additions & 0 deletions packages/decepticon/tests/unit/web/test_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ def test_jku_non_https_flagged(self) -> None:
parsed = parse_token(t)
assert any("jku" in f for f in parsed.findings)

def test_jku_https_attacker_host_flagged(self) -> None:
t = forge_token(
{"sub": "f"},
alg="HS256",
secret="k",
header={"jku": "https://attacker.com/jwks.json"},
)
parsed = parse_token(t)
assert any("jku" in f for f in parsed.findings)

def test_x5u_https_attacker_host_flagged(self) -> None:
t = forge_token(
{"sub": "f"},
alg="HS256",
secret="k",
header={"x5u": "https://attacker.com/cert.pem"},
)
parsed = parse_token(t)
assert any("x5u" in f for f in parsed.findings)


class TestForgeVerify:
@pytest.mark.parametrize("alg", ["HS256", "HS384", "HS512"])
Expand Down
68 changes: 68 additions & 0 deletions packages/decepticon/tests/unit/web/test_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Tests for the web @tool wrappers (http_request event-loop safety)."""

from __future__ import annotations

import asyncio
import json

import pytest

from decepticon.tools.web import tools
from decepticon.tools.web.http import HTTPResponse


class _FakeSession:
async def request(
self,
method: str,
url: str,
*,
headers: dict[str, str] | None = None,
body: bytes | None = None,
tag: str = "",
) -> HTTPResponse:
return HTTPResponse(
id="resp-1",
request_id="req-1",
status=200,
headers={"content-type": "text/plain"},
body=b"pong",
elapsed_ms=1.5,
timestamp=2.0,
)


@pytest.fixture
def patched_session(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(tools, "_get_session", lambda: _FakeSession())

Check notice

Code scanning / CodeQL

Unnecessary lambda Note test

This 'lambda' is just a simple wrapper around a callable object. Use that object directly.


def test_http_request_returns_parsed_result_from_sync_context(patched_session: None) -> None:
raw = tools.http_request.invoke({"method": "GET", "url": "https://target.test/ping"})
parsed = json.loads(raw)
assert "error" not in parsed
assert parsed["status"] == 200
assert parsed["body"] == "pong"


async def test_http_request_works_inside_running_event_loop(patched_session: None) -> None:
loop = asyncio.get_running_loop()

raw = await loop.run_in_executor(
None,
lambda: tools.http_request.invoke({"method": "GET", "url": "https://target.test/ping"}),
)

parsed = json.loads(raw)
assert "error" not in parsed
assert parsed["status"] == 200
assert parsed["body"] == "pong"


async def test_run_coro_helper_works_with_active_running_loop() -> None:
asyncio.get_running_loop()

async def _coro() -> str:
return "ok"

assert tools._run_coro(_coro()) == "ok"
Loading