diff --git a/src/elevenlabs/conversational_ai/default_audio_interface.py b/src/elevenlabs/conversational_ai/default_audio_interface.py index e3bd9ad8..ef8adf17 100644 --- a/src/elevenlabs/conversational_ai/default_audio_interface.py +++ b/src/elevenlabs/conversational_ai/default_audio_interface.py @@ -16,15 +16,27 @@ def __init__(self): except ImportError: raise ImportError("To use DefaultAudioInterface you must install pyaudio.") self.pyaudio = pyaudio + self.should_stop = threading.Event() + self.output_thread = None + self.output_queue = None + self.in_stream = None + self.out_stream = None + self.p = None + self.input_callback = None + self._started = False def start(self, input_callback: Callable[[bytes], None]): + if self._started: + # If already started, stop first to avoid resource leaks + self.stop() + # Audio input is using callbacks from pyaudio which we simply pass through. self.input_callback = input_callback # Audio output is buffered so we can handle interruptions. # Start a separate thread to handle writing to the output stream. - self.output_queue: queue.Queue[bytes] = queue.Queue() - self.should_stop = threading.Event() + self.output_queue = queue.Queue() + self.should_stop.clear() # Reset the event in case start is called multiple times self.output_thread = threading.Thread(target=self._output_thread) self.p = self.pyaudio.PyAudio() @@ -47,28 +59,50 @@ def start(self, input_callback: Callable[[bytes], None]): ) self.output_thread.start() + self._started = True def stop(self): + if not self._started: + return # Nothing to stop + self.should_stop.set() - self.output_thread.join() - self.in_stream.stop_stream() - self.in_stream.close() - self.out_stream.close() - self.p.terminate() + + if self.output_thread and self.output_thread.is_alive(): + self.output_thread.join() + + if self.in_stream: + self.in_stream.stop_stream() + self.in_stream.close() + self.in_stream = None + + if self.out_stream: + self.out_stream.close() + self.out_stream = None + + if self.p: + self.p.terminate() + self.p = None + + self.output_thread = None + self.output_queue = None + self.input_callback = None + self._started = False def output(self, audio: bytes): - self.output_queue.put(audio) + if self.output_queue: + self.output_queue.put(audio) def interrupt(self): # Clear the output queue to stop any audio that is currently playing. # Note: We can't atomically clear the whole queue, but we are doing # it from the message handling thread so no new audio will be added # while we are clearing. - try: - while True: - _ = self.output_queue.get(block=False) - except queue.Empty: - pass + if self.output_queue: + try: + while True: + _ = self.output_queue.get(block=False) + except queue.Empty: + pass def _output_thread(self): while not self.should_stop.is_set(): @@ -94,15 +128,27 @@ def __init__(self): except ImportError: raise ImportError("To use AsyncDefaultAudioInterface you must install pyaudio.") self.pyaudio = pyaudio + self.should_stop = asyncio.Event() + self.output_task = None + self.output_queue = None + self.in_stream = None + self.out_stream = None + self.p = None + self.input_callback = None + self._started = False async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): + if self._started: + # If already started, stop first to avoid resource leaks + await self.stop() + # Audio input is using callbacks from pyaudio which we adapt to async self.input_callback = input_callback # Audio output is buffered so we can handle interruptions. # Start a separate task to handle writing to the output stream. - self.output_queue: asyncio.Queue[bytes] = asyncio.Queue() - self.should_stop = asyncio.Event() + self.output_queue = asyncio.Queue() + self.should_stop.clear() # Reset the event in case start is called multiple times self.p = self.pyaudio.PyAudio() self.in_stream = self.p.open( @@ -125,33 +171,55 @@ async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): # Start the output task self.output_task = asyncio.create_task(self._output_task()) + self._started = True async def stop(self): + if not self._started: + return # Nothing to stop + self.should_stop.set() - await self.output_task - self.in_stream.stop_stream() - self.in_stream.close() - self.out_stream.close() - self.p.terminate() + + if self.output_task and not self.output_task.done(): + await self.output_task + + if self.in_stream: + self.in_stream.stop_stream() + self.in_stream.close() + self.in_stream = None + + if self.out_stream: + self.out_stream.close() + self.out_stream = None + + if self.p: + self.p.terminate() + self.p = None + + self.output_task = None + self.output_queue = None + self.input_callback = None + self._started = False async def output(self, audio: bytes): - await self.output_queue.put(audio) + if self.output_queue: + await self.output_queue.put(audio) async def interrupt(self): # Clear the output queue to stop any audio that is currently playing. - try: - while True: - try: - _ = self.output_queue.get_nowait() - except asyncio.QueueEmpty: - break - except AttributeError: - # In Python 3.8, it's asyncio.QueueEmpty, in 3.10+ it's asyncio.QueueEmpty - while not self.output_queue.empty(): - try: - _ = self.output_queue.get_nowait() - except: - break + if self.output_queue: + try: + while True: + try: + _ = self.output_queue.get_nowait() + except asyncio.QueueEmpty: + break + except AttributeError: + # In Python 3.8, it's asyncio.QueueEmpty, in 3.10+ it's asyncio.QueueEmpty + while not self.output_queue.empty(): + try: + _ = self.output_queue.get_nowait() + except: + break async def _output_task(self): while not self.should_stop.is_set():