From 449932957e40da452e488f77989e90d9aeb08310 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Mon, 15 Sep 2025 15:35:58 +0100 Subject: [PATCH 01/20] add webrtc support to python sdk --- WEBRTC_CHANGES.md | 229 +++++++++++++++++ examples/webrtc_conversation_example.py | 189 ++++++++++++++ pyproject.toml | 1 + .../conversational_ai/base_connection.py | 59 +++++ .../conversational_ai/connection_factory.py | 45 ++++ .../conversational_ai/conversation.py | 29 +++ .../conversational_ai/conversation_factory.py | 235 +++++++++++++++++ .../conversational_ai/webrtc_connection.py | 132 ++++++++++ .../conversational_ai/webrtc_conversation.py | 241 ++++++++++++++++++ .../conversational_ai/websocket_connection.py | 57 +++++ tests/test_webrtc_conversation.py | 226 ++++++++++++++++ 11 files changed, 1443 insertions(+) create mode 100644 WEBRTC_CHANGES.md create mode 100644 examples/webrtc_conversation_example.py create mode 100644 src/elevenlabs/conversational_ai/base_connection.py create mode 100644 src/elevenlabs/conversational_ai/connection_factory.py create mode 100644 src/elevenlabs/conversational_ai/conversation_factory.py create mode 100644 src/elevenlabs/conversational_ai/webrtc_connection.py create mode 100644 src/elevenlabs/conversational_ai/webrtc_conversation.py create mode 100644 src/elevenlabs/conversational_ai/websocket_connection.py create mode 100644 tests/test_webrtc_conversation.py diff --git a/WEBRTC_CHANGES.md b/WEBRTC_CHANGES.md new file mode 100644 index 00000000..30ab0885 --- /dev/null +++ b/WEBRTC_CHANGES.md @@ -0,0 +1,229 @@ +# WebRTC Support Implementation for ElevenLabs Python SDK + +This document summarizes the WebRTC support implementation added to the ElevenLabs Python SDK, following the same architecture as the JavaScript SDK. + +## Overview + +WebRTC support has been added to enable real-time, low-latency conversations with ElevenLabs agents using the LiveKit WebRTC infrastructure. 
This provides an alternative to the existing WebSocket-based connections with improved performance for real-time audio applications. + +## Files Added + +### Core Implementation + +1. **`src/elevenlabs/conversational_ai/base_connection.py`** + - Abstract base class for all connection types + - Defines the common interface for WebSocket and WebRTC connections + - Includes `ConnectionType` enum with `WEBSOCKET` and `WEBRTC` options + +2. **`src/elevenlabs/conversational_ai/websocket_connection.py`** + - WebSocket connection implementation extending `BaseConnection` + - Maintains existing WebSocket functionality in the new architecture + +3. **`src/elevenlabs/conversational_ai/webrtc_connection.py`** + - WebRTC connection implementation using LiveKit Python SDK + - Handles LiveKit room management, audio tracks, and data channels + - Supports automatic conversation token fetching from ElevenLabs API + +4. **`src/elevenlabs/conversational_ai/connection_factory.py`** + - Factory functions for creating connections based on type + - Includes logic for determining connection type based on parameters + +5. **`src/elevenlabs/conversational_ai/webrtc_conversation.py`** + - WebRTC-specific conversation class extending `BaseConversation` + - Provides async interface for WebRTC conversations + - Integrates with LiveKit for real-time audio streaming + +6. **`src/elevenlabs/conversational_ai/conversation_factory.py`** + - High-level factory functions for creating different conversation types + - Includes convenience functions `create_webrtc_conversation()` and `create_websocket_conversation()` + - Provides unified `create_conversation()` function with connection type selection + +### Testing + +7. **`tests/test_webrtc_conversation.py`** + - Comprehensive test suite for WebRTC functionality + - Tests connection type determination, factory functions, and conversation lifecycle + - Includes mocked LiveKit integration tests + +### Examples + +8. 
**`examples/webrtc_conversation_example.py`** + - Complete working examples of WebRTC conversation usage + - Shows both explicit token and automatic token fetching approaches + - Demonstrates the differences between WebSocket and WebRTC connections + +## Files Modified + +### Dependencies + +1. **`pyproject.toml`** + - Added `livekit = ">=0.15.0"` dependency for WebRTC support + +### Core Conversation Module + +2. **`src/elevenlabs/conversational_ai/conversation.py`** + - Updated `ConversationInitiationData` to include `connection_type` and `conversation_token` parameters + - Added imports for the new connection system + - Added helper methods `_determine_connection_type()` and `_create_connection()` to `BaseConversation` + +## Key Features + +### Connection Types + +- **WebSocket (existing)**: Traditional WebSocket-based connections +- **WebRTC (new)**: Real-time connections using LiveKit infrastructure + +### Authentication Methods + +- **Agent ID**: For public agents, no additional authentication required +- **Conversation Token**: For private agents, obtained from ElevenLabs API +- **Automatic Token Fetching**: SDK can automatically fetch tokens when agent ID is provided + +### API Design + +The implementation follows the same patterns as the JavaScript SDK: + +```python +# WebRTC conversation with explicit token +conversation = create_webrtc_conversation( + client=client, + agent_id="your-agent-id", + conversation_token="your-token", + audio_interface=async_audio_interface, + callback_agent_response=on_response +) + +# WebRTC conversation with automatic token fetching +conversation = create_webrtc_conversation( + client=client, + agent_id="your-agent-id", # Token will be fetched automatically + audio_interface=async_audio_interface +) + +# Generic factory with connection type +conversation = create_conversation( + client=client, + agent_id="your-agent-id", + connection_type=ConnectionType.WEBRTC, + audio_interface=async_audio_interface +) +``` + +### Backward 
Compatibility + +- All existing WebSocket-based conversation code continues to work unchanged +- New connection types are opt-in through explicit parameters +- Default behavior remains WebSocket connections + +## Technical Architecture + +### Connection Hierarchy + +``` +BaseConnection (abstract) +├── WebSocketConnection +└── WebRTCConnection (uses LiveKit) +``` + +### Conversation Hierarchy + +``` +BaseConversation +├── Conversation (sync WebSocket) +├── AsyncConversation (async WebSocket) +└── WebRTCConversation (async WebRTC) +``` + +### Factory Pattern + +The implementation uses factory functions to create appropriate conversation types based on: +- Explicit connection type parameter +- Presence of conversation token (implies WebRTC) +- Audio interface type (sync vs async) +- Callback function types (sync vs async) + +## Benefits of WebRTC Implementation + +1. **Lower Latency**: Direct peer-to-peer audio streaming +2. **Better Audio Quality**: Optimized for real-time audio +3. **Reduced Server Load**: Audio doesn't go through application servers +4. **Adaptive Bitrate**: Automatic quality adjustment based on network conditions +5. **Better Connectivity**: NAT traversal and firewall handling + +## Usage Examples + +### Basic WebRTC Conversation + +```python +import asyncio +from elevenlabs import ElevenLabs +from elevenlabs.conversational_ai.conversation_factory import create_webrtc_conversation + +async def main(): + client = ElevenLabs(api_key="your-api-key") + + conversation = create_webrtc_conversation( + client=client, + agent_id="your-agent-id", + audio_interface=YourAsyncAudioInterface(), + ) + + await conversation.start_session() + await conversation.send_user_message("Hello!") + # ... 
conversation logic + await conversation.end_session() + +asyncio.run(main()) +``` + +### Connection Type Comparison + +```python +# WebSocket (existing) +ws_conversation = create_conversation( + client=client, + agent_id="agent-id", + connection_type=ConnectionType.WEBSOCKET, + audio_interface=SyncAudioInterface() # Sync interface +) + +# WebRTC (new) +webrtc_conversation = create_conversation( + client=client, + agent_id="agent-id", + connection_type=ConnectionType.WEBRTC, + audio_interface=AsyncAudioInterface() # Async interface required +) +``` + +## Testing + +The implementation includes comprehensive tests covering: + +- Connection type determination logic +- Factory function behavior +- WebRTC conversation lifecycle +- Message handling +- Error conditions +- Token fetching + +All tests use proper mocking to avoid external dependencies during testing. + +## Future Considerations + +1. **Audio Interface Implementations**: Additional concrete audio interface implementations for common use cases +2. **Advanced WebRTC Features**: Support for video, screen sharing, or advanced audio processing +3. **Monitoring and Analytics**: Integration with LiveKit's monitoring features +4. **Connection Fallback**: Automatic fallback from WebRTC to WebSocket in case of connection issues + +## Migration Guide + +For users wanting to upgrade from WebSocket to WebRTC: + +1. Install the updated SDK with `livekit` dependency +2. Update audio interface to async (`AsyncAudioInterface`) +3. Update callback functions to async +4. Change connection type to `ConnectionType.WEBRTC` +5. Provide conversation token or agent ID for authentication + +The migration is non-breaking - existing code continues to work without changes. 
class SimpleAsyncAudioInterface(AsyncAudioInterface):
    """Placeholder audio interface used by the WebRTC examples.

    Every hook only reports to stdout; no audio device is opened, read
    from, or written to.
    """

    async def start(self, input_callback):
        """Remember ``input_callback`` and report that the interface started."""
        # A real implementation would begin capturing audio and feed it
        # to the stored callback.
        self.input_callback = input_callback
        print("Audio interface started")

    async def stop(self):
        """Report that the interface stopped."""
        # A real implementation would release its audio resources here.
        print("Audio interface stopped")

    async def output(self, audio: bytes):
        """Report the size of an audio chunk handed over for playback."""
        # A real implementation would play ``audio`` here.
        size = len(audio)
        print(f"Received audio output: {size} bytes")

    async def interrupt(self):
        """Report an interruption signal."""
        # A real implementation would stop the current playback here.
        print("Audio output interrupted")
async def auto_fetch_token_example():
    """Run a WebRTC conversation without passing a conversation token.

    Only ``agent_id`` is handed to ``create_webrtc_conversation``; no
    ``conversation_token`` argument is supplied. Any exception raised
    while the session runs is printed, and ``end_session`` is awaited in
    every case.
    """
    print("\nExample 2: WebRTC conversation with automatic token fetching")

    async def on_agent_response(response: str):
        print(f"Agent: {response}")

    api_key = os.getenv("ELEVENLABS_API_KEY")

    # No explicit token is passed here, only the agent id.
    conversation = create_webrtc_conversation(
        client=ElevenLabs(api_key=api_key),
        agent_id="your-agent-id",  # Make sure this agent exists and you have access
        audio_interface=SimpleAsyncAudioInterface(),
        callback_agent_response=on_agent_response,
    )

    try:
        await conversation.start_session()
        print("WebRTC conversation started with auto-fetched token!")

        await conversation.send_user_message("Tell me about WebRTC advantages over WebSockets")

        # Give the agent some time to answer before shutting down.
        await asyncio.sleep(10)
    except Exception as e:
        print(f"Error in conversation: {e}")
    finally:
        await conversation.end_session()
class BaseConnection(ABC):
    """Abstract base for conversation transports.

    Subclasses implement ``connect``, ``close``, ``send_message`` and
    ``send_audio``. Incoming messages are routed through
    ``_handle_message``: they go to the callback registered with
    ``on_message``, or are buffered until a callback is registered.
    """

    def __init__(self):
        self.conversation_id: Optional[str] = None
        # Messages received before a callback was registered, oldest first.
        self._message_queue = []
        self._on_message_callback: Optional[Callable[[dict], Union[None, Awaitable[None]]]] = None
        # Strong references to in-flight callback tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._pending_tasks = set()

    @abstractmethod
    async def connect(self) -> None:
        """Establish the connection."""
        pass

    @abstractmethod
    async def close(self) -> None:
        """Close the connection."""
        pass

    @abstractmethod
    async def send_message(self, message: dict) -> None:
        """Send a message through the connection."""
        pass

    @abstractmethod
    async def send_audio(self, audio_data: bytes) -> None:
        """Send audio data through the connection."""
        pass

    def on_message(self, callback: Callable[[dict], Union[None, Awaitable[None]]]) -> None:
        """Register the message callback and replay buffered messages.

        Buffered messages are delivered oldest first. Each message is
        removed from the buffer before it is handed to the callback, so a
        callback that raises never causes an already-delivered message to
        be replayed; messages not yet reached stay buffered.

        Args:
            callback: Sync or async callable taking the message dict.
                Passing ``None`` unregisters the callback and leaves the
                buffer untouched.
        """
        self._on_message_callback = callback
        if callback is None:
            # Draining without a callback would re-buffer every message forever.
            return
        while self._message_queue:
            message = self._message_queue.pop(0)
            self._handle_message(message)

    def _handle_message(self, message: dict) -> None:
        """Deliver ``message`` to the callback, or buffer it if none is set.

        If the callback returns a coroutine it is scheduled on the running
        event loop with ``asyncio.create_task``.

        Raises:
            RuntimeError: If the callback returned a coroutine but no event
                loop is running.
        """
        callback = self._on_message_callback
        if not callback:
            self._message_queue.append(message)
            return

        result = callback(message)
        if not asyncio.iscoroutine(result):
            return

        try:
            task = asyncio.create_task(result)
        except RuntimeError:
            # No running loop: close the coroutine so it does not trigger a
            # "never awaited" warning, then surface the error.
            result.close()
            raise
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)
connection_type is explicitly specified, use it + if connection_type: + return connection_type + + # If conversation_token is provided, use WebRTC + if conversation_token: + return ConnectionType.WEBRTC + + # Default to WebSocket for backward compatibility + return ConnectionType.WEBSOCKET \ No newline at end of file diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index dbc72c5b..eda1996b 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -13,6 +13,8 @@ from ..base_client import BaseElevenLabs from ..version import __version__ +from .base_connection import ConnectionType +from .connection_factory import create_connection, determine_connection_type class ClientToOrchestratorEvent(str, Enum): @@ -295,11 +297,15 @@ def __init__( conversation_config_override: Optional[dict] = None, dynamic_variables: Optional[dict] = None, user_id: Optional[str] = None, + connection_type: Optional[ConnectionType] = None, + conversation_token: Optional[str] = None, ): self.extra_body = extra_body or {} self.conversation_config_override = conversation_config_override or {} self.dynamic_variables = dynamic_variables or {} self.user_id = user_id + self.connection_type = connection_type + self.conversation_token = conversation_token class BaseConversation: @@ -339,6 +345,29 @@ def _get_signed_url(self): separator = "&" if "?" in signed_url else "?" 
def create_conversation(
    client: BaseElevenLabs,
    agent_id: str,
    user_id: Optional[str] = None,
    *,
    connection_type: ConnectionType = ConnectionType.WEBSOCKET,
    conversation_token: Optional[str] = None,
    requires_auth: bool = True,
    audio_interface: Optional[Union[AudioInterface, AsyncAudioInterface]] = None,
    config: Optional[ConversationInitiationData] = None,
    client_tools: Optional[ClientTools] = None,
    # Sync callbacks (for websocket conversations)
    callback_agent_response: Optional[Callable[[str], None]] = None,
    callback_agent_response_correction: Optional[Callable[[str, str], None]] = None,
    callback_user_transcript: Optional[Callable[[str], None]] = None,
    callback_latency_measurement: Optional[Callable[[int], None]] = None,
    callback_end_session: Optional[Callable] = None,
    # Async callbacks (for WebRTC and async websocket conversations)
    async_callback_agent_response: Optional[Callable[[str], Awaitable[None]]] = None,
    async_callback_agent_response_correction: Optional[Callable[[str, str], Awaitable[None]]] = None,
    async_callback_user_transcript: Optional[Callable[[str], Awaitable[None]]] = None,
    async_callback_latency_measurement: Optional[Callable[[int], Awaitable[None]]] = None,
    async_callback_end_session: Optional[Callable[[], Awaitable[None]]] = None,
) -> Union[Conversation, AsyncConversation, WebRTCConversation]:
    """Create a conversation with the specified connection type.

    Selection rules:

    * ``ConnectionType.WEBRTC`` returns a ``WebRTCConversation``.
    * ``ConnectionType.WEBSOCKET`` returns an ``AsyncConversation`` when
      any ``async_callback_*`` is given or ``audio_interface`` is an
      ``AsyncAudioInterface``; otherwise a ``Conversation``.

    Args:
        client: ElevenLabs client instance.
        agent_id: ID of the agent to connect to.
        user_id: Optional user ID.
        connection_type: Type of connection (websocket or webrtc).
        conversation_token: Token for WebRTC authentication. When omitted,
            a token already present on ``config`` is forwarded instead.
        requires_auth: Forwarded to WebSocket conversations only; it is
            not passed to ``WebRTCConversation``.
        audio_interface: Audio interface for the conversation.
        config: Conversation configuration. The caller's object is not
            modified; a shallow copy receives ``connection_type`` and
            ``conversation_token``.
        client_tools: Client tools for handling agent calls.
        callback_*: Synchronous callbacks, used only by ``Conversation``.
        async_callback_*: Asynchronous callbacks, used by
            ``WebRTCConversation`` and ``AsyncConversation``.

    Returns:
        A conversation instance of the appropriate type.

    Raises:
        ValueError: If the audio interface does not match the selected
            conversation type, if synchronous callbacks are supplied for a
            conversation that only accepts asynchronous ones, or if
            ``connection_type`` is not supported.

    Examples:
        # WebSocket conversation (default)
        conversation = create_conversation(
            client=client,
            agent_id="your-agent-id",
            audio_interface=your_audio_interface
        )

        # WebRTC conversation
        conversation = create_conversation(
            client=client,
            agent_id="your-agent-id",
            connection_type=ConnectionType.WEBRTC,
            conversation_token="your-token",
            audio_interface=your_async_audio_interface,
            async_callback_agent_response=your_response_handler
        )
    """
    # Local import keeps this function self-contained.
    import copy

    sync_callbacks = {
        "callback_agent_response": callback_agent_response,
        "callback_agent_response_correction": callback_agent_response_correction,
        "callback_user_transcript": callback_user_transcript,
        "callback_latency_measurement": callback_latency_measurement,
        "callback_end_session": callback_end_session,
    }
    # Keyed by the parameter names of the async conversation constructors.
    async_callbacks = {
        "callback_agent_response": async_callback_agent_response,
        "callback_agent_response_correction": async_callback_agent_response_correction,
        "callback_user_transcript": async_callback_user_transcript,
        "callback_latency_measurement": async_callback_latency_measurement,
        "callback_end_session": async_callback_end_session,
    }
    has_sync_callbacks = any(cb is not None for cb in sync_callbacks.values())
    has_async_callbacks = any(cb is not None for cb in async_callbacks.values())

    # Work on a copy so a config object reused across calls is not
    # rewritten underneath a conversation that already holds it.
    config = ConversationInitiationData() if config is None else copy.copy(config)
    config.connection_type = connection_type
    if conversation_token:
        config.conversation_token = conversation_token

    audio_is_async = isinstance(audio_interface, AsyncAudioInterface)

    if connection_type == ConnectionType.WEBRTC:
        if audio_interface is not None and not audio_is_async:
            raise ValueError("WebRTC conversations require an AsyncAudioInterface")
        if has_sync_callbacks:
            # These would otherwise be dropped without any notice.
            raise ValueError(
                "WebRTC conversations only accept async callbacks; "
                "pass them as async_callback_* arguments"
            )

        return WebRTCConversation(
            client=client,
            agent_id=agent_id,
            user_id=user_id,
            # Fall back to a token carried by the config.
            conversation_token=conversation_token or getattr(config, "conversation_token", None),
            audio_interface=audio_interface,
            config=config,
            client_tools=client_tools,
            **async_callbacks,
        )

    if connection_type == ConnectionType.WEBSOCKET:
        if has_async_callbacks or audio_is_async:
            if audio_interface is not None and not audio_is_async:
                raise ValueError("Asynchronous WebSocket conversations require an AsyncAudioInterface")
            if has_sync_callbacks:
                raise ValueError(
                    "Asynchronous WebSocket conversations only accept async callbacks; "
                    "pass them as async_callback_* arguments"
                )

            return AsyncConversation(
                client=client,
                agent_id=agent_id,
                user_id=user_id,
                requires_auth=requires_auth,
                audio_interface=audio_interface,
                config=config,
                client_tools=client_tools,
                **async_callbacks,
            )

        if audio_interface is not None and not isinstance(audio_interface, AudioInterface):
            raise ValueError("Synchronous WebSocket conversations require an AudioInterface")

        return Conversation(
            client=client,
            agent_id=agent_id,
            user_id=user_id,
            requires_auth=requires_auth,
            audio_interface=audio_interface,
            config=config,
            client_tools=client_tools,
            **sync_callbacks,
        )

    raise ValueError(f"Unsupported connection type: {connection_type}")
class WebRTCConnection(BaseConnection):
    """WebRTC-based connection for conversations using LiveKit.

    Control messages travel as JSON over the room's data channel. Audio
    is not sent as messages: ``send_message`` drops ``user_audio_chunk``
    payloads and incoming messages of type ``"audio"`` are ignored.
    """

    LIVEKIT_WS_URL = "wss://livekit.rtc.elevenlabs.io"
    # Endpoint that issues conversation tokens; the agent id is sent as a
    # query parameter.
    TOKEN_URL = "https://api.elevenlabs.io/v1/convai/conversation/token"

    def __init__(self, conversation_token: Optional[str] = None, agent_id: Optional[str] = None):
        super().__init__()
        self.conversation_token = conversation_token
        self.agent_id = agent_id
        self._room: Optional[Room] = None
        self._is_connected = False

    @staticmethod
    def _logger():
        """Return this module's logger (imported lazily)."""
        import logging

        return logging.getLogger(__name__)

    async def connect(self) -> None:
        """Establish the WebRTC connection using LiveKit.

        Fetches a conversation token first when none was supplied.

        Raises:
            ValueError: If neither a conversation token nor an agent id is
                available.
        """
        if not self.conversation_token:
            if not self.agent_id:
                raise ValueError("Either conversation_token or agent_id is required for WebRTC connection")
            self.conversation_token = await self._fetch_conversation_token()

        room = Room()
        self._room = room
        self._setup_room_callbacks()

        try:
            await room.connect(self.LIVEKIT_WS_URL, self.conversation_token)
        except BaseException:
            # Do not keep a room that never connected.
            self._room = None
            raise
        self._is_connected = True

        # Use the room name as conversation id, else a per-instance fallback.
        self.conversation_id = room.name if room.name else f"webrtc-{id(self)}"

        # NOTE(review): confirm `set_microphone_enabled` exists on the
        # LocalParticipant of the installed livekit version.
        await room.local_participant.set_microphone_enabled(True)

    async def close(self) -> None:
        """Close the WebRTC connection.

        Local state is reset before disconnecting, so the connection is
        reported as closed even if ``disconnect`` raises.
        """
        room, self._room = self._room, None
        self._is_connected = False
        if room is not None:
            await room.disconnect()

    async def send_message(self, message: dict) -> None:
        """Send a message through the WebRTC data channel.

        Messages containing ``user_audio_chunk`` are dropped.

        Raises:
            RuntimeError: If the room is not connected.
        """
        if not self._is_connected or not self._room:
            raise RuntimeError("WebRTC room not connected")

        if "user_audio_chunk" in message:
            return  # Audio is handled separately

        payload = json.dumps(message).encode("utf-8")
        try:
            await self._room.local_participant.publish_data(payload, reliable=True)
        except Exception:
            self._logger().exception("Failed to send message via WebRTC")
            raise

    async def send_audio(self, audio_data: bytes) -> None:
        """Accept audio data; intentionally a no-op in this implementation."""
        pass

    async def _fetch_conversation_token(self) -> str:
        """Fetch a conversation token for ``agent_id`` from ``TOKEN_URL``.

        Raises:
            ValueError: If no agent id is set.
            RuntimeError: If the request fails or the response carries no
                token.
        """
        if not self.agent_id:
            raise ValueError("Agent ID is required to fetch conversation token")

        async with httpx.AsyncClient() as client:
            # `params` lets httpx URL-encode the agent id.
            response = await client.get(self.TOKEN_URL, params={"agent_id": self.agent_id})

            if not response.is_success:
                raise RuntimeError(
                    f"Failed to fetch conversation token for agent {self.agent_id}: {response.status_code} {response.text}"
                )

            data = response.json()
            token = data.get("token") if isinstance(data, dict) else None

            if not token:
                raise RuntimeError("No conversation token received from API")

            return token

    def _setup_room_callbacks(self) -> None:
        """Register the LiveKit room event handlers on the current room."""
        room = self._room
        if not room:
            return

        log = self._logger()

        @room.on("connected")
        def on_connected() -> None:
            log.info("WebRTC room connected")

        @room.on("disconnected")
        def on_disconnected(reason: Optional[str] = None) -> None:
            self._is_connected = False
            log.info("WebRTC room disconnected: %s", reason)

        @room.on("data_received")
        def on_data_received(packet, *_extra) -> None:
            # Accept either raw bytes (plus extra positional arguments) or a
            # single packet object exposing the bytes as `.data`.
            # NOTE(review): confirm which shape the installed livekit emits.
            payload = getattr(packet, "data", packet)
            try:
                message = json.loads(payload.decode("utf-8"))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                log.warning("Failed to parse incoming data message: %s", e)
                return

            if not isinstance(message, dict):
                log.warning("Ignoring non-object data message")
                return

            # Audio messages are filtered out of the data channel.
            if message.get("type") == "audio":
                return

            self._handle_message(message)

        @room.on("track_subscribed")
        def on_track_subscribed(track, publication, participant) -> None:
            if track.kind == TrackKind.KIND_AUDIO and "agent" in participant.identity:
                log.info("Subscribed to agent audio track")

    def get_room(self) -> Optional[Room]:
        """Get the LiveKit room instance for advanced usage."""
        return self._room
ConversationInitiationData, + AsyncAudioInterface, + ClientTools +) +from .base_connection import ConnectionType +from .webrtc_connection import WebRTCConnection + + +class WebRTCConversation(BaseConversation): + """WebRTC-based conversational AI session using LiveKit. + + This class provides WebRTC connectivity for real-time audio conversations + with ElevenLabs agents, offering lower latency compared to WebSocket connections. + """ + + def __init__( + self, + client: BaseElevenLabs, + agent_id: str, + user_id: Optional[str] = None, + *, + conversation_token: Optional[str] = None, + audio_interface: Optional[AsyncAudioInterface] = None, + config: Optional[ConversationInitiationData] = None, + client_tools: Optional[ClientTools] = None, + callback_agent_response: Optional[Callable[[str], Awaitable[None]]] = None, + callback_agent_response_correction: Optional[Callable[[str, str], Awaitable[None]]] = None, + callback_user_transcript: Optional[Callable[[str], Awaitable[None]]] = None, + callback_latency_measurement: Optional[Callable[[int], Awaitable[None]]] = None, + callback_end_session: Optional[Callable[[], Awaitable[None]]] = None, + ): + """Initialize a WebRTC conversation. + + Args: + client: The ElevenLabs client to use for the conversation. + agent_id: The ID of the agent to converse with. + user_id: The ID of the user conversing with the agent. + conversation_token: Token for WebRTC authentication. If not provided, + will be fetched using the agent_id. + audio_interface: The async audio interface to use for input and output. + config: Configuration for the conversation. + client_tools: Client tools for handling agent tool calls. + callback_agent_response: Async callback for agent responses. + callback_agent_response_correction: Async callback for response corrections. + callback_user_transcript: Async callback for user transcripts. + callback_latency_measurement: Async callback for latency measurements. 
+ callback_end_session: Async callback for when session ends. + """ + + # Set up configuration with WebRTC specifics + if config is None: + config = ConversationInitiationData() + config.connection_type = ConnectionType.WEBRTC + config.conversation_token = conversation_token + + super().__init__( + client=client, + agent_id=agent_id, + user_id=user_id, + requires_auth=True, # WebRTC requires authentication + config=config, + client_tools=client_tools, + ) + + self.audio_interface = audio_interface + self.callback_agent_response = callback_agent_response + self.callback_agent_response_correction = callback_agent_response_correction + self.callback_user_transcript = callback_user_transcript + self.callback_latency_measurement = callback_latency_measurement + self.callback_end_session = callback_end_session + + self._connection: Optional[WebRTCConnection] = None + self._should_stop = asyncio.Event() + self._session_task: Optional[asyncio.Task] = None + + async def start_session(self): + """Start the WebRTC conversation session.""" + try: + # Create WebRTC connection + self._connection = WebRTCConnection( + conversation_token=self.config.conversation_token, + agent_id=self.agent_id + ) + + # Set up message handler + self._connection.on_message(self._handle_message) + + # Connect + await self._connection.connect() + + # Send initiation message + initiation_message = json.loads(self._create_initiation_message()) + await self._connection.send_message(initiation_message) + + # Update conversation ID + self._conversation_id = self._connection.conversation_id + + # Start audio interface if provided + if self.audio_interface: + await self.audio_interface.start(self._audio_input_callback) + + print(f"WebRTC conversation started with ID: {self._conversation_id}") + + except Exception as e: + print(f"Failed to start WebRTC session: {e}") + raise + + async def end_session(self): + """End the WebRTC conversation session.""" + self._should_stop.set() + + if self.audio_interface: + 
await self.audio_interface.stop() + + if self._connection: + await self._connection.close() + self._connection = None + + self.client_tools.stop() + + if self.callback_end_session: + await self.callback_end_session() + + async def send_user_message(self, text: str): + """Send a text message from the user to the agent.""" + if not self._connection: + raise RuntimeError("Session not started") + + message = { + "type": "user_message", + "text": text + } + await self._connection.send_message(message) + + async def send_contextual_update(self, text: str): + """Send a contextual update to the conversation.""" + if not self._connection: + raise RuntimeError("Session not started") + + message = { + "type": "contextual_update", + "text": text + } + await self._connection.send_message(message) + + async def register_user_activity(self): + """Register user activity to prevent session timeout.""" + if not self._connection: + raise RuntimeError("Session not started") + + message = { + "type": "user_activity" + } + await self._connection.send_message(message) + + async def _audio_input_callback(self, audio_data: bytes): + """Handle audio input from the audio interface.""" + if self._connection and not self._should_stop.is_set(): + # For WebRTC, audio is sent through the room's microphone track + # This callback can be used for custom processing if needed + pass + + async def _handle_message(self, message: dict): + """Handle incoming messages from the WebRTC connection.""" + try: + message_type = message.get("type") + + if message_type == "conversation_initiation_metadata": + event = message["conversation_initiation_metadata_event"] + if not self._conversation_id: + self._conversation_id = event["conversation_id"] + + elif message_type == "audio": + # Audio is handled through WebRTC audio tracks, not data messages + pass + + elif message_type == "agent_response": + if self.callback_agent_response: + event = message["agent_response_event"] + await 
self.callback_agent_response(event["agent_response"].strip()) + + elif message_type == "agent_response_correction": + if self.callback_agent_response_correction: + event = message["agent_response_correction_event"] + await self.callback_agent_response_correction( + event["original_agent_response"].strip(), + event["corrected_agent_response"].strip() + ) + + elif message_type == "user_transcript": + if self.callback_user_transcript: + event = message["user_transcription_event"] + await self.callback_user_transcript(event["user_transcript"].strip()) + + elif message_type == "interruption": + if self.audio_interface: + await self.audio_interface.interrupt() + + elif message_type == "ping": + event = message["ping_event"] + # Send pong response + pong_message = { + "type": "pong", + "event_id": event["event_id"] + } + await self._connection.send_message(pong_message) + + if self.callback_latency_measurement and event.get("ping_ms"): + await self.callback_latency_measurement(int(event["ping_ms"])) + + elif message_type == "client_tool_call": + tool_call = message.get("client_tool_call", {}) + tool_name = tool_call.get("tool_name") + parameters = { + "tool_call_id": tool_call["tool_call_id"], + **tool_call.get("parameters", {}) + } + + # Execute tool asynchronously + async def send_response(response): + if not self._should_stop.is_set(): + await self._connection.send_message(response) + + self.client_tools.execute_tool(tool_name, parameters, send_response) + + except Exception as e: + print(f"Error handling message: {e}") + + def get_webrtc_room(self): + """Get the underlying LiveKit room for advanced WebRTC operations.""" + if self._connection: + return self._connection.get_room() + return None \ No newline at end of file diff --git a/src/elevenlabs/conversational_ai/websocket_connection.py b/src/elevenlabs/conversational_ai/websocket_connection.py new file mode 100644 index 00000000..5b20579f --- /dev/null +++ b/src/elevenlabs/conversational_ai/websocket_connection.py 
@@ -0,0 +1,57 @@ +import json +import base64 +from typing import Optional +import websockets +from websockets.exceptions import ConnectionClosedOK + +from .base_connection import BaseConnection + + +class WebSocketConnection(BaseConnection): + """WebSocket-based connection for conversations.""" + + def __init__(self, ws_url: str): + super().__init__() + self.ws_url = ws_url + self._ws: Optional[websockets.WebSocketClientProtocol] = None + + async def connect(self) -> None: + """Establish the WebSocket connection.""" + self._ws = await websockets.connect(self.ws_url, max_size=16 * 1024 * 1024) + + async def close(self) -> None: + """Close the WebSocket connection.""" + if self._ws: + await self._ws.close() + self._ws = None + + async def send_message(self, message: dict) -> None: + """Send a message through the WebSocket.""" + if not self._ws: + raise RuntimeError("WebSocket not connected") + await self._ws.send(json.dumps(message)) + + async def send_audio(self, audio_data: bytes) -> None: + """Send audio data through the WebSocket.""" + if not self._ws: + raise RuntimeError("WebSocket not connected") + + message = { + "user_audio_chunk": base64.b64encode(audio_data).decode() + } + await self._ws.send(json.dumps(message)) + + async def receive_messages(self) -> None: + """Receive and handle messages from the WebSocket.""" + if not self._ws: + return + + try: + async for message_str in self._ws: + try: + message = json.loads(message_str) + self._handle_message(message) + except json.JSONDecodeError: + continue + except ConnectionClosedOK: + pass \ No newline at end of file diff --git a/tests/test_webrtc_conversation.py b/tests/test_webrtc_conversation.py new file mode 100644 index 00000000..a46b10b8 --- /dev/null +++ b/tests/test_webrtc_conversation.py @@ -0,0 +1,226 @@ +import pytest +import asyncio +from unittest.mock import Mock, AsyncMock, patch + +from elevenlabs.conversational_ai.base_connection import ConnectionType +from 
elevenlabs.conversational_ai.conversation_factory import ( + create_conversation, + create_webrtc_conversation, + create_websocket_conversation +) +from elevenlabs.conversational_ai.webrtc_conversation import WebRTCConversation +from elevenlabs.conversational_ai.conversation import Conversation, AsyncConversation +from elevenlabs.conversational_ai.webrtc_connection import WebRTCConnection + + +class TestWebRTCConversation: + """Test WebRTC conversation functionality.""" + + @pytest.fixture + def mock_client(self): + """Create a mock ElevenLabs client.""" + return Mock() + + @pytest.fixture + def mock_audio_interface(self): + """Create a mock async audio interface.""" + from elevenlabs.conversational_ai.conversation import AsyncAudioInterface + interface = Mock(spec=AsyncAudioInterface) + interface.start = AsyncMock() + interface.stop = AsyncMock() + interface.output = AsyncMock() + interface.interrupt = AsyncMock() + return interface + + def test_connection_type_determination(self): + """Test that connection types are determined correctly.""" + from elevenlabs.conversational_ai.connection_factory import determine_connection_type + + # Default should be websocket + assert determine_connection_type() == ConnectionType.WEBSOCKET + + # Explicit connection type should be respected + assert determine_connection_type(ConnectionType.WEBRTC) == ConnectionType.WEBRTC + + # Conversation token should imply WebRTC + assert determine_connection_type(conversation_token="token") == ConnectionType.WEBRTC + + # Explicit type should override token inference + assert determine_connection_type( + ConnectionType.WEBSOCKET, + conversation_token="token" + ) == ConnectionType.WEBSOCKET + + def test_factory_creates_correct_conversation_types(self, mock_client): + """Test that the factory creates the correct conversation types.""" + # WebRTC conversation + webrtc_conv = create_conversation( + client=mock_client, + agent_id="test-agent", + connection_type=ConnectionType.WEBRTC + ) + assert 
isinstance(webrtc_conv, WebRTCConversation) + + # WebSocket conversation (sync) + ws_conv = create_conversation( + client=mock_client, + agent_id="test-agent", + connection_type=ConnectionType.WEBSOCKET + ) + assert isinstance(ws_conv, (Conversation, AsyncConversation)) + + def test_convenience_functions(self, mock_client, mock_audio_interface): + """Test convenience functions for creating conversations.""" + # WebRTC convenience function with conversation token to avoid HTTP calls + with patch('elevenlabs.conversational_ai.webrtc_connection.Room') as mock_room_class: + mock_room = Mock() + mock_room.connect = AsyncMock() + mock_room.disconnect = AsyncMock() + mock_room.local_participant = Mock() + mock_room.local_participant.set_microphone_enabled = AsyncMock() + mock_room.name = "test-room" + mock_room_class.return_value = mock_room + + webrtc_conv = create_webrtc_conversation( + client=mock_client, + agent_id="test-agent", + conversation_token="test-token", + audio_interface=mock_audio_interface + ) + assert isinstance(webrtc_conv, WebRTCConversation) + + # WebSocket convenience function + ws_conv = create_websocket_conversation( + client=mock_client, + agent_id="test-agent" + ) + assert isinstance(ws_conv, Conversation) + + @pytest.mark.asyncio + async def test_webrtc_conversation_lifecycle(self, mock_client, mock_audio_interface): + """Test WebRTC conversation lifecycle.""" + with patch('elevenlabs.conversational_ai.webrtc_connection.Room') as mock_room_class: + # Mock room instance + mock_room = Mock() + mock_room.connect = AsyncMock() + mock_room.disconnect = AsyncMock() + mock_room.local_participant = Mock() + mock_room.local_participant.set_microphone_enabled = AsyncMock() + mock_room.local_participant.publish_data = AsyncMock() + mock_room.name = "test-room" + mock_room_class.return_value = mock_room + + # Create conversation with a conversation token to avoid HTTP calls + conversation = WebRTCConversation( + client=mock_client, + agent_id="test-agent", + 
conversation_token="test-token", # Provide token to avoid fetching + audio_interface=mock_audio_interface + ) + + # Test start session + await conversation.start_session() + mock_room.connect.assert_called_once() + mock_audio_interface.start.assert_called_once() + + # Test end session + await conversation.end_session() + mock_room.disconnect.assert_called_once() + mock_audio_interface.stop.assert_called_once() + + @pytest.mark.asyncio + async def test_webrtc_conversation_messaging(self, mock_client): + """Test WebRTC conversation messaging functionality.""" + with patch('elevenlabs.conversational_ai.webrtc_connection.Room') as mock_room_class: + # Mock room instance + mock_room = Mock() + mock_room.connect = AsyncMock() + mock_room.disconnect = AsyncMock() + mock_room.local_participant = Mock() + mock_room.local_participant.set_microphone_enabled = AsyncMock() + mock_room.local_participant.publish_data = AsyncMock() + mock_room.name = "test-room" + mock_room_class.return_value = mock_room + + # Create conversation with a conversation token to avoid HTTP calls + conversation = WebRTCConversation( + client=mock_client, + agent_id="test-agent", + conversation_token="test-token" # Provide token to avoid fetching + ) + + # Start session + await conversation.start_session() + + # Test sending user message + await conversation.send_user_message("Hello, agent!") + # WebRTC messages are sent via publish_data + assert mock_room.local_participant.publish_data.called + + # Test sending contextual update + await conversation.send_contextual_update("Context update") + assert mock_room.local_participant.publish_data.called + + # Test registering user activity + await conversation.register_user_activity() + assert mock_room.local_participant.publish_data.called + + def test_webrtc_connection_creation(self): + """Test WebRTC connection creation and configuration.""" + # Test with conversation token + connection = WebRTCConnection(conversation_token="test-token") + assert 
connection.conversation_token == "test-token" + + # Test with agent ID + connection = WebRTCConnection(agent_id="test-agent") + assert connection.agent_id == "test-agent" + + @pytest.mark.asyncio + async def test_webrtc_connection_token_fetch(self): + """Test fetching conversation token from API.""" + with patch('httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_response = Mock() + mock_response.is_success = True + mock_response.json.return_value = {"token": "fetched-token"} + mock_client.get.return_value = mock_response + mock_client_class.return_value.__aenter__.return_value = mock_client + + connection = WebRTCConnection(agent_id="test-agent") + token = await connection._fetch_conversation_token() + + assert token == "fetched-token" + mock_client.get.assert_called_once_with( + "https://api.elevenlabs.io/v1/convai/conversation/token?agent_id=test-agent" + ) + + @pytest.mark.asyncio + async def test_webrtc_connection_token_fetch_error(self): + """Test error handling when fetching conversation token.""" + with patch('httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_response = Mock() + mock_response.is_success = False + mock_response.status_code = 404 + mock_response.text = "Not Found" + mock_client.get.return_value = mock_response + mock_client_class.return_value.__aenter__.return_value = mock_client + + connection = WebRTCConnection(agent_id="test-agent") + + with pytest.raises(Exception, match="Failed to fetch conversation token"): + await connection._fetch_conversation_token() + + def test_factory_validation(self, mock_client): + """Test validation in factory functions.""" + from elevenlabs.conversational_ai.conversation import AudioInterface + + # Should raise error for wrong audio interface type with WebRTC + sync_audio = Mock(spec=AudioInterface) + with pytest.raises(ValueError, match="WebRTC conversations require an AsyncAudioInterface"): + create_conversation( + client=mock_client, + 
agent_id="test-agent", + connection_type=ConnectionType.WEBRTC, + audio_interface=sync_audio + ) \ No newline at end of file From 3ecb311b1073416cf45971097d0e42e9705e95bd Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 16 Sep 2025 13:27:13 +0100 Subject: [PATCH 02/20] readme --- README.md | 100 ++++++++++++++++++++ WEBRTC_CHANGES.md | 229 ---------------------------------------------- 2 files changed, 100 insertions(+), 229 deletions(-) delete mode 100644 WEBRTC_CHANGES.md diff --git a/README.md b/README.md index f023d00c..6b364e9e 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,106 @@ client_tools.register("calculate_sum", calculate_sum, is_async=False) client_tools.register("fetch_data", fetch_data, is_async=True) ``` +### WebRTC Support + +ElevenLabs Python SDK supports WebRTC connections for real-time, low-latency conversations using LiveKit infrastructure. WebRTC provides better audio quality, lower latency, and improved connectivity compared to traditional WebSocket connections. 
+ +#### Key Benefits +- **Lower Latency**: Direct peer-to-peer audio streaming +- **Better Audio Quality**: Optimized for real-time audio +- **Improved Connectivity**: NAT traversal and firewall handling +- **Adaptive Bitrate**: Automatic quality adjustment based on network conditions + +#### Basic WebRTC Usage + +```python +import asyncio +from elevenlabs import ElevenLabs +from elevenlabs.conversational_ai.conversation_factory import create_webrtc_conversation +from elevenlabs.conversational_ai.conversation import AsyncAudioInterface + +class SimpleAsyncAudioInterface(AsyncAudioInterface): + async def start(self, input_callback): + print("Audio interface started") + self.input_callback = input_callback + + async def stop(self): + print("Audio interface stopped") + + async def output(self, audio: bytes): + print(f"Received audio: {len(audio)} bytes") + + async def interrupt(self): + print("Audio interrupted") + +async def main(): + client = ElevenLabs(api_key="YOUR_API_KEY") + audio_interface = SimpleAsyncAudioInterface() + + # WebRTC conversation with automatic token fetching + conversation = create_webrtc_conversation( + client=client, + agent_id="your-agent-id", + audio_interface=audio_interface, + ) + + await conversation.start_session() + await conversation.send_user_message("Hello!") + await asyncio.sleep(10) + await conversation.end_session() + +asyncio.run(main()) +``` + +#### Connection Type Comparison + +```python +from elevenlabs.conversational_ai.conversation_factory import create_conversation +from elevenlabs.conversational_ai.base_connection import ConnectionType + +# WebSocket (existing) +ws_conversation = create_conversation( + client=client, + agent_id="your-agent-id", + connection_type=ConnectionType.WEBSOCKET, + # Uses sync AudioInterface +) + +# WebRTC (new) +webrtc_conversation = create_conversation( + client=client, + agent_id="your-agent-id", + connection_type=ConnectionType.WEBRTC, + audio_interface=AsyncAudioInterface(), # Async interface 
required +) +``` + +#### Authentication Methods + +WebRTC conversations support multiple authentication approaches: + +1. **Automatic Token Fetching**: Provide only `agent_id` and the SDK fetches the conversation token automatically +2. **Explicit Token**: Provide both `agent_id` and `conversation_token` for manual token management + +```python +# Automatic token fetching (recommended) +conversation = create_webrtc_conversation( + client=client, + agent_id="your-agent-id", + audio_interface=audio_interface +) + +# Explicit token +conversation = create_webrtc_conversation( + client=client, + agent_id="your-agent-id", + conversation_token="your-conversation-token", + audio_interface=audio_interface +) +``` + +**Requirements**: WebRTC conversations require the `livekit` dependency, which is automatically installed with the SDK. All WebRTC conversations must use `AsyncAudioInterface` implementations. + ## Languages Supported Explore [all models & languages](https://elevenlabs.io/docs/models). diff --git a/WEBRTC_CHANGES.md b/WEBRTC_CHANGES.md deleted file mode 100644 index 30ab0885..00000000 --- a/WEBRTC_CHANGES.md +++ /dev/null @@ -1,229 +0,0 @@ -# WebRTC Support Implementation for ElevenLabs Python SDK - -This document summarizes the WebRTC support implementation added to the ElevenLabs Python SDK, following the same architecture as the JavaScript SDK. - -## Overview - -WebRTC support has been added to enable real-time, low-latency conversations with ElevenLabs agents using the LiveKit WebRTC infrastructure. This provides an alternative to the existing WebSocket-based connections with improved performance for real-time audio applications. - -## Files Added - -### Core Implementation - -1. **`src/elevenlabs/conversational_ai/base_connection.py`** - - Abstract base class for all connection types - - Defines the common interface for WebSocket and WebRTC connections - - Includes `ConnectionType` enum with `WEBSOCKET` and `WEBRTC` options - -2. 
**`src/elevenlabs/conversational_ai/websocket_connection.py`** - - WebSocket connection implementation extending `BaseConnection` - - Maintains existing WebSocket functionality in the new architecture - -3. **`src/elevenlabs/conversational_ai/webrtc_connection.py`** - - WebRTC connection implementation using LiveKit Python SDK - - Handles LiveKit room management, audio tracks, and data channels - - Supports automatic conversation token fetching from ElevenLabs API - -4. **`src/elevenlabs/conversational_ai/connection_factory.py`** - - Factory functions for creating connections based on type - - Includes logic for determining connection type based on parameters - -5. **`src/elevenlabs/conversational_ai/webrtc_conversation.py`** - - WebRTC-specific conversation class extending `BaseConversation` - - Provides async interface for WebRTC conversations - - Integrates with LiveKit for real-time audio streaming - -6. **`src/elevenlabs/conversational_ai/conversation_factory.py`** - - High-level factory functions for creating different conversation types - - Includes convenience functions `create_webrtc_conversation()` and `create_websocket_conversation()` - - Provides unified `create_conversation()` function with connection type selection - -### Testing - -7. **`tests/test_webrtc_conversation.py`** - - Comprehensive test suite for WebRTC functionality - - Tests connection type determination, factory functions, and conversation lifecycle - - Includes mocked LiveKit integration tests - -### Examples - -8. **`examples/webrtc_conversation_example.py`** - - Complete working examples of WebRTC conversation usage - - Shows both explicit token and automatic token fetching approaches - - Demonstrates the differences between WebSocket and WebRTC connections - -## Files Modified - -### Dependencies - -1. **`pyproject.toml`** - - Added `livekit = ">=0.15.0"` dependency for WebRTC support - -### Core Conversation Module - -2. 
**`src/elevenlabs/conversational_ai/conversation.py`** - - Updated `ConversationInitiationData` to include `connection_type` and `conversation_token` parameters - - Added imports for the new connection system - - Added helper methods `_determine_connection_type()` and `_create_connection()` to `BaseConversation` - -## Key Features - -### Connection Types - -- **WebSocket (existing)**: Traditional WebSocket-based connections -- **WebRTC (new)**: Real-time connections using LiveKit infrastructure - -### Authentication Methods - -- **Agent ID**: For public agents, no additional authentication required -- **Conversation Token**: For private agents, obtained from ElevenLabs API -- **Automatic Token Fetching**: SDK can automatically fetch tokens when agent ID is provided - -### API Design - -The implementation follows the same patterns as the JavaScript SDK: - -```python -# WebRTC conversation with explicit token -conversation = create_webrtc_conversation( - client=client, - agent_id="your-agent-id", - conversation_token="your-token", - audio_interface=async_audio_interface, - callback_agent_response=on_response -) - -# WebRTC conversation with automatic token fetching -conversation = create_webrtc_conversation( - client=client, - agent_id="your-agent-id", # Token will be fetched automatically - audio_interface=async_audio_interface -) - -# Generic factory with connection type -conversation = create_conversation( - client=client, - agent_id="your-agent-id", - connection_type=ConnectionType.WEBRTC, - audio_interface=async_audio_interface -) -``` - -### Backward Compatibility - -- All existing WebSocket-based conversation code continues to work unchanged -- New connection types are opt-in through explicit parameters -- Default behavior remains WebSocket connections - -## Technical Architecture - -### Connection Hierarchy - -``` -BaseConnection (abstract) -├── WebSocketConnection -└── WebRTCConnection (uses LiveKit) -``` - -### Conversation Hierarchy - -``` 
-BaseConversation -├── Conversation (sync WebSocket) -├── AsyncConversation (async WebSocket) -└── WebRTCConversation (async WebRTC) -``` - -### Factory Pattern - -The implementation uses factory functions to create appropriate conversation types based on: -- Explicit connection type parameter -- Presence of conversation token (implies WebRTC) -- Audio interface type (sync vs async) -- Callback function types (sync vs async) - -## Benefits of WebRTC Implementation - -1. **Lower Latency**: Direct peer-to-peer audio streaming -2. **Better Audio Quality**: Optimized for real-time audio -3. **Reduced Server Load**: Audio doesn't go through application servers -4. **Adaptive Bitrate**: Automatic quality adjustment based on network conditions -5. **Better Connectivity**: NAT traversal and firewall handling - -## Usage Examples - -### Basic WebRTC Conversation - -```python -import asyncio -from elevenlabs import ElevenLabs -from elevenlabs.conversational_ai.conversation_factory import create_webrtc_conversation - -async def main(): - client = ElevenLabs(api_key="your-api-key") - - conversation = create_webrtc_conversation( - client=client, - agent_id="your-agent-id", - audio_interface=YourAsyncAudioInterface(), - ) - - await conversation.start_session() - await conversation.send_user_message("Hello!") - # ... 
conversation logic - await conversation.end_session() - -asyncio.run(main()) -``` - -### Connection Type Comparison - -```python -# WebSocket (existing) -ws_conversation = create_conversation( - client=client, - agent_id="agent-id", - connection_type=ConnectionType.WEBSOCKET, - audio_interface=SyncAudioInterface() # Sync interface -) - -# WebRTC (new) -webrtc_conversation = create_conversation( - client=client, - agent_id="agent-id", - connection_type=ConnectionType.WEBRTC, - audio_interface=AsyncAudioInterface() # Async interface required -) -``` - -## Testing - -The implementation includes comprehensive tests covering: - -- Connection type determination logic -- Factory function behavior -- WebRTC conversation lifecycle -- Message handling -- Error conditions -- Token fetching - -All tests use proper mocking to avoid external dependencies during testing. - -## Future Considerations - -1. **Audio Interface Implementations**: Additional concrete audio interface implementations for common use cases -2. **Advanced WebRTC Features**: Support for video, screen sharing, or advanced audio processing -3. **Monitoring and Analytics**: Integration with LiveKit's monitoring features -4. **Connection Fallback**: Automatic fallback from WebRTC to WebSocket in case of connection issues - -## Migration Guide - -For users wanting to upgrade from WebSocket to WebRTC: - -1. Install the updated SDK with `livekit` dependency -2. Update audio interface to async (`AsyncAudioInterface`) -3. Update callback functions to async -4. Change connection type to `ConnectionType.WEBRTC` -5. Provide conversation token or agent ID for authentication - -The migration is non-breaking - existing code continues to work without changes. 
\ No newline at end of file From 62937564ad5996c354586931e1f8ac081a512c07 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 16 Sep 2025 13:27:53 +0100 Subject: [PATCH 03/20] rm --- examples/webrtc_conversation_example.py | 189 ------------------------ 1 file changed, 189 deletions(-) delete mode 100644 examples/webrtc_conversation_example.py diff --git a/examples/webrtc_conversation_example.py b/examples/webrtc_conversation_example.py deleted file mode 100644 index 4f73a5ee..00000000 --- a/examples/webrtc_conversation_example.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/env python3 -""" -Example demonstrating WebRTC conversation support in the ElevenLabs Python SDK. - -This example shows how to use the new WebRTC connection type for real-time -conversations with ElevenLabs agents using LiveKit. -""" - -import asyncio -import os -from elevenlabs import ElevenLabs -from elevenlabs.conversational_ai.conversation_factory import create_webrtc_conversation -from elevenlabs.conversational_ai.conversation import AsyncAudioInterface -from elevenlabs.conversational_ai.base_connection import ConnectionType - - -class SimpleAsyncAudioInterface(AsyncAudioInterface): - """A simple example audio interface for WebRTC conversations.""" - - async def start(self, input_callback): - """Start the audio interface with input callback.""" - print("Audio interface started") - self.input_callback = input_callback - # In a real implementation, you would set up audio capture here - - async def stop(self): - """Stop the audio interface.""" - print("Audio interface stopped") - # In a real implementation, you would clean up audio resources here - - async def output(self, audio: bytes): - """Output audio to the user.""" - print(f"Received audio output: {len(audio)} bytes") - # In a real implementation, you would play the audio here - - async def interrupt(self): - """Handle interruption signal.""" - print("Audio output interrupted") - # In a real implementation, you would stop current 
audio playback here - - -async def webrtc_conversation_example(): - """Example of using WebRTC conversation with ElevenLabs.""" - - # Initialize the ElevenLabs client - client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY")) - - # Create audio interface - audio_interface = SimpleAsyncAudioInterface() - - # Define callback functions - async def on_agent_response(response: str): - print(f"Agent: {response}") - - async def on_user_transcript(transcript: str): - print(f"User: {transcript}") - - async def on_session_end(): - print("Conversation session ended") - - # Example 1: WebRTC conversation with conversation token - print("Example 1: WebRTC conversation with explicit token") - conversation_token = "your-conversation-token" # Get this from your server - - conversation = create_webrtc_conversation( - client=client, - agent_id="your-agent-id", - conversation_token=conversation_token, - audio_interface=audio_interface, - callback_agent_response=on_agent_response, - callback_user_transcript=on_user_transcript, - callback_end_session=on_session_end, - ) - - try: - # Start the conversation - await conversation.start_session() - print("WebRTC conversation started!") - - # Send a message to the agent - await conversation.send_user_message("Hello, how are you today?") - - # Keep the conversation running for a bit - await asyncio.sleep(10) - - # Send contextual information - await conversation.send_contextual_update("The user seems interested in learning about WebRTC") - - # Keep running for a bit more - await asyncio.sleep(5) - - finally: - # End the conversation - await conversation.end_session() - print("Conversation ended") - - -async def auto_fetch_token_example(): - """Example of WebRTC conversation with automatic token fetching.""" - - print("\nExample 2: WebRTC conversation with automatic token fetching") - - # Initialize the ElevenLabs client - client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY")) - - # Create audio interface - audio_interface = 
SimpleAsyncAudioInterface() - - # Define callback functions - async def on_agent_response(response: str): - print(f"Agent: {response}") - - # Create WebRTC conversation without explicit token - it will be fetched automatically - conversation = create_webrtc_conversation( - client=client, - agent_id="your-agent-id", # Make sure this agent exists and you have access - audio_interface=audio_interface, - callback_agent_response=on_agent_response, - ) - - try: - # Start the conversation (token will be fetched automatically) - await conversation.start_session() - print("WebRTC conversation started with auto-fetched token!") - - # Send a message - await conversation.send_user_message("Tell me about WebRTC advantages over WebSockets") - - # Wait for response - await asyncio.sleep(10) - - except Exception as e: - print(f"Error in conversation: {e}") - - finally: - # End the conversation - await conversation.end_session() - - -async def compare_connection_types(): - """Example showing the difference between WebSocket and WebRTC connections.""" - - print("\nExample 3: Comparing connection types") - - from elevenlabs.conversational_ai.conversation_factory import create_conversation - - client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY")) - - # WebSocket conversation (traditional) - print("Creating WebSocket conversation...") - ws_conversation = create_conversation( - client=client, - agent_id="your-agent-id", - connection_type=ConnectionType.WEBSOCKET, - # Note: WebSocket conversations use AudioInterface (sync), not AsyncAudioInterface - ) - print(f"WebSocket conversation type: {type(ws_conversation)}") - - # WebRTC conversation (new) - print("Creating WebRTC conversation...") - webrtc_conversation = create_conversation( - client=client, - agent_id="your-agent-id", - connection_type=ConnectionType.WEBRTC, - conversation_token="your-token", # Required for WebRTC - audio_interface=SimpleAsyncAudioInterface(), - ) - print(f"WebRTC conversation type: 
{type(webrtc_conversation)}") - - print("\nKey differences:") - print("- WebSocket: Uses sync AudioInterface, established WebSocket protocol") - print("- WebRTC: Uses AsyncAudioInterface, lower latency, real-time audio streaming") - - -if __name__ == "__main__": - # Check for required environment variables - if not os.getenv("ELEVENLABS_API_KEY"): - print("Please set the ELEVENLABS_API_KEY environment variable") - exit(1) - - print("ElevenLabs WebRTC Conversation Examples") - print("=" * 40) - - # Run the examples - asyncio.run(webrtc_conversation_example()) - asyncio.run(auto_fetch_token_example()) - asyncio.run(compare_connection_types()) - - print("\nAll examples completed!") \ No newline at end of file From e08b69264697be91f978aaf8a36509e6428889a8 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 16 Sep 2025 13:35:30 +0100 Subject: [PATCH 04/20] v --- README.md | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6b364e9e..a76894ad 100644 --- a/README.md +++ b/README.md @@ -351,7 +351,7 @@ conversation = create_webrtc_conversation( ) ``` -**Requirements**: WebRTC conversations require the `livekit` dependency, which is automatically installed with the SDK. All WebRTC conversations must use `AsyncAudioInterface` implementations. +**Requirements**: WebRTC conversations require the `livekit` dependency, which is installed automatically with the SDK (to install it manually, run `pip install livekit`). All WebRTC conversations must use `AsyncAudioInterface` implementations.
## Languages Supported diff --git a/pyproject.toml b/pyproject.toml index 12ae36ca..61ded467 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ pydantic-core = ">=2.18.2" requests = ">=2.20" typing_extensions = ">= 4.0.0" websockets = ">=11.0" -livekit = ">=0.15.0" +livekit = ">=1.0.13" [tool.poetry.group.dev.dependencies] mypy = "==1.13.0" From f94788694a5e5766a2e30b418dfeef47b23e6067 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 16 Sep 2025 13:52:36 +0100 Subject: [PATCH 05/20] fixs --- .../conversational_ai/webrtc_connection.py | 151 ++++++++++++++++-- 1 file changed, 138 insertions(+), 13 deletions(-) diff --git a/src/elevenlabs/conversational_ai/webrtc_connection.py b/src/elevenlabs/conversational_ai/webrtc_connection.py index 685b8470..d62c94d5 100644 --- a/src/elevenlabs/conversational_ai/webrtc_connection.py +++ b/src/elevenlabs/conversational_ai/webrtc_connection.py @@ -1,24 +1,71 @@ import json import asyncio -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Callable, Union, Awaitable import httpx from livekit.rtc import Room, TrackKind from .base_connection import BaseConnection +class WebRTCConnectionConfig: + """Configuration for WebRTC connection.""" + def __init__( + self, + conversation_token: Optional[str] = None, + agent_id: Optional[str] = None, + livekit_url: Optional[str] = None, + api_origin: Optional[str] = None, + overrides: Optional[Dict[str, Any]] = None, + on_debug: Optional[Callable[[Dict[str, Any]], None]] = None, + ): + self.conversation_token = conversation_token + self.agent_id = agent_id + self.livekit_url = livekit_url + self.api_origin = api_origin + self.overrides = overrides or {} + self.on_debug = on_debug + + class WebRTCConnection(BaseConnection): """WebRTC-based connection for conversations using LiveKit.""" - LIVEKIT_WS_URL = "wss://livekit.rtc.elevenlabs.io" + DEFAULT_LIVEKIT_WS_URL = "wss://livekit.rtc.elevenlabs.io" + DEFAULT_API_ORIGIN = 
"https://api.elevenlabs.io" - def __init__(self, conversation_token: Optional[str] = None, agent_id: Optional[str] = None): + def __init__( + self, + conversation_token: Optional[str] = None, + agent_id: Optional[str] = None, + livekit_url: Optional[str] = None, + api_origin: Optional[str] = None, + overrides: Optional[Dict[str, Any]] = None, + on_debug: Optional[Callable[[Dict[str, Any]], None]] = None, + ): super().__init__() self.conversation_token = conversation_token self.agent_id = agent_id + self.livekit_url = livekit_url or self.DEFAULT_LIVEKIT_WS_URL + self.api_origin = api_origin or self.DEFAULT_API_ORIGIN + self.overrides = overrides or {} + self.on_debug = on_debug self._room: Optional[Room] = None self._is_connected = False + @classmethod + async def create(cls, config: WebRTCConnectionConfig) -> "WebRTCConnection": + """Create and connect a WebRTC connection.""" + connection = cls( + conversation_token=config.conversation_token, + agent_id=config.agent_id, + livekit_url=config.livekit_url, + api_origin=config.api_origin, + overrides=config.overrides, + on_debug=config.on_debug, + ) + + await connection.connect() + return connection + async def connect(self) -> None: """Establish the WebRTC connection using LiveKit.""" # Get conversation token if not provided @@ -31,19 +78,26 @@ async def connect(self) -> None: self._room = Room() self._setup_room_callbacks() - # Connect to LiveKit room - await self._room.connect(self.LIVEKIT_WS_URL, self.conversation_token) + # Connect to LiveKit room using configurable URL + await self._room.connect(self.livekit_url, self.conversation_token) self._is_connected = True # Set conversation ID from room name if available if self._room.name: - self.conversation_id = self._room.name + # Extract conversation ID from room name if it contains one + import re + match = re.search(r'(conv_[a-zA-Z0-9]+)', self._room.name) + self.conversation_id = match.group(0) if match else self._room.name else: self.conversation_id = 
f"webrtc-{id(self)}" # Enable microphone await self._room.local_participant.set_microphone_enabled(True) + # Send overrides if any + if self.overrides: + await self.send_message(self._construct_overrides()) + async def close(self) -> None: """Close the WebRTC connection.""" if self._room: @@ -78,13 +132,24 @@ async def _fetch_conversation_token(self) -> str: if not self.agent_id: raise ValueError("Agent ID is required to fetch conversation token") - url = f"https://api.elevenlabs.io/v1/convai/conversation/token?agent_id={self.agent_id}" + # Get version and source from overrides or use defaults + version = self.overrides.get("client", {}).get("version", "2.15.0") # From pyproject.toml + source = self.overrides.get("client", {}).get("source", "python_sdk") + + # Convert WSS origin to HTTPS for API calls + api_origin = self._convert_wss_to_https(self.api_origin) + + url = f"{api_origin}/v1/convai/conversation/token?agent_id={self.agent_id}&source={source}&version={version}" async with httpx.AsyncClient() as client: response = await client.get(url) if not response.is_success: - raise Exception(f"Failed to fetch conversation token for agent {self.agent_id}: {response.status_code} {response.text}") + error_msg = f"ElevenLabs API returned {response.status_code} {response.reason_phrase}" + if response.status_code == 401: + error_msg = "Your agent has authentication enabled, but no signed URL or conversation token was provided." 
+ + raise Exception(f"Failed to fetch conversation token for agent {self.agent_id}: {error_msg}") data = response.json() token = data.get("token") @@ -94,6 +159,22 @@ async def _fetch_conversation_token(self) -> str: return token + def _convert_wss_to_https(self, origin: str) -> str: + """Convert WSS origin to HTTPS for API calls.""" + return origin.replace("wss://", "https://") + + def _construct_overrides(self) -> Dict[str, Any]: + """Construct overrides message for conversation initiation.""" + return { + "type": "conversation_initiation_client_data", + "overrides": self.overrides + } + + def debug(self, info: Dict[str, Any]) -> None: + """Log debug information.""" + if self.on_debug: + self.on_debug(info) + def _setup_room_callbacks(self) -> None: """Setup LiveKit room event callbacks.""" if not self._room: @@ -101,12 +182,20 @@ def _setup_room_callbacks(self) -> None: @self._room.on("connected") def on_connected() -> None: - print("WebRTC room connected") + self._is_connected = True + self.debug({"type": "webrtc_connected", "message": "WebRTC room connected"}) @self._room.on("disconnected") def on_disconnected(reason: Optional[str] = None) -> None: self._is_connected = False - print(f"WebRTC room disconnected: {reason}") + self.debug({"type": "webrtc_disconnected", "message": f"WebRTC room disconnected: {reason}"}) + + @self._room.on("connection_state_changed") + def on_connection_state_changed(state) -> None: + self.debug({"type": "connection_state_changed", "state": str(state)}) + # Handle disconnected state + if hasattr(state, 'name') and state.name == 'DISCONNECTED': + self._is_connected = False @self._room.on("data_received") def on_data_received(data: bytes, participant) -> None: @@ -119,13 +208,49 @@ def on_data_received(data: bytes, participant) -> None: self._handle_message(message) except (json.JSONDecodeError, UnicodeDecodeError) as e: - print(f"Failed to parse incoming data message: {e}") + self.debug({ + "type": "data_parse_error", + "error": 
str(e), + "raw_data": data.decode('utf-8', errors='replace') + }) @self._room.on("track_subscribed") def on_track_subscribed(track, publication, participant) -> None: if track.kind == TrackKind.KIND_AUDIO and "agent" in participant.identity: - # Handle incoming agent audio - print("Subscribed to agent audio track") + self.debug({ + "type": "agent_audio_track_subscribed", + "participant": participant.identity + }) + + @self._room.on("active_speakers_changed") + def on_active_speakers_changed(speakers) -> None: + # Update mode based on active speakers + if speakers and len(speakers) > 0: + is_agent_speaking = any("agent" in speaker.identity for speaker in speakers) + mode = "speaking" if is_agent_speaking else "listening" + else: + mode = "listening" + + self.debug({"type": "mode_changed", "mode": mode}) + + async def set_microphone_enabled(self, enabled: bool) -> None: + """Enable or disable the microphone.""" + if not self._room or not self._room.local_participant: + raise RuntimeError("Room not connected") + + await self._room.local_participant.set_microphone_enabled(enabled) + + async def set_microphone_device(self, device_id: str) -> None: + """Set the microphone input device.""" + if not self._room or not self._room.local_participant: + raise RuntimeError("Room not connected") + + # This would require additional LiveKit functionality for device switching + # For now, we log the request + self.debug({ + "type": "microphone_device_change_requested", + "device_id": device_id + }) def get_room(self) -> Optional[Room]: """Get the LiveKit room instance for advanced usage.""" From 9e9e5c986721e9bfd6cbee9ab98942a2a73fc0f0 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Tue, 16 Sep 2025 16:45:55 +0100 Subject: [PATCH 06/20] fic --- .../conversational_ai/connection_factory.py | 19 +- .../conversational_ai/conversation.py | 323 ++++++++++++++++-- .../conversational_ai/conversation_factory.py | 21 +- .../conversational_ai/webrtc_connection.py | 16 + 
.../conversational_ai/webrtc_conversation.py | 37 +- 5 files changed, 368 insertions(+), 48 deletions(-) diff --git a/src/elevenlabs/conversational_ai/connection_factory.py b/src/elevenlabs/conversational_ai/connection_factory.py index 9e0ca7de..9817c6a6 100644 --- a/src/elevenlabs/conversational_ai/connection_factory.py +++ b/src/elevenlabs/conversational_ai/connection_factory.py @@ -1,8 +1,8 @@ -from typing import Optional +from typing import Optional, Dict, Any, Callable from .base_connection import BaseConnection, ConnectionType from .websocket_connection import WebSocketConnection -from .webrtc_connection import WebRTCConnection +from .webrtc_connection import WebRTCConnection, WebRTCConnectionConfig def create_connection( @@ -10,7 +10,11 @@ def create_connection( *, ws_url: Optional[str] = None, conversation_token: Optional[str] = None, - agent_id: Optional[str] = None + agent_id: Optional[str] = None, + livekit_url: Optional[str] = None, + api_origin: Optional[str] = None, + overrides: Optional[Dict[str, Any]] = None, + on_debug: Optional[Callable[[Dict[str, Any]], None]] = None, ) -> BaseConnection: """Factory function to create connections based on type.""" @@ -20,7 +24,14 @@ def create_connection( return WebSocketConnection(ws_url) elif connection_type == ConnectionType.WEBRTC: - return WebRTCConnection(conversation_token=conversation_token, agent_id=agent_id) + return WebRTCConnection( + conversation_token=conversation_token, + agent_id=agent_id, + livekit_url=livekit_url, + api_origin=api_origin, + overrides=overrides, + on_debug=on_debug, + ) else: raise ValueError(f"Unknown connection type: {connection_type}") diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index eda1996b..5e8c7853 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -299,6 +299,10 @@ def __init__( user_id: Optional[str] = None, connection_type: 
Optional[ConnectionType] = None, conversation_token: Optional[str] = None, + livekit_url: Optional[str] = None, + api_origin: Optional[str] = None, + webrtc_overrides: Optional[dict] = None, + on_debug: Optional[Callable[[dict], None]] = None, ): self.extra_body = extra_body or {} self.conversation_config_override = conversation_config_override or {} @@ -306,6 +310,10 @@ def __init__( self.user_id = user_id self.connection_type = connection_type self.conversation_token = conversation_token + self.livekit_url = livekit_url + self.api_origin = api_origin + self.webrtc_overrides = webrtc_overrides or {} + self.on_debug = on_debug class BaseConversation: @@ -332,6 +340,7 @@ def __init__( self._conversation_id = None self._last_interrupt_id = 0 + self._connection = None def _get_wss_url(self): base_http_url = self.client._client_wrapper.get_base_url() @@ -360,10 +369,36 @@ def _create_connection(self): ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() return create_connection(connection_type, ws_url=ws_url) elif connection_type == ConnectionType.WEBRTC: + # Convert base HTTP URL to appropriate origins + base_http_url = self.client._client_wrapper.get_base_url() + + # Use configured URLs or derive from base URL + api_origin = self.config.api_origin or base_http_url + livekit_url = self.config.livekit_url + if not livekit_url: + # Default LiveKit URL if not specified + livekit_url = "wss://livekit.rtc.elevenlabs.io" + + # Merge conversation overrides with WebRTC overrides + overrides = { + **self.config.webrtc_overrides, + "client": { + "version": __version__, + "source": "python_sdk", + }, + "custom_llm_extra_body": self.config.extra_body, + "conversation_config_override": self.config.conversation_config_override, + "dynamic_variables": self.config.dynamic_variables, + } + return create_connection( connection_type, conversation_token=self.config.conversation_token, - agent_id=self.agent_id + agent_id=self.agent_id, + livekit_url=livekit_url, 
+ api_origin=api_origin, + overrides=overrides, + on_debug=self.config.on_debug, ) else: raise ValueError(f"Unsupported connection type: {connection_type}") @@ -563,8 +598,17 @@ def start_session(self): Will run in background thread until `end_session` is called. """ - ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() - self._thread = threading.Thread(target=self._run, args=(ws_url,)) + self._connection = self._create_connection() + + connection_type = self._determine_connection_type() + if connection_type == ConnectionType.WEBSOCKET: + ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() + self._thread = threading.Thread(target=self._run_websocket, args=(ws_url,)) + elif connection_type == ConnectionType.WEBRTC: + self._thread = threading.Thread(target=self._run_webrtc) + else: + raise ValueError(f"Unsupported connection type: {connection_type}") + self._thread.start() def end_session(self): @@ -574,6 +618,23 @@ def end_session(self): self._ws = None self._should_stop.set() + # Close connection if it exists + if self._connection: + connection_type = self._determine_connection_type() + if connection_type == ConnectionType.WEBRTC: + # For WebRTC, we need to close the connection in an async context + import asyncio + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.create_task(self._connection.close()) + else: + loop.run_until_complete(self._connection.close()) + except RuntimeError: + # No event loop running, create a new one + asyncio.run(self._connection.close()) + self._connection = None + if self.callback_end_session: self.callback_end_session() @@ -596,17 +657,33 @@ def send_user_message(self, text: str): text: The text message to send to the agent. Raises: - RuntimeError: If the session is not active or websocket is not connected. + RuntimeError: If the session is not active or connection is not established. 
""" - if not self._ws: - raise RuntimeError("Session not started or websocket not connected.") + connection_type = self._determine_connection_type() - event = UserMessageClientToOrchestratorEvent(text=text) - try: - self._ws.send(json.dumps(event.to_dict())) - except Exception as e: - print(f"Error sending user message: {e}") - raise + if connection_type == ConnectionType.WEBSOCKET: + if not self._ws: + raise RuntimeError("Session not started or websocket not connected.") + + event = UserMessageClientToOrchestratorEvent(text=text) + try: + self._ws.send(json.dumps(event.to_dict())) + except Exception as e: + print(f"Error sending user message: {e}") + raise + + elif connection_type == ConnectionType.WEBRTC: + if not self._connection: + raise RuntimeError("Session not started or WebRTC connection not established.") + + event = UserMessageClientToOrchestratorEvent(text=text) + try: + # Send through WebRTC connection + import asyncio + asyncio.create_task(self._connection.send_message(event.to_dict())) + except Exception as e: + print(f"Error sending user message: {e}") + raise def register_user_activity(self): """Register user activity to prevent session timeout. 
@@ -648,7 +725,7 @@ def send_contextual_update(self, text: str): print(f"Error sending contextual update: {e}") raise - def _run(self, ws_url: str): + def _run_websocket(self, ws_url: str): with connect(ws_url, max_size=16 * 1024 * 1024) as ws: self._ws = ws ws.send(self._create_initiation_message()) @@ -686,6 +763,97 @@ def input_callback(audio): self._ws = None + def _run_webrtc(self): + """Run WebRTC conversation session.""" + try: + # Connect to WebRTC + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def webrtc_session(): + await self._connection.connect() + self._conversation_id = self._connection.conversation_id + + # Set up message callback + def message_callback(message): + self._handle_webrtc_message(message) + + self._connection.on_message(message_callback) + + # Set up audio input callback + def input_callback(audio): + try: + # Send audio through WebRTC connection + loop.create_task(self._connection.send_audio(audio)) + except Exception as e: + print(f"Error sending user audio chunk: {e}") + self.end_session() + + self.audio_interface.start(input_callback) + + # Keep running until stopped + while not self._should_stop.is_set(): + await asyncio.sleep(0.1) + + await self._connection.close() + + loop.run_until_complete(webrtc_session()) + + except Exception as e: + print(f"WebRTC session error: {e}") + self.end_session() + finally: + loop.close() + + def _handle_webrtc_message(self, message): + """Handle messages from WebRTC connection.""" + class WebRTCMessageHandler: + def __init__(self, conversation): + self.conversation = conversation + self.callback_agent_response = conversation.callback_agent_response + self.callback_agent_response_correction = conversation.callback_agent_response_correction + self.callback_user_transcript = conversation.callback_user_transcript + self.callback_latency_measurement = conversation.callback_latency_measurement + + def handle_audio_output(self, audio): + 
self.conversation.audio_interface.output(audio) + + def handle_agent_response(self, response): + if self.conversation.callback_agent_response: + self.conversation.callback_agent_response(response) + + def handle_agent_response_correction(self, original, corrected): + if self.conversation.callback_agent_response_correction: + self.conversation.callback_agent_response_correction(original, corrected) + + def handle_user_transcript(self, transcript): + if self.conversation.callback_user_transcript: + self.conversation.callback_user_transcript(transcript) + + def handle_interruption(self): + self.conversation.audio_interface.interrupt() + + def handle_ping(self, event): + # For WebRTC, pings are handled by the connection itself + pass + + def handle_latency_measurement(self, latency): + if self.conversation.callback_latency_measurement: + self.conversation.callback_latency_measurement(latency) + + def handle_client_tool_call(self, tool_name, parameters): + def send_response(response): + if not self.conversation._should_stop.is_set(): + # Send response through WebRTC connection + import asyncio + asyncio.create_task(self.conversation._connection.send_message(response)) + + self.conversation.client_tools.execute_tool(tool_name, parameters, send_response) + + handler = WebRTCMessageHandler(self) + self._handle_message_core(message, handler) + def _handle_message(self, message, ws): class SyncMessageHandler: def __init__(self, conversation, ws): @@ -808,8 +976,16 @@ async def start_session(self): Will run in background task until `end_session` is called. 
""" - ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() - self._task = asyncio.create_task(self._run(ws_url)) + self._connection = self._create_connection() + + connection_type = self._determine_connection_type() + if connection_type == ConnectionType.WEBSOCKET: + ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() + self._task = asyncio.create_task(self._run_websocket(ws_url)) + elif connection_type == ConnectionType.WEBRTC: + self._task = asyncio.create_task(self._run_webrtc()) + else: + raise ValueError(f"Unsupported connection type: {connection_type}") async def end_session(self): """Ends the conversation session and cleans up resources.""" @@ -818,6 +994,11 @@ async def end_session(self): self._ws = None self._should_stop.set() + # Close connection if it exists + if self._connection: + await self._connection.close() + self._connection = None + if self.callback_end_session: await self.callback_end_session() @@ -840,17 +1021,31 @@ async def send_user_message(self, text: str): text: The text message to send to the agent. Raises: - RuntimeError: If the session is not active or websocket is not connected. + RuntimeError: If the session is not active or connection is not established. 
""" - if not self._ws: - raise RuntimeError("Session not started or websocket not connected.") + connection_type = self._determine_connection_type() - event = UserMessageClientToOrchestratorEvent(text=text) - try: - await self._ws.send(json.dumps(event.to_dict())) - except Exception as e: - print(f"Error sending user message: {e}") - raise + if connection_type == ConnectionType.WEBSOCKET: + if not self._ws: + raise RuntimeError("Session not started or websocket not connected.") + + event = UserMessageClientToOrchestratorEvent(text=text) + try: + await self._ws.send(json.dumps(event.to_dict())) + except Exception as e: + print(f"Error sending user message: {e}") + raise + + elif connection_type == ConnectionType.WEBRTC: + if not self._connection: + raise RuntimeError("Session not started or WebRTC connection not established.") + + event = UserMessageClientToOrchestratorEvent(text=text) + try: + await self._connection.send_message(event.to_dict()) + except Exception as e: + print(f"Error sending user message: {e}") + raise async def register_user_activity(self): """Register user activity to prevent session timeout. 
@@ -892,7 +1087,7 @@ async def send_contextual_update(self, text: str): print(f"Error sending contextual update: {e}") raise - async def _run(self, ws_url: str): + async def _run_websocket(self, ws_url: str): async with websockets.connect(ws_url, max_size=16 * 1024 * 1024) as ws: self._ws = ws await ws.send(self._create_initiation_message()) @@ -934,6 +1129,84 @@ async def input_callback(audio): finally: self._ws = None + async def _run_webrtc(self): + """Run async WebRTC conversation session.""" + try: + await self._connection.connect() + self._conversation_id = self._connection.conversation_id + + # Set up message callback + async def message_callback(message): + await self._handle_webrtc_message(message) + + self._connection.on_message(message_callback) + + # Set up audio input callback + async def input_callback(audio): + try: + await self._connection.send_audio(audio) + except Exception as e: + print(f"Error sending user audio chunk: {e}") + await self.end_session() + + await self.audio_interface.start(input_callback) + + # Keep running until stopped + while not self._should_stop.is_set(): + await asyncio.sleep(0.1) + + await self._connection.close() + + except Exception as e: + print(f"WebRTC session error: {e}") + await self.end_session() + + async def _handle_webrtc_message(self, message): + """Handle messages from WebRTC connection.""" + class AsyncWebRTCMessageHandler: + def __init__(self, conversation): + self.conversation = conversation + self.callback_agent_response = conversation.callback_agent_response + self.callback_agent_response_correction = conversation.callback_agent_response_correction + self.callback_user_transcript = conversation.callback_user_transcript + self.callback_latency_measurement = conversation.callback_latency_measurement + + async def handle_audio_output(self, audio): + await self.conversation.audio_interface.output(audio) + + async def handle_agent_response(self, response): + if self.conversation.callback_agent_response: + await 
self.conversation.callback_agent_response(response) + + async def handle_agent_response_correction(self, original, corrected): + if self.conversation.callback_agent_response_correction: + await self.conversation.callback_agent_response_correction(original, corrected) + + async def handle_user_transcript(self, transcript): + if self.conversation.callback_user_transcript: + await self.conversation.callback_user_transcript(transcript) + + async def handle_interruption(self): + await self.conversation.audio_interface.interrupt() + + async def handle_ping(self, event): + # For WebRTC, pings are handled by the connection itself + pass + + async def handle_latency_measurement(self, latency): + if self.conversation.callback_latency_measurement: + await self.conversation.callback_latency_measurement(latency) + + def handle_client_tool_call(self, tool_name, parameters): + def send_response(response): + if not self.conversation._should_stop.is_set(): + asyncio.create_task(self.conversation._connection.send_message(response)) + + self.conversation.client_tools.execute_tool(tool_name, parameters, send_response) + + handler = AsyncWebRTCMessageHandler(self) + await self._handle_message_core_async(message, handler) + async def _handle_message(self, message, ws): class AsyncMessageHandler: def __init__(self, conversation, ws): diff --git a/src/elevenlabs/conversational_ai/conversation_factory.py b/src/elevenlabs/conversational_ai/conversation_factory.py index 808ad853..92db21b3 100644 --- a/src/elevenlabs/conversational_ai/conversation_factory.py +++ b/src/elevenlabs/conversational_ai/conversation_factory.py @@ -169,6 +169,10 @@ def create_webrtc_conversation( user_id: Optional[str] = None, *, conversation_token: Optional[str] = None, + livekit_url: Optional[str] = None, + api_origin: Optional[str] = None, + webrtc_overrides: Optional[dict] = None, + on_debug: Optional[Callable[[dict], None]] = None, audio_interface: Optional[AsyncAudioInterface] = None, config: 
Optional[ConversationInitiationData] = None, client_tools: Optional[ClientTools] = None, @@ -182,20 +186,23 @@ def create_webrtc_conversation( Convenience function for creating WebRTC conversations with type safety. """ - return create_conversation( + return WebRTCConversation( client=client, agent_id=agent_id, user_id=user_id, - connection_type=ConnectionType.WEBRTC, conversation_token=conversation_token, + livekit_url=livekit_url, + api_origin=api_origin, + webrtc_overrides=webrtc_overrides, + on_debug=on_debug, audio_interface=audio_interface, config=config, client_tools=client_tools, - async_callback_agent_response=callback_agent_response, - async_callback_agent_response_correction=callback_agent_response_correction, - async_callback_user_transcript=callback_user_transcript, - async_callback_latency_measurement=callback_latency_measurement, - async_callback_end_session=callback_end_session, + callback_agent_response=callback_agent_response, + callback_agent_response_correction=callback_agent_response_correction, + callback_user_transcript=callback_user_transcript, + callback_latency_measurement=callback_latency_measurement, + callback_end_session=callback_end_session, ) diff --git a/src/elevenlabs/conversational_ai/webrtc_connection.py b/src/elevenlabs/conversational_ai/webrtc_connection.py index d62c94d5..dacf16e4 100644 --- a/src/elevenlabs/conversational_ai/webrtc_connection.py +++ b/src/elevenlabs/conversational_ai/webrtc_connection.py @@ -98,6 +98,11 @@ async def connect(self) -> None: if self.overrides: await self.send_message(self._construct_overrides()) + self.debug({ + "type": "conversation_initiation_client_data", + "message": self._construct_overrides() + }) + async def close(self) -> None: """Close the WebRTC connection.""" if self._room: @@ -127,6 +132,17 @@ async def send_audio(self, audio_data: bytes) -> None: # This method can be used for custom audio streaming if needed pass + async def receive_messages(self) -> None: + """Receive and handle 
messages - handled by LiveKit event callbacks.""" + # In WebRTC mode, messages are handled via LiveKit event callbacks + # This method exists for compatibility with the BaseConnection interface + if not self._is_connected: + return + + # Keep the connection alive while connected + while self._is_connected: + await asyncio.sleep(0.1) + async def _fetch_conversation_token(self) -> str: """Fetch conversation token from ElevenLabs API.""" if not self.agent_id: diff --git a/src/elevenlabs/conversational_ai/webrtc_conversation.py b/src/elevenlabs/conversational_ai/webrtc_conversation.py index eee05699..453401b3 100644 --- a/src/elevenlabs/conversational_ai/webrtc_conversation.py +++ b/src/elevenlabs/conversational_ai/webrtc_conversation.py @@ -11,7 +11,7 @@ ClientTools ) from .base_connection import ConnectionType -from .webrtc_connection import WebRTCConnection +from .webrtc_connection import WebRTCConnection, WebRTCConnectionConfig class WebRTCConversation(BaseConversation): @@ -28,6 +28,10 @@ def __init__( user_id: Optional[str] = None, *, conversation_token: Optional[str] = None, + livekit_url: Optional[str] = None, + api_origin: Optional[str] = None, + webrtc_overrides: Optional[dict] = None, + on_debug: Optional[Callable[[dict], None]] = None, audio_interface: Optional[AsyncAudioInterface] = None, config: Optional[ConversationInitiationData] = None, client_tools: Optional[ClientTools] = None, @@ -45,6 +49,10 @@ def __init__( user_id: The ID of the user conversing with the agent. conversation_token: Token for WebRTC authentication. If not provided, will be fetched using the agent_id. + livekit_url: Custom LiveKit WebSocket URL. If not provided, uses default. + api_origin: Custom API origin for token fetching. If not provided, uses default. + webrtc_overrides: Additional overrides specific to WebRTC connection. + on_debug: Debug callback function for WebRTC connection events. audio_interface: The async audio interface to use for input and output. 
config: Configuration for the conversation. client_tools: Client tools for handling agent tool calls. @@ -60,6 +68,10 @@ def __init__( config = ConversationInitiationData() config.connection_type = ConnectionType.WEBRTC config.conversation_token = conversation_token + config.livekit_url = livekit_url + config.api_origin = api_origin + config.webrtc_overrides = webrtc_overrides or {} + config.on_debug = on_debug super().__init__( client=client, @@ -84,11 +96,8 @@ def __init__( async def start_session(self): """Start the WebRTC conversation session.""" try: - # Create WebRTC connection - self._connection = WebRTCConnection( - conversation_token=self.config.conversation_token, - agent_id=self.agent_id - ) + # Use the enhanced connection creation from BaseConversation + self._connection = self._create_connection() # Set up message handler self._connection.on_message(self._handle_message) @@ -96,10 +105,6 @@ async def start_session(self): # Connect await self._connection.connect() - # Send initiation message - initiation_message = json.loads(self._create_initiation_message()) - await self._connection.send_message(initiation_message) - # Update conversation ID self._conversation_id = self._connection.conversation_id @@ -107,10 +112,18 @@ async def start_session(self): if self.audio_interface: await self.audio_interface.start(self._audio_input_callback) - print(f"WebRTC conversation started with ID: {self._conversation_id}") + if self.config.on_debug: + self.config.on_debug({ + "type": "webrtc_conversation_started", + "conversation_id": self._conversation_id + }) except Exception as e: - print(f"Failed to start WebRTC session: {e}") + if self.config.on_debug: + self.config.on_debug({ + "type": "webrtc_session_start_error", + "error": str(e) + }) raise async def end_session(self): From 3e4224bd2c0387a760c8ffa8a8e10b1fa4d1b617 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 11:49:23 +0100 Subject: [PATCH 07/20] bump --- poetry.lock | 117 
++++++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 2 +- 2 files changed, 117 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index 10ee395f..0dc691e1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,16 @@ # This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. +[[package]] +name = "aiofiles" +version = "24.1.0" +description = "File support for asyncio." +optional = false +python-versions = ">=3.8" +files = [ + {file = "aiofiles-24.1.0-py3-none-any.whl", hash = "sha256:b4ec55f4195e3eb5d7abd1bf7e061763e864dd4954231fb8539a0ef8bb8260e5"}, + {file = "aiofiles-24.1.0.tar.gz", hash = "sha256:22a075c9e5a3810f0c2e48f3008c94d68c65d763b9b03857924c99e57355166c"}, +] + [[package]] name = "annotated-types" version = "0.7.0" @@ -244,6 +255,27 @@ files = [ {file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"}, ] +[[package]] +name = "livekit" +version = "1.0.13" +description = "Python Real-time SDK for LiveKit" +optional = false +python-versions = ">=3.9.0" +files = [ + {file = "livekit-1.0.13-py3-none-macosx_10_9_x86_64.whl", hash = "sha256:7174723d75544e6942e1c1a99fb297bfee538d0f7b9bd3f3cdebf06e42a72abc"}, + {file = "livekit-1.0.13-py3-none-macosx_11_0_arm64.whl", hash = "sha256:ef1f641bc622c0b15adf0e91dfc62740d20db51d09369d3a7f84e8314b0ce067"}, + {file = "livekit-1.0.13-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:d40a8b9d5cc931736e82bb723e1ae27436e0b2d20b0217627341030400784dc2"}, + {file = "livekit-1.0.13-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:d73bb327a1a711b09e0b39d574fb04af9b2f38381c6267330df8a713e44e1be3"}, + {file = "livekit-1.0.13-py3-none-win_amd64.whl", hash = "sha256:bbb2d17203d74991aac23a5d0519e33984f8b0c0d53b2182c837086742d1b813"}, + {file = "livekit-1.0.13.tar.gz", hash = "sha256:eb50b59b7320b1e960ea8f71b8e52fb832fb867e42806845659918dbe13e6a10"}, +] + +[package.dependencies] +aiofiles = ">=24" +numpy = 
">=1.26" +protobuf = ">=4.25.0" +types-protobuf = ">=3" + [[package]] name = "mypy" version = "1.13.0" @@ -308,6 +340,60 @@ files = [ {file = "mypy_extensions-1.1.0.tar.gz", hash = "sha256:52e68efc3284861e772bbcd66823fde5ae21fd2fdb51c62a211403730b916558"}, ] +[[package]] +name = "numpy" +version = "2.0.2" +description = "Fundamental package for array computing in Python" +optional = false +python-versions = ">=3.9" +files = [ + {file = "numpy-2.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:51129a29dbe56f9ca83438b706e2e69a39892b5eda6cedcb6b0c9fdc9b0d3ece"}, + {file = "numpy-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f15975dfec0cf2239224d80e32c3170b1d168335eaedee69da84fbe9f1f9cd04"}, + {file = "numpy-2.0.2-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:8c5713284ce4e282544c68d1c3b2c7161d38c256d2eefc93c1d683cf47683e66"}, + {file = "numpy-2.0.2-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:becfae3ddd30736fe1889a37f1f580e245ba79a5855bff5f2a29cb3ccc22dd7b"}, + {file = "numpy-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2da5960c3cf0df7eafefd806d4e612c5e19358de82cb3c343631188991566ccd"}, + {file = "numpy-2.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:496f71341824ed9f3d2fd36cf3ac57ae2e0165c143b55c3a035ee219413f3318"}, + {file = "numpy-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:a61ec659f68ae254e4d237816e33171497e978140353c0c2038d46e63282d0c8"}, + {file = "numpy-2.0.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:d731a1c6116ba289c1e9ee714b08a8ff882944d4ad631fd411106a30f083c326"}, + {file = "numpy-2.0.2-cp310-cp310-win32.whl", hash = "sha256:984d96121c9f9616cd33fbd0618b7f08e0cfc9600a7ee1d6fd9b239186d19d97"}, + {file = "numpy-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:c7b0be4ef08607dd04da4092faee0b86607f111d5ae68036f16cc787e250a131"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = 
"sha256:49ca4decb342d66018b01932139c0961a8f9ddc7589611158cb3c27cbcf76448"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:11a76c372d1d37437857280aa142086476136a8c0f373b2e648ab2c8f18fb195"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:807ec44583fd708a21d4a11d94aedf2f4f3c3719035c76a2bbe1fe8e217bdc57"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:8cafab480740e22f8d833acefed5cc87ce276f4ece12fdaa2e8903db2f82897a"}, + {file = "numpy-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a15f476a45e6e5a3a79d8a14e62161d27ad897381fecfa4a09ed5322f2085669"}, + {file = "numpy-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13e689d772146140a252c3a28501da66dfecd77490b498b168b501835041f951"}, + {file = "numpy-2.0.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9ea91dfb7c3d1c56a0e55657c0afb38cf1eeae4544c208dc465c3c9f3a7c09f9"}, + {file = "numpy-2.0.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c1c9307701fec8f3f7a1e6711f9089c06e6284b3afbbcd259f7791282d660a15"}, + {file = "numpy-2.0.2-cp311-cp311-win32.whl", hash = "sha256:a392a68bd329eafac5817e5aefeb39038c48b671afd242710b451e76090e81f4"}, + {file = "numpy-2.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:286cd40ce2b7d652a6f22efdfc6d1edf879440e53e76a75955bc0c826c7e64dc"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:df55d490dea7934f330006d0f81e8551ba6010a5bf035a249ef61a94f21c500b"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8df823f570d9adf0978347d1f926b2a867d5608f434a7cff7f7908c6570dcf5e"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:9a92ae5c14811e390f3767053ff54eaee3bf84576d99a2456391401323f4ec2c"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:a842d573724391493a97a62ebbb8e731f8a5dcc5d285dfc99141ca15a3302d0c"}, + {file = 
"numpy-2.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c05e238064fc0610c840d1cf6a13bf63d7e391717d247f1bf0318172e759e692"}, + {file = "numpy-2.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0123ffdaa88fa4ab64835dcbde75dcdf89c453c922f18dced6e27c90d1d0ec5a"}, + {file = "numpy-2.0.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:96a55f64139912d61de9137f11bf39a55ec8faec288c75a54f93dfd39f7eb40c"}, + {file = "numpy-2.0.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ec9852fb39354b5a45a80bdab5ac02dd02b15f44b3804e9f00c556bf24b4bded"}, + {file = "numpy-2.0.2-cp312-cp312-win32.whl", hash = "sha256:671bec6496f83202ed2d3c8fdc486a8fc86942f2e69ff0e986140339a63bcbe5"}, + {file = "numpy-2.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:cfd41e13fdc257aa5778496b8caa5e856dc4896d4ccf01841daee1d96465467a"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9059e10581ce4093f735ed23f3b9d283b9d517ff46009ddd485f1747eb22653c"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:423e89b23490805d2a5a96fe40ec507407b8ee786d66f7328be214f9679df6dd"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:2b2955fa6f11907cf7a70dab0d0755159bca87755e831e47932367fc8f2f2d0b"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:97032a27bd9d8988b9a97a8c4d2c9f2c15a81f61e2f21404d7e8ef00cb5be729"}, + {file = "numpy-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1e795a8be3ddbac43274f18588329c72939870a16cae810c2b73461c40718ab1"}, + {file = "numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f26b258c385842546006213344c50655ff1555a9338e2e5e02a0756dc3e803dd"}, + {file = "numpy-2.0.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fec9451a7789926bcf7c2b8d187292c9f93ea30284802a0ab3f5be8ab36865d"}, + {file = "numpy-2.0.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = 
"sha256:9189427407d88ff25ecf8f12469d4d39d35bee1db5d39fc5c168c6f088a6956d"}, + {file = "numpy-2.0.2-cp39-cp39-win32.whl", hash = "sha256:905d16e0c60200656500c95b6b8dca5d109e23cb24abc701d41c02d74c6b3afa"}, + {file = "numpy-2.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:a3f4ab0caa7f053f6797fcd4e1e25caee367db3112ef2b6ef82d749530768c73"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:7f0a0c6f12e07fa94133c8a67404322845220c06a9e80e85999afe727f7438b8"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-macosx_14_0_x86_64.whl", hash = "sha256:312950fdd060354350ed123c0e25a71327d3711584beaef30cdaa93320c392d4"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:26df23238872200f63518dd2aa984cfca675d82469535dc7162dc2ee52d9dd5c"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:a46288ec55ebbd58947d31d72be2c63cbf839f0a63b49cb755022310792a3385"}, + {file = "numpy-2.0.2.tar.gz", hash = "sha256:883c987dee1880e2a864ab0dc9892292582510604156762362d9326444636e78"}, +] + [[package]] name = "packaging" version = "25.0" @@ -334,6 +420,24 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "protobuf" +version = "6.32.1" +description = "" +optional = false +python-versions = ">=3.9" +files = [ + {file = "protobuf-6.32.1-cp310-abi3-win32.whl", hash = "sha256:a8a32a84bc9f2aad712041b8b366190f71dde248926da517bde9e832e4412085"}, + {file = "protobuf-6.32.1-cp310-abi3-win_amd64.whl", hash = "sha256:b00a7d8c25fa471f16bc8153d0e53d6c9e827f0953f3c09aaa4331c718cae5e1"}, + {file = "protobuf-6.32.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:d8c7e6eb619ffdf105ee4ab76af5a68b60a9d0f66da3ea12d1640e6d8dab7281"}, + {file = "protobuf-6.32.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:2f5b80a49e1eb7b86d85fcd23fe92df154b9730a725c3b38c4e43b9d77018bf4"}, + {file = "protobuf-6.32.1-cp39-abi3-manylinux2014_x86_64.whl", hash = 
"sha256:b1864818300c297265c83a4982fd3169f97122c299f56a56e2445c3698d34710"}, + {file = "protobuf-6.32.1-cp39-cp39-win32.whl", hash = "sha256:68ff170bac18c8178f130d1ccb94700cf72852298e016a2443bdb9502279e5f1"}, + {file = "protobuf-6.32.1-cp39-cp39-win_amd64.whl", hash = "sha256:d0975d0b2f3e6957111aa3935d08a0eb7e006b1505d825f862a1fffc8348e122"}, + {file = "protobuf-6.32.1-py3-none-any.whl", hash = "sha256:2601b779fc7d32a866c6b4404f9d42a3f67c5b9f3f15b4db3cccabe06b95c346"}, + {file = "protobuf-6.32.1.tar.gz", hash = "sha256:ee2469e4a021474ab9baafea6cd070e5bf27c7d29433504ddea1a4ee5850f68d"}, +] + [[package]] name = "pyaudio" version = "0.2.14" @@ -656,6 +760,17 @@ files = [ {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] +[[package]] +name = "types-protobuf" +version = "6.32.1.20250918" +description = "Typing stubs for protobuf" +optional = false +python-versions = ">=3.9" +files = [ + {file = "types_protobuf-6.32.1.20250918-py3-none-any.whl", hash = "sha256:22ba6133d142d11cc34d3788ad6dead2732368ebb0406eaa7790ea6ae46c8d0b"}, + {file = "types_protobuf-6.32.1.20250918.tar.gz", hash = "sha256:44ce0ae98475909ca72379946ab61a4435eec2a41090821e713c17e8faf5b88f"}, +] + [[package]] name = "types-pyaudio" version = "0.2.16.20240516" @@ -807,4 +922,4 @@ pyaudio = ["pyaudio"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "b7da141a4d5cf0383830ab57e4341326054c70f2d67cc19d189d4cd1cbaf21c0" +content-hash = "e7cb27516e124f02d2e959c2e4986786d7542bfe81362376069ca3ee3c7b0255" diff --git a/pyproject.toml b/pyproject.toml index 61ded467..92b62e57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ pydantic-core = ">=2.18.2" requests = ">=2.20" typing_extensions = ">= 4.0.0" websockets = ">=11.0" -livekit = ">=1.0.13" +livekit = { version = "^1.0.13", python = ">=3.9" } [tool.poetry.group.dev.dependencies] mypy = "==1.13.0" From bd926c09ed2bba415fe26b925190294cc12768fe Mon Sep 17 
00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 12:01:56 +0100 Subject: [PATCH 08/20] bump python ci runner --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 897ae427..10579d92 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,7 @@ jobs: - name: Set up python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.9 - name: Bootstrap poetry run: | curl -sSL https://install.python-poetry.org | python - -y --version 1.5.1 @@ -26,7 +26,7 @@ jobs: - name: Set up python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.9 - name: Bootstrap poetry run: | curl -sSL https://install.python-poetry.org | python - -y --version 1.5.1 From c40a6f9fe7b19dab4f243981c4a35a4ed3f1d1c1 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 12:02:04 +0100 Subject: [PATCH 09/20] more bump --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 767ee8c8..35f6ab2f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ jobs: - name: Set up python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.9 - name: Bootstrap poetry run: | curl -sSL https://install.python-poetry.org | python - -y --version 1.5.1 @@ -26,7 +26,7 @@ jobs: - name: Set up python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.9 - name: Bootstrap poetry run: | curl -sSL https://install.python-poetry.org | python - -y --version 1.5.1 @@ -48,7 +48,7 @@ jobs: - name: Set up python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.9 - name: Bootstrap poetry run: | curl -sSL https://install.python-poetry.org | python - -y --version 1.5.1 From f9e8069af34f74516756b6ec8153f7f7688505fe Mon Sep 
17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 12:49:35 +0100 Subject: [PATCH 10/20] nits --- .../conversational_ai/base_connection.py | 16 ++ .../conversational_ai/conversation.py | 43 +++-- .../conversational_ai/webrtc_connection.py | 179 ++++++++++++------ 3 files changed, 167 insertions(+), 71 deletions(-) diff --git a/src/elevenlabs/conversational_ai/base_connection.py b/src/elevenlabs/conversational_ai/base_connection.py index dde8a29d..5c37d5a9 100644 --- a/src/elevenlabs/conversational_ai/base_connection.py +++ b/src/elevenlabs/conversational_ai/base_connection.py @@ -39,6 +39,22 @@ async def send_audio(self, audio_data: bytes) -> None: """Send audio data through the connection.""" pass + def send_message_sync(self, message: dict) -> None: + """Send a message synchronously (for compatibility with sync code).""" + import asyncio + try: + # Try to get the current event loop + loop = asyncio.get_event_loop() + if loop.is_running(): + # If loop is running, create a task + asyncio.create_task(self.send_message(message)) + else: + # If loop is not running, run the coroutine + loop.run_until_complete(self.send_message(message)) + except RuntimeError: + # No event loop, create new one + asyncio.run(self.send_message(message)) + def on_message(self, callback: Callable[[dict], Union[None, Awaitable[None]]]) -> None: """Set the message callback.""" self._on_message_callback = callback diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 5e8c7853..c3dc2616 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -622,17 +622,27 @@ def end_session(self): if self._connection: connection_type = self._determine_connection_type() if connection_type == ConnectionType.WEBRTC: - # For WebRTC, we need to close the connection in an async context import asyncio try: - loop = asyncio.get_event_loop() - if loop.is_running(): - 
asyncio.create_task(self._connection.close()) - else: - loop.run_until_complete(self._connection.close()) - except RuntimeError: - # No event loop running, create a new one - asyncio.run(self._connection.close()) + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + task = asyncio.create_task(self._connection.close()) + else: + asyncio.wait_for( + loop.run_until_complete(self._connection.close()), + timeout=5.0 + ) + except RuntimeError: + async def cleanup(): + await asyncio.wait_for(self._connection.close(), timeout=5.0) + + asyncio.run(cleanup()) + + except asyncio.TimeoutError: + print("Warning: WebRTC connection cleanup timed out") + except Exception as e: + print(f"Warning: Error during WebRTC connection cleanup: {e}") self._connection = None if self.callback_end_session: @@ -678,9 +688,7 @@ def send_user_message(self, text: str): event = UserMessageClientToOrchestratorEvent(text=text) try: - # Send through WebRTC connection - import asyncio - asyncio.create_task(self._connection.send_message(event.to_dict())) + self._connection.send_message_sync(event.to_dict()) except Exception as e: print(f"Error sending user message: {e}") raise @@ -768,8 +776,15 @@ def _run_webrtc(self): try: # Connect to WebRTC import asyncio - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + + try: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) async def webrtc_session(): await self._connection.connect() diff --git a/src/elevenlabs/conversational_ai/webrtc_connection.py b/src/elevenlabs/conversational_ai/webrtc_connection.py index dacf16e4..88e078ef 100644 --- a/src/elevenlabs/conversational_ai/webrtc_connection.py +++ b/src/elevenlabs/conversational_ai/webrtc_connection.py @@ -2,7 +2,14 @@ import asyncio from typing import Optional, Dict, Any, Callable, Union, Awaitable import httpx -from 
livekit.rtc import Room, TrackKind + +try: + from livekit.rtc import Room, TrackKind +except ImportError: + raise ImportError( + "livekit package is required for WebRTC support. " + "Install with: pip install livekit" + ) from .base_connection import BaseConnection @@ -17,7 +24,7 @@ def __init__( api_origin: Optional[str] = None, overrides: Optional[Dict[str, Any]] = None, on_debug: Optional[Callable[[Dict[str, Any]], None]] = None, - ): + ) -> None: self.conversation_token = conversation_token self.agent_id = agent_id self.livekit_url = livekit_url @@ -40,7 +47,7 @@ def __init__( api_origin: Optional[str] = None, overrides: Optional[Dict[str, Any]] = None, on_debug: Optional[Callable[[Dict[str, Any]], None]] = None, - ): + ) -> None: super().__init__() self.conversation_token = conversation_token self.agent_id = agent_id @@ -49,7 +56,7 @@ def __init__( self.overrides = overrides or {} self.on_debug = on_debug self._room: Optional[Room] = None - self._is_connected = False + self._is_connected: bool = False @classmethod async def create(cls, config: WebRTCConnectionConfig) -> "WebRTCConnection": @@ -68,46 +75,82 @@ async def create(cls, config: WebRTCConnectionConfig) -> "WebRTCConnection": async def connect(self) -> None: """Establish the WebRTC connection using LiveKit.""" - # Get conversation token if not provided - if not self.conversation_token: - if not self.agent_id: - raise ValueError("Either conversation_token or agent_id is required for WebRTC connection") - self.conversation_token = await self._fetch_conversation_token() - - # Create room and connect - self._room = Room() - self._setup_room_callbacks() - - # Connect to LiveKit room using configurable URL - await self._room.connect(self.livekit_url, self.conversation_token) - self._is_connected = True - - # Set conversation ID from room name if available - if self._room.name: - # Extract conversation ID from room name if it contains one - import re - match = re.search(r'(conv_[a-zA-Z0-9]+)', 
self._room.name) - self.conversation_id = match.group(0) if match else self._room.name - else: - self.conversation_id = f"webrtc-{id(self)}" - - # Enable microphone - await self._room.local_participant.set_microphone_enabled(True) - - # Send overrides if any - if self.overrides: - await self.send_message(self._construct_overrides()) + try: + # Get conversation token if not provided + if not self.conversation_token: + if not self.agent_id: + raise ValueError("Either conversation_token or agent_id is required for WebRTC connection") + self.conversation_token = await self._fetch_conversation_token() - self.debug({ - "type": "conversation_initiation_client_data", - "message": self._construct_overrides() - }) + # Create room and connect + self._room = Room() + self._setup_room_callbacks() + + # Connect to LiveKit room using configurable URL + try: + await self._room.connect(self.livekit_url, self.conversation_token) + self._is_connected = True + except Exception as e: + self._is_connected = False + raise ConnectionError(f"Failed to connect to LiveKit room: {e}") from e + + # Set conversation ID from room name if available + if self._room.name: + # Extract conversation ID from room name if it contains one + import re + match = re.search(r'(conv_[a-zA-Z0-9]+)', self._room.name) + self.conversation_id = match.group(0) if match else self._room.name + else: + self.conversation_id = f"webrtc-{id(self)}" + + # Enable microphone + try: + await self._room.local_participant.set_microphone_enabled(True) + except Exception as e: + self.debug({ + "type": "microphone_enable_error", + "error": str(e) + }) + # Don't fail the connection for microphone issues + + # Send overrides if any + if self.overrides: + try: + await self.send_message(self._construct_overrides()) + except Exception as e: + self.debug({ + "type": "overrides_send_error", + "error": str(e) + }) + + self.debug({ + "type": "conversation_initiation_client_data", + "message": self._construct_overrides() + }) + + except 
Exception as e: + # Ensure cleanup on connection failure + if self._room: + try: + await self._room.disconnect() + except: + pass + self._room = None + self._is_connected = False + raise async def close(self) -> None: """Close the WebRTC connection.""" if self._room: - await self._room.disconnect() - self._room = None + try: + await self._room.disconnect() + except Exception as e: + self.debug({ + "type": "disconnect_error", + "error": str(e) + }) + finally: + self._room = None self._is_connected = False async def send_message(self, message: dict) -> None: @@ -148,32 +191,54 @@ async def _fetch_conversation_token(self) -> str: if not self.agent_id: raise ValueError("Agent ID is required to fetch conversation token") - # Get version and source from overrides or use defaults - version = self.overrides.get("client", {}).get("version", "2.15.0") # From pyproject.toml - source = self.overrides.get("client", {}).get("source", "python_sdk") + try: + # Get version and source from overrides or use defaults + version = self.overrides.get("client", {}).get("version", "2.15.0") # From pyproject.toml + source = self.overrides.get("client", {}).get("source", "python_sdk") + + # Convert WSS origin to HTTPS for API calls + api_origin = self._convert_wss_to_https(self.api_origin) - # Convert WSS origin to HTTPS for API calls - api_origin = self._convert_wss_to_https(self.api_origin) + url = f"{api_origin}/v1/convai/conversation/token?agent_id={self.agent_id}&source={source}&version={version}" - url = f"{api_origin}/v1/convai/conversation/token?agent_id={self.agent_id}&source={source}&version={version}" + async with httpx.AsyncClient(timeout=30.0) as client: + try: + response = await client.get(url) + except httpx.TimeoutException: + raise ConnectionError(f"Timeout when fetching conversation token for agent {self.agent_id}") + except httpx.NetworkError as e: + raise ConnectionError(f"Network error when fetching conversation token: {e}") - async with httpx.AsyncClient() as client: - 
response = await client.get(url) + if not response.is_success: + error_msg = f"ElevenLabs API returned {response.status_code} {response.reason_phrase}" + if response.status_code == 401: + error_msg = "Your agent has authentication enabled, but no signed URL or conversation token was provided." + elif response.status_code == 404: + error_msg = f"Agent with ID {self.agent_id} not found" + elif response.status_code == 429: + error_msg = "Rate limit exceeded. Please try again later." - if not response.is_success: - error_msg = f"ElevenLabs API returned {response.status_code} {response.reason_phrase}" - if response.status_code == 401: - error_msg = "Your agent has authentication enabled, but no signed URL or conversation token was provided." + raise Exception(f"Failed to fetch conversation token for agent {self.agent_id}: {error_msg}") - raise Exception(f"Failed to fetch conversation token for agent {self.agent_id}: {error_msg}") + try: + data = response.json() + except Exception as e: + raise Exception(f"Invalid JSON response from API: {e}") - data = response.json() - token = data.get("token") + token = data.get("token") - if not token: - raise Exception("No conversation token received from API") + if not token: + raise Exception("No conversation token received from API") - return token + return token + + except Exception as e: + self.debug({ + "type": "token_fetch_error", + "agent_id": self.agent_id, + "error": str(e) + }) + raise def _convert_wss_to_https(self, origin: str) -> str: """Convert WSS origin to HTTPS for API calls.""" From 177398befd799b53ab2f0cd6a71e89a978cf587e Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 14:35:43 +0100 Subject: [PATCH 11/20] update webrtc --- .../conversational_ai/webrtc_conversation.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/elevenlabs/conversational_ai/webrtc_conversation.py b/src/elevenlabs/conversational_ai/webrtc_conversation.py index 453401b3..3d278501 100644 
--- a/src/elevenlabs/conversational_ai/webrtc_conversation.py +++ b/src/elevenlabs/conversational_ai/webrtc_conversation.py @@ -90,12 +90,15 @@ def __init__( self.callback_end_session = callback_end_session self._connection: Optional[WebRTCConnection] = None - self._should_stop = asyncio.Event() + self._should_stop: Optional[asyncio.Event] = None self._session_task: Optional[asyncio.Task] = None async def start_session(self): """Start the WebRTC conversation session.""" try: + # Initialize the stop event + if self._should_stop is None: + self._should_stop = asyncio.Event() # Use the enhanced connection creation from BaseConversation self._connection = self._create_connection() @@ -128,7 +131,8 @@ async def start_session(self): async def end_session(self): """End the WebRTC conversation session.""" - self._should_stop.set() + if self._should_stop: + self._should_stop.set() if self.audio_interface: await self.audio_interface.stop() @@ -176,7 +180,7 @@ async def register_user_activity(self): async def _audio_input_callback(self, audio_data: bytes): """Handle audio input from the audio interface.""" - if self._connection and not self._should_stop.is_set(): + if self._connection and self._should_stop and not self._should_stop.is_set(): # For WebRTC, audio is sent through the room's microphone track # This callback can be used for custom processing if needed pass @@ -224,7 +228,8 @@ async def _handle_message(self, message: dict): "type": "pong", "event_id": event["event_id"] } - await self._connection.send_message(pong_message) + if self._connection: + await self._connection.send_message(pong_message) if self.callback_latency_measurement and event.get("ping_ms"): await self.callback_latency_measurement(int(event["ping_ms"])) @@ -239,7 +244,7 @@ async def _handle_message(self, message: dict): # Execute tool asynchronously async def send_response(response): - if not self._should_stop.is_set(): + if self._should_stop and not self._should_stop.is_set(): await 
self._connection.send_message(response) self.client_tools.execute_tool(tool_name, parameters, send_response) From ac0721225cfb201426ed2f09c7646204fb21f41f Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 14:35:50 +0100 Subject: [PATCH 12/20] type --- src/elevenlabs/conversational_ai/base_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/elevenlabs/conversational_ai/base_connection.py b/src/elevenlabs/conversational_ai/base_connection.py index 5c37d5a9..5a160c50 100644 --- a/src/elevenlabs/conversational_ai/base_connection.py +++ b/src/elevenlabs/conversational_ai/base_connection.py @@ -14,9 +14,9 @@ class ConnectionType(str, Enum): class BaseConnection(ABC): """Base class for conversation connections.""" - def __init__(self): + def __init__(self) -> None: self.conversation_id: Optional[str] = None - self._message_queue = [] + self._message_queue: list[dict] = [] self._on_message_callback: Optional[Callable[[dict], Union[None, Awaitable[None]]]] = None @abstractmethod From 6619235e6b62cd1331591398d82b07b8d8ed5063 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 14:37:28 +0100 Subject: [PATCH 13/20] fix test --- tests/test_webrtc_conversation.py | 73 +++++++++++++++++++------------ 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/tests/test_webrtc_conversation.py b/tests/test_webrtc_conversation.py index a46b10b8..0f6ef497 100644 --- a/tests/test_webrtc_conversation.py +++ b/tests/test_webrtc_conversation.py @@ -53,13 +53,21 @@ def test_connection_type_determination(self): def test_factory_creates_correct_conversation_types(self, mock_client): """Test that the factory creates the correct conversation types.""" - # WebRTC conversation - webrtc_conv = create_conversation( - client=mock_client, - agent_id="test-agent", - connection_type=ConnectionType.WEBRTC - ) - assert isinstance(webrtc_conv, WebRTCConversation) + # Create event loop for WebRTC conversation + import asyncio + 
loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + # WebRTC conversation + webrtc_conv = create_conversation( + client=mock_client, + agent_id="test-agent", + connection_type=ConnectionType.WEBRTC + ) + assert isinstance(webrtc_conv, WebRTCConversation) + finally: + loop.close() # WebSocket conversation (sync) ws_conv = create_conversation( @@ -71,23 +79,31 @@ def test_factory_creates_correct_conversation_types(self, mock_client): def test_convenience_functions(self, mock_client, mock_audio_interface): """Test convenience functions for creating conversations.""" - # WebRTC convenience function with conversation token to avoid HTTP calls - with patch('elevenlabs.conversational_ai.webrtc_connection.Room') as mock_room_class: - mock_room = Mock() - mock_room.connect = AsyncMock() - mock_room.disconnect = AsyncMock() - mock_room.local_participant = Mock() - mock_room.local_participant.set_microphone_enabled = AsyncMock() - mock_room.name = "test-room" - mock_room_class.return_value = mock_room - - webrtc_conv = create_webrtc_conversation( - client=mock_client, - agent_id="test-agent", - conversation_token="test-token", - audio_interface=mock_audio_interface - ) - assert isinstance(webrtc_conv, WebRTCConversation) + # Create event loop for WebRTC conversation + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + # WebRTC convenience function with conversation token to avoid HTTP calls + with patch('elevenlabs.conversational_ai.webrtc_connection.Room') as mock_room_class: + mock_room = Mock() + mock_room.connect = AsyncMock() + mock_room.disconnect = AsyncMock() + mock_room.local_participant = Mock() + mock_room.local_participant.set_microphone_enabled = AsyncMock() + mock_room.name = "test-room" + mock_room_class.return_value = mock_room + + webrtc_conv = create_webrtc_conversation( + client=mock_client, + agent_id="test-agent", + conversation_token="test-token", + audio_interface=mock_audio_interface + ) + 
assert isinstance(webrtc_conv, WebRTCConversation) + finally: + loop.close() # WebSocket convenience function ws_conv = create_websocket_conversation( @@ -190,9 +206,12 @@ async def test_webrtc_connection_token_fetch(self): token = await connection._fetch_conversation_token() assert token == "fetched-token" - mock_client.get.assert_called_once_with( - "https://api.elevenlabs.io/v1/convai/conversation/token?agent_id=test-agent" - ) + # Verify the call was made with required parameters + call_args = mock_client.get.call_args[0][0] + assert call_args.startswith("https://api.elevenlabs.io/v1/convai/conversation/token?") + assert "agent_id=test-agent" in call_args + assert "source=python_sdk" in call_args + assert "version=" in call_args @pytest.mark.asyncio async def test_webrtc_connection_token_fetch_error(self): From 7f955b356c812d485fbcabf55c3cd7368d989071 Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 14:37:39 +0100 Subject: [PATCH 14/20] types --- .../conversational_ai/conversation_factory.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation_factory.py b/src/elevenlabs/conversational_ai/conversation_factory.py index 92db21b3..b13b88d0 100644 --- a/src/elevenlabs/conversational_ai/conversation_factory.py +++ b/src/elevenlabs/conversational_ai/conversation_factory.py @@ -128,7 +128,7 @@ def create_conversation( agent_id=agent_id, user_id=user_id, requires_auth=requires_auth, - audio_interface=audio_interface, + audio_interface=audio_interface, # type: ignore config=config, client_tools=client_tools, callback_agent_response=async_callback_agent_response, @@ -147,7 +147,7 @@ def create_conversation( agent_id=agent_id, user_id=user_id, requires_auth=requires_auth, - audio_interface=audio_interface, + audio_interface=audio_interface, # type: ignore config=config, client_tools=client_tools, callback_agent_response=callback_agent_response, @@ -220,12 +220,12 @@ def 
create_websocket_conversation( callback_user_transcript: Optional[Callable[[str], None]] = None, callback_latency_measurement: Optional[Callable[[int], None]] = None, callback_end_session: Optional[Callable] = None, -) -> Conversation: +) -> Union[Conversation, AsyncConversation]: """Create a WebSocket conversation. Convenience function for creating WebSocket conversations with type safety. """ - return create_conversation( + result = create_conversation( client=client, agent_id=agent_id, user_id=user_id, @@ -239,4 +239,5 @@ def create_websocket_conversation( callback_user_transcript=callback_user_transcript, callback_latency_measurement=callback_latency_measurement, callback_end_session=callback_end_session, - ) \ No newline at end of file + ) + return result \ No newline at end of file From d882c0a03f14f7ca662300183df1cccae403a84e Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 14:39:40 +0100 Subject: [PATCH 15/20] fix and debug --- .../conversational_ai/conversation.py | 4 +-- .../conversational_ai/webrtc_connection.py | 30 +++++++++++++++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index c3dc2616..7cd0b69e 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -13,7 +13,7 @@ from ..base_client import BaseElevenLabs from ..version import __version__ -from .base_connection import ConnectionType +from .base_connection import ConnectionType, BaseConnection from .connection_factory import create_connection, determine_connection_type @@ -340,7 +340,7 @@ def __init__( self._conversation_id = None self._last_interrupt_id = 0 - self._connection = None + self._connection: Optional[BaseConnection] = None def _get_wss_url(self): base_http_url = self.client._client_wrapper.get_base_url() diff --git a/src/elevenlabs/conversational_ai/webrtc_connection.py 
b/src/elevenlabs/conversational_ai/webrtc_connection.py index 88e078ef..33391e76 100644 --- a/src/elevenlabs/conversational_ai/webrtc_connection.py +++ b/src/elevenlabs/conversational_ai/webrtc_connection.py @@ -106,12 +106,19 @@ async def connect(self) -> None: # Enable microphone try: await self._room.local_participant.set_microphone_enabled(True) + except AttributeError: + try: + await self._room.local_participant.enable_microphone() + except AttributeError: + self.debug({ + "type": "microphone_enable_error", + "error": "Neither set_microphone_enabled nor enable_microphone methods available" + }) except Exception as e: self.debug({ "type": "microphone_enable_error", "error": str(e) }) - # Don't fail the connection for microphone issues # Send overrides if any if self.overrides: @@ -319,7 +326,26 @@ async def set_microphone_enabled(self, enabled: bool) -> None: if not self._room or not self._room.local_participant: raise RuntimeError("Room not connected") - await self._room.local_participant.set_microphone_enabled(enabled) + try: + await self._room.local_participant.set_microphone_enabled(enabled) + except AttributeError: + try: + if enabled: + await self._room.local_participant.enable_microphone() + else: + await self._room.local_participant.disable_microphone() + except AttributeError: + self.debug({ + "type": "microphone_control_error", + "enabled": enabled, + "error": "Microphone control methods not available" + }) + except Exception as e: + self.debug({ + "type": "microphone_control_error", + "enabled": enabled, + "error": str(e) + }) async def set_microphone_device(self, device_id: str) -> None: """Set the microphone input device.""" From 5579bf0d757d35fe97de0786bdbd128d8a6cba8b Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 15:21:15 +0100 Subject: [PATCH 16/20] fix conversation --- src/elevenlabs/conversational_ai/conversation.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 7cd0b69e..78fc80d6 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -598,13 +598,12 @@ def start_session(self): Will run in background thread until `end_session` is called. """ - self._connection = self._create_connection() - connection_type = self._determine_connection_type() if connection_type == ConnectionType.WEBSOCKET: ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() self._thread = threading.Thread(target=self._run_websocket, args=(ws_url,)) elif connection_type == ConnectionType.WEBRTC: + self._connection = self._create_connection() self._thread = threading.Thread(target=self._run_webrtc) else: raise ValueError(f"Unsupported connection type: {connection_type}") From 406964e2cdd19c4ff4b599b4bd9abb1a14f4397c Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 17:03:03 +0100 Subject: [PATCH 17/20] fix --- src/elevenlabs/conversational_ai/conversation.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 78fc80d6..31cfd0f7 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -990,13 +990,12 @@ async def start_session(self): Will run in background task until `end_session` is called. 
""" - self._connection = self._create_connection() - connection_type = self._determine_connection_type() if connection_type == ConnectionType.WEBSOCKET: ws_url = self._get_signed_url() if self.requires_auth else self._get_wss_url() self._task = asyncio.create_task(self._run_websocket(ws_url)) elif connection_type == ConnectionType.WEBRTC: + self._connection = self._create_connection() self._task = asyncio.create_task(self._run_webrtc()) else: raise ValueError(f"Unsupported connection type: {connection_type}") From 755a1026709929732d2fe4e3a65fec3801f6affa Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 17:19:45 +0100 Subject: [PATCH 18/20] fux --- .../conversational_ai/webrtc_connection.py | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/elevenlabs/conversational_ai/webrtc_connection.py b/src/elevenlabs/conversational_ai/webrtc_connection.py index 33391e76..09750a39 100644 --- a/src/elevenlabs/conversational_ai/webrtc_connection.py +++ b/src/elevenlabs/conversational_ai/webrtc_connection.py @@ -105,15 +105,12 @@ async def connect(self) -> None: # Enable microphone try: - await self._room.local_participant.set_microphone_enabled(True) - except AttributeError: - try: - await self._room.local_participant.enable_microphone() - except AttributeError: - self.debug({ - "type": "microphone_enable_error", - "error": "Neither set_microphone_enabled nor enable_microphone methods available" - }) + await self._enable_microphone(True) + except Exception as e: + self.debug({ + "type": "microphone_enable_error", + "error": f"Failed to enable microphone: {e}" + }) except Exception as e: self.debug({ "type": "microphone_enable_error", @@ -326,26 +323,28 @@ async def set_microphone_enabled(self, enabled: bool) -> None: if not self._room or not self._room.local_participant: raise RuntimeError("Room not connected") - try: - await self._room.local_participant.set_microphone_enabled(enabled) - except AttributeError: - try: - 
if enabled: - await self._room.local_participant.enable_microphone() - else: - await self._room.local_participant.disable_microphone() - except AttributeError: - self.debug({ - "type": "microphone_control_error", - "enabled": enabled, - "error": "Microphone control methods not available" - }) - except Exception as e: - self.debug({ - "type": "microphone_control_error", - "enabled": enabled, - "error": str(e) - }) + await self._enable_microphone(enabled) + + async def _enable_microphone(self, enabled: bool) -> None: + """Internal method to enable/disable microphone via track muting.""" + if not self._room or not self._room.local_participant: + raise RuntimeError("Room not connected") + + # Find the audio track publication + for track_pub in self._room.local_participant.track_publications.values(): + if track_pub.kind == TrackKind.KIND_AUDIO: + if track_pub.track: + if enabled: + await track_pub.track.unmute() + else: + await track_pub.track.mute() + return + + self.debug({ + "type": "microphone_control_error", + "enabled": enabled, + "error": "No audio track found" + }) async def set_microphone_device(self, device_id: str) -> None: """Set the microphone input device.""" From 43da41092514081133e463196d3e75870bbb511f Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 17:24:03 +0100 Subject: [PATCH 19/20] fix mypy --- src/elevenlabs/conversational_ai/conversation_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/elevenlabs/conversational_ai/conversation_factory.py b/src/elevenlabs/conversational_ai/conversation_factory.py index b13b88d0..f91edc80 100644 --- a/src/elevenlabs/conversational_ai/conversation_factory.py +++ b/src/elevenlabs/conversational_ai/conversation_factory.py @@ -240,4 +240,4 @@ def create_websocket_conversation( callback_latency_measurement=callback_latency_measurement, callback_end_session=callback_end_session, ) - return result \ No newline at end of file + return result # type: ignore[return-value] \ 
No newline at end of file From 44692f892beb959f5eb9108ce446c5dfec7f69ad Mon Sep 17 00:00:00 2001 From: Angelo Giacco Date: Thu, 18 Sep 2025 17:35:15 +0100 Subject: [PATCH 20/20] add location --- .../conversational_ai/conversation.py | 11 +++- .../conversational_ai/conversation_factory.py | 44 +++++++++++++--- .../conversational_ai/location_utils.py | 50 +++++++++++++++++++ 3 files changed, 96 insertions(+), 9 deletions(-) create mode 100644 src/elevenlabs/conversational_ai/location_utils.py diff --git a/src/elevenlabs/conversational_ai/conversation.py b/src/elevenlabs/conversational_ai/conversation.py index 31cfd0f7..a95991b9 100644 --- a/src/elevenlabs/conversational_ai/conversation.py +++ b/src/elevenlabs/conversational_ai/conversation.py @@ -15,6 +15,7 @@ from ..version import __version__ from .base_connection import ConnectionType, BaseConnection from .connection_factory import create_connection, determine_connection_type +from .location_utils import Location, get_origin_for_location class ClientToOrchestratorEvent(str, Enum): @@ -299,6 +300,7 @@ def __init__( user_id: Optional[str] = None, connection_type: Optional[ConnectionType] = None, conversation_token: Optional[str] = None, + location: Optional[Location] = None, livekit_url: Optional[str] = None, api_origin: Optional[str] = None, webrtc_overrides: Optional[dict] = None, @@ -310,6 +312,7 @@ def __init__( self.user_id = user_id self.connection_type = connection_type self.conversation_token = conversation_token + self.location = location self.livekit_url = livekit_url self.api_origin = api_origin self.webrtc_overrides = webrtc_overrides or {} @@ -343,8 +346,12 @@ def __init__( self._connection: Optional[BaseConnection] = None def _get_wss_url(self): - base_http_url = self.client._client_wrapper.get_base_url() - base_ws_url = base_http_url.replace("https://", "wss://").replace("http://", "ws://") + # Use location-based URL if location is specified + if self.config.location is not None: + base_ws_url = 
get_origin_for_location(self.config.location) + else: + base_http_url = self.client._client_wrapper.get_base_url() + base_ws_url = base_http_url.replace("https://", "wss://").replace("http://", "ws://") return f"{base_ws_url}/v1/convai/conversation?agent_id={self.agent_id}&source=python_sdk&version={__version__}" def _get_signed_url(self): diff --git a/src/elevenlabs/conversational_ai/conversation_factory.py b/src/elevenlabs/conversational_ai/conversation_factory.py index f91edc80..33260b90 100644 --- a/src/elevenlabs/conversational_ai/conversation_factory.py +++ b/src/elevenlabs/conversational_ai/conversation_factory.py @@ -11,6 +11,7 @@ ) from .webrtc_conversation import WebRTCConversation from .base_connection import ConnectionType +from .location_utils import Location, get_origin_for_location, get_livekit_url_for_location def create_conversation( @@ -21,6 +22,7 @@ def create_conversation( connection_type: ConnectionType = ConnectionType.WEBSOCKET, conversation_token: Optional[str] = None, requires_auth: bool = True, + location: Optional[Location] = None, audio_interface: Optional[Union[AudioInterface, AsyncAudioInterface]] = None, config: Optional[ConversationInitiationData] = None, client_tools: Optional[ClientTools] = None, @@ -46,6 +48,7 @@ def create_conversation( connection_type: Type of connection (websocket or webrtc) conversation_token: Token for WebRTC authentication requires_auth: Whether authentication is required + location: Data residency location (us, eu-residency, in-residency, global) audio_interface: Audio interface for the conversation config: Conversation configuration client_tools: Client tools for handling agent calls @@ -63,23 +66,22 @@ def create_conversation( audio_interface=your_audio_interface ) - # WebRTC conversation + # WebRTC conversation with EU residency conversation = create_conversation( client=client, agent_id="your-agent-id", connection_type=ConnectionType.WEBRTC, - conversation_token="your-token", # Optional, will fetch if 
not provided + location=Location.EU_RESIDENCY, audio_interface=your_async_audio_interface, async_callback_agent_response=your_response_handler ) - # Public agent (no auth required) + # WebSocket conversation with specific location conversation = create_conversation( client=client, - agent_id="public-agent-id", - connection_type=ConnectionType.WEBRTC, - requires_auth=False, - audio_interface=your_async_audio_interface + agent_id="your-agent-id", + location=Location.IN_RESIDENCY, + audio_interface=your_audio_interface ) """ @@ -90,17 +92,29 @@ def create_conversation( config.connection_type = connection_type if conversation_token: config.conversation_token = conversation_token + if location is not None: + config.location = location if connection_type == ConnectionType.WEBRTC: # Create WebRTC conversation if not isinstance(audio_interface, AsyncAudioInterface) and audio_interface is not None: raise ValueError("WebRTC conversations require an AsyncAudioInterface") + # Determine URLs based on location + livekit_url = None + api_origin = None + if location is not None: + livekit_url = get_livekit_url_for_location(location) + # Convert WSS to HTTPS for API origin + api_origin = get_origin_for_location(location).replace("wss://", "https://") + return WebRTCConversation( client=client, agent_id=agent_id, user_id=user_id, conversation_token=conversation_token, + livekit_url=livekit_url, + api_origin=api_origin, audio_interface=audio_interface, config=config, client_tools=client_tools, @@ -169,6 +183,7 @@ def create_webrtc_conversation( user_id: Optional[str] = None, *, conversation_token: Optional[str] = None, + location: Optional[Location] = None, livekit_url: Optional[str] = None, api_origin: Optional[str] = None, webrtc_overrides: Optional[dict] = None, @@ -185,7 +200,17 @@ def create_webrtc_conversation( """Create a WebRTC conversation. Convenience function for creating WebRTC conversations with type safety. + + Args: + location: Data residency location. 
If provided, overrides livekit_url and api_origin. + livekit_url: Custom LiveKit URL (overridden by location if provided). + api_origin: Custom API origin (overridden by location if provided). """ + # Determine URLs based on location if provided + if location is not None: + livekit_url = get_livekit_url_for_location(location) + api_origin = get_origin_for_location(location).replace("wss://", "https://") + return WebRTCConversation( client=client, agent_id=agent_id, @@ -212,6 +237,7 @@ def create_websocket_conversation( user_id: Optional[str] = None, *, requires_auth: bool = True, + location: Optional[Location] = None, audio_interface: Optional[AudioInterface] = None, config: Optional[ConversationInitiationData] = None, client_tools: Optional[ClientTools] = None, @@ -224,6 +250,9 @@ def create_websocket_conversation( """Create a WebSocket conversation. Convenience function for creating WebSocket conversations with type safety. + + Args: + location: Data residency location (us, eu-residency, in-residency, global) """ result = create_conversation( client=client, @@ -231,6 +260,7 @@ def create_websocket_conversation( user_id=user_id, connection_type=ConnectionType.WEBSOCKET, requires_auth=requires_auth, + location=location, audio_interface=audio_interface, config=config, client_tools=client_tools, diff --git a/src/elevenlabs/conversational_ai/location_utils.py b/src/elevenlabs/conversational_ai/location_utils.py new file mode 100644 index 00000000..0ff95111 --- /dev/null +++ b/src/elevenlabs/conversational_ai/location_utils.py @@ -0,0 +1,50 @@ +from enum import Enum +from typing import Dict + + +class Location(Enum): + """Location enum for data residency and region selection.""" + US = "us" + EU_RESIDENCY = "eu-residency" + IN_RESIDENCY = "in-residency" + GLOBAL = "global" + + +def get_origin_for_location(location: Location) -> str: + """ + Get the WebSocket API origin URL for a given location. 
+ + Args: + location: The location enum value + + Returns: + The WebSocket URL for the specified location + """ + origin_map: Dict[Location, str] = { + Location.US: "wss://api.elevenlabs.io", + Location.EU_RESIDENCY: "wss://api.eu.residency.elevenlabs.io", + Location.IN_RESIDENCY: "wss://api.in.residency.elevenlabs.io", + Location.GLOBAL: "wss://api.elevenlabs.io", + } + + return origin_map[location] + + +def get_livekit_url_for_location(location: Location) -> str: + """ + Get the LiveKit WebRTC URL for a given location. + + Args: + location: The location enum value + + Returns: + The LiveKit URL for the specified location + """ + livekit_url_map: Dict[Location, str] = { + Location.US: "wss://livekit.rtc.elevenlabs.io", + Location.EU_RESIDENCY: "wss://livekit.rtc.eu.residency.elevenlabs.io", + Location.IN_RESIDENCY: "wss://livekit.rtc.in.residency.elevenlabs.io", + Location.GLOBAL: "wss://livekit.rtc.elevenlabs.io", + } + + return livekit_url_map[location] \ No newline at end of file