From 07de7587342168fbbabb44ac4358055b07b83b24 Mon Sep 17 00:00:00 2001 From: Sam Clegg Date: Mon, 18 May 2026 20:09:25 -0700 Subject: [PATCH] Implement Pthread Manager Worker for synchronous thread creation This change introduces a dedicated 'Pthread Manager' worker that acts as an intermediary for managing the lifecycle of pthread workers. By moving the responsibility of spawning and managing workers to a dedicated manager worker, we enable synchronous pthread creation on the main browser thread even when it is blocked (e.g. during a join or futex wait). This is because the manager worker and its nested child workers can be started by the browser independently of the main thread's event loop. Key changes: - Add PTHREAD_MANAGER setting to enable this new mode. - Update shell.js and parseTools.mjs to detect and support the manager environment. - Update libpthread.js to spawn the manager worker and proxy thread creation requests to it. - Update runtime_pthread.js with logic to handle the manager worker's responsibilities: spawning, terminating, and relaying messages for child pthreads. - Ensure the manager worker is fully initialized before the main application starts using addRunDependency. Fixes: #18633 --- src/lib/libpthread.js | 42 +++++++++++++++++++++++++ src/parseTools.mjs | 7 +++-- src/runtime_pthread.js | 71 ++++++++++++++++++++++++++++++++++++------ src/settings.js | 5 +++ src/shell.js | 3 ++ 5 files changed, 117 insertions(+), 11 deletions(-) diff --git a/src/lib/libpthread.js b/src/lib/libpthread.js index 37e82174990a8..b4aa296537891 100644 --- a/src/lib/libpthread.js +++ b/src/lib/libpthread.js @@ -113,8 +113,28 @@ var LibraryPThread = { } }, initMainThread() { +#if PTHREAD_MANAGER + if ({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}) { +#if ASSERTIONS + dbg('PThread: initializing manager worker'); +#endif + PThread.allocateUnusedWorker(); + PThread.managerWorker = PThread.unusedWorkers.pop(); + addOnPreRun(async () => { + var managerReady = PThread.loadWasmModuleToWorker(PThread.managerWorker); + addRunDependency('manager-worker'); + await managerReady; + PThread.managerWorker.postMessage({ cmd: 'makeManager' }); + removeRunDependency('manager-worker'); + }); + } +#endif #if PTHREAD_POOL_SIZE +#if PTHREAD_MANAGER + var pthreadPoolSize = 0; +#else var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}}; +#endif // Start loading up the Worker pool, if requested. while (pthreadPoolSize--) { PThread.allocateUnusedWorker(); @@ -284,6 +304,11 @@ var LibraryPThread = { if (cmd === 'checkMailbox') { checkMailbox(); + } else if (cmd === 'relay') { + var targetWorker = PThread.pthreads[d.thread]; + if (targetWorker && targetWorker.onmessage) { + targetWorker.onmessage({ data: d.data }); + } } else if (cmd === 'spawnThread') { spawnThread(d); } else if (cmd === 'cleanupThread') { @@ -679,6 +704,23 @@ var LibraryPThread = { assert(threadParams.pthread_ptr, 'spawnThread called with null pthread ptr'); #endif +#if PTHREAD_MANAGER + if ({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}) { + var workerStub = { + pthread_ptr: threadParams.pthread_ptr, + postMessage: (msg, transfer) => { + PThread.managerWorker.postMessage({ cmd: 'relay', thread: threadParams.pthread_ptr, data: msg }, transfer); + }, + terminate: () => { + PThread.managerWorker.postMessage({ cmd: 'terminate', thread: threadParams.pthread_ptr }); + } + }; + PThread.pthreads[threadParams.pthread_ptr] = workerStub; + PThread.managerWorker.postMessage({ cmd: 'spawnThread', threadParams: threadParams }, threadParams.transferList); + return 0; + } +#endif + var worker = PThread.getNewWorker(); if (!worker) { // No available workers in the PThread pool. diff --git a/src/parseTools.mjs b/src/parseTools.mjs index 54c33952b2b90..5203cfd229f31 100644 --- a/src/parseTools.mjs +++ b/src/parseTools.mjs @@ -1121,7 +1121,10 @@ function ENVIRONMENT_IS_MAIN_THREAD() { function ENVIRONMENT_IS_WORKER_THREAD() { assert(PTHREADS || WASM_WORKERS); var envs = []; - if (PTHREADS) envs.push('ENVIRONMENT_IS_PTHREAD'); + if (PTHREADS) { + envs.push('ENVIRONMENT_IS_PTHREAD'); + envs.push('ENVIRONMENT_IS_PTHREAD_MANAGER'); + } if (WASM_WORKERS) envs.push('ENVIRONMENT_IS_WASM_WORKER'); return '(' + envs.join('||') + ')'; } @@ -1169,7 +1172,7 @@ function wasmWorkerDetection() { function pthreadDetection() { if (ASSERTIONS) { - return "globalThis.name?.startsWith('em-pthread')"; + return "globalThis.name?.startsWith('em-pthread') && globalThis.name != 'em-pthread-manager'"; } else { return "globalThis.name == 'em-pthread'"; } diff --git a/src/runtime_pthread.js b/src/runtime_pthread.js index e2e5f2375cccb..1679cc1751b33 100644 --- a/src/runtime_pthread.js +++ b/src/runtime_pthread.js @@ -22,7 +22,7 @@ var sharedModules = {}; var startWorker; -if (ENVIRONMENT_IS_PTHREAD) { +if (ENVIRONMENT_IS_PTHREAD || ENVIRONMENT_IS_PTHREAD_MANAGER) { // Thread-local guard variable for one-time init of the JS state var initializedJS = false; @@ -42,11 +42,53 @@ if (ENVIRONMENT_IS_PTHREAD) { // notified about them. self.onunhandledrejection = (e) => { throw e.reason || e; }; + function handleManagerMessage(e) { + var d = e.data; + var cmd = d.cmd; + if (cmd === 'spawnThread') { + var threadParams = d.threadParams; + var worker = PThread.getNewWorker(); + PThread.runningWorkers.push(worker); + PThread.pthreads[threadParams.pthread_ptr] = worker; + worker.pthread_ptr = threadParams.pthread_ptr; + worker.onmessage = (e) => { + postMessage({ cmd: 'relay', thread: worker.pthread_ptr, data: e.data }); + }; + var msg = { + cmd: 'run', + start_routine: threadParams.startRoutine, + arg: threadParams.arg, + pthread_ptr: threadParams.pthread_ptr, + }; + worker.postMessage(msg, threadParams.transferList); + } else if (cmd === 'relay') { + var worker = PThread.pthreads[d.thread]; + if (worker) { + worker.postMessage(d.data, d.transferList); + } + } else if (cmd === 'terminate') { + var worker = PThread.pthreads[d.thread]; + if (worker) { + worker.terminate(); + delete PThread.pthreads[d.thread]; + } + } else if (cmd === 'checkMailbox') { + if (initializedJS) { + checkMailbox(); + } + } + } + {{{ asyncIf(ASYNCIFY == 2) }}}function handleMessage(e) { try { var msgData = e['data']; - //dbg('msgData: ' + Object.keys(msgData)); var cmd = msgData.cmd; + if (cmd === 'makeManager') { + ENVIRONMENT_IS_PTHREAD = false; + ENVIRONMENT_IS_PTHREAD_MANAGER = true; + self.onmessage = handleManagerMessage; + return; + } if (cmd === 'load') { // Preload command that is called once per worker to parse and load the Emscripten code. #if ASSERTIONS workerID = msgData.workerID; @@ -63,12 +105,23 @@ if (ENVIRONMENT_IS_PTHREAD) { startWorker = () => { // Notify the main thread that this thread has loaded. postMessage({ cmd: 'loaded' }); - // Process any messages that were queued before the thread was ready. - for (let msg of messageQueue) { - handleMessage(msg); + + if (ENVIRONMENT_IS_PTHREAD_MANAGER) { + initializedJS = true; + // Process any messages that were queued before the thread was ready. + for (let msg of messageQueue) { + handleManagerMessage(msg); + } + // Restore the real message handler. + self.onmessage = handleManagerMessage; + } else { + // Process any messages that were queued before the thread was ready. + for (let msg of messageQueue) { + handleMessage(msg); + } + // Restore the real message handler. + self.onmessage = handleMessage; } - // Restore the real message handler. - self.onmessage = handleMessage; }; #if MAIN_MODULE @@ -192,11 +245,11 @@ if (ENVIRONMENT_IS_PTHREAD) { err(`worker: onmessage() captured an uncaught exception: ${ex}`); if (ex?.stack) err(ex.stack); #endif - __emscripten_thread_crashed(); + if (typeof __emscripten_thread_crashed !== 'undefined') __emscripten_thread_crashed(); throw ex; } }; self.onmessage = handleMessage; -} // ENVIRONMENT_IS_PTHREAD +} // ENVIRONMENT_IS_PTHREAD || ENVIRONMENT_IS_PTHREAD_MANAGER diff --git a/src/settings.js b/src/settings.js index 80e2e72989245..85617b5c8af55 100644 --- a/src/settings.js +++ b/src/settings.js @@ -1617,6 +1617,11 @@ var WEBAUDIO_DEBUG = 0; // repeatedly yield back to the JS event loop in order for the thread to // actually start. // If your application needs to be able to synchronously create new threads, +// If true, a dedicated worker is used to manage pthread lifecycles. +// This allows synchronous thread creation even when the main thread is +// blocked. +var PTHREAD_MANAGER = false; + // you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x, // in which case the specified number of Workers will be preloaded into a pool // before the application starts, and that many threads can then be available diff --git a/src/shell.js b/src/shell.js index 1b31a3065acc4..7728174bbd977 100644 --- a/src/shell.js +++ b/src/shell.js @@ -97,6 +97,9 @@ var ENVIRONMENT_IS_SHELL = !ENVIRONMENT_IS_WEB && !ENVIRONMENT_IS_NODE && !ENVIR // The way we signal to a worker that it is hosting a pthread is to construct // it with a specific name. var ENVIRONMENT_IS_PTHREAD = ENVIRONMENT_IS_WORKER && {{{ pthreadDetection() }}} +#if PTHREADS +var ENVIRONMENT_IS_PTHREAD_MANAGER = ENVIRONMENT_IS_WORKER && globalThis.name == 'em-pthread-manager'; +#endif #if MODULARIZE && ASSERTIONS if (ENVIRONMENT_IS_PTHREAD) {