diff --git a/.env.example b/.env.example index dea1da2..0c77fe5 100644 --- a/.env.example +++ b/.env.example @@ -32,6 +32,11 @@ STELLAR_CLI_PATH=stellar # Default: info LOG_LEVEL=info +# ─── Encrypted IPC (optional) ─────────────────────────────────────────────── +# 32-byte key to encrypt stdio MCP traffic in shared environments. +# Provide as 64 hex chars or base64. When set, the MCP client must send/receive +# encrypted frames using the same key. +PULSAR_IPC_ENCRYPTION_KEY= # Tool execution audit log path. # Default: audit.log AUDIT_LOG_PATH=audit.log diff --git a/docs/encrypted_ipc.md b/docs/encrypted_ipc.md new file mode 100644 index 0000000..d9129fb --- /dev/null +++ b/docs/encrypted_ipc.md @@ -0,0 +1,69 @@ +# Encrypted IPC Communication + +Pulsar supports AES-256-GCM encryption of its stdio MCP transport to prevent sensitive data leakage in shared or multi-tenant environments. + +## How It Works + +When `PULSAR_IPC_ENCRYPTION_KEY` is set, Pulsar replaces the default plaintext stdio transport with `EncryptedStdioServerTransport`. Every JSON-RPC message (request and response) is wrapped in a versioned encrypted envelope before being written to stdout, and every incoming line from stdin is decrypted and authenticated before processing. + +The envelope format is newline-delimited JSON: + +```json +{ + "v": 1, + "nonce": "<12-byte base64>", + "ciphertext": "", + "tag": "<16-byte base64>" +} +``` + +- **Algorithm**: AES-256-GCM +- **Nonce**: 12 random bytes, freshly generated per message +- **Tag**: 16-byte GCM authentication tag — any tampered or miskeyed frame is rejected and the connection is closed + +If `PULSAR_IPC_ENCRYPTION_KEY` is not set, Pulsar falls back to the standard plaintext `StdioServerTransport` with no behaviour change. + +## Configuration + +Add the key to your environment or `.env` file: + +```env +# 32-byte key — provide as 64 hex characters or base64 +PULSAR_IPC_ENCRYPTION_KEY= +``` + +### Generating a Key + +```bash +# hex (recommended) +node -e "console.log(require('crypto').randomBytes(32).toString('hex'))" + +# base64 +node -e "console.log(require('crypto').randomBytes(32).toString('base64'))" +``` + +## Client Requirements + +Any MCP client connecting to an encryption-enabled Pulsar instance **must** send and receive the same versioned envelope format using the same 32-byte key. Plaintext frames are rejected immediately and the transport is closed. + +## Security Properties + +| Property | Detail | +|---|---| +| Algorithm | AES-256-GCM | +| Key size | 256 bits (32 bytes) | +| Nonce | 96-bit random per message | +| Integrity | GCM authentication tag — rejects tampered/miskeyed frames | +| Key in logs | Redacted via pino `redact` paths (`PULSAR_IPC_ENCRYPTION_KEY`) | +| Key in CLI args | Never passed as a process argument | +| Key in stderr | Never emitted | + +## Error Handling + +| Condition | Behaviour | +|---|---| +| Malformed JSON frame | `onerror` callback fired; connection closed | +| Missing envelope fields | `onerror` callback fired; connection closed | +| GCM authentication failure | `onerror` callback fired; connection closed | +| Invalid nonce/tag length | `onerror` callback fired; connection closed | +| Plaintext frame received | `onerror` callback fired; connection closed | diff --git a/src/config.ts b/src/config.ts index cdfe007..71268de 100644 --- a/src/config.ts +++ b/src/config.ts @@ -13,6 +13,7 @@ const configSchema = z.object({ stellarSecretKey: z.string().startsWith('S').length(56).optional(), stellarCliPath: z.string().default('stellar'), logLevel: z.enum(['error', 'warn', 'info', 'debug']).default('info'), + ipcEncryptionKey: z.string().optional(), language: z.enum(['en', 'es']).default('en'), auditLogPath: z.string().default('audit.log'), sorobanRpcUrls: z.array(z.string().url()).optional().describe("Array of Soroban RPC endpoints for latency-based routing (preferred over sorobanRpcUrl)"), @@ -42,6 +43,7 @@ const rawConfig = { stellarSecretKey: process.env.STELLAR_SECRET_KEY || undefined, stellarCliPath: process.env.STELLAR_CLI_PATH || 'stellar', logLevel: process.env.LOG_LEVEL || 'info', + ipcEncryptionKey: process.env.PULSAR_IPC_ENCRYPTION_KEY || undefined, language: process.env.LANGUAGE || 'en', auditLogPath: process.env.AUDIT_LOG_PATH || 'audit.log', stellarCliPath: process.env.STELLAR_CLI_PATH || "stellar", diff --git a/src/index.ts b/src/index.ts index e168628..be868b0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -141,6 +141,7 @@ import { import logger from './logger.js'; import { PulsarError, PulsarNetworkError, PulsarValidationError } from './errors.js'; +import { createStdioTransport } from './transport/stdio.js'; import { applyFieldProjection } from './schemas/index.js'; import { initializeI18n } from './i18n/index.js'; import { logToolExecution } from './audit.js'; @@ -405,6 +406,8 @@ class PulsarServer { 'Fetch the ABI/interface spec of a deployed Soroban contract. Returns decoded function signatures, parameter types, and emitted event schemas.', description: 'Fetch the ABI/interface spec of a deployed Soroban contract. Returns decoded function signatures, parameter types, and emitted event schemas.', + description: + 'Fetch the ABI/interface spec of a deployed Soroban contract. Returns decoded function signatures, parameter types, and emitted event schemas.', inputSchema: { type: 'object', description: @@ -455,6 +458,7 @@ class PulsarServer { properties: { contract_id: { type: 'string', + description: 'The Soroban contract address (C...)', description: 'The Soroban contract address (C...) to observe events for.', }, event_type: { @@ -2208,6 +2212,7 @@ class PulsarServer { const parsed = SimulateTransactionsSequenceInputSchema.safeParse(args); if (!parsed.success) { throw new PulsarValidationError( + `Invalid input for get_account_balance`, `Invalid input for simulate_transaction`, `Invalid input for simulate_transactions_sequence`, parsed.error.format() @@ -2227,6 +2232,12 @@ class PulsarServer { case 'emergency_pause': { const parsed = EmergencyPauseInputSchema.safeParse(args); if (!parsed.success) { + throw new PulsarValidationError( + `Invalid input for fetch_contract_spec`, + parsed.error.format() + ); + } + const result = await fetchContractSpec(parsed.data); throw new PulsarValidationError(`Invalid input for emergency_pause`, parsed.error.format()); } const result = await emergencyPause(parsed.data); @@ -2248,6 +2259,7 @@ class PulsarServer { const parsed = SorobanMathInputSchema.safeParse(args); if (!parsed.success) { throw new PulsarValidationError( + `Invalid input for submit_transaction`, `Invalid input for compute_vesting_schedule`, parsed.error.format() ); @@ -2326,6 +2338,7 @@ class PulsarServer { const parsed = ExportDataInputSchema.safeParse(args); if (!parsed.success) { throw new PulsarValidationError( + `Invalid input for simulate_transaction`, `Invalid input for export_data`, parsed.error.format() ); @@ -2364,6 +2377,7 @@ class PulsarServer { const parsed = GenerateContractClientInputSchema.safeParse(args); if (!parsed.success) { throw new PulsarValidationError( + `Invalid input for compute_vesting_schedule`, `Invalid input for deploy_contract`, `Invalid input for generate_contract_client`, parsed.error.format() @@ -2379,6 +2393,7 @@ class PulsarServer { const parsed = ManageDaoTreasuryInputSchema.safeParse(args); if (!parsed.success) { throw new PulsarValidationError( + `Invalid input for deploy_contract`, `Invalid input for manage_dao_treasury`, parsed.error.format() ); @@ -2843,6 +2858,9 @@ class PulsarServer { } async run() { + const transport = createStdioTransport({ + encryptionKey: config.ipcEncryptionKey, + }); // Start metrics recording and endpoint if (config.metricsEnabled) { const metricsInterval = startMetricsRecording(); diff --git a/src/logger.ts b/src/logger.ts index c7288c4..ef955ec 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -7,6 +7,7 @@ import { config } from './config.js'; */ const redactPaths = [ 'STELLAR_SECRET_KEY', + 'PULSAR_IPC_ENCRYPTION_KEY', 'secret', 'privateKey', 'raw_secret', diff --git a/src/transport/stdio.ts b/src/transport/stdio.ts new file mode 100644 index 0000000..8e08151 --- /dev/null +++ b/src/transport/stdio.ts @@ -0,0 +1,240 @@ +import process from 'node:process'; +import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto'; +import { Readable, Writable } from 'node:stream'; + +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import type { + Transport, + TransportSendOptions, +} from '@modelcontextprotocol/sdk/shared/transport.js'; +import { JSONRPCMessageSchema, type JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; + +import { PulsarValidationError } from '../errors.js'; + +const AES_ALGORITHM = 'aes-256-gcm'; +const ENVELOPE_VERSION = 1; +const NONCE_BYTES = 12; +const TAG_BYTES = 16; +const HEX_KEY_REGEX = /^[0-9a-fA-F]{64}$/; + +class LineBuffer { + private buffer?: Buffer; + + append(chunk: Buffer): void { + this.buffer = this.buffer ? Buffer.concat([this.buffer, chunk]) : chunk; + } + + readLine(): string | null { + if (!this.buffer) return null; + const index = this.buffer.indexOf('\n'); + if (index === -1) return null; + const line = this.buffer.toString('utf8', 0, index).replace(/\r$/, ''); + this.buffer = this.buffer.subarray(index + 1); + return line; + } + + clear(): void { + this.buffer = undefined; + } +} + +type EncryptedEnvelope = { + v: number; + nonce: string; + ciphertext: string; + tag: string; +}; + +function isEncryptedEnvelope(value: unknown): value is EncryptedEnvelope { + if (!value || typeof value !== 'object') return false; + const record = value as Record; + return ( + record.v === ENVELOPE_VERSION && + typeof record.nonce === 'string' && + typeof record.ciphertext === 'string' && + typeof record.tag === 'string' + ); +} + +function toError(error: unknown, fallback: string): Error { + if (error instanceof Error) return error; + return new Error(fallback); +} + +function decryptEnvelope(line: string, key: Buffer): JSONRPCMessage { + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + throw new Error('Invalid encrypted stdio frame'); + } + + if (!isEncryptedEnvelope(parsed)) { + throw new Error('Invalid encrypted stdio frame'); + } + + const nonce = Buffer.from(parsed.nonce, 'base64'); + const tag = Buffer.from(parsed.tag, 'base64'); + const ciphertext = Buffer.from(parsed.ciphertext, 'base64'); + + if (nonce.length !== NONCE_BYTES || tag.length !== TAG_BYTES) { + throw new Error('Invalid encrypted stdio frame'); + } + + const decipher = createDecipheriv(AES_ALGORITHM, key, nonce); + decipher.setAuthTag(tag); + + let plaintext: Buffer; + try { + plaintext = Buffer.concat([decipher.update(ciphertext), decipher.final()]); + } catch { + throw new Error('Invalid encrypted stdio payload'); + } + + try { + return JSONRPCMessageSchema.parse(JSON.parse(plaintext.toString('utf8'))); + } catch { + throw new Error('Invalid encrypted stdio payload'); + } +} + +function encryptMessage(message: JSONRPCMessage, key: Buffer): string { + const nonce = randomBytes(NONCE_BYTES); + const cipher = createCipheriv(AES_ALGORITHM, key, nonce); + const plaintext = Buffer.from(JSON.stringify(message), 'utf8'); + const ciphertext = Buffer.concat([cipher.update(plaintext), cipher.final()]); + const tag = cipher.getAuthTag(); + + return ( + JSON.stringify({ + v: ENVELOPE_VERSION, + nonce: nonce.toString('base64'), + ciphertext: ciphertext.toString('base64'), + tag: tag.toString('base64'), + }) + '\n' + ); +} + +export function parseIpcEncryptionKey(rawKey: string): Buffer { + const key = rawKey.trim(); + if (!key) { + throw new PulsarValidationError('PULSAR_IPC_ENCRYPTION_KEY must not be empty'); + } + + if (HEX_KEY_REGEX.test(key)) { + return Buffer.from(key, 'hex'); + } + + const decoded = Buffer.from(key, 'base64'); + const normalized = decoded.toString('base64').replace(/=+$/, ''); + const normalizedInput = key.replace(/=+$/, ''); + + if (decoded.length === 32 && normalized === normalizedInput) { + return decoded; + } + + throw new PulsarValidationError( + 'PULSAR_IPC_ENCRYPTION_KEY must be 32 bytes (64 hex chars or base64).' + ); +} + +export class EncryptedStdioServerTransport implements Transport { + private readonly stdin: Readable; + private readonly stdout: Writable; + private readonly key: Buffer; + private readonly readBuffer = new LineBuffer(); + private started = false; + + constructor({ + key, + stdin = process.stdin, + stdout = process.stdout, + }: { + key: Buffer; + stdin?: Readable; + stdout?: Writable; + }) { + this.stdin = stdin; + this.stdout = stdout; + this.key = key; + } + + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage) => void; + sessionId?: string; + + private onData = (chunk: Buffer) => { + this.readBuffer.append(chunk); + this.processReadBuffer(); + }; + + private onError = (error: Error) => { + this.onerror?.(error); + }; + + async start(): Promise { + if (this.started) { + throw new Error('EncryptedStdioServerTransport already started.'); + } + this.started = true; + this.stdin.on('data', this.onData); + this.stdin.on('error', this.onError); + } + + private processReadBuffer(): void { + let line = this.readBuffer.readLine(); + while (line !== null) { + if (line.length > 0) { + try { + const message = decryptEnvelope(line, this.key); + this.onmessage?.(message); + } catch (error) { + this.onerror?.(toError(error, 'Invalid encrypted stdio frame')); + void this.close(); + return; + } + } + line = this.readBuffer.readLine(); + } + } + + async close(): Promise { + this.stdin.off('data', this.onData); + this.stdin.off('error', this.onError); + + const remainingDataListeners = this.stdin.listenerCount('data'); + if (remainingDataListeners === 0) { + this.stdin.pause(); + } + + this.readBuffer.clear(); + this.onclose?.(); + } + + send(message: JSONRPCMessage, _options?: TransportSendOptions): Promise { + return new Promise((resolve) => { + const encryptedLine = encryptMessage(message, this.key); + if (this.stdout.write(encryptedLine)) { + resolve(); + } else { + this.stdout.once('drain', resolve); + } + }); + } +} + +export type StdioTransportOptions = { + encryptionKey?: string; + stdin?: Readable; + stdout?: Writable; +}; + +export function createStdioTransport(options: StdioTransportOptions = {}): Transport { + if (!options.encryptionKey) { + return new StdioServerTransport(options.stdin, options.stdout); + } + + const key = parseIpcEncryptionKey(options.encryptionKey); + return new EncryptedStdioServerTransport({ key, stdin: options.stdin, stdout: options.stdout }); +} diff --git a/tests/unit/stdio_transport.test.ts b/tests/unit/stdio_transport.test.ts new file mode 100644 index 0000000..8b4b2b0 --- /dev/null +++ b/tests/unit/stdio_transport.test.ts @@ -0,0 +1,156 @@ +import { randomBytes } from 'node:crypto'; +import { PassThrough } from 'node:stream'; + +import { describe, expect, it } from 'vitest'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; + +import { + createStdioTransport, + EncryptedStdioServerTransport, + parseIpcEncryptionKey, +} from '../../src/transport/stdio.js'; + +describe('stdio transport encryption', () => { + it('parses hex encryption keys', () => { + const raw = randomBytes(32).toString('hex'); + const key = parseIpcEncryptionKey(raw); + + expect(key.length).toBe(32); + expect(key.toString('hex')).toBe(raw); + }); + + it('parses base64 encryption keys', () => { + const raw = randomBytes(32).toString('base64'); + const key = parseIpcEncryptionKey(raw); + + expect(key.length).toBe(32); + expect(key.toString('base64').replace(/=+$/, '')).toBe(raw.replace(/=+$/, '')); + }); + + it('rejects invalid encryption keys', () => { + expect(() => parseIpcEncryptionKey('not-a-key')).toThrow(/PULSAR_IPC_ENCRYPTION_KEY/); + }); + + it('rejects empty encryption keys', () => { + expect(() => parseIpcEncryptionKey('')).toThrow(/PULSAR_IPC_ENCRYPTION_KEY/); + }); + + it('rejects whitespace-only encryption keys', () => { + expect(() => parseIpcEncryptionKey(' ')).toThrow(/PULSAR_IPC_ENCRYPTION_KEY/); + }); + + it('creates a standard stdio transport without encryption', () => { + const transport = createStdioTransport(); + expect(transport).toBeInstanceOf(StdioServerTransport); + }); + + it('creates an encrypted stdio transport when key is provided', () => { + const raw = randomBytes(32).toString('hex'); + const transport = createStdioTransport({ encryptionKey: raw }); + + expect(transport).toBeInstanceOf(EncryptedStdioServerTransport); + }); + + it('encrypts and decrypts stdio frames', async () => { + const key = randomBytes(32); + const wire = new PassThrough(); + const receiverOut = new PassThrough(); + const receiver = new EncryptedStdioServerTransport({ key, stdin: wire, stdout: receiverOut }); + + const received: Array> = []; + receiver.onmessage = (message) => received.push(message as Record); + + await receiver.start(); + + const sender = new EncryptedStdioServerTransport({ + key, + stdin: new PassThrough(), + stdout: wire, + }); + + const message = { jsonrpc: '2.0' as const, id: 1, method: 'tools/list' }; + await sender.send(message); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(received).toEqual([message]); + }); + + it('rejects plaintext frames when encryption is enabled', async () => { + const key = randomBytes(32); + const stdin = new PassThrough(); + const stdout = new PassThrough(); + const transport = new EncryptedStdioServerTransport({ key, stdin, stdout }); + + const errors: Error[] = []; + const messages: Array> = []; + + transport.onerror = (error) => errors.push(error); + transport.onmessage = (message) => messages.push(message as Record); + + await transport.start(); + + stdin.write('{"jsonrpc":"2.0","id":1,"method":"tools/list"}\n'); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(messages).toHaveLength(0); + expect(errors).toHaveLength(1); + }); + + it('rejects frames encrypted with a different key', async () => { + const keyA = randomBytes(32); + const keyB = randomBytes(32); + const wire = new PassThrough(); + const receiverOut = new PassThrough(); + + const sender = new EncryptedStdioServerTransport({ + key: keyA, + stdin: new PassThrough(), + stdout: wire, + }); + + const receiver = new EncryptedStdioServerTransport({ + key: keyB, + stdin: wire, + stdout: receiverOut, + }); + + const errors: Error[] = []; + const messages: Array> = []; + receiver.onerror = (error) => errors.push(error); + receiver.onmessage = (message) => messages.push(message as Record); + + await receiver.start(); + await sender.send({ jsonrpc: '2.0' as const, id: 1, method: 'tools/list' }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(messages).toHaveLength(0); + expect(errors).toHaveLength(1); + expect(errors[0].message).toMatch(/Invalid encrypted stdio/); + }); + + it('throws when started twice', async () => { + const key = randomBytes(32); + const stdin = new PassThrough(); + const stdout = new PassThrough(); + const transport = new EncryptedStdioServerTransport({ key, stdin, stdout }); + + await transport.start(); + await expect(transport.start()).rejects.toThrow(/already started/); + }); + + it('removes stdin listeners on close', async () => { + const key = randomBytes(32); + const stdin = new PassThrough(); + const stdout = new PassThrough(); + const transport = new EncryptedStdioServerTransport({ key, stdin, stdout }); + + await transport.start(); + expect(stdin.listenerCount('data')).toBe(1); + + await transport.close(); + expect(stdin.listenerCount('data')).toBe(0); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index 71592ba..b649cd9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -18,6 +18,7 @@ "include": [ "src/**/*", "tests/**/*", + "vitest.config.ts", "tools/account_merge.ts", "tools/account_merge.test.ts" ],