# Conversation Recorder Extension

Records conversation audio (user and agent) to a file and optionally uploads it to cloud storage.

## Features

- **Audio Mixing**: Mixes audio from multiple sources (user input + agent output) into a single recording
- **Multiple Storage Backends**: Local filesystem, Google Cloud Storage (GCS), or S3-compatible storage
- **Custom Filenames**: Optionally specify a custom filename or auto-generate with timestamp
- **Automatic Lifecycle**: Starts recording on user join, stops and saves on user leave

## Installation

The extension is included in the TEN Agent `ten_packages` directory. Ensure the following dependencies are installed:

```
numpy
scipy
google-cloud-storage # Only needed for GCS storage
boto3 # Only needed for S3 storage
```

## Configuration

### Basic Properties

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `storage_type` | string | `"local"` | Storage backend: `"local"`, `"gcs"`, or `"s3"` |
| `file_path` | string | `"records/"` | Local directory path (for local storage) |
| `filename` | string | auto-generated | Custom filename (works with all storage modes) |
| `start_trigger` | string | `"on_user_joined"` | When to start recording: `"on_user_joined"` or `"on_start"` |
| `sample_rate` | int | `24000` | Audio sample rate in Hz |

### GCS Storage Properties

| Property | Type | Description |
|----------|------|-------------|
| `gcp_bucket_name` | string | GCS bucket name (required for GCS) |
| `gcp_project_id` | string | GCP project ID (optional if using ADC) |
| `gcp_credentials_path` | string | Path to service account JSON file |
| `gcp_upload_prefix` | string | Folder prefix in bucket (e.g., `"recordings/"`) |

### S3 Storage Properties

| Property | Type | Description |
|----------|------|-------------|
| `s3_bucket_name` | string | S3 bucket name (required for S3) |
| `s3_access_key_id` | string | AWS access key ID (optional if using IAM roles) |
| `s3_secret_access_key` | string | AWS secret access key |
| `s3_endpoint_url` | string | Custom endpoint URL (for MinIO, DigitalOcean Spaces, etc.) |
| `s3_region` | string | AWS region (e.g., `"us-east-1"`) |
| `s3_upload_prefix` | string | Folder prefix in bucket |
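
To make the interplay of `s3_bucket_name` and `s3_upload_prefix` concrete, here is an illustrative sketch (not the extension's actual code) of how those properties combine into the final object URL. The helper name and the exact slash-joining rules are assumptions:

```python
def build_s3_url(bucket: str, prefix: str, filename: str) -> str:
    # Ensure a non-empty prefix ends with exactly one slash before joining.
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return f"s3://{bucket}/{prefix}{filename}"

print(build_s3_url("my-recordings-bucket", "conversations/", "call_001.wav"))
# → s3://my-recordings-bucket/conversations/call_001.wav
```

The same shape applies to GCS with a `gs://` scheme and `gcp_upload_prefix`.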

## Usage Examples

### Local Storage

```json
{
  "type": "extension",
  "name": "conversation_recorder",
  "addon": "conversation_recorder",
  "extension_group": "default",
  "property": {
    "storage_type": "local",
    "file_path": "records",
    "start_trigger": "on_user_joined"
  }
}
```

### Google Cloud Storage

```json
{
  "type": "extension",
  "name": "conversation_recorder",
  "addon": "conversation_recorder",
  "extension_group": "default",
  "property": {
    "storage_type": "gcs",
    "gcp_bucket_name": "my-recordings-bucket",
    "gcp_upload_prefix": "conversations/",
    "gcp_credentials_path": "/path/to/service-account.json",
    "start_trigger": "on_user_joined"
  }
}
```

### S3-Compatible Storage (AWS S3)

```json
{
  "type": "extension",
  "name": "conversation_recorder",
  "addon": "conversation_recorder",
  "extension_group": "default",
  "property": {
    "storage_type": "s3",
    "s3_bucket_name": "my-recordings-bucket",
    "s3_region": "us-east-1",
    "s3_upload_prefix": "conversations/",
    "start_trigger": "on_user_joined"
  }
}
```

### S3-Compatible Storage (MinIO)

```json
{
  "type": "extension",
  "name": "conversation_recorder",
  "addon": "conversation_recorder",
  "extension_group": "default",
  "property": {
    "storage_type": "s3",
    "s3_bucket_name": "my-recordings-bucket",
    "s3_endpoint_url": "https://minio.example.com:9000",
    "s3_access_key_id": "my-access-key",
    "s3_secret_access_key": "my-secret-key",
    "s3_upload_prefix": "conversations/"
  }
}
```

## Graph Integration

To use this extension in your TEN Agent graph, you need to:

1. **Add the node** to your graph's `nodes` array
2. **Connect audio sources** to the recorder (user audio from `streamid_adapter` and agent audio from `v2v`)
3. **Connect user events** to trigger recording start/stop

### Example Graph Configuration

```json
{
  "name": "my_graph",
  "auto_start": true,
  "graph": {
    "nodes": [
      {
        "type": "extension",
        "name": "agora_rtc",
        "addon": "agora_rtc",
        "property": { ... }
      },
      {
        "type": "extension",
        "name": "streamid_adapter",
        "addon": "streamid_adapter",
        "property": {}
      },
      {
        "type": "extension",
        "name": "v2v",
        "addon": "gemini_mllm_python",
        "property": { ... }
      },
      {
        "type": "extension",
        "name": "conversation_recorder",
        "addon": "conversation_recorder",
        "extension_group": "default",
        "property": {
          "storage_type": "local",
          "file_path": "records",
          "start_trigger": "on_user_joined"
        }
      }
    ],
    "connections": [
      {
        "extension": "streamid_adapter",
        "audio_frame": [
          {
            "name": "pcm_frame",
            "dest": [
              { "extension": "v2v" },
              { "extension": "conversation_recorder" }
            ]
          }
        ]
      },
      {
        "extension": "v2v",
        "audio_frame": [
          {
            "name": "pcm_frame",
            "dest": [
              { "extension": "conversation_recorder" }
            ]
          }
        ]
      },
      {
        "extension": "conversation_recorder",
        "cmd": [
          {
            "names": ["on_user_joined", "on_user_left"],
            "source": [
              { "extension": "agora_rtc" }
            ]
          }
        ]
      }
    ]
  }
}
```

### Connection Diagram

```
        ┌─────────────────┐
        │    agora_rtc    │
        │  (user audio)   │
        └────────┬────────┘
                 │ pcm_frame
                 ▼
        ┌─────────────────┐
        │ streamid_adapter│
        └────────┬────────┘
                 │ pcm_frame
        ┌────────┴────────────────┐
        ▼                         ▼
   ┌─────────┐          ┌──────────────────┐
   │   v2v   │          │                  │
   │  (LLM)  │          │  conversation_   │
   └────┬────┘          │    recorder      │
        │ pcm_frame     │                  │
        └──────────────►│  (mixes both     │
                        │  audio sources)  │
                        └────────┬─────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │ Storage Backend  │
                        │  (local/gcs/s3)  │
                        └──────────────────┘
```

### Required Connections

1. **User audio**: Connect `pcm_frame` from `streamid_adapter` to `conversation_recorder`
2. **Agent audio**: Connect `pcm_frame` from `v2v` (LLM extension) to `conversation_recorder`
3. **User events**: Connect `on_user_joined` and `on_user_left` commands from `agora_rtc` to `conversation_recorder`

## Output

- **Local storage**: Saves a WAV file to the specified `file_path` directory
- **GCS**: Uploads to `gs://{bucket}/{prefix}/{filename}.wav`
- **S3**: Uploads to `s3://{bucket}/{prefix}/{filename}.wav`

If no custom `filename` is provided, files are named with a timestamp: `conversation_YYYYMMDD_HHMMSS.wav`
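
The default naming scheme can be reproduced with `strftime`; a minimal sketch (the helper name and the exact call used inside the extension are assumptions, but the pattern matches the documented default):

```python
from datetime import datetime

def default_filename(now: datetime) -> str:
    # Mirrors the documented pattern: conversation_YYYYMMDD_HHMMSS.wav
    return now.strftime("conversation_%Y%m%d_%H%M%S.wav")

print(default_filename(datetime(2024, 5, 1, 13, 30, 5)))
# → conversation_20240501_133005.wav
```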


## Author

Written by [Roei Bracha](https://github.com/Roei-Bracha)

---

`__init__.py`:

```python
from . import addon
```

---

`addon.py`:

```python
from ten_runtime import (
    Addon,
    register_addon_as_extension,
    TenEnv,
)

from .extension import ConversationRecorderExtension


@register_addon_as_extension("conversation_recorder")
class ConversationRecorderExtensionAddon(Addon):
    def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None:
        ten_env.log_info(
            "ConversationRecorderExtensionAddon on_create_instance"
        )
        ten_env.on_create_instance_done(
            ConversationRecorderExtension(name), context
        )
```

---

The audio mixer module:

```python
import threading
from collections import deque
from typing import Deque, Dict, Optional

import numpy as np


class AudioMixer:
    def __init__(self, sample_rate=24000, channels=1, chunk_duration_ms=40):
        self.sample_rate = sample_rate
        self.channels = channels
        # Chunk size in samples, e.g. 24000 Hz * 0.04 s = 960 samples.
        self.chunk_size = int(self.sample_rate * (chunk_duration_ms / 1000.0))

        # Buffers: stream_id -> Deque[float] of samples.
        # deque gives efficient pops from the left.
        self.buffers: Dict[str, Deque[float]] = {}
        self.lock = threading.Lock()

    def _resample(
        self, audio: np.ndarray, src_rate: int, dst_rate: int
    ) -> np.ndarray:
        """Resample audio from src_rate to dst_rate using linear interpolation."""
        if src_rate == dst_rate:
            return audio

        # Resampling ratio and resulting output length.
        ratio = dst_rate / src_rate
        output_length = int(len(audio) * ratio)
        if output_length == 0:
            return np.array([], dtype=np.float32)

        # Linear interpolation over a normalized time axis.
        x_old = np.linspace(0, 1, len(audio))
        x_new = np.linspace(0, 1, output_length)
        resampled = np.interp(x_new, x_old, audio)

        return resampled.astype(np.float32)

    def push_audio(
        self,
        source_id: str,
        pcm_data: bytes,
        source_sample_rate: Optional[int] = None,
    ):
        """
        Push raw PCM bytes (int16) into the source's buffer.
        If source_sample_rate differs from the mixer's sample_rate, the audio
        is resampled.

        Args:
            source_id: Unique identifier for the audio source.
            pcm_data: Raw PCM bytes in int16 format.
            source_sample_rate: Sample rate of the input audio (defaults to
                the mixer rate if not specified).
        """
        # Convert to float32 for mixing headroom.
        audio_array = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32)

        # Resample if needed.
        if (
            source_sample_rate is not None
            and source_sample_rate != self.sample_rate
        ):
            audio_array = self._resample(
                audio_array, source_sample_rate, self.sample_rate
            )

        with self.lock:
            if source_id not in self.buffers:
                self.buffers[source_id] = deque()

            # Extend the deque with the new samples.
            self.buffers[source_id].extend(audio_array)

    def mix_next_chunk(self) -> bytes:
        """
        Extract up to `chunk_size` samples from every buffer, mix them, and
        return the result as int16 bytes.
        A buffer with insufficient data contributes silence for the missing
        part. If ALL buffers are empty, returns empty bytes (no data to write).
        """
        with self.lock:
            # Check whether any buffer has data.
            has_data = any(len(buf) > 0 for buf in self.buffers.values())
            if not has_data:
                return b""

            # Accumulator for the mixed chunk.
            mixed_chunk = np.zeros(self.chunk_size, dtype=np.float32)

            # Mix each source.
            for buf in self.buffers.values():
                # Extract up to chunk_size samples.
                count = min(len(buf), self.chunk_size)
                if count > 0:
                    # Pop the samples into a list, then convert to an array.
                    # Popping item-by-item is acceptable for ~960 samples per
                    # chunk; a batched slice would be faster if this ever
                    # becomes a bottleneck.
                    samples = [buf.popleft() for _ in range(count)]
                    samples_arr = np.array(samples, dtype=np.float32)

                    # Add to the mix.
                    mixed_chunk[:count] += samples_arr

            # Clip to the int16 range and convert back.
            mixed_chunk = np.clip(mixed_chunk, -32768, 32767)
            return mixed_chunk.astype(np.int16).tobytes()
```
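
The core of `mix_next_chunk` is summing float32 samples and clipping the result back into the int16 range. A standalone sketch of that arithmetic, independent of the class above (the sample values are made up to force clipping in both directions):

```python
import numpy as np

# Two 4-sample int16 sources, deliberately loud enough to clip when summed.
user = np.array([10000, -20000, 30000, 0], dtype=np.int16)
agent = np.array([10000, -20000, 10000, 5000], dtype=np.int16)

# Mix in float32 for headroom, then clip to the int16 range.
mixed = user.astype(np.float32) + agent.astype(np.float32)
mixed = np.clip(mixed, -32768, 32767).astype(np.int16)

print(mixed.tolist())
# → [20000, -32768, 32767, 5000]
```

Summing in float32 first matters: adding the int16 arrays directly would wrap around on overflow instead of clipping.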