diff --git a/girest/README.md b/girest/README.md index b845a66..c34747c 100644 --- a/girest/README.md +++ b/girest/README.md @@ -53,10 +53,6 @@ The schema generator (`girest/girest/main.py`) converts GObject Introspection me ```bash # Generate OpenAPI schema in JSON format python3 girest-dump-schema.py Gst 1.0 -o gst-schema.json - -# Options: -# --sse-only SSE-style callbacks (EventSource-based) -# (default) URL-based callbacks (HTTP POST) ``` ### GI Features and OpenAPI Mapping @@ -123,54 +119,10 @@ python3 girest-dump-schema.py Gst 1.0 -o gst-schema.json // Returns: { return: true, cur: 1234567890 } ``` -#### Callbacks - Dual Mode Support +#### Callbacks **GI Feature**: Function pointers with scope (call, async, notified, forever) -GIRest supports two callback modes: - -##### SSE Mode (Server-Sent Events) - `--sse-only` - -**When to use**: Browser clients, simple integration, persistent connections - -**OpenAPI Mapping**: -- Callbacks return integer IDs in response -- Single `/GIRest/callbacks` endpoint (text/event-stream) -- Events dispatched with `{ id: callbackId, data: {...} }` - -**Features**: -- ✅ Synchronous callbacks (scope: call) - **Excluded** in SSE-only mode -- ✅ Asynchronous callbacks (scope: async, notified, forever) -- ✅ Automatic reconnection -- ✅ Built-in EventSource support - -**Schema Example**: -```json -{ - "responses": { - "200": { - "content": { - "application/json": { - "schema": { - "properties": { - "func": { - "type": "integer", - "description": "Callback ID", - "x-gi-callback": "#/components/schemas/GstLogFunction" - } - } - } - } - } - } - } -} -``` - -##### URL-Based Callbacks Mode (Default) - -**When to use**: Server-to-server, custom callback handling, all callback types - **OpenAPI Mapping**: - Callback URL parameters: `{name}_url` (e.g., `func_url`) - Required headers: `session-id`, `callback-secret` @@ -178,7 +130,7 @@ GIRest supports two callback modes: - HMAC signature verification **Features**: -- ✅ All callback scopes supported (sync and async) +- ✅ All callback scopes supported (synchronous and asynchronous) - ✅ No persistent connection required - ✅ Flexible routing and authentication - ✅ Custom callback infrastructure integration @@ -217,7 +169,7 @@ GIRest supports two callback modes: } ``` -**Callback Payload** (URL mode): +**Callback Payload**: ```json { "sessionId": "client-session-123", @@ -230,12 +182,12 @@ GIRest supports two callback modes: ##### Callback Scope Handling -| GI Scope | Description | SSE Mode | URL Mode | -|----------|-------------|----------|----------| -| `CALL` | Synchronous, called during function execution | ❌ Excluded | ✅ Supported | -| `ASYNC` | Fire-and-forget, no guarantee of invocation | ✅ Supported | ✅ Supported | -| `NOTIFIED` | Called multiple times until destroyed | ✅ Supported | ✅ Supported | -| `FOREVER` | Never destroyed, called indefinitely | ✅ Supported | ✅ Supported | +| GI Scope | Description | Support | +|----------|-------------|---------| +| `CALL` | Synchronous, called during function execution | ✅ Supported | +| `ASYNC` | Fire-and-forget, no guarantee of invocation | ✅ Supported | +| `NOTIFIED` | Called multiple times until destroyed | ✅ Supported | +| `FOREVER` | Never destroyed, called indefinitely | ✅ Supported | #### Pointer Types @@ -304,7 +256,6 @@ Custom extensions used throughout the schema: | `x-gi-null` | Nullable type | `true` | | `x-gi-callback` | Callback schema ref | `"#/components/schemas/GstLogFunction"` | | `x-gi-callback-style` | Callback type | `"sync"`, `"async"` | -| `x-girest-callback-mode` | API mode | `"sse"`, `"url"` | --- @@ -349,7 +300,7 @@ The GIRest server (`girest-frida.py`) uses [Frida](https://frida.re) to inject i │ ┌────────────────────────────────────────────────────┐ │ │ │ GIResolver (girest/resolvers.py) │ │ │ │ • Maps HTTP requests to Frida RPC calls │ │ -│ │ • Handles callbacks (SSE or URL-based) │ │ +│ │ • Handles callbacks (URL-based) │ │ │ │ • Manages callback lifecycle │ │ │ └────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────┘ @@ -375,31 +326,6 @@ The JavaScript agent (`girest/girest.js`) handles: ### Callback Handling in Frida -#### SSE Mode - -```javascript -// Create native callback -const cb_id = callbacks.size; -const cb = new NativeCallback((...args) => { - // Serialize arguments - const data = {}; - for (var cb_a of cb_def) { - data[cb_a.name] = args[idx++]; - } - - // Send via SSE - send({ - kind: "callback", - data: { id: cb_id, data: data } - }); -}, "void", cb_sig); - -callbacks.set(cb_id.toString(), cb); -return cb_id; // Return to client -``` - -#### URL Mode - ```javascript // Create native callback that POSTs to client URL const cb = new NativeCallback((...args) => { @@ -440,9 +366,6 @@ python3 girest-frida.py --name gst-launch-1.0 Gst 1.0 # Custom port python3 girest-frida.py --pid 12345 --port 8080 Gst 1.0 -# SSE-only mode -python3 girest-frida.py --pid 12345 --sse-only Gst 1.0 - # Access Swagger UI # http://localhost:9000/ui ``` @@ -451,7 +374,6 @@ python3 girest-frida.py --pid 12345 --sse-only Gst 1.0 | Endpoint | Purpose | |----------|---------| -| `/GIRest/callbacks` | SSE endpoint for callback events (SSE mode only) | | `/ui` | Swagger UI for API exploration | > **Note**: The `/GIRest/pipelines` endpoint is part of gstaudit-server, not the base GIRest framework. @@ -475,11 +397,6 @@ python3 girest-client-generator.py Gst 1.0 \ python3 girest-client-generator.py Gst 1.0 \ --base-path /api/v1 \ --output gst.ts - -# SSE mode -python3 girest-client-generator.py Gst 1.0 \ - --sse-only \ - --output gst-sse.ts ``` ### Generated Code Structure @@ -489,13 +406,7 @@ python3 girest-client-generator.py Gst 1.0 \ const apiConfig = { host, port, basePath, ... }; export function setApiConfig(config) { ... } -// SSE mode: EventSource for callbacks -const callbackDispatcher = new Map(); -const callbackSource = new EventSource('/GIRest/callbacks'); - -// OR - -// URL mode: Callback handler interface +// Callback handler interface export interface ICallbackHandler { registerCallback(func, metadata): { callbackUrl, callbackId }; unregisterCallback(callbackId): void; @@ -516,53 +427,53 @@ export namespace Gst { ### Key Features -#### Callbacks (SSE Mode - EventSource-based) +#### Callbacks **Generated Code**: ```typescript export async function debug_add_log_function( + session_id: string, + callback_secret: string, func: GstLogFunction ): Promise { + const callbackHandler = getCallbackHandler(); + const { callbackUrl } = callbackHandler.registerCallback(func, convertGstLogFunctionArgs, { + methodName: 'debug_add_log_function', + paramName: 'func' + }); + const url = new URL('/Gst/debug_add_log_function', apiConfig.baseUrl); - const response = await fetch(url.toString()); - const data = await response.json(); + url.searchParams.append('func', callbackUrl); - // Automatically register callback with dispatcher - if (data.func !== undefined) { - callbackDispatcher.set(data.func.toString(), { - converter: convertGstLogFunctionArgs, - userFunction: func - }); - } + const response = await fetch(url.toString(), { + headers: { + 'session-id': session_id, + 'callback-secret': callback_secret + } + }); } - -// EventSource automatically dispatches callbacks -callbackSource.onmessage = (ev) => { - const json = JSON.parse(ev.data); - const callbackEntry = callbackDispatcher.get(json.id.toString()); - if (callbackEntry) { - const args = callbackEntry.converter(json.data); - callbackEntry.userFunction(...args); - } -}; ``` **Usage**: ```typescript -import { Gst } from './gst'; +import { Gst, setCallbackHandler } from './gst'; +import { MyCallbackHandler } from './my-handler'; + +// Set up callback handler +setCallbackHandler(new MyCallbackHandler()); // Define callback function onLog(category, level, file, func, line, obj, message) { console.log(`${file}:${line} - ${message}`); } -// Register - that's it! -await Gst.debug_add_log_function(onLog); +// Register callback +await Gst.debug_add_log_function('session-123', 'my-secret', onLog); ``` -#### Callbacks (URL Mode - Webhook-based) +#### Automatic Memory Management **Generated Code**: @@ -746,13 +657,6 @@ PyGObject is not used to read introspection information because it "hides" imple FastAPI lacks proper inheritance support for object types in schemas. Additionally, adding schema metadata (required for automatic TypeScript bindings) is difficult - you can only add it to endpoints, not fields. Workarounds exist but require generating and extending Pydantic models, which is too burdensome. We use `apispec` directly to generate valid OpenAPI schemas. -### Why vendor extensions? - -Using `x-girest-callback-mode` vendor extensions instead of pattern detection provides: -- **Explicit configuration**: Mode is declared in schema, not inferred -- **Better maintainability**: No fragile pattern matching logic -- **Clarity**: Consumers know the exact API mode from schema metadata - ### Why ICallbackHandler interface? The interface-based design allows: diff --git a/girest/design/CALLBACK_IMPLEMENTATION.md b/girest/design/CALLBACK_IMPLEMENTATION.md index 76aea7c..0c08155 100644 --- a/girest/design/CALLBACK_IMPLEMENTATION.md +++ b/girest/design/CALLBACK_IMPLEMENTATION.md @@ -1,52 +1,46 @@ # Callback Implementation -This document describes the implementation of callback support in GIRest, including both Server-Sent Events (SSE) and non-SSE callback modes, reentrancy support, and thread affinity mechanisms. +This document describes the implementation of callback support in GIRest, including reentrancy support and thread affinity mechanisms. ## Overview -GIRest provides two ways to handle callbacks from native GStreamer/GObject code: +GIRest handles callbacks from native GStreamer/GObject code by using callback URLs. The client provides a callback URL when registering the callback, and GIRest makes HTTP POST requests to this URL when callbacks are triggered. -1. **SSE Mode (Server-Sent Events)**: The client opens a long-lived HTTP connection to `/GIRest/callbacks` and receives callback events as they occur. Best for browser-based clients and TypeScript/JavaScript applications. +This approach supports **reentrancy** (calling REST APIs from within callbacks) and **thread affinity** (ensuring reentrant calls execute on the correct native thread). -2. **Non-SSE Mode**: The client provides a callback URL when registering the callback. GIRest will make HTTP POST requests to this URL when callbacks are triggered. Best for server-to-server communication and testing scenarios. - -Both modes support **reentrancy** (calling REST APIs from within callbacks) and **thread affinity** (ensuring reentrant calls execute on the correct native thread). - -Both modes support **reentrancy** (calling REST APIs from within callbacks) and **thread affinity** (ensuring reentrant calls execute on the correct native thread). - -## Callback Modes - -### SSE Mode (Server-Sent Events) +## Callback Mechanism **How it works:** -1. Client opens an EventSource connection to `/GIRest/callbacks` -2. Server keeps connection alive and sends callback events as JSON -3. Client dispatcher routes events to registered callback functions by callback ID +1. Client starts a callback server (e.g., HTTP server listening on a port) +2. Client registers callback with REST API, providing callback URL in request body +3. GIRest server makes HTTP POST requests to the callback URL when events occur +4. Client server handles POST requests and executes callback function **Advantages:** -- Simple for browser-based clients -- Low latency - server pushes events immediately -- Single connection for all callbacks +- Server-to-server communication +- No persistent connection required +- Flexible routing and authentication +- Supports all callback scopes (sync and async) **Requirements:** -- Client must support EventSource/SSE -- Client must maintain long-lived connection +- Client must run an HTTP server to receive callbacks +- Client provides callback URL when registering callbacks -**Example (TypeScript):** -```typescript -const callbackSource = new EventSource('http://localhost:8000/GIRest/callbacks'); -callbackSource.onmessage = (ev) => { - const json = JSON.parse(ev.data); - const cb = callbackDispatcher.get(json.id.toString()); - if (cb) { - cb(...Object.values(json.data)); - } -}; +**Example (Python test):** +```python +# Start callback server +callback_server = await start_callback_server() -await Gst.debug_add_log_function(onLog); -``` +# Register callback with URL +payload = { + "func": { + "url": f"http://localhost:{callback_server.port}/callback/0" + } +} +response = await client.post("/Gst/debug_add_log_function", json=payload) -### Non-SSE Mode +# Server will POST to the callback URL when events occur +``` **How it works:** 1. Client starts a callback server (e.g., aiohttp server listening on a port) @@ -56,8 +50,9 @@ await Gst.debug_add_log_function(onLog); **Advantages:** - Server-to-server communication -- Works without SSE/EventSource support -- Easier to test with standard HTTP tools +- No persistent connection required +- Flexible routing and authentication +- Supports all callback scopes (sync and async) **Requirements:** - Client must run an HTTP server to receive callbacks @@ -125,7 +120,7 @@ send({ The client must include the `X-Correlation-Id` header in reentrant API calls: ```python -# Non-SSE callback handler +# Callback handler async def callback_handler(request): data = await request.json() correlation_id = data.get('correlation_id') @@ -260,20 +255,24 @@ INFO: Executing queued call on thread: 1638567, correlation_id=2 All reentrant calls execute as "queued calls" on the same thread, regardless of nesting depth. -## TypeScript Client Usage (SSE Mode) +## TypeScript Client Usage The TypeScript bindings include automatic callback support. When you call a function that takes a callback parameter (like `Gst.debug_add_log_function`), the generated code will: 1. Make the REST API call to register the callback -2. Receive a callback ID from the server -3. Automatically register your callback function with the internal dispatcher -4. Listen for callback events via EventSource on `/GIRest/callbacks` -5. Dispatch events to your callback function when they arrive +2. Register your callback function with the callback handler +3. Provide the callback URL to the server +4. Server POSTs to the callback URL when events occur +5. Callback handler dispatches to your callback function Example usage: ```typescript -import { Gst } from './gst'; +import { Gst, setCallbackHandler } from './gst'; +import { MyCallbackHandler } from './my-handler'; + +// Set up callback handler +setCallbackHandler(new MyCallbackHandler()); // Define your callback function with proper types function onLog(category, level, file, func, line, obj, message) { @@ -281,11 +280,9 @@ function onLog(category, level, file, func, line, obj, message) { } // Register the callback - it will be automatically dispatched -await Gst.debug_add_log_function(onLog); +await Gst.debug_add_log_function('session-123', 'my-secret', onLog); ``` -Compare this with the manual approach in `girest/examples/log.js` - the TypeScript bindings handle all the boilerplate automatically! - ## Architecture ### 1. OpenAPI Schema Generation (girest/main.py) @@ -503,24 +500,6 @@ This is a server-side concern and doesn't affect the TypeScript bindings impleme ## Testing -### SSE Mode Testing -Run the TypeScript generator: -```bash -cd girest -python3 girest-client-generator.py Gst 1.0 --base-url http://localhost:8000 -o gst.ts -``` - -Check the generated callback: -```bash -grep -A 10 "debug_add_log_function" gst.ts -``` - -Expected output shows: -- Callback function parameter with proper TypeScript signature -- Automatic callback registration after API call -- Reserved keywords renamed (`function` -> `function_`) - -### Non-SSE Mode Testing Run e2e tests with callback servers: ```bash cd girest @@ -547,16 +526,16 @@ All reentrant calls for a given callback should show the same thread ID in logs. 1. **GObject Refcounting**: Implement in girest.js Frida script 2. **Type Safety**: Improve enum namespace handling to avoid TypeScript warnings 3. **Error Handling**: Add error handling for callback registration failures -4. **Callback Cleanup**: Allow unregistering callbacks and closing EventSource connection +4. **Callback Cleanup**: Allow unregistering callbacks 5. **Multiple Callbacks**: Support registering multiple callbacks for the same event 6. **Correlation ID Timeout**: Add timeout for correlation ID cleanup when callbacks don't complete -7. **Non-SSE TypeScript Support**: Generate TypeScript bindings that support non-SSE callback mode ## Key Takeaways -### Two Callback Modes -- **SSE**: Browser-friendly, push-based, single connection -- **Non-SSE**: Server-to-server, requires callback HTTP server, easier to test +### Callback Mechanism +- Server-to-server communication via HTTP POST +- Requires callback HTTP server on client side +- Supports all callback scopes and patterns ### Reentrancy is Fully Supported - Callbacks can make REST API calls back to GIRest diff --git a/girest/girest-client-generator.py b/girest/girest-client-generator.py index ff321f8..c709f9a 100755 --- a/girest/girest-client-generator.py +++ b/girest/girest-client-generator.py @@ -32,17 +32,12 @@ def main(): parser.add_argument("--host", default="localhost", help="Host for REST API calls (default: localhost)") parser.add_argument("--port", type=int, default=9000, help="Port for REST API calls (default: 9000)") parser.add_argument("--base-path", default="", help="Base path for REST API calls (default: '')") - parser.add_argument( - "--sse-only", - action="store_true", - help="Use SSE-only mode: hide callback URLs, return int IDs, skip sync callbacks", - ) args = parser.parse_args() try: # Generate the OpenAPI schema - girest = GIRest(args.namespace, args.version, sse_only=args.sse_only) + girest = GIRest(args.namespace, args.version) spec = girest.generate() openapi_schema = spec.to_dict() diff --git a/girest/girest-dump-schema.py b/girest/girest-dump-schema.py index 9092ed7..9f83f21 100755 --- a/girest/girest-dump-schema.py +++ b/girest/girest-dump-schema.py @@ -24,17 +24,12 @@ def main(): parser.add_argument("namespace", help="GObject namespace (e.g., 'Gst', 'GLib', 'Gtk')") parser.add_argument("version", help="Namespace version (e.g., '1.0', '2.0')") parser.add_argument("-o", "--output", help="Output file path (default: stdout)", default=None) - parser.add_argument( - "--sse-only", - action="store_true", - help="Use SSE-only mode: hide callback URLs, return int IDs, skip sync callbacks", - ) args = parser.parse_args() try: # Generate the OpenAPI schema - girest = GIRest(args.namespace, args.version, sse_only=args.sse_only) + girest = GIRest(args.namespace, args.version) spec = girest.generate() openapi_schema = spec.to_dict() diff --git a/girest/girest-frida.py b/girest/girest-frida.py index ef34d8b..01c615b 100755 --- a/girest/girest-frida.py +++ b/girest/girest-frida.py @@ -20,25 +20,15 @@ def main(): parser.add_argument("version", help="Namespace version (e.g., '1.0', '2.0')") parser.add_argument("--pid", type=int, required=True, help="Process ID to instrument") parser.add_argument("--port", type=int, default=9000, help="Port to run the server on (default: 9000)") - parser.add_argument( - "--sse-buffer-size", type=int, default=100, help="Size of the SSE event ring buffer (default: 100)" - ) - parser.add_argument( - "--sse-only", - action="store_true", - help="Use SSE-only mode: hide callback URLs, return int IDs, skip sync callbacks", - ) args = parser.parse_args() try: # Create the resolver with Frida - resolver = FridaResolver( - args.namespace, args.version, args.pid, sse_buffer_size=args.sse_buffer_size, sse_only=args.sse_only - ) + resolver = FridaResolver(args.namespace, args.version, args.pid) # Create the connexion AsyncApp # the actual defition by calling, for example, operation.parameters - app = GIApp(__name__, args.namespace, args.version, resolver, sse_only=args.sse_only) + app = GIApp(__name__, args.namespace, args.version, resolver) app.run(port=args.port) except Exception as e: diff --git a/girest/girest/app.py b/girest/girest/app.py index caaaa7e..2704de2 100644 --- a/girest/girest/app.py +++ b/girest/girest/app.py @@ -70,11 +70,10 @@ def __init__( version: str, resolver: GIResolver, *, - sse_only: bool = False, default_base_path=None, ): - # Generate the OpenAPI schema with specified buffer size - girest = GIRest(namespace, version, sse_only=sse_only) + # Generate the OpenAPI schema + girest = GIRest(namespace, version) spec = girest.generate() specd = spec.to_dict() diff --git a/girest/girest/callbacks.py b/girest/girest/callbacks.py index c3dca95..c5bdf38 100644 --- a/girest/girest/callbacks.py +++ b/girest/girest/callbacks.py @@ -1,9 +1,9 @@ """ Callback handling and security for GIRest HTTP callbacks. -This module provides classes for secure callback invocations, supporting both -synchronous callbacks (that wait for a return value) and asynchronous callbacks -(fire-and-forget), with HMAC signature authentication. +This module provides classes for secure callback invocations with HMAC +signature authentication, supporting both synchronous callbacks (that wait +for a return value) and asynchronous callbacks (fire-and-forget). """ import hashlib diff --git a/girest/girest/generator.py b/girest/girest/generator.py index 1b78325..db9601b 100644 --- a/girest/girest/generator.py +++ b/girest/girest/generator.py @@ -191,19 +191,6 @@ def _create_namespace_schemas(self): schema = Namespace(tag, self) self.add_schema(schema) - def _get_callback_mode(self) -> str: - """ - Get the callback mode from the OpenAPI schema info. - - The callback mode is specified in the schema's info section via the - x-girest-callback-mode vendor extension. This is set by GIRest when - generating the schema based on the --sse-only flag. - - Returns: - str: "sse" for SSE mode, "url" for URL-based callbacks, defaults to "sse" - """ - return self.schema.get("info", {}).get("x-girest-callback-mode", "sse") - def generate(self) -> str: """Generate complete TypeScript bindings.""" title = self.schema.get("info", {}).get("title", "API") @@ -217,10 +204,6 @@ def generate(self) -> str: # Now the tags without schemas self._create_namespace_schemas() - # Get callback mode from schema - callback_mode = self._get_callback_mode() - sse_mode = callback_mode == "sse" - # Generate main file main_template = self.jinja_env.get_template("main.ts.j2") return main_template.render( @@ -231,7 +214,6 @@ def generate(self) -> str: port=self.port, base_path=self.base_path, schemas=self.schema_objects_cache, - sse_mode=sse_mode, ) @@ -328,7 +310,7 @@ def description(self) -> str: @property def is_callback(self) -> bool: - """Check if this parameter represents a callback (non-SSE mode).""" + """Check if this parameter represents a callback.""" return "x-gi-callback" in self.schema_section @property @@ -528,12 +510,12 @@ def __init__(self, name: str, schema_def: Dict[str, Any], generator: "Generator" raw_properties = schema_def.get("properties", {}) for pname, pv in raw_properties.items(): # Check if this property has x-gi-is-return (callback parameter) - # In non-SSE mode, callbacks have additional properties like sessionId, callbackName, etc. + # Callbacks have additional properties like sessionId, callbackName, etc. # that don't have this flag if pv.get("x-gi-is-return", False): self._return_param = ReturnParam(pname, self.generator, pv, self) elif pname not in ["sessionId", "callbackName", "args", "invocationNumber", "timestamp"]: - # Skip non-SSE mode metadata properties + # Skip callback metadata properties self._parameters.append(Field(pname, pv, generator, self)) @property @@ -946,7 +928,7 @@ def callback_params(self) -> List["ReturnParam"]: @property def callback_url_params(self) -> List["Param"]: - """Get callback URL parameters (non-SSE mode).""" + """Get callback URL parameters.""" # Check both query params and body properties for callbacks query_callbacks = [p for p in self.query_params if p.is_callback] body_callbacks = [p for p in self.body_properties if p.is_callback] @@ -954,16 +936,9 @@ def callback_url_params(self) -> List["Param"]: @property def header_params(self) -> List["Param"]: - """Get header parameters (typically for non-SSE mode callbacks).""" + """Get header parameters (typically for callback authentication).""" return [p for p in self.parameters if p.location == "header"] - @property - def uses_sse_callbacks(self) -> bool: - """Determine if this method uses SSE-style callbacks (returns callback ID) or URL-based callbacks.""" - # If there are callback_params (callbacks in return), it's SSE mode - # If there are callback_url_params (callbacks as URL parameters), it's non-SSE mode - return len(self.callback_params) > 0 and len(self.callback_url_params) == 0 - def generate(self) -> str: """Generate the code based on the template, selecting HTTP method-specific templates.""" # Try specific template first (e.g., method_GstBus_connect_sync_message.ts.j2) diff --git a/girest/girest/main.py b/girest/girest/main.py index 95d8620..9326915 100644 --- a/girest/girest/main.py +++ b/girest/girest/main.py @@ -13,10 +13,9 @@ class GIRest: pointer_schema = {"type": "string", "pattern": "^0x[0-9a-fA-F]+$|^[0-9]+$"} event_schema = {"type": "object", "required": ["data"], "properties": {"data": {"type": "string"}}} - def __init__(self, ns, ns_version, sse_only=False): + def __init__(self, ns, ns_version): self.ns = ns self.ns_version = ns_version - self.sse_only = sse_only # If True, use SSE-style callbacks (old behavior) # To keep track of schemas already registered self.schemas = {} self.spec = APISpec( @@ -25,7 +24,6 @@ def __init__(self, ns, ns_version, sse_only=False): openapi_version="3.0.2", info={ "description": "API schema autogenerated by giREST", - "x-girest-callback-mode": "sse" if sse_only else "url", }, ) # Include the Pointer and Event definitions @@ -78,7 +76,7 @@ def _get_container_element_type_schema(self, container_type_info): return None - def _add_non_sse_parameters(self, operation): + def _add_callback_parameters(self, operation): # Add header parameters for callback authentication operation["parameters"].extend( [ @@ -330,44 +328,26 @@ def _generate_function(self, bim, bi=None, is_constructor=False, is_destructor=F if tag == "interface": interface = GIRepository.type_info_get_interface(arg_type) if interface and interface.get_type() == GIRepository.InfoType.CALLBACK: - # Generate the callback schema (needed in both modes) - full_name = self._generate_callback(interface) + # Generate the callback schema + self._generate_callback(interface) - if self.sse_only: - # SSE-only mode: check scope and skip sync callbacks - scope = GIRepository.arg_info_get_scope(arg) - is_sync = scope == GIRepository.ScopeType.CALL + # Generate full callback specification + # Generate OpenAPI callback specification + # Pass the arg (ArgInfo) so we can get the scope + callback_param, callbacks_obj, is_sync = self._generate_callback_argument(interface, arg, arg_name) - if is_sync: - # Skip methods with synchronous callbacks in SSE-only mode - return + # Add callback parameter to method + params.append(callback_param) - # For async callbacks in SSE-only mode, add callback ID to response (old behavior) - response_props[arg_name] = { - "type": "integer", - "description": "Callback ID", - "x-gi-callback": f"#/components/schemas/{full_name}", + # Store callback info for later + method_callbacks.append( + { + "name": arg_name, + "interface": interface, + "callbacks_obj": callbacks_obj, + "is_synchronous": is_sync, } - else: - # Standard mode: generate full callback specification - # Generate OpenAPI callback specification - # Pass the arg (ArgInfo) so we can get the scope - callback_param, callbacks_obj, is_sync = self._generate_callback_argument( - interface, arg, arg_name - ) - - # Add callback parameter to method - params.append(callback_param) - - # Store callback info for later - method_callbacks.append( - { - "name": arg_name, - "interface": interface, - "callbacks_obj": callbacks_obj, - "is_synchronous": is_sync, - } - ) + ) continue @@ -477,7 +457,7 @@ def _generate_function(self, bim, bi=None, is_constructor=False, is_destructor=F for cb_info in method_callbacks: callbacks_spec.update(cb_info["callbacks_obj"]) operation["callbacks"] = callbacks_spec - self._add_non_sse_parameters(operation) + self._add_callback_parameters(operation) # Only add vendor-specific attributes when they are True if is_constructor: @@ -853,9 +833,9 @@ def _generate_callback(self, bi, schema_name=None, emitter_info=None): return_type = GIRepository.callable_info_get_return_type(bi) return_schema = self._type_to_schema(return_type) - # Create a separate return value schema for HTTP responses (not in SSE-only mode) + # Create a separate return value schema for HTTP responses # This includes the return value AND any output parameters - if (return_schema or response_properties) and not self.sse_only: + if return_schema or response_properties: return_schema_name = f"{full_name}Return" # Add return value to response properties if it exists @@ -872,28 +852,28 @@ def _generate_callback(self, bi, schema_name=None, emitter_info=None): return_wrapper_schema = {"type": "object", "properties": response_properties} self.spec.components.schema(return_schema_name, return_wrapper_schema) - # Add HTTP callback invocation properties only in standard mode (not SSE-only) + # Add HTTP callback invocation properties # These are used when the callback is invoked via HTTP POST (request body) - if not self.sse_only: - request_properties["sessionId"] = {"type": "string", "description": "Session identifier for routing"} - request_properties["callbackName"] = { - "type": "string", - "description": "Name of the callback being invoked", - } - request_properties["args"] = {"type": "array", "description": "Callback arguments in order", "items": {}} - request_properties["invocationNumber"] = {"type": "integer", "description": "Sequential invocation counter"} - request_properties["timestamp"] = { - "type": "string", - "format": "date-time", - "description": "Timestamp of callback invocation", - } + request_properties["sessionId"] = {"type": "string", "description": "Session identifier for routing"} + request_properties["callbackName"] = { + "type": "string", + "description": "Name of the callback being invoked", + } + request_properties["args"] = {"type": "array", "description": "Callback arguments in order", "items": {}} + request_properties["invocationNumber"] = {"type": "integer", "description": "Sequential invocation counter"} + request_properties["timestamp"] = { + "type": "string", + "format": "date-time", + "description": "Timestamp of callback invocation", + } # Create callback schema (request body) - callback_schema = {"type": "object", "x-gi-type": "callback", "properties": request_properties} - - # Add required fields only in standard mode - if not self.sse_only: - callback_schema["required"] = ["sessionId", "callbackName", "args"] + callback_schema = { + "type": "object", + "x-gi-type": "callback", + "properties": request_properties, + "required": ["sessionId", "callbackName", "args"], + } self.spec.components.schema(full_name, callback_schema) return full_name @@ -1168,45 +1148,44 @@ def _generate_signals(self, bi): }, } - # Add callback handling for non-SSE mode - if not self.sse_only: - self._add_non_sse_parameters(operation) - # Add handler callback to request body - operation["requestBody"]["content"]["application/json"]["schema"]["properties"]["handler"] = { - "type": "string", - "format": "uri", - "description": f"Callback URL for '{signal_name}' signal handler", - "x-gi-callback": f"#/components/schemas/{callback_schema_name}", - "x-gi-callback-style": "async", - } - operation["requestBody"]["content"]["application/json"]["schema"]["required"].append("handler") - - # Add callback definition - operation["callbacks"] = { - callback_schema_name: { - "{$request.body#/handler}": { - "post": { - "summary": f"Callback for '{signal_name}' signal", - "description": f"Invoked when the '{signal_name}' signal is emitted", - "requestBody": { - "required": True, - "content": { - "application/json": { - "schema": {"$ref": f"#/components/schemas/{callback_schema_name}"} - } - }, - }, - "responses": { - "204": {"description": "Callback received"}, - "400": {"description": "Invalid callback request"}, - "401": {"description": "Invalid signature"}, - "500": {"description": "Callback processing error"}, + # Add callback handling + self._add_callback_parameters(operation) + # Add handler callback to request body + operation["requestBody"]["content"]["application/json"]["schema"]["properties"]["handler"] = { + "type": "string", + "format": "uri", + "description": f"Callback URL for '{signal_name}' signal handler", + "x-gi-callback": f"#/components/schemas/{callback_schema_name}", + "x-gi-callback-style": "async", + } + operation["requestBody"]["content"]["application/json"]["schema"]["required"].append("handler") + + # Add callback definition + operation["callbacks"] = { + callback_schema_name: { + "{$request.body#/handler}": { + "post": { + "summary": f"Callback for '{signal_name}' signal", + "description": f"Invoked when the '{signal_name}' signal is emitted", + "requestBody": { + "required": True, + "content": { + "application/json": { + "schema": {"$ref": f"#/components/schemas/{callback_schema_name}"} + } }, - "security": [{"callbackSignature": []}], - } + }, + "responses": { + "204": {"description": "Callback received"}, + "400": {"description": "Invalid callback request"}, + "401": {"description": "Invalid signature"}, + "500": {"description": "Callback processing error"}, + }, + "security": [{"callbackSignature": []}], } } } + } # Register the endpoint self.spec.path(path=api, operations={"post": operation}) @@ -1440,7 +1419,7 @@ def _generate_missing(self, ns): "description": "", "operationId": "GObject--signal_connect_data", "tags": ["GObject"], - "parameters": [], # Empty list that will be populated by _add_non_sse_parameters if needed + "parameters": [], # Empty list that will be populated by _add_callback_parameters if needed "requestBody": { "required": True, "content": { @@ -1481,47 +1460,44 @@ def _generate_missing(self, ns): } }, } - # Depending on SSE or not, add the other parameters - if not self.sse_only: - self._add_non_sse_parameters(post_operation) - # Add c_handler to request body schema - post_operation["requestBody"]["content"]["application/json"]["schema"]["properties"]["c_handler"] = { - "type": "string", - "format": "uri", - "description": "Callback URL that will be invoked with signal events", - "x-gi-callback": "#/components/schemas/GObjectCallback", - "x-gi-callback-style": "async", - } - post_operation["requestBody"]["content"]["application/json"]["schema"]["required"].append("c_handler") - - # Add the callbacks - # FIXME reuse the callback generator somehow - post_operation["callbacks"] = { - "Callback": { - "{$request.body#/c_handler}": { - "post": { - "summary": "Callback for signal handler", - "description": "Invoked by the server when the signal is emitted", - "requestBody": { - "required": True, - "content": { - "application/json": {"schema": {"$ref": "#/components/schemas/GObjectCallback"}} - }, - }, - "responses": { - "204": {"description": "Callback received (no content)"}, - "400": {"description": "Invalid callback request"}, - "401": {"description": "Invalid signature or authentication failed"}, - "500": {"description": "Callback processing error"}, + # Add callback parameters + self._add_callback_parameters(post_operation) + # Add c_handler to request body schema + post_operation["requestBody"]["content"]["application/json"]["schema"]["properties"]["c_handler"] = { + "type": "string", + "format": "uri", + "description": "Callback URL that will be invoked with signal events", + "x-gi-callback": "#/components/schemas/GObjectCallback", + "x-gi-callback-style": "async", + } + post_operation["requestBody"]["content"]["application/json"]["schema"]["required"].append("c_handler") + + # Add the callbacks + # FIXME reuse the callback generator somehow + post_operation["callbacks"] = { + "Callback": { + "{$request.body#/c_handler}": { + "post": { + "summary": "Callback for signal handler", + "description": "Invoked by the server when the signal is emitted", + "requestBody": { + "required": True, + "content": { + "application/json": {"schema": {"$ref": "#/components/schemas/GObjectCallback"}} }, - "security": [{"callbackSignature": []}], - } + }, + "responses": { + "204": {"description": "Callback received (no content)"}, + "400": {"description": "Invalid callback request"}, + "401": {"description": "Invalid signature or authentication failed"}, + "500": {"description": "Callback processing error"}, + }, + "security": [{"callbackSignature": []}], } } } - else: - # FIXME return the callback id - pass + } + self.spec.path(path=post_api, operations={"post": post_operation}) def generate(self): diff --git a/girest/girest/resolvers.py b/girest/girest/resolvers.py index 2127555..493817b 100644 --- a/girest/girest/resolvers.py +++ b/girest/girest/resolvers.py @@ -7,7 +7,6 @@ import logging import queue import threading -from collections import deque from typing import Any, Dict import connexion @@ -17,7 +16,6 @@ gi.require_version("GIRepository", "2.0") from gi.repository import GIRepository # noqa: E402 -from starlette.responses import StreamingResponse # noqa: E402 try: from .utils import parse_operation_id @@ -265,116 +263,14 @@ def handle_response_message(self, correlation_id: str, response: dict): class GIResolver(Resolver): - def __init__(self, sse_buffer_size: int = 100): - # SSE event buffer (ring buffer using deque with maxlen) - self.sse_buffer_size = sse_buffer_size - self.sse_events: deque = deque(maxlen=sse_buffer_size) - self.sse_event = None # Will be created in event loop - self._event_loop = None # Reference to the event loop - # Counter for assigning unique IDs to events - # Lock protects both the counter and the deque during event push - self._event_counter = 0 - self._buffer_lock = threading.Lock() + def __init__(self): super().__init__() - def push_sse_event(self, event_data: dict): - """ - Push an event to the SSE buffer. This is thread-safe and non-blocking. - - If the buffer is full, the oldest event will be discarded to make room. - Can be safely called from any thread (e.g., Frida's message handler). - - Args: - event_data: Dictionary containing event data to be sent to SSE clients - """ - # Assign a unique sequential ID and append to buffer atomically - # Lock protects both operations to ensure consistency - with self._buffer_lock: - event_id = self._event_counter - self._event_counter += 1 - - # Wrap the event data with an ID - event_wrapper = {"_sse_id": event_id, "data": event_data} - - self.sse_events.append(event_wrapper) - - # Set the event to notify waiting clients (outside lock) - # Use call_soon_threadsafe if called from a different thread - if self.sse_event is not None and self._event_loop is not None: - self._event_loop.call_soon_threadsafe(self.sse_event.set) - - async def sse_event_generator(self): - """ - Async generator that yields SSE events from the buffer. - - Yields events from the current position in the buffer and then waits - for new events to be pushed. Each generator instance tracks its own - position independently using sequential event IDs. - - Note: If the buffer rotates (oldest events are discarded) while a client - is connected, the client may miss events. This is acceptable for the - use case as it prevents unbounded memory growth. - """ - # Initialize event loop and event on first call - if self.sse_event is None: - self._event_loop = asyncio.get_running_loop() - self.sse_event = asyncio.Event() - - # Track the last event ID we've sent - last_sent_id = -1 - - while True: - has_new_events = False - - # Take an atomic snapshot to avoid mutation during iteration - with self._buffer_lock: - snapshot = list(self.sse_events) - - # Iterate over the snapshot - for event_wrapper in snapshot: - event_id = event_wrapper["_sse_id"] - if event_id > last_sent_id: - last_sent_id = event_id - has_new_events = True - # Yield only the data, not the wrapper - yield event_wrapper["data"] - - # If we yielded events, check again for more before waiting - if has_new_events: - continue - - # Wait for new events - await self.sse_event.wait() - self.sse_event.clear() - - async def sse_callbacks_endpoint(self): - """ - SSE endpoint that streams callback events. - - This endpoint is registered at /GIRest/callbacks and streams events - in Server-Sent Events format. - - Returns: - Async generator yielding SSE-formatted messages - """ - async for event_data in self.sse_event_generator(): - # Format as SSE - message = f"data: {json.dumps(event_data)}\n\n" - yield message - def resolve(self, operation): """We overwrite the resolve method to have access to the path schema""" return Resolution(self.get_function_from_operation(operation), operation.operation_id) def get_function_from_operation(self, operation): - async def sse_callback(): - """SSE endpoint for callback events.""" - return StreamingResponse( - self.sse_callbacks_endpoint(), - media_type="text/event-stream", - headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}, - ) - operation_id = operation.operation_id if not operation_id: @@ -384,9 +280,8 @@ async def sse_callback(): if not parsed: return None - namespace, class_name, method_name, operator = parsed - if namespace == "GIRest" and method_name == "callbacks" and not class_name: - return sse_callback + # No special handling needed for standard operations + return None class FridaResolver(GIResolver): @@ -433,8 +328,6 @@ def __init__( scripts=None, on_log=None, on_message=None, - sse_buffer_size=100, - sse_only=False, ): # Load the corresponding Gir file self.repo = GIRepository.Repository() @@ -452,8 +345,6 @@ def __init__( self._callback_id_counter = 0 # Callback metadata registry: callback_id -> {url, session_id, secret, scope} self._callback_registry = {} - # SSE mode flag - self.sse_only = sse_only # Initialize helper classes for Frida communication self.command_serializer = FridaCommandSerializer() @@ -465,7 +356,7 @@ def __init__( # Initialize message bus with the Frida script self.message_bus = FridaMessageBus(self.scripts[0]) - super().__init__(sse_buffer_size) + super().__init__() def _build_enum_mappings(self): """Build mappings from enum string names to integer values""" @@ -539,83 +430,79 @@ def _on_message(self, message, data): callback_data = payload["data"] callback_id = callback_data.get("callback_id") - if self.sse_only: - # SSE mode: push event to buffer for client polling - self.push_sse_event(callback_data) - else: - # URL mode: make HTTP callback and unlock Frida - if callback_id is None: - logger.error("Callback invocation missing callback_id") - return - - # Look up callback metadata - metadata = self._callback_registry.get(callback_id) - if not metadata: - logger.error(f"Callback {callback_id} not found in registry") - # Still need to unlock Frida with a null result - self.scripts[0].post( - { - "type": f"callback-{callback_id}", - "kind": "callback-response", - "callback_id": callback_id, - "result": None, - } - ) - return - - # Get the handler instance from the registry (created during registration) - handler = metadata.get("handler") - if not handler: - logger.error(f"Callback {callback_id} does not have a handler") - # Still need to unlock Frida with a null result - self.scripts[0].post( - { - "type": f"callback-{callback_id}", - "kind": "callback-response", - "callback_id": callback_id, - "result": None, - } - ) - return - - # Get raw args and convert enums to strings - raw_args = callback_data["args"] - callback_name = metadata["name"] - - # Convert callback arguments (especially enum integers to strings) - args = self._convert_callback_args(raw_args, metadata["callback_type"]) - - # Note: GI scope (call/async/notified/forever) only affects WHEN the - # callback is invoked, not HOW we handle it. All callbacks: - # - Make synchronous HTTP requests - # - Wait for response - # - May or may not have return values (depends on callback signature) - result = None - - # IMPORTANT: Handle callback in a separate thread to allow reentrancy - # This allows the main thread to continue processing HTTP requests - # while we wait for the callback response - def handle_callback_in_thread(): - nonlocal result - # All callbacks handled uniformly - # Pass callback_id for correlation tracking (thread affinity) - result = handler.invoke(callback_name, callback_id, args) - - # Unlock Frida thread with the result - self.scripts[0].post( - { - "type": f"callback-{callback_id}", - "kind": "callback-response", - "callback_id": callback_id, - "result": result, - } - ) - - # Run callback handling in a daemon thread - # This allows the main thread to keep processing HTTP requests - callback_thread = threading.Thread(target=handle_callback_in_thread, daemon=True) - callback_thread.start() - # Note: We don't wait for the thread - it will post the response when ready + # URL mode: make HTTP callback and unlock Frida + if callback_id is None: + logger.error("Callback invocation missing callback_id") + return + + # Look up callback metadata + metadata = self._callback_registry.get(callback_id) + if not metadata: + logger.error(f"Callback {callback_id} not found in registry") + # Still need to unlock Frida with a null result + self.scripts[0].post( + { + "type": f"callback-{callback_id}", + "kind": "callback-response", + "callback_id": callback_id, + "result": None, + } + ) + return + + # Get the handler instance from the registry (created during registration) + handler = metadata.get("handler") + if not handler: + logger.error(f"Callback {callback_id} does not have a handler") + # Still need to unlock Frida with a null result + self.scripts[0].post( + { + "type": f"callback-{callback_id}", + "kind": "callback-response", + "callback_id": callback_id, + "result": None, + } + ) + return + + # Get raw args and convert enums to strings + raw_args = callback_data["args"] + callback_name = metadata["name"] + + # Convert callback arguments (especially enum integers to strings) + args = self._convert_callback_args(raw_args, metadata["callback_type"]) + + # Note: GI scope (call/async/notified/forever) only affects WHEN the + # callback is invoked, not HOW we handle it. All callbacks: + # - Make synchronous HTTP requests + # - Wait for response + # - May or may not have return values (depends on callback signature) + result = None + + # IMPORTANT: Handle callback in a separate thread to allow reentrancy + # This allows the main thread to continue processing HTTP requests + # while we wait for the callback response + def handle_callback_in_thread(): + nonlocal result + # All callbacks handled uniformly + # Pass callback_id for correlation tracking (thread affinity) + result = handler.invoke(callback_name, callback_id, args) + + # Unlock Frida thread with the result + self.scripts[0].post( + { + "type": f"callback-{callback_id}", + "kind": "callback-response", + "callback_id": callback_id, + "result": result, + } + ) + + # Run callback handling in a daemon thread + # This allows the main thread to keep processing HTTP requests + callback_thread = threading.Thread(target=handle_callback_in_thread, daemon=True) + callback_thread.start() + # Note: We don't wait for the thread - it will post the response when ready else: # For now, just log other messages logger.debug(f"Message from Frida: {message}") @@ -1573,9 +1460,6 @@ async def frida_resolver_handler(_method=None, _type=None, _endpoint=None, *args converted_kwargs = {} n_args = GIRepository.callable_info_get_n_args(_method) - # Callback ids to return, only relevant in SSE mode - callbacks = {} - # Add 'self' as a parameter if _type["is_method"]: converted_kwargs["this"] = kwargs["self"]["ptr"] @@ -1587,18 +1471,6 @@ async def frida_resolver_handler(_method=None, _type=None, _endpoint=None, *args # Some args might not be on the passed in args, like output params if arg_name in kwargs: converted_kwargs[arg_name] = self._arg_from_rest(kwargs[arg_name], arg, headers) - # In SSE Mode, the callback is never passed in REST, but we - # need to generate the callback information - elif self.sse_only and not _type["arguments"][i]["is_destroy"]: - arg_type = GIRepository.arg_info_get_type(arg) - tag = GIRepository.type_tag_to_string(GIRepository.type_info_get_tag(arg_type)) - if tag != "interface": - continue - interface = GIRepository.type_info_get_interface(arg_type) - info_type = interface.get_type() - if info_type == GIRepository.InfoType.CALLBACK: - callback_id = self._generate_callback(None, arg, info_type, headers) - callbacks[arg_name] = converted_kwargs[arg_name] = callback_id # Serialize the command command = self.command_serializer.serialize_call( @@ -1635,12 +1507,6 @@ async def execute_async(): # Use common response parsing logic result = self._parse_response(result, _endpoint, method_info=_method) - # In SSE mode, return also the callback id - if callbacks: - if not result: - result = {} - result.update(callbacks) - return result return frida_resolver_handler diff --git a/girest/girest/templates/main.ts.j2 b/girest/girest/templates/main.ts.j2 index c084fcc..fa0afe4 100644 --- a/girest/girest/templates/main.ts.j2 +++ b/girest/girest/templates/main.ts.j2 @@ -2,7 +2,6 @@ * {{ title }} * Version: {{ version }} * Auto-generated by girest-client-generator - * Mode: {{ 'SSE' if sse_mode else 'Non-SSE (URL-based callbacks)' }} */ // Configuration for API server location @@ -35,13 +34,6 @@ export function setApiConfig(config: { host?: string; port?: number; basePath?: if (config.basePath !== undefined) { apiConfig.basePath = config.basePath; } - -{% if sse_mode %} - // Reinitialize callback EventSource with new URL - if (typeof EventSource !== 'undefined' && (config.host !== undefined || config.port !== undefined || config.basePath !== undefined)) { - initializeCallbackSource(); - } -{% endif %} } /** @@ -62,57 +54,7 @@ const objectRegistry = new FinalizationRegistry((ptr: string) => { .catch(err => console.error('Failed to unref object:', ptr, err)); }); -{% if sse_mode %} -// SSE Mode: Callback dispatcher for handling callbacks from the server via EventSource -interface CallbackEntry { - converter: (data: any) => any[]; - userFunction: Function; -} -const callbackDispatcher = new Map(); - -let callbackSource: EventSource | null = null; -let isReinitializing = false; - -function initializeCallbackSource(): void { - // Prevent concurrent reinitialization - if (isReinitializing) { - return; - } - isReinitializing = true; - - // Close existing EventSource if it exists - if (callbackSource) { - callbackSource.close(); - callbackSource = null; - } - - // Initialize callback dispatcher with EventSource - if (typeof EventSource !== 'undefined') { - callbackSource = new EventSource(apiConfig.fullBaseUrl + '/GIRest/callbacks'); - callbackSource.onmessage = (ev) => { - try { - const json = JSON.parse(ev.data); - const callbackEntry = callbackDispatcher.get(json.id.toString()); - if (callbackEntry) { - const convertedArgs = callbackEntry.converter(json.data); - callbackEntry.userFunction(...convertedArgs); - } - } catch (error) { - console.error('Error processing callback:', error); - } - }; - callbackSource.onerror = (error) => { - console.error('Callback EventSource error:', error); - }; - callbackSource.onopen = () => { - isReinitializing = false; - }; - } else { - isReinitializing = false; - } -} -{% else %} -// Non-SSE Mode: URL-based callbacks +// URL-based callbacks // The client must implement ICallbackHandler to receive and process callbacks // Track active correlation ID for thread affinity @@ -140,7 +82,7 @@ export function getActiveCorrelationId(): string | null { } /** - * Interface for callback handling in non-SSE mode. + * Interface for callback handling. * Implementations should handle callback registration and invocation. */ export interface ICallbackHandler { @@ -171,7 +113,7 @@ export interface ICallbackHandler { let callbackHandler: ICallbackHandler | null = null; /** - * Set the callback handler for non-SSE mode. + * Set the callback handler. * This handler is responsible for managing callback registrations and invocations. * * @param handler An implementation of ICallbackHandler @@ -186,7 +128,6 @@ export function setCallbackHandler(handler: ICallbackHandler): void { export function getCallbackHandler(): ICallbackHandler | null { return callbackHandler; } -{% endif %} // Define the core interfaces for objects and structs memory management interface ObjectLikeInstance { diff --git a/girest/girest/templates/method.ts.j2 b/girest/girest/templates/method.ts.j2 index 79b67e2..7de8ed8 100644 --- a/girest/girest/templates/method.ts.j2 +++ b/girest/girest/templates/method.ts.j2 @@ -5,10 +5,10 @@ {% set callback_url_params = [] %} {% set header_params_list = [] %} -{# Process query parameters - skip callback URL params in non-SSE mode as they're handled separately #} +{# Process query parameters - skip callback URL params as they're handled separately #} {% for param in method.query_params %} {% if param.is_callback %} - {# Non-SSE mode: callback URL parameter - we'll handle this separately #} + {# Callback URL parameter - we'll handle this separately #} {% set _ = callback_url_params.append(param) %} {% else %} {% set ts_type = param.type.lang_type %} @@ -20,34 +20,23 @@ {% endif %} {% endfor %} -{# For SSE mode: Add callback function parameters from return parameters that are callbacks #} -{% if method.uses_sse_callbacks %} - {% for return_param in method.return_obj.return_params %} - {% if return_param.is_callback %} - {% set _ = callback_params.append(return_param.valid_name + ": " + return_param.callback.name) %} - {% endif %} - {% endfor %} -{% endif %} +{# Add callback function parameters for each callback_url param #} +{% for cb_url_param in callback_url_params %} + {# Find the callback schema reference #} + {% set cb_ref = cb_url_param.schema_section.get('x-gi-callback', '') %} + {% set cb_schema_name = cb_ref.split('/')[-1] if cb_ref else 'Function' %} + {% set _ = callback_params.append(cb_url_param.valid_name + ": " + cb_schema_name) %} +{% endfor %} -{# For non-SSE mode: Add callback function parameters for each callback_url param #} -{% if not method.uses_sse_callbacks %} - {% for cb_url_param in callback_url_params %} - {# Find the callback schema reference #} - {% set cb_ref = cb_url_param.schema_section.get('x-gi-callback', '') %} - {% set cb_schema_name = cb_ref.split('/')[-1] if cb_ref else 'Function' %} - {% set _ = callback_params.append(cb_url_param.valid_name + ": " + cb_schema_name) %} - {% endfor %} - - {# Add header parameters for non-SSE mode (sessionId, callbackSecret) #} - {% for header_param in method.header_params %} - {% set _ = header_params_list.append(header_param) %} - {% if header_param.required %} - {% set _ = required_params.append(header_param.valid_name.replace('-', '_') + ": string") %} - {% else %} - {% set _ = optional_params.append(header_param.valid_name.replace('-', '_') + "?: string") %} - {% endif %} - {% endfor %} -{% endif %} +{# Add header parameters (sessionId, callbackSecret) #} +{% for header_param in method.header_params %} + {% set _ = header_params_list.append(header_param) %} + {% if header_param.required %} + {% set _ = required_params.append(header_param.valid_name.replace('-', '_') + ": string") %} + {% else %} + {% set _ = optional_params.append(header_param.valid_name.replace('-', '_') + "?: string") %} + {% endif %} +{% endfor %} {% set all_params = required_params + callback_params + optional_params %} {% set params_string = all_params | join(", ") %} @@ -57,8 +46,7 @@ {% if method.return_obj.return_params %} {% if method.return_obj.return_params|length == 1 %} {% set return_param = method.return_obj.return_params[0] %} - {# In SSE mode, callback params are in the return; skip them for the TS return type #} - {% if not (method.uses_sse_callbacks and return_param.is_callback) %} + {% if not return_param.is_callback %} {% set method_return_signature = return_param.type.lang_type %} {% if return_param.can_be_null %} {% set method_return_signature = method_return_signature + " | null" %} @@ -67,8 +55,7 @@ {% else %} {% set return_type_parts = [] %} {% for return_param in method.return_obj.return_params %} - {# In SSE mode, skip callback params from the return type #} - {% if not (method.uses_sse_callbacks and return_param.is_callback) %} + {% if not return_param.is_callback %} {% set type_str = return_param.type.lang_type %} {% if return_param.can_be_null %} {% set type_str = type_str + " | null" %} @@ -113,10 +100,9 @@ {% include 'param.ts.j2' %} {% endif %} {% endfor %} -{% if not method.uses_sse_callbacks %} -{# Non-SSE mode: Use callback handler interface #} +{# Use callback handler interface #} {% for cb_url_param in callback_url_params %} - // Non-SSE mode: Register callback using the callback handler + // Register callback using the callback handler let {{ cb_url_param.name }}_callbackInfo: { callbackUrl: string; callbackId: string } | undefined; if (typeof {{ cb_url_param.valid_name }} !== 'undefined') { const callbackHandler = getCallbackHandler(); @@ -138,10 +124,9 @@ url.searchParams.append('{{ cb_url_param.name }}', {{ cb_url_param.name }}_callbackInfo.callbackUrl); } {% endfor %} -{% endif %} try { -{% if not method.uses_sse_callbacks and header_params_list|length > 0 %} - // Non-SSE mode: Add headers for callback authentication +{% if header_params_list|length > 0 %} + // Add headers for callback authentication const headers: Record = {}; {% for header_param in header_params_list %} {% set header_var_name = header_param.valid_name.replace('-', '_') %} @@ -175,17 +160,6 @@ } {% if not method.return_obj.is_void or method.callback_params %} const data = await response.json(); -{% if method.uses_sse_callbacks and method.callback_params %} - // SSE mode: Register callbacks with dispatcher -{% for cb in method.callback_params %} - if (data.{{ cb.name }} !== undefined) { - callbackDispatcher.set(data.{{ cb.name }}.toString(), { - converter: convert{{ cb.callback.name }}Args, - userFunction: {{ cb.valid_name }} - }); - } -{% endfor %} -{% endif %} {% if not method.return_obj.is_void %} {{ method.return_obj.generate() }} {% else %} diff --git a/girest/girest/templates/method_post.ts.j2 b/girest/girest/templates/method_post.ts.j2 index 9448668..1d0eda9 100644 --- a/girest/girest/templates/method_post.ts.j2 +++ b/girest/girest/templates/method_post.ts.j2 @@ -6,7 +6,7 @@ {% set header_params_list = [] %} {% set body_props = [] %} -{# Process header parameters (session-id, callback-secret in non-SSE mode) #} +{# Process header parameters (session-id, callback-secret) #} {% for header_param in method.header_params %} {% set _ = header_params_list.append(header_param) %} {% if header_param.required %} @@ -19,7 +19,7 @@ {# Process body properties #} {% for prop in method.body_properties %} {% if prop.is_callback %} - {# Non-SSE mode: callback URL parameter - we'll handle this separately #} + {# Callback URL parameter - we'll handle this separately #} {% set _ = callback_url_params.append(prop) %} {# Add the callback function parameter #} {% set cb_schema_name = prop.callback %} @@ -91,7 +91,7 @@ {% endfor %} const url = new URL(`${apiConfig.normalizedBasePath}{{ ns.method_path }}`, apiConfig.baseUrl); try { - // Non-SSE mode: Add headers for callback authentication + // Add headers for callback authentication const headers: Record = { 'Content-Type': 'application/json' }; @@ -106,7 +106,7 @@ const body: Record = {}; {% for prop in method.body_properties %} {% if prop.is_callback %} - // Non-SSE mode: Register callback using the callback handler + // Register callback using the callback handler if (typeof {{ prop.valid_name }} !== 'undefined') { const callbackHandler = getCallbackHandler(); if (!callbackHandler) { diff --git a/girest/tests/e2e/conftest.py b/girest/tests/e2e/conftest.py index f5ed7d8..0cb6c20 100644 --- a/girest/tests/e2e/conftest.py +++ b/girest/tests/e2e/conftest.py @@ -5,7 +5,7 @@ These fixtures manage the lifecycle of: - GStreamer pipeline process (gst-launch) - GIRest server process (girest-frida.py) -- Mock callback server (for non-SSE tests) +- Mock callback server (for callback tests) """ import asyncio @@ -125,9 +125,9 @@ def assert_has_ptr(obj, msg="Object should have ptr"): def assert_callback_invocation(callback_data, expected_args=None): """ - Assert that callback data has the correct structure for non-SSE callback invocations. + Assert that callback data has the correct structure for callback invocations. - All non-SSE callbacks follow the same structure: + All callbacks follow the same structure: - sessionId: Session identifier - callbackName: Name of the callback parameter - args: Dictionary containing the actual callback arguments (by parameter name) @@ -220,10 +220,10 @@ def gst_pipeline(): @pytest.fixture(scope="session") def girest_server(gst_pipeline): """ - Start the GIRest server in non-SSE mode (session-scoped). + Start the GIRest server (session-scoped). Launches girest-frida.py attached to the GStreamer pipeline via Frida. - This server is used for all basic tests and non-SSE callback tests. + This server is used for all basic tests and callback tests. Args: gst_pipeline: PID of the running GStreamer pipeline @@ -231,33 +231,15 @@ def girest_server(gst_pipeline): Yields: str: Base URL of the running server (http://localhost:9000) """ - yield from _start_girest_server(gst_pipeline, sse_only=False, port=9000) + yield from _start_girest_server(gst_pipeline, port=9000) -@pytest.fixture(scope="session") -def girest_server_sse(gst_pipeline): - """ - Start the GIRest server in SSE-only mode (session-scoped). - - Launches girest-frida.py with --sse-only flag. - This server is only used for SSE callback tests. - - Args: - gst_pipeline: PID of the running GStreamer pipeline - - Yields: - str: Base URL of the running server (http://localhost:9001) - """ - yield from _start_girest_server(gst_pipeline, sse_only=True, port=9001) - - -def _start_girest_server(gst_pipeline, sse_only=False, port=9000): +def _start_girest_server(gst_pipeline, port=9000): """ Internal helper to start GIRest server with specified configuration. Args: gst_pipeline: PID of the running GStreamer pipeline - sse_only: Whether to enable SSE-only mode port: Port number for the server Yields: @@ -273,10 +255,8 @@ def _start_girest_server(gst_pipeline, sse_only=False, port=9000): # Get path to girest-frida.py girest_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "girest-frida.py") - # Build command with sse_only flag if needed + # Build command cmd = ["python3", "-u", girest_path, "Gst", "1.0", "--pid", str(gst_pipeline), "--port", str(port)] - if sse_only: - cmd.append("--sse-only") # Create log file for server output log_file = tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".log", prefix="girest-server-") @@ -289,8 +269,7 @@ def _start_girest_server(gst_pipeline, sse_only=False, port=9000): ) # Wait for server to be ready by monitoring stdout for the "Uvicorn running" message - mode = "SSE-only" if sse_only else "non-SSE" - print(f"\n✓ Starting GIRest server in {mode} mode (attaching to PID {gst_pipeline})...") + print(f"\n✓ Starting GIRest server (attaching to PID {gst_pipeline})...") ready = False startup_output = [] @@ -372,7 +351,7 @@ def capture_output(): @pytest.fixture async def callback_server(): """ - Create a mock HTTP callback server for non-SSE callback tests. + Create a mock HTTP callback server for callback tests. This server receives callback POSTs from girest and records them for test validation. Each test gets a fresh server instance with diff --git a/girest/tests/e2e/test_e2e_callbacks_nonsse.py b/girest/tests/e2e/test_e2e_callbacks_nonsse.py index dbcdc9a..c059d2d 100644 --- a/girest/tests/e2e/test_e2e_callbacks_nonsse.py +++ b/girest/tests/e2e/test_e2e_callbacks_nonsse.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -End-to-end tests for non-SSE callback handling in GIRest. +End-to-end tests for callback handling in GIRest. These tests verify URL-based callbacks by: 1. Starting a mock HTTP server to receive callback POSTs @@ -9,7 +9,7 @@ 4. Verifying the server receives POST requests with correct callback data 5. Testing different callback scopes (call/sync, async, notified, forever) -Non-SSE callbacks work by: +Callbacks work by: - Client provides a callback URL in the method call - GIRest POSTs callback data to that URL (with HMAC signature) - For sync callbacks (scope=call): waits for response diff --git a/girest/tests/e2e/test_e2e_callbacks_sse.py b/girest/tests/e2e/test_e2e_callbacks_sse.py deleted file mode 100644 index ac1d68f..0000000 --- a/girest/tests/e2e/test_e2e_callbacks_sse.py +++ /dev/null @@ -1,164 +0,0 @@ -#!/usr/bin/env python3 -""" -End-to-end tests for SSE (Server-Sent Events) callback handling in GIRest. - -These tests verify SSE-based callbacks by: -1. Triggering GStreamer operations that use callbacks (e.g., foreach_pad) -2. Listening to the /GIRest/callbacks SSE endpoint -3. Verifying callback events are received with correct data structure -4. Testing callback lifecycle (registration, invocation) - -SSE callbacks are used when the client can maintain a persistent connection -and receive real-time updates as events occur. -""" - -import asyncio -import json - -import httpx -import pytest -from conftest import assert_api_success - - -@pytest.mark.skip(reason="SSE callbacks will be removed in future version") -@pytest.mark.asyncio -async def test_callbacks_endpoint_with_foreach_pad(girest_server_sse): - """ - Test the callbacks endpoint by creating a fakesrc element and iterating over its pads. - - This test validates the callbacks functionality by: - 1. Creating a fakesrc element - 2. Calling the foreach_pad endpoint which should emit callbacks - 3. Listening to the /GIRest/callbacks endpoint to receive callback events - 4. Validating that callback events are properly emitted and received - - The foreach_pad function will iterate over all pads of the element and call - the provided callback function for each pad. Based on the callback schema, - each callback should include the element, pad, and user_data parameters. - """ - async with httpx.AsyncClient(timeout=15.0) as client: - # Step 1: Create a fakesrc element - response = await client.get( - f"{girest_server_sse}/Gst/ElementFactory/make", params={"factoryname": "fakesrc", "name": "test_fakesrc"} - ) - assert_api_success(response, "Failed to create fakesrc element") - response_data = response.json() - assert "return" in response_data, "fakesrc creation should return an object" - assert "ptr" in response_data["return"], "fakesrc creation should return an object" - fakesrc_ptr = response_data["return"]["ptr"] - - # Step 2: Start listening to callbacks in a separate task - callback_events = [] - - async def listen_to_callbacks(): - """Listen to the callbacks endpoint and collect events""" - try: - async with client.stream("GET", f"{girest_server_sse}/GIRest/callbacks") as response: - if response.status_code != 200: - print(f"Failed to connect to callbacks endpoint: {response.status_code}") - return - - # Read Server-Sent Events for a limited time - async for line in response.aiter_lines(): - if line.strip(): - # Parse Server-Sent Events format - if line.startswith("data: "): - try: - # The event data is JSON inside the SSE data field - event_json = json.loads(line[6:]) # Remove "data: " prefix - - # Based on the documentation, the event should have: - # - kind: "callback" - # - data: { id: callback_id, data: { ... callback params ... } } - if event_json.get("kind") == "callback": - callback_data = event_json.get("data", {}) - callback_events.append(callback_data) - print(f"Received callback event: {callback_data}") - except json.JSONDecodeError: - print(f"Failed to parse callback event: {line}") - - # Stop after receiving some events or after reasonable number - # fakesrc typically has one source pad, so we expect 1 callback - if len(callback_events) >= 3: - break - except Exception as e: - print(f"Error listening to callbacks: {e}") - - # Step 3: Start the callback listener - callback_task = asyncio.create_task(listen_to_callbacks()) - - # Give the callback listener a moment to connect - await asyncio.sleep(1) - - # Step 4: Call foreach_pad to trigger callbacks - response = await client.get(f"{girest_server_sse}/Gst/Element/ptr,{fakesrc_ptr}/foreach_pad") - assert_api_success(response, "Failed to call foreach_pad on fakesrc") - response_data = response.json() - - # The response should contain a callback ID and return value - assert "func" in response_data, "foreach_pad response should contain callback ID" - assert "return" in response_data, "foreach_pad response should contain return value" - # Callback ID can be string or int, convert to int for comparison - callback_id_str = str(response_data["func"]) - callback_id = int(callback_id_str) if callback_id_str.isdigit() else None - assert callback_id is not None, f"Callback ID should be numeric: {response_data['func']}" - assert isinstance(response_data["return"], (bool, int)), "Return value should be a boolean or integer" - - # Step 5: Wait for callbacks to be processed - await asyncio.sleep(3) - - # Cancel the callback listener - callback_task.cancel() - try: - await callback_task - except asyncio.CancelledError: - pass - - # Step 6: Validate that we received callback events or that the API call succeeded - # Even if we didn't receive callback events due to streaming issues, - # the fact that foreach_pad returned successfully with a callback ID - # indicates the callback system is working - if len(callback_events) > 0: - print(f"✓ Successfully received {len(callback_events)} callback events via streaming") - - # Validate the structure of callback events - for event in callback_events: - # Based on the documentation, the event structure should be: - # { id: callback_id, data: { element: {...}, pad: {...}, user_data: {...} } } - assert "id" in event, f"Callback event should contain id: {event}" - assert "data" in event, f"Callback event should contain data: {event}" - - # The callback ID should match the one returned by foreach_pad - event_callback_id = int(str(event["id"])) if str(event["id"]).isdigit() else None - assert ( - event_callback_id == callback_id - ), f"Callback ID mismatch: expected {callback_id}, got {event['id']}" - - callback_data = event["data"] - - # Based on GstElementForeachPadFunc signature: (element, pad, user_data) -> bool - assert "element" in callback_data, f"Callback data should contain element: {callback_data}" - assert "pad" in callback_data, f"Callback data should contain pad: {callback_data}" - assert "user_data" in callback_data, f"Callback data should contain user_data: {callback_data}" - - # The element should be our fakesrc (verify pointer matches) - element = callback_data["element"] - assert "ptr" in element, f"Element should be a valid object with ptr: {element}" - assert ( - element["ptr"] == fakesrc_ptr - ), f"Element pointer mismatch in callback: expected {fakesrc_ptr}, got {element['ptr']}" - - # The pad should be a valid GstPad object - pad = callback_data["pad"] - assert "ptr" in pad, f"Pad should be a valid object with ptr: {pad}" - else: - # If no callback events were received, that's ok for this test - # The important thing is that the foreach_pad call succeeded and returned a callback ID - print("⚠ No callback events received via streaming (possibly due to server streaming issues)") - print( - "✓ However, foreach_pad call succeeded and returned a callback ID, indicating the callback system is working" - ) - - print( - f"✓ Successfully tested callbacks endpoint with foreach_pad - API call succeeded with callback ID {callback_id}" - ) diff --git a/gstaudit/lib/callbacks.ts b/gstaudit/lib/callbacks.ts index a504e4a..d62f91c 100644 --- a/gstaudit/lib/callbacks.ts +++ b/gstaudit/lib/callbacks.ts @@ -36,8 +36,8 @@ export type CallbackFunction = (...args: any[]) => void | Promise; export type ConverterFunction = (data: any) => Promise | any[]; /** - * Interface for callback handling in non-SSE mode - * This matches the ICallbackHandler interface from generated non-SSE bindings + * Interface for callback handling + * This matches the ICallbackHandler interface from generated bindings */ export interface ICallbackHandler { registerCallback( diff --git a/gstaudit/lib/server/websocket-handler.ts b/gstaudit/lib/server/websocket-handler.ts index 9bef40b..2093514 100644 --- a/gstaudit/lib/server/websocket-handler.ts +++ b/gstaudit/lib/server/websocket-handler.ts @@ -133,7 +133,7 @@ async function initializeConnection( // Get or create handler for this connection const handler = manager.getOrCreateHandler(connectionId, { host, port }); - // Configure non-SSE bindings and register callbacks + // Configure bindings and register callbacks const { setApiConfig, setCallbackHandler } = await import('@/lib/gst'); setApiConfig({ host, port, basePath: '/girest' });