Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,52 @@
# http_server_python

TEN extension of a Simple HTTP server, to make the running TEN graph interativeable with outside world.

Typical usages:
- Modify properties of TEN extensions
- Trigger actions of TEN extensions
- Query status of TEN extensions

This project is a TEN extension that implements a simple HTTP server, enabling interaction with the running TEN graph from external systems.

## Features

- Passing through any `cmd` to the running TEN graph, return result as needed
- **Command Execution**: Seamlessly pass any `cmd` to the running TEN graph and receive the results.
- **Asynchronous Handling**: Utilizes `asyncio` and `aiohttp` for efficient, non-blocking request processing.
- **Configurable Server**: Easily configure the server's listening address and port through the TEN environment.


## API

- `/cmd`
### Property

<!-- TODO: hide internal fields and add examples -->
Refer to api definition in [manifest.json](manifest.json) and default values in [property.json](property.json).

## Development
| Property | Type | Description |
| - | - | - |
| `listen_addr`| `string`| address to listen on |
| `listen_port` | `int32` | port to listen on|

## HTTP API

### Build
### POST `/cmd/{cmd_name}`

<!-- build dependencies and steps -->
- **Description**: Sends a command with the specified name on the TEN graph.
- **Request Body**: JSON object containing the command properties.
- **Response**: JSON object with the command execution result.

#### Example Request

```bash
curl -X POST http://127.0.0.1:8888/cmd/example_cmd_name \
-H "Content-Type: application/json" \
-d '{
"num_property1": 1,
"str_property2": "Hello"
}'
```

## Development

### Unit test
### Standalone testing

<!-- how to do unit test for the extension -->

## Misc
```bash
task install
task test
```

<!-- others if applicable -->
1 change: 1 addition & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tasks:
desc: install dependencies
cmds:
- tman install
- pip install -r requirements.txt
- pip install -r tests/requirements.txt

lint:
Expand Down
178 changes: 72 additions & 106 deletions http_server_extension.py
Original file line number Diff line number Diff line change
@@ -1,114 +1,80 @@
import asyncio
from aiohttp import web
import json

from ten import (
Extension,
TenEnv,
AsyncExtension,
AsyncTenEnv,
Cmd,
StatusCode,
CmdResult,
)
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
from functools import partial
import re


class HTTPHandler(BaseHTTPRequestHandler):
def __init__(self, ten: TenEnv, *args, directory=None, **kwargs):
ten.log_debug(f"new handler: {directory} {args} {kwargs}")
self.ten = ten
super().__init__(*args, **kwargs)

def do_POST(self):
self.ten.log_debug(f"post request incoming {self.path}")

# match path /cmd/<cmd_name>
match = re.match(r"^/cmd/([^/]+)$", self.path)
if match:
cmd_name = match.group(1)
try:
content_length = int(self.headers["Content-Length"])
input = self.rfile.read(content_length).decode("utf-8")
self.ten.log_info(f"incoming request {self.path} {input}")

# processing by send_cmd
cmd_result_event = threading.Event()
cmd_result: CmdResult

def cmd_callback(_, result, ten_error):
nonlocal cmd_result_event
nonlocal cmd_result
cmd_result = result
self.ten.log_info(
"cmd callback result: {}".format(
cmd_result.get_property_to_json("")
)
)
cmd_result_event.set()

cmd = Cmd.create(cmd_name)
cmd.set_property_from_json("", input)
self.ten.send_cmd(cmd, cmd_callback)
event_got = cmd_result_event.wait(timeout=5)

# return response
if not event_got: # timeout
self.send_response_only(504)
self.end_headers()
return
self.send_response(
200 if cmd_result.get_status_code() == StatusCode.OK else 502
)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(
cmd_result.get_property_to_json("").encode(encoding="utf_8")
)
except Exception as e:
self.ten.log_warn("failed to handle request, err {}".format(e))
self.send_response_only(500)
self.end_headers()
else:
self.ten.log_warn(f"invalid path: {self.path}")
self.send_response_only(404)
self.end_headers()


class HTTPServerExtension(Extension):


class HTTPServerExtension(AsyncExtension):
def __init__(self, name: str):
super().__init__(name)
self.listen_addr = "127.0.0.1"
self.listen_port = 8888
self.cmd_white_list = None
self.server = None
self.thread = None

def on_start(self, ten: TenEnv):
self.listen_addr = ten.get_property_string("listen_addr")
self.listen_port = ten.get_property_int("listen_port")
"""
white_list = ten.get_property_string("cmd_white_list")
if len(white_list) > 0:
self.cmd_white_list = white_list.split(",")
"""

ten.log_info(
f"on_start {self.listen_addr}:{self.listen_port}, {self.cmd_white_list}"
)

self.server = HTTPServer(
(self.listen_addr, self.listen_port), partial(HTTPHandler, ten)
)
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.start()

ten.on_start_done()

def on_stop(self, ten: TenEnv):
self.server.shutdown()
self.thread.join()
ten.on_stop_done()

def on_cmd(self, ten: TenEnv, cmd: Cmd):
self.listen_addr: str = "127.0.0.1"
self.listen_port: int = 8888

self.ten_env: AsyncTenEnv = None

# http server instances
self.app = web.Application()
self.runner = None

# POST /cmd/{cmd_name}
async def handle_post_cmd(self, request):
ten_env = self.ten_env

try:
cmd_name = request.match_info.get('cmd_name')

req_json = await request.json()
input = json.dumps(req_json, ensure_ascii=False)

ten_env.log_debug(
f"process incoming request {request.method} {request.path} {input}")

cmd = Cmd.create(cmd_name)
cmd.set_property_from_json("", input)
[cmd_result, _] = await asyncio.wait_for(ten_env.send_cmd(cmd), 5.0)

# return response
status = 200 if cmd_result.get_status_code() == StatusCode.OK else 502
return web.json_response(
cmd_result.get_property_to_json(""), status=status
)
except json.JSONDecodeError:
return web.Response(status=400)
except asyncio.TimeoutError:
return web.Response(status=504)
except Exception as e:
ten_env.log_warn(
"failed to handle request with unknown exception, err {}".format(e))
return web.Response(status=500)

async def on_start(self, ten_env: AsyncTenEnv):
if await ten_env.is_property_exist("listen_addr"):
self.listen_addr = await ten_env.get_property_string("listen_addr")
if await ten_env.is_property_exist("listen_port"):
self.listen_port = await ten_env.get_property_int("listen_port")
self.ten_env = ten_env

ten_env.log_info(
f"http server listening on {self.listen_addr}:{self.listen_port}")

self.app.router.add_post("/cmd/{cmd_name}", self.handle_post_cmd)
self.runner = web.AppRunner(self.app)
await self.runner.setup()
site = web.TCPSite(self.runner, self.listen_addr, self.listen_port)
await site.start()

async def on_stop(self, ten_env: AsyncTenEnv):
await self.runner.cleanup()
self.ten_env = None

async def on_cmd(self, ten_env: AsyncTenEnv, cmd: Cmd):
cmd_name = cmd.get_name()
ten.log_info("on_cmd {cmd_name}")
cmd_result = CmdResult.create(StatusCode.OK)
ten.return_result(cmd_result, cmd)
ten_env.log_debug(f"on_cmd {cmd_name}")
ten_env.return_result(CmdResult.create(StatusCode.OK), cmd)
4 changes: 3 additions & 1 deletion manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"include": [
"manifest.json",
"property.json",
"**.py"
"**.py",
"requirements.txt",
"README.md"
]
},
"api": {
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
aiohttp
51 changes: 0 additions & 51 deletions tests/test_404.py

This file was deleted.

64 changes: 64 additions & 0 deletions tests/test_4xx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Copyright © 2025 Agora
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0, with certain conditions.
# Refer to the "LICENSE" file in the root directory for more information.
#
from pathlib import Path
from ten import (
ExtensionTester,
TenEnvTester,
)
import httpx


class ExtensionTester404NotFound1(ExtensionTester):
def on_start(self, ten_env: TenEnvTester) -> None:
ten_env.on_start_done()

property_json = {"num": 1, "str": "111"}
r = httpx.post("http://127.0.0.1:8888/cmd", json=property_json)
print(r)
if r.status_code == httpx.codes.NOT_FOUND:
ten_env.stop_test()


class ExtensionTester404NotFound2(ExtensionTester):
def on_start(self, ten_env: TenEnvTester) -> None:
ten_env.on_start_done()

property_json = {"num": 1, "str": "111"}
r = httpx.post("http://127.0.0.1:8888/cmd/aaa/123", json=property_json)
print(r)
if r.status_code == httpx.codes.NOT_FOUND:
ten_env.stop_test()


class ExtensionTester400BadRequest(ExtensionTester):
def on_start(self, ten_env: TenEnvTester) -> None:
ten_env.on_start_done()

property_str = '{num": 1, "str": "111"}' # not a valid json
r = httpx.post("http://127.0.0.1:8888/cmd/aaa", content=property_str)
print(r)
if r.status_code == httpx.codes.BAD_REQUEST:
ten_env.stop_test()


def test_4xx():
tester_404_1 = ExtensionTester404NotFound1()
tester_404_1.add_addon_base_dir(
str(Path(__file__).resolve().parent.parent))
tester_404_1.set_test_mode_single("http_server_python")
tester_404_1.run()

tester_404_2 = ExtensionTester404NotFound2()
tester_404_2.add_addon_base_dir(
str(Path(__file__).resolve().parent.parent))
tester_404_2.set_test_mode_single("http_server_python")
tester_404_2.run()

tester_400 = ExtensionTester400BadRequest()
tester_400.add_addon_base_dir(str(Path(__file__).resolve().parent.parent))
tester_400.set_test_mode_single("http_server_python")
tester_400.run()
Loading
Loading