diff --git a/packages/nx/src/command-line/daemon/daemon.ts b/packages/nx/src/command-line/daemon/daemon.ts index 7abdbc6dedcfa..0495a5f757b03 100644 --- a/packages/nx/src/command-line/daemon/daemon.ts +++ b/packages/nx/src/command-line/daemon/daemon.ts @@ -2,6 +2,7 @@ import type { Arguments } from 'yargs'; import { DAEMON_OUTPUT_LOG_FILE } from '../../daemon/tmp-dir'; import { output } from '../../utils/output'; import { generateDaemonHelpOutput } from '../../daemon/client/generate-help-output'; +import { daemonClient } from '../../daemon/client/client'; export async function daemonHandler(args: Arguments) { if (args.start) { diff --git a/packages/nx/src/daemon/client/client.ts b/packages/nx/src/daemon/client/client.ts index c99c8e7f03b0b..e8da03808d40b 100644 --- a/packages/nx/src/daemon/client/client.ts +++ b/packages/nx/src/daemon/client/client.ts @@ -77,6 +77,8 @@ import { FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, type HandleFlushSyncGeneratorChangesToDiskMessage, } from '../message-types/flush-sync-generator-changes-to-disk'; +import { deserialize } from 'node:v8'; +import { isJsonMessage } from '../../utils/consume-messages-from-socket'; const DAEMON_ENV_SETTINGS = { NX_PROJECT_GLOB_CACHE: 'false', @@ -227,7 +229,10 @@ export class DaemonClient { return this.sendToDaemonViaQueue({ type: 'HASH_TASKS', runnerOptions, - env, + env: + process.env.NX_USE_V8_SERIALIZER !== 'false' + ? structuredClone(process.env) + : env, tasks, taskGraph, }); @@ -265,7 +270,9 @@ export class DaemonClient { ).listen( (message) => { try { - const parsedMessage = JSON.parse(message); + const parsedMessage = isJsonMessage(message) + ? JSON.parse(message) + : deserialize(Buffer.from(message, 'binary')); callback(null, parsedMessage); } catch (e) { callback(e, null); @@ -543,13 +550,15 @@ export class DaemonClient { private handleMessage(serializedResult: string) { try { - performance.mark('json-parse-start'); - const parsedResult = JSON.parse(serializedResult); - performance.mark('json-parse-end'); + performance.mark('result-parse-start'); + const parsedResult = isJsonMessage(serializedResult) + ? JSON.parse(serializedResult) + : deserialize(Buffer.from(serializedResult, 'binary')); + performance.mark('result-parse-end'); performance.measure( 'deserialize daemon response', - 'json-parse-start', - 'json-parse-end' + 'result-parse-start', + 'result-parse-end' ); if (parsedResult.error) { this.currentReject(parsedResult.error); @@ -557,7 +566,7 @@ export class DaemonClient { performance.measure( 'total for sendMessageToDaemon()', 'sendMessageToDaemon-start', - 'json-parse-end' + 'result-parse-end' ); return this.currentResolve(parsedResult); } diff --git a/packages/nx/src/daemon/client/daemon-socket-messenger.ts b/packages/nx/src/daemon/client/daemon-socket-messenger.ts index 34c00ff12e839..c582bcb744efc 100644 --- a/packages/nx/src/daemon/client/daemon-socket-messenger.ts +++ b/packages/nx/src/daemon/client/daemon-socket-messenger.ts @@ -1,7 +1,10 @@ -import { randomUUID } from 'crypto'; +import { serialize } from 'v8'; import { Socket } from 'net'; import { performance } from 'perf_hooks'; -import { consumeMessagesFromSocket } from '../../utils/consume-messages-from-socket'; +import { + consumeMessagesFromSocket, + MESSAGE_END_SEQ, +} from '../../utils/consume-messages-from-socket'; export interface Message extends Record { type: string; @@ -12,9 +15,14 @@ export class DaemonSocketMessenger { constructor(private socket: Socket) {} async sendMessage(messageToDaemon: Message) { - this.socket.write(JSON.stringify(messageToDaemon)); + if (process.env.NX_USE_V8_SERIALIZER !== 'false') { + const serialized = serialize(messageToDaemon); + this.socket.write(serialized.toString('binary')); + } else { + this.socket.write(JSON.stringify(messageToDaemon)); + } // send EOT to indicate that the message has been fully written - this.socket.write(String.fromCodePoint(4)); + this.socket.write(MESSAGE_END_SEQ); } listen( diff --git a/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts b/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts index 453010d913bc9..0ac3a61501220 100644 --- a/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts +++ b/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts @@ -89,14 +89,18 @@ export function notifyFileWatcherSockets( } if (changedProjects.length > 0 || changedFiles.length > 0) { - return handleResult(socket, 'FILE-WATCH-CHANGED', () => - Promise.resolve({ - description: 'File watch changed', - response: JSON.stringify({ - changedProjects, - changedFiles, + return handleResult( + socket, + 'FILE-WATCH-CHANGED', + () => + Promise.resolve({ + description: 'File watch changed', + response: JSON.stringify({ + changedProjects, + changedFiles, + }), }), - }) + 'json' ); } }) diff --git a/packages/nx/src/daemon/server/handle-context-file-data.ts b/packages/nx/src/daemon/server/handle-context-file-data.ts index 52468d72033d8..849e2aa4174bd 100644 --- a/packages/nx/src/daemon/server/handle-context-file-data.ts +++ b/packages/nx/src/daemon/server/handle-context-file-data.ts @@ -5,7 +5,7 @@ import { HandlerResult } from './server'; export async function handleContextFileData(): Promise { const files = await getAllFileDataInContext(workspaceRoot); return { - response: JSON.stringify(files), + response: files, description: 'handleContextFileData', }; } diff --git a/packages/nx/src/daemon/server/handle-flush-sync-generator-changes-to-disk.ts b/packages/nx/src/daemon/server/handle-flush-sync-generator-changes-to-disk.ts index 1a36ff4fd086d..6a71b8431330f 100644 --- a/packages/nx/src/daemon/server/handle-flush-sync-generator-changes-to-disk.ts +++ b/packages/nx/src/daemon/server/handle-flush-sync-generator-changes-to-disk.ts @@ -7,7 +7,7 @@ export async function handleFlushSyncGeneratorChangesToDisk( const result = await flushSyncGeneratorChangesToDisk(generators); return { - response: JSON.stringify(result), + response: result, description: 'handleFlushSyncGeneratorChangesToDisk', }; } diff --git a/packages/nx/src/daemon/server/handle-get-files-in-directory.ts b/packages/nx/src/daemon/server/handle-get-files-in-directory.ts index adcfe93f82c4e..8ab03e1942491 100644 --- a/packages/nx/src/daemon/server/handle-get-files-in-directory.ts +++ b/packages/nx/src/daemon/server/handle-get-files-in-directory.ts @@ -7,7 +7,7 @@ export async function handleGetFilesInDirectory( ): Promise { const files = await getFilesInDirectoryUsingContext(workspaceRoot, dir); return { - response: JSON.stringify(files), + response: files, description: 'handleNxWorkspaceFiles', }; } diff --git a/packages/nx/src/daemon/server/handle-get-registered-sync-generators.ts b/packages/nx/src/daemon/server/handle-get-registered-sync-generators.ts index f209e5e1307b2..c3789e221344a 100644 --- a/packages/nx/src/daemon/server/handle-get-registered-sync-generators.ts +++ b/packages/nx/src/daemon/server/handle-get-registered-sync-generators.ts @@ -5,7 +5,7 @@ export async function handleGetRegisteredSyncGenerators(): Promise { const files = await globWithWorkspaceContext(workspaceRoot, globs, exclude); return { - response: JSON.stringify(files), + response: files, description: 'handleGlob', }; } diff --git a/packages/nx/src/daemon/server/handle-hash-tasks.ts b/packages/nx/src/daemon/server/handle-hash-tasks.ts index 9d99fa88a97e2..a835f16fdd752 100644 --- a/packages/nx/src/daemon/server/handle-hash-tasks.ts +++ b/packages/nx/src/daemon/server/handle-hash-tasks.ts @@ -47,8 +47,10 @@ export async function handleHashTasks(payload: { payload.runnerOptions ); } - const response = JSON.stringify( - await storedHasher.hashTasks(payload.tasks, payload.taskGraph, payload.env) + const response = await storedHasher.hashTasks( + payload.tasks, + payload.taskGraph, + payload.env ); return { response, diff --git a/packages/nx/src/daemon/server/handle-nx-workspace-files.ts b/packages/nx/src/daemon/server/handle-nx-workspace-files.ts index c4a8c03b04025..96f11a4d6d3f5 100644 --- a/packages/nx/src/daemon/server/handle-nx-workspace-files.ts +++ b/packages/nx/src/daemon/server/handle-nx-workspace-files.ts @@ -10,7 +10,7 @@ export async function handleNxWorkspaceFiles( projectRootMap ); return { - response: JSON.stringify(files), + response: files, description: 'handleNxWorkspaceFiles', }; } diff --git a/packages/nx/src/daemon/server/handle-outputs-tracking.ts b/packages/nx/src/daemon/server/handle-outputs-tracking.ts index e5d59b73d9f35..63a3da9bbf035 100644 --- a/packages/nx/src/daemon/server/handle-outputs-tracking.ts +++ b/packages/nx/src/daemon/server/handle-outputs-tracking.ts @@ -31,7 +31,7 @@ export async function handleOutputsHashesMatch(payload: { payload.data.hash ); return { - response: JSON.stringify(res), + response: res, description: 'outputsHashesMatch', }; } catch (e) { diff --git a/packages/nx/src/daemon/server/handle-task-history.ts b/packages/nx/src/daemon/server/handle-task-history.ts index c3e94f8834dfb..0297f6678ec98 100644 --- a/packages/nx/src/daemon/server/handle-task-history.ts +++ b/packages/nx/src/daemon/server/handle-task-history.ts @@ -14,7 +14,7 @@ export async function handleGetFlakyTasks(hashes: string[]) { const taskHistory = getTaskHistory(); const history = await taskHistory.getFlakyTasks(hashes); return { - response: JSON.stringify(history), + response: history, description: 'handleGetFlakyTasks', }; } @@ -23,7 +23,7 @@ export async function handleGetEstimatedTaskTimings(targets: TaskTarget[]) { const taskHistory = getTaskHistory(); const history = await taskHistory.getEstimatedTaskTimings(targets); return { - response: JSON.stringify(history), + response: history, description: 'handleGetEstimatedTaskTimings', }; } diff --git a/packages/nx/src/daemon/server/server.ts b/packages/nx/src/daemon/server/server.ts index e418169cced04..7312bc6d4ab83 100644 --- a/packages/nx/src/daemon/server/server.ts +++ b/packages/nx/src/daemon/server/server.ts @@ -4,7 +4,10 @@ import { join } from 'path'; import { PerformanceObserver } from 'perf_hooks'; import { hashArray } from '../../hasher/file-hasher'; import { hashFile } from '../../native'; -import { consumeMessagesFromSocket } from '../../utils/consume-messages-from-socket'; +import { + consumeMessagesFromSocket, + isJsonMessage, +} from '../../utils/consume-messages-from-socket'; import { readJsonFile } from '../../utils/fileutils'; import { PackageJson } from '../../utils/package-json'; import { nxVersion } from '../../utils/versions'; @@ -110,6 +113,7 @@ import { isHandleFlushSyncGeneratorChangesToDiskMessage, } from '../message-types/flush-sync-generator-changes-to-disk'; import { handleFlushSyncGeneratorChangesToDisk } from './handle-flush-sync-generator-changes-to-disk'; +import { deserialize, serialize } from 'v8'; let performanceObserver: PerformanceObserver | undefined; let workspaceWatcherError: Error | undefined; @@ -120,7 +124,7 @@ global.NX_DAEMON = true; export type HandlerResult = { description: string; error?: any; - response?: string; + response?: string | object | boolean; }; let numberOfOpenConnections = 0; @@ -165,7 +169,7 @@ const server = createServer(async (socket) => { }); registerProcessTerminationListeners(); -async function handleMessage(socket, data: string) { +async function handleMessage(socket: Socket, data: string) { if (workspaceWatcherError) { await respondWithErrorAndExit( socket, @@ -187,8 +191,16 @@ async function handleMessage(socket, data: string) { const unparsedPayload = data; let payload; + let mode: 'json' | 'v8' = 'json'; try { - payload = JSON.parse(unparsedPayload); + // JSON Message + if (isJsonMessage(unparsedPayload)) { + payload = JSON.parse(unparsedPayload); + } else { + // V8 Serialized Message + payload = deserialize(Buffer.from(unparsedPayload, 'binary')); + mode = 'v8'; + } } catch (e) { await respondWithErrorAndExit( socket, @@ -196,90 +208,152 @@ async function handleMessage(socket, data: string) { new Error(`Unsupported payload sent to daemon server: ${unparsedPayload}`) ); } - if (payload.type === 'PING') { - await handleResult(socket, 'PING', () => - Promise.resolve({ response: JSON.stringify(true), description: 'ping' }) + await handleResult( + socket, + 'PING', + () => Promise.resolve({ response: true, description: 'ping' }), + mode ); } else if (payload.type === 'REQUEST_PROJECT_GRAPH') { - await handleResult(socket, 'REQUEST_PROJECT_GRAPH', () => - handleRequestProjectGraph() + await handleResult( + socket, + 'REQUEST_PROJECT_GRAPH', + () => handleRequestProjectGraph(), + mode ); } else if (payload.type === 'HASH_TASKS') { - await handleResult(socket, 'HASH_TASKS', () => handleHashTasks(payload)); + await handleResult( + socket, + 'HASH_TASKS', + () => handleHashTasks(payload), + mode + ); } else if (payload.type === 'PROCESS_IN_BACKGROUND') { - await handleResult(socket, 'PROCESS_IN_BACKGROUND', () => - handleProcessInBackground(payload) + await handleResult( + socket, + 'PROCESS_IN_BACKGROUND', + () => handleProcessInBackground(payload), + mode ); } else if (payload.type === 'RECORD_OUTPUTS_HASH') { - await handleResult(socket, 'RECORD_OUTPUTS_HASH', () => - handleRecordOutputsHash(payload) + await handleResult( + socket, + 'RECORD_OUTPUTS_HASH', + () => handleRecordOutputsHash(payload), + mode ); } else if (payload.type === 'OUTPUTS_HASHES_MATCH') { - await handleResult(socket, 'OUTPUTS_HASHES_MATCH', () => - handleOutputsHashesMatch(payload) + await handleResult( + socket, + 'OUTPUTS_HASHES_MATCH', + () => handleOutputsHashesMatch(payload), + mode ); } else if (payload.type === 'REQUEST_SHUTDOWN') { - await handleResult(socket, 'REQUEST_SHUTDOWN', () => - handleRequestShutdown(server, numberOfOpenConnections) + await handleResult( + socket, + 'REQUEST_SHUTDOWN', + () => handleRequestShutdown(server, numberOfOpenConnections), + mode ); } else if (payload.type === 'REGISTER_FILE_WATCHER') { registeredFileWatcherSockets.push({ socket, config: payload.config }); } else if (isHandleGlobMessage(payload)) { - await handleResult(socket, GLOB, () => - handleGlob(payload.globs, payload.exclude) + await handleResult( + socket, + GLOB, + () => handleGlob(payload.globs, payload.exclude), + mode ); } else if (isHandleNxWorkspaceFilesMessage(payload)) { - await handleResult(socket, GET_NX_WORKSPACE_FILES, () => - handleNxWorkspaceFiles(payload.projectRootMap) + await handleResult( + socket, + GET_NX_WORKSPACE_FILES, + () => handleNxWorkspaceFiles(payload.projectRootMap), + mode ); } else if (isHandleGetFilesInDirectoryMessage(payload)) { - await handleResult(socket, GET_FILES_IN_DIRECTORY, () => - handleGetFilesInDirectory(payload.dir) + await handleResult( + socket, + GET_FILES_IN_DIRECTORY, + () => handleGetFilesInDirectory(payload.dir), + mode ); } else if (isHandleContextFileDataMessage(payload)) { - await handleResult(socket, GET_CONTEXT_FILE_DATA, () => - handleContextFileData() + await handleResult( + socket, + GET_CONTEXT_FILE_DATA, + () => handleContextFileData(), + mode ); } else if (isHandleHashGlobMessage(payload)) { - await handleResult(socket, HASH_GLOB, () => - handleHashGlob(payload.globs, payload.exclude) + await handleResult( + socket, + HASH_GLOB, + () => handleHashGlob(payload.globs, payload.exclude), + mode ); } else if (isHandleGetFlakyTasksMessage(payload)) { - await handleResult(socket, GET_FLAKY_TASKS, () => - handleGetFlakyTasks(payload.hashes) + await handleResult( + socket, + GET_FLAKY_TASKS, + () => handleGetFlakyTasks(payload.hashes), + mode ); } else if (isHandleGetEstimatedTaskTimings(payload)) { - await handleResult(socket, GET_ESTIMATED_TASK_TIMINGS, () => - handleGetEstimatedTaskTimings(payload.targets) + await handleResult( + socket, + GET_ESTIMATED_TASK_TIMINGS, + () => handleGetEstimatedTaskTimings(payload.targets), + mode ); } else if (isHandleWriteTaskRunsToHistoryMessage(payload)) { - await handleResult(socket, RECORD_TASK_RUNS, () => - handleRecordTaskRuns(payload.taskRuns) + await handleResult( + socket, + RECORD_TASK_RUNS, + () => handleRecordTaskRuns(payload.taskRuns), + mode ); } else if (isHandleForceShutdownMessage(payload)) { - await handleResult(socket, 'FORCE_SHUTDOWN', () => - handleForceShutdown(server) + await handleResult( + socket, + 'FORCE_SHUTDOWN', + () => handleForceShutdown(server), + mode ); } else if (isHandleGetSyncGeneratorChangesMessage(payload)) { - await handleResult(socket, GET_SYNC_GENERATOR_CHANGES, () => - handleGetSyncGeneratorChanges(payload.generators) + await handleResult( + socket, + GET_SYNC_GENERATOR_CHANGES, + () => handleGetSyncGeneratorChanges(payload.generators), + mode ); } else if (isHandleFlushSyncGeneratorChangesToDiskMessage(payload)) { - await handleResult(socket, FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, () => - handleFlushSyncGeneratorChangesToDisk(payload.generators) + await handleResult( + socket, + FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, + () => handleFlushSyncGeneratorChangesToDisk(payload.generators), + mode ); } else if (isHandleGetRegisteredSyncGeneratorsMessage(payload)) { - await handleResult(socket, GET_REGISTERED_SYNC_GENERATORS, () => - handleGetRegisteredSyncGenerators() + await handleResult( + socket, + GET_REGISTERED_SYNC_GENERATORS, + () => handleGetRegisteredSyncGenerators(), + mode ); } else if (isHandleUpdateWorkspaceContextMessage(payload)) { - await handleResult(socket, UPDATE_WORKSPACE_CONTEXT, () => - handleUpdateWorkspaceContext( - payload.createdFiles, - payload.updatedFiles, - payload.deletedFiles - ) + await handleResult( + socket, + UPDATE_WORKSPACE_CONTEXT, + () => + handleUpdateWorkspaceContext( + payload.createdFiles, + payload.updatedFiles, + payload.deletedFiles + ), + mode ); } else { await respondWithErrorAndExit( @@ -293,7 +367,8 @@ async function handleMessage(socket, data: string) { export async function handleResult( socket: Socket, type: string, - hrFn: () => Promise + hrFn: () => Promise, + mode: 'json' | 'v8' ) { const startMark = new Date(); const hr = await hrFn(); @@ -301,11 +376,15 @@ export async function handleResult( if (hr.error) { await respondWithErrorAndExit(socket, hr.description, hr.error); } else { - await respondToClient(socket, hr.response, hr.description); + const response = + typeof hr.response === 'string' + ? hr.response + : serializeUnserializedResult(hr.response, mode); + await respondToClient(socket, response, hr.description); } const endMark = new Date(); serverLogger.log( - `Handled ${type}. Handling time: ${ + `Handled ${mode} message ${type}. Handling time: ${ doneHandlingMark.getTime() - startMark.getTime() }. Response time: ${endMark.getTime() - doneHandlingMark.getTime()}.` ); @@ -566,3 +645,13 @@ export async function startServer(): Promise { } }); } +function serializeUnserializedResult( + response: boolean | object, + mode: 'json' | 'v8' +) { + if (mode === 'json') { + return JSON.stringify(response); + } else { + return serialize(response).toString('binary'); + } +} diff --git a/packages/nx/src/daemon/server/shutdown-utils.ts b/packages/nx/src/daemon/server/shutdown-utils.ts index d49e74be69875..50a0dfe3b8da6 100644 --- a/packages/nx/src/daemon/server/shutdown-utils.ts +++ b/packages/nx/src/daemon/server/shutdown-utils.ts @@ -10,6 +10,7 @@ import { } from '../../project-graph/error-types'; import { removeDbConnections } from '../../utils/db-connection'; import { cleanupPlugins } from '../../project-graph/plugins/get-plugins'; +import { MESSAGE_END_SEQ } from '../../utils/consume-messages-from-socket'; export const SERVER_INACTIVITY_TIMEOUT_MS = 10800000 as const; // 10800000 ms = 3 hours @@ -98,7 +99,7 @@ export function respondToClient( if (description) { serverLogger.requestLog(`Responding to the client.`, description); } - socket.write(`${response}${String.fromCodePoint(4)}`, (err) => { + socket.write(response + MESSAGE_END_SEQ, (err) => { if (err) { console.error(err); } diff --git a/packages/nx/src/project-graph/plugins/isolation/messaging.ts b/packages/nx/src/project-graph/plugins/isolation/messaging.ts index 109439b365e5d..086ee06164fb8 100644 --- a/packages/nx/src/project-graph/plugins/isolation/messaging.ts +++ b/packages/nx/src/project-graph/plugins/isolation/messaging.ts @@ -8,9 +8,10 @@ import { CreateMetadataContext, CreateNodesContextV2, } from '../public-api'; -import { LoadedNxPlugin } from '../internal-api'; -import { Serializable } from 'child_process'; -import { Socket } from 'net'; +import type { LoadedNxPlugin } from '../internal-api'; +import type { Serializable } from 'child_process'; +import type { Socket } from 'net'; +import { MESSAGE_END_SEQ } from '../../../utils/consume-messages-from-socket'; export interface PluginWorkerLoadMessage { type: 'load'; @@ -219,5 +220,5 @@ export function sendMessageOverSocket( socket: Socket, message: PluginWorkerMessage | PluginWorkerResult ) { - socket.write(JSON.stringify(message) + String.fromCodePoint(4)); + socket.write(JSON.stringify(message) + MESSAGE_END_SEQ); } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index b6bb218bca8ea..b8fd90e1098d9 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -230,7 +230,7 @@ function createWorkerHandler( } }, createDependenciesResult: ({ tx, ...result }) => { - const { resolver, rejector } = pending.get(tx); + const { resolver, rejector } = getPendingPromise(tx, pending); if (result.success) { resolver(result.dependencies); } else if (result.success === false) { @@ -238,7 +238,7 @@ function createWorkerHandler( } }, createNodesResult: ({ tx, ...result }) => { - const { resolver, rejector } = pending.get(tx); + const { resolver, rejector } = getPendingPromise(tx, pending); if (result.success) { resolver(result.result); } else if (result.success === false) { @@ -254,7 +254,7 @@ function createWorkerHandler( } }, createMetadataResult: ({ tx, ...result }) => { - const { resolver, rejector } = pending.get(tx); + const { resolver, rejector } = getPendingPromise(tx, pending); if (result.success) { resolver(result.metadata); } else if (result.success === false) { @@ -282,6 +282,23 @@ function createWorkerExitHandler( }; } +function getPendingPromise(tx: string, pending: Map) { + const pendingPromise = pending.get(tx); + + if (!pendingPromise) { + throw new Error( + `No pending promise found for transaction "${tx}". This may indicate a bug in the plugin pool. Currently pending promises:` + + Object.keys(pending).map((t) => ` - ${t}`) + ); + } + + const { rejector, resolver } = pendingPromise; + return { + rejector, + resolver, + }; +} + function registerPendingPromise( tx: string, pending: Map, diff --git a/packages/nx/src/tasks-runner/pseudo-ipc.ts b/packages/nx/src/tasks-runner/pseudo-ipc.ts index 1a251ea24e62a..7c5d0f0b28f78 100644 --- a/packages/nx/src/tasks-runner/pseudo-ipc.ts +++ b/packages/nx/src/tasks-runner/pseudo-ipc.ts @@ -18,7 +18,10 @@ */ import { connect, Server, Socket } from 'net'; -import { consumeMessagesFromSocket } from '../utils/consume-messages-from-socket'; +import { + consumeMessagesFromSocket, + MESSAGE_END_SEQ, +} from '../utils/consume-messages-from-socket'; import { Serializable } from 'child_process'; export interface PseudoIPCMessage { @@ -98,7 +101,7 @@ export class PseudoIPCServer { JSON.stringify({ type: 'TO_CHILDREN_FROM_PARENT', message }) ); // send EOT to indicate that the message has been fully written - socket.write(String.fromCodePoint(4)); + socket.write(MESSAGE_END_SEQ); }); } @@ -107,7 +110,7 @@ export class PseudoIPCServer { socket.write( JSON.stringify({ type: 'TO_CHILDREN_FROM_PARENT', id, message }) ); - socket.write(String.fromCodePoint(4)); + socket.write(MESSAGE_END_SEQ); }); } @@ -139,7 +142,7 @@ export class PseudoIPCClient { JSON.stringify({ type: 'TO_PARENT_FROM_CHILDREN', message }) ); // send EOT to indicate that the message has been fully written - this.socket.write(String.fromCodePoint(4)); + this.socket.write(MESSAGE_END_SEQ); } notifyChildIsReady(id: string) { @@ -150,7 +153,7 @@ export class PseudoIPCClient { } as PseudoIPCMessage) ); // send EOT to indicate that the message has been fully written - this.socket.write(String.fromCodePoint(4)); + this.socket.write(MESSAGE_END_SEQ); } onMessageFromParent( diff --git a/packages/nx/src/utils/consume-messages-from-socket.spec.ts b/packages/nx/src/utils/consume-messages-from-socket.spec.ts index 9188b7fd6dc07..ff05139097182 100644 --- a/packages/nx/src/utils/consume-messages-from-socket.spec.ts +++ b/packages/nx/src/utils/consume-messages-from-socket.spec.ts @@ -1,4 +1,7 @@ -import { consumeMessagesFromSocket } from './consume-messages-from-socket'; +import { + consumeMessagesFromSocket, + MESSAGE_END_SEQ, +} from './consume-messages-from-socket'; describe('consumeMessagesFromSocket', () => { it('should handle messages where every messages is in its own chunk', () => { @@ -6,8 +9,8 @@ describe('consumeMessagesFromSocket', () => { const r = consumeMessagesFromSocket((message) => messages.push(JSON.parse(message)) ); - r(JSON.stringify({ one: 1 }) + String.fromCodePoint(4)); - r(JSON.stringify({ two: 2 }) + String.fromCodePoint(4)); + r(JSON.stringify({ one: 1 }) + MESSAGE_END_SEQ); + r(JSON.stringify({ two: 2 }) + MESSAGE_END_SEQ); expect(messages).toEqual([{ one: 1 }, { two: 2 }]); }); @@ -18,29 +21,29 @@ describe('consumeMessagesFromSocket', () => { ); const message = JSON.stringify({ one: 1 }); r(message.substring(0, 3)); - r(message.substring(3) + String.fromCodePoint(4)); + r(message.substring(3) + MESSAGE_END_SEQ); expect(messages).toEqual([{ one: 1 }]); }); - // it('should handle messages where multiple messages are in the same chunk', () => { - // const messages = [] as any[]; - // const r = consumeMessagesFromSocket((message) => - // messages.push(JSON.parse(message)) - // ); - // const message1 = JSON.stringify({ one: 1 }); - // const message2 = JSON.stringify({ two: 2 }); - // const message3 = JSON.stringify({ three: 3 }); - // - // r(message1.substring(0, 3)); - // r( - // message1.substring(3) + - // String.fromCodePoint(4) + - // message2 + - // String.fromCodePoint(4) + - // message3.substring(0, 3) - // ); - // r(message3.substring(3) + String.fromCodePoint(4)); - // - // expect(messages).toEqual([{ one: 1 }, { two: 2 }, { three: 3 }]); - // }); + it('should handle messages where multiple messages are in the same chunk', () => { + const messages = [] as any[]; + const r = consumeMessagesFromSocket((message) => + messages.push(JSON.parse(message)) + ); + const message1 = JSON.stringify({ one: 1 }); + const message2 = JSON.stringify({ two: 2 }); + const message3 = JSON.stringify({ three: 3 }); + + r(message1.substring(0, 3)); + r( + message1.substring(3) + + MESSAGE_END_SEQ + + message2 + + MESSAGE_END_SEQ + + message3.substring(0, 3) + ); + r(message3.substring(3) + MESSAGE_END_SEQ); + + expect(messages).toEqual([{ one: 1 }, { two: 2 }, { three: 3 }]); + }); }); diff --git a/packages/nx/src/utils/consume-messages-from-socket.ts b/packages/nx/src/utils/consume-messages-from-socket.ts index 1abb4ee014da7..d88be3984b9fe 100644 --- a/packages/nx/src/utils/consume-messages-from-socket.ts +++ b/packages/nx/src/utils/consume-messages-from-socket.ts @@ -1,12 +1,14 @@ +export const MESSAGE_END_SEQ = 'NX_MSG_END' + String.fromCharCode(4); + export function consumeMessagesFromSocket(callback: (message: string) => void) { let message = ''; return (data) => { const chunk = data.toString(); - if (chunk.codePointAt(chunk.length - 1) === 4) { - message += chunk.substring(0, chunk.length - 1); + if (chunk.endsWith(MESSAGE_END_SEQ)) { + message += chunk.substring(0, chunk.length - MESSAGE_END_SEQ.length); // Server may send multiple messages in one chunk, so splitting by 0x4 - const messages = message.split(''); + const messages = message.split(MESSAGE_END_SEQ); for (const splitMessage of messages) { callback(splitMessage); } @@ -17,3 +19,17 @@ export function consumeMessagesFromSocket(callback: (message: string) => void) { } }; } + +export function isJsonMessage(message: string): boolean { + return ( + // json objects + ['[', '{'].some((prefix) => message.startsWith(prefix)) || + // booleans + message === 'true' || + message === 'false' || + // strings + (message.startsWith('"') && message.endsWith('"')) || + // numbers + /^[0-9]+(.?[0-9]+)?$/.test(message) + ); +}