Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion profile/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,52 @@ export class ProfileMigration {
`Failed to save token data: ${saveResult.error ?? 'unknown error'}`,
);
}
this.log(`Token data saved, CID: ${saveResult.cid ?? 'debounced'}`);
this.log(`Token data save() returned (initial CID: ${saveResult.cid ?? 'debounced'})`);

// C2 fix (Audit #333): force durability before cleanup.
//
// `save()` is debounce-based — it may return `cid: 'debounced'`
// and `success: true` while the IPFS pin + OrbitDB bundle ref are
// still pending. The legacy cleanup step (5) deletes legacy KV
// keys and unpins the legacy CID. If the wallet crashes (or the
// debounced flush fails later) between these two points, we lose
// both the legacy state AND the new (un-pinned, gateway-
// reclaimable) CID. The Profile-side bundle ref is not yet on
// OrbitDB, so token-DB loss is also possible.
//
// `awaitNextFlush()` drives the flush via
// `flushScheduler.forceFlushSerialized()` and throws on TIMEOUT or
// POINTER_MONOTONICITY_VIOLATION — converting a silent crash-
// window data loss into a recoverable MIGRATION_FAILED. The 4-
// iteration cap inside `awaitNextFlush` is the only termination
// condition (we pass timeoutMs=0 → no wall-clock deadline) since
// migration durability legitimately scales with token count and
// testnet/IPFS latency.
if (typeof profileTokenStorage.awaitNextFlush === 'function') {
try {
await profileTokenStorage.awaitNextFlush(0);
this.log('Token data flush durable (Audit #333 C2)');
} catch (err) {
throw new ProfileError(
'MIGRATION_FAILED',
`Forced flush of token data failed; refusing to proceed to ` +
`cleanup. Reason: ${err instanceof Error ? err.message : String(err)}`,
err,
);
}
} else {
// A token-storage provider without `awaitNextFlush` cannot
// guarantee durability — refusing to enter cleanup is the only
// safe option. This is a hard error rather than a warning
// because the alternative is the pre-fix silent-loss path.
throw new ProfileError(
'MIGRATION_FAILED',
'ProfileTokenStorageProvider lacks awaitNextFlush() — refusing ' +
'to proceed without a durability gate (Audit #333 C2). ' +
'Upgrade the SDK or wire a real provider implementing the ' +
'TokenStorageProvider durability contract.',
);
}
}
}

Expand Down Expand Up @@ -676,6 +721,24 @@ export class ProfileMigration {
`Failed to load token data from profile: ${loadResult.error ?? 'no data returned'}`,
);
} else {
// C2 fix (Audit #333): post-flush durability assertion.
//
// After `stepPersistToOrbitDb`'s `awaitNextFlush()` returns,
// `pendingData` MUST be null on the real provider — load() then
// walks active bundles in OrbitDB and reports `source:
// 'remote'`. A `source: 'cache'` here means the sanity check is
// reading uncommitted in-memory state (the audit's exact
// complaint: "passes even if nothing was pinned"). Surface as a
// hard error so cleanup does not proceed against unverified
// backing.
if (loadResult.source === 'cache') {
errors.push(
`Sanity-check load returned source='cache' after persist's ` +
`awaitNextFlush(); the bundle ref / IPFS pin is not durable yet. ` +
`Refusing to proceed to cleanup (Audit #333 C2).`,
);
}

const loadedData = loadResult.data;

// Collect token IDs from loaded data
Expand Down
153 changes: 144 additions & 9 deletions profile/profile-token-storage/flush-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@
// block-by-block via `pinCarBlocksToIpfs`; per-block IPFS dedup is
// restored, closing the byte-cost-per-token-mutation regression
// introduced by the #212 monolithic-raw interim.
import { fetchCarFromIpfs, pinCarBlocksToIpfs } from '../ipfs-client.js';
import {
fetchCarFromIpfs,
pinCarBlocksToIpfs,
verifyCidAccessibleWithRetry,
} from '../ipfs-client.js';
import { extractCarRootCid } from '../../uxf/transfer-payload.js';
import { extractLostHeadCid } from '../orbitdb-adapter.js';
import type { OrbitDbAdapter } from '../orbitdb-adapter.js';
Expand Down Expand Up @@ -144,6 +148,29 @@ export const POINTER_MONOTONICITY_RECOVERED = 'POINTER_MONOTONICITY_RECOVERED';
*/
export const MONOTONICITY_RECOVERY_PAYLOAD_CAP = 100;

/**
* Audit #333 C3 — OpLog auto-reset probe budget.
*
* Before `addBundleWithOplogAutoReset` falls through to the destructive
* `db.drop()` reset, it probes the configured IPFS gateways for the
* unreachable head CID with exponential backoff. The matcher
* (`extractLostHeadCid`) cannot distinguish a permanently-corrupt head
* (Helia GC) from a transiently-unreachable one (gateway blip /
* propagation lag), so a 30 s probe window — matching the Issue #239
* shutdown gate convention for testnet propagation — gives a single
* miss a chance to recover before we commit to wiping all not-yet-
* bundled OUTBOX/SENT/disposition/finalization state.
*
* A genuinely-corrupt head will fail every gateway HEAD probe across
* the window; the reset then proceeds as before. The cost in the bad-
* case is +30 s before destructive teardown — a small price relative
* to the data-loss exposure on the false-positive path.
*/
export const OPLOG_RESET_PROBE_DEADLINE_MS = 30_000;

/** Per-attempt HEAD timeout inside the probe loop. */
export const OPLOG_RESET_PROBE_PER_ATTEMPT_MS = 5_000;

// ── Helpers ───────────────────────────────────────────────────────────────

/**
Expand Down Expand Up @@ -1744,25 +1771,133 @@ export class FlushScheduler {
throw err;
}

const context = 'flush-scheduler.bundle-write';

// C3 (Audit #333): probe BEFORE the destructive reset.
//
// `extractLostHeadCid` matches both permanently-corrupt heads
// (Helia GC ran on a memory-blockstore wallet) and transiently
// unreachable ones (gateway blip, peer offline, propagation
// lag). The pre-fix code went straight from "matcher matched" to
// `db.drop()`, wiping all OUTBOX/SENT/disposition/finalization
// entries not yet captured in a pinned bundle — a permanent loss
// on a transient fetch failure.
//
// Probe the configured IPFS gateways for the lost CID with
// exponential backoff. If any gateway serves it within the
// deadline, retry `addBundle` ONCE — a transient miss often
// clears within the propagation window. Only fall through to
// the reset when the probe confirms unrecoverability OR the
// gateway-retry STILL fails the same way (in which case the
// local Helia blockstore is the bottleneck, not the network).
//
// Skipped only when no gateways are configured at all — that
// setup has no recovery surface, so destructive reset is the
// only forward path (preserves pre-fix behaviour).
//
// `effectiveLostHeadCid` may be updated by the probe-retry
// branch if a follow-up addBundle attempt surfaces a fresher
// unreachable CID; the reset call then uses the freshest one.
let effectiveLostHeadCid = lostHeadCid;

if (this.host.ipfsGateways.length > 0) {
this.host.log(
`OpLog head unreachable (lostHeadCid=${lostHeadCid}); probing ` +
`${this.host.ipfsGateways.length} gateway(s) before destructive reset ` +
`(Audit #333 C3, deadline=${OPLOG_RESET_PROBE_DEADLINE_MS}ms)`,
);
let probeOk = false;
let probeAttempts = 0;
let probeElapsedMs = 0;
try {
const probe = await verifyCidAccessibleWithRetry(
this.host.ipfsGateways,
lostHeadCid,
{
deadlineMs: OPLOG_RESET_PROBE_DEADLINE_MS,
perAttemptTimeoutMs: OPLOG_RESET_PROBE_PER_ATTEMPT_MS,
},
);
probeOk = probe.ok;
probeAttempts = probe.attempts;
probeElapsedMs = probe.elapsedMs;
} catch (probeErr) {
// Probe itself threw (e.g., gateway URL validation failure
// before any HEAD ran). Treat as "probe did not confirm
// recoverability" — log and fall through to reset, since
// we cannot prove the head is recoverable.
this.host.log(
`OpLog head probe threw before completing: ` +
`${probeErr instanceof Error ? probeErr.message : String(probeErr)}; ` +
`proceeding to reset (Audit #333 C3)`,
);
}

if (probeOk) {
this.host.log(
`OpLog head IS accessible via gateway after ${probeAttempts} ` +
`attempt(s) in ${probeElapsedMs}ms; retrying addBundle once before reset ` +
`(Audit #333 C3)`,
);
try {
await this.bundleIndex.addBundle(cid, bundleRef);
// Probe-retry succeeded — the original failure was
// transient. Reset NOT performed. Caller (flush body)
// continues normally; no operational state was lost.
return;
} catch (retryErr) {
const retryLostHeadCid = extractLostHeadCid(retryErr);
if (retryLostHeadCid === null) {
// Retry hit a DIFFERENT error class (e.g., monotonicity
// violation, network down). Re-throw the new error;
// resetting the log would not help here.
throw retryErr;
}
// Same auto-reset signature again, despite the gateway
// probe succeeding. Local Helia is the bottleneck — fall
// through to the destructive reset, but use the
// most-recent lostHeadCid (which may differ if the
// baseline advanced during the probe).
this.host.log(
`Probe succeeded but addBundle still failed with ` +
`lostHeadCid=${retryLostHeadCid}; proceeding to reset ` +
`(Audit #333 C3)`,
);
effectiveLostHeadCid = retryLostHeadCid;
}
} else {
this.host.log(
`OpLog head NOT accessible via any gateway after ${probeAttempts} ` +
`attempt(s) in ${probeElapsedMs}ms; head is genuinely unrecoverable. ` +
`Proceeding to reset (Audit #333 C3)`,
);
}
} else {
this.host.log(
`OpLog head unreachable (lostHeadCid=${lostHeadCid}) and no IPFS ` +
`gateways configured; no recovery surface available. Proceeding to ` +
`reset (Audit #333 C3)`,
);
}

this.host.log(
`OpLog head unreachable (lostHeadCid=${lostHeadCid}); auto-resetting Profile DB. ` +
`OpLog head unreachable (lostHeadCid=${effectiveLostHeadCid}); auto-resetting Profile DB. ` +
`Prior OpLog history is permanently inaccessible. Token data on local IndexedDB is preserved.`,
);

const context = 'flush-scheduler.bundle-write';
// 3. Trigger event BEFORE the reset so operators see it even if
// the reset itself fails.
this.host.emitEvent({
type: 'profile:oplog-auto-resetting',
timestamp: Date.now(),
data: { lostHeadCid, context },
data: { lostHeadCid: effectiveLostHeadCid, context },
});

// 4. Run the reset.
let resetResult: { recovered: true; lostHeadCid?: string; recoveredAt: number };
try {
resetResult = await dbWithReset.resetCorruptedLog({
lostHeadCid,
lostHeadCid: effectiveLostHeadCid,
context,
});
} catch (resetErr) {
Expand All @@ -1774,7 +1909,7 @@ export class FlushScheduler {
type: 'profile:recovered',
timestamp: Date.now(),
data: {
lostHeadCid,
lostHeadCid: effectiveLostHeadCid,
recoveredAt: Date.now(),
context,
retrySucceeded: false,
Expand All @@ -1795,7 +1930,7 @@ export class FlushScheduler {
const marker: ProfileRecoveryMarker = {
version: 1,
recoveredAt: resetResult.recoveredAt,
lostHeadCid: resetResult.lostHeadCid ?? lostHeadCid,
lostHeadCid: resetResult.lostHeadCid ?? effectiveLostHeadCid,
context,
walkBackClosed: true,
note:
Expand Down Expand Up @@ -1823,7 +1958,7 @@ export class FlushScheduler {
type: 'profile:recovered',
timestamp: Date.now(),
data: {
lostHeadCid,
lostHeadCid: effectiveLostHeadCid,
recoveredAt: resetResult.recoveredAt,
context,
retrySucceeded: false,
Expand All @@ -1838,7 +1973,7 @@ export class FlushScheduler {
type: 'profile:recovered',
timestamp: Date.now(),
data: {
lostHeadCid,
lostHeadCid: effectiveLostHeadCid,
recoveredAt: resetResult.recoveredAt,
context,
retrySucceeded: true,
Expand Down
Loading