Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 46 additions & 26 deletions src/taskgraph/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from concurrent.futures import (
FIRST_COMPLETED,
ProcessPoolExecutor,
ThreadPoolExecutor,
wait,
)
from dataclasses import dataclass
Expand Down Expand Up @@ -312,15 +313,13 @@ def _load_tasks_serial(self, kinds, kind_graph, parameters):

return all_tasks

def _load_tasks_parallel(self, kinds, kind_graph, parameters):
def _load_tasks_parallel(self, kinds, kind_graph, parameters, executor):
all_tasks = {}
futures_to_kind = {}
futures = set()
edges = set(kind_graph.edges)

with ProcessPoolExecutor(
mp_context=multiprocessing.get_context("fork")
) as executor:
with executor:

def submit_ready_kinds():
"""Create the next batch of tasks for kinds without dependencies."""
Expand Down Expand Up @@ -434,29 +433,50 @@ def _run(self):
yield "kind_graph", kind_graph

logger.info("Generating full task set")
# The short version of the below is: we only support parallel kind
# processing on Linux.

# The next block deals with enabling parallel kind processing, which
# currently has different support on different platforms. In summary:
# * Parallel kind processing is supported and enabled by default on
# Linux. We use multiple processes by default, but experimental
# support for multiple threads can be enabled instead.
# * On other platforms, we have experimental support for parallel
# kind processing with multiple threads.
#
# Current parallel generation relies on multiprocessing, and more
# specifically: the "fork" multiprocessing method. This is not supported
# at all on Windows (it uses "spawn"). Forking is supported on macOS,
# but no longer works reliably in all cases, and our usage of it here
# causes crashes. See https://github.com/python/cpython/issues/77906
# and http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
# for more details on that.
# Other methods of multiprocessing (both "spawn" and "forkserver")
# do not work for our use case, because they cause global variables
# to be reinitialized, which are sometimes modified earlier in graph
# generation. These issues can theoretically be worked around by
# eliminating all reliance on globals as part of task generation, but
# is far from a small amount of work in users like Gecko/Firefox.
# In the long term, the better path forward is likely to be switching
# to threading with a free-threaded python to achieve similar parallel
# processing.
if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
else:
all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters)
# On all platforms serial kind processing can be enabled by setting
# TASKGRAPH_SERIAL in the environment.
#
# On all platforms, multiple threads can be enabled by setting
# TASKGRAPH_USE_THREADS in the environment. Taskgraph must be running
# from a free-threaded Python build to see any performance benefits.
#
# In the long term, the goal is to enable parallel kind processing for
# all platforms by default using threads, and remove support for multiple
# processes altogether.
def load_tasks():
if platform.system() == "Linux":
if os.environ.get("TASKGRAPH_SERIAL"):
return self._load_tasks_serial(kinds, kind_graph, parameters)
elif os.environ.get("TASKGRAPH_USE_THREADS"):
executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())
else:
executor = ProcessPoolExecutor(
mp_context=multiprocessing.get_context("fork")
)
return self._load_tasks_parallel(
kinds, kind_graph, parameters, executor
)
else:
if os.environ.get("TASKGRAPH_SERIAL") or not os.environ.get(
"TASKGRAPH_USE_THREADS"
):
return self._load_tasks_serial(kinds, kind_graph, parameters)
else:
executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())
return self._load_tasks_parallel(
kinds, kind_graph, parameters, executor
)

all_tasks = load_tasks()

full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
yield self.verify("full_task_set", full_task_set, graph_config, parameters)
Expand Down
37 changes: 24 additions & 13 deletions src/taskgraph/util/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pprint
import re
import threading
from collections.abc import Mapping
from functools import reduce
from typing import Literal, Optional, Union
Expand All @@ -14,6 +15,8 @@
import taskgraph
from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path

_schema_creation_lock = threading.RLock()

# Common type definitions that are used across multiple schemas
TaskPriority = Literal[
"highest", "very-high", "high", "medium", "low", "very-low", "lowest"
Expand Down Expand Up @@ -248,28 +251,36 @@ class LegacySchema(voluptuous.Schema):
"""
Operates identically to voluptuous.Schema, but applying some taskgraph-specific checks
in the process.

voluptuous.Schema's `_compile` method is thread-unsafe. Any usage (whether direct or
indirect) of it must be protected by a lock.
"""

def __init__(self, *args, check=True, **kwargs):
super().__init__(*args, **kwargs)
with _schema_creation_lock:
# this constructor may call `_compile`
super().__init__(*args, **kwargs)

self.check = check
if not taskgraph.fast and self.check:
check_schema(self)
self.check = check
if not taskgraph.fast and self.check:
check_schema(self)

def extend(self, *args, **kwargs):
schema = super().extend(*args, **kwargs)
with _schema_creation_lock:
# `extend` may create a new Schema object, which may call `_compile`
schema = super().extend(*args, **kwargs)

if self.check:
check_schema(schema)
# We want twice extend schema to be checked too.
schema.__class__ = LegacySchema
return schema
if self.check:
check_schema(schema)
# We want twice extend schema to be checked too.
schema.__class__ = LegacySchema
return schema

def _compile(self, schema):
if taskgraph.fast:
return
return super()._compile(schema)
with _schema_creation_lock:
if taskgraph.fast:
return
return super()._compile(schema)

def __getitem__(self, item):
return self.schema[item] # type: ignore
Expand Down
1 change: 1 addition & 0 deletions taskcluster/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ treeherder:
'check': 'Checks and lints'
'doc': 'Documentation tasks'
'unit': 'Unit test tasks'
'unit-multithread': 'Unit test tasks with multithreading enabled'
'integration': 'Integration test tasks'

index:
Expand Down
20 changes: 20 additions & 0 deletions taskcluster/kinds/test/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ unit:
command: >-
uv run coverage run --data-file /builds/worker/artifacts/coverage --context=py{matrix[python]} -m pytest -vv

# Same test suite as "unit" above, but run on a free-threaded CPython build
# ("314t") with TASKGRAPH_USE_THREADS set, so the generator's experimental
# thread-based parallel kind processing is exercised.
unit-multithread:
  description: "Run unit tests with py{matrix[python]} on Linux with multithreading enabled"
  matrix:
    set-name: "unit-multithread-py{matrix[python]}"
    substitution-fields: [description, run.command, treeherder, worker, attributes]
    python: ["314t"]
  worker:
    docker-image: {in-tree: python}
    env:
      # Switches taskgraph.generator to a ThreadPoolExecutor.
      TASKGRAPH_USE_THREADS: "1"
    artifacts:
      - type: file
        path: "/builds/worker/artifacts/coverage"
        name: "public/coverage.py{matrix[python]}"
  treeherder:
    symbol: unit-multithread(py{matrix[python]})
  run:
    command: >-
      uv run coverage run --data-file /builds/worker/artifacts/coverage --context=py{matrix[python]} -m pytest -vv

integration:
description: "Run unit tests with py{matrix[python]} on Linux with resolution {matrix[resolution]}"
attributes:
Expand Down
32 changes: 30 additions & 2 deletions test/test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import os
import platform
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

Expand All @@ -14,9 +15,13 @@
from taskgraph.loader.default import loader as default_loader

linuxonly = pytest.mark.skipif(
platform.system() != "Linux",
platform.system() != "Linux" or os.environ.get("TASKGRAPH_USE_THREADS"),
reason="requires Linux and 'fork' multiprocessing support",
)
threadsonly = pytest.mark.skipif(
not os.environ.get("TASKGRAPH_USE_THREADS"),
reason="requires multithreading to be enabled",
)


class FakePPE(ProcessPoolExecutor):
Expand All @@ -27,8 +32,16 @@ def submit(self, kind_load_tasks, *args):
return super().submit(kind_load_tasks, *args)


class FakeTPE(ThreadPoolExecutor):
    """Test double for ThreadPoolExecutor that records submission order.

    ``submit`` receives a bound method of a kind object; the owning kind's
    name is appended to ``loaded_kinds`` before delegating, so tests can
    assert on the order in which kinds were submitted.

    Bug fix: this previously subclassed ProcessPoolExecutor, so patching it
    in for ``generator.ThreadPoolExecutor`` silently exercised a process
    pool instead of threads.
    """

    # Shared class-level record of submitted kind names (mirrors FakePPE).
    loaded_kinds = []

    def submit(self, kind_load_tasks, *args):
        # Record the name of the kind whose bound method is being submitted.
        self.loaded_kinds.append(kind_load_tasks.__self__.name)
        return super().submit(kind_load_tasks, *args)


@linuxonly
def test_kind_ordering(mocker, maketgg):
def test_kind_ordering_multiprocess(mocker, maketgg):
"When task kinds depend on each other, they are loaded in postorder"
mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
tgg = maketgg(
Expand All @@ -42,6 +55,21 @@ def test_kind_ordering(mocker, maketgg):
assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]


@threadsonly
def test_kind_ordering_multithread(mocker, maketgg):
    """Kinds that depend on each other must be loaded in postorder."""
    patched_executor = mocker.patch.object(
        generator, "ThreadPoolExecutor", new=FakeTPE
    )
    kind_definitions = [
        ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
        ("_fake2", {"kind-dependencies": ["_fake1"]}),
        ("_fake1", {"kind-dependencies": []}),
    ]
    tgg = maketgg(kinds=kind_definitions)
    tgg._run_until("full_task_set")
    assert patched_executor.loaded_kinds == ["_fake1", "_fake2", "_fake3"]


def test_full_task_set(maketgg):
"The full_task_set property has all tasks"
tgg = maketgg()
Expand Down
Loading