Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/api/NewsService.openapi.json

Large diffs are not rendered by default.

40 changes: 28 additions & 12 deletions docs/api/NewsService.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -296,22 +296,13 @@ components:
importanceScore:
type: integer
format: int32
description: |-
Composite importance score (0-100): severity × 40% + source tier × 20% + corroboration × 30% + recency × 10%.
Absent (0) when not yet scored.
description: 'Composite importance score (0-100): severity × 40% + source tier × 20% + corroboration × 30% + recency × 10%.'
corroborationCount:
type: integer
format: int32
description: Number of distinct sources that reported the same story in this digest cycle.
storyPhase:
type: string
enum:
- STORY_PHASE_UNSPECIFIED
- STORY_PHASE_BREAKING
- STORY_PHASE_DEVELOPING
- STORY_PHASE_SUSTAINED
- STORY_PHASE_FADING
description: StoryPhase represents the lifecycle stage of a tracked news story.
storyMeta:
$ref: '#/components/schemas/StoryMeta'
required:
- source
- title
Expand Down Expand Up @@ -357,3 +348,28 @@ components:
format: double
description: Longitude in decimal degrees (-180 to 180).
description: GeoCoordinates represents a geographic location using WGS84 coordinates.
StoryMeta:
type: object
properties:
firstSeen:
type: integer
format: int64
description: 'Epoch ms when the story first appeared in any digest cycle. Warning: Values > 2^53 may lose precision in JavaScript'
mentionCount:
type: integer
format: int32
description: Total number of digest cycles in which this story appeared.
sourceCount:
type: integer
format: int32
description: Number of unique sources that reported this story (cached from Redis Set).
phase:
type: string
enum:
- STORY_PHASE_UNSPECIFIED
- STORY_PHASE_BREAKING
- STORY_PHASE_DEVELOPING
- STORY_PHASE_SUSTAINED
- STORY_PHASE_FADING
description: StoryPhase represents the lifecycle stage of a tracked news story.
description: StoryMeta carries cross-cycle persistence data attached to each news item.
7 changes: 3 additions & 4 deletions proto/worldmonitor/news/v1/news_item.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ message NewsItem {
// Human-readable location name.
string location_name = 8;
// Composite importance score (0-100): severity × 40% + source tier × 20% + corroboration × 30% + recency × 10%.
// Absent (0) when not yet scored.
int32 importance_score = 9;
// Number of distinct sources that reported the same story in this digest cycle.
int32 corroboration_count = 10;
// Story lifecycle phase derived at read time from story:track metadata.
StoryPhase story_phase = 11;
// Story lifecycle metadata derived from cross-cycle persistence data.
StoryMeta story_meta = 11;
}

// StoryMeta carries cross-cycle persistence data attached to each news item.
Expand All @@ -47,7 +46,7 @@ message StoryMeta {
int32 mention_count = 2;
// Number of unique sources that reported this story (cached from Redis Set).
int32 source_count = 3;
// Story lifecycle phase.
// Story lifecycle phase derived from persistence data.
StoryPhase phase = 4;
}

Expand Down
12 changes: 12 additions & 0 deletions server/_shared/cache-keys.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
// ── Story persistence tracking keys (E3) ─────────────────────────────────────
// All prefixes below are completed with a 32-char SHA-256 hex hash of the
// normalised story title (see list-feed-digest.ts).
// Redis hash: firstSeen, lastSeen, mentionCount, sourceCount, currentScore, peakScore, title, link, severity
export const STORY_TRACK_KEY_PREFIX = 'story:track:v1:';
// Redis set: unique feed names that have mentioned this story
export const STORY_SOURCES_KEY_PREFIX = 'story:sources:v1:';
// Sorted set: single member "peak" whose score is the highest importanceScore seen
export const STORY_PEAK_KEY_PREFIX = 'story:peak:v1:';
// Sorted set: accumulator for digest-mode notifications (member score = pubDate epoch ms)
export const DIGEST_ACCUMULATOR_KEY_PREFIX = 'digest:accumulator:v1:';
// Shared TTL for every story-tracking key: 48 h × 3600 s = 172800 s.
export const STORY_TRACKING_TTL_S = 172800;

/**
* Story tracking keys — written by list-feed-digest.ts, read by digest cron (E2).
* All keys use 32-char SHA-256 hex prefix of the normalised title as ${titleHash}.
Expand Down
205 changes: 125 additions & 80 deletions server/worldmonitor/news/v1/list-feed-digest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
CategoryBucket,
NewsItem as ProtoNewsItem,
ThreatLevel as ProtoThreatLevel,
StoryMeta as ProtoStoryMeta,
StoryPhase as ProtoStoryPhase,
} from '../../../../src/generated/server/worldmonitor/news/v1/service_server';
import { cachedFetchJson, getCachedJsonBatch, runRedisPipeline } from '../../../_shared/redis';
Expand All @@ -20,6 +21,7 @@ import {
STORY_PEAK_KEY,
DIGEST_ACCUMULATOR_KEY,
STORY_TTL,
STORY_TRACK_KEY_PREFIX,
} from '../../../_shared/cache-keys';
import { getRelayBaseUrl, getRelayHeaders } from '../../../_shared/relay';

Expand Down Expand Up @@ -63,21 +65,6 @@ const SCORE_WEIGHTS = {
recency: 0.1,
} as const;

/** Derive story lifecycle phase from Redis-stored tracking data. */
function computePhase(
mentionCount: number,
firstSeenMs: number,
lastSeenMs: number,
now: number,
): ProtoStoryPhase {
const ageH = (now - firstSeenMs) / 3_600_000;
const silenceH = (now - lastSeenMs) / 3_600_000;
if (silenceH > 24) return 'STORY_PHASE_FADING';
if (mentionCount >= 3 && ageH >= 12) return 'STORY_PHASE_SUSTAINED';
if (mentionCount >= 2) return 'STORY_PHASE_DEVELOPING';
if (ageH < 2) return 'STORY_PHASE_BREAKING';
return 'STORY_PHASE_UNSPECIFIED';
}

interface ParsedItem {
source: string;
Expand All @@ -91,13 +78,7 @@ interface ParsedItem {
classSource: 'keyword' | 'llm';
importanceScore: number;
corroborationCount: number;
storyPhase: ProtoStoryPhase;
}

function normalizeTitle(title: string): string {
// 120-char window provides high headline discrimination in practice;
// see todo #102 if hash collision accuracy becomes a concern.
return title.toLowerCase().replace(/[^\w\s]/g, '').replace(/\s+/g, ' ').trim().slice(0, 120);
titleHash?: string;
}

function computeImportanceScore(
Expand Down Expand Up @@ -246,7 +227,6 @@ function parseRssXml(xml: string, feed: ServerFeed, variant: string): ParsedItem
classSource: 'keyword',
importanceScore: 0,
corroborationCount: 1,
storyPhase: 'STORY_PHASE_UNSPECIFIED',
});
}

Expand Down Expand Up @@ -316,23 +296,85 @@ async function enrichWithAiCache(items: ParsedItem[]): Promise<void> {
}
}

function toProtoItem(item: ParsedItem): ProtoNewsItem {
// ── Story persistence tracking ────────────────────────────────────────────────

function normalizeTitle(title: string): string {
  // Unicode-aware sanitisation: \p{L} (any letter) and \p{N} (any number)
  // require the `u` flag. Plain \w matches only ASCII [A-Za-z0-9_], which
  // would strip Arabic/CJK/Cyrillic headlines down to the same empty hash.
  const keptChars = title.toLowerCase().replace(/[^\p{L}\p{N}\s]/gu, '');
  const singleSpaced = keptChars.replace(/\s+/g, ' ').trim();
  // Cap at 120 chars: enough to discriminate headlines, short enough for keys.
  return singleSpaced.slice(0, 120);
}

// In-memory view of one story:track Redis hash (see readStoryTracks).
interface StoryTrack {
  // Epoch ms when the story first appeared in any digest cycle.
  firstSeen: number;
  // Epoch ms of the most recent cycle that mentioned the story.
  lastSeen: number;
  // Total number of digest cycles in which the story appeared.
  mentionCount: number;
  // Number of unique sources that reported the story (cached from a Redis set).
  sourceCount: number;
  // Importance scores; 0 placeholders until E1 ships (see derivePhase).
  currentScore: number;
  peakScore: number;
}

function derivePhase(track: StoryTrack): ProtoStoryPhase {
  const TWO_HOURS_MS = 2 * 60 * 60 * 1000;
  const storyAgeMs = Date.now() - track.firstSeen;

  // Seen in a single cycle so far → brand new.
  if (track.mentionCount <= 1) return 'STORY_PHASE_BREAKING';

  // Young story with only a handful of mentions → still developing.
  if (track.mentionCount <= 5 && storyAgeMs < TWO_HOURS_MS) {
    return 'STORY_PHASE_DEVELOPING';
  }

  // FADING needs real scores from E1. Until E1 ships, currentScore and
  // peakScore are both 0 (HSETNX placeholders), so this branch is dormant
  // and such stories deliberately fall through to SUSTAINED.
  const hasRealScores = track.currentScore > 0 && track.peakScore > 0;
  if (hasRealScores && track.currentScore < track.peakScore * 0.5) {
    return 'STORY_PHASE_FADING';
  }

  return 'STORY_PHASE_SUSTAINED';
}

/**
 * Batch-read the story:track hashes for the given title hashes in one Redis
 * pipeline (HMGET per key). Hashes with no stored `firstSeen` are treated as
 * untracked and omitted from the returned map.
 */
async function readStoryTracks(titleHashes: string[]): Promise<Map<string, StoryTrack>> {
  const tracks = new Map<string, StoryTrack>();
  if (titleHashes.length === 0) return tracks;

  const FIELDS = ['firstSeen', 'lastSeen', 'mentionCount', 'sourceCount', 'currentScore', 'peakScore'];
  const pipeline = titleHashes.map(hash => [
    'HMGET', `${STORY_TRACK_KEY_PREFIX}${hash}`, ...FIELDS,
  ]);
  const replies = await runRedisPipeline(pipeline, true);

  titleHashes.forEach((hash, idx) => {
    const row = replies[idx]?.result as string[] | null;
    // A missing firstSeen means the story has never been tracked → skip.
    if (!row || !row[0]) return;
    tracks.set(hash, {
      firstSeen: Number(row[0]),
      lastSeen: Number(row[1] ?? 0),
      mentionCount: Number(row[2] ?? 0),
      sourceCount: Number(row[3] ?? 0),
      currentScore: Number(row[4] ?? 0),
      peakScore: Number(row[5] ?? 0),
    });
  });
  return tracks;
}

/**
 * Convert a parsed RSS item (plus optional cross-cycle story metadata) into
 * the wire-format NewsItem.
 *
 * Fix: the object literal assigned `importanceScore` and `corroborationCount`
 * twice and still referenced `item.storyPhase`, a field that was removed from
 * ParsedItem when story phase moved into StoryMeta. Duplicate keys are a
 * compile error under strict TS and the stale field cannot typecheck; the
 * redundant trio is dropped, keeping the storyMeta-based shape.
 */
function toProtoItem(item: ParsedItem, storyMeta?: ProtoStoryMeta): ProtoNewsItem {
  return {
    source: item.source,
    title: item.title,
    link: item.link,
    publishedAt: item.publishedAt,
    isAlert: item.isAlert,
    importanceScore: item.importanceScore,
    corroborationCount: item.corroborationCount ?? 0,
    storyMeta,
    threat: {
      level: LEVEL_TO_PROTO[item.level],
      category: item.category,
      confidence: item.confidence,
      source: item.classSource,
    },
    // NOTE(review): location resolution appears to happen elsewhere; this
    // stays empty at digest-build time — confirm against the geo pipeline.
    locationName: '',
  };
}

Expand Down Expand Up @@ -483,38 +525,34 @@ async function buildDigest(variant: string, lang: string): Promise<ListFeedDiges
}
}

// Build corroboration map across the FULL corpus (before any per-category truncation)
// so cross-category mentions are captured. Key = normalized title.
// Flatten ALL items before any truncation so cross-category corroboration is counted.
const allItems = [...results.values()].flat();

// Compute sha256 title hashes and build corroboration map in one pass.
// Hashes are stored on each item for reuse as Redis story-tracking keys.
const corroborationMap = new Map<string, Set<string>>();
for (const items of results.values()) {
for (const item of items) {
const norm = normalizeTitle(item.title);
const sources = corroborationMap.get(norm) ?? new Set();
sources.add(item.source);
corroborationMap.set(norm, sources);
}
await Promise.all(allItems.map(async (item) => {
const hash = await sha256Hex(normalizeTitle(item.title));
item.titleHash = hash;
const sources = corroborationMap.get(hash) ?? new Set<string>();
sources.add(item.source);
corroborationMap.set(hash, sources);
}));

for (const item of allItems) {
item.corroborationCount = corroborationMap.get(item.titleHash!)?.size ?? 1;
}

// Enrich ALL items with the AI classification cache BEFORE scoring so that
// importanceScore uses the final (post-LLM) threat level, and the subsequent
// truncation discards items based on their true score. Running enrichment
// after slicing was a bug: upgraded items could have been already cut, and
// downgraded items kept a score they no longer deserved.
const allItems = [...results.values()].flat();
// importanceScore uses the final (post-LLM) threat level, and truncation
// discards items based on their true score.
await enrichWithAiCache(allItems);

// Assign corroboration count and compute importance score using final levels.
for (const items of results.values()) {
for (const item of items) {
const norm = normalizeTitle(item.title);
item.corroborationCount = corroborationMap.get(norm)?.size ?? 1;
item.importanceScore = computeImportanceScore(
item.level,
item.source,
item.corroborationCount,
item.publishedAt,
);
}
// Compute importance score using final (post-enrichment) threat levels.
for (const item of allItems) {
item.importanceScore = computeImportanceScore(
item.level, item.source, item.corroborationCount, item.publishedAt,
);
}

// Sort by importanceScore desc, then pubDate desc; then truncate per category.
Expand All @@ -527,41 +565,48 @@ async function buildDigest(variant: string, lang: string): Promise<ListFeedDiges
}

const allSliced = [...slicedByCategory.values()].flat();
// titleHash was already set on each item during the corroboration pass above.
const titleHashes = allSliced.map(i => i.titleHash!);

// Pre-compute title hashes once — reused for tracking write and phase read.
const titleHashes = await Promise.all(
allSliced.map(item => sha256Hex(normalizeTitle(item.title))),
);
const now = Date.now();

// Write tracking FIRST so phase read sees this cycle's mentionCount/firstSeen.
// Without this ordering, first-time stories never return STORY_PHASE_BREAKING
// and all stories lag by one digest cycle. Awaited here so the write completes
// before the isolate moves on (digest is cached 15 min, negligible extra latency).
// Read existing story tracking BEFORE writing so we know the previous cycle's
// mentionCount. We merge read state + this cycle's increment in memory to
// produce accurate, current StoryMeta without a second Redis round-trip.
const uniqueHashes = [...new Set(titleHashes)];
const storyTracks = await readStoryTracks(uniqueHashes).catch(() => new Map<string, StoryTrack>());

// Write story tracking. Errors never fail the digest build.
await writeStoryTracking(allSliced, variant, titleHashes).catch((err: unknown) =>
console.warn('[digest] story tracking write failed:', err),
);

// Batch-read story tracking hashes (HGETALL) to assign lifecycle phases.
// Reads post-write data so first-time stories correctly get STORY_PHASE_BREAKING.
const trackResults = await runRedisPipeline(
titleHashes.map(h => ['HGETALL', STORY_TRACK_KEY(h)]),
);
const phaseNow = Date.now();
for (let i = 0; i < allSliced.length; i++) {
const raw = trackResults[i]?.result as Record<string, string> | null | undefined;
if (raw && typeof raw === 'object' && raw.firstSeen) {
allSliced[i]!.storyPhase = computePhase(
Number(raw.mentionCount ?? '1'),
Number(raw.firstSeen),
Number(raw.lastSeen ?? raw.firstSeen),
phaseNow,
);
}
}

for (const [category, sliced] of slicedByCategory) {
categories[category] = {
items: sliced.map(toProtoItem),
items: sliced.map((item) => {
const hash = item.titleHash!;
const sourceCount = corroborationMap.get(hash)?.size ?? 1;
const stale = storyTracks.get(hash);
// Merge stale state + this cycle's HINCRBY to get the current mentionCount.
// New stories (stale = undefined) start at mentionCount=1 this cycle.
const mentionCount = stale ? stale.mentionCount + 1 : 1;
const firstSeen = stale?.firstSeen ?? now;
const merged: StoryTrack = {
firstSeen,
lastSeen: now,
mentionCount,
sourceCount,
currentScore: stale?.currentScore ?? 0,
peakScore: stale?.peakScore ?? 0,
};
const storyMeta: ProtoStoryMeta = {
firstSeen,
mentionCount,
sourceCount,
phase: derivePhase(merged),
};
return toProtoItem(item, storyMeta);
}),
};
}

Expand Down
Loading
Loading