diff --git a/batch/README.md b/batch/README.md index feae08a846..763bef17fa 100644 --- a/batch/README.md +++ b/batch/README.md @@ -24,7 +24,7 @@ Process multiple job offers in parallel via headless workers. Each worker runs t ./batch/batch-runner.sh ``` -4. **Results** are automatically merged into `data/applications.md` and verified with `verify-pipeline.mjs` at the end of the run. +4. **Results** are automatically merged into `data/applications.md`, processed offers are reconciled out of the `data/pipeline.md` inbox, and integrity is verified with `verify-pipeline.mjs` at the end of the run. ## Options @@ -54,7 +54,7 @@ batch/ 1. **batch-runner.sh** reads `batch-input.tsv` and `batch-state.tsv` to determine which offers need processing. 2. For each pending offer, it assigns a report number and launches a headless worker with `batch-prompt.md` as the system prompt (placeholders like `{{URL}}`, `{{REPORT_NUM}}` are resolved). 3. Each worker evaluates the offer, writes a report to `reports/`, generates a PDF to `output/`, and writes a tracker TSV to `tracker-additions/`. -4. After all workers finish, batch-runner calls `merge-tracker.mjs` to merge TSVs into `data/applications.md` and runs `verify-pipeline.mjs` to check integrity. +4. After all workers finish, batch-runner calls `merge-tracker.mjs` to merge TSVs into `data/applications.md`, `reconcile-pipeline.mjs` to move processed offers out of the `data/pipeline.md` inbox, and `verify-pipeline.mjs` to check integrity. ## Tracker Merge @@ -67,6 +67,12 @@ Workers write one TSV per offer to `batch/tracker-additions/`. The merge script Run `npm run merge` manually if you need to merge outside of a batch run. +## Pipeline Reconcile + +Batch mode reads offers from `batch-input.tsv`, but the `data/pipeline.md` inbox is a separate list. Without reconciliation, an offer evaluated by a batch run stays in the pipeline "Pendientes" section and gets surfaced again on the next scan or `/career-ops pipeline` run -- producing duplicate reports. + +`reconcile-pipeline.mjs` (run as `npm run reconcile`) closes that gap: after the tracker merge, every `completed` or `skipped` offer in `batch-state.tsv` whose URL is still in pipeline "Pendientes" is moved to "Procesadas" with its report link and score (entries without a report file on disk are left in place). It is idempotent -- safe to run after every batch, or manually. + ## Resumability `batch-state.tsv` tracks the status of every offer (`pending`, `processing`, `completed`, `failed`). If the batch is interrupted, re-running `batch-runner.sh` picks up where it left off -- completed offers are skipped automatically. diff --git a/batch/batch-runner.sh b/batch/batch-runner.sh index bc65585300..e6c88029c0 100755 --- a/batch/batch-runner.sh +++ b/batch/batch-runner.sh @@ -525,6 +525,9 @@ merge_tracker() { echo "=== Merging tracker additions ===" node "$PROJECT_DIR/merge-tracker.mjs" echo "" + echo "=== Reconciling pipeline.md ===" + node "$PROJECT_DIR/reconcile-pipeline.mjs" || echo "⚠️ Pipeline reconcile had issues (see above)" + echo "" echo "=== Verifying pipeline integrity ===" node "$PROJECT_DIR/verify-pipeline.mjs" || echo "⚠️ Verification found issues (see above)" } diff --git a/package.json b/package.json index b221ebc485..a5e1b3dc39 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "normalize": "node normalize-statuses.mjs", "dedup": "node dedup-tracker.mjs", "merge": "node merge-tracker.mjs", + "reconcile": "node reconcile-pipeline.mjs", "pdf": "node generate-pdf.mjs", "sync-check": "node cv-sync-check.mjs", "update:check": "node update-system.mjs check", diff --git a/reconcile-pipeline.mjs b/reconcile-pipeline.mjs new file mode 100644 index 0000000000..12e2e2c801 --- /dev/null +++ b/reconcile-pipeline.mjs @@ -0,0 +1,277 @@ +#!/usr/bin/env node +/** + * reconcile-pipeline.mjs — Sync pipeline.md "Pendientes" with batch-state.tsv + * + * THE PROBLEM + * batch-runner.sh records every evaluated offer in batch/batch-state.tsv, but + * it never writes back to data/pipeline.md. Offers processed via batch mode + * therefore stay in the "Pendientes" section forever — the next scan and the + * next `/career-ops pipeline` run both re-surface them, and they get evaluated + * again (duplicate reports, duplicate tracker rows). + * + * WHAT THIS DOES + * For each `completed` / `skipped` entry in batch-state.tsv whose URL is still + * sitting in pipeline.md "Pendientes", move that line to "Procesadas" with its + * report link, score and PDF flag. + * + * Idempotent: an entry already moved (no longer in Pendientes) is a no-op, and + * an entry already present in Procesadas is dropped from Pendientes without a + * second copy. Safe to run after every batch. + * + * Run: node reconcile-pipeline.mjs [--dry-run] [--state ] [--pipeline ] + */ + +import { readFileSync, writeFileSync, existsSync, readdirSync, copyFileSync } from 'fs'; +import { join, dirname, resolve, relative, isAbsolute } from 'path'; +import { fileURLToPath } from 'url'; + +const CAREER_OPS = dirname(fileURLToPath(import.meta.url)); +const DRY_RUN = process.argv.includes('--dry-run'); + +if (process.argv.includes('-h') || process.argv.includes('--help')) { + console.log('Usage: node reconcile-pipeline.mjs [--dry-run] [--state ] [--pipeline ]'); + console.log(' Moves batch-processed offers out of pipeline.md "Pendientes" into "Procesadas".'); + process.exit(0); +} + +function argValue(flag) { + const i = process.argv.indexOf(flag); + return i >= 0 && i + 1 < process.argv.length ? process.argv[i + 1] : null; +} + +// Constrain user-supplied --state/--pipeline paths to the repository tree, so a +// crafted path cannot read from or overwrite files outside the project. +function resolveInsideRepo(inputPath, fallbackPath, flag) { + const abs = resolve(inputPath || fallbackPath); + const rel = relative(CAREER_OPS, abs); + if (rel.startsWith('..') || isAbsolute(rel)) { + console.error(`Invalid ${flag}: path must stay inside the repository (${abs})`); + process.exit(1); + } + return abs; +} + +const defaultPipeline = existsSync(join(CAREER_OPS, 'data/pipeline.md')) + ? join(CAREER_OPS, 'data/pipeline.md') + : join(CAREER_OPS, 'pipeline.md'); +const PIPELINE_FILE = resolveInsideRepo(argValue('--pipeline'), defaultPipeline, '--pipeline'); +const STATE_FILE = resolveInsideRepo(argValue('--state'), join(CAREER_OPS, 'batch/batch-state.tsv'), '--state'); +const REPORTS_DIR = join(CAREER_OPS, 'reports'); + +// ---- guards ---- +if (!existsSync(STATE_FILE)) { + console.log('No batch-state.tsv found — nothing to reconcile.'); + process.exit(0); +} +if (!existsSync(PIPELINE_FILE)) { + console.log('No pipeline.md found — nothing to reconcile.'); + process.exit(0); +} + +// ---- parse batch-state.tsv ---- +// columns: id url status started_at completed_at report_num score error retries +const DONE = new Map(); // url -> { reportNum, score } +for (const line of readFileSync(STATE_FILE, 'utf-8').split(/\r?\n/)) { + if (!line.trim() || line.startsWith('id\t')) continue; + const c = line.split('\t'); + if (c.length < 7) continue; + const [, url, status, , , reportNum, score] = c; + // "completed" and "skipped" (below --min-score) both produced a report. + if (status !== 'completed' && status !== 'skipped') continue; + if (!url || !url.trim()) continue; + DONE.set(url.trim(), { reportNum: (reportNum || '').trim(), score: (score || '').trim() }); +} + +if (DONE.size === 0) { + console.log('No completed batch entries in batch-state.tsv — nothing to reconcile.'); + process.exit(0); +} + +// ---- report lookup ---- +let reportFiles = []; +try { reportFiles = readdirSync(REPORTS_DIR).filter(f => f.endsWith('.md')); } catch { /* no reports dir */ } + +function findReportFile(reportNum) { + if (!reportNum || reportNum === '-') return null; + const n = parseInt(reportNum, 10); + if (Number.isNaN(n)) return null; + return reportFiles.find(f => { + const m = f.match(/^(\d+)-/); + return m && parseInt(m[1], 10) === n; + }) || null; +} + +function readReportField(reportFile, field) { + if (!reportFile) return null; + try { + const txt = readFileSync(join(REPORTS_DIR, reportFile), 'utf-8'); + const m = txt.match(new RegExp(`^\\*\\*${field}:\\*\\*\\s*(.+)$`, 'm')); + return m ? m[1].trim() : null; + } catch { return null; } +} + +// State score is authoritative when numeric; otherwise fall back to the report. +function resolveScore(stateScore, reportFile) { + if (/^\d+(?:\.\d+)?$/.test(stateScore)) return `${stateScore}/5`; + const rep = readReportField(reportFile, 'Score'); + if (rep) { + const num = rep.match(/(\d+(?:\.\d+)?)/); + if (num) return `${num[1]}/5`; + if (/n\/?a/i.test(rep)) return 'N/A'; + } + return 'N/A'; +} + +function resolvePdf(reportFile) { + const rep = readReportField(reportFile, 'PDF'); + if (!rep) return '❌'; + return /not generated/i.test(rep) ? '❌' : '✅'; +} + +// ---- parse pipeline.md ---- +const lines = readFileSync(PIPELINE_FILE, 'utf-8').split(/\r?\n/); + +const PENDING_RE = /^##\s+(Pendientes|Pending)\s*$/i; +const PROCESSED_RE = /^##\s+(Procesadas|Processed)\s*$/i; +const SECTION_RE = /^##\s+/; +const PENDING_ITEM_RE = /^- \[ \]\s+/; + +function lineUrl(body) { + // "{url} | company | role" -> "{url}" + const i = body.indexOf(' |'); + return (i >= 0 ? body.slice(0, i) : body).trim(); +} + +let pendStart = -1, procStart = -1; +for (let i = 0; i < lines.length; i++) { + if (pendStart < 0 && PENDING_RE.test(lines[i])) pendStart = i; + else if (procStart < 0 && PROCESSED_RE.test(lines[i])) procStart = i; +} + +if (pendStart < 0) { + console.log('No "Pendientes" section in pipeline.md — nothing to reconcile.'); + process.exit(0); +} + +function sectionEnd(start) { + for (let i = start + 1; i < lines.length; i++) { + if (SECTION_RE.test(lines[i])) return i; + } + return lines.length; +} +const pendEnd = sectionEnd(pendStart); +const procEnd = procStart >= 0 ? sectionEnd(procStart) : -1; + +// URLs already in Procesadas — guards against a double copy on re-runs. +const procUrls = new Set(); +if (procStart >= 0) { + for (let i = procStart + 1; i < procEnd; i++) { + const m = lines[i].match(/^- \[x\]\s+(.+)$/i); + if (!m) continue; + // "[num](path) | url | company | role | score | PDF x" — url is field 2 + const parts = m[1].split('|').map(s => s.trim()); + if (parts[1]) procUrls.add(parts[1]); + } +} + +// ---- walk Pendientes, decide keep vs. move ---- +const removeIdx = new Set(); +const movedProcLines = []; +const moved = []; +const skippedNoReport = []; + +for (let i = pendStart + 1; i < pendEnd; i++) { + if (!PENDING_ITEM_RE.test(lines[i])) continue; // blank lines, "- [!]" errors → keep + const body = lines[i].replace(PENDING_ITEM_RE, ''); + const url = lineUrl(body); + const done = DONE.get(url); + if (!done) continue; // not processed → keep in Pendientes + + if (procUrls.has(url)) { + // Already recorded in Procesadas — just drop the stale Pendientes copy. + removeIdx.add(i); + moved.push({ url, role: '(already in Procesadas)', dup: true }); + continue; + } + + const reportFile = findReportFile(done.reportNum); + if (!reportFile) { + // No report on disk — leave it in Pendientes rather than write a dead link. + skippedNoReport.push({ url, reportNum: done.reportNum || '?' }); + continue; + } + + const parts = body.split('|').map(s => s.trim()); + const company = parts[1] || ''; + const role = parts[2] || ''; + const score = resolveScore(done.score, reportFile); + const pdf = resolvePdf(reportFile); + const num = parseInt(done.reportNum, 10); + + movedProcLines.push(`- [x] [${num}](reports/${reportFile}) | ${url} | ${company} | ${role} | ${score} | PDF ${pdf}`); + moved.push({ url, company, role, num, score }); + procUrls.add(url); + removeIdx.add(i); +} + +// ---- report & exit early if nothing changed ---- +console.log('=== Reconcile pipeline.md ==='); +for (const s of skippedNoReport) { + console.warn(`⚠️ ${s.url} — batch reports report #${s.reportNum} but no reports/${s.reportNum}-*.md found; left in Pendientes.`); +} + +if (removeIdx.size === 0) { + console.log('✅ pipeline.md already in sync — nothing to reconcile.'); + process.exit(0); +} + +// ---- rebuild the file ---- +const out = []; +let skipBlankAfterProc = false; +for (let i = 0; i < lines.length; i++) { + if (removeIdx.has(i)) continue; + if (skipBlankAfterProc) { + skipBlankAfterProc = false; + if (lines[i].trim() === '') continue; // drop the original blank after "## Procesadas" + } + out.push(lines[i]); + if (i === procStart && movedProcLines.length > 0) { + out.push('', ...movedProcLines); + skipBlankAfterProc = true; + } +} +// No Procesadas section yet — create one at the end of the file, matching the +// language the pending section header already uses. +if (procStart < 0 && movedProcLines.length > 0) { + const processedHeader = /Pending/i.test(lines[pendStart]) ? '## Processed' : '## Procesadas'; + if (out.length && out[out.length - 1].trim() !== '') out.push(''); + out.push(processedHeader, '', ...movedProcLines); +} + +const newContent = out.join('\n'); + +const newCount = (() => { + let n = 0, inPend = false; + for (const l of out) { + if (PENDING_RE.test(l)) { inPend = true; continue; } + if (SECTION_RE.test(l)) { inPend = false; continue; } + if (inPend && PENDING_ITEM_RE.test(l)) n++; + } + return n; +})(); + +const realMoves = moved.filter(m => !m.dup); +console.log(`🔄 ${realMoves.length} processed entr${realMoves.length === 1 ? 'y' : 'ies'} moved Pendientes → Procesadas:`); +for (const m of realMoves) console.log(` + #${m.num} ${m.company} — ${m.role} (${m.score})`); +const dups = moved.filter(m => m.dup); +if (dups.length) console.log(`🧹 ${dups.length} stale Pendientes entr${dups.length === 1 ? 'y' : 'ies'} dropped (already in Procesadas).`); +console.log(`📋 Pendientes now: ${newCount} entr${newCount === 1 ? 'y' : 'ies'}`); + +if (DRY_RUN) { + console.log('(dry-run — no changes written)'); + process.exit(0); +} + +copyFileSync(PIPELINE_FILE, `${PIPELINE_FILE}.pre-reconcile.bak`); +writeFileSync(PIPELINE_FILE, newContent); +console.log(`✅ pipeline.md updated (backup: ${PIPELINE_FILE}.pre-reconcile.bak)`); diff --git a/test-all.mjs b/test-all.mjs index c5152fc11f..fcc18b6f33 100644 --- a/test-all.mjs +++ b/test-all.mjs @@ -68,6 +68,7 @@ const scripts = [ { name: 'normalize-statuses.mjs', expectExit: 0 }, { name: 'dedup-tracker.mjs', expectExit: 0 }, { name: 'merge-tracker.mjs', expectExit: 0 }, + { name: 'reconcile-pipeline.mjs', expectExit: 0 }, { name: 'update-system.mjs check', expectExit: 0 }, ];