Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ docker build -t contentsplit .
docker run -p 8080:8080 -e OPENAI_API_KEY=sk-... contentsplit
```

## ✅ Quality Scoring Module

This repository also includes a deterministic quality scorer for structured submissions:

- implementation: `quality_scorer.py`
- tests: `tests/test_quality_scorer.py`
- sample scorecards (20): `sample_scorecards.json`

Run tests:

```bash
python3 -m unittest discover -s tests -p 'test_*.py'
```

Regenerate sample scorecards:

```bash
PYTHONPATH=. python3 scripts/generate_sample_scorecards.py
```

Evaluate scorer calibration against a provided ground-truth set:

```bash
PYTHONPATH=. python3 scripts/evaluate_ground_truth.py submissions.json ground_truth_scores.json 0.05
```

## License

MIT
252 changes: 252 additions & 0 deletions quality_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import json
import math
import re
from collections import Counter
from typing import Any

# Relative weight of each scoring dimension in the final weighted score.
# The values sum to 1.0, so the weighted total stays within [0, 1].
WEIGHTS = {
    "completeness": 0.30,
    "format_compliance": 0.20,
    "coverage": 0.25,
    "clarity": 0.15,
    "validity": 0.10,
}


def clamp(value: float, low: float = 0.0, high: float = 1.0) -> float:
    """Clip *value* into the inclusive range [low, high]."""
    if value < low:
        return low
    if value > high:
        return high
    return value


def detect_format(submission: str) -> str:
    """Classify *submission* as 'json', 'markdown', 'code', or 'text'.

    Probes are tried in priority order; the first match wins.  Empty or
    whitespace-only input is classified as plain text.
    """
    stripped = submission.strip()
    if stripped:
        probes = (
            ("json", _is_json),
            ("markdown", _is_markdown),
            ("code", _is_code),
        )
        for label, probe in probes:
            if probe(stripped):
                return label
    return "text"


def score_submission(submission: str) -> dict[str, Any]:
    """Score one submission and return its full scorecard.

    The scorecard contains the weighted score, a coarse rating label, the
    per-dimension scores, feedback strings, a pass/fail flag at the 0.70
    threshold, and the detected format.
    """
    fmt = detect_format(submission)
    scorers = {
        "completeness": _score_completeness,
        "format_compliance": _score_format_compliance,
        "coverage": _score_coverage,
        "clarity": _score_clarity,
        "validity": _score_validity,
    }
    raw_scores = {name: scorer(submission, fmt) for name, scorer in scorers.items()}

    # Combine dimensions via the module-level WEIGHTS, clamped to [0, 1].
    total = round(clamp(sum(raw_scores[name] * w for name, w in WEIGHTS.items())), 6)

    return {
        "weighted_score": total,
        "quality_rating": _quality_rating(total),
        "scores": {name: round(value, 6) for name, value in raw_scores.items()},
        "feedback": _build_feedback(raw_scores, fmt),
        "pass_threshold": total >= 0.70,
        "detected_format": fmt,
    }


def score_batch(submissions: list[str]) -> list[dict[str, Any]]:
    """Score every submission in order, producing one scorecard per input."""
    return list(map(score_submission, submissions))


def evaluate_against_ground_truth(predictions: list[dict[str, Any]], ground_truth_scores: list[float]) -> float:
    """Return the mean absolute error between predicted and true scores.

    Raises ValueError when the two lists differ in length; an empty input
    yields 0.0.
    """
    if len(predictions) != len(ground_truth_scores):
        raise ValueError("predictions and ground_truth_scores must have the same length")
    if not predictions:
        return 0.0
    total_error = sum(
        abs(prediction["weighted_score"] - truth)
        for prediction, truth in zip(predictions, ground_truth_scores)
    )
    return total_error / len(predictions)


def evaluate_ground_truth_submission_set(
    submissions: list[str], ground_truth_scores: list[float], tolerance: float = 0.05
) -> dict[str, Any]:
    """Score *submissions* and summarize calibration against ground truth.

    Returns the submission count, the mean absolute error (rounded to six
    decimals), the tolerance used, and whether the MAE is within it.
    """
    mae = evaluate_against_ground_truth(score_batch(submissions), ground_truth_scores)
    summary = {
        "count": len(submissions),
        "mae": round(mae, 6),
        "tolerance": tolerance,
        "within_tolerance": mae <= tolerance,
    }
    return summary


def _is_json(text: str) -> bool:
if not text.startswith("{") and not text.startswith("["):
return False
try:
payload = json.loads(text)
except json.JSONDecodeError:
return False
return isinstance(payload, (dict, list))


def _is_markdown(text: str) -> bool:
patterns = [
r"(?m)^#{1,6}\s+\S+",
r"(?m)^[-*+]\s+\S+",
r"(?m)^\d+\.\s+\S+",
r"(?m)^```",
r"\[[^\]]+\]\([^)]+\)",
]
return any(re.search(pattern, text) for pattern in patterns)


def _is_code(text: str) -> bool:
patterns = [
r"(?m)^\s*(def|class|import|from)\s+\w+",
r"(?m)^\s*(function|const|let|var)\s+\w+",
r"(?m)^\s*#include\s+<",
r"(?m)^\s*(public|private|protected)\s+\w+",
r"[{};]{2,}",
]
return any(re.search(pattern, text) for pattern in patterns)


def _score_completeness(submission: str, detected_format: str) -> float:
    """Score how fully populated the submission is, in [0, 1].

    JSON dicts score by the fraction of non-empty values plus a small bonus
    for having at least three keys; JSON lists by item count capped at five.
    Everything else scores by word count with a structural bonus for markdown
    headings or code lines.
    """
    text = submission.strip()
    if not text:
        return 0.0

    if detected_format == "json":
        # Guard the parse so a caller-supplied format mismatch degrades to
        # text scoring instead of raising — consistent with
        # _score_format_compliance and _score_validity, which guard theirs.
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            total = len(payload)
            non_empty = sum(1 for value in payload.values() if value not in (None, "", [], {}))
            return clamp((non_empty / max(total, 1)) * 0.9 + (0.1 if total >= 3 else 0.0))
        if isinstance(payload, list):
            return clamp(min(len(payload), 5) / 5.0)

    word_count = len(_tokenize_words(text))
    structural_bonus = 0.0
    if detected_format == "markdown":
        structural_bonus = min(0.3, 0.05 * len(re.findall(r"(?m)^#{1,6}\s+\S+", text)))
    elif detected_format == "code":
        structural_bonus = min(0.3, 0.03 * len(re.findall(r"(?m)^\s*\w+", text)))
    # 120 words is treated as "complete" prose; the bonus tops up structure.
    base = min(word_count / 120.0, 1.0)
    return clamp(base * 0.8 + structural_bonus)


def _score_format_compliance(submission: str, detected_format: str) -> float:
    """Score how well the submission adheres to its detected format."""
    text = submission.strip()
    if not text:
        return 0.0

    if detected_format == "json":
        try:
            json.loads(text)
        except json.JSONDecodeError:
            return 0.2  # claimed JSON but unparseable
        return 1.0

    if detected_format == "markdown":
        headings = len(re.findall(r"(?m)^#{1,6}\s+\S+", text))
        bullets = len(re.findall(r"(?m)^[-*+]\s+\S+", text))
        return clamp(0.5 + min(0.5, 0.1 * headings + 0.05 * bullets))

    if detected_format == "code":
        marker_count = len(re.findall(r"[{}();:=]", text))
        return clamp(0.5 + min(0.5, marker_count / 80.0))

    # Plain text: longer prose earns a modest compliance bump.
    return clamp(0.6 + min(0.4, len(_tokenize_words(text)) / 250.0))


def _score_coverage(submission: str, detected_format: str) -> float:
    """Score topical coverage: vocabulary variety, length, and structure."""
    words = _tokenize_words(submission)
    if not words:
        return 0.0
    unique_ratio = len(set(words)) / len(words)
    # 180 words is treated as full density; shorter inputs scale linearly.
    density = min(len(words) / 180.0, 1.0)
    structure = 0.0
    if detected_format == "markdown":
        structure = min(0.2, 0.04 * len(re.findall(r"(?m)^[-*+]\s+\S+", submission)))
    if detected_format == "json":
        # Guarded parse: a format mismatch must not raise here — consistent
        # with _score_format_compliance and _score_validity.
        try:
            payload = json.loads(submission)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            structure = min(0.2, 0.03 * len(payload))
    return clamp(0.45 * unique_ratio + 0.45 * density + structure)


def _score_clarity(submission: str, detected_format: str) -> float:
    """Score readability: key naming for JSON dicts, sentence shape for prose."""
    text = submission.strip()
    if not text:
        return 0.0

    if detected_format == "json":
        # Guarded parse: on a format mismatch fall through to prose scoring
        # instead of raising — consistent with _score_format_compliance.
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            keys = [str(k) for k in payload.keys()]
            # Reward lowercase-initial snake_case/camelCase key naming.
            snake_or_camel = sum(1 for key in keys if re.match(r"^[a-z][a-zA-Z0-9_]*$", key))
            return clamp(0.5 + 0.5 * (snake_or_camel / max(len(keys), 1)))

    sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
    if not sentences:
        return 0.3
    sentence_lengths = [len(_tokenize_words(sentence)) for sentence in sentences]
    avg_len = sum(sentence_lengths) / len(sentence_lengths)
    # Peak clarity near a 16-word average sentence, decaying over ±24 words.
    smoothness = 1.0 - min(abs(avg_len - 16.0) / 24.0, 1.0)
    punctuation = clamp(text.count(",") / max(len(sentences), 1) / 4.0 + 0.4, 0.0, 1.0)
    return clamp(0.65 * smoothness + 0.35 * punctuation)


def _score_validity(submission: str, detected_format: str) -> float:
    """Score structural soundness appropriate to the detected format."""
    text = submission.strip()
    if not text:
        return 0.0

    if detected_format == "json":
        try:
            json.loads(text)
        except json.JSONDecodeError:
            return 0.0
        return 1.0

    if detected_format == "code":
        # Average how well each bracket type is balanced; each mismatch of
        # 8+ pairs zeroes that bracket's contribution.
        balance_total = 0.0
        for opener, closer in (("(", ")"), ("{", "}"), ("[", "]")):
            mismatch = abs(text.count(opener) - text.count(closer))
            balance_total += clamp(1.0 - mismatch / 8.0)
        return balance_total / 3.0

    if detected_format == "markdown":
        heading_count = len(re.findall(r"(?m)^#{1,6}\s+\S+", text))
        broken_links = len(re.findall(r"\[[^\]]+\]\([^)]+$", text, re.MULTILINE))
        return clamp(0.6 + min(0.35, 0.05 * heading_count) - min(0.3, 0.1 * broken_links))

    return clamp(0.55 + min(0.4, len(_tokenize_words(text)) / 300.0))


def _build_feedback(dimensions: dict[str, float], detected_format: str) -> list[str]:
feedback = [f"Detected format: {detected_format}."]
weakest = sorted(dimensions.items(), key=lambda item: item[1])[:2]
strongest = sorted(dimensions.items(), key=lambda item: item[1], reverse=True)[:1]

for name, score in weakest:
if score < 0.6:
feedback.append(f"Improve {name.replace('_', ' ')}: current score {score:.2f}.")
for name, score in strongest:
feedback.append(f"Strong {name.replace('_', ' ')} at {score:.2f}.")

if len(feedback) < 3:
feedback.append("Add more structured sections and concrete details to improve overall quality.")
return feedback


def _tokenize_words(text: str) -> list[str]:
return re.findall(r"[A-Za-z0-9_]+", text.lower())


def _quality_rating(weighted_score: float) -> str:
if weighted_score >= 0.85:
return "excellent"
if weighted_score >= 0.70:
return "good"
if weighted_score >= 0.50:
return "fair"
return "needs_improvement"
Loading