Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ yarn-debug.log*
yarn-error.log*

minio-data/

__pycache__/
*.pyc
.husky/_
21 changes: 21 additions & 0 deletions analysis/hallucination-audit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Hallucination audit workspace

This folder contains reusable scripts for transcription hallucination audits.

Key doc

- docs/hallucination-audit-20260210.md

Scripts

- analysis/hallucination-audit/run_audit.py
- analysis/hallucination-audit/compute_audio_volume.py
- analysis/hallucination-audit/download_full_audio.py
- analysis/hallucination-audit/transcribe_full_audio.py
- analysis/hallucination-audit/align_with_full_transcript.py
- analysis/hallucination-audit/create_langfuse_dataset_sample.py

Notes

- Raw meeting artifacts are intentionally not stored in this branch.
- Keep large audio files and raw trace dumps in dedicated audit branches or local workspace storage.
201 changes: 201 additions & 0 deletions analysis/hallucination-audit/align_with_full_transcript.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import argparse
import csv
import json
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple


def normalize_text(value: str) -> str:
    """Return *value* lowercased, stripped of punctuation, single-spaced.

    Every character other than ``a-z``, ``0-9`` and whitespace is replaced
    by a space; runs of whitespace are then squeezed to one space and the
    result is trimmed at both ends.
    """
    alnum_only = re.sub(r"[^a-z0-9\s]", " ", value.lower())
    return re.sub(r"\s+", " ", alnum_only).strip()


def levenshtein_distance(a: str, b: str) -> int:
    """Return the edit distance between *a* and *b*.

    Counts the minimum number of single-character insertions, deletions and
    substitutions. Only two rows of the dynamic-programming table are kept,
    and the shorter string is placed on the inner axis so those rows stay as
    small as possible.
    """
    if a == b:
        return 0
    if not a or not b:
        # Exactly one side is empty here (both-empty was caught above), so
        # the distance is simply the length of the other side.
        return max(len(a), len(b))

    longer, shorter = (a, b) if len(a) >= len(b) else (b, a)
    prev_row = list(range(len(shorter) + 1))
    for row_no, long_ch in enumerate(longer, start=1):
        row = [row_no]
        for col_no, short_ch in enumerate(shorter, start=1):
            substitution = prev_row[col_no - 1] + (0 if long_ch == short_ch else 1)
            insertion = row[col_no - 1] + 1
            deletion = prev_row[col_no] + 1
            row.append(min(insertion, deletion, substitution))
        prev_row = row
    return prev_row[-1]


def build_index(
    words: List[str],
    min_word_length: int = 4,
) -> Dict[str, List[int]]:
    """Map each sufficiently long word to the positions where it occurs.

    Args:
        words: Tokenized text, in order.
        min_word_length: Words shorter than this are left out of the index.
            Defaults to 4, the value that was previously hard-coded.

    Returns:
        A dict from word to the ascending list of indices in *words* at
        which that word appears.
    """
    index: Dict[str, List[int]] = {}
    for pos, word in enumerate(words):
        if len(word) >= min_word_length:
            index.setdefault(word, []).append(pos)
    return index


def find_subsequence_window(
    snippet_words: List[str],
    full_words: List[str],
) -> Optional[Tuple[int, int]]:
    """Locate the first contiguous run of *snippet_words* in *full_words*.

    Returns:
        A half-open ``(start, end)`` index pair into *full_words*, or
        ``None`` when either list is empty, the snippet is longer than the
        full list, or no exact run exists.
    """
    span = len(snippet_words)
    total = len(full_words)
    if span == 0 or total == 0 or span > total:
        return None

    for offset in range(total - span + 1):
        stop = offset + span
        if full_words[offset:stop] == snippet_words:
            return (offset, stop)
    return None


def best_match(
    snippet_text: str,
    snippet_words: List[str],
    full_text: str,
    full_words: List[str],
    index: Dict[str, List[int]],
) -> Tuple[Optional[float], str, Optional[Tuple[int, int]]]:
    """Score how well a normalized snippet is supported by the full transcript.

    Strategy:

    1. An empty snippet yields ``(None, "empty", None)``.
    2. If the snippet occurs verbatim in *full_text* and lines up with whole
       words of *full_words*, it is an exact hit and yields
       ``(1.0, "substring", (start, end))``. A verbatim hit that starts or
       ends in the middle of a transcript word is not treated as exact; it
       falls through to the fuzzy search.
    3. Otherwise a few of the longest distinct snippet words are looked up
       in *index*. A window of transcript words around each occurrence is
       compared with the snippet using ``1 - levenshtein / max_length``, and
       the highest-scoring window wins.

    Args:
        snippet_text: Normalized snippet text.
        snippet_words: ``snippet_text`` split into words.
        full_text: Normalized full transcript text.
        full_words: ``full_text`` split into words.
        index: Word -> positions in *full_words* (see ``build_index``).

    Returns:
        ``(score, method, window)``. ``score`` is ``None`` when nothing could
        be scored. ``method`` is one of ``"empty"``, ``"substring"``,
        ``"no_candidates"`` or ``"fuzzy"``. ``window`` is a half-open index
        pair into *full_words*, or ``None``.
    """
    # Tuning knobs of the fuzzy search, named so they can be read and
    # adjusted in one place.
    min_candidate_length = 4  # anchor words must be at least this long
    max_candidates = 3  # how many anchor words to try
    min_window_words = 8  # lower bound on the comparison window
    window_padding_words = 6  # extra words beyond the snippet length
    max_positions_per_word = 100  # cap on occurrences examined per anchor
    lead_in_words = 3  # words of context kept before the anchor
    min_overlap_ratio = 0.25  # cheap word-overlap prefilter threshold

    if not snippet_text:
        return None, "empty", None

    if snippet_text in full_text:
        # The substring test is fast but character-based, so it also fires
        # for mid-word hits ("cat" in "concatenate"). Only accept the hit as
        # exact when it is confirmed at word granularity.
        exact_window = find_subsequence_window(snippet_words, full_words)
        if exact_window is not None:
            return 1.0, "substring", exact_window

    # Longest words first; ties are broken alphabetically so the chosen
    # anchors (and therefore the result) do not depend on set iteration
    # order, which varies between runs under string hash randomization.
    ranked_words = sorted(set(snippet_words), key=lambda word: (-len(word), word))
    candidates = [
        word for word in ranked_words if len(word) >= min_candidate_length
    ][:max_candidates]
    if not candidates:
        return None, "no_candidates", None

    window_size = max(
        min_window_words,
        min(len(full_words), len(snippet_words) + window_padding_words),
    )
    snippet_word_set = set(snippet_words)
    snippet_length = len(snippet_text)
    best_score: Optional[float] = None
    best_window: Optional[Tuple[int, int]] = None
    # A window is fully determined by its start, so the same start reached
    # via a different anchor can never improve the result; skip repeats.
    seen_starts = set()

    for word in candidates:
        for pos in index.get(word, [])[:max_positions_per_word]:
            start = max(0, pos - lead_in_words)
            if start in seen_starts:
                continue
            seen_starts.add(start)

            end = min(len(full_words), start + window_size)
            window_words = full_words[start:end]
            window_text = " ".join(window_words)
            if not window_text:
                continue

            longest = max(snippet_length, len(window_text))
            if best_score is not None:
                # The edit distance is at least the length difference, so
                # this bounds the achievable score from above.
                max_possible = 1 - (abs(snippet_length - len(window_text)) / longest)
                if max_possible <= best_score:
                    continue

            # Cheap prefilter: skip windows sharing too few distinct words
            # with the snippet before paying for the edit distance.
            shared = snippet_word_set & set(window_words)
            if len(shared) / len(snippet_word_set) < min_overlap_ratio:
                continue

            ratio = levenshtein_distance(snippet_text, window_text) / longest
            score = 1 - ratio
            if best_score is None or score > best_score:
                best_score = score
                best_window = (start, end)

    return best_score, "fuzzy", best_window
Comment on lines +37 to +114

Copilot AI Feb 13, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function contains several magic numbers that should be extracted as named constants for better code clarity:

  • 4 (lines 40, 73): minimum word length for indexing and candidate selection
  • 3 (line 73): maximum number of candidate words
  • 8 (line 77): minimum window size
  • 6 (line 77): window size padding
  • 100 (line 83): maximum positions to check per word
  • 3 (line 86): position offset for window start
  • 0.25 (line 105): minimum overlap ratio threshold

These magic numbers represent important thresholds for the fuzzy matching algorithm. Extracting them as named constants would make the algorithm's behavior more transparent and easier to tune.

Copilot uses AI. Check for mistakes.


def get_fieldnames(records: List[Dict[str, object]]) -> List[str]:
    """Collect every key used across *records*, in first-seen order.

    Each key appears once in the result, at the position where it was first
    encountered while scanning the records in order.
    """
    # dict preserves insertion order, which gives ordered de-duplication.
    first_seen = dict.fromkeys(key for record in records for key in record)
    return list(first_seen)


def normalize_csv_value(value: object) -> str:
    """Flatten *value* into the string written to a CSV cell.

    ``None`` becomes an empty string, a dict is serialized as JSON, a list
    is joined with ``|`` after converting each item with ``str``, and
    anything else is passed through ``str``.
    """
    if value is None:
        return ""
    if isinstance(value, dict):
        return json.dumps(value)
    if isinstance(value, list):
        return "|".join(map(str, value))
    return str(value)


def main() -> None:
    """Command-line entry point: annotate snippets with transcript matches.

    Reads the snippet records (JSON) and ``full_transcript.txt`` for the
    meeting given by ``--meeting-id``, attaches the ``best_match`` result to
    every record under ``full_transcript_match_*`` keys, and writes the
    annotated records back out as JSON and, when there is at least one
    record, as CSV.

    Raises:
        SystemExit: If the snippet file or the full transcript is missing.
    """
    cli = argparse.ArgumentParser(
        description="Align snippet transcripts with full transcript text.",
    )
    cli.add_argument("--meeting-id", required=True)
    cli.add_argument("--input", default="")
    cli.add_argument("--threshold", type=float, default=0.85)
    options = cli.parse_args()

    # Paths are relative to the current working directory.
    workdir = Path("analysis/hallucination-audit") / options.meeting_id
    if options.input:
        snippets_path = Path(options.input)
    else:
        snippets_path = workdir / "transcriptions_classified_with_audio.json"
    transcript_path = workdir / "full_transcript.txt"

    for required in (snippets_path, transcript_path):
        if not required.exists():
            raise SystemExit(f"Missing {required}")

    records = json.loads(snippets_path.read_text(encoding="utf-8"))
    full_text = normalize_text(transcript_path.read_text(encoding="utf-8"))
    full_words = full_text.split()
    index = build_index(full_words)

    for record in records:
        # A missing or falsy "output_text" is treated as an empty snippet.
        snippet_text = normalize_text(record.get("output_text") or "")
        score, method, window = best_match(
            snippet_text,
            snippet_text.split(),
            full_text,
            full_words,
            index,
        )
        matched = score is not None and score >= options.threshold
        record.update(
            {
                "full_transcript_match_score": score,
                "full_transcript_match_method": method,
                "full_transcript_match_window": list(window) if window else None,
                "full_transcript_match_found": bool(matched),
            }
        )

    json_target = workdir / "transcriptions_classified_with_audio_and_full.json"
    json_target.write_text(json.dumps(records, indent=2), encoding="utf-8")

    csv_target = workdir / "transcriptions_classified_with_audio_and_full.csv"
    if not records:
        # Nothing to tabulate; the CSV is only produced for non-empty input.
        return

    columns = get_fieldnames(records)
    with csv_target.open("w", encoding="utf-8", newline="") as sink:
        table = csv.DictWriter(sink, fieldnames=columns)
        table.writeheader()
        table.writerows(
            {column: normalize_csv_value(record.get(column)) for column in columns}
            for record in records
        )


if __name__ == "__main__":
    main()
Loading
Loading