Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/lib/libpthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,28 @@ var LibraryPThread = {
}
},
initMainThread() {
#if PTHREAD_MANAGER
if ({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}) {
#if ASSERTIONS
dbg('PThread: initializing manager worker');
#endif
PThread.allocateUnusedWorker();
PThread.managerWorker = PThread.unusedWorkers.pop();
addOnPreRun(async () => {
var managerReady = PThread.loadWasmModuleToWorker(PThread.managerWorker);
addRunDependency('manager-worker');
await managerReady;
PThread.managerWorker.postMessage({ cmd: 'makeManager' });
removeRunDependency('manager-worker');
});
}
#endif
#if PTHREAD_POOL_SIZE
#if PTHREAD_MANAGER
var pthreadPoolSize = 0;
#else
var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}};
#endif
// Start loading up the Worker pool, if requested.
while (pthreadPoolSize--) {
PThread.allocateUnusedWorker();
Expand Down Expand Up @@ -284,6 +304,11 @@ var LibraryPThread = {

if (cmd === 'checkMailbox') {
checkMailbox();
} else if (cmd === 'relay') {
var targetWorker = PThread.pthreads[d.thread];
if (targetWorker && targetWorker.onmessage) {
targetWorker.onmessage({ data: d.data });
}
} else if (cmd === 'spawnThread') {
spawnThread(d);
} else if (cmd === 'cleanupThread') {
Expand Down Expand Up @@ -679,6 +704,23 @@ var LibraryPThread = {
assert(threadParams.pthread_ptr, 'spawnThread called with null pthread ptr');
#endif

#if PTHREAD_MANAGER
if ({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}) {
var workerStub = {
pthread_ptr: threadParams.pthread_ptr,
postMessage: (msg, transfer) => {
PThread.managerWorker.postMessage({ cmd: 'relay', thread: threadParams.pthread_ptr, data: msg }, transfer);
},
terminate: () => {
PThread.managerWorker.postMessage({ cmd: 'terminate', thread: threadParams.pthread_ptr });
}
};
PThread.pthreads[threadParams.pthread_ptr] = workerStub;
PThread.managerWorker.postMessage({ cmd: 'spawnThread', threadParams: threadParams }, threadParams.transferList);
return 0;
}
#endif

var worker = PThread.getNewWorker();
if (!worker) {
// No available workers in the PThread pool.
Expand Down
7 changes: 5 additions & 2 deletions src/parseTools.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,10 @@ function ENVIRONMENT_IS_MAIN_THREAD() {
function ENVIRONMENT_IS_WORKER_THREAD() {
assert(PTHREADS || WASM_WORKERS);
var envs = [];
if (PTHREADS) envs.push('ENVIRONMENT_IS_PTHREAD');
if (PTHREADS) {
envs.push('ENVIRONMENT_IS_PTHREAD');
envs.push('ENVIRONMENT_IS_PTHREAD_MANAGER');
}
if (WASM_WORKERS) envs.push('ENVIRONMENT_IS_WASM_WORKER');
return '(' + envs.join('||') + ')';
}
Expand Down Expand Up @@ -1169,7 +1172,7 @@ function wasmWorkerDetection() {

function pthreadDetection() {
if (ASSERTIONS) {
return "globalThis.name?.startsWith('em-pthread')";
return "globalThis.name?.startsWith('em-pthread') && globalThis.name != 'em-pthread-manager'";
} else {
return "globalThis.name == 'em-pthread'";
}
Expand Down
71 changes: 62 additions & 9 deletions src/runtime_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var sharedModules = {};

var startWorker;

if (ENVIRONMENT_IS_PTHREAD) {
if (ENVIRONMENT_IS_PTHREAD || ENVIRONMENT_IS_PTHREAD_MANAGER) {
// Thread-local guard variable for one-time init of the JS state
var initializedJS = false;

Expand All @@ -42,11 +42,53 @@ if (ENVIRONMENT_IS_PTHREAD) {
// notified about them.
self.onunhandledrejection = (e) => { throw e.reason || e; };

function handleManagerMessage(e) {
var d = e.data;
var cmd = d.cmd;
if (cmd === 'spawnThread') {
var threadParams = d.threadParams;
var worker = PThread.getNewWorker();
PThread.runningWorkers.push(worker);
PThread.pthreads[threadParams.pthread_ptr] = worker;
worker.pthread_ptr = threadParams.pthread_ptr;
worker.onmessage = (e) => {
postMessage({ cmd: 'relay', thread: worker.pthread_ptr, data: e.data });
};
var msg = {
cmd: 'run',
start_routine: threadParams.startRoutine,
arg: threadParams.arg,
pthread_ptr: threadParams.pthread_ptr,
};
worker.postMessage(msg, threadParams.transferList);
} else if (cmd === 'relay') {
var worker = PThread.pthreads[d.thread];
if (worker) {
worker.postMessage(d.data, d.transferList);
}
} else if (cmd === 'terminate') {
var worker = PThread.pthreads[d.thread];
if (worker) {
worker.terminate();
delete PThread.pthreads[d.thread];
}
} else if (cmd === 'checkMailbox') {
if (initializedJS) {
checkMailbox();
}
}
}

{{{ asyncIf(ASYNCIFY == 2) }}}function handleMessage(e) {
try {
var msgData = e['data'];
//dbg('msgData: ' + Object.keys(msgData));
var cmd = msgData.cmd;
if (cmd === 'makeManager') {
ENVIRONMENT_IS_PTHREAD = false;
ENVIRONMENT_IS_PTHREAD_MANAGER = true;
self.onmessage = handleManagerMessage;
return;
}
if (cmd === 'load') { // Preload command that is called once per worker to parse and load the Emscripten code.
#if ASSERTIONS
workerID = msgData.workerID;
Expand All @@ -63,12 +105,23 @@ if (ENVIRONMENT_IS_PTHREAD) {
startWorker = () => {
// Notify the main thread that this thread has loaded.
postMessage({ cmd: 'loaded' });
// Process any messages that were queued before the thread was ready.
for (let msg of messageQueue) {
handleMessage(msg);

if (ENVIRONMENT_IS_PTHREAD_MANAGER) {
initializedJS = true;
// Process any messages that were queued before the thread was ready.
for (let msg of messageQueue) {
handleManagerMessage(msg);
}
// Restore the real message handler.
self.onmessage = handleManagerMessage;
} else {
// Process any messages that were queued before the thread was ready.
for (let msg of messageQueue) {
handleMessage(msg);
}
// Restore the real message handler.
self.onmessage = handleMessage;
}
// Restore the real message handler.
self.onmessage = handleMessage;
};

#if MAIN_MODULE
Expand Down Expand Up @@ -192,11 +245,11 @@ if (ENVIRONMENT_IS_PTHREAD) {
err(`worker: onmessage() captured an uncaught exception: ${ex}`);
if (ex?.stack) err(ex.stack);
#endif
__emscripten_thread_crashed();
if (typeof __emscripten_thread_crashed !== 'undefined') __emscripten_thread_crashed();
throw ex;
}
};

self.onmessage = handleMessage;

} // ENVIRONMENT_IS_PTHREAD
} // ENVIRONMENT_IS_PTHREAD || ENVIRONMENT_IS_PTHREAD_MANAGER
5 changes: 5 additions & 0 deletions src/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,11 @@ var WEBAUDIO_DEBUG = 0;
// repeatedly yield back to the JS event loop in order for the thread to
// actually start.
// If your application needs to be able to synchronously create new threads,
// If true, a dedicated worker is used to manage pthread lifecycles.
// This allows synchronous thread creation even when the main thread is
// blocked.
var PTHREAD_MANAGER = false;

// you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x,
// in which case the specified number of Workers will be preloaded into a pool
// before the application starts, and that many threads can then be available
Expand Down
3 changes: 3 additions & 0 deletions src/shell.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ var ENVIRONMENT_IS_SHELL = !ENVIRONMENT_IS_WEB && !ENVIRONMENT_IS_NODE && !ENVIR
// The way we signal to a worker that it is hosting a pthread is to construct
// it with a specific name.
var ENVIRONMENT_IS_PTHREAD = ENVIRONMENT_IS_WORKER && {{{ pthreadDetection() }}}
#if PTHREADS
var ENVIRONMENT_IS_PTHREAD_MANAGER = ENVIRONMENT_IS_WORKER && globalThis.name == 'em-pthread-manager';
#endif

#if MODULARIZE && ASSERTIONS
if (ENVIRONMENT_IS_PTHREAD) {
Expand Down
Loading