diff --git a/lib/internal/modules/cjs/loader.js b/lib/internal/modules/cjs/loader.js index 827655bedb65bf..d1ab0bf54d0a2b 100644 --- a/lib/internal/modules/cjs/loader.js +++ b/lib/internal/modules/cjs/loader.js @@ -329,6 +329,28 @@ function reportModuleNotFoundToWatchMode(basePath, extensions) { } } +/** + * Tell the watch mode that a module was required, from within a worker thread. + * @param {string} filename Absolute path of the module + * @returns {void} + */ +function reportModuleToWatchModeFromWorker(filename) { + if (!shouldReportRequiredModules()) { + return; + } + const { isMainThread } = internalBinding('worker'); + if (isMainThread) { + return; + } + // Lazy require to avoid circular dependency: worker_threads is loaded after + // the CJS loader is fully set up. + const { parentPort } = require('worker_threads'); + if (!parentPort) { + return; + } + parentPort.postMessage({ 'watch:require': [filename] }); +} + /** * Create a new module instance. * @param {string} id @@ -1245,6 +1267,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty relResolveCacheIdentifier = `${parent.path}\x00${request}`; const filename = relativeResolveCache[relResolveCacheIdentifier]; reportModuleToWatchMode(filename); + reportModuleToWatchModeFromWorker(filename); if (filename !== undefined) { const cachedModule = Module._cache[filename]; if (cachedModule !== undefined) { @@ -1335,6 +1358,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty } reportModuleToWatchMode(filename); + reportModuleToWatchModeFromWorker(filename); Module._cache[filename] = module; module[kIsCachedByESMLoader] = false; // If there are resolve hooks, carry the context information into the diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index 8ae6761fba571a..c016796accb1be 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -534,6 +534,16 @@ class ModuleLoader { const type = requestType === kRequireInImportedCJS ? 'require' : 'import'; process.send({ [`watch:${type}`]: [url] }); } + // Relay Events from worker to main thread + if (process.env.WATCH_REPORT_DEPENDENCIES && !process.send) { + const { isMainThread } = internalBinding('worker'); + if (!isMainThread) { + const { parentPort } = require('worker_threads'); + if (parentPort) { + parentPort.postMessage({ 'watch:import': [url] }); + } + } + } // TODO(joyeecheung): update the module requests to use importAttributes as property names. const importAttributes = resolveResult.importAttributes ?? request.attributes; diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 2a4caed82cf7c5..f457f0ee30a7c3 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -1,6 +1,7 @@ 'use strict'; const { + ArrayIsArray, ArrayPrototypeForEach, ArrayPrototypeMap, ArrayPrototypePush, @@ -333,6 +334,18 @@ class Worker extends EventEmitter { this[kPublicPort].on(event, (message) => this.emit(event, message)); }); setupPortReferencing(this[kPublicPort], this, 'message'); + + // Relay events from worker thread to watcher + if (process.env.WATCH_REPORT_DEPENDENCIES && process.send) { + this[kPublicPort].on('message', (message) => { + if (ArrayIsArray(message?.['watch:require'])) { + process.send({ 'watch:require': message['watch:require'] }); + } + if (ArrayIsArray(message?.['watch:import'])) { + process.send({ 'watch:import': message['watch:import'] }); + } + }); + } this[kPort].postMessage({ argv, type: messageTypes.LOAD_SCRIPT, diff --git a/test/sequential/test-watch-mode-worker.mjs b/test/sequential/test-watch-mode-worker.mjs new file mode 100644 index 00000000000000..a198f00c7aa961 --- /dev/null +++ b/test/sequential/test-watch-mode-worker.mjs @@ -0,0 +1,281 @@ +import * as common from '../common/index.mjs'; +import tmpdir from '../common/tmpdir.js'; +import assert from 'node:assert'; +import path from 'node:path'; +import { execPath } from 'node:process'; +import { describe, it } from 'node:test'; +import { spawn } from 'node:child_process'; +import { writeFileSync, readFileSync } from 'node:fs'; +import { inspect } from 'node:util'; +import { pathToFileURL } from 'node:url'; +import { createInterface } from 'node:readline'; + +if (common.isIBMi) + common.skip('IBMi does not support `fs.watch()`'); + +function restart(file, content = readFileSync(file)) { + writeFileSync(file, content); + const timer = setInterval(() => writeFileSync(file, content), common.platformTimeout(2500)); + return () => clearInterval(timer); +} + +let tmpFiles = 0; +function createTmpFile(content = 'console.log(\'running\');', ext = '.js', basename = tmpdir.path) { + const file = path.join(basename, `${tmpFiles++}${ext}`); + writeFileSync(file, content); + return file; +} + +async function runWriteSucceed({ + file, + watchedFile, + watchFlag = '--watch', + args = [file], + completed = 'Completed running', + restarts = 2, + options = {}, + shouldFail = false, +}) { + args.unshift('--no-warnings'); + if (watchFlag !== null) args.unshift(watchFlag); + + const child = spawn(execPath, args, { encoding: 'utf8', stdio: 'pipe', ...options }); + + let completes = 0; + let cancelRestarts = () => {}; + let stderr = ''; + const stdout = []; + + child.stderr.on('data', (data) => { + stderr += data; + }); + + try { + for await (const data of createInterface({ input: child.stdout })) { + if (!data.startsWith('Waiting for graceful termination') && + !data.startsWith('Gracefully restarted')) { + stdout.push(data); + } + + if (data.startsWith(completed)) { + completes++; + + if (completes === restarts) break; + + if (completes === 1) { + cancelRestarts = restart(watchedFile); + } + } + + if (!shouldFail && data.startsWith('Failed running')) break; + } + } finally { + child.kill(); + cancelRestarts(); + } + + return { stdout, stderr, pid: child.pid }; +} + +tmpdir.refresh(); +const dir = tmpdir.path; + +describe('watch mode', { concurrency: !process.env.TEST_PARALLEL, timeout: 60_000 }, () => { + it('should watch changes to worker - cjs', async () => { + const worker = path.join(dir, 'worker.js'); + + writeFileSync(worker, ` +console.log('worker running'); +`); + + const file = createTmpFile(` +const { Worker } = require('node:worker_threads'); +const w = new Worker(${JSON.stringify(worker)}); +`, '.js', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: worker, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'worker running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'worker running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to worker dependencies - cjs', async () => { + const dep = path.join(dir, 'dep.js'); + const worker = path.join(dir, 'worker.js'); + + writeFileSync(dep, ` +module.exports = 'dep v1'; +`); + + writeFileSync(worker, ` +const dep = require('./dep.js'); +console.log(dep); +`); + + const file = createTmpFile(` +const { Worker } = require('node:worker_threads'); +const w = new Worker(${JSON.stringify(worker)}); +`, '.js', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: dep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to nested worker dependencies - cjs', async () => { + const subDep = path.join(dir, 'sub-dep.js'); + const dep = path.join(dir, 'dep.js'); + const worker = path.join(dir, 'worker.js'); + + writeFileSync(subDep, ` +module.exports = 'sub-dep v1'; +`); + + writeFileSync(dep, ` +const subDep = require('./sub-dep.js'); +console.log(subDep); +module.exports = 'dep v1'; +`); + + writeFileSync(worker, ` +const dep = require('./dep.js'); +`); + + const file = createTmpFile(` +const { Worker } = require('node:worker_threads'); +const w = new Worker(${JSON.stringify(worker)}); +`, '.js', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: subDep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'sub-dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'sub-dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to worker - esm', async () => { + const worker = path.join(dir, 'worker.mjs'); + + writeFileSync(worker, ` +console.log('worker running'); +`); + + const file = createTmpFile(` +import { Worker } from 'node:worker_threads'; +new Worker(new URL(${JSON.stringify(pathToFileURL(worker))})); +`, '.mjs', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: worker, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'worker running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'worker running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to worker dependencies - esm', async () => { + const dep = path.join(dir, 'dep.mjs'); + const worker = path.join(dir, 'worker.mjs'); + + writeFileSync(dep, ` +export default 'dep v1'; +`); + + writeFileSync(worker, ` +import dep from ${JSON.stringify(pathToFileURL(dep))}; +console.log(dep); +`); + + const file = createTmpFile(` +import { Worker } from 'node:worker_threads'; +new Worker(new URL(${JSON.stringify(pathToFileURL(worker))})); +`, '.mjs', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: dep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to nested worker dependencies - esm', async () => { + const subDep = path.join(dir, 'sub-dep.mjs'); + const dep = path.join(dir, 'dep.mjs'); + const worker = path.join(dir, 'worker.mjs'); + + writeFileSync(subDep, ` +export default 'sub-dep v1'; +`); + + writeFileSync(dep, ` +import subDep from ${JSON.stringify(pathToFileURL(subDep))}; +console.log(subDep); +export default 'dep v1'; +`); + + writeFileSync(worker, ` +import dep from ${JSON.stringify(pathToFileURL(dep))}; +`); + + const file = createTmpFile(` +import { Worker } from 'node:worker_threads'; +new Worker(new URL(${JSON.stringify(pathToFileURL(worker))})); +`, '.mjs', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: subDep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'sub-dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'sub-dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); +});