diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8dafc8f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,45 @@ +# Virtual environment and build artifacts +.venv/ +*.egg-info/ +dist/ +build/ +__pycache__/ +*.py[cod] + +# Test artifacts +.pytest_cache/ +.coverage +htmlcov/ +coverage.xml + +# Type / lint caches +.mypy_cache/ +.ruff_cache/ + +# Secrets and local config +.env +assets/ + +# Dev tooling +.pre-commit-config.yaml +.claude/ +.idea/ +.vscode/ +*.swp +*.swo +.DS_Store + +# Git +.git/ +.github/ + +# Docs and non-runtime files +docs/ +tests/ +CHANGELOG.md +CONTRIBUTING.md +CLAUDE.md +SECURITY.md +Makefile +README.md +uv.lock diff --git a/.gitignore b/.gitignore index abb7096..c64f816 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ assets/ +.claude/ # Python .venv/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 92ecb6e..c83f769 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.1.1] - 2026-03-06 + +### Added + +- `combined_report.json` now includes four derived metrics: `avg_time_per_file_seconds`, `avg_time_per_mb_seconds`, `processing_speed_ratio` (real-time factor), and `words_per_audio_hour` (transcription density) +- Slack notifications now include detailed per-stage stats (processed / skipped / failed counts) and average processing time per file +- `make test-slack` Makefile target for validating Slack webhook integration +- Dockerfile and `.dockerignore` for containerized deployment +- Sentiment output directory (`/sentiment/`) support in batch pipeline + +### Changed + +- Centralized Demucs scratch directory resolution in CLI — RAM disk detection and fallback confirmation now happen in one place +- Worker status reporting and failure aggregation in `pipeline-parallel` refactored for improved accuracy +- `python-dotenv` import in Slack notifier is now conditional — avoids import-time failure when the package is absent +- DEPLOYMENT.md expanded: HuggingFace token setup, NVIDIA driver requirements, cloud instance guidelines, and Docker usage +- Combined report fields documented in README under the Parallel Pipeline section + +### Fixed + +- Narrowed exception handling in `gpu_utils.py`, `transcriber.py`, and `notifier.py` to avoid masking unexpected errors +- Typo in `SeparationError` docstring + ## [0.1.0] - 2026-03-01 ### Added @@ -35,5 +58,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `transformers` capped at `<4.40.0` — versions 4.40+ use `torch.utils._pytree.register_pytree_node`, an API introduced in PyTorch 2.2, which breaks with the pinned PyTorch 2.1.2 - `make dev-setup` now reinstalls CUDA torch wheels (`torch==2.1.2+cu121`, `torchaudio==2.1.2+cu121`) as its final step — `uv sync` resolves torch from PyPI and installs the CPU-only build, silently breaking GPU inference -[Unreleased]: https://github.com/LunarCommand/audio-refinery/compare/v0.1.0...HEAD +[Unreleased]: https://github.com/LunarCommand/audio-refinery/compare/v0.1.1...HEAD +[0.1.1]: https://github.com/LunarCommand/audio-refinery/compare/v0.1.0...v0.1.1 [0.1.0]: https://github.com/LunarCommand/audio-refinery/releases/tag/v0.1.0 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1163830 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,39 @@ +FROM nvidia/cuda:12.1.1-cudnn8-devel-ubuntu22.04 + +# System dependencies +RUN apt-get update && apt-get install -y \ + python3.11 python3.11-dev python3-pip 
python3.11-venv \ + ffmpeg git curl \ + && rm -rf /var/lib/apt/lists/* + +# Non-root user +RUN useradd -m -u 1000 refinery +WORKDIR /app +USER refinery + +# Install uv +RUN pip install --user uv + +# Copy and install the package (resolves main deps; may pull CPU-only torch) +COPY --chown=refinery:refinery . . +RUN uv pip install -e . + +# Install WhisperX at the pinned commit — no-deps to avoid overwriting torch +# v3.1.1 tag has the old API without device_index; use the correct commit instead +RUN uv pip install --no-deps \ + "whisperx @ git+https://github.com/m-bain/whisperX.git@741ab9a2a8a1076c171e785363b23c55a91ceff1" + +# Install pinned WhisperX runtime deps +# transformers must stay <4.40.0 — 4.40+ uses torch.utils._pytree.register_pytree_node +# which was added in PyTorch 2.2 and breaks with the pinned 2.1.2 +RUN uv pip install \ + "av==16.1.0" "ctranslate2==4.7.1" "faster-whisper==1.2.1" \ + "flatbuffers==25.12.19" "nltk==3.9.2" "onnxruntime==1.24.1" \ + "transformers>=4.30.0,<4.40.0" + +# Reinstall PyTorch with CUDA 12.1 wheels last — uv pip install -e . above may have +# pulled CPU-only builds; this guarantees the CUDA wheel is what's actually used +RUN uv pip install torch==2.1.2+cu121 torchaudio==2.1.2+cu121 \ + --extra-index-url https://download.pytorch.org/whl/cu121 + +CMD ["audio-refinery", "--help"] diff --git a/Makefile b/Makefile index 0ec7ef6..89dbca1 100644 --- a/Makefile +++ b/Makefile @@ -79,6 +79,17 @@ dev-setup: install-dev install-whisperx install-torch-cuda pre-commit-install ## @echo " 2. Run 'make test' to verify everything works" @echo " 3. Run 'audio-refinery --help' to see available commands" +test-slack: ## Send a test Slack notification to verify SLACK_WEBHOOK_URL is configured + @uv run python -c "\ +from dotenv import load_dotenv; \ +load_dotenv(); \ +import os, sys, json, urllib.request; \ +url = os.getenv('SLACK_WEBHOOK_URL') or (print('SLACK_WEBHOOK_URL is not set — add it to .env or export it') or sys.exit(1)); \ +data = json.dumps({'text': ':white_check_mark: *Test notification* from \`audio-refinery\` — Slack integration is working.'}).encode(); \ +req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}); \ +urllib.request.urlopen(req, timeout=5); \ +print('Test notification sent — check your Slack channel')" + stats: ## Show project statistics @echo "Project Statistics:" @echo "===================" diff --git a/README.md b/README.md index 3b748df..96eef5c 100644 --- a/README.md +++ b/README.md @@ -607,6 +607,29 @@ Options: --help Show this message and exit. ``` +### Combined report fields + +`combined_report.json` is always written after all workers finish. 
It contains aggregate metrics across all workers:
+
+| Field | Type | Description |
+|---|---|---|
+| `run_at` | string | ISO 8601 timestamp of run start (UTC) |
+| `total_discovered` | int | Total WAV files found in `extracted/` |
+| `total_time_seconds` | float | Wall-clock seconds from first worker start to last finish |
+| `total_audio_hours` | float | Total audio duration processed across all workers |
+| `source_audio_bytes` | int | Combined size of all input WAV files |
+| `total_words` | int | Total words transcribed across all files |
+| `total_segments` | int | Total transcript segments across all files |
+| `avg_time_per_file_seconds` | float | `total_time / total_discovered` — average wall-clock cost per file |
+| `avg_time_per_mb_seconds` | float | `total_time / source_MB` — processing seconds per MB of source audio |
+| `processing_speed_ratio` | float | `audio_seconds / wall_seconds` — real-time factor (e.g. `3.7` means the pipeline processed audio 3.7× faster than real time) |
+| `words_per_audio_hour` | float | Transcription density — useful for detecting sparse/silent audio or diarization misses |
+| `gpu_temp_celsius` | object | Per-device temperature summary: `peak_celsius`, `avg_celsius`, `sample_count` |
+| `workers` | array | Per-worker label, device, exit code, and individual summary |
+| `combined_failures` | array | Aggregated failure records from all workers |
+
+`null` is written for derived metrics when the divisor is zero (e.g. `avg_time_per_file_seconds` is `null` if no files were discovered).

### Power limit / sudoers

`--power-limit` invokes `sudo nvidia-smi -pl <watts>`. To allow this without a password prompt:
diff --git a/docs/DEPLOYMENT.md b/docs/DEPLOYMENT.md
index 45508b5..9d0437c 100644
--- a/docs/DEPLOYMENT.md
+++ b/docs/DEPLOYMENT.md
@@ -191,7 +191,62 @@ with engine.connect() as conn:
## Docker Containerization

Containerizing audio-refinery ensures that the PyTorch 2.1.2 + CUDA 12.1 dependency stack is
-portable and reproducible across machines, including cloud GPU instances.
+portable and reproducible across machines. The same image runs on a local GPU workstation and
+on a cloud GPU instance — the difference is how the image is built and delivered, and what
+scratch storage is available.
+
+### Prerequisites
+
+#### HuggingFace Token
+
+Pyannote speaker diarization uses gated models that require HuggingFace authentication:
+
+1. Create a free account at [huggingface.co](https://huggingface.co)
+2. Accept the model terms for [pyannote/speaker-diarization-3.1](https://huggingface.co/pyannote/speaker-diarization-3.1) and [pyannote/segmentation-3.0](https://huggingface.co/pyannote/segmentation-3.0)
+3. Generate a read token at [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)
+
+Pass the token via `-e HF_TOKEN` or the compose `environment:` block. Without it, the diarization
+stage will fail with a 401 authentication error the first time it tries to download the model.
+
+#### NVIDIA Driver Version
+
+The `nvidia/cuda:12.1.1` base image requires **NVIDIA driver ≥ 525.85.12** on the host.
+Check before pulling the image:
+
+```bash
+nvidia-smi --query-gpu=driver_version --format=csv,noheader
+```
+
+If the driver is older than 525, update it before proceeding.
+
+#### NVIDIA Container Toolkit
+
+Docker cannot access the GPU without the NVIDIA Container Toolkit installed and configured on
+the host. 
This applies to both local workstations and cloud instances — most cloud GPU images +include NVIDIA drivers but not the container toolkit. + +```bash +# Install the toolkit (Ubuntu / Debian) +curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey \ + | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg +curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list \ + | sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' \ + | sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list +sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit + +# Configure Docker to use the NVIDIA runtime +sudo nvidia-ctk runtime configure --runtime=docker +sudo systemctl restart docker +``` + +Verify GPU access inside a container before building: + +```bash +docker run --rm --gpus all nvidia/cuda:12.1.1-base-ubuntu22.04 nvidia-smi +``` + +This should print the same `nvidia-smi` output as the host. If it fails, the toolkit is not +installed correctly — audio-refinery will not be able to use the GPU inside the container. ### Dockerfile @@ -212,25 +267,50 @@ USER refinery # Install uv RUN pip install --user uv -# Install PyTorch first (CUDA 12.1 wheel) -RUN uv pip install torch==2.1.2 torchaudio==2.1.2 \ - --extra-index-url https://download.pytorch.org/whl/cu121 - -# Install WhisperX and runtime deps (must be separate due to ctranslate2 constraints) -RUN uv pip install setuptools && \ - uv pip install --no-deps --no-build-isolation \ - "whisperx @ git+https://github.com/m-bain/whisperX.git@v3.1.1" && \ - uv pip install "ctranslate2>=4.0" "faster-whisper>=1.0.0" \ - "transformers>=4.35.0,<4.42.0" nltk - -# Copy and install the package +# Copy and install the package (resolves main deps; may pull CPU-only torch) COPY --chown=refinery:refinery . . RUN uv pip install -e . +# Install WhisperX at the pinned commit — no-deps to avoid overwriting torch +# v3.1.1 tag has the old API without device_index; use the correct commit instead +RUN uv pip install --no-deps \ + "whisperx @ git+https://github.com/m-bain/whisperX.git@741ab9a2a8a1076c171e785363b23c55a91ceff1" + +# Install pinned WhisperX runtime deps +# transformers must stay <4.40.0 — 4.40+ uses torch.utils._pytree.register_pytree_node +# which was added in PyTorch 2.2 and breaks with the pinned 2.1.2 +RUN uv pip install \ + "av==16.1.0" "ctranslate2==4.7.1" "faster-whisper==1.2.1" \ + "flatbuffers==25.12.19" "nltk==3.9.2" "onnxruntime==1.24.1" \ + "transformers>=4.30.0,<4.40.0" + +# Reinstall PyTorch with CUDA 12.1 wheels last — uv pip install -e . above may have +# pulled CPU-only builds; this guarantees the CUDA wheel is what's actually used +RUN uv pip install torch==2.1.2+cu121 torchaudio==2.1.2+cu121 \ + --extra-index-url https://download.pytorch.org/whl/cu121 + CMD ["audio-refinery", "--help"] ``` -### docker-compose.yml +### Building the Image + +```bash +docker build -t audio-refinery:latest . +``` + +The build clones WhisperX from GitHub and downloads PyTorch CUDA wheels, so it requires internet +access and takes 10–20 minutes on first run. Subsequent builds are faster due to layer caching, +provided `pyproject.toml` has not changed. + +--- + +### Running Locally (Workstation) + +For a local GPU workstation, build the image once and run it via compose for sustained batch +runs, or `docker run` for one-off jobs. 
Audio data and the RAM disk are bind-mounted from the +host, so the container has no persistent state of its own. + +**Sustained batch — docker-compose.yml:** ```yaml services: @@ -244,8 +324,8 @@ services: device_ids: ['0'] # Pin to specific GPU by PCI ID capabilities: [gpu] volumes: - - /data/audio:/data # Persistent audio storage - - /mnt/fast_scratch:/mnt/fast_scratch # RAM disk (mount on host first) + - /data/audio:/data # Persistent audio storage + - /mnt/fast_scratch:/mnt/fast_scratch # RAM disk (mount on host first) environment: - HF_TOKEN=${HF_TOKEN} - SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL} @@ -255,21 +335,80 @@ services: --compute-type int8_float16 ``` -### Running in the cloud +**Ad-hoc run:** + +```bash +docker run --rm --gpus '"device=0"' \ + -v /data/audio:/data \ + -v /mnt/fast_scratch:/mnt/fast_scratch \ + -e HF_TOKEN="${HF_TOKEN}" \ + -e SLACK_WEBHOOK_URL="${SLACK_WEBHOOK_URL}" \ + audio-refinery:latest \ + audio-refinery pipeline --base-dir /data/batch --compute-type int8_float16 +``` + +`--gpus '"device=0"'` pins to a specific GPU by index. Use `--gpus all` to expose all GPUs. + +--- + +### Deploying to the Cloud + +Cloud deployment follows the same container pattern as local, with two differences: the image +is delivered via a registry rather than built in place, and NVMe instance storage substitutes +for the RAM disk. + +#### Instance Selection + +The recommended minimum is a **24 GB GPU** to hold all models resident simultaneously. On a +10–12 GB GPU, models are loaded and unloaded between stages, adding 10–30 seconds of overhead +per file. See [VRAM Footprint by Stage](#vram-footprint-by-stage) for a full breakdown. + +Common instance types that meet the 24 GB threshold: NVIDIA A10G (AWS g5), L4 (GCP g2), RTX +3090 / 4090 (bare metal providers). + +#### Registry Workflow + +Build and push from your local machine (or a CI runner), then pull on the cloud instance: + +```bash +# Build and push (local machine) +docker build -t your-registry/audio-refinery:latest . +docker push your-registry/audio-refinery:latest + +# Pull and run (cloud instance — after completing Prerequisites above) +docker pull your-registry/audio-refinery:latest +docker run --rm --gpus '"device=0"' \ + -v /data/audio:/data \ + -v /mnt/nvme:/mnt/fast_scratch \ + -e HF_TOKEN="${HF_TOKEN}" \ + -e SLACK_WEBHOOK_URL="${SLACK_WEBHOOK_URL}" \ + your-registry/audio-refinery:latest \ + audio-refinery pipeline --base-dir /data/batch --compute-type int8_float16 +``` + +Alternatively, build directly on the cloud instance if it has internet access and you want to +skip managing a registry: -On a cloud GPU instance without a RAM disk, substitute a high-bandwidth NVMe instance volume -for `/mnt/fast_scratch`. Cloud GPU instances typically provide NVMe-backed instance storage at -2–4 GB/s write throughput — adequate as a scratch substitute at the cost of some SSD wear. +```bash +git clone https://github.com/LunarCommand/audio-refinery.git +cd audio-refinery +docker build -t audio-refinery:latest . +``` + +#### Scratch Storage + +Cloud GPU instances do not have a RAM disk. 
Use a high-bandwidth NVMe instance volume as scratch: ```bash -# Use a local NVMe volume as scratch instead of RAM disk -audio-refinery pipeline \ - --base-dir /data/batch \ - --compute-type int8_float16 - # The pipeline will prompt for confirmation before writing to local storage - # if /mnt/fast_scratch is not mounted +# Mount instance NVMe storage (device path varies by provider) +sudo mkdir -p /mnt/nvme +sudo mount /dev/nvme1n1 /mnt/nvme ``` +Pass `/mnt/nvme` as the scratch bind mount (`-v /mnt/nvme:/mnt/fast_scratch`). Cloud NVMe +instance storage typically provides 2–4 GB/s write throughput — adequate as a scratch substitute +at the cost of some SSD wear. + --- ## Monitoring diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index 21e8f7d..66048d7 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -18,11 +18,11 @@ This guide covers setting up audio-refinery for development, testing, and contri The three GPU-resident models have the following approximate VRAM footprints: -| Model | Stage | Peak VRAM | -|---|---|:---:| -| Demucs `htdemucs` | Vocal separation | ~4 GB | -| Pyannote `speaker-diarization-3.1` | Diarization | ~1 GB | -| WhisperX `large-v3` | Transcription | ~10 GB | +| Model | Stage | Peak VRAM | +|------------------------------------|------------------|:---------:| +| Demucs `htdemucs` | Vocal separation | ~4 GB | +| Pyannote `speaker-diarization-3.1` | Diarization | ~1 GB | +| WhisperX `large-v3` | Transcription | ~10 GB | A **24 GB GPU** (RTX 3090, 3090 Ti, 4090, A5000, etc.) holds all three models simultaneously with room for a comfortable batch size (16–32). This is the recommended configuration for @@ -259,12 +259,12 @@ Hooks run: Each stage is a standalone module with a pure-function API: -| Module | Function | Output Model | -|--------|----------|--------------| -| `separator.py` | `separate()` | `SeparationResult` | -| `diarizer.py` | `diarize()` | `DiarizationResult` | -| `transcriber.py` | `transcribe()` | `TranscriptionResult` | -| `sentiment_analyzer.py` | `analyze_sentiment()` | `SentimentResult` | +| Module | Function | Output Model | +|-------------------------|-----------------------|-----------------------| +| `separator.py` | `separate()` | `SeparationResult` | +| `diarizer.py` | `diarize()` | `DiarizationResult` | +| `transcriber.py` | `transcribe()` | `TranscriptionResult` | +| `sentiment_analyzer.py` | `analyze_sentiment()` | `SentimentResult` | ### Data Flow @@ -371,15 +371,15 @@ Before creating a release: After release: -- [ ] GitHub Release page shows correct version and artifacts +- [ ] GitHub Release page shows the correct version and artifacts - [ ] Release notes are accurate ### Quick Reference ```bash # Complete release workflow -git checkout main && git pull -git checkout -b release/vX.Y.Z +git switch main && git pull +git switch -c release/vX.Y.Z # Update pyproject.toml version and CHANGELOG.md make all-checks git add pyproject.toml CHANGELOG.md diff --git a/pyproject.toml b/pyproject.toml index c482afc..48aceb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "audio-refinery" -version = "0.1.0" +version = "0.1.1" description = "GPU-accelerated audio processing pipeline: vocal separation, speaker diarization, transcription, and sentiment analysis." 
readme = "README.md" license = { text = "MIT" } diff --git a/src/cli.py b/src/cli.py index c30e192..d44f983 100644 --- a/src/cli.py +++ b/src/cli.py @@ -7,6 +7,8 @@ import sys import threading import time +from collections import Counter +from pathlib import Path # Force PCI bus order so CUDA device indices match nvidia-smi numbering. # Must be set before any CUDA context is created. @@ -145,12 +147,66 @@ def _fmt_time(seconds: float) -> str: return f"{mins}m {secs}s" +def _mkdir_demucs(demucs_path: Path, base_path: Path, demucs_on_ramdisk: bool) -> tuple[Path, bool]: + """Create demucs_path, prompting for local fallback on PermissionError. + + Returns (final_demucs_path, final_demucs_on_ramdisk). + """ + try: + demucs_path.mkdir(parents=True, exist_ok=True) + return demucs_path, demucs_on_ramdisk + except PermissionError: + console.print( + Panel( + "[bold yellow]/mnt/fast_scratch is not writable.[/bold yellow]\n\n" + "The RAM disk is mounted but the current user cannot write to it.\n" + "Remount with open permissions:\n\n" + " [dim]sudo mount -o remount,mode=1777 /mnt/fast_scratch[/dim]\n\n" + f" Fallback path: [bold]{base_path / 'demucs'}[/bold]", + title="[yellow bold]RAM Disk Not Writable[/yellow bold]", + border_style="yellow", + ) + ) + if not click.confirm("Continue using local storage for Demucs scratch?", default=False): + console.print("[dim]Aborted.[/dim]") + sys.exit(0) + demucs_path = base_path / "demucs" + demucs_path.mkdir(parents=True, exist_ok=True) + return demucs_path, False + + +def _resolve_demucs_scratch(base_path: Path) -> tuple[Path, bool]: + """Resolve the Demucs scratch directory, prompting if /mnt/fast_scratch is unavailable. + + Returns (demucs_path, demucs_on_ramdisk). Exits if the user declines local fallback. + """ + fast_scratch = Path("/mnt/fast_scratch") + if fast_scratch.is_mount(): + return fast_scratch / "demucs", True + console.print( + Panel( + "[bold yellow]/mnt/fast_scratch is not mounted.[/bold yellow]\n\n" + "The RAM disk is not available. 
Without it, Demucs scratch files will be\n" + "written to local storage, which is slower and increases SSD wear.\n\n" + f" Fallback path: [bold]{base_path / 'demucs'}[/bold]\n\n" + "To mount the RAM disk before running:\n" + " [dim]sudo mount -t tmpfs -o size=32G,mode=1777 tmpfs /mnt/fast_scratch[/dim]", + title="[yellow bold]RAM Disk Not Available[/yellow bold]", + border_style="yellow", + ) + ) + if not click.confirm("Continue using local storage for Demucs scratch?", default=False): + console.print("[dim]Aborted.[/dim]") + sys.exit(0) + return base_path / "demucs", False + + @click.group() def cli(): """Audio Refinery — Audio processing pipeline.""" -@cli.command() +@cli.command("separate") @click.argument("input_file", type=click.Path(exists=True, dir_okay=False, resolve_path=True)) @click.option( "-o", @@ -177,8 +233,6 @@ def cli(): ) def separate_cmd(input_file: str, output_dir: str, device: str, segment: int | None): """Run Demucs vocal separation on an audio file.""" - from pathlib import Path - input_path = Path(input_file) output_path = Path(output_dir) @@ -231,7 +285,7 @@ def separate_cmd(input_file: str, output_dir: str, device: str, segment: int | N ) -@cli.command() +@cli.command("diarize") @click.argument("input_file", type=click.Path(exists=True, dir_okay=False, resolve_path=True)) @click.option( "-d", @@ -275,8 +329,6 @@ def diarize_cmd( output_file: str | None, ): """Run Pyannote speaker diarization on an audio file.""" - from pathlib import Path - input_path = Path(input_file) speaker_hints = "" @@ -355,7 +407,7 @@ def diarize_cmd( ) -@cli.command() +@cli.command("transcribe") @click.argument("input_file", type=click.Path(exists=True, dir_okay=False, resolve_path=True)) @click.option( "-d", @@ -409,8 +461,6 @@ def transcribe_cmd( output_file: str | None, ): """Run WhisperX transcription on an audio file.""" - from pathlib import Path - input_path = Path(input_file) diarization_path = Path(diarization_file) if diarization_file else None @@ -538,8 +588,6 @@ def sentiment_cmd(transcription_file: str, model: str, device: str, output_file: TRANSCRIPTION_FILE is also updated in place with sentiment fields merged into each segment, giving a single enriched output for downstream use. """ - from pathlib import Path - tx_path = Path(transcription_file) console.print( @@ -824,8 +872,6 @@ def pipeline( /mnt/fast_scratch/demucs — RAM disk (used automatically if mounted) /demucs — disk fallback (requires confirmation) """ - from pathlib import Path - from rich.progress import BarColumn, MofNCompleteColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn from src.pipeline import discover_files @@ -860,49 +906,9 @@ def pipeline( demucs_path = Path(demucs_dir) demucs_on_ramdisk = demucs_path.is_mount() else: - fast_scratch = Path("/mnt/fast_scratch") - if fast_scratch.is_mount(): - demucs_path = fast_scratch / "demucs" - demucs_on_ramdisk = True - else: - console.print( - Panel( - "[bold yellow]/mnt/fast_scratch is not mounted.[/bold yellow]\n\n" - "The RAM disk is not available. 
Without it, Demucs scratch files will be\n" - "written to local storage, which is slower and increases SSD wear.\n\n" - f" Fallback path: [bold]{base_path / 'demucs'}[/bold]\n\n" - "To mount the RAM disk before running:\n" - " [dim]sudo mount -t tmpfs -o size=32G,mode=1777 tmpfs /mnt/fast_scratch[/dim]", - title="[yellow bold]RAM Disk Not Available[/yellow bold]", - border_style="yellow", - ) - ) - if not click.confirm("Continue using local storage for Demucs scratch?", default=False): - console.print("[dim]Aborted.[/dim]") - sys.exit(0) - demucs_path = base_path / "demucs" - demucs_on_ramdisk = False + demucs_path, demucs_on_ramdisk = _resolve_demucs_scratch(base_path) - try: - demucs_path.mkdir(parents=True, exist_ok=True) - except PermissionError: - console.print( - Panel( - "[bold yellow]/mnt/fast_scratch is not writable.[/bold yellow]\n\n" - "The RAM disk is mounted but the current user cannot write to it.\n" - "Remount with open permissions:\n\n" - " [dim]sudo mount -o remount,mode=1777 /mnt/fast_scratch[/dim]\n\n" - f" Fallback path: [bold]{base_path / 'demucs'}[/bold]", - title="[yellow bold]RAM Disk Not Writable[/yellow bold]", - border_style="yellow", - ) - ) - if not click.confirm("Continue using local storage for Demucs scratch?", default=False): - console.print("[dim]Aborted.[/dim]") - sys.exit(0) - demucs_path = base_path / "demucs" - demucs_on_ramdisk = False - demucs_path.mkdir(parents=True, exist_ok=True) + demucs_path, demucs_on_ramdisk = _mkdir_demucs(demucs_path, base_path, demucs_on_ramdisk) for path in [diar_path, tx_path, summary_dir]: path.mkdir(parents=True, exist_ok=True) if sentiment: @@ -1010,9 +1016,9 @@ def _on_progress(content_id: str, stage: str, i: int, n: int) -> None: if _progress_file is not None: _progress_file.write_text(json.dumps({"done": i, "total": n, "current": "", "stage": "done"})) return - colour = _stage_colours.get(stage, "cyan") + stage_colour = _stage_colours.get(stage, "cyan") label = _stage_labels.get(stage, stage) - # Update temperature reading at most once every 5 s. + # Update the temperature reading at most once every 5 s. now = time.monotonic() if device != "cpu" and now - _temp_state["ts"] >= 5.0: _t = query_gpu_temperature(_cuda_idx) @@ -1021,7 +1027,9 @@ def _on_progress(content_id: str, stage: str, i: int, n: int) -> None: if _t is not None: _temp_state["readings"].append(_t) temp_str = f" · {_fmt_temp(_temp_state['value'], temp_limit)}" if device != "cpu" else "" - progress.update(task, completed=i, description=f"[{colour}]{label}[/{colour}] · {content_id}{temp_str}") + progress.update( + task, completed=i, description=f"[{stage_colour}]{label}[/{stage_colour}] · {content_id}{temp_str}" + ) # Plain-text fallback when stdout is redirected to a log file (no TTY). 
if not sys.stdout.isatty(): click.echo(f"[{time.strftime('%H:%M:%S')}] {label} {i}/{n} · {content_id}") @@ -1249,6 +1257,35 @@ def _stage_stats(stage_result): completed=completed, failures=len(all_failures), elapsed_seconds=total_time, + stages={ + "separation": { + "processed": sep_result.n_succeeded, + "skipped": sep_result.n_skipped, + "failed": sep_result.n_failed, + }, + "diarization": { + "processed": diar_result.n_succeeded, + "skipped": diar_result.n_skipped, + "failed": diar_result.n_failed, + }, + "transcription": { + "processed": tx_result.n_succeeded, + "skipped": tx_result.n_skipped, + "failed": tx_result.n_failed, + }, + **( + { + "sentiment": { + "processed": sent_result.n_succeeded, + "skipped": sent_result.n_skipped, + "failed": sent_result.n_failed, + } + } + if sentiment + else {} + ), + }, + avg_per_file_seconds=pipeline_avg, ) if all_failures: @@ -1393,7 +1430,6 @@ def pipeline_parallel( /summary/ — per-worker summaries + combined_report.json """ from datetime import UTC, datetime - from pathlib import Path from src.pipeline import discover_files, partition_ids @@ -1423,53 +1459,13 @@ def pipeline_parallel( _warn_if_gpu_busy(list(devices)) # ── Resolve Demucs scratch (interactive if needed; performed once here) ── - fast_scratch = Path("/mnt/fast_scratch") - if fast_scratch.is_mount(): - demucs_path = fast_scratch / "demucs" - demucs_on_ramdisk = True - else: - console.print( - Panel( - "[bold yellow]/mnt/fast_scratch is not mounted.[/bold yellow]\n\n" - "The RAM disk is not available. Without it, Demucs scratch files will be\n" - "written to local storage, which is slower and increases SSD wear.\n\n" - f" Fallback path: [bold]{base_path / 'demucs'}[/bold]\n\n" - "To mount the RAM disk before running:\n" - " [dim]sudo mount -t tmpfs -o size=32G,mode=1777 tmpfs /mnt/fast_scratch[/dim]", - title="[yellow bold]RAM Disk Not Available[/yellow bold]", - border_style="yellow", - ) - ) - if not click.confirm("Continue using local storage for Demucs scratch?", default=False): - console.print("[dim]Aborted.[/dim]") - sys.exit(0) - demucs_path = base_path / "demucs" - demucs_on_ramdisk = False + demucs_path, demucs_on_ramdisk = _resolve_demucs_scratch(base_path) # ── Create working directories ────────────────────────────────────────── manifests_dir = base_path / "manifests" logs_dir = base_path / "logs" summary_dir = base_path / "summary" - try: - demucs_path.mkdir(parents=True, exist_ok=True) - except PermissionError: - console.print( - Panel( - "[bold yellow]/mnt/fast_scratch is not writable.[/bold yellow]\n\n" - "The RAM disk is mounted but the current user cannot write to it.\n" - "Remount with open permissions:\n\n" - " [dim]sudo mount -o remount,mode=1777 /mnt/fast_scratch[/dim]\n\n" - f" Fallback path: [bold]{base_path / 'demucs'}[/bold]", - title="[yellow bold]RAM Disk Not Writable[/yellow bold]", - border_style="yellow", - ) - ) - if not click.confirm("Continue using local storage for Demucs scratch?", default=False): - console.print("[dim]Aborted.[/dim]") - sys.exit(0) - demucs_path = base_path / "demucs" - demucs_on_ramdisk = False - demucs_path.mkdir(parents=True, exist_ok=True) + demucs_path, demucs_on_ramdisk = _mkdir_demucs(demucs_path, base_path, demucs_on_ramdisk) for path in [manifests_dir, logs_dir, summary_dir]: path.mkdir(parents=True, exist_ok=True) @@ -1534,14 +1530,16 @@ def pipeline_parallel( # ── Build worker commands ─────────────────────────────────────────────── refinery_cmd = sys.argv[0] - def _build_worker_cmd(device: str, manifest_path: Path, 
summary_path: Path, progress_path: Path) -> list[str]: + def _build_worker_cmd( + worker_device: str, manifest_path: Path, summary_path: Path, progress_path: Path + ) -> list[str]: cmd = [ refinery_cmd, "pipeline", "--base-dir", str(base_path), "--device", - device, + worker_device, "--demucs-dir", str(demucs_path), "--manifest", @@ -1579,9 +1577,9 @@ def _build_worker_cmd(device: str, manifest_path: Path, summary_path: Path, prog scratch_suffix = "(RAM disk)" if demucs_on_ramdisk else "(disk)" tflops_table = load_tflops_table() - def _gpu_stat_line(device: str) -> str: - idx = int(device.split(":")[1]) if ":" in device else 0 - info = query_gpu_info(idx) + def _gpu_stat_line(gpu_device: str) -> str: + gpu_idx = int(gpu_device.split(":")[1]) if ":" in gpu_device else 0 + info = query_gpu_info(gpu_idx) if info is None: return "[dim]GPU info unavailable[/dim]" vram_gb = round(info.vram_mib / 1024) @@ -1621,10 +1619,10 @@ def _gpu_stat_line(device: str) -> str: gpu_temps: dict[str, int | None] = {w["device"]: None for w in workers} gpu_temp_readings: dict[str, list[int]] = {w["device"]: [] for w in workers} - def _read_progress(path: Path) -> dict: + def _read_progress(fpath: Path) -> dict: try: - return json.loads(path.read_text()) - except Exception: + return json.loads(fpath.read_text()) + except (OSError, json.JSONDecodeError): return {"done": 0, "total": "?", "current": "—", "stage": "starting"} def _worker_status_table() -> Table: @@ -1642,18 +1640,18 @@ def _worker_status_table() -> Table: tbl.add_column("Stage", width=10) tbl.add_column("File") tbl.add_column("Progress", justify="right", width=10) - for w in workers: - p = _read_progress(w["progress"]) + for wdict in workers: + p = _read_progress(wdict["progress"]) done, total_w = p.get("done", 0), p.get("total", "?") stage_val = p.get("stage", "—") if stage_val == "done": - n_failures = p.get("failures", None) - stage_display = "[yellow]Done[/yellow]" if n_failures else "[green]Done[/green]" + worker_failures = p.get("failures", None) + stage_display = "[yellow]Done[/yellow]" if worker_failures else "[green]Done[/green]" file_display = "" else: stage_display, file_display = stage_val, p.get("current", "—") - temp_display = _fmt_temp(gpu_temps.get(w["device"]), temp_limit) - tbl.add_row(w["label"], w["device"], temp_display, stage_display, file_display, f"{done}/{total_w}") + temp_display = _fmt_temp(gpu_temps.get(wdict["device"]), temp_limit) + tbl.add_row(wdict["label"], wdict["device"], temp_display, stage_display, file_display, f"{done}/{total_w}") return tbl try: @@ -1699,18 +1697,13 @@ def _worker_status_table() -> Table: total_time = time.monotonic() - t0 - # ── Report exit status ────────────────────────────────────────────────── - for w in workers: - ok = w["rc"] == 0 - status = "[green]OK[/green]" if ok else f"[red]FAILED (exit {w['rc']})[/red]" - console.print(f"Worker {w['label']} ({w['device']}): {status}") console.print(f"[dim]Total wall-clock time: {_fmt_time(total_time)}[/dim]\n") # ── Aggregate summaries ───────────────────────────────────────────────── - def _load_summary(path: Path) -> dict | None: + def _load_summary(fpath: Path) -> dict | None: try: - return json.loads(path.read_text()) - except Exception: + return json.loads(fpath.read_text()) + except (OSError, json.JSONDecodeError): return None all_combined_failures: list[dict] = [] @@ -1722,6 +1715,19 @@ def _load_summary(path: Path) -> dict | None: for f in summary.get("failures", []): all_combined_failures.append({"worker": w["label"], "device": w["device"], 
**f}) + # ── Report exit status ────────────────────────────────────────────────── + for i, w in enumerate(workers): + summary = worker_summaries[i] + if w["rc"] == 0: + status = "[green]OK[/green]" + elif summary is not None: + n_failures = len(summary.get("failures", [])) + label = "failure" if n_failures == 1 else "failures" + status = f"[yellow]Completed ({n_failures} file {label})[/yellow]" + else: + status = f"[red]FAILED (exit {w['rc']})[/red]" + console.print(f"Worker {w['label']} ({w['device']}): {status}") + _notif_processed = 0 _notif_failures = 0 @@ -1732,11 +1738,11 @@ def _load_summary(path: Path) -> dict | None: combined.add_column("Skipped", justify="right") combined.add_column("Failed", justify="right") - def _agg(key: str, sub: str) -> int: + def _agg(agg_key: str, sub: str) -> int: agg_total = 0 for s in worker_summaries: if s: - agg_total += s.get("stages", {}).get(key, {}).get(sub, 0) + agg_total += s.get("stages", {}).get(agg_key, {}).get(sub, 0) return agg_total stage_rows = [ @@ -1776,6 +1782,10 @@ def _agg(key: str, sub: str) -> int: # ── Combined failure report (printed) ────────────────────────────────── if all_combined_failures: + failure_counts: Counter = Counter() + for f in all_combined_failures: + key = (f.get("worker", ""), f.get("device", ""), f.get("stage", ""), f.get("error", "")) + failure_counts[key] += 1 fail_table = Table( title=f"Combined Failure Report ({len(all_combined_failures)} failures)", border_style="red", @@ -1783,16 +1793,10 @@ def _agg(key: str, sub: str) -> int: fail_table.add_column("Worker", style="bold") fail_table.add_column("Device") fail_table.add_column("Stage") - fail_table.add_column("Content ID") + fail_table.add_column("Count", justify="right") fail_table.add_column("Error") - for f in all_combined_failures: - fail_table.add_row( - f.get("worker", ""), - f.get("device", ""), - f.get("stage", ""), - f.get("content_id", ""), - f.get("error", ""), - ) + for (worker, device, stage, error), count in sorted(failure_counts.items()): + fail_table.add_row(worker, device, stage, str(count), error) console.print(fail_table) # ── Write combined_report.json (always) ──────────────────────────────── @@ -1805,14 +1809,24 @@ def _agg(key: str, sub: str) -> int: for device, readings in gpu_temp_readings.items() if readings } or None + _cr_n_files = len(all_ids) + _cr_total_audio_hours = round(sum(s.get("total_audio_hours", 0.0) for s in worker_summaries if s), 4) + _cr_source_audio_bytes = sum(s.get("source_audio_bytes", 0) for s in worker_summaries if s) + _cr_total_words = sum(s.get("total_words", 0) for s in worker_summaries if s) + _cr_total_segments = sum(s.get("total_segments", 0) for s in worker_summaries if s) + _cr_source_mb = _cr_source_audio_bytes / 1_000_000 combined_report = { "run_at": datetime.now(UTC).isoformat(), - "total_discovered": len(all_ids), + "total_discovered": _cr_n_files, "total_time_seconds": round(total_time, 2), - "total_audio_hours": round(sum(s.get("total_audio_hours", 0.0) for s in worker_summaries if s), 4), - "source_audio_bytes": sum(s.get("source_audio_bytes", 0) for s in worker_summaries if s), - "total_words": sum(s.get("total_words", 0) for s in worker_summaries if s), - "total_segments": sum(s.get("total_segments", 0) for s in worker_summaries if s), + "total_audio_hours": _cr_total_audio_hours, + "source_audio_bytes": _cr_source_audio_bytes, + "total_words": _cr_total_words, + "total_segments": _cr_total_segments, + "avg_time_per_file_seconds": round(total_time / _cr_n_files, 2) if _cr_n_files else 
None, + "avg_time_per_mb_seconds": round(total_time / _cr_source_mb, 4) if _cr_source_mb else None, + "processing_speed_ratio": round(_cr_total_audio_hours * 3600 / total_time, 3) if total_time else None, + "words_per_audio_hour": round(_cr_total_words / _cr_total_audio_hours, 1) if _cr_total_audio_hours else None, "gpu_temp_celsius": _gpu_temp_summaries, "workers": [ { @@ -1829,21 +1843,32 @@ def _agg(key: str, sub: str) -> int: combined_report_path.write_text(json.dumps(combined_report, indent=2)) console.print(f"[dim]Combined report written to: {combined_report_path}[/dim]") - worker_statuses = [(w["label"], w["device"], w["rc"] == 0) for w in workers] + worker_statuses = [] + for i, w in enumerate(workers): + ws = worker_summaries[i] + n_fail = len(ws.get("failures", [])) if ws is not None else 0 + worker_statuses.append((w["label"], w["device"], w["rc"], n_fail)) + _parallel_stages: dict[str, dict[str, int]] | None = None + if any(s for s in worker_summaries): + _parallel_stages = { + stage_key: { + "processed": _agg(stage_key, "processed"), + "skipped": _agg(stage_key, "skipped"), + "failed": _agg(stage_key, "failed"), + } + for stage_key in (["separation", "diarization", "transcription"] + (["sentiment"] if sentiment else [])) + } notify_pipeline_parallel_complete( worker_statuses=worker_statuses, total_discovered=len(all_ids), total_processed=_notif_processed, failures=_notif_failures, elapsed_seconds=total_time, + stages=_parallel_stages, + avg_per_file_seconds=combined_avg if _notif_processed else 0.0, ) if not all(w["rc"] == 0 for w in workers): failed_logs = "\n".join(f" {w['log']}" for w in workers) console.print(f"[dim]Worker logs retained for inspection:[/dim]\n{failed_logs}") sys.exit(1) - - -cli.add_command(separate_cmd, name="separate") -cli.add_command(diarize_cmd, name="diarize") -cli.add_command(transcribe_cmd, name="transcribe") diff --git a/src/diarizer.py b/src/diarizer.py index 9aa6110..c8840f1 100644 --- a/src/diarizer.py +++ b/src/diarizer.py @@ -49,7 +49,7 @@ def __init__(self, message: str): def _resolve_hf_token(hf_token: str | None) -> str: - """Return the HuggingFace token, loading from environment if not provided. + """Return the HuggingFace token, loading from the environment if not provided. Raises DiarizationError if no token is found. """ @@ -87,7 +87,7 @@ def load_pipeline(model: str, device: str, hf_token: str): try: import torch - # Pass the token via environment variable rather than as a kwarg. + # Pass the token via the environment variable rather than as a kwarg. # pyannote's from_pretrained() signature varies across versions, but # huggingface_hub always reads HF_TOKEN from the environment automatically. os.environ["HF_TOKEN"] = hf_token @@ -126,6 +126,8 @@ def diarize( max_speakers: Optional upper bound on speaker count. hf_token: HuggingFace token. If None, reads from HF_TOKEN env var. model: Pyannote model ID. + _pipeline: Pre-loaded Pyannote pipeline instance. Skips model loading when + provided; intended for testing and pipeline reuse. Returns: DiarizationResult with full provenance of the diarization run. 
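The `_pipeline` parameter documented above lets callers load the Pyannote pipeline once and reuse it across files. A minimal sketch of that pattern, assuming the `load_pipeline(model, device, hf_token)` signature shown earlier in this diff; the file paths, token value, and the name of `diarize()`'s positional audio argument are illustrative, not confirmed by the source:

```python
# Illustrative sketch: reuse one loaded Pyannote pipeline across many files.
from src.diarizer import diarize, load_pipeline

token = "hf_..."  # placeholder; normally resolved from the HF_TOKEN env var
pipe = load_pipeline("pyannote/speaker-diarization-3.1", "cuda:0", token)

for wav in ("extracted/a.wav", "extracted/b.wav"):  # hypothetical inputs
    # Passing _pipeline skips the per-file model load inside diarize().
    result = diarize(wav, hf_token=token, _pipeline=pipe)
```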
diff --git a/src/gpu_utils.py b/src/gpu_utils.py index ba5ce45..0c36abd 100644 --- a/src/gpu_utils.py +++ b/src/gpu_utils.py @@ -60,7 +60,7 @@ def load_tflops_table() -> dict[str, float]: with open(_TFLOPS_TABLE_PATH, "rb") as f: data = tomllib.load(f) return {k: float(v) for k, v in data.get("tflops", {}).items()} - except Exception: + except (OSError, KeyError, ValueError, tomllib.TOMLDecodeError): return {} @@ -123,13 +123,13 @@ def _sort_key(g: tuple[int, int, int, str]) -> tuple: tflops = lookup_tflops(name, tflops_table) if tflops is not None: # Tier 1: known GPU — rank by TFLOPS. - return (1, tflops, 0, 0) + return 1, tflops, 0, 0 # Tier 0: unknown GPU — rank by rounded VRAM then SM clock. - return (0, 0.0, round(mem_mib / 1024), sm_clock) + return 0, 0.0, round(mem_mib / 1024), sm_clock gpus.sort(key=_sort_key, reverse=True) return tuple(f"cuda:{g[0]}" for g in gpus) - except Exception: + except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError, ValueError): return ("cuda:0",) diff --git a/src/notifier.py b/src/notifier.py index 729b2cd..8751182 100644 --- a/src/notifier.py +++ b/src/notifier.py @@ -1,7 +1,7 @@ """Slack webhook notifications for pipeline events. Set ``SLACK_WEBHOOK_URL`` in your ``.env`` file or the shell environment -to enable. All functions are fire-and-forget — errors are silently ignored so +to enable. All functions are fire-and-forget — errors are silently ignored, so a notification failure can never block or abort the pipeline. """ @@ -12,17 +12,16 @@ import urllib.error import urllib.request +try: + from dotenv import load_dotenv as _load_dotenv +except ImportError: + _load_dotenv = None # type: ignore[assignment] + def _send(text: str) -> None: """POST a plain-text Slack message to the configured webhook URL, if any.""" - # Load .env lazily so the webhook URL is available even if this fires before - # the diarizer stage (which is when load_dotenv() normally runs). - try: - from dotenv import load_dotenv - - load_dotenv() - except ImportError: - pass + if _load_dotenv is not None: + _load_dotenv() url = os.getenv("SLACK_WEBHOOK_URL") if not url: @@ -31,7 +30,7 @@ def _send(text: str) -> None: data = json.dumps({"text": text}).encode() req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) urllib.request.urlopen(req, timeout=5) - except Exception: + except (urllib.error.URLError, OSError, ValueError): pass # Never raise — notification failure must not interrupt the pipeline @@ -40,20 +39,56 @@ def _fmt_elapsed(seconds: float) -> str: return f"{mins}m {secs}s" if mins else f"{secs}s" +_STAGE_LABELS = { + "separation": "Vocal separation", + "diarization": "Speaker diarization", + "transcription": "Transcription", + "sentiment": "Text sentiment", +} + + def notify_pipeline_complete( device: str, total: int, completed: int, failures: int, elapsed_seconds: float, + stages: dict[str, dict[str, int]] | None = None, + avg_per_file_seconds: float = 0.0, ) -> None: - """Send a Slack notification when a single-GPU pipeline run ends.""" + """Send a Slack notification when a single-GPU pipeline run ends. + + Args: + device: PyTorch device string, e.g. ``"cuda:0"``. + total: Total files discovered. + completed: Files successfully transcribed (processed + skipped). + failures: Total stage-level failures across all stages. + elapsed_seconds: Wall-clock runtime in seconds. + stages: Optional per-stage counts keyed by stage name, each containing + ``"processed"``, ``"skipped"``, and ``"failed"`` counts, e.g. 
+ ``{"separation": {"processed": 12, "skipped": 0, "failed": 0}, ...}``. + avg_per_file_seconds: Average wall-clock time per file across the full pipeline. + """ elapsed = _fmt_elapsed(elapsed_seconds) if failures == 0: icon, status = ":white_check_mark:", "Pipeline complete" else: icon, status = ":warning:", f"Pipeline complete with {failures} failure(s)" - _send(f"{icon} *{status}* on `{device}`\n{completed}/{total} files processed in {elapsed}") + avg_str = f" · avg/file: {_fmt_elapsed(avg_per_file_seconds)}" if avg_per_file_seconds else "" + lines = [ + f"{icon} *{status}* on `{device}`", + f"{completed}/{total} files transcribed in {elapsed}{avg_str}", + ] + if stages: + for stage_key, stage_label in _STAGE_LABELS.items(): + s = stages.get(stage_key) + if s is None: + continue + lines.append( + f"{stage_label}: {s.get('processed', 0)} processed, " + f"{s.get('skipped', 0)} skipped, {s.get('failed', 0)} failed" + ) + _send("\n".join(lines)) def notify_thermal_shutdown(device: str, temp: int, limit: int) -> None: @@ -62,34 +97,52 @@ def notify_thermal_shutdown(device: str, temp: int, limit: int) -> None: def notify_pipeline_parallel_complete( - worker_statuses: list[tuple[str, str, bool]], + worker_statuses: list[tuple[str, str, int, int]], total_discovered: int, total_processed: int, failures: int, elapsed_seconds: float, + stages: dict[str, dict[str, int]] | None = None, + avg_per_file_seconds: float = 0.0, ) -> None: """Send a Slack notification when a multi-GPU pipeline-parallel run ends. Args: - worker_statuses: One ``(label, device, ok)`` tuple per worker, - e.g. ``[("W0", "cuda:0", True), ("W1", "cuda:1", False)]``. + worker_statuses: One ``(label, device, exit_code, n_failures)`` tuple per worker, + e.g. ``[("W0", "cuda:0", 0, 0), ("W1", "cuda:1", 1, 3)]``. total_discovered: Total files discovered across all workers. total_processed: Files successfully transcribed (processed + skipped). failures: Total stage-level failures across all workers. elapsed_seconds: Wall-clock runtime in seconds. + stages: Optional aggregated per-stage counts keyed by stage name, each + containing ``"processed"``, ``"skipped"``, and ``"failed"`` counts. + avg_per_file_seconds: Combined average wall-clock time per file. 
""" elapsed = _fmt_elapsed(elapsed_seconds) - all_ok = all(ok for _, _, ok in worker_statuses) and failures == 0 + all_ok = all(rc == 0 for _, _, rc, _ in worker_statuses) and failures == 0 icon = ":white_check_mark:" if all_ok else ":warning:" worker_parts = [] - for label, device, ok in worker_statuses: - status_icon = ":white_check_mark:" if ok else ":x:" - worker_parts.append(f"{label} (`{device}`): {status_icon}") + for label, device, rc, n_fail in worker_statuses: + if rc == 0: + worker_icon = ":white_check_mark:" + elif n_fail > 0: + worker_icon = f":warning: ({n_fail} failure{'s' if n_fail != 1 else ''})" + else: + worker_icon = f":x: (exit {rc})" + worker_parts.append(f"{label} (`{device}`): {worker_icon}") + avg_str = f" · avg/file: {_fmt_elapsed(avg_per_file_seconds)}" if avg_per_file_seconds else "" lines = [ f"{icon} *Pipeline-parallel {'complete' if all_ok else 'finished with issues'}*", " | ".join(worker_parts), - f"{total_processed}/{total_discovered} files processed in {elapsed}", + f"{total_processed}/{total_discovered} files transcribed in {elapsed}{avg_str}", ] - if failures > 0: - lines.append(f"{failures} file(s) failed") + if stages: + for stage_key, stage_label in _STAGE_LABELS.items(): + s = stages.get(stage_key) + if s is None: + continue + lines.append( + f"{stage_label}: {s.get('processed', 0)} processed, " + f"{s.get('skipped', 0)} skipped, {s.get('failed', 0)} failed" + ) _send("\n".join(lines)) diff --git a/src/pipeline.py b/src/pipeline.py index dc39f97..9b8c427 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -28,6 +28,7 @@ _has_torch = True except ImportError: + _torch = None # type: ignore[assignment] _has_torch = False from src.diarizer import ( @@ -218,7 +219,7 @@ def partition_ids(ids: list[str], n: int = 2) -> list[list[str]]: Worker i gets positions i, i+n, i+2n, ... This distributes workload more evenly than a naive chunked split when file durations correlate with naming. With n=2, - behavior is identical to the previous dual-worker implementation. + the behavior is identical to the previous dual-worker implementation. Args: ids: Ordered list of content_id strings (typically from discover_files()). @@ -594,6 +595,7 @@ def run_pipeline( demucs_output_dir: Demucs output root directory (RAM disk strongly recommended). diarization_dir: Directory for diarization_.json files. transcription_dir: Directory for transcription_.json files. + sentiment_dir: Directory for sentiment_.json files. device: GPU device string ('cuda', 'cuda:N', or 'cpu'). segment: Demucs segment size in seconds (VRAM optimisation). compute_type: WhisperX CTranslate2 compute type. @@ -662,7 +664,7 @@ def run_pipeline( # ── Load sentiment model if needed ──────────────────────────────────── # Text-only; independent of audio model availability. Loaded before the - # early-return check so a sentiment-only run (all audio done, sentiment + # early-return check, so a sentiment-only run (all audio done, sentiment # pending) falls through to Pass 3 rather than returning early. sentiment_pipeline_obj = None sentiment_load_error: str | None = None @@ -738,7 +740,7 @@ def run_pipeline( _sep_rtf = round(sep.processing_time_seconds / _sep_dur, 3) if _sep_dur > 0 else None _sep_vram = _read_vram(device) # no_vocals.wav is only needed by step 6 (CLAP). Delete it now unless - # events are enabled, to keep RAM disk usage as low as possible. + # events are enabled to keep RAM disk usage as low as possible. 
if not keep_scratch and not enable_events: _cleanup_stem(no_vocals) result.separation.outcomes.append( @@ -874,7 +876,7 @@ def run_pipeline( # ── Pass 3: Text Sentiment Analysis ─────────────────────────────────────── # Text-only step — reads the transcription JSON, produces per-segment scores. - # Runs after all audio stages so it covers both freshly-transcribed files + # Runs after all audio stages, so it covers both freshly-transcribed files # (processed in Pass 2) and previously-transcribed files (fast-tracked in # Pass 1). This means a first run with --sentiment on a previously-transcribed # base-dir produces sentiment for all files without re-running audio stages. diff --git a/src/separator.py b/src/separator.py index 1f6457c..8af87ae 100644 --- a/src/separator.py +++ b/src/separator.py @@ -21,7 +21,7 @@ class SeparationError(Exception): - """Raised when Demucs subprocess fails.""" + """Raised when the Demucs subprocess fails.""" def __init__(self, message: str, returncode: int | None = None, stderr: str = ""): super().__init__(message) diff --git a/src/transcriber.py b/src/transcriber.py index 0741a86..7a05997 100644 --- a/src/transcriber.py +++ b/src/transcriber.py @@ -56,7 +56,7 @@ def _suppress_output(): def _parse_whisperx_device(device: str) -> tuple[str, int]: """Split a PyTorch-style device string for ctranslate2's separate API. - ctranslate2 (whisperx's backend) takes device and device_index as separate + ctranslate2 (whisperx's backend) takes the device and device_index as separate parameters and does not accept 'cuda:N' as a combined string. PyTorch-style load_align_model and align() still use the original string. @@ -100,6 +100,8 @@ def transcribe( diarization_file: Optional path to a DiarizationResult JSON from step 2. When provided, speaker labels are merged into the transcript output. model: Whisper model size (default: 'large-v3'). + _whisperx_model: Pre-loaded WhisperX model instance. When provided, model + loading is skipped (used by the pipeline for batch efficiency). Returns: TranscriptionResult with full provenance of the transcription run. @@ -185,7 +187,7 @@ def transcribe( aligned = whisperx.align( raw_result["segments"], align_model, metadata, audio, device, return_char_alignments=False ) - except Exception: + except (RuntimeError, ValueError, OSError, KeyError): alignment_fallback = True aligned = raw_result diff --git a/uv.lock b/uv.lock index dd84456..3baca07 100644 --- a/uv.lock +++ b/uv.lock @@ -126,7 +126,7 @@ wheels = [ [[package]] name = "audio-refinery" -version = "0.1.0" +version = "0.1.1" source = { editable = "." } dependencies = [ { name = "click" },