diff --git a/helpers/transcribe.py b/helpers/transcribe.py
index 26d3906..d05379f 100644
--- a/helpers/transcribe.py
+++ b/helpers/transcribe.py
@@ -1,15 +1,30 @@
-"""Transcribe a video with ElevenLabs Scribe.
+"""Transcribe a video with FunASR (local, no API key required).
 
+Replaces the original ElevenLabs Scribe backend with FunASR (ModelScope's
+open-source ASR toolkit). Runs fully on-device, supports Chinese + English
+mixed speech, and emits the same JSON schema the rest of the pipeline
+expects:
+
+    {
+      "language_code": "zh" | "en" | ...,
+      "words": [
+        {"type": "word", "text": "...", "start": s, "end": s, "speaker_id": "speaker_0"},
+        {"type": "spacing", "text": " ", "start": s, "end": s},
+        {"type": "audio_event", "text": "(laughter)", "start": s, "end": s, "speaker_id": "speaker_0"}
+      ]
+    }
+
-Extracts mono 16kHz audio via ffmpeg, uploads to Scribe with verbatim +
-diarize + audio events + word-level timestamps, writes the full response
-to /transcripts/.json.
+Pipeline: ffmpeg extracts mono 16kHz WAV → FunASR AutoModel runs ASR
+(paraformer-zh) + VAD (fsmn-vad) + punctuation (ct-punc) + speaker
+diarization (cam++) → character-level timestamps are flattened into the
+word array above. Silence gaps detected by VAD become `spacing` entries.
 
-Cached: if the output file already exists, the upload is skipped.
+Cached: if the output file already exists, transcription is skipped.
 
 Usage:
     python helpers/transcribe.py
     python helpers/transcribe.py --edit-dir /custom/edit
-    python helpers/transcribe.py --language en
+    python helpers/transcribe.py --language zh
     python helpers/transcribe.py --num-speakers 2
 """
 
@@ -17,33 +32,17 @@
 import argparse
 import json
-import os
 import subprocess
 import sys
 import tempfile
 import time
 from pathlib import Path
 
-import requests
-
-
-SCRIBE_URL = "https://api.elevenlabs.io/v1/speech-to-text"
+# FunASR is heavy; import lazily so --help / imports elsewhere stay fast.
+_MODEL_CACHE: dict = {}
 
-
-def load_api_key() -> str:
-    for candidate in [Path(__file__).resolve().parent.parent / ".env", Path(".env")]:
-        if candidate.exists():
-            for line in candidate.read_text().splitlines():
-                line = line.strip()
-                if not line or line.startswith("#") or "=" not in line:
-                    continue
-                k, v = line.split("=", 1)
-                if k.strip() == "ELEVENLABS_API_KEY":
-                    return v.strip().strip('"').strip("'")
-    v = os.environ.get("ELEVENLABS_API_KEY", "")
-    if not v:
-        sys.exit("ELEVENLABS_API_KEY not found in .env or environment")
-    return v
+# Chinese + common punctuation to strip when aligning chars with timestamps.
+_PUNCT = set(",。!?、;:“”‘’()《》【】「」『』〈〉…—·,.!?;:\"'()[]<>~ \t\n")
 
 
 def extract_audio(video_path: Path, dest: Path) -> None:
@@ -55,42 +54,158 @@
     subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 
 
-def call_scribe(
+def _get_model(language: str | None = None):
+    """Load and cache the FunASR AutoModel.
+
+    Defaults to a Chinese-optimised stack (paraformer-zh). For English-only
+    input, pass language='en' → uses the multilingual Whisper bundle.
+    """
+    key = (language or "zh").lower()
+    if key in _MODEL_CACHE:
+        return _MODEL_CACHE[key]
+
+    from funasr import AutoModel  # lazy import; pulls in torch the first time
+
+    if key.startswith("en"):
+        # English / multilingual path. Whisper handles non-Chinese well.
+        model = AutoModel(
+            model="iic/Whisper-large-v3",
+            vad_model="fsmn-vad",
+            punc_model="ct-punc",
+            spk_model="cam++",
+            disable_update=True,
+        )
+    else:
+        # Chinese (or mixed) path. Paraformer is faster and stronger on zh.
+        model = AutoModel(
+            model="paraformer-zh",
+            vad_model="fsmn-vad",
+            punc_model="ct-punc",
+            spk_model="cam++",
+            disable_update=True,
+        )
+
+    _MODEL_CACHE[key] = model
+    return model
+
+
+def _is_punct(ch: str) -> bool:
+    return ch in _PUNCT
+
+
+def _strip_chars(text: str) -> list[str]:
+    """Flatten sentence text into the list of speakable characters that
+    should line up 1:1 with the per-char timestamps returned by Paraformer.
+
+    Punctuation, whitespace, and newlines are removed — they have no
+    timestamp in the paraformer output.
+    """
+    return [c for c in text if not _is_punct(c)]
+
+
+def _sentence_to_words(sent: dict, prev_end: float | None) -> tuple[list[dict], float]:
+    """Convert one FunASR sentence_info entry into word/spacing dicts."""
+    out: list[dict] = []
+    start_s = float(sent.get("start", 0)) / 1000.0
+    end_s = float(sent.get("end", 0)) / 1000.0
+    spk = sent.get("spk", 0)
+    speaker_id = f"speaker_{int(spk)}"
+    text = sent.get("text", "") or ""
+    ts = sent.get("timestamp") or []
+
+    if prev_end is not None and start_s > prev_end + 0.01:
+        out.append({
+            "type": "spacing",
+            "text": " ",
+            "start": round(prev_end, 3),
+            "end": round(start_s, 3),
+        })
+
+    chars = _strip_chars(text)
+
+    if ts and len(chars) == len(ts):
+        # Happy path: per-character timestamps align exactly.
+        for ch, pair in zip(chars, ts):
+            s_ms, e_ms = pair
+            out.append({
+                "type": "word",
+                "text": ch,
+                "start": round(float(s_ms) / 1000.0, 3),
+                "end": round(float(e_ms) / 1000.0, 3),
+                "speaker_id": speaker_id,
+            })
+    else:
+        # Fallback: emit whole sentence as one entry. Keeps the pipeline
+        # working even when punctuation or tagging breaks alignment.
+        out.append({
+            "type": "word",
+            "text": text.strip(),
+            "start": round(start_s, 3),
+            "end": round(end_s, 3),
+            "speaker_id": speaker_id,
+        })
+
+    return out, end_s
+
+
+def call_funasr(
     audio_path: Path,
-    api_key: str,
     language: str | None = None,
     num_speakers: int | None = None,
 ) -> dict:
-    data: dict[str, str] = {
-        "model_id": "scribe_v1",
-        "diarize": "true",
-        "tag_audio_events": "true",
-        "timestamps_granularity": "word",
+    """Run FunASR on a mono 16kHz WAV file and return an ElevenLabs-shaped dict."""
+    model = _get_model(language)
+
+    kwargs: dict = {
+        "input": str(audio_path),
+        "batch_size_s": 300,
+        "return_spk_res": True,
     }
-    if language:
-        data["language_code"] = language
+    # FunASR accepts a hint for expected speaker count on CAM++.
     if num_speakers:
-        data["num_speakers"] = str(num_speakers)
-
-    with open(audio_path, "rb") as f:
-        resp = requests.post(
-            SCRIBE_URL,
-            headers={"xi-api-key": api_key},
-            files={"file": (audio_path.name, f, "audio/wav")},
-            data=data,
-            timeout=1800,
-        )
-
-    if resp.status_code != 200:
-        raise RuntimeError(f"Scribe returned {resp.status_code}: {resp.text[:500]}")
-
-    return resp.json()
+        kwargs["preset_spk_num"] = int(num_speakers)
+
+    res = model.generate(**kwargs)
+
+    if not res:
+        return {"language_code": language or "auto", "words": []}
+
+    record = res[0] if isinstance(res, list) else res
+    sentences = record.get("sentence_info") or []
+
+    words: list[dict] = []
+    prev_end: float | None = None
+    for sent in sentences:
+        chunk, prev_end = _sentence_to_words(sent, prev_end)
+        words.extend(chunk)
+
+    # Safety net: if diarization+VAD produced nothing, fall back to the
+    # flat text + word-level timestamps directly on the record.
+    if not words:
+        flat_ts = record.get("timestamp") or []
+        flat_text = record.get("text", "") or ""
+        chars = _strip_chars(flat_text)
+        if flat_ts and len(chars) == len(flat_ts):
+            for ch, pair in zip(chars, flat_ts):
+                s_ms, e_ms = pair
+                words.append({
+                    "type": "word",
+                    "text": ch,
+                    "start": round(float(s_ms) / 1000.0, 3),
+                    "end": round(float(e_ms) / 1000.0, 3),
+                    "speaker_id": "speaker_0",
+                })
+
+    return {
+        "language_code": language or record.get("language") or "auto",
+        "text": record.get("text", ""),
+        "words": words,
+    }
 
 
 def transcribe_one(
     video: Path,
     edit_dir: Path,
-    api_key: str,
     language: str | None = None,
     num_speakers: int | None = None,
     verbose: bool = True,
@@ -117,10 +232,10 @@
     extract_audio(video, audio)
     size_mb = audio.stat().st_size / (1024 * 1024)
     if verbose:
-        print(f" uploading {video.stem}.wav ({size_mb:.1f} MB)", flush=True)
-    payload = call_scribe(audio, api_key, language, num_speakers)
+        print(f" transcribing {video.stem}.wav ({size_mb:.1f} MB) with FunASR", flush=True)
+    payload = call_funasr(audio, language, num_speakers)
 
-    out_path.write_text(json.dumps(payload, indent=2))
+    out_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False))
 
     dt = time.time() - t0
     if verbose:
@@ -133,7 +248,7 @@
 
 
 def main() -> None:
-    ap = argparse.ArgumentParser(description="Transcribe a video with ElevenLabs Scribe")
+    ap = argparse.ArgumentParser(description="Transcribe a video with FunASR (local)")
     ap.add_argument("video", type=Path, help="Path to video file")
     ap.add_argument(
         "--edit-dir",
@@ -145,13 +260,13 @@
         "--language",
         type=str,
         default=None,
-        help="Optional ISO language code (e.g., 'en'). Omit to auto-detect.",
+        help="Optional ISO language code ('zh' or 'en'). Omit for Chinese default.",
     )
     ap.add_argument(
         "--num-speakers",
         type=int,
         default=None,
-        help="Optional number of speakers when known. Improves diarization accuracy.",
+        help="Optional expected number of speakers. Improves diarization accuracy.",
     )
 
     args = ap.parse_args()
@@ -160,12 +275,10 @@
         sys.exit(f"video not found: {video}")
 
     edit_dir = (args.edit_dir or (video.parent / "edit")).resolve()
-    api_key = load_api_key()
 
     transcribe_one(
         video=video,
         edit_dir=edit_dir,
-        api_key=api_key,
         language=args.language,
         num_speakers=args.num_speakers,
     )
diff --git a/helpers/transcribe_batch.py b/helpers/transcribe_batch.py
index 5aeb1d6..d3349e1 100644
--- a/helpers/transcribe_batch.py
+++ b/helpers/transcribe_batch.py
@@ -1,8 +1,14 @@
-"""Batch-transcribe every video in a directory with 4 parallel workers.
+"""Batch-transcribe every video in a directory.
 
-Walks for common video extensions, runs ElevenLabs Scribe on
+Walks for common video extensions, runs FunASR locally on
 each, writes transcripts to /edit/transcripts/.json.
 
+FunASR runs in-process and is not fork-safe across models; by default we
+transcribe sequentially so the shared AutoModel (plus cached torch
+weights) is reused. Passing --workers > 1 spawns more threads but each
+will still serialise on the GIL and may thrash memory — recommended
+only for very short clips.
+
 Cached per-file: any source that already has a transcript is skipped.
 
 Usage:
@@ -20,7 +26,7 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
 
-from transcribe import load_api_key, transcribe_one
+from transcribe import transcribe_one
 
 VIDEO_EXTS = {".mp4", ".MP4", ".mov", ".MOV", ".mkv", ".MKV", ".avi", ".AVI", ".m4v"}
 
@@ -43,7 +49,12 @@
         default=None,
         help="Edit output directory (default: /edit)",
     )
-    ap.add_argument("--workers", type=int, default=4, help="Parallel workers (default: 4)")
+    ap.add_argument(
+        "--workers",
+        type=int,
+        default=1,
+        help="Parallel workers (default: 1; FunASR is heavy, keep it low)",
+    )
     ap.add_argument(
         "--language",
         type=str,
@@ -77,9 +88,7 @@
         print("nothing to do")
         return
 
-    api_key = load_api_key()
-
-    print(f"transcribing {len(pending)} files with {args.workers} parallel workers")
+    print(f"transcribing {len(pending)} files with {args.workers} parallel workers (FunASR / local)")
 
     t0 = time.time()
     errors: list[tuple[Path, str]] = []
@@ -89,7 +98,6 @@
                 transcribe_one,
                 video=v,
                 edit_dir=edit_dir,
-                api_key=api_key,
                 language=args.language,
                 num_speakers=args.num_speakers,
                 verbose=False,