diff --git a/README.md b/README.md index 2a09aac241..fb289ae28d 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,14 @@ Documentation on the framework and how to use it can be found [here](https://doc - entrypoint: The starting point for an interactive session, similar to a request handler in a web server. - Worker: The main process that coordinates job scheduling and launches agents for user sessions. +**Backchannel Handling**: The voice agent backchannel logic has been refactored into a dedicated handler to improve clarity and maintain behavior. + +- **What changed**: Backchannel detection and state are now implemented in `livekit.agents.voice.backchannel.BackchannelHandler`. +- **Why**: Extracting the logic isolates short-utterance detection (e.g., "okay", "yeah") so these affirmations don't trigger full turn commits or LLM replies and so resume/interrupt timers are easier to reason about. +- **Exit commands**: The backchannel handler has built-in detection of exit commands (e.g., "exit", "quit", "goodbye", "bye", "stop", "end"), making session-termination intents easier to identify. +- **Behavior notes**: The refactor preserves existing behavior: fast detection of backchannels, a small false-interruption timer to avoid spurious interrupts, and excluding backchannel transcripts from the session transcript buffer. 
+ + ## Usage ### Simple voice agent diff --git a/livekit-agents/livekit/agents/ipc/supervised_proc.py b/livekit-agents/livekit/agents/ipc/supervised_proc.py index b1d192fbaa..a8a3b83693 100644 --- a/livekit-agents/livekit/agents/ipc/supervised_proc.py +++ b/livekit-agents/livekit/agents/ipc/supervised_proc.py @@ -39,11 +39,18 @@ def _mask_ctrl_c() -> Generator[None, None, None]: finally: signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT]) else: - old = signal.signal(signal.SIGINT, signal.SIG_IGN) - try: + # On Windows, signal.signal() only works in the main thread + # Check if we're in the main thread before trying to modify signals + import threading + if threading.current_thread() is threading.main_thread(): + old = signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + yield + finally: + signal.signal(signal.SIGINT, old) + else: + # Not in main thread - just yield without signal masking yield - finally: - signal.signal(signal.SIGINT, old) @dataclass diff --git a/livekit-agents/livekit/agents/telemetry/traces.py b/livekit-agents/livekit/agents/telemetry/traces.py index 09b82363e2..e26d92ca1f 100644 --- a/livekit-agents/livekit/agents/telemetry/traces.py +++ b/livekit-agents/livekit/agents/telemetry/traces.py @@ -16,12 +16,22 @@ from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk._logs import ( - LogData, LoggerProvider, LoggingHandler, - LogRecord, LogRecordProcessor, + ReadableLogRecord, ) +# Compatibility aliases for older API names +LogRecord = ReadableLogRecord +try: + from opentelemetry.sdk._logs import LogData +except ImportError: + # LogData removed in newer versions - create a simple wrapper + from dataclasses import dataclass + @dataclass + class LogData: + log_record: ReadableLogRecord + instrumentation_scope: object = None from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from 
opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import SpanProcessor, TracerProvider diff --git a/livekit-agents/livekit/agents/voice/agent_activity.py b/livekit-agents/livekit/agents/voice/agent_activity.py index 0c3f7c743d..985bc47cfb 100644 --- a/livekit-agents/livekit/agents/voice/agent_activity.py +++ b/livekit-agents/livekit/agents/voice/agent_activity.py @@ -53,6 +53,7 @@ _EndOfTurnInfo, _PreemptiveGenerationInfo, ) +from .backchannel import BackchannelHandler, BACKCHANNEL_WORDS from .events import ( AgentFalseInterruptionEvent, ErrorEvent, @@ -125,7 +126,7 @@ def __init__(self, agent: Agent, sess: AgentSession) -> None: self._paused_speech: SpeechHandle | None = None self._false_interruption_timer: asyncio.TimerHandle | None = None self._interrupt_paused_speech_task: asyncio.Task[None] | None = None - + self._backchannel = BackchannelHandler() # fired when a speech_task finishes or when a new speech_handle is scheduled # this is used to wake up the main task when the scheduling state changes self._q_updated = asyncio.Event() @@ -176,6 +177,8 @@ def _validate_turn_detection( stt_model = self.stt llm_model = self.llm + logger.debug("what is current mode %s",mode) + if mode == "vad" and not vad_model: logger.warning("turn_detection is set to 'vad', but no VAD model is provided") mode = None @@ -644,7 +647,7 @@ async def _resume_scheduling_task(self) -> None: async def resume(self) -> None: # `resume` must only be called by AgentSession - + logger.debug("trying to resume") async with self._lock: span = tracer.start_span( "resume_agent_activity", @@ -1131,7 +1134,7 @@ def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionComp self._session._user_input_transcribed( UserInputTranscribedEvent(transcript=ev.transcript, is_final=ev.is_final) ) - + logger.debug("input audio - %s",ev.transcript) if ev.is_final: # TODO: for realtime models, the created_at field is off. 
it should be set to when the user started speaking. # but we don't have that information here. @@ -1167,11 +1170,13 @@ def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None: self._schedule_speech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL) def _interrupt_by_audio_activity(self) -> None: + """Called when VAD detects user speech during agent speaking. + + With backchannel detection: PAUSE audio (keep generating), wait for STT. + """ opt = self._session.options - use_pause = opt.resume_false_interruption and opt.false_interruption_timeout is not None if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.turn_detection: - # ignore if realtime model has turn detection enabled return if ( @@ -1180,34 +1185,38 @@ def _interrupt_by_audio_activity(self) -> None: and self._audio_recognition is not None ): text = self._audio_recognition.current_transcript - - # TODO(long): better word splitting for multi-language - if len(split_words(text, split_character=True)) < opt.min_interruption_words: + word_count = len(split_words(text, split_character=True)) + if word_count < opt.min_interruption_words: return if self._rt_session is not None: self._rt_session.start_user_activity() + # PAUSE audio output but keep generating - we'll decide when STT returns if ( self._current_speech is not None and not self._current_speech.interrupted and self._current_speech.allow_interruptions ): - self._paused_speech = self._current_speech - - # reset the false interruption timer - if self._false_interruption_timer: - self._false_interruption_timer.cancel() - self._false_interruption_timer = None - - if use_pause and self._session.output.audio and self._session.output.audio.can_pause: - self._session.output.audio.pause() - self._session._update_agent_state("listening") - else: - if self._rt_session is not None: - self._rt_session.interrupt() - - self._current_speech.interrupt() + # Only pause if we haven't already + if self._paused_speech is None: + 
self._paused_speech = self._current_speech + self._backchannel.reset_resume_flag() + + # Pause audio if supported (keeps buffering in background) + audio_output = self._session.output.audio + if audio_output and audio_output.can_pause: + audio_output.pause() + self._session._update_agent_state("listening") + logger.info( + "[BACKCHANNEL] Audio PAUSED - waiting for STT (generation continues)", + extra={"speech_id": id(self._current_speech)}, + ) + else: + logger.debug( + "[BACKCHANNEL] Cannot pause audio - waiting for STT anyway", + extra={"speech_id": id(self._current_speech)}, + ) # region recognition hooks @@ -1216,13 +1225,23 @@ def on_start_of_speech(self, ev: vad.VADEvent | None) -> None: if self._false_interruption_timer: # cancel the timer when user starts speaking but leave the paused state unchanged + logger.debug("[BACKCHANNEL] Cancelling false interruption timer - user started speaking again") self._false_interruption_timer.cancel() self._false_interruption_timer = None def on_end_of_speech(self, ev: vad.VADEvent | None) -> None: speech_end_time = time.time() if ev: - speech_end_time = speech_end_time - ev.silence_duration + speech_end_time = speech_end_time - ev.silence_duration + logger.debug( + "[BACKCHANNEL] VAD on_end_of_speech", + extra={ + "has_paused_speech": self._paused_speech is not None, + "speech_duration": ev.speech_duration if ev else None, + "silence_duration": ev.silence_duration if ev else None, + }, + ) + self._session._update_user_state( "listening", last_speaking_time=speech_end_time, @@ -1233,6 +1252,10 @@ def on_end_of_speech(self, ev: vad.VADEvent | None) -> None: and (timeout := self._session.options.false_interruption_timeout) is not None ): # schedule a resume timer when user stops speaking + logger.debug( + "[BACKCHANNEL] Starting false interruption timer", + extra={"timeout": timeout}, + ) self._start_false_interruption_timer(timeout) def on_vad_inference_done(self, ev: vad.VADEvent) -> None: @@ -1240,8 +1263,28 @@ def 
on_vad_inference_done(self, ev: vad.VADEvent) -> None: # ignore vad inference done event if turn_detection is manual or realtime_llm return - if ev.speech_duration >= self._session.options.min_interruption_duration: + min_duration = self._session.options.min_interruption_duration + has_current_speech = self._current_speech is not None + + if ev.speech_duration >= min_duration: + logger.info( + "[BACKCHANNEL] VAD threshold MET - triggering interrupt", + extra={ + "speech_duration": ev.speech_duration, + "min_interruption_duration": min_duration, + }, + ) self._interrupt_by_audio_activity() + else: + if has_current_speech and ev.speech_duration > 0.1: + logger.debug( + "[BACKCHANNEL] VAD threshold NOT met - speech too short to interrupt", + extra={ + "speech_duration": ev.speech_duration, + "min_interruption_duration": min_duration, + "need_more": min_duration - ev.speech_duration, + }, + ) def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -> None: if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription: @@ -1271,40 +1314,77 @@ def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) - # schedule a resume timer if interrupted after end_of_speech self._start_false_interruption_timer(timeout) + def _is_backchannel(self, text: str) -> bool: + """Check if transcript contains only backchannel/affirmative words.""" + return self._backchannel.is_backchannel(text) + def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = None) -> None: if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription: # skip stt transcription if user_transcription is enabled on the realtime model return + transcript_text = ev.alternatives[0].text + is_bc = self._is_backchannel(transcript_text) + agent_is_speaking = ( + self._current_speech is not None + and not self._current_speech.interrupted + and not self._current_speech.done() + ) + + logger.info( + 
"[BACKCHANNEL] STT received", + extra={ + "transcript": transcript_text, + "is_backchannel": is_bc, + "paused": self._paused_speech is not None, + "speaking": agent_is_speaking, + }, + ) + + if is_bc: + if self._paused_speech is not None: + # Backchannel while paused: resume and return immediately + logger.info("[BACKCHANNEL] Resuming (paused)", extra={"transcript": transcript_text}) + self._resume_paused_speech(reason="backchannel") + self._backchannel.last_skipped_backchannel_transcript = transcript_text + return + elif agent_is_speaking: + # Backchannel while speaking: ignore and return immediately + logger.debug("[BACKCHANNEL] Ignoring (speaking)", extra={"transcript": transcript_text}) + self._backchannel.last_skipped_backchannel_transcript = transcript_text + return + # Backchannel with agent not speaking - still skip + self._backchannel.last_skipped_backchannel_transcript = transcript_text + return + + # Meaningful input (not backchannel) - interrupt immediately + self._backchannel.last_skipped_backchannel_transcript = None + + if self._paused_speech is not None: + # Interrupt paused speech + logger.info("[BACKCHANNEL] Interrupting (paused)", extra={"transcript": transcript_text}) + self._user_turn_completed_atask = self._create_speech_task( + self._interrupt_paused_speech(self._interrupt_paused_speech_task), + name="AgentActivity._interrupt_paused_speech", + ) + elif agent_is_speaking: + # Interrupt active speech immediately + logger.info("[BACKCHANNEL] Interrupting (active)", extra={"transcript": transcript_text}) + # Interrupt in priority order for speed + if self._rt_session is not None: + self._rt_session.interrupt() + if self._current_speech and not self._current_speech.interrupted: + self._current_speech.interrupt() + + # Emit user input transcribed event for non-backchannel inputs or meaningful interruptions self._session._user_input_transcribed( UserInputTranscribedEvent( language=ev.alternatives[0].language, - transcript=ev.alternatives[0].text, + 
transcript=transcript_text, is_final=True, speaker_id=ev.alternatives[0].speaker_id, ), ) - # agent speech might not be interrupted if VAD failed and a final transcript is received - # we call _interrupt_by_audio_activity (idempotent) to pause the speech, if possible - # which will also be immediately interrupted - - if self._audio_recognition and self._turn_detection not in ( - "manual", - "realtime_llm", - ): - self._interrupt_by_audio_activity() - - if ( - speaking is False - and self._paused_speech - and (timeout := self._session.options.false_interruption_timeout) is not None - ): - # schedule a resume timer if interrupted after end_of_speech - self._start_false_interruption_timer(timeout) - - self._interrupt_paused_speech_task = asyncio.create_task( - self._interrupt_paused_speech(old_task=self._interrupt_paused_speech_task) - ) def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None: if ( @@ -1557,6 +1637,10 @@ async def _user_turn_completed_task( ) self._session.emit("metrics_collected", MetricsCollectedEvent(metrics=eou_metrics)) + def should_skip_transcript(self, transcript: str) -> bool: + """Check if this transcript should be skipped.""" + return self._backchannel.should_skip_transcript(transcript) + # AudioRecognition is calling this method to retrieve the chat context before running the TurnDetector model # noqa: E501 def retrieve_chat_ctx(self) -> llm.ChatContext: return self._agent.chat_ctx @@ -2541,36 +2625,59 @@ def _start_false_interruption_timer(self, timeout: float) -> None: self._false_interruption_timer.cancel() def _on_false_interruption() -> None: - if self._paused_speech is None or ( - self._current_speech and self._current_speech is not self._paused_speech - ): - # already new speech is scheduled, do nothing - self._paused_speech = None - return + self._resume_paused_speech(reason="false_interruption_timer") - resumed = False - if ( - self._session.options.resume_false_interruption - and (audio_output := 
self._session.output.audio) - and audio_output.can_pause - and not self._paused_speech.done() - ): - self._session._update_agent_state("speaking") - audio_output.resume() - resumed = True - logger.debug("resumed false interrupted speech", extra={"timeout": timeout}) + self._false_interruption_timer = self._session._loop.call_later( + timeout, _on_false_interruption + ) - self._session.emit( - "agent_false_interruption", AgentFalseInterruptionEvent(resumed=resumed) + def _resume_paused_speech(self, *, reason: str) -> bool: + # Guard against race condition: if resume was already called by timer or STT, + # return False immediately + if self._backchannel.is_resume_already_called(): + logger.debug( + "resume already called, ignoring duplicate call", + extra={"reason": reason}, ) + return False + + self._backchannel.mark_resume_called() + + if self._false_interruption_timer is not None: + self._false_interruption_timer.cancel() + self._false_interruption_timer = None + if self._paused_speech is None or ( + self._current_speech and self._current_speech is not self._paused_speech + ): self._paused_speech = None - self._false_interruption_timer = None + return False - self._false_interruption_timer = self._session._loop.call_later( - timeout, _on_false_interruption + resumed = False + if ( + self._session.options.resume_false_interruption + and (audio_output := self._session.output.audio) + and audio_output.can_pause + and not self._paused_speech.done() + ): + self._session._update_agent_state("speaking") + audio_output.resume() + logger.debug( + "resumed paused speech", + extra={ + "speech_id": self._paused_speech.id, + "reason": reason, + }, + ) + resumed = True + + self._session.emit( + "agent_false_interruption", AgentFalseInterruptionEvent(resumed=resumed) ) + self._paused_speech = None + return resumed + async def _interrupt_paused_speech(self, old_task: asyncio.Task[None] | None = None) -> None: if old_task is not None: await old_task diff --git 
a/livekit-agents/livekit/agents/voice/agent_session.py b/livekit-agents/livekit/agents/voice/agent_session.py index 628718a6b2..1f7cbf5537 100644 --- a/livekit-agents/livekit/agents/voice/agent_session.py +++ b/livekit-agents/livekit/agents/voice/agent_session.py @@ -89,6 +89,7 @@ class AgentSessionOptions: preemptive_generation: bool tts_text_transforms: Sequence[TextTransforms] | None ivr_detection: bool + interruption_resume_words: Sequence[str] | None Userdata_T = TypeVar("Userdata_T") @@ -159,6 +160,7 @@ def __init__( tts_text_transforms: NotGivenOr[Sequence[TextTransforms] | None] = NOT_GIVEN, preemptive_generation: bool = False, ivr_detection: bool = False, + interruption_resume_words: NotGivenOr[Sequence[str] | None] = NOT_GIVEN, conn_options: NotGivenOr[SessionConnectOptions] = NOT_GIVEN, loop: asyncio.AbstractEventLoop | None = None, # deprecated @@ -259,6 +261,17 @@ def __init__( ) false_interruption_timeout = agent_false_interruption_timeout + # Validate false_interruption_timeout + if is_given(false_interruption_timeout) and false_interruption_timeout is not None: + if false_interruption_timeout <= 0: + raise ValueError( + f"false_interruption_timeout must be positive, got {false_interruption_timeout}" + ) + logger.debug( + "false_interruption_timeout is set to %.1f seconds", + false_interruption_timeout, + ) + if not is_given(video_sampler): video_sampler = VoiceActivityVideoSampler(speaking_fps=1.0, silent_fps=0.3) @@ -285,6 +298,9 @@ def __init__( ), preemptive_generation=preemptive_generation, ivr_detection=ivr_detection, + interruption_resume_words=( + interruption_resume_words if is_given(interruption_resume_words) else None + ), use_tts_aligned_transcript=use_tts_aligned_transcript if is_given(use_tts_aligned_transcript) else None, @@ -877,6 +893,7 @@ def say( raise RuntimeError("AgentSession is closing, cannot use say()") # attach to the session span if called outside of the AgentSession + logger.debug("trying to say - %s",text) use_span: 
AbstractContextManager[trace.Span | None] = nullcontext() if trace.get_current_span() is trace.INVALID_SPAN and self._session_span is not None: use_span = trace.use_span(self._session_span, end_on_exit=False) diff --git a/livekit-agents/livekit/agents/voice/audio_recognition.py b/livekit-agents/livekit/agents/voice/audio_recognition.py index 741bd8ed2c..7d543f23fe 100644 --- a/livekit-agents/livekit/agents/voice/audio_recognition.py +++ b/livekit-agents/livekit/agents/voice/audio_recognition.py @@ -88,6 +88,7 @@ def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) - def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = None) -> None: ... def on_end_of_turn(self, info: _EndOfTurnInfo) -> bool: ... def on_preemptive_generation(self, info: _PreemptiveGenerationInfo) -> None: ... + def should_skip_transcript(self, transcript: str) -> bool: ... def retrieve_chat_ctx(self) -> llm.ChatContext: ... @@ -365,6 +366,14 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None: extra["transcript_delay"] = time.time() - self._last_speaking_time logger.debug("received user transcript", extra=extra) + # Check if this transcript should be skipped + if self._hooks.should_skip_transcript(transcript): + logger.debug( + "skipping transcript accumulation (FillerWords detected)", + extra={"transcript": transcript}, + ) + return + self._last_final_transcript_time = time.time() self._audio_transcript += f" {transcript}" self._audio_transcript = self._audio_transcript.lstrip() diff --git a/livekit-agents/livekit/agents/voice/backchannel.py b/livekit-agents/livekit/agents/voice/backchannel.py new file mode 100644 index 0000000000..49554ef324 --- /dev/null +++ b/livekit-agents/livekit/agents/voice/backchannel.py @@ -0,0 +1,122 @@ +"""Backchannel handling for voice agents. + +Backchannels are short affirmative responses (like "yeah", "okay", "uh-huh") that +allow users to acknowledge agent speech without interrupting the agent's output. 
+""" + +from __future__ import annotations + +import asyncio +import re +from typing import TYPE_CHECKING + +from ..log import logger + +if TYPE_CHECKING: + from livekit import rtc + + +# Backchannel words - short affirmative responses that shouldn't interrupt agent speech +BACKCHANNEL_WORDS: frozenset[str] = frozenset({ + "yeah", "yep", "yes", "okay", "ok", "uh-huh", "uh huh", "uhuh", + "mm-hmm", "mmhmm", "mm hmm", "mhm", "right", "sure", "got it", + "i see", "go on", "continue", "alright", "all right", "aha", +}) + +# Exit command words - commands that always trigger session exit +EXIT_COMMAND_WORDS: frozenset[str] = frozenset({ + "exit", "quit", "goodbye", "bye", "bye-bye", "byebye", "stop", "end", + "terminate", "close", "hang up", "hangup", "see you", "take care", +}) + + +class BackchannelHandler: + """Handles backchannel detection and audio pause/resume logic.""" + + def __init__(self) -> None: + self.paused_speech: any = None # SpeechHandle + self.false_interruption_timer: asyncio.TimerHandle | None = None + self.interrupt_paused_speech_task: asyncio.Task[None] | None = None + self.resume_paused_called = False + self.last_skipped_backchannel_transcript: str | None = None + self.Exit_Command_Words = EXIT_COMMAND_WORDS + self.Back_Channel_Words = BACKCHANNEL_WORDS + + def is_backchannel(self, text: str) -> bool: + """Check if transcript contains only backchannel/affirmative words or exit commands.""" + # Fast normalize: remove punctuation and convert to lowercase + normalized = re.sub(r"[^\w\s-]", "", text.lower()).strip() + + if not normalized: + return False + + # Check exit commands FIRST (highest priority) + if normalized in self.Exit_Command_Words: + logger.info("[BACKCHANNEL] Exit command detected", extra={"transcript": text}) + return True + + # Then check backchannel words (lower priority) + if normalized in self.Back_Channel_Words: + logger.info("[BACKCHANNEL] Detected", extra={"transcript": text}) + return True + + # Fast space variation check - 
only compute if needed + normalized_no_space = normalized.replace(" ", "") + if len(normalized_no_space) < 2: # Skip very short strings + logger.debug("[BACKCHANNEL] Not matched", extra={"transcript": text}) + return False + + # Check exit commands with space variations + for exit_word in self.Exit_Command_Words: + if normalized_no_space == exit_word.replace(" ", ""): + logger.info("[BACKCHANNEL] Exit command detected", extra={"transcript": text}) + return True + + # Check backchannel words with space variations + for bc_word in self.Back_Channel_Words: + if normalized_no_space == bc_word.replace(" ", ""): + logger.info("[BACKCHANNEL] Detected", extra={"transcript": text}) + return True + + logger.debug("[BACKCHANNEL] Not matched", extra={"transcript": text}) + return False + + def should_skip_transcript(self, transcript: str) -> bool: + """Check if this transcript should be skipped (e.g., backchannel that was resumed).""" + # Fast exit if no backchannel was skipped + if self.last_skipped_backchannel_transcript is None: + return False + # Fast comparison of normalized strings + return ( + transcript.lower().strip() + == self.last_skipped_backchannel_transcript.lower().strip() + ) + + def start_false_interruption_timer( + self, timeout: float, on_timeout_callback: callable + ) -> None: + """Start a timer to resume paused audio if no user input arrives.""" + if self.false_interruption_timer is not None: + self.false_interruption_timer.cancel() + + self.false_interruption_timer = asyncio.get_event_loop().call_later( + timeout, on_timeout_callback + ) + + def cancel_false_interruption_timer(self) -> None: + """Cancel the false interruption timer.""" + if self.false_interruption_timer is not None: + self.false_interruption_timer.cancel() + self.false_interruption_timer = None + + def reset_resume_flag(self) -> None: + """Reset the resume called flag for a new pause cycle.""" + self.resume_paused_called = False + + def mark_resume_called(self) -> None: + """Mark that 
resume has been called to prevent duplicate calls.""" + self.resume_paused_called = True + + def is_resume_already_called(self) -> bool: + """Check if resume was already called in this cycle.""" + return self.resume_paused_called