Skip to content
5 changes: 3 additions & 2 deletions getstream/video/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def discover_location():


async def join(
call: Call, user_id: Optional[str] = None, create=True, **kwargs
call: Call, user_id: Optional[str] = None, create=True, fast_join=False, **kwargs
) -> ConnectionManager:
"""
Join a call. This method will:
Expand All @@ -60,14 +60,15 @@ async def join(
call: The call to join
user_id: The user id to join with
create: Whether to create the call if it doesn't exist
fast_join: Whether to use fast join (default: False)
**kwargs: Additional arguments to pass to the join call request

Returns:
A ConnectionManager object that can be used as a context manager
"""
# Return ConnectionManager instance that handles everything internally
# when used as an async context manager and async iterator
return ConnectionManager(call=call, user_id=user_id, create=create, **kwargs)
return ConnectionManager(call=call, user_id=user_id, create=create, fast_join=fast_join, **kwargs)


__all__ = [
Expand Down
141 changes: 120 additions & 21 deletions getstream/video/rtc/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import aiortc

from getstream.common import telemetry
from getstream.stream_response import StreamResponse
from getstream.utils import StreamAsyncIOEventEmitter
from getstream.video.rtc.coordinator.ws import StreamAPIWS
from getstream.video.rtc.pb.stream.video.sfu.event import events_pb2
Expand All @@ -22,6 +23,7 @@
ConnectionOptions,
connect_websocket,
join_call,
fast_join_call,
watch_call,
)
from getstream.video.rtc.track_util import (
Expand Down Expand Up @@ -53,6 +55,7 @@ def __init__(
user_id: Optional[str] = None,
create: bool = True,
subscription_config: Optional[SubscriptionConfig] = None,
fast_join: bool = False,
**kwargs: Any,
):
super().__init__()
Expand All @@ -61,6 +64,7 @@ def __init__(
self.call: Call = call
self.user_id: Optional[str] = user_id
self.create: bool = create
self.fast_join: bool = fast_join
self.kwargs: Dict[str, Any] = kwargs
self.running: bool = False
self.session_id: str = str(uuid.uuid4())
Expand Down Expand Up @@ -269,21 +273,38 @@ async def _connect_internal(
"coordinator-join-call",
) as span:
if not (ws_url or token):
join_response = await join_call(
self.call,
self.user_id,
"auto",
self.create,
self.local_sfu,
**self.kwargs,
)
ws_url = join_response.data.credentials.server.ws_endpoint
token = join_response.data.credentials.token
self.join_response = join_response
logger.debug(f"coordinator join response: {join_response.data}")
span.set_attribute(
"credentials", join_response.data.credentials.to_json()
)
if self.fast_join:
# Use fast join to get multiple edge credentials
fast_join_response = await fast_join_call(
self.call,
self.user_id,
"auto",
self.create,
self.local_sfu,
**self.kwargs,
)
logger.debug(
f"Received {len(fast_join_response.data.credentials)} edge credentials for fast join"
)

self._fast_join_response = fast_join_response
else:
# Use regular join
join_response = await join_call(
self.call,
self.user_id,
"auto",
self.create,
self.local_sfu,
**self.kwargs,
)
ws_url = join_response.data.credentials.server.ws_endpoint
token = join_response.data.credentials.token
self.join_response = join_response
logger.debug(f"coordinator join response: {join_response.data}")
span.set_attribute(
"credentials", join_response.data.credentials.to_json()
)

# Use provided session_id or current one
current_session_id = session_id or self.session_id
Expand All @@ -295,12 +316,38 @@ async def _connect_internal(
with telemetry.start_as_current_span(
"sfu-signaling-ws-connect",
) as span:
self._ws_client, sfu_event = await connect_websocket(
token=token,
ws_url=ws_url,
session_id=current_session_id,
options=self._connection_options,
)
# Handle fast join or regular join
if self.fast_join and hasattr(self, "_fast_join_response"):
# Fast join - race multiple edges
self._ws_client, sfu_event, selected_cred = await self._race_edges(
self._fast_join_response.data.credentials, current_session_id
)

# Use the selected credentials
ws_url = selected_cred.server.ws_endpoint
token = selected_cred.token

#map it to standard join call object so that retry/migration can happen
self.join_response = StreamResponse(
response=self._fast_join_response._StreamResponse__response,
data=JoinCallResponse(
call=self._fast_join_response.data.call,
members=self._fast_join_response.data.members,
credentials=selected_cred,
stats_options=self._fast_join_response.data.stats_options,
duration=self._fast_join_response.data.duration,
)
)

span.set_attribute("credentials", selected_cred.to_json())
else:
# Regular join - connect to single edge
self._ws_client, sfu_event = await connect_websocket(
token=token,
ws_url=ws_url,
session_id=current_session_id,
options=self._connection_options,
)
Comment on lines +320 to +350
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Accessing private attribute via name mangling is fragile.

Line 332 accesses self._fast_join_response._StreamResponse__response using Python name mangling to reach a private attribute. This is brittle and will break if the StreamResponse class is refactored.

Consider one of these alternatives:

  1. Add a public accessor method to StreamResponse:
# In getstream/stream_response.py
@property
def response(self) -> httpx.Response:
    """Returns the underlying httpx Response."""
    return self.__response
  1. Store the httpx.Response separately when creating _fast_join_response:
# Store the response separately
self._fast_join_http_response = fast_join_response._StreamResponse__response  # or use accessor
  1. Allow StreamResponse to be constructed without the httpx response if it's not needed:
# Make response optional in StreamResponse constructor
self.join_response = JoinCallResponse(
    call=self._fast_join_response.data.call,
    # ... rest of mapping
)
# Wrap it later if needed
🤖 Prompt for AI Agents
In getstream/video/rtc/connection_manager.py around lines 320 to 350, the code
is directly accessing a mangled private attribute
self._fast_join_response._StreamResponse__response which is brittle; change the
usage to rely on a public API: add a public property or accessor on
StreamResponse (e.g. response -> returns underlying httpx.Response) and then
replace the mangled access with self._fast_join_response.response;
alternatively, if exposing the underlying http response is undesirable, store
the httpx.Response separately when creating _fast_join_response or refactor
StreamResponse to allow construction without the httpx.Response and map
join_response without needing that private attribute.


self._ws_client.on_wildcard("*", _log_event)
self._ws_client.on_event("ice_trickle", self._on_ice_trickle)
Expand Down Expand Up @@ -530,3 +577,55 @@ async def _restore_published_tracks(self):
await self._peer_manager.restore_published_tracks()
except Exception as e:
logger.error("Failed to restore published tracks", exc_info=e)

async def _race_edges(self, credentials_list, session_id):
"""Try multiple edge WebSocket connections sequentially and return the first successful one.

This method iterates through edge URLs one by one, attempting to connect to each.
The first edge that successfully connects is used, and the iteration stops.

Args:
credentials_list: List of Credentials to try
session_id: Session ID for the connection

Returns:
Tuple of (WebSocket client, SFU event, selected Credentials)

Raises:
SfuConnectionError: If all edge connections fail
"""
if not credentials_list:
raise SfuConnectionError("No edge credentials provided for racing")

logger.info(f"Trying {len(credentials_list)} edge connections sequentially")

errors = []

# Try each edge sequentially
for cred in credentials_list:
logger.debug(f"Trying edge {cred.server.edge_name} at {cred.server.ws_endpoint}")

try:
# Attempt to connect to this edge
ws_client, sfu_event = await connect_websocket(
token=cred.token,
ws_url=cred.server.ws_endpoint,
session_id=session_id,
options=self._connection_options,
)

# Success! Return the connection and credentials
logger.info(
f"Edge {cred.server.edge_name} connected successfully"
)
return ws_client, sfu_event, cred

except Exception as e:
errors.append((cred.server.edge_name, str(e)))
# Continue to next edge

# All connections failed
error_msg = "All edge connections failed:\n" + "\n".join(
f" - {edge}: {error}" for edge, error in errors
)
raise SfuConnectionError(error_msg)
Comment on lines +581 to +631
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Sequential iteration defeats the purpose of "racing" connections.

The _race_edges method tries connections sequentially (one after another), not in parallel. This contradicts the PR's stated goal of "racing" connections to find the fastest edge and the method name suggests parallel racing.

Sequential attempts mean:

  • Total time = sum of all timeouts for failed edges before success
  • No latency benefit compared to trying a single edge
  • The "fast join" feature doesn't deliver on its promise

Implement true parallel racing using asyncio.wait with FIRST_COMPLETED:

async def _race_edges(self, credentials_list, session_id):
    """Race multiple edge WebSocket connections in parallel and return the first successful one.
    
    Args:
        credentials_list: List of Credentials to try
        session_id: Session ID for the connection
        
    Returns:
        Tuple of (WebSocket client, SFU event, selected Credentials)
        
    Raises:
        SfuConnectionError: If all edge connections fail
    """
    if not credentials_list:
        raise SfuConnectionError("No edge credentials provided for racing")
    
    logger.info(f"Racing {len(credentials_list)} edge connections in parallel")
    
    # Create tasks for all edges
    async def try_edge(cred):
        try:
            ws_client, sfu_event = await connect_websocket(
                token=cred.token,
                ws_url=cred.server.ws_endpoint,
                session_id=session_id,
                options=self._connection_options,
            )
            return (ws_client, sfu_event, cred, None)
        except Exception as e:
            return (None, None, cred, e)
    
    tasks = [asyncio.create_task(try_edge(cred)) for cred in credentials_list]
    
    try:
        # Wait for the first successful connection
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        
        # Check if we got a success
        for task in done:
            ws_client, sfu_event, cred, error = task.result()
            if error is None:
                # Success! Cancel remaining tasks
                for t in pending:
                    t.cancel()
                logger.info(f"Edge {cred.server.edge_name} won the race")
                return ws_client, sfu_event, cred
        
        # First task completed but failed, keep waiting
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                ws_client, sfu_event, cred, error = task.result()
                if error is None:
                    # Success! Cancel remaining tasks
                    for t in pending:
                        t.cancel()
                    logger.info(f"Edge {cred.server.edge_name} connected successfully")
                    return ws_client, sfu_event, cred
        
        # All failed - gather errors
        errors = []
        for task in tasks:
            _, _, cred, error = task.result()
            if error:
                errors.append((cred.server.edge_name, str(error)))
        
        error_msg = "All edge connections failed:\n" + "\n".join(
            f"  - {edge}: {error}" for edge, error in errors
        )
        raise SfuConnectionError(error_msg)
        
    finally:
        # Ensure all tasks are cleaned up
        for task in tasks:
            if not task.done():
                task.cancel()
🤖 Prompt for AI Agents
In getstream/video/rtc/connection_manager.py around lines 581 to 631, the
_race_edges implementation attempts connections sequentially which defeats the
"race" semantics; replace the loop with parallel tasks that each try to connect
(wrap per-edge connect in an async helper and asyncio.create_task), use
asyncio.wait(return_when=FIRST_COMPLETED) to capture the first successful
connection, on success cancel remaining pending tasks and return the ws_client,
sfu_event and credentials, and if all tasks fail gather their exceptions and
raise SfuConnectionError with the combined error message; ensure tasks are
cancelled/cleaned up in a finally block and maintain the existing logging.

60 changes: 59 additions & 1 deletion getstream/video/rtc/connection_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from getstream.models import CallRequest
from getstream.utils import build_body_dict, build_query_param
from getstream.video.async_call import Call
from getstream.video.rtc.models import JoinCallResponse
from getstream.video.rtc.models import JoinCallResponse, FastJoinCallResponse
from getstream.video.rtc.pb.stream.video.sfu.event import events_pb2
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import (
TRACK_TYPE_AUDIO,
Expand All @@ -41,6 +41,7 @@
"SfuConnectionError",
"ConnectionOptions",
"join_call",
"fast_join_call",
"join_call_coordinator_request",
"create_join_request",
"prepare_video_track_info",
Expand Down Expand Up @@ -157,6 +158,63 @@ async def join_call(
raise SfuConnectionError(f"Failed to join call: {e}")


async def fast_join_call(
call: Call,
user_id: str,
location: str,
create: bool,
local_sfu: bool,
**kwargs,
) -> StreamResponse[FastJoinCallResponse]:
"""Join call via coordinator API using fast join to get multiple edge credentials.

This function requests multiple edge URLs from the coordinator. The caller
is responsible for racing these edges to find the fastest connection.

Args:
call: The call to join
user_id: The user ID to join the call with
location: The preferred location
create: Whether to create the call if it doesn't exist
local_sfu: Whether to use local SFU for development
**kwargs: Additional arguments to pass to the join call request

Returns:
A StreamResponse containing FastJoinCallResponse with multiple edge credentials

Raises:
SfuConnectionError: If the coordinator request fails
"""
try:
# Import here to avoid circular dependency
from getstream.video.rtc.coordinator_api import fast_join_call_coordinator_request

# Get multiple edge credentials from coordinator
fast_join_response = await fast_join_call_coordinator_request(
call,
user_id,
location=location,
create=create,
**kwargs,
)

if local_sfu:
# Override all credentials with local SFU for development
for cred in fast_join_response.data.credentials:
cred.server.url = "http://127.0.0.1:3031/twirp"
cred.server.ws_endpoint = "ws://127.0.0.1:3031/ws"

logger.debug(
f"Received {len(fast_join_response.data.credentials)} edge credentials for fast join"
)

return fast_join_response

except Exception as e:
logger.error(f"Failed to fast join call via coordinator: {e}")
raise SfuConnectionError(f"Failed to fast join call: {e}")


async def join_call_coordinator_request(
call: Call,
user_id: str,
Expand Down
67 changes: 66 additions & 1 deletion getstream/video/rtc/coordinator_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from getstream.utils import build_body_dict

# Import the types we need from __init__ without creating circular imports
from getstream.video.rtc.models import JoinCallResponse
from getstream.video.rtc.models import JoinCallResponse, FastJoinCallResponse

logger = logging.getLogger("getstream.video.rtc.coordinator")

Expand Down Expand Up @@ -79,3 +79,68 @@ async def join_call_coordinator_request(
path_params=path_params,
json=json_body,
)


async def fast_join_call_coordinator_request(
call: Call,
user_id: str,
create: bool = False,
data: Optional[CallRequest] = None,
ring: Optional[bool] = None,
notify: Optional[bool] = None,
video: Optional[bool] = None,
location: Optional[str] = None,
) -> StreamResponse[FastJoinCallResponse]:
"""Make a fast join request to get multiple edge credentials from the coordinator.

Args:
call: The call to join
user_id: The user ID to join the call with
create: Whether to create the call if it doesn't exist
data: Additional call data if creating
ring: Whether to ring other users
notify: Whether to notify other users
video: Whether to enable video
location: The preferred location

Returns:
A response containing the call information and an array of credentials for multiple edges
"""
# Create a token for this user
token = call.client.stream.create_token(user_id=user_id)

# Create a new client with this token
client = call.client.stream.__class__(
api_key=call.client.stream.api_key,
api_secret=call.client.stream.api_secret,
base_url=call.client.stream.base_url,
)

# Set up authentication
client.token = token
client.headers["Authorization"] = token
client.client.headers["Authorization"] = token

# Prepare path parameters for the request
path_params = {
"type": call.call_type,
"id": call.id,
}

# Build the request body
json_body = build_body_dict(
location=location or "FRA", # Default to Frankfurt if not specified
create=create,
notify=notify,
ring=ring,
video=video,
data=data,
)

# Make the POST request to fast join the call
return await client.post(
"/api/v2/video/call/{type}/{id}/fast_join",
FastJoinCallResponse,
path_params=path_params,
json=json_body,
)
Comment on lines +84 to +146
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Significant code duplication with join_call_coordinator_request.

The fast_join_call_coordinator_request function duplicates almost all logic from join_call_coordinator_request (lines 19-81), differing only in the endpoint path and return type.

Consider refactoring to a shared internal function:

async def _call_coordinator_request(
    call: Call,
    user_id: str,
    endpoint: str,
    response_type: type,
    create: bool = False,
    data: Optional[CallRequest] = None,
    ring: Optional[bool] = None,
    notify: Optional[bool] = None,
    video: Optional[bool] = None,
    location: Optional[str] = None,
):
    """Internal function to make coordinator requests."""
    token = call.client.stream.create_token(user_id=user_id)
    client = call.client.stream.__class__(
        api_key=call.client.stream.api_key,
        api_secret=call.client.stream.api_secret,
        base_url=call.client.stream.base_url,
    )
    client.token = token
    client.headers["Authorization"] = token
    client.client.headers["Authorization"] = token
    
    path_params = {"type": call.call_type, "id": call.id}
    json_body = build_body_dict(
        location=location or "FRA",
        create=create,
        notify=notify,
        ring=ring,
        video=video,
        data=data,
    )
    
    return await client.post(
        f"/api/v2/video/call/{{type}}/{{id}}/{endpoint}",
        response_type,
        path_params=path_params,
        json=json_body,
    )

async def join_call_coordinator_request(...) -> StreamResponse[JoinCallResponse]:
    return await _call_coordinator_request(call, user_id, "join", JoinCallResponse, ...)

async def fast_join_call_coordinator_request(...) -> StreamResponse[FastJoinCallResponse]:
    return await _call_coordinator_request(call, user_id, "fast_join", FastJoinCallResponse, ...)
🤖 Prompt for AI Agents
In getstream/video/rtc/coordinator_api.py around lines 84 to 146,
fast_join_call_coordinator_request duplicates almost all logic from
join_call_coordinator_request (lines ~19-81); extract the shared logic into a
single internal helper (e.g., _call_coordinator_request) that accepts
parameters: call, user_id, endpoint (string), response_type (type), and the
existing optional flags (create, data, ring, notify, video, location); move
token creation, client construction and auth header setup, path_params and
json_body construction into that helper and have join_call_coordinator_request
and fast_join_call_coordinator_request simply call the helper with
endpoint="join"/"fast_join" and
response_type=JoinCallResponse/FastJoinCallResponse, preserving default location
fallback and await/return semantics.

9 changes: 9 additions & 0 deletions getstream/video/rtc/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,12 @@ class JoinCallResponse(DataClassJsonMixin):
credentials: Credentials = dc_field(metadata=dc_config(field_name="credentials"))
stats_options: dict = dc_field(metadata=dc_config(field_name="stats_options"))
duration: str = dc_field(metadata=dc_config(field_name="duration"))


@dataclass
class FastJoinCallResponse(DataClassJsonMixin):
call: CallResponse = dc_field(metadata=dc_config(field_name="call"))
members: List[MemberResponse] = dc_field(metadata=dc_config(field_name="members"))
credentials: List[Credentials] = dc_field(metadata=dc_config(field_name="credentials"))
stats_options: dict = dc_field(metadata=dc_config(field_name="stats_options"))
duration: str = dc_field(metadata=dc_config(field_name="duration"))