Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions scripts/lib/fusion/semantic_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class _Block:
_CODE_FENCE_RE = re.compile(r"```.*?```", re.DOTALL)


def _split_blocks(text: str) -> list[_Block]:
def _split_blocks(text: str, *, shingle_n: int = _SHINGLE_N) -> list[_Block]:
"""
Split *text* into logical blocks.

Expand All @@ -108,7 +108,7 @@ def _in_fence(start: int, end: int) -> bool:
# Add fenced code blocks as atomic blocks first.
for fs, fe in fence_spans:
block_text = text[fs:fe]
sh = _shingles(_tokenise(block_text))
sh = _shingles(_tokenise(block_text), n=shingle_n)
blocks.append(_Block(
text=block_text,
start=fs,
Expand Down Expand Up @@ -149,7 +149,7 @@ def _in_fence(start: int, end: int) -> bool:
if chunk.strip():
abs_start = seg_start + last
abs_end = seg_start + m.start()
sh = _shingles(_tokenise(chunk))
sh = _shingles(_tokenise(chunk), n=shingle_n)
blocks.append(_Block(
text=chunk,
start=abs_start,
Expand All @@ -163,7 +163,7 @@ def _in_fence(start: int, end: int) -> bool:
if chunk.strip():
abs_start = seg_start + last
abs_end = seg_start + len(segment)
sh = _shingles(_tokenise(chunk))
sh = _shingles(_tokenise(chunk), n=shingle_n)
blocks.append(_Block(
text=chunk,
start=abs_start,
Expand Down Expand Up @@ -206,36 +206,42 @@ def as_dict(self) -> dict:
}


def _run_dedup(text: str) -> tuple[str, DedupStats]:
def _run_dedup(
text: str,
*,
sim_threshold: float = _SIM_THRESHOLD,
shingle_n: int = _SHINGLE_N,
min_block_chars: int = _MIN_BLOCK_CHARS,
) -> tuple[str, DedupStats]:
"""
Run within-text block deduplication.

Args:
text: Source text.
sim_threshold: Jaccard similarity above which blocks are duplicates.
shingle_n: Word n-gram size for fingerprinting.
min_block_chars: Minimum block length to consider for dedup.

Returns the rewritten text and statistics.
"""
stats = DedupStats(tokens_before=estimate_tokens(text))

blocks = _split_blocks(text)
blocks = _split_blocks(text, shingle_n=shingle_n)
stats.blocks_total = len(blocks)

if not blocks:
stats.tokens_after = stats.tokens_before
return text, stats

# Assign 1-based sequential numbers for use in references.
# We'll use the position in the sorted block list as the "block number".
# Blocks that are too short to consider receive no shingle set.

# First pass: mark duplicates.
# kept_blocks: list of (block_number, shingles) for blocks we are keeping.
kept_blocks: list[tuple[int, frozenset]] = []

for idx, block in enumerate(blocks):
block_num = idx + 1 # 1-based
short = len(block.text.strip()) < _MIN_BLOCK_CHARS
short = len(block.text.strip()) < min_block_chars
no_shingles = len(block.shingles) < _MIN_SHINGLES

if short or no_shingles:
# Too short / no shingles — always keep, never dedup.
block.kept = True
block.ref_to = None
continue
Expand All @@ -244,7 +250,7 @@ def _run_dedup(text: str) -> tuple[str, DedupStats]:
duplicate_of: int | None = None
for prev_num, prev_sh in kept_blocks:
sim = _jaccard(block.shingles, prev_sh)
if sim >= _SIM_THRESHOLD:
if sim >= sim_threshold:
duplicate_of = prev_num
break

Expand Down Expand Up @@ -305,19 +311,40 @@ class SemanticDedup(FusionStage):

Splits text into blocks (paragraphs + fenced code blocks), fingerprints
each with 3-word shingles, and replaces near-duplicate blocks
(Jaccard >= 0.8) with compact back-references.
(Jaccard >= threshold) with compact back-references.

The similarity threshold and shingle size can be tuned via constructor
arguments to trade off between dedup aggressiveness and false positives.
Stricter thresholds (e.g. 0.9) only collapse near-identical blocks;
looser thresholds (e.g. 0.6) will catch paraphrased content at the risk
of merging blocks that differ in meaningful ways.
"""

name = "semantic_dedup"
order = 12 # After Cortex(5), after any RLE-style stages(10), before Ionizer(15)

def __init__(
    self,
    similarity_threshold: float | None = None,
    shingle_size: int | None = None,
    min_block_chars: int | None = None,
) -> None:
    """
    Configure the dedup stage.

    Args:
        similarity_threshold: Jaccard similarity above which two blocks are
            treated as duplicates. Defaults to the module-level
            ``_SIM_THRESHOLD`` when ``None``.
        shingle_size: Word n-gram size used for fingerprinting blocks.
            Defaults to ``_SHINGLE_N`` when ``None``.
        min_block_chars: Minimum block length (in characters) considered
            for dedup. Defaults to ``_MIN_BLOCK_CHARS`` when ``None``.
    """
    # NOTE: compare against None rather than using `arg or default` —
    # the `or` form would silently discard explicit falsy values such as
    # similarity_threshold=0.0 or min_block_chars=0.
    self.similarity_threshold = (
        _SIM_THRESHOLD if similarity_threshold is None else similarity_threshold
    )
    self.shingle_size = _SHINGLE_N if shingle_size is None else shingle_size
    self.min_block_chars = (
        _MIN_BLOCK_CHARS if min_block_chars is None else min_block_chars
    )

def should_apply(self, ctx: FusionContext) -> bool:
    """Run only on content long enough (> 200 chars) for dedup to pay off."""
    min_length = 200
    content = ctx.content
    return len(content) > min_length

def apply(self, ctx: FusionContext) -> FusionResult:
original_tokens = estimate_tokens(ctx.content)
output, stats = _run_dedup(ctx.content)
output, stats = _run_dedup(
ctx.content,
sim_threshold=self.similarity_threshold,
shingle_n=self.shingle_size,
min_block_chars=self.min_block_chars,
)
compressed_tokens = estimate_tokens(output)

markers: list[str] = []
Expand Down