Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def test_multi_language(extension_name: str, config_dir: str) -> None:
},
{
"name": "Chinese",
"audio_file": "16k_zh_cn_hotwords.pcm",
"audio_file": "16k_zh_cn.pcm",
"config_file": MULTI_LANGUAGE_CONFIG_FILE_ZH,
"expected_language": MULTI_LANGUAGE_EXPECTED_LANGUAGE_ZH,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, name: str):
self.sent_user_audio_duration_ms_before_last_reset: int = 0
self.last_finalize_timestamp: int = 0

# Reconnection manager with retry limits and backoff strategy
# Reconnection manager with unlimited retries and backoff strategy
self.reconnect_manager: ReconnectManager | None = None

@override
Expand Down Expand Up @@ -523,11 +523,6 @@ async def _handle_reconnect(self):
self.ten_env.log_error("ReconnectManager not initialized")
return

# Check if we can still retry
if not self.reconnect_manager.can_retry():
self.ten_env.log_warn("No more reconnection attempts allowed")
return

# Attempt a single reconnection
success = await self.reconnect_manager.handle_reconnect(
connection_func=self.start_connection,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"type": "extension",
"name": "azure_asr_python",
"version": "0.2.2",
"version": "0.2.3",
"dependencies": [
{
"type": "system",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,31 @@

class ReconnectManager:
"""
Manages reconnection attempts with fixed retry limit and exponential backoff strategy.
Manages reconnection attempts with unlimited retries and exponential backoff strategy.

Features:
- Fixed retry limit (default: 5 attempts)
- Exponential backoff strategy: 300ms, 600ms, 1.2s, 2.4s, 4.8s
- Unlimited retry attempts (will keep retrying until successful)
- Exponential backoff strategy with maximum delay cap: 0.5s, 1s, 2s, 4s (capped)
- Maximum delay cap to prevent overwhelming the service provider (default: 4s)
- Automatic counter reset after successful connection
- Detailed logging for monitoring and debugging
"""

def __init__(
self,
max_attempts: int = 5,
base_delay: float = 0.3, # 300 milliseconds
base_delay: float = 0.5, # 500 milliseconds
max_delay: float = 4.0, # 4 seconds maximum delay
logger=None,
):
self.max_attempts = max_attempts
self.base_delay = base_delay
self.max_delay = max_delay
self.logger = logger

# State tracking
self.attempts = 0
self._connection_successful = False

def reset_counter(self):
def _reset_counter(self):
"""Reset reconnection counter"""
self.attempts = 0
if self.logger:
Expand All @@ -38,18 +39,13 @@ def reset_counter(self):
def mark_connection_successful(self):
"""Mark connection as successful and reset counter"""
self._connection_successful = True
self.reset_counter()

def can_retry(self) -> bool:
"""Check if more reconnection attempts are allowed"""
return self.attempts < self.max_attempts
self._reset_counter()

def get_attempts_info(self) -> dict:
"""Get current reconnection attempts information"""
return {
"current_attempts": self.attempts,
"max_attempts": self.max_attempts,
"can_retry": self.can_retry(),
"unlimited_retries": True,
}

async def handle_reconnect(
Expand All @@ -70,31 +66,18 @@ async def handle_reconnect(
True if connection function executed successfully, False if attempt failed
Note: Actual connection success is determined by callback calling mark_connection_successful()
"""
if not self.can_retry():
if self.logger:
self.logger.log_error(
f"Maximum reconnection attempts ({self.max_attempts}) reached. No more attempts allowed."
)
if error_handler:
await error_handler(
ModuleError(
module=MODULE_NAME_ASR,
code=ModuleErrorCode.FATAL_ERROR.value,
message=f"Failed to reconnect after {self.max_attempts} attempts",
)
)
return False

self._connection_successful = False
self.attempts += 1

# Calculate exponential backoff delay: 2^(attempts-1) * base_delay
delay = self.base_delay * (2 ** (self.attempts - 1))
# Calculate exponential backoff delay with max limit: min(2^(attempts-1) * base_delay, max_delay)
delay = min(
self.base_delay * (2 ** (self.attempts - 1)), self.max_delay
)

if self.logger:
self.logger.log_warn(
f"Attempting reconnection #{self.attempts}/{self.max_attempts} "
f"after {delay} seconds delay..."
f"Attempting reconnection #{self.attempts} "
f"after {delay:.2f} seconds delay..."
)

try:
Expand All @@ -112,18 +95,17 @@ async def handle_reconnect(
except Exception as e:
if self.logger:
self.logger.log_error(
f"Reconnection attempt #{self.attempts} failed: {e}"
f"Reconnection attempt #{self.attempts} failed: {e}. Will retry..."
)

# If this was the last attempt, send error
if self.attempts >= self.max_attempts:
if error_handler:
await error_handler(
ModuleError(
module=MODULE_NAME_ASR,
code=ModuleErrorCode.FATAL_ERROR.value,
message=f"All reconnection attempts failed. Last error: {str(e)}",
)
# Report error but don't stop retrying
if error_handler:
await error_handler(
ModuleError(
module=MODULE_NAME_ASR,
code=ModuleErrorCode.FATAL_ERROR.value,
message=f"Reconnection attempt #{self.attempts} failed: {str(e)}",
)
)

return False
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ async def on_data(


def test_invalid_params():
property_json = {"params": {}}
property_json = {
"params": {
"key": "",
}
}

tester = AzureAsrExtensionTester()
tester.set_test_mode_single("azure_asr_python", json.dumps(property_json))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0.
# See the LICENSE file for more information.
#
import threading
from types import SimpleNamespace
from typing_extensions import override
from ten_runtime import (
AsyncExtensionTester,
AsyncTenEnvTester,
Data,
TenError,
TenErrorCode,
)
import json

# We must import it, which means this test fixture will be automatically executed
from .mock import patch_azure_ws # noqa: F401


class UnlimitedReconnectTester(AsyncExtensionTester):
"""Tester for unlimited reconnect strategy"""

def __init__(self, max_failures_before_success: int):
super().__init__()
self.recv_error_count = 0
self.max_failures_before_success = max_failures_before_success

@override
async def on_start(self, ten_env_tester: AsyncTenEnvTester) -> None:
pass

def stop_test_if_checking_failed(
self,
ten_env_tester: AsyncTenEnvTester,
success: bool,
error_message: str,
) -> None:
if not success:
err = TenError.create(
error_code=TenErrorCode.ErrorCodeGeneric,
error_message=error_message,
)
ten_env_tester.stop_test(err)

@override
async def on_data(
self, ten_env_tester: AsyncTenEnvTester, data: Data
) -> None:
data_name = data.get_name()
if data_name == "error":
self.recv_error_count += 1
elif data_name == "asr_result":
# Verify that we received expected number of errors (one per failed attempt)
self.stop_test_if_checking_failed(
ten_env_tester,
self.recv_error_count == self.max_failures_before_success,
f"recv_error_count is not {self.max_failures_before_success}: {self.recv_error_count}",
)
ten_env_tester.stop_test()


# Test that reconnection continues beyond the old 5-attempt limit
# This test simulates 8 failures before success, which would have failed with the old limit
def test_unlimited_reconnect_beyond_old_limit(patch_azure_ws):
"""Test that reconnection continues beyond the old 5-attempt limit (8 failures)"""
start_connection_attempts = 0
max_failures = 8 # More than the old limit of 5

def fake_start_continuous_recognition():
def triggerRecognized():
evt = SimpleNamespace(
result=SimpleNamespace(
text="finally connected",
offset=0,
duration=5000000,
no_match_details=None,
json=json.dumps(
{
"DisplayText": "finally connected",
"Offset": 0,
"Duration": 5000000,
}
),
)
)
patch_azure_ws.event_handlers["recognized"](evt)

def triggerConnected():
event = SimpleNamespace()
patch_azure_ws.event_handlers["connected"](event)
threading.Timer(0.2, triggerRecognized).start()

def triggerWillFailSessionStarted():
event = SimpleNamespace(session_id="123")
patch_azure_ws.event_handlers["session_started"](event)
threading.Timer(0.5, triggerCanceled).start()

def triggerWillSuccessSessionStarted():
event = SimpleNamespace(session_id="123")
patch_azure_ws.event_handlers["session_started"](event)
threading.Timer(0.2, triggerConnected).start()

def triggerSessionStopped():
event = SimpleNamespace(session_id="123")
patch_azure_ws.event_handlers["session_stopped"](event)

def triggerCanceled():
evt = SimpleNamespace(
cancellation_details=SimpleNamespace(
code=123,
reason=1,
error_details=f"mock error details for attempt {start_connection_attempts}",
)
)
patch_azure_ws.event_handlers["canceled"](evt)
threading.Timer(0.1, triggerSessionStopped).start()

nonlocal start_connection_attempts
start_connection_attempts += 1

if start_connection_attempts <= max_failures:
# Simulate failures
threading.Timer(0.5, triggerWillFailSessionStarted).start()
else:
# Finally succeed
threading.Timer(0.2, triggerWillSuccessSessionStarted).start()

return None

def fake_stop_continuous_recognition():
return None

# Inject into recognizer
patch_azure_ws.recognizer_instance.start_continuous_recognition.side_effect = (
fake_start_continuous_recognition
)

patch_azure_ws.recognizer_instance.stop_continuous_recognition.side_effect = (
fake_stop_continuous_recognition
)

property_json = {
"params": {
"key": "fake_key",
"region": "fake_region",
}
}

tester = UnlimitedReconnectTester(max_failures_before_success=max_failures)
tester.set_test_mode_single("azure_asr_python", json.dumps(property_json))
err = tester.run()
assert (
err is None
), f"test_unlimited_reconnect_beyond_old_limit err code: {err.error_code()} message: {err.error_message()}"
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ async def on_init(self, ten_env: AsyncTenEnv) -> None:
listener=self,
log_level=self.config.params.log_level,
log_path=log_path,
reconnect_max_retries=0, # 0 means infinite reconnection
reconnect_delay=0.5, # Initial reconnection delay in seconds (exponential backoff: 0.5s, 1s, 2s, 4s, 4s...)
reconnect_max_delay=4, # Maximum reconnection delay in seconds
)
self.ten_env.log_info(
"vendor_status_changed: Tencent ASR client started",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"type": "extension",
"name": "tencent_asr_python",
"version": "0.2.9",
"version": "0.2.10",
"display_name": {
"locales": {
"en-US": {
Expand Down
Loading