Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapters/workspace-log-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const WORKSPACE_EVENTS = new Set<EngineEventType>([
"bundle.crashed",
"bundle.recovered",
"bundle.dead",
"bundle.start_failed",
"data.changed",
"config.changed",
"skill.created",
Expand Down
1 change: 1 addition & 0 deletions src/api/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class SseEventManager implements EventSink {
type === "bundle.crashed" ||
type === "bundle.recovered" ||
type === "bundle.dead" ||
type === "bundle.start_failed" ||
type === "data.changed" ||
type === "config.changed" ||
type === "skill.created" ||
Expand Down
8 changes: 7 additions & 1 deletion src/api/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ export function startServer(options: ServerOptions): ServerHandle {
const internalToken = runtime.getInternalToken();

const mcpSources = runtime.mcpSources();
const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink());
const startFailures = runtime.getStartFailures().map((f) => ({
name: f.serverName,
error: f.error,
}));
const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink(), {
startFailures,
});
healthMonitor.start();

// SSE event manager — listens to runtime events and broadcasts to clients
Expand Down
1 change: 1 addition & 0 deletions src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export type EngineEventType =
| "bundle.crashed"
| "bundle.recovered"
| "bundle.dead"
| "bundle.start_failed"
| "data.changed"
| "config.changed"
| "skill.created"
Expand Down
23 changes: 18 additions & 5 deletions src/runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ export class Runtime {
_systemSource: import("../tools/types.ts").ToolSource | null;
/** Platform sources (home, conversations, files, etc.) — retained for JIT workspace registration. */
private _platformSources: import("../tools/types.ts").ToolSource[] = [];
/** Bundles that threw during startup. Surfaced via /v1/health so operators can see dead bundles instead of silent absence. */
_startFailures: import("./workspace-runtime.ts").BundleStartFailure[] = [];
/** Getter for current workspace ID (set per-request). */
private _currentWorkspaceId: (() => string | null) | null = null;
private _manageConversationCtx:
Expand Down Expand Up @@ -454,13 +456,18 @@ export class Runtime {

// Phase 3: Start workspace bundles with per-workspace registries
const configDir = config.configPath ? dirname(config.configPath) : undefined;
const { registries: workspaceRegistries, entries: workspaceBundleEntries } =
await startWorkspaceBundles(workspaceStore, platformSources, systemTools, configDir, {
workDir: resolveWorkDir(config),
allowInsecureRemotes: config.allowInsecureRemotes,
});
const {
registries: workspaceRegistries,
entries: workspaceBundleEntries,
startFailures,
} = await startWorkspaceBundles(workspaceStore, platformSources, systemTools, configDir, {
workDir: resolveWorkDir(config),
allowInsecureRemotes: config.allowInsecureRemotes,
eventSink: events,
});
rt._workspaceRegistries = workspaceRegistries;
rt._platformSources = platformSources;
rt._startFailures = startFailures;

// Seed lifecycle instances for workspace bundles (user-installed only)
for (const entry of workspaceBundleEntries) {
Expand Down Expand Up @@ -870,6 +877,12 @@ export class Runtime {
return sources;
}

/** Bundles that failed to start at boot. Used by HealthMonitor to report
* dead bundles that never became McpSources. */
getStartFailures(): import("./workspace-runtime.ts").BundleStartFailure[] {
return this._startFailures;
}

/** Get all tracked bundle instances (unfiltered — use getBundleInstancesForWorkspace for scoped access). */
getBundleInstances(): BundleInstance[] {
return this.lifecycle.getInstances();
Expand Down
42 changes: 40 additions & 2 deletions src/runtime/workspace-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { join } from "node:path";
import { deriveServerName, resolveBundleDataDir } from "../bundles/paths.ts";
import { startBundleSource } from "../bundles/startup.ts";
import type { BundleRef, LocalBundleMeta } from "../bundles/types.ts";
import type { EventSink } from "../engine/types.ts";
import { ToolRegistry } from "../tools/registry.ts";
import type { ToolSource } from "../tools/types.ts";
import type { Workspace } from "../workspace/types.ts";
Expand All @@ -33,6 +34,15 @@ export interface ProcessInventoryEntry {
meta?: LocalBundleMeta | null;
}

/** A bundle whose startup threw — the process is not running, but we record it
* for operator visibility (workspace log, SSE, /v1/health). */
export interface BundleStartFailure {
wsId: string;
serverName: string;
bundleName: string;
error: string;
}

// ---------------------------------------------------------------------------
// Process inventory
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -141,8 +151,18 @@ export async function startWorkspaceBundles(
opts?: {
allowInsecureRemotes?: boolean;
workDir?: string;
/**
* Optional event sink. When provided, bundle start failures emit a
* `bundle.start_failed` event (workspace log + SSE). Callers that invoke
* this before the runtime's sink is wired (e.g. some tests) may omit it.
*/
eventSink?: EventSink;
},
): Promise<{ registries: Map<string, ToolRegistry>; entries: ProcessInventoryEntry[] }> {
): Promise<{
registries: Map<string, ToolRegistry>;
entries: ProcessInventoryEntry[];
startFailures: BundleStartFailure[];
}> {
const workDir = opts?.workDir ?? join(process.env.NB_WORK_DIR ?? "", ".nimblebrain");
const workspaces = await workspaceStore.list();
const inventory = buildProcessInventory(workspaces, workDir);
Expand All @@ -164,6 +184,7 @@ export async function startWorkspaceBundles(

const registries = new Map<string, ToolRegistry>();
const resultEntries: ProcessInventoryEntry[] = [];
const startFailures: BundleStartFailure[] = [];

for (const [wsId, wsEntries] of byWorkspace) {
const wsRegistry = createWorkspaceRegistry(platformSources, systemSource);
Expand All @@ -179,16 +200,33 @@ export async function startWorkspaceBundles(
resultEntries.push({ ...entry, serverName: result.sourceName, meta: result.meta });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
const bundleName = bundleNameFromRef(entry.bundle);
// Keep stderr logging for Docker/k8s operators tailing container logs.
process.stderr.write(
`[workspace-runtime] Failed to start ${entry.serverName} in ${wsId}: ${msg}\n`,
);
const failure: BundleStartFailure = {
wsId,
serverName: entry.serverName,
bundleName,
error: msg,
};
startFailures.push(failure);
// Surface to the workspace log and to SSE clients. Same failure data
// is handed back to the caller so HealthMonitor can report it via
// /v1/health (bundle never became an McpSource, so the monitor has
// no other way to know it exists).
opts?.eventSink?.emit({
type: "bundle.start_failed",
data: { ...failure },
});
}
}

registries.set(wsId, wsRegistry);
}

return { registries, entries: resultEntries };
return { registries, entries: resultEntries, startFailures };
}

// ---------------------------------------------------------------------------
Expand Down
33 changes: 31 additions & 2 deletions src/tools/health-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export interface BundleHealth {
state: BundleState;
uptime: number | null;
restartCount: number;
/** Populated for bundles that never came up (dead-on-arrival). */
error?: string;
}

interface BundleRecord {
Expand All @@ -16,13 +18,24 @@ interface BundleRecord {
restartCount: number;
}

/** Dead-on-arrival record for a bundle whose startup threw — the process is
* not running, so no McpSource exists to monitor. Kept here so `/v1/health`
* can still report it as `dead` (operators would otherwise see the bundle
* simply vanish from health output). */
export interface StartFailureRecord {
name: string;
error: string;
}

const MAX_RESTARTS = 5;
const DEFAULT_BASE_DELAY_MS = 1000;
const DEFAULT_CHECK_INTERVAL_MS = 30_000;

export interface HealthMonitorOptions {
checkIntervalMs?: number;
baseDelayMs?: number;
/** Bundles that threw at startup and should be reported as `dead`. */
startFailures?: StartFailureRecord[];
}

/**
Expand All @@ -31,6 +44,7 @@ export interface HealthMonitorOptions {
*/
export class HealthMonitor {
private records: BundleRecord[];
private startFailures: StartFailureRecord[];
private timer: ReturnType<typeof setInterval> | null = null;
private checkIntervalMs: number;
private baseDelayMs: number;
Expand All @@ -47,6 +61,7 @@ export class HealthMonitor {
state: "healthy" as BundleState,
restartCount: 0,
}));
this.startFailures = opts.startFailures ?? [];
}

/** Start the periodic health check loop. */
Expand All @@ -69,14 +84,28 @@ export class HealthMonitor {
await Promise.all(tasks);
}

/** Get per-bundle health info. */
/** Get per-bundle health info. Merges live records with dead-on-arrival
* start failures so operators see every bundle the system tried to run. */
getStatus(): BundleHealth[] {
return this.records.map((r) => ({
const live: BundleHealth[] = this.records.map((r) => ({
name: r.source.name,
state: r.state,
uptime: r.source.uptime(),
restartCount: r.restartCount,
}));
// A start failure is suppressed if the same server later came up. We
// compare by name so the live record (which has real uptime) wins.
const liveNames = new Set(live.map((r) => r.name));
const dead: BundleHealth[] = this.startFailures
.filter((f) => !liveNames.has(f.name))
.map((f) => ({
name: f.name,
state: "dead" as BundleState,
uptime: null,
restartCount: 0,
error: f.error,
}));
return [...live, ...dead];
}

private async checkOne(record: BundleRecord): Promise<void> {
Expand Down
46 changes: 46 additions & 0 deletions test/unit/health-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,50 @@ describe("HealthMonitor", () => {
expect(sink.events).toHaveLength(0);
expect(source.restartCalls).toBe(0);
});

it("reports startFailures as dead in getStatus with no live source", () => {
const sink = makeEventCollector();
const monitor = new HealthMonitor([], sink, {
startFailures: [{ name: "broken-bundle", error: "ENOENT: missing binary" }],
});

const status = monitor.getStatus();
expect(status).toHaveLength(1);
expect(status[0]!.name).toBe("broken-bundle");
expect(status[0]!.state).toBe("dead");
expect(status[0]!.uptime).toBeNull();
expect(status[0]!.restartCount).toBe(0);
expect(status[0]!.error).toBe("ENOENT: missing binary");
});

it("merges startFailures with live source records", () => {
const source = makeMockSource("live-bundle");
const sink = makeEventCollector();
const monitor = new HealthMonitor([source], sink, {
startFailures: [{ name: "broken-bundle", error: "boom" }],
});

const status = monitor.getStatus();
expect(status.map((s) => s.name).sort()).toEqual(["broken-bundle", "live-bundle"]);
const broken = status.find((s) => s.name === "broken-bundle");
expect(broken!.state).toBe("dead");
const live = status.find((s) => s.name === "live-bundle");
expect(live!.state).toBe("healthy");
});

it("suppresses a startFailure when a source with the same name recovers", () => {
// Manage case: if a bundle started on a retry path (name reused), the
// live status should win — don't double-report.
const source = makeMockSource("flaky");
const sink = makeEventCollector();
const monitor = new HealthMonitor([source], sink, {
startFailures: [{ name: "flaky", error: "first attempt failed" }],
});

const status = monitor.getStatus();
expect(status).toHaveLength(1);
expect(status[0]!.name).toBe("flaky");
expect(status[0]!.state).toBe("healthy");
expect(status[0]!.error).toBeUndefined();
});
});
81 changes: 80 additions & 1 deletion test/unit/runtime/workspace-runtime.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { describe, expect, it } from "bun:test";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { BundleRef } from "../../../src/bundles/types.ts";
import type { Workspace } from "../../../src/workspace/types.ts";
import type { EngineEvent, EventSink } from "../../../src/engine/types.ts";
import {
buildProcessInventory,
type ProcessInventoryEntry,
startWorkspaceBundles,
} from "../../../src/runtime/workspace-runtime.ts";
import { WorkspaceStore } from "../../../src/workspace/workspace-store.ts";
import type { Workspace } from "../../../src/workspace/types.ts";

// ---------------------------------------------------------------------------
// Fixtures
Expand Down Expand Up @@ -154,3 +159,77 @@ describe("buildProcessInventory", () => {
expect(uniqueDirs.size).toBe(dataDirs.length);
});
});

// ---------------------------------------------------------------------------
// startWorkspaceBundles — failure surfacing (issue #7)
// ---------------------------------------------------------------------------

describe("startWorkspaceBundles — bundle.start_failed surfacing", () => {
it("emits bundle.start_failed and returns failures when a local bundle path is missing", async () => {
const tmp = mkdtempSync(join(tmpdir(), "nb-start-fail-"));
try {
const store = new WorkspaceStore(tmp);
const ws = await store.create("Broken");
// Point at a path that does not exist — startBundleSource will throw
// "Local bundle not found" inside the try block.
await store.update(ws.id, {
bundles: [{ path: "/this/path/does/not/exist/__nb_test__" }],
});

const collected: EngineEvent[] = [];
const sink: EventSink = { emit: (e) => collected.push(e) };

// Swallow the "Failed to start" stderr write that the function also does
// — the important behavior is the event + return value.
const origWrite = process.stderr.write.bind(process.stderr);
process.stderr.write = (() => true) as typeof process.stderr.write;
try {
const result = await startWorkspaceBundles(store, [], null, undefined, {
workDir: tmp,
eventSink: sink,
});

// The failed bundle is not in resultEntries, but is in startFailures.
expect(result.entries.some((e) => e.wsId === ws.id)).toBe(false);
expect(result.startFailures).toHaveLength(1);
expect(result.startFailures[0]!.wsId).toBe(ws.id);
expect(result.startFailures[0]!.error).toContain("Local bundle not found");

// The registry was still created so the workspace is usable for
// platform tools — existing "startup continues on failure" behavior.
expect(result.registries.has(ws.id)).toBe(true);

// An event was emitted with the same details.
const failedEvents = collected.filter((e) => e.type === "bundle.start_failed");
expect(failedEvents).toHaveLength(1);
expect(failedEvents[0]!.data.wsId).toBe(ws.id);
expect(failedEvents[0]!.data.error).toContain("Local bundle not found");
} finally {
process.stderr.write = origWrite;
}
} finally {
rmSync(tmp, { recursive: true, force: true });
}
});

it("no failures emitted and no event when all bundles start cleanly (empty workspace)", async () => {
const tmp = mkdtempSync(join(tmpdir(), "nb-start-ok-"));
try {
const store = new WorkspaceStore(tmp);
await store.create("Empty");

const collected: EngineEvent[] = [];
const sink: EventSink = { emit: (e) => collected.push(e) };

const result = await startWorkspaceBundles(store, [], null, undefined, {
workDir: tmp,
eventSink: sink,
});

expect(result.startFailures).toEqual([]);
expect(collected.filter((e) => e.type === "bundle.start_failed")).toHaveLength(0);
} finally {
rmSync(tmp, { recursive: true, force: true });
}
});
});
Loading