From 800599f4ce37f0a0eda45d172d1584a4dd62e0df Mon Sep 17 00:00:00 2001 From: Ben Hearsum Date: Thu, 12 Feb 2026 15:17:11 -0500 Subject: [PATCH 1/4] refactor: pass executor to _load_tasks_parallel This will make a subsequent commit that adds support for a ThreadPoolExecutor cleaner. --- src/taskgraph/generator.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index a944a8b02..7707d4d8f 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -312,15 +312,13 @@ def _load_tasks_serial(self, kinds, kind_graph, parameters): return all_tasks - def _load_tasks_parallel(self, kinds, kind_graph, parameters): + def _load_tasks_parallel(self, kinds, kind_graph, parameters, executor): all_tasks = {} futures_to_kind = {} futures = set() edges = set(kind_graph.edges) - with ProcessPoolExecutor( - mp_context=multiprocessing.get_context("fork") - ) as executor: + with executor: def submit_ready_kinds(): """Create the next batch of tasks for kinds without dependencies.""" @@ -456,7 +454,8 @@ def _run(self): if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"): all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters) else: - all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters) + executor = ProcessPoolExecutor(mp_context=multiprocessing.get_context("fork")) + all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters, executor) full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset())) yield self.verify("full_task_set", full_task_set, graph_config, parameters) From b86b219b6bb7dd763b5a0ab1394908db672a3b56 Mon Sep 17 00:00:00 2001 From: Ben Hearsum Date: Thu, 12 Feb 2026 12:51:48 -0500 Subject: [PATCH 2/4] fix: make schema code thread-safe with locking Voluptuous contains thread-unsafe code; work around this by locking before calling into it. --- src/taskgraph/util/schema.py | 37 +++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py index f6330c196..d2e4919be 100644 --- a/src/taskgraph/util/schema.py +++ b/src/taskgraph/util/schema.py @@ -4,6 +4,7 @@ import pprint import re +import threading from collections.abc import Mapping from functools import reduce from typing import Literal, Optional, Union @@ -14,6 +15,8 @@ import taskgraph from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path +_schema_creation_lock = threading.RLock() + # Common type definitions that are used across multiple schemas TaskPriority = Literal[ "highest", "very-high", "high", "medium", "low", "very-low", "lowest" @@ -248,28 +251,36 @@ class LegacySchema(voluptuous.Schema): """ Operates identically to voluptuous.Schema, but applying some taskgraph-specific checks in the process. + + voluptuous.Schema's `_compile` method is thread-unsafe. Any usage (whether direct or + indirect) of it must be protected by a lock. 
""" def __init__(self, *args, check=True, **kwargs): - super().__init__(*args, **kwargs) + with _schema_creation_lock: + # this constructor may call `_compile` + super().__init__(*args, **kwargs) - self.check = check - if not taskgraph.fast and self.check: - check_schema(self) + self.check = check + if not taskgraph.fast and self.check: + check_schema(self) def extend(self, *args, **kwargs): - schema = super().extend(*args, **kwargs) + with _schema_creation_lock: + # `extend` may create a new Schema object, which may call `_compile` + schema = super().extend(*args, **kwargs) - if self.check: - check_schema(schema) - # We want twice extend schema to be checked too. - schema.__class__ = LegacySchema - return schema + if self.check: + check_schema(schema) + # We want twice extend schema to be checked too. + schema.__class__ = LegacySchema + return schema def _compile(self, schema): - if taskgraph.fast: - return - return super()._compile(schema) + with _schema_creation_lock: + if taskgraph.fast: + return + return super()._compile(schema) def __getitem__(self, item): return self.schema[item] # type: ignore From ddcacba13ea9b9aec34e961af6094666526e71be Mon Sep 17 00:00:00 2001 From: Ben Hearsum Date: Fri, 24 Oct 2025 14:24:55 -0400 Subject: [PATCH 3/4] feat: add support for parallel kind processing with threads Even with 3.14 free-threaded python, this is still a bit slower than multiprocessing on Linux, but it will allow us to start experimenting with it more, and may allow users on macOS and Windows to immediately see a speed-up. --- src/taskgraph/generator.py | 67 +++++++++++++++++++++++++------------- test/test_generator.py | 32 ++++++++++++++++-- 2 files changed, 74 insertions(+), 25 deletions(-) diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index 7707d4d8f..044069ae5 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -11,6 +11,7 @@ from concurrent.futures import ( FIRST_COMPLETED, ProcessPoolExecutor, + ThreadPoolExecutor, wait, ) from dataclasses import dataclass @@ -432,30 +433,50 @@ def _run(self): yield "kind_graph", kind_graph logger.info("Generating full task set") - # The short version of the below is: we only support parallel kind - # processing on Linux. + + # The next block deals with enabling parallel kind processing, which + # currently has different support on different platforms. In summary: + # * Parallel kind processing is supported and enabled by default on + # Linux. We use multiple processes by default, but experimental + # support for multiple threads can be enabled instead. + # * On other platforms, we have experimental support for parallel + # kind processing with multiple threads. # - # Current parallel generation relies on multiprocessing, and more - # specifically: the "fork" multiprocessing method. This is not supported - # at all on Windows (it uses "spawn"). Forking is supported on macOS, - # but no longer works reliably in all cases, and our usage of it here - # causes crashes. See https://github.com/python/cpython/issues/77906 - # and http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html - # for more details on that. - # Other methods of multiprocessing (both "spawn" and "forkserver") - # do not work for our use case, because they cause global variables - # to be reinitialized, which are sometimes modified earlier in graph - # generation. 
These issues can theoretically be worked around by
-        # eliminating all reliance on globals as part of task generation, but
-        # is far from a small amount of work in users like Gecko/Firefox.
-        # In the long term, the better path forward is likely to be switching
-        # to threading with a free-threaded python to achieve similar parallel
-        # processing.
-        if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
-            all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
-        else:
-            executor = ProcessPoolExecutor(mp_context=multiprocessing.get_context("fork"))
-            all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters, executor)
+        # On all platforms, serial kind processing can be enabled by setting
+        # TASKGRAPH_SERIAL in the environment.
+        #
+        # On all platforms, multiple threads can be enabled by setting
+        # TASKGRAPH_USE_THREADS in the environment. Taskgraph must be running
+        # from a free-threaded Python build to see any performance benefits.
+        #
+        # In the long term, the goal is to enable parallel kind processing by
+        # default on all platforms using threads, and to remove support for
+        # multiple processes altogether.
+        def load_tasks():
+            if platform.system() == "Linux":
+                if os.environ.get("TASKGRAPH_SERIAL"):
+                    return self._load_tasks_serial(kinds, kind_graph, parameters)
+                elif os.environ.get("TASKGRAPH_USE_THREADS"):
+                    executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())
+                else:
+                    executor = ProcessPoolExecutor(
+                        mp_context=multiprocessing.get_context("fork")
+                    )
+                return self._load_tasks_parallel(
+                    kinds, kind_graph, parameters, executor
+                )
+            else:
+                if os.environ.get("TASKGRAPH_SERIAL") or not os.environ.get(
+                    "TASKGRAPH_USE_THREADS"
+                ):
+                    return self._load_tasks_serial(kinds, kind_graph, parameters)
+                else:
+                    executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())
+                    return self._load_tasks_parallel(
+                        kinds, kind_graph, parameters, executor
+                    )
+
+        all_tasks = load_tasks()
 
         full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
         yield self.verify("full_task_set", full_task_set, graph_config, parameters)
diff --git a/test/test_generator.py b/test/test_generator.py
index 783073b21..0353295ab 100644
--- a/test/test_generator.py
+++ b/test/test_generator.py
@@ -3,6 +3,7 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 
+import os
 import platform
-from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 
@@ -14,9 +15,13 @@
 from taskgraph.loader.default import loader as default_loader
 
 linuxonly = pytest.mark.skipif(
-    platform.system() != "Linux",
+    platform.system() != "Linux" or os.environ.get("TASKGRAPH_USE_THREADS"),
     reason="requires Linux and 'fork' multiprocessing support",
 )
+threadsonly = pytest.mark.skipif(
+    not os.environ.get("TASKGRAPH_USE_THREADS"),
+    reason="requires multithreading to be enabled",
+)
 
 
 class FakePPE(ProcessPoolExecutor):
@@ -27,8 +32,16 @@
     def submit(self, kind_load_tasks, *args):
         self.loaded_kinds.append(kind_load_tasks.__self__.name)
         return super().submit(kind_load_tasks, *args)
 
 
+class FakeTPE(ThreadPoolExecutor):
+    loaded_kinds = []
+
+    def submit(self, kind_load_tasks, *args):
+        self.loaded_kinds.append(kind_load_tasks.__self__.name)
+        return super().submit(kind_load_tasks, *args)
+
+
 @linuxonly
-def test_kind_ordering(mocker, maketgg):
+def test_kind_ordering_multiprocess(mocker, maketgg):
     "When task kinds depend on each other, they are loaded in postorder"
     mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
     tgg = maketgg(
         kinds=[
             ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
             ("_fake2", {"kind-dependencies": ["_fake1"]}),
             ("_fake1", {"kind-dependencies": []}),
         ]
     )
     tgg._run_until("full_task_set")
     assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
 
 
+@threadsonly
+def test_kind_ordering_multithread(mocker, maketgg):
+    "When task kinds depend on each other, they are loaded in postorder"
+    mocked_tpe = mocker.patch.object(generator, "ThreadPoolExecutor", new=FakeTPE)
+    tgg = maketgg(
+        kinds=[
+            ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
+            ("_fake2", {"kind-dependencies": ["_fake1"]}),
+            ("_fake1", {"kind-dependencies": []}),
+        ]
+    )
+    tgg._run_until("full_task_set")
+    assert mocked_tpe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
+
+
 def test_full_task_set(maketgg):
     "The full_task_set property has all tasks"
     tgg = maketgg()

From f4b9a78cf46c181acfe112988f718d70f72874a0 Mon Sep 17 00:00:00 2001
From: Ben Hearsum
Date: Thu, 12 Feb 2026 19:42:53 -0500
Subject: [PATCH 4/4] feat: add multithreading tests

---
 taskcluster/config.yml           |  1 +
 taskcluster/kinds/test/linux.yml | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/taskcluster/config.yml b/taskcluster/config.yml
index 6f563b7ba..fd6eeedaa 100644
--- a/taskcluster/config.yml
+++ b/taskcluster/config.yml
@@ -7,6 +7,7 @@ treeherder:
         'check': 'Checks and lints'
         'doc': 'Documentation tasks'
         'unit': 'Unit test tasks'
+        'unit-multithread': 'Unit test tasks with multithreading enabled'
         'integration': 'Integration test tasks'
 
 index:
diff --git a/taskcluster/kinds/test/linux.yml b/taskcluster/kinds/test/linux.yml
index a5e710866..c0b3b8280 100644
--- a/taskcluster/kinds/test/linux.yml
+++ b/taskcluster/kinds/test/linux.yml
@@ -50,6 +50,26 @@ unit:
    command: >-
      uv run coverage run --data-file /builds/worker/artifacts/coverage --context=py{matrix[python]} -m pytest -vv
 
+unit-multithread:
+  description: "Run unit tests with py{matrix[python]} on Linux with multithreading enabled"
+  matrix:
+    set-name: "unit-multithread-py{matrix[python]}"
+    substitution-fields: [description, run.command, treeherder, worker, attributes]
+    python: ["314t"]
+  worker:
+    docker-image: {in-tree: python}
+    env:
+      TASKGRAPH_USE_THREADS: "1"
+    artifacts:
+      - type: file
+        path: "/builds/worker/artifacts/coverage"
+        name: "public/coverage.py{matrix[python]}"
+  treeherder:
+    symbol: unit-multithread(py{matrix[python]})
+  run:
+    command: >-
+      uv run coverage run --data-file /builds/worker/artifacts/coverage --context=py{matrix[python]} -m pytest -vv
+
 integration:
description: "Run unit tests with py{matrix[python]} on Linux with resolution {matrix[resolution]}" attributes: