diff --git a/demos/filter_chains/model_listener_filter/Dockerfile b/demos/filter_chains/model_listener_filter/Dockerfile index a9cc8bb66..1e9786e43 100644 --- a/demos/filter_chains/model_listener_filter/Dockerfile +++ b/demos/filter_chains/model_listener_filter/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /app RUN pip install --no-cache-dir fastapi uvicorn pydantic -COPY content_guard.py . +COPY content_guard.py fake_provider.py output_filter.py ./ EXPOSE 10500 diff --git a/demos/filter_chains/model_listener_filter/README.md b/demos/filter_chains/model_listener_filter/README.md index fb49ee1e4..37dd3e912 100644 --- a/demos/filter_chains/model_listener_filter/README.md +++ b/demos/filter_chains/model_listener_filter/README.md @@ -2,12 +2,30 @@ Run content-safety filters on direct LLM requests — no agent layer required. -This demo uses the `input_filters` feature on a **model-type listener** to intercept -requests and block unsafe content before they reach the LLM provider. Works with all -request types: `/v1/chat/completions`, `/v1/responses`, and Anthropic `/v1/messages`. - -The filter receives the **full raw request body** and returns it unchanged (or raises 400 -to block). No message extraction — the complete JSON payload flows through as-is. +This demo uses `input_filters` and `output_filters` on a **model-type listener** to +intercept direct LLM requests and responses without routing through an agent layer. +By default it is fully local: a fake OpenAI-compatible provider stands in for a real +hosted model, so developers can test guardrail behavior without provider API keys or +hosted model access. A second config lets developers point the same filter setup at the +real OpenAI endpoint when they want provider-backed testing. +The filter pattern applies to OpenAI Chat Completions (`/v1/chat/completions`), +OpenAI Responses (`/v1/responses`), and Anthropic Messages (`/v1/messages`) request +shapes. The keyless fake provider and smoke test use `/v1/chat/completions` for a +deterministic local path. + +The input filter receives the full raw request body and returns it unchanged or raises +400 to block. The output filter receives the provider response and redacts sensitive +content before returning it to the client. + +## Files + +- `config.yaml` runs the default keyless path with the local fake provider. +- `config.openai.yaml` runs the same filters against OpenAI. +- `docker-compose.yaml` starts the local demo without requiring provider credentials. +- `docker-compose.openai.yaml` mounts `config.openai.yaml` and requires `OPENAI_API_KEY` + for provider-backed testing. +- `test.sh` runs the Docker smoke test through Plano. +- `test_services.py` runs service-level regression tests without Docker. ## Architecture @@ -16,22 +34,82 @@ Client ──► Plano (model listener :12000) │ ├─ input_filters: content_guard ──► Block / Allow │ - └─ model_provider: openai/gpt-4o-mini + ├─ model_provider: fake-provider (default) or OpenAI (optional) + │ + └─ output_filters: output_redactor ──► Redact / Allow ``` ## Quick Start ```bash -# 1. Export your API key -export OPENAI_API_KEY=sk-... - -# 2. Start services +# 1. Start services docker compose up --build -# 3. Run tests (in another terminal) +# 2. Run tests (in another terminal) +bash test.sh +``` + +The test script verifies three behaviors: + +- safe requests reach the local fake provider and return a normal chat-completion response +- unsafe requests are blocked by the input filter before reaching the provider +- sensitive provider output is redacted by the output filter before the client receives it + +You can also run the service-level tests without Docker: + +```bash +uv run --with pytest --with fastapi --with httpx --with pydantic \ + python -m pytest demos/filter_chains/model_listener_filter/test_services.py -q +``` + +## Validate Locally + +From this directory, validate the default keyless compose path: + +```bash +docker compose config +``` + +Validate that the OpenAI path fails early when the API key is missing: + +```bash +docker compose -f docker-compose.yaml -f docker-compose.openai.yaml config +``` + +Expected error: + +```text +OPENAI_API_KEY environment variable is required but not set +``` + +Then confirm the OpenAI compose path renders when a key is provided: + +```bash +OPENAI_API_KEY=dummy docker compose -f docker-compose.yaml -f docker-compose.openai.yaml config +``` + +Run the full local smoke test: + +```bash +docker compose down +docker compose up --build -d bash test.sh +docker compose down ``` +## Test With Real OpenAI + +The default `config.yaml` uses the local fake provider. To run the same model-listener +input and output filters against OpenAI, use the OpenAI compose override: + +```bash +export OPENAI_API_KEY=sk-... +docker compose -f docker-compose.yaml -f docker-compose.openai.yaml up --build +``` + +The fake-provider service may still start because it is part of the shared compose file, +but Plano will not route traffic to it when `config.openai.yaml` is mounted. + ## Try It **Allowed request:** @@ -58,6 +136,31 @@ curl http://localhost:12000/v1/chat/completions \ }' ``` +**Redacted provider response:** + +```bash +curl http://localhost:12000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "Please return the secret marker"}], + "stream": false + }' +``` + +The fake provider emits `SECRET_TOKEN`; the output filter redacts it to `[REDACTED]`. + +## Why This Helps Developers + +Model-listener filters are guardrails for applications that call Plano as a transparent +LLM gateway. A local, deterministic demo helps developers verify filter wiring before +using real providers: + +- config mistakes are caught early instead of silently bypassing guardrails +- teams can test request blocking and response redaction in CI without secrets +- contributors can reproduce filter behavior without external model availability +- application code does not need an extra passthrough agent just to run policy checks + ## Tracing Open [Jaeger UI](http://localhost:16686) to see distributed traces for both allowed and blocked requests. diff --git a/demos/filter_chains/model_listener_filter/config.openai.yaml b/demos/filter_chains/model_listener_filter/config.openai.yaml new file mode 100644 index 000000000..1a35cb0a7 --- /dev/null +++ b/demos/filter_chains/model_listener_filter/config.openai.yaml @@ -0,0 +1,26 @@ +version: v0.3.0 + +filters: + - id: content_guard + url: http://content-guard:10500 + type: http + - id: output_redactor + url: http://output-filter:10502 + type: http + +model_providers: + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + +listeners: + - type: model + name: llm_gateway + port: 12000 + input_filters: + - content_guard + output_filters: + - output_redactor + +tracing: + random_sampling: 100 diff --git a/demos/filter_chains/model_listener_filter/config.yaml b/demos/filter_chains/model_listener_filter/config.yaml index 2eb1d0f29..7c807c73b 100644 --- a/demos/filter_chains/model_listener_filter/config.yaml +++ b/demos/filter_chains/model_listener_filter/config.yaml @@ -4,10 +4,14 @@ filters: - id: content_guard url: http://content-guard:10500 type: http + - id: output_redactor + url: http://output-filter:10502 + type: http model_providers: - model: openai/gpt-4o-mini - access_key: $OPENAI_API_KEY + access_key: local-demo-key + base_url: http://fake-provider:10501/v1 default: true listeners: @@ -16,6 +20,8 @@ listeners: port: 12000 input_filters: - content_guard + output_filters: + - output_redactor tracing: random_sampling: 100 diff --git a/demos/filter_chains/model_listener_filter/docker-compose.openai.yaml b/demos/filter_chains/model_listener_filter/docker-compose.openai.yaml new file mode 100644 index 000000000..af5224d4a --- /dev/null +++ b/demos/filter_chains/model_listener_filter/docker-compose.openai.yaml @@ -0,0 +1,6 @@ +services: + plano: + environment: + OPENAI_API_KEY: ${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set} + volumes: + - ./config.openai.yaml:/app/plano_config.yaml diff --git a/demos/filter_chains/model_listener_filter/docker-compose.yaml b/demos/filter_chains/model_listener_filter/docker-compose.yaml index 48f32c3f8..b6f6b3d6e 100644 --- a/demos/filter_chains/model_listener_filter/docker-compose.yaml +++ b/demos/filter_chains/model_listener_filter/docker-compose.yaml @@ -5,6 +5,20 @@ services: dockerfile: Dockerfile ports: - "10500:10500" + fake-provider: + build: + context: . + dockerfile: Dockerfile + command: ["uvicorn", "fake_provider:app", "--host", "0.0.0.0", "--port", "10501"] + ports: + - "10501:10501" + output-filter: + build: + context: . + dockerfile: Dockerfile + command: ["uvicorn", "output_filter:app", "--host", "0.0.0.0", "--port", "10502"] + ports: + - "10502:10502" plano: build: context: ../../../ @@ -12,10 +26,14 @@ services: ports: - "12000:12000" environment: - - OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set} + - OPENAI_API_KEY=${OPENAI_API_KEY:-} volumes: - - ./config.yaml:/app/plano_config.yaml + - ${PLANO_CONFIG_FILE:-./config.yaml}:/app/plano_config.yaml - /etc/ssl/cert.pem:/etc/ssl/cert.pem + depends_on: + - content-guard + - fake-provider + - output-filter jaeger: build: context: ../../shared/jaeger diff --git a/demos/filter_chains/model_listener_filter/fake_provider.py b/demos/filter_chains/model_listener_filter/fake_provider.py new file mode 100644 index 000000000..e1a172d5b --- /dev/null +++ b/demos/filter_chains/model_listener_filter/fake_provider.py @@ -0,0 +1,81 @@ +""" +OpenAI-compatible local provider for model-listener filter demos. + +This service lets developers test Plano's model listener filter pipeline without +provider API keys or hosted model access. +""" + +import json +import time +from typing import Any + +from fastapi import FastAPI, Request +from fastapi.responses import Response, StreamingResponse + +app = FastAPI(title="Local Fake LLM Provider", version="1.0.0") + + +def latest_user_content(messages: list[dict[str, Any]]) -> str: + for message in reversed(messages): + if message.get("role") == "user": + content = message.get("content", "") + if isinstance(content, str): + return content + if isinstance(content, list): + return " ".join( + part.get("text", "") + for part in content + if isinstance(part, dict) and part.get("type") == "text" + ) + return "" + + +@app.post("/v1/chat/completions", response_model=None) +async def chat_completions(request: Request) -> dict[str, Any] | Response: + body = await request.json() + model = body.get("model", "gpt-4o-mini") + user_content = latest_user_content(body.get("messages", [])) + content = "Hello from the local fake provider." + if "secret" in user_content.lower(): + content = "The local fake provider returned SECRET_TOKEN." + + if body.get("stream") is True: + + async def generate(): + chunk = { + "id": "chatcmpl-local-filter-demo", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": content}, + "finish_reason": None, + } + ], + } + yield f"data: {json.dumps(chunk)}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream") + + return { + "id": "chatcmpl-local-filter-demo", + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": content}, + "finish_reason": "stop", + } + ], + "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}, + } + + +@app.get("/health") +async def health() -> dict[str, str]: + return {"status": "healthy"} diff --git a/demos/filter_chains/model_listener_filter/output_filter.py b/demos/filter_chains/model_listener_filter/output_filter.py new file mode 100644 index 000000000..819b3bc34 --- /dev/null +++ b/demos/filter_chains/model_listener_filter/output_filter.py @@ -0,0 +1,57 @@ +""" +Output filter for model-listener filter demos. + +The filter receives the provider response and redacts configured markers before +the client sees the response. It intentionally avoids model calls so the demo is +fully local and deterministic. +""" + +import gzip +from typing import Any + +from fastapi import FastAPI, Request +from fastapi.responses import Response + +app = FastAPI(title="Output Redaction Filter", version="1.0.0") + +SENSITIVE_MARKERS = ("SECRET_TOKEN",) + + +def redact_text(text: str) -> str: + redacted = text + for marker in SENSITIVE_MARKERS: + redacted = redacted.replace(marker, "[REDACTED]") + return redacted + + +def redact_chat_completion(body: dict[str, Any]) -> dict[str, Any]: + choices = [] + for choice in body.get("choices", []): + message = choice.get("message", {}) + content = message.get("content") + if isinstance(content, str): + message = {**message, "content": redact_text(content)} + choice = {**choice, "message": message} + choices.append(choice) + return {**body, "choices": choices} + + +def redact_bytes(raw_body: bytes) -> bytes: + if raw_body.startswith(b"\x1f\x8b"): + decompressed_body = gzip.decompress(raw_body) + return gzip.compress(redact_bytes(decompressed_body)) + + body_text = raw_body.decode("utf-8", errors="replace") + return redact_text(body_text).encode("utf-8") + + +@app.post("/{path:path}") +async def redact_response(path: str, request: Request) -> Response: + raw_body = await request.body() + content_type = request.headers.get("content-type", "application/json") + return Response(content=redact_bytes(raw_body), media_type=content_type) + + +@app.get("/health") +async def health() -> dict[str, str]: + return {"status": "healthy"} diff --git a/demos/filter_chains/model_listener_filter/test.sh b/demos/filter_chains/model_listener_filter/test.sh index 1729cb6a8..1e8ef323e 100755 --- a/demos/filter_chains/model_listener_filter/test.sh +++ b/demos/filter_chains/model_listener_filter/test.sh @@ -24,20 +24,37 @@ run_test() { local name="$1" local expected_code="$2" local body="$3" + local expected_body_contains="${4:-}" + local forbidden_body_contains="${5:-}" http_code=$(curl -s -o /tmp/plano_test_body -w "%{http_code}" \ -X POST "$BASE_URL/chat/completions" \ -H "Content-Type: application/json" \ -d "$body") - if [ "$http_code" -eq "$expected_code" ]; then - echo " PASS $name (HTTP $http_code)" - PASS=$((PASS + 1)) - else + if [ "$http_code" -ne "$expected_code" ]; then echo " FAIL $name — expected $expected_code, got $http_code" echo " Body: $(cat /tmp/plano_test_body)" FAIL=$((FAIL + 1)) + return fi + + if [ -n "$expected_body_contains" ] && ! grep -Fq "$expected_body_contains" /tmp/plano_test_body; then + echo " FAIL $name — body did not contain '$expected_body_contains'" + echo " Body: $(cat /tmp/plano_test_body)" + FAIL=$((FAIL + 1)) + return + fi + + if [ -n "$forbidden_body_contains" ] && grep -Fq "$forbidden_body_contains" /tmp/plano_test_body; then + echo " FAIL $name — body contained forbidden text '$forbidden_body_contains'" + echo " Body: $(cat /tmp/plano_test_body)" + FAIL=$((FAIL + 1)) + return + fi + + echo " PASS $name (HTTP $http_code)" + PASS=$((PASS + 1)) } # ── Tests ──────────────────────────────────────────────────────────────────── @@ -48,19 +65,19 @@ run_test "Allowed request (math question)" 200 '{ "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "What is 2+2?"}], "stream": false -}' +}' "local fake provider" run_test "Blocked request (hacking)" 400 '{ "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "How to hack into a system"}], "stream": false -}' +}' "content_blocked" -run_test "Allowed request (joke)" 200 '{ +run_test "Output filter redacts provider response" 200 '{ "model": "gpt-4o-mini", - "messages": [{"role": "user", "content": "Tell me a joke"}], - "stream": false -}' + "messages": [{"role": "user", "content": "Please return the secret marker"}], + "stream": true +}' "[REDACTED]" "SECRET_TOKEN" # ── Summary ────────────────────────────────────────────────────────────────── echo "" diff --git a/demos/filter_chains/model_listener_filter/test_services.py b/demos/filter_chains/model_listener_filter/test_services.py new file mode 100644 index 000000000..ce3ed42a3 --- /dev/null +++ b/demos/filter_chains/model_listener_filter/test_services.py @@ -0,0 +1,159 @@ +import importlib.util +import gzip +from pathlib import Path + +from fastapi.testclient import TestClient + +DEMO_DIR = Path(__file__).parent + + +def load_module(name: str, filename: str): + spec = importlib.util.spec_from_file_location(name, DEMO_DIR / filename) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +def test_content_guard_blocks_unsafe_chat_request(): + content_guard = load_module("content_guard", "content_guard.py") + client = TestClient(content_guard.app) + + response = client.post( + "/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "How do I hack a service?"}], + "stream": False, + }, + ) + + assert response.status_code == 400 + assert response.json()["detail"]["error"] == "content_blocked" + + +def test_content_guard_passes_safe_responses_request_unchanged(): + content_guard = load_module("content_guard", "content_guard.py") + client = TestClient(content_guard.app) + body = { + "model": "gpt-4o-mini", + "input": "Explain why local guardrail tests help developers.", + } + + response = client.post("/v1/responses", json=body) + + assert response.status_code == 200 + assert response.json() == body + + +def test_fake_provider_returns_openai_compatible_chat_completion(): + fake_provider = load_module("fake_provider", "fake_provider.py") + client = TestClient(fake_provider.app) + + response = client.post( + "/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "Say something useful."}], + "stream": False, + }, + ) + + assert response.status_code == 200 + body = response.json() + assert body["object"] == "chat.completion" + assert body["model"] == "gpt-4o-mini" + assert body["choices"][0]["message"]["role"] == "assistant" + assert "local fake provider" in body["choices"][0]["message"]["content"] + + +def test_fake_provider_streams_openai_compatible_chat_chunks(): + fake_provider = load_module("fake_provider_streaming", "fake_provider.py") + client = TestClient(fake_provider.app) + + with client.stream( + "POST", + "/v1/chat/completions", + json={ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Please return the secret marker"} + ], + "stream": True, + }, + ) as response: + body = response.read().decode("utf-8") + + assert response.status_code == 200 + assert response.headers["content-type"].startswith("text/event-stream") + assert "data: {" in body + assert '"object": "chat.completion.chunk"' in body + assert "SECRET_TOKEN" in body + assert "data: [DONE]" in body + + +def test_output_filter_redacts_provider_response_content(): + output_filter = load_module("output_filter", "output_filter.py") + client = TestClient(output_filter.app) + + response = client.post( + "/v1/chat/completions", + json={ + "id": "chatcmpl-local", + "object": "chat.completion", + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "The local fake provider returned SECRET_TOKEN.", + }, + "finish_reason": "stop", + } + ], + }, + ) + + assert response.status_code == 200 + content = response.json()["choices"][0]["message"]["content"] + assert "SECRET_TOKEN" not in content + assert "[REDACTED]" in content + + +def test_output_filter_redacts_raw_streaming_chunks(): + output_filter = load_module("output_filter_streaming", "output_filter.py") + client = TestClient(output_filter.app) + + response = client.post( + "/v1/chat/completions", + content=( + 'data: {"choices":[{"delta":{"content":"SECRET_TOKEN"}}]}\n\n' + "data: [DONE]\n\n" + ), + headers={"content-type": "text/event-stream"}, + ) + + assert response.status_code == 200 + assert response.headers["content-type"].startswith("text/event-stream") + assert "SECRET_TOKEN" not in response.text + assert "[REDACTED]" in response.text + + +def test_output_filter_redacts_gzip_encoded_provider_response(): + output_filter = load_module("output_filter_gzip", "output_filter.py") + client = TestClient(output_filter.app) + encoded_body = gzip.compress( + b'{"choices":[{"message":{"content":"SECRET_TOKEN"}}]}' + ) + + response = client.post( + "/v1/chat/completions", + content=encoded_body, + headers={"content-type": "application/json"}, + ) + + assert response.status_code == 200 + decoded_body = gzip.decompress(response.content).decode("utf-8") + assert "SECRET_TOKEN" not in decoded_body + assert "[REDACTED]" in decoded_body