Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions e2e/nx/src/watch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,33 @@ describe('Nx Watch', () => {

expect(results).toEqual([proj1, proj3]);
}, 50000);

it('should reconnect after daemon restart', async () => {
const getOutput = await runWatchWithReconnect(
`--projects=${proj1} -- echo \\$NX_PROJECT_NAME`
);

// Write file before daemon restart
await writeFileForWatcher(`libs/${proj1}/before-restart.txt`, 'content');
await wait(1000);

// Kill the daemon
runCLI('daemon --stop', {
env: {
NX_DAEMON: 'true',
NX_PROJECT_GRAPH_CACHE_DIRECTORY: cacheDirectory,
},
});

// Wait for reconnection to happen (exponential backoff)
await wait(3000);

// Write file after daemon restart - watch should reconnect and receive this
await writeFileForWatcher(`libs/${proj1}/after-restart.txt`, 'content');

const output = await getOutput();
expect(output).toContain(proj1);
}, 60000);
});

async function wait(timeout = 200) {
Expand Down Expand Up @@ -197,3 +224,46 @@ async function runWatch(command: string) {
});
});
}

async function runWatchWithReconnect(command: string) {
const runCommand = `npx -c 'nx watch --verbose ${command}'`;
isVerboseE2ERun() && console.log(runCommand);
return new Promise<(timeout?: number) => Promise<string[]>>((resolve) => {
const p = spawn(runCommand, {
cwd: tmpProjPath(),
env: {
CI: 'true',
...getStrippedEnvironmentVariables(),
FORCE_COLOR: 'false',
NX_DAEMON: 'true',
NX_PROJECT_GRAPH_CACHE_DIRECTORY: cacheDirectory,
},
shell: true,
stdio: 'pipe',
});

let output = '';
let resolved = false;
p.stdout?.on('data', (data) => {
output += data;
const s = data.toString().trim();
isVerboseE2ERun() && console.log(s);
// Resolve once we see the watch is ready, but don't kill the process yet
if (s.includes('watch process waiting') && !resolved) {
resolved = true;
resolve(async (timeout = 8000) => {
await wait(timeout);
p.kill();
return output
.split('\n')
.filter((line) => line.length > 0 && !line.includes('NX'));
});
}
});

p.stderr?.on('data', (data) => {
const s = data.toString().trim();
isVerboseE2ERun() && console.log('stderr:', s);
});
});
}
12 changes: 10 additions & 2 deletions packages/js/src/executors/node/node.impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,16 @@ export async function* nodeExecutor(
includeDependentProjects: true,
},
async (err, data) => {
if (err === 'closed') {
logger.error(`Watch error: Daemon closed the connection`);
if (err === 'reconnecting') {
// Silent - daemon restarts automatically on lockfile changes
return;
} else if (err === 'reconnected') {
// Silent - reconnection succeeded
return;
} else if (err === 'closed') {
logger.error(
`Failed to reconnect to daemon after multiple attempts`
);
process.exit(1);
} else if (err) {
logger.error(`Watch error: ${err?.message ?? 'Unknown'}`);
Expand Down
20 changes: 16 additions & 4 deletions packages/js/src/executors/tsc/lib/batch/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ export async function watchTaskProjectsPackageJsonFileChanges(
const unregisterFileWatcher = await daemonClient.registerFileWatcher(
{ watchProjects: projects },
(err, data) => {
if (err === 'closed') {
logger.error(`Watch error: Daemon closed the connection`);
if (err === 'reconnecting') {
// Silent - daemon restarts automatically on lockfile changes
return;
} else if (err === 'reconnected') {
// Silent - reconnection succeeded
return;
} else if (err === 'closed') {
logger.error(`Failed to reconnect to daemon after multiple attempts`);
process.exit(1);
} else if (err) {
logger.error(`Watch error: ${err?.message ?? 'Unknown'}`);
Expand Down Expand Up @@ -50,8 +56,14 @@ export async function watchTaskProjectsFileChangesForAssets(
includeGlobalWorkspaceFiles: true,
},
(err, data) => {
if (err === 'closed') {
logger.error(`Watch error: Daemon closed the connection`);
if (err === 'reconnecting') {
// Silent - daemon restarts automatically on lockfile changes
return;
} else if (err === 'reconnected') {
// Silent - reconnection succeeded
return;
} else if (err === 'closed') {
logger.error(`Failed to reconnect to daemon after multiple attempts`);
process.exit(1);
} else if (err) {
logger.error(`Watch error: ${err?.message ?? 'Unknown'}`);
Expand Down
10 changes: 8 additions & 2 deletions packages/js/src/utils/assets/copy-assets-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,14 @@ export class CopyAssetsHandler {
includeGlobalWorkspaceFiles: true,
},
(err, data) => {
if (err === 'closed') {
logger.error(`Watch error: Daemon closed the connection`);
if (err === 'reconnecting') {
// Silent - daemon restarts automatically on lockfile changes
return;
} else if (err === 'reconnected') {
// Silent - reconnection succeeded
return;
} else if (err === 'closed') {
logger.error(`Failed to reconnect to daemon after multiple attempts`);
process.exit(1);
} else if (err) {
logger.error(`Watch error: ${err?.message ?? 'Unknown'}`);
Expand Down
10 changes: 8 additions & 2 deletions packages/js/src/utils/watch-for-single-file-changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ export async function watchForSingleFileChanges(
const unregisterFileWatcher = await daemonClient.registerFileWatcher(
{ watchProjects: [projectName] },
(err, data) => {
if (err === 'closed') {
logger.error(`Watch error: Daemon closed the connection`);
if (err === 'reconnecting') {
// Silent - daemon restarts automatically on lockfile changes
return;
} else if (err === 'reconnected') {
// Silent - reconnection succeeded
return;
} else if (err === 'closed') {
logger.error(`Failed to reconnect to daemon after multiple attempts`);
process.exit(1);
} else if (err) {
logger.error(`Watch error: ${err?.message ?? 'Unknown'}`);
Expand Down
18 changes: 16 additions & 2 deletions packages/nx/src/command-line/graph/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
statSync,
writeFileSync,
} from 'node:fs';
import { VersionMismatchError } from '../../daemon/client/daemon-socket-messenger';
import * as http from 'node:http';
import { minimatch } from 'minimatch';
import { URL } from 'node:url';
Expand Down Expand Up @@ -822,8 +823,21 @@ function createFileWatcher() {
allowPartialGraph: true,
},
debounce(async (error, changes) => {
if (error === 'closed') {
output.error({ title: `Watch error: Daemon closed the connection` });
if (error === 'reconnecting') {
output.note({ title: 'Daemon restarting, reconnecting...' });
return;
} else if (error === 'reconnected') {
output.note({ title: 'Reconnected to daemon' });
return;
} else if (error === 'closed') {
output.error({
title: `Failed to reconnect to daemon after multiple attempts`,
});
process.exit(1);
} else if (error instanceof VersionMismatchError) {
output.error({
title: 'Nx version changed. Please restart your command.',
});
process.exit(1);
} else if (error) {
output.error({ title: `Watch error: ${error?.message ?? 'Unknown'}` });
Expand Down
26 changes: 20 additions & 6 deletions packages/nx/src/command-line/watch/watch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { spawn } from 'child_process';
import { ChangedFile, daemonClient } from '../../daemon/client/client';
import { VersionMismatchError } from '../../daemon/client/daemon-socket-messenger';
import { output } from '../../utils/output';

export interface WatchArguments {
Expand Down Expand Up @@ -210,13 +211,21 @@ export async function watch(args: WatchArguments) {
includeGlobalWorkspaceFiles: args.includeGlobalWorkspaceFiles,
},
async (err, data) => {
if (err === 'closed') {
if (err === 'reconnecting') {
// Silent - daemon restarts automatically on lockfile changes
return;
} else if (err === 'reconnected') {
// Silent - reconnection succeeded
return;
} else if (err === 'closed') {
output.error({
title: 'Watch connection closed',
bodyLines: [
'The daemon has closed the connection to this watch process.',
'Please restart your watch command.',
],
title: 'Failed to reconnect to daemon after multiple attempts',
bodyLines: ['Please restart your watch command.'],
});
process.exit(1);
} else if (err instanceof VersionMismatchError) {
output.error({
title: 'Nx version changed. Please restart your command.',
});
process.exit(1);
} else if (err !== null) {
Expand All @@ -238,4 +247,9 @@ export async function watch(args: WatchArguments) {
}
);
args.verbose && output.logSingleLine('watch process waiting...');

// Keep the process alive while watching for file changes
// The file watcher callbacks will handle incoming events
// The process will exit when Ctrl+C is pressed or if the connection closes
await new Promise(() => {});
}
14 changes: 11 additions & 3 deletions packages/nx/src/daemon/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { join } from 'path';
import { DAEMON_DIR_FOR_CURRENT_WORKSPACE } from './tmp-dir';
import { readJsonFile, writeJsonFileAsync } from '../utils/fileutils';
import { nxVersion } from '../utils/versions';
import { clientLogger } from './logger';
import { VersionMismatchError } from './client/daemon-socket-messenger';

export interface DaemonProcessJson {
processId: number;
Expand All @@ -18,12 +20,18 @@ export const serverProcessJsonPath = join(
export function readDaemonProcessJsonCache(): DaemonProcessJson | null {
try {
const daemonJson = readJsonFile(serverProcessJsonPath);
// If the daemon version doesn't match the client version, treat it as stale
// If the daemon version doesn't match the client version, throw error
if (daemonJson.nxVersion !== nxVersion) {
return null;
clientLogger.log(
`[Cache] Version mismatch: daemon=${daemonJson.nxVersion}, client=${nxVersion}`
);
throw new VersionMismatchError();
}
return daemonJson;
} catch {
} catch (e) {
if (e instanceof VersionMismatchError) {
throw e; // Let version mismatch bubble up
}
return null;
}
}
Expand Down
Loading
Loading