Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions batch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Process multiple job offers in parallel via headless workers. Each worker runs t
./batch/batch-runner.sh
```

4. **Results** are automatically merged into `data/applications.md` and verified with `verify-pipeline.mjs` at the end of the run.
4. **Results** are automatically merged into `data/applications.md`, processed offers are reconciled out of the `data/pipeline.md` inbox, and integrity is verified with `verify-pipeline.mjs` at the end of the run.

## Options

Expand Down Expand Up @@ -54,7 +54,7 @@ batch/
1. **batch-runner.sh** reads `batch-input.tsv` and `batch-state.tsv` to determine which offers need processing.
2. For each pending offer, it assigns a report number and launches a headless worker with `batch-prompt.md` as the system prompt (placeholders like `{{URL}}`, `{{REPORT_NUM}}` are resolved).
3. Each worker evaluates the offer, writes a report to `reports/`, generates a PDF to `output/`, and writes a tracker TSV to `tracker-additions/`.
4. After all workers finish, batch-runner calls `merge-tracker.mjs` to merge TSVs into `data/applications.md` and runs `verify-pipeline.mjs` to check integrity.
4. After all workers finish, batch-runner calls `merge-tracker.mjs` to merge TSVs into `data/applications.md`, `reconcile-pipeline.mjs` to move processed offers out of the `data/pipeline.md` inbox, and `verify-pipeline.mjs` to check integrity.

## Tracker Merge

Expand All @@ -67,6 +67,12 @@ Workers write one TSV per offer to `batch/tracker-additions/`. The merge script

Run `npm run merge` manually if you need to merge outside of a batch run.

## Pipeline Reconcile

Batch mode reads offers from `batch-input.tsv`, but the `data/pipeline.md` inbox is a separate list. Without reconciliation, an offer evaluated by a batch run stays in the pipeline "Pendientes" section and gets surfaced again on the next scan or `/career-ops pipeline` run -- producing duplicate reports.

`reconcile-pipeline.mjs` (run as `npm run reconcile`) closes that gap: after the tracker merge, every `completed` or `skipped` offer in `batch-state.tsv` whose URL is still in pipeline "Pendientes" is moved to "Procesadas" with its report link and score (entries without a report file on disk are left in place). It is idempotent -- safe to run after every batch, or manually.

## Resumability

`batch-state.tsv` tracks the status of every offer (`pending`, `processing`, `completed`, `failed`). If the batch is interrupted, re-running `batch-runner.sh` picks up where it left off -- completed offers are skipped automatically.
Expand Down
3 changes: 3 additions & 0 deletions batch/batch-runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ merge_tracker() {
echo "=== Merging tracker additions ==="
node "$PROJECT_DIR/merge-tracker.mjs"
echo ""
echo "=== Reconciling pipeline.md ==="
node "$PROJECT_DIR/reconcile-pipeline.mjs" || echo "⚠️ Pipeline reconcile had issues (see above)"
echo ""
echo "=== Verifying pipeline integrity ==="
node "$PROJECT_DIR/verify-pipeline.mjs" || echo "⚠️ Verification found issues (see above)"
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"normalize": "node normalize-statuses.mjs",
"dedup": "node dedup-tracker.mjs",
"merge": "node merge-tracker.mjs",
"reconcile": "node reconcile-pipeline.mjs",
"pdf": "node generate-pdf.mjs",
"sync-check": "node cv-sync-check.mjs",
"update:check": "node update-system.mjs check",
Expand Down
277 changes: 277 additions & 0 deletions reconcile-pipeline.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
#!/usr/bin/env node
/**
* reconcile-pipeline.mjs — Sync pipeline.md "Pendientes" with batch-state.tsv
*
* THE PROBLEM
* batch-runner.sh records every evaluated offer in batch/batch-state.tsv, but
* it never writes back to data/pipeline.md. Offers processed via batch mode
* therefore stay in the "Pendientes" section forever — the next scan and the
* next `/career-ops pipeline` run both re-surface them, and they get evaluated
* again (duplicate reports, duplicate tracker rows).
*
* WHAT THIS DOES
* For each `completed` / `skipped` entry in batch-state.tsv whose URL is still
* sitting in pipeline.md "Pendientes", move that line to "Procesadas" with its
* report link, score and PDF flag.
*
* Idempotent: an entry already moved (no longer in Pendientes) is a no-op, and
* an entry already present in Procesadas is dropped from Pendientes without a
* second copy. Safe to run after every batch.
*
* Run: node reconcile-pipeline.mjs [--dry-run] [--state <path>] [--pipeline <path>]
*/

import { readFileSync, writeFileSync, existsSync, readdirSync, copyFileSync } from 'fs';
import { join, dirname, resolve, relative, isAbsolute } from 'path';
import { fileURLToPath } from 'url';

const CAREER_OPS = dirname(fileURLToPath(import.meta.url));
const DRY_RUN = process.argv.includes('--dry-run');

if (process.argv.includes('-h') || process.argv.includes('--help')) {
console.log('Usage: node reconcile-pipeline.mjs [--dry-run] [--state <path>] [--pipeline <path>]');
console.log(' Moves batch-processed offers out of pipeline.md "Pendientes" into "Procesadas".');
process.exit(0);
}

function argValue(flag) {
const i = process.argv.indexOf(flag);
return i >= 0 && i + 1 < process.argv.length ? process.argv[i + 1] : null;
}

// Constrain user-supplied --state/--pipeline paths to the repository tree, so a
// crafted path cannot read from or overwrite files outside the project.
function resolveInsideRepo(inputPath, fallbackPath, flag) {
const abs = resolve(inputPath || fallbackPath);
const rel = relative(CAREER_OPS, abs);
if (rel.startsWith('..') || isAbsolute(rel)) {
console.error(`Invalid ${flag}: path must stay inside the repository (${abs})`);
process.exit(1);
}
return abs;
}

const defaultPipeline = existsSync(join(CAREER_OPS, 'data/pipeline.md'))
? join(CAREER_OPS, 'data/pipeline.md')
: join(CAREER_OPS, 'pipeline.md');
const PIPELINE_FILE = resolveInsideRepo(argValue('--pipeline'), defaultPipeline, '--pipeline');
const STATE_FILE = resolveInsideRepo(argValue('--state'), join(CAREER_OPS, 'batch/batch-state.tsv'), '--state');
const REPORTS_DIR = join(CAREER_OPS, 'reports');

// ---- guards ----
if (!existsSync(STATE_FILE)) {
console.log('No batch-state.tsv found — nothing to reconcile.');
process.exit(0);
}
if (!existsSync(PIPELINE_FILE)) {
console.log('No pipeline.md found — nothing to reconcile.');
process.exit(0);
}

// ---- parse batch-state.tsv ----
// columns: id url status started_at completed_at report_num score error retries
const DONE = new Map(); // url -> { reportNum, score }
for (const line of readFileSync(STATE_FILE, 'utf-8').split(/\r?\n/)) {
if (!line.trim() || line.startsWith('id\t')) continue;
const c = line.split('\t');
if (c.length < 7) continue;
const [, url, status, , , reportNum, score] = c;
// "completed" and "skipped" (below --min-score) both produced a report.
if (status !== 'completed' && status !== 'skipped') continue;
if (!url || !url.trim()) continue;
DONE.set(url.trim(), { reportNum: (reportNum || '').trim(), score: (score || '').trim() });
}

if (DONE.size === 0) {
console.log('No completed batch entries in batch-state.tsv — nothing to reconcile.');
process.exit(0);
}

// ---- report lookup ----
let reportFiles = [];
try { reportFiles = readdirSync(REPORTS_DIR).filter(f => f.endsWith('.md')); } catch { /* no reports dir */ }

function findReportFile(reportNum) {
if (!reportNum || reportNum === '-') return null;
const n = parseInt(reportNum, 10);
if (Number.isNaN(n)) return null;
return reportFiles.find(f => {
const m = f.match(/^(\d+)-/);
return m && parseInt(m[1], 10) === n;
}) || null;
}

function readReportField(reportFile, field) {
if (!reportFile) return null;
try {
const txt = readFileSync(join(REPORTS_DIR, reportFile), 'utf-8');
const m = txt.match(new RegExp(`^\\*\\*${field}:\\*\\*\\s*(.+)$`, 'm'));
return m ? m[1].trim() : null;
} catch { return null; }
}

// State score is authoritative when numeric; otherwise fall back to the report.
function resolveScore(stateScore, reportFile) {
if (/^\d+(?:\.\d+)?$/.test(stateScore)) return `${stateScore}/5`;
const rep = readReportField(reportFile, 'Score');
if (rep) {
const num = rep.match(/(\d+(?:\.\d+)?)/);
if (num) return `${num[1]}/5`;
if (/n\/?a/i.test(rep)) return 'N/A';
}
return 'N/A';
}

function resolvePdf(reportFile) {
const rep = readReportField(reportFile, 'PDF');
if (!rep) return '❌';
return /not generated/i.test(rep) ? '❌' : '✅';
}

// ---- parse pipeline.md ----
const lines = readFileSync(PIPELINE_FILE, 'utf-8').split(/\r?\n/);

const PENDING_RE = /^##\s+(Pendientes|Pending)\s*$/i;
const PROCESSED_RE = /^##\s+(Procesadas|Processed)\s*$/i;
const SECTION_RE = /^##\s+/;
const PENDING_ITEM_RE = /^- \[ \]\s+/;

function lineUrl(body) {
// "{url} | company | role" -> "{url}"
const i = body.indexOf(' |');
return (i >= 0 ? body.slice(0, i) : body).trim();
}

let pendStart = -1, procStart = -1;
for (let i = 0; i < lines.length; i++) {
if (pendStart < 0 && PENDING_RE.test(lines[i])) pendStart = i;
else if (procStart < 0 && PROCESSED_RE.test(lines[i])) procStart = i;
}

if (pendStart < 0) {
console.log('No "Pendientes" section in pipeline.md — nothing to reconcile.');
process.exit(0);
}

function sectionEnd(start) {
for (let i = start + 1; i < lines.length; i++) {
if (SECTION_RE.test(lines[i])) return i;
}
return lines.length;
}
const pendEnd = sectionEnd(pendStart);
const procEnd = procStart >= 0 ? sectionEnd(procStart) : -1;

// URLs already in Procesadas — guards against a double copy on re-runs.
const procUrls = new Set();
if (procStart >= 0) {
for (let i = procStart + 1; i < procEnd; i++) {
const m = lines[i].match(/^- \[x\]\s+(.+)$/i);
if (!m) continue;
// "[num](path) | url | company | role | score | PDF x" — url is field 2
const parts = m[1].split('|').map(s => s.trim());
if (parts[1]) procUrls.add(parts[1]);
}
}

// ---- walk Pendientes, decide keep vs. move ----
const removeIdx = new Set();
const movedProcLines = [];
const moved = [];
const skippedNoReport = [];

for (let i = pendStart + 1; i < pendEnd; i++) {
if (!PENDING_ITEM_RE.test(lines[i])) continue; // blank lines, "- [!]" errors → keep
const body = lines[i].replace(PENDING_ITEM_RE, '');
const url = lineUrl(body);
const done = DONE.get(url);
if (!done) continue; // not processed → keep in Pendientes

if (procUrls.has(url)) {
// Already recorded in Procesadas — just drop the stale Pendientes copy.
removeIdx.add(i);
moved.push({ url, role: '(already in Procesadas)', dup: true });
continue;
}

const reportFile = findReportFile(done.reportNum);
if (!reportFile) {
// No report on disk — leave it in Pendientes rather than write a dead link.
skippedNoReport.push({ url, reportNum: done.reportNum || '?' });
continue;
}

const parts = body.split('|').map(s => s.trim());
const company = parts[1] || '';
const role = parts[2] || '';
const score = resolveScore(done.score, reportFile);
const pdf = resolvePdf(reportFile);
const num = parseInt(done.reportNum, 10);

movedProcLines.push(`- [x] [${num}](reports/${reportFile}) | ${url} | ${company} | ${role} | ${score} | PDF ${pdf}`);
moved.push({ url, company, role, num, score });
procUrls.add(url);
removeIdx.add(i);
}

// ---- report & exit early if nothing changed ----
console.log('=== Reconcile pipeline.md ===');
for (const s of skippedNoReport) {
console.warn(`⚠️ ${s.url} — batch reports report #${s.reportNum} but no reports/${s.reportNum}-*.md found; left in Pendientes.`);
}

if (removeIdx.size === 0) {
console.log('✅ pipeline.md already in sync — nothing to reconcile.');
process.exit(0);
}

// ---- rebuild the file ----
const out = [];
let skipBlankAfterProc = false;
for (let i = 0; i < lines.length; i++) {
if (removeIdx.has(i)) continue;
if (skipBlankAfterProc) {
skipBlankAfterProc = false;
if (lines[i].trim() === '') continue; // drop the original blank after "## Procesadas"
}
out.push(lines[i]);
if (i === procStart && movedProcLines.length > 0) {
out.push('', ...movedProcLines);
skipBlankAfterProc = true;
}
}
// No Procesadas section yet — create one at the end of the file, matching the
// language the pending section header already uses.
if (procStart < 0 && movedProcLines.length > 0) {
const processedHeader = /Pending/i.test(lines[pendStart]) ? '## Processed' : '## Procesadas';
if (out.length && out[out.length - 1].trim() !== '') out.push('');
out.push(processedHeader, '', ...movedProcLines);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const newContent = out.join('\n');

const newCount = (() => {
let n = 0, inPend = false;
for (const l of out) {
if (PENDING_RE.test(l)) { inPend = true; continue; }
if (SECTION_RE.test(l)) { inPend = false; continue; }
if (inPend && PENDING_ITEM_RE.test(l)) n++;
}
return n;
})();

const realMoves = moved.filter(m => !m.dup);
console.log(`🔄 ${realMoves.length} processed entr${realMoves.length === 1 ? 'y' : 'ies'} moved Pendientes → Procesadas:`);
for (const m of realMoves) console.log(` + #${m.num} ${m.company} — ${m.role} (${m.score})`);
const dups = moved.filter(m => m.dup);
if (dups.length) console.log(`🧹 ${dups.length} stale Pendientes entr${dups.length === 1 ? 'y' : 'ies'} dropped (already in Procesadas).`);
console.log(`📋 Pendientes now: ${newCount} entr${newCount === 1 ? 'y' : 'ies'}`);

if (DRY_RUN) {
console.log('(dry-run — no changes written)');
process.exit(0);
}

copyFileSync(PIPELINE_FILE, `${PIPELINE_FILE}.pre-reconcile.bak`);
writeFileSync(PIPELINE_FILE, newContent);
console.log(`✅ pipeline.md updated (backup: ${PIPELINE_FILE}.pre-reconcile.bak)`);
1 change: 1 addition & 0 deletions test-all.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const scripts = [
{ name: 'normalize-statuses.mjs', expectExit: 0 },
{ name: 'dedup-tracker.mjs', expectExit: 0 },
{ name: 'merge-tracker.mjs', expectExit: 0 },
{ name: 'reconcile-pipeline.mjs', expectExit: 0 },
{ name: 'update-system.mjs check', expectExit: 0 },
];

Expand Down
Loading