diff --git a/examples/connexity-metrics/.gitignore b/examples/connexity-metrics/.gitignore
new file mode 100644
index 000000000..50d9d205e
--- /dev/null
+++ b/examples/connexity-metrics/.gitignore
@@ -0,0 +1,161 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+recordings/
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+#   .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock

+# poetry
+#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+#   This is especially recommended for binary packages to ensure reproducibility, and is more
+#   commonly ignored for libraries.
+#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+#   in version control.
+#   https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+#  and can be added to the global gitignore or merged into this file.  For a more nuclear
+#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
+runpod.toml
diff --git a/examples/connexity-metrics/Dockerfile b/examples/connexity-metrics/Dockerfile
new file mode 100644
index 000000000..1797147fc
--- /dev/null
+++ b/examples/connexity-metrics/Dockerfile
@@ -0,0 +1,10 @@
+FROM python:3.10-bullseye
+RUN mkdir /app
+COPY *.py /app/
+COPY requirements.txt /app/
+WORKDIR /app
+RUN pip3 install -r requirements.txt
+
+EXPOSE 8765
+
+CMD ["python3", "server.py"]
diff --git a/examples/connexity-metrics/README.md b/examples/connexity-metrics/README.md
new file mode 100644
index 000000000..1a54cdcc9
--- /dev/null
+++ b/examples/connexity-metrics/README.md
@@ -0,0 +1,64 @@
+# Chatbot with Connexity Metrics
+
+This project implements a chatbot using a pipeline architecture that integrates audio processing, transcription, and a language model for conversational interactions. The chatbot runs in either a Daily or a Twilio communication environment, utilizing various services for text-to-speech and language model responses. There are two separate use cases, one for Daily and one for Twilio.
+
+## Features
+
+- **Audio Input and Output**: Captures microphone input and plays back audio responses.
+- **Voice Activity Detection**: Utilizes Silero VAD to manage audio input intelligently.
+- **Text-to-Speech**: Integrates the ElevenLabs TTS service to convert text responses into audio.
+- **Language Model Interaction**: Uses OpenAI's GPT-4o model to generate responses based on user input.
+- **Audio Collection**: Retrieves a link to the call recording and sends it to the Connexity Metrics service for analysis.
+
+## Requirements
+
+- Python 3.10+
+- `python-dotenv`
+- Additional libraries from the `pipecat` package (see `requirements.txt`).
+
+## Setup
+
+1. Clone the repository.
+2. Install the required packages.
+3. Set up environment variables for API keys:
+   - `OPENAI_API_KEY`
+   - `ELEVENLABS_API_KEY`
+   - `CONNEXITY_API_KEY`
+   - `DEEPGRAM_API_KEY` (Twilio bot)
+   - `TWILIO_ACCOUNT_ID` and `TWILIO_AUTH_TOKEN` (Twilio bot)
+   - `DAILY_API_KEY` (Daily bot)
+4. Run the server.
+
+## Usage
+
+The chatbot introduces itself and engages in conversations, providing brief and creative responses. Designed for flexibility, it can support multiple languages with appropriate configuration.
+
+## Events
+
+- Participants joining or leaving the call are handled dynamically, adjusting the chatbot's behavior accordingly.
+
+
+ℹ️ The first run may take extra time to get started, since the VAD (Voice Activity Detection) model needs to be downloaded.
+
+## Get started
+
+```bash
+python3 -m venv venv
+source venv/bin/activate
+pip install -r requirements.txt
+
+cp env.example .env # and add your credentials
+```
+
+## Run the server
+
+```bash
+python server.py
+```
+
+Then, visit `http://localhost:8765/` in your browser to start a chatbot session.
+
+## Build and test the Docker image
+
+```bash
+docker build -t chatbot .
+docker run --env-file .env -p 8765:8765 chatbot
+```
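For reference, a sample `.env` assembled from the environment variables the example code reads; the repo's `env.example` is not shown in this diff and may differ, and the values here are placeholders:

```bash
OPENAI_API_KEY=sk-...       # LLM responses
ELEVENLABS_API_KEY=...      # text-to-speech
CONNEXITY_API_KEY=...       # Connexity metrics upload
DEEPGRAM_API_KEY=...        # speech-to-text (Twilio bot)
TWILIO_ACCOUNT_ID=AC...     # Twilio bot and recording lookup
TWILIO_AUTH_TOKEN=...       # Twilio bot and recording lookup
DAILY_API_KEY=...           # Daily bot (room setup and recording download)
```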
diff --git a/examples/connexity-metrics/daily_bot.py b/examples/connexity-metrics/daily_bot.py
new file mode 100644
index 000000000..31cd4b2bf
--- /dev/null
+++ b/examples/connexity-metrics/daily_bot.py
@@ -0,0 +1,174 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import EndFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIProcessor
+from pipecat.services.connexity import ConnexityDailyMetricsService
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main():
+    """Main bot execution function.
+
+    Sets up and runs the bot pipeline including:
+    - Daily transport with transcription
+    - Text-to-speech service
+    - Language model integration
+    - Connexity metrics collection
+    - RTVI event handling
+    """
+    async with aiohttp.ClientSession() as session:
+        (room_url, token) = await configure(session)
+
+        # Set up Daily transport with video/audio parameters
+        transport = DailyTransport(
+            room_url,
+            token,
+            "Chatbot",
+            DailyParams(
+                audio_out_enabled=True,
+                camera_out_enabled=False,
+                vad_enabled=True,
+                vad_audio_passthrough=True,
+                vad_analyzer=SileroVADAnalyzer(),
+                transcription_enabled=True,
+                #
+                # Spanish
+                #
+                # transcription_settings=DailyTranscriptionSettings(
+                #     language="es",
+                #     tier="nova",
+                #     model="2-general"
+                # )
+            ),
+        )
+
+        # Initialize text-to-speech service
+        tts = ElevenLabsTTSService(
+            api_key=os.getenv("ELEVENLABS_API_KEY"),
+            #
+            # English
+            #
+            voice_id="pNInz6obpgDQGcFmaJgB",
+            #
+            # Spanish
+            #
+            # model="eleven_multilingual_v2",
+            # voice_id="gD1IexrzCvsXPHUuT0s3",
+        )
+
+        # Initialize LLM service
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                #
+                # English
+                #
+                "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
+                #
+                # Spanish
+                #
+                # "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertirán a audio así que nunca debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
+            },
+        ]
+
+        # Set up conversation context and management
+        # The context_aggregator will automatically collect conversation context
+        context = OpenAILLMContext(messages)
+        context_aggregator = llm.create_context_aggregator(context)
+
+        #
+        # RTVI events for Pipecat client UI
+        #
+        rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
+
+        """
+        ConnexityDailyMetricsService fetches the Daily cloud recording of the call. On
+        call completion, it sends the recording's download link to the Connexity gateway
+        for analysis.
+        """
+        connexity = ConnexityDailyMetricsService(call_id="= MAX_BOTS_PER_ROOM:
+        raise HTTPException(status_code=500, detail=f"Max bot limit reached for room: {room.url}")
+
+    # Get the token for the room
+    token = await daily_helpers["rest"].get_token(room.url)
+
+    if not token:
+        raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
+
+    # Spawn a new agent, and join the user session
+    # Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
+    try:
+        proc = subprocess.Popen(
+            [f"python3 -m daily_bot -u {room.url} -t {token}"],
+            shell=True,
+            bufsize=1,
+            cwd=os.path.dirname(os.path.abspath(__file__)),
+        )
+        bot_procs[proc.pid] = (proc, room.url)
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
+
+    return RedirectResponse(room.url)
+
+
+@app.get("/status/{pid}")
+def get_status(pid: int):
+    # Look up the subprocess
+    proc = bot_procs.get(pid)
+
+    # If the subprocess doesn't exist, return an error
+    if not proc:
+        raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
+
+    # Check the status of the subprocess
+    if proc[0].poll() is None:
+        status = "running"
+    else:
+        status = "finished"
+
+    return JSONResponse({"bot_id": pid, "status": status})
+
+
+@app.post("/twilio")
+async def start_call():
+    print("POST TwiML")
+    return HTMLResponse(content=open("templates/streams.xml.template").read(), media_type="application/xml")
+
+
+@app.websocket("/ws")
+async def websocket_endpoint(websocket: WebSocket):
+    await websocket.accept()
+    start_data = websocket.iter_text()
+    await start_data.__anext__()
+    call_data = json.loads(await start_data.__anext__())
+    print(call_data, flush=True)
+    stream_sid = call_data["start"]["streamSid"]
+    call_sid = call_data["start"]["callSid"]
+    start_call_recording(call_sid)
+    print("WebSocket connection accepted")
+    await run_bot(websocket, stream_sid, call_sid)
+
+
+if __name__ == "__main__":
+    import uvicorn
+
+    default_host = os.getenv("HOST", "0.0.0.0")
+    default_port = int(os.getenv("FAST_API_PORT", "8765"))
+
+    parser = argparse.ArgumentParser(description="Connexity metrics example FastAPI server")
+    parser.add_argument("--host", type=str, default=default_host, help="Host address")
+    parser.add_argument("--port", type=int, default=default_port, help="Port number")
+    parser.add_argument("--reload", action="store_true", help="Reload code on change")
+
+    config = parser.parse_args()
+
+    uvicorn.run(
+        "server:app",
+        host=config.host,
+        port=config.port,
+        reload=config.reload,
+    )
diff --git a/examples/connexity-metrics/templates/streams.xml.template b/examples/connexity-metrics/templates/streams.xml.template
new file mode 100644
index 000000000..e83826324
--- /dev/null
+++ b/examples/connexity-metrics/templates/streams.xml.template
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Response>
+  <Connect>
+    <Stream url="wss://<your server url>/ws"></Stream>
+  </Connect>
+  <Pause length="40"/>
+</Response>
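The `/ws` endpoint in `server.py` above skips Twilio's initial `connected` event and parses the second message, the `start` event, to extract the stream and call SIDs. For reference, that message has roughly this shape (abridged from Twilio's public Media Streams documentation; values are illustrative placeholders):

```python
# Abridged shape of the Twilio Media Streams "start" message parsed by /ws.
twilio_start_message = {
    "event": "start",
    "sequenceNumber": "1",
    "start": {
        "streamSid": "MZ...",  # used to serialize outgoing audio frames
        "callSid": "CA...",    # used to start recording and fetch it later
        "accountSid": "AC...",
        "mediaFormat": {"encoding": "audio/x-mulaw", "sampleRate": 8000, "channels": 1},
    },
    "streamSid": "MZ...",
}
```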
diff --git a/examples/connexity-metrics/twilio_bot.py b/examples/connexity-metrics/twilio_bot.py
new file mode 100644
index 000000000..cab75f643
--- /dev/null
+++ b/examples/connexity-metrics/twilio_bot.py
@@ -0,0 +1,112 @@
+#
+# Copyright (c) 2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import os
+import sys
+
+from dotenv import load_dotenv
+from fastapi import WebSocket
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.serializers.twilio import TwilioFrameSerializer
+from pipecat.services.connexity import ConnexityTwilioMetricsService
+from pipecat.services.deepgram import DeepgramSTTService
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.network.fastapi_websocket import (
+    FastAPIWebsocketParams,
+    FastAPIWebsocketTransport,
+)
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def run_bot(websocket_client: WebSocket, stream_sid: str, call_sid: str):
+    transport = FastAPIWebsocketTransport(
+        websocket=websocket_client,
+        params=FastAPIWebsocketParams(
+            audio_in_enabled=True,
+            audio_out_enabled=True,
+            add_wav_header=False,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer(),
+            vad_audio_passthrough=True,
+            serializer=TwilioFrameSerializer(stream_sid),
+        ),
+    )
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
+
+    tts = ElevenLabsTTSService(
+        api_key=os.getenv("ELEVENLABS_API_KEY"),
+        voice_id="pNInz6obpgDQGcFmaJgB",
+    )
+
+    connexity = ConnexityTwilioMetricsService(
+        sid=call_sid,  # must be the actual call SID from Twilio
+        assistant_id="",
+        api_key=os.environ.get("CONNEXITY_API_KEY"),
+        assistant_speaks_first=True,
+        twilio_account_id=os.environ.get("TWILIO_ACCOUNT_ID"),
+        twilio_auth_token=os.environ.get("TWILIO_AUTH_TOKEN"),
+    )
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short sentence.",
+        },
+    ]
+
+    context = OpenAILLMContext(messages)
+    context_aggregator = llm.create_context_aggregator(context)
+
+    pipeline = Pipeline(
+        [
+            transport.input(),  # Websocket input from client
+            stt,  # Speech-To-Text
+            context_aggregator.user(),
+            llm,  # LLM
+            tts,  # Text-To-Speech
+            transport.output(),  # Websocket output to client
+            connexity,  # Connexity metrics collection
+            context_aggregator.assistant(),
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        params=PipelineParams(allow_interruptions=True),
+    )
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        # Kick off the conversation.
+        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+        await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        await task.cancel()
+    # We use `handle_sigint=False` because `uvicorn` is controlling keyboard
+    # interruptions. We use `force_gc=True` to force garbage collection after
+    # the runner finishes running a task which could be useful for long running
+    # applications with multiple clients connecting.
+    runner = PipelineRunner(handle_sigint=False, force_gc=True)
+
+    await runner.run(task)
diff --git a/examples/connexity-metrics/utils/start_call_recording.py b/examples/connexity-metrics/utils/start_call_recording.py
new file mode 100644
index 000000000..d2865e9e7
--- /dev/null
+++ b/examples/connexity-metrics/utils/start_call_recording.py
@@ -0,0 +1,17 @@
+import os
+
+from twilio.rest import Client
+
+account_sid = os.environ.get("TWILIO_ACCOUNT_ID")
+auth_token = os.environ.get("TWILIO_AUTH_TOKEN")
+client = Client(account_sid, auth_token)
+
+
+def start_call_recording(call_sid):
+    try:
+        recording = client.calls(call_sid).recordings.create()
+        print(f"Recording started with SID: {recording.sid}")
+        return recording.sid
+    except Exception as e:
+        print(f"Error starting recording: {e}")
+        return None
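For clarity, `start_call_recording` returns the new recording SID on success and `None` on failure, so callers can treat recording as best-effort. A minimal usage sketch; the import path assumes the example's `utils/` directory is importable, and `call_sid` comes from Twilio's `start` event as in `server.py` above:

```python
from utils.start_call_recording import start_call_recording

# Best-effort: the bot keeps running even if recording fails, but without a
# recording ConnexityTwilioMetricsService will likely find nothing to upload.
recording_sid = start_call_recording(call_sid)
if recording_sid is None:
    print("Recording could not be started; metrics upload will likely be skipped")
```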
diff --git a/src/pipecat/services/connexity.py b/src/pipecat/services/connexity.py
new file mode 100644
index 000000000..56819b2f6
--- /dev/null
+++ b/src/pipecat/services/connexity.py
@@ -0,0 +1,341 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import io
+import wave
+from asyncio import sleep
+
+import aiohttp
+from loguru import logger
+from twilio.rest import Client
+
+from pipecat.frames.frames import CancelFrame, EndFrame, Frame
+from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
+from pipecat.processors.frame_processor import FrameDirection
+from pipecat.services.ai_services import AIService
+
+
+class ConnexityInterface(AIService):
+    def __init__(
+        self,
+        call_id: str,
+        assistant_id: str,
+        api_key: str,
+        api_url: str,
+        assistant_speaks_first: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        # Subclasses that buffer audio locally set a real AudioBufferProcessor here.
+        self._audio_buffer_processor = None
+        self._audio_memory_buffer = io.BytesIO()
+        self._api_key = api_key
+        self._api_url = api_url
+        self._call_id = call_id
+        self._assistant_id = assistant_id
+        self._assistant_speaks_first = assistant_speaks_first
+
+    def _request_headers(self):
+        return {"Content-Type": "application/json", "X-API-KEY": self._api_key}
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        await super().process_frame(frame, direction)
+        await self.push_frame(frame, direction)
+
+    async def send_audio_url_to_connexity(self, audio_url):
+        payload = {
+            "items": [
+                {
+                    "agent_id": self._assistant_id,
+                    "sid": self._call_id,
+                    "first_speaker_role": "assistant" if self._assistant_speaks_first else "user",
+                    "audio_link": audio_url,
+                }
+            ]
+        }
+        logger.debug(f"Sending audio link to Connexity: {payload}")
+        async with aiohttp.ClientSession() as session:
+            async with session.post(
+                self._api_url, headers=self._request_headers(), json=payload
+            ) as response:
+                if response.status != 200:
+                    logger.error(f"Failed to send data: {response.status}")
+                else:
+                    logger.info(f"Data sent successfully: {response.status}")
+
+
+class ConnexityLocalMetricsService(ConnexityInterface):
+    """Metrics service that uploads locally buffered call audio to Connexity.
+
+    This class uses an AudioBufferProcessor to get the conversation audio and
+    uploads it to the Connexity Voice API for processing.
+
+    Args:
+        audio_buffer_processor (AudioBufferProcessor): Processor that buffers the conversation audio.
+        call_id (str): Your unique identifier for the call. This is used to match the call
+            in the Connexity Voice system to the call in your system.
+        assistant_id (str): Identifier for the AI assistant. This can be whatever you want;
+            it's intended for your convenience, so you can distinguish between different
+            assistants, and it acts as a grouping mechanism for calls.
+        api_key (str): Connexity API key.
+        api_url (str, optional): Connexity gateway endpoint for file uploads.
+        assistant_speaks_first (bool, optional): Indicates if the assistant speaks first
+            in the conversation. Defaults to True.
+    """
+
+    def __init__(
+        self,
+        *,
+        audio_buffer_processor: AudioBufferProcessor,
+        call_id: str,
+        assistant_id: str,
+        api_key: str,
+        api_url: str = "https://connexity-gateway-owzhcfagkq-uc.a.run.app/process/blackbox/file/pipecat",
+        assistant_speaks_first: bool = True,
+        **kwargs,
+    ):
+        super().__init__(
+            call_id=call_id,
+            assistant_id=assistant_id,
+            api_key=api_key,
+            api_url=api_url,
+            assistant_speaks_first=assistant_speaks_first,
+            **kwargs,
+        )
+        self._audio_buffer_processor = audio_buffer_processor
+
+    async def stop(self, frame: EndFrame):
+        await self._process_audio()
+        await super().stop(frame)
+
+    async def cancel(self, frame: CancelFrame):
+        await self._process_audio()
+        await super().cancel(frame)
+
+    def _request_headers(self):
+        # Multipart upload: let aiohttp set the Content-Type boundary.
+        return {"X-API-KEY": self._api_key}
+
+    async def _process_audio(self):
+        audio_buffer_processor = self._audio_buffer_processor
+
+        if not audio_buffer_processor.has_audio():
+            return
+
+        # Merge the raw PCM data from the buffer processor
+        new_audio = audio_buffer_processor.merge_audio_buffers()
+
+        try:
+            # Build a temporary WAV buffer from the new audio chunk
+            temp_buffer = io.BytesIO()
+            with wave.open(temp_buffer, "wb") as wf:
+                wf.setsampwidth(2)  # 16-bit
+                wf.setnchannels(audio_buffer_processor.num_channels)
+                wf.setframerate(audio_buffer_processor.sample_rate)
+                wf.writeframes(new_audio)
+            temp_buffer.seek(0)
+
+            # If this is the *first* chunk, just copy the entire WAV (header + frames)
+            if self._audio_memory_buffer.tell() == 0:
+                self._audio_memory_buffer.write(temp_buffer.getvalue())
+            else:
+                # Otherwise, read out existing frames, then rewrite the WAV
+                self._audio_memory_buffer.seek(0)
+                with wave.open(self._audio_memory_buffer, "rb") as existing_wf:
+                    params = existing_wf.getparams()
+                    existing_frames = existing_wf.readframes(existing_wf.getnframes())
+
+                # Truncate the main buffer, and rewrite with the old + new frames
+                self._audio_memory_buffer.seek(0)
+                self._audio_memory_buffer.truncate(0)
+
+                with wave.open(self._audio_memory_buffer, "wb") as new_wf:
+                    new_wf.setparams(params)
+                    new_wf.writeframes(existing_frames)
+                    # Skip the header of the new chunk; the wave module writes a
+                    # plain PCM header that is exactly 44 bytes.
+                    new_wf.writeframes(temp_buffer.getvalue()[44:])
+
+            # Reset so we don't double-process the same audio
+            audio_buffer_processor.reset_audio_buffers()
+            logger.info("Audio processed and appended to the in-memory buffer.")
+
+            await self.send_audio_file_to_connexity()
+
+        except Exception as e:
+            logger.error(f"Failed to process audio: {e}")
+
+    async def send_audio_file_to_connexity(self):
+        # Rewind the buffer so the full WAV is read when the request is sent.
+        self._audio_memory_buffer.seek(0)
+        data = aiohttp.FormData()
+        data.add_field(
+            "file",
+            self._audio_memory_buffer,
+            filename="audio.wav",
+            content_type="audio/wav",
+        )
+        data.add_field("sid", self._call_id)
+        data.add_field(
+            "first_speaker_role",
+            "assistant" if self._assistant_speaks_first else "user",
+        )
+        data.add_field("agent_id", self._assistant_id)
+        async with aiohttp.ClientSession() as session:
+            async with session.post(
+                self._api_url, headers=self._request_headers(), data=data
+            ) as response:
+                if response.status != 200:
+                    logger.error(f"Failed to send data: {response.status}")
+                else:
+                    logger.info(f"Data sent successfully: {response.status}")
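Neither bot in this example wires up `ConnexityLocalMetricsService`, so before the Twilio and Daily variants below, here is a minimal usage sketch. It borrows `transport`, `stt`, `llm`, `tts`, and `context_aggregator` from the bot files above, and the ids are hypothetical placeholders; treat it as an illustration of the intended wiring, not a tested example:

```python
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.connexity import ConnexityLocalMetricsService

# The buffer processor records the conversation audio as it flows through.
audio_buffer = AudioBufferProcessor()

connexity = ConnexityLocalMetricsService(
    audio_buffer_processor=audio_buffer,
    call_id="my-call-123",          # hypothetical id from your own system
    assistant_id="demo-assistant",  # hypothetical grouping label
    api_key=os.getenv("CONNEXITY_API_KEY"),
)

# Place the buffer after transport.output() so it hears both sides; the
# service uploads the buffered WAV when the pipeline stops or is cancelled.
pipeline = Pipeline(
    [
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        audio_buffer,
        connexity,
        context_aggregator.assistant(),
    ]
)
```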
+
+
+class ConnexityTwilioMetricsService(ConnexityInterface):
+    """Metrics service that sends the Twilio call recording link to Connexity.
+
+    On call completion, this class looks up the call recording in Twilio and
+    sends its download link to the Connexity Voice API for processing.
+
+    Args:
+        sid (str): The Twilio call SID. This is used to match the call in the
+            Connexity Voice system to the call in Twilio.
+        assistant_id (str): Identifier for the AI assistant. This can be whatever you want;
+            it's intended for your convenience, so you can distinguish between different
+            assistants, and it acts as a grouping mechanism for calls.
+        api_key (str): Connexity API key.
+        api_url (str, optional): Connexity gateway endpoint for recording links.
+        assistant_speaks_first (bool, optional): Indicates if the assistant speaks first
+            in the conversation. Defaults to True.
+        twilio_account_id (str): Twilio account SID used to fetch the recording.
+        twilio_auth_token (str): Twilio auth token used to fetch the recording.
+    """
+
+    def __init__(
+        self,
+        *,
+        sid: str,
+        assistant_id: str,
+        api_key: str,
+        api_url: str = "https://connexity-gateway-owzhcfagkq-uc.a.run.app/process/blackbox/links",
+        assistant_speaks_first: bool = True,
+        twilio_account_id: str,
+        twilio_auth_token: str,
+        **kwargs,
+    ):
+        super().__init__(
+            call_id=sid,
+            assistant_id=assistant_id,
+            api_key=api_key,
+            api_url=api_url,
+            assistant_speaks_first=assistant_speaks_first,
+            **kwargs,
+        )
+        self.twilio_account_id = twilio_account_id
+        self.twilio_auth_token = twilio_auth_token
+
+    async def cancel(self, frame: CancelFrame):
+        await self.send_audio_url_to_connexity(await self._get_twilio_recording())
+        await super().cancel(frame)
+
+    async def _get_twilio_recording(self):
+        client = Client(self.twilio_account_id, self.twilio_auth_token)
+        attempts = 0
+
+        # The recording may take a moment to appear after the call ends, so
+        # retry a few times before giving up.
+        while attempts < 3:
+            recordings = client.recordings.list(call_sid=self._call_id)
+            if recordings:
+                return (
+                    "https://api.twilio.com/2010-04-01/Accounts/"
+                    f"{self.twilio_account_id}/Recordings/{recordings[0].sid}.wav"
+                )
+            attempts += 1
+            await sleep(3)
+
+        return None
+
+
+class ConnexityDailyMetricsService(ConnexityInterface):
+    """Metrics service that sends the Daily call recording link to Connexity.
+
+    On call completion, this class fetches the cloud recording download link
+    from the Daily REST API and sends it to the Connexity Voice API for processing.
+
+    Args:
+        call_id (str): Your unique identifier for the call. This is used to match the call
+            in the Connexity Voice system to the call in your system.
+        assistant_id (str): Identifier for the AI assistant. This can be whatever you want;
+            it's intended for your convenience, so you can distinguish between different
+            assistants, and it acts as a grouping mechanism for calls.
+        api_key (str): Connexity API key.
+        api_url (str, optional): Connexity gateway endpoint for recording links.
+        assistant_speaks_first (bool, optional): Indicates if the assistant speaks first
+            in the conversation. Defaults to True.
+        daily_api_key (str): Daily API key used to fetch the room recording.
+        room_url (str): URL of the Daily room whose recording should be fetched.
+    """
+
+    def __init__(
+        self,
+        *,
+        call_id: str,
+        assistant_id: str,
+        api_key: str,
+        api_url: str = "https://connexity-gateway-owzhcfagkq-uc.a.run.app/process/blackbox/links",
+        assistant_speaks_first: bool = True,
+        daily_api_key: str,
+        room_url: str,
+        **kwargs,
+    ):
+        super().__init__(
+            call_id=call_id,
+            assistant_id=assistant_id,
+            api_key=api_key,
+            api_url=api_url,
+            assistant_speaks_first=assistant_speaks_first,
+            **kwargs,
+        )
+        self.daily_api_key = daily_api_key
+        self._room_url = room_url
+
+    # async def stop(self, frame: EndFrame):
+    #     await self.send_audio_url_to_connexity(await self._get_daily_recording(self._room_url))
+    #     await super().stop(frame)
+
+    async def cancel(self, frame: CancelFrame):
+        logger.debug("Cancel frame received")
+        await self.send_audio_url_to_connexity(await self._get_daily_recording(self._room_url))
+        await super().cancel(frame)
+
+    async def _get_daily_recording(self, room_url):
+        import requests
+
+        call_info_url = "https://api.daily.co/v1/recordings?room_name={room_name}"
+        download_link_url = "https://api.daily.co/v1/recordings/{call_id}/access-link"
+        attempts = 0
+
+        # The recording may take a moment to become available, so retry a few times.
+        while attempts < 3:
+            try:
+                room_name = room_url.split("/")[-1]
+                headers = {
+                    "Content-Type": "application/json",
+                    "Authorization": f"Bearer {self.daily_api_key}",
+                }
+                response = requests.get(call_info_url.format(room_name=room_name), headers=headers)
+
+                call_id = response.json()["data"][0]["id"]
+                response = requests.get(download_link_url.format(call_id=call_id), headers=headers)
+
+                return response.json()["download_link"]
+            except Exception:
+                attempts += 1
+                await sleep(3)
+
+        return None
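Since the `daily_bot.py` hunk above is truncated mid-constructor, here is a sketch of a complete `ConnexityDailyMetricsService` instantiation matching the signature defined in this file. The `call_id` and `assistant_id` values are hypothetical placeholders, and the `DAILY_API_KEY` environment variable name is an assumption:

```python
import os

from pipecat.services.connexity import ConnexityDailyMetricsService

room_url = "https://yourdomain.daily.co/room"  # hypothetical; daily_bot gets this from configure()

connexity = ConnexityDailyMetricsService(
    call_id="my-daily-call-123",    # hypothetical id from your own system
    assistant_id="demo-assistant",  # hypothetical grouping label
    api_key=os.getenv("CONNEXITY_API_KEY"),
    assistant_speaks_first=True,
    daily_api_key=os.getenv("DAILY_API_KEY"),  # used to fetch the room recording
    room_url=room_url,  # the room joined by DailyTransport
)
```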