Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/mcp/delta-sync-poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class DeltaSyncPoller {
private paused = false;
private wasHealthy = true;
private lastResult: DeltaSyncResult | null = null;
private tickCount = 0;

constructor(private options: DeltaSyncPollerOptions) {
this.service = new DeltaSyncService({
Expand Down Expand Up @@ -112,6 +113,7 @@ export class DeltaSyncPoller {
clearInterval(this.interval);
this.interval = null;
}
this.tickCount = 0;
this.service.close();
this.options.logger?.info("Delta-sync poller stopped");
}
Expand Down Expand Up @@ -181,6 +183,8 @@ export class DeltaSyncPoller {

if (this.paused || this.service.isSyncing()) return;

this.tickCount++;

const result = await this.service.sync();
this.lastResult = result;

Expand Down
31 changes: 31 additions & 0 deletions src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,33 @@ if (process.argv.includes('--lite')) {

const SERVICE_NAME = process.env.SERVICE_NAME || 'supertag-mcp';

/**
* Idle auto-exit: if no tool calls arrive for IDLE_TIMEOUT_MS, the MCP server
* self-terminates. Claude Code will restart it on the next tool call.
* This prevents zombie processes from accumulating across sessions.
* Set SUPERTAG_MCP_IDLE_TIMEOUT=0 to disable.
*/
const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes
const rawTimeout = Number(process.env.SUPERTAG_MCP_IDLE_TIMEOUT ?? DEFAULT_IDLE_TIMEOUT_MS);
const IDLE_TIMEOUT_MS = Number.isNaN(rawTimeout) ? DEFAULT_IDLE_TIMEOUT_MS : rawTimeout;
let idleTimer: ReturnType<typeof setTimeout> | null = null;

function resetIdleTimer() {
if (IDLE_TIMEOUT_MS <= 0) return;
if (idleTimer) clearTimeout(idleTimer);
idleTimer = setTimeout(() => {
// Don't exit mid-sync — defer until next idle check
if (activePoller?.isSyncing()) {
resetIdleTimer();
return;
}
logger.info('Idle timeout reached, shutting down', { timeoutMs: IDLE_TIMEOUT_MS });
activePoller?.stop();
process.exit(0);
}, IDLE_TIMEOUT_MS);
idleTimer.unref();
}

/**
* MCP-safe logger - configured to write to stderr to avoid interfering with stdio JSON-RPC
* Uses unified logger with explicit stderr stream for MCP protocol compliance
Expand Down Expand Up @@ -366,6 +393,7 @@ server.setRequestHandler(ListToolsRequestSchema, async () => {

// Execute tools
server.setRequestHandler(CallToolRequestSchema, async (request) => {
resetIdleTimer();
const { name, arguments: args } = request.params;
const mode = getToolMode();
logger.info('Tool called', { tool: name, mode });
Expand Down Expand Up @@ -660,6 +688,9 @@ async function main() {
error: String(pollerError),
});
}

// Start idle timer after successful initialization
resetIdleTimer();
} catch (error) {
logger.error('Failed to start MCP server', { error: String(error) });
process.exit(1);
Expand Down
30 changes: 30 additions & 0 deletions src/services/delta-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ const PAGE_SIZE = 100;
*/
export class DeltaSyncService {
private db: Database;
private dbPath: string;
private localApiClient: DeltaSyncOptions["localApiClient"];
private embeddingConfig?: DeltaSyncOptions["embeddingConfig"];
private logger: NonNullable<DeltaSyncOptions["logger"]>;
private syncing = false;

constructor(options: DeltaSyncOptions) {
this.dbPath = options.dbPath;
this.db = new Database(options.dbPath);
this.db.run("PRAGMA busy_timeout = 5000");
this.localApiClient = options.localApiClient;
this.embeddingConfig = options.embeddingConfig;
this.logger = options.logger ?? {
Expand All @@ -43,6 +46,30 @@ export class DeltaSyncService {
};
}

/**
* Check if the database connection is healthy.
* If stale (e.g., "disk full" error from WAL corruption), reconnect.
*/
ensureHealthyConnection(): void {
try {
this.db.run("SELECT 1");
} catch (error) {
this.logger.warn("Database connection unhealthy, reconnecting", {
error: String(error),
});
try {
this.db.close();
} catch (closeError) {
this.logger.warn("Failed to close stale connection (expected)", {
error: String(closeError),
});
}
this.db = new Database(this.dbPath);
this.db.run("PRAGMA busy_timeout = 5000");
this.logger.info("Database connection re-established");
}
}

/**
* Close the database connection.
* Call when the service is no longer needed.
Expand Down Expand Up @@ -229,6 +256,9 @@ export class DeltaSyncService {
* 6. Return result
*/
async sync(): Promise<DeltaSyncResult> {
// Verify connection health before syncing (mitigates stale connection from process accumulation)
this.ensureHealthyConnection();

// T-2.3: In-memory lock check
if (this.syncing) {
this.logger.warn("Delta-sync already in progress, skipping");
Expand Down
242 changes: 242 additions & 0 deletions tests/unit/delta-sync-health-check.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/**
* DeltaSyncService.ensureHealthyConnection() Tests
*
* Tests for the connection health check and auto-reconnect logic:
* - Healthy connection: no reconnect
* - Unhealthy connection: reconnect succeeds
* - Close failure during reconnect: handled gracefully
* - Logging: warns on unhealthy, info on re-established
* - busy_timeout configured on reconnect
*/

import { describe, it, expect, beforeEach, afterEach, mock } from "bun:test";
import { Database } from "bun:sqlite";
import { unlinkSync } from "fs";
import { DeltaSyncService } from "../../src/services/delta-sync";
import type { SearchResultNode } from "../../src/types/local-api";

// =============================================================================
// Test Helpers
// =============================================================================

function createTestDbPath(): string {
const dbPath = `/tmp/delta-health-test-${Date.now()}-${Math.random().toString(36).slice(2)}.db`;
const db = new Database(dbPath);

db.run(`
CREATE TABLE IF NOT EXISTS nodes (
id TEXT PRIMARY KEY,
name TEXT,
parent_id TEXT,
node_type TEXT,
created INTEGER,
updated INTEGER,
done_at INTEGER,
raw_data TEXT
)
`);

db.run(`
CREATE TABLE IF NOT EXISTS tag_applications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tuple_node_id TEXT NOT NULL,
data_node_id TEXT NOT NULL,
tag_id TEXT NOT NULL,
tag_name TEXT NOT NULL
)
`);

db.run(`
CREATE TABLE IF NOT EXISTS sync_metadata (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_export_file TEXT NOT NULL DEFAULT '',
last_sync_timestamp INTEGER NOT NULL DEFAULT 0,
total_nodes INTEGER NOT NULL DEFAULT 0
)
`);

// Seed a full sync record
db.run(
"INSERT INTO sync_metadata (id, last_export_file, last_sync_timestamp, total_nodes) VALUES (1, 'test.json', ?, 100)",
[Date.now()]
);

db.close();
return dbPath;
}

function createMockClient() {
return {
searchNodes: async () => [] as SearchResultNode[],
health: async () => true,
};
}

function createMockLogger() {
return {
info: mock((..._args: unknown[]) => {}),
warn: mock((..._args: unknown[]) => {}),
error: mock((..._args: unknown[]) => {}),
};
}

// =============================================================================
// Tests
// =============================================================================

describe("DeltaSyncService - ensureHealthyConnection()", () => {
let service: DeltaSyncService;
let dbPath: string;
let mockLogger: ReturnType<typeof createMockLogger>;

beforeEach(() => {
dbPath = createTestDbPath();
mockLogger = createMockLogger();
});

afterEach(() => {
service?.close();
try { unlinkSync(dbPath); } catch { /* ignore */ }
});

it("should not reconnect when connection is healthy", () => {
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

service.ensureHealthyConnection();

// No warn/info about reconnection
const warnCalls = mockLogger.warn.mock.calls.filter(
(call) => typeof call[0] === "string" && call[0].includes("unhealthy")
);
expect(warnCalls.length).toBe(0);
});

it("should reconnect when SELECT 1 fails", () => {
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

// Close the internal DB to simulate stale connection
// Access internals via the service's close + re-create trick:
// We close the underlying connection by calling close(), then
// create a new service pointing at the same file
service.close();

// Re-create so ensureHealthyConnection has a closed DB to detect
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

// Manually break the connection by closing the internal db
// @ts-expect-error - accessing private field for testing
service.db.close();

service.ensureHealthyConnection();

// Should have logged the unhealthy connection
const warnCalls = mockLogger.warn.mock.calls.filter(
(call) => typeof call[0] === "string" && call[0].includes("unhealthy")
);
expect(warnCalls.length).toBe(1);

// Should have logged re-establishment
const infoCalls = mockLogger.info.mock.calls.filter(
(call) => typeof call[0] === "string" && call[0].includes("re-established")
);
expect(infoCalls.length).toBe(1);

// Connection should work now — verify by running a query
service.ensureHealthyConnection(); // should not throw or warn again
const secondWarnCalls = mockLogger.warn.mock.calls.filter(
(call) => typeof call[0] === "string" && call[0].includes("unhealthy")
);
expect(secondWarnCalls.length).toBe(1); // still just the one from before
});

it("should handle close() gracefully during reconnect even if close is a no-op", () => {
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

// Close the internal DB to simulate stale connection
// In Bun, double-close doesn't throw, so we verify the reconnect
// still works correctly regardless
// @ts-expect-error - accessing private field for testing
service.db.close();

service.ensureHealthyConnection();

// Should still have reconnected successfully
const infoCalls = mockLogger.info.mock.calls.filter(
(call) => typeof call[0] === "string" && call[0].includes("re-established")
);
expect(infoCalls.length).toBe(1);

// Connection should work after reconnect
service.ensureHealthyConnection();
const secondWarnCalls = mockLogger.warn.mock.calls.filter(
(call) => typeof call[0] === "string" && call[0].includes("unhealthy")
);
expect(secondWarnCalls.length).toBe(1); // only the first one
});

it("should configure busy_timeout on reconnected connection", () => {
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

// Break connection
// @ts-expect-error - accessing private field for testing
service.db.close();

service.ensureHealthyConnection();

// Verify busy_timeout is set on the new connection
// @ts-expect-error - accessing private field for testing
const result = service.db.query("PRAGMA busy_timeout").get() as { timeout: number };
expect(result.timeout).toBe(5000);
});

it("should configure busy_timeout in constructor", () => {
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

// @ts-expect-error - accessing private field for testing
const result = service.db.query("PRAGMA busy_timeout").get() as { timeout: number };
expect(result.timeout).toBe(5000);
});

it("should allow sync to succeed after reconnect", async () => {
service = new DeltaSyncService({
dbPath,
localApiClient: createMockClient(),
logger: mockLogger,
});

service.ensureSchema();

// Break connection
// @ts-expect-error - accessing private field for testing
service.db.close();

// Sync should recover via ensureHealthyConnection at start of sync()
const result = await service.sync();
expect(result).toHaveProperty("nodesFound");
expect(typeof result.nodesFound).toBe("number");
});
});
Loading
Loading