Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ informal introduction to the features and their implementation.
- [Customizing the Sandbox](#customizing-the-sandbox)
- [Passthrough Modules](#passthrough-modules)
- [Invalid Module Members](#invalid-module-members)
- [Debugging Workflows with `breakpoint()` / `pdb`](#debugging-workflows-with-breakpoint--pdb)
- [Known Sandbox Issues](#known-sandbox-issues)
- [Global Import/Builtins](#global-importbuiltins)
- [Sandbox is not Secure](#sandbox-is-not-secure)
Expand Down Expand Up @@ -1241,6 +1242,79 @@ my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_

See the API for more details on exact fields and their meaning.

##### Debugging Workflows with `breakpoint()` / `pdb`

Setting `debug_mode=True` on the `Worker` (or `TEMPORAL_DEBUG=1` in the environment) routes workflow activations
onto the asyncio main thread instead of a worker thread pool. This lets `breakpoint()` and `pdb.set_trace()`
inside workflow code open an interactive REPL. Without it, pdb would hang, because its `input()` call would run
on a thread that does not own the controlling TTY.

A minimal runnable example:

```python
import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@workflow.defn
class DebugMeWorkflow:
    """Minimal workflow that pauses at a breakpoint and then returns a string."""

    @workflow.run
    async def run(self) -> str:
        # A local to inspect from the debugger prompt (e.g. with `p x`).
        x = 42
        breakpoint()  # interactive pdb prompt opens at this line
        return f"x was {x}"


async def main() -> None:
    """Run DebugMeWorkflow once on a local worker started in debug mode."""
    task_queue = "debug-me"
    client = await Client.connect("localhost:7233")

    # debug_mode=True is what allows the breakpoint() inside the workflow to
    # open an interactive prompt.
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[DebugMeWorkflow],
        debug_mode=True,
    )
    async with worker:
        outcome = await client.execute_workflow(
            DebugMeWorkflow.run,
            id="debug-me-wf",
            task_queue=task_queue,
            task_timeout=timedelta(minutes=10),  # see caveat below
        )
        print(outcome)


if __name__ == "__main__":
    asyncio.run(main())
```

Save the example as `debug_me.py` and run it with `python debug_me.py`, or under pytest with `pytest -s` (the
`-s` flag disables pytest's stdin capture). At the `(Pdb)` prompt you'll land at the line where `breakpoint()`
was called, with workflow locals in scope. Try `p x` (print `x`), `n` (next line), `c` (continue), `q` (quit).

**Quitting cleanly.** Typing `q` or hitting Ctrl-D continues the workflow rather than raising `BdbQuit`
(which would fail the workflow task). To genuinely abort, kill the outer process with Ctrl-C.

Two caveats when pausing at a breakpoint inside a workflow:

1. **Workflow task timeout.** Temporal expires a workflow task after ~10 seconds by default. If you sit at the
`(Pdb)` prompt longer than that, the server reassigns the task and your workflow replays from the start when
you continue — re-hitting the breakpoint. Pass `task_timeout=timedelta(minutes=N)` to `execute_workflow` /
`start_workflow` to give yourself debugging headroom:

```python
await client.execute_workflow(MyWorkflow.run, ..., task_timeout=timedelta(minutes=10))
```

2. **Deterministic replay.** Workflows are deterministic and replay from history; any wall-clock pause violates
that contract. For post-mortem debugging without these caveats, use the [Replayer](#replayer) on a recorded
history instead of live debugging.

A `breakpoint()` call from workflow code without `debug_mode` enabled raises a `RuntimeError` explaining that
`debug_mode=True` (or `TEMPORAL_DEBUG`) is required, so the failure mode is loud rather than a silent hang.

##### Known Sandbox Issues

Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved.
Expand Down
165 changes: 165 additions & 0 deletions temporalio/worker/_debugger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Workflow debugger support.

When ``debug_mode=True`` on the Worker (or the ``TEMPORAL_DEBUG`` env var
is set), the worker uses helpers from this module to make ``breakpoint()``
inside workflow code open an interactive pdb prompt. The inline-dispatch
piece lives on the worker itself; everything else (sandbox relaxation,
breakpoint hook, custom Pdb subclass) lives here.
"""

from __future__ import annotations

import dataclasses
import sys
import threading
from types import FrameType, TracebackType

import temporalio.workflow
from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner

from ._workflow_instance import WorkflowRunner

# Explicit export list. Every entry is underscore-prefixed, so without this
# list a star-import of the module would skip all of them.
__all__ = [
    "_install_workflow_breakpoint_hook",
    "_relax_sandbox_for_debugger",
    "_temporal_workflow_breakpoint_hook",
]

# Prefix used to detect threads in the workflow task ThreadPoolExecutor.
# NOTE(review): the executor's thread-name prefix is configured outside this
# module; confirm the two stay in sync.
_WORKFLOW_THREAD_NAME_PREFIX = "temporal_workflow_"

# Snapshot of ``sys.breakpointhook`` taken when this module is imported.
# ``_temporal_workflow_breakpoint_hook`` delegates to it for ``breakpoint()``
# calls made outside a workflow.
# NOTE(review): a hook that another tool installs *after* this module is
# imported is not captured here; confirm that is the intended behavior.
_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook


def _build_workflow_pdb_class() -> type:
    """Create the ``Pdb`` subclass used for workflow debugging sessions.

    The returned class differs from stock ``pdb.Pdb`` in two ways:

    * Each ``interaction`` (every time the prompt is shown) runs with
      ``_sandbox_unrestricted.value`` set to ``True`` and puts the earlier
      value back when the interaction ends, including on error. pdb's
      cmdloop touches ``readline.get_completer`` and other
      sandbox-restricted internals, so the restrictions are lifted only
      while the REPL is active and stay in force everywhere else.
    * ``q``/``quit``/``exit`` and EOF (Ctrl-D) act like ``continue``
      instead of raising ``BdbQuit``.

    ``pdb`` is imported inside the function because it is a debug-only
    dependency (it pulls in ``cmd``/``bdb``/``linecache``) that should not
    be paid for at worker import time.

    Returns:
        The ``_WorkflowPdb`` class (not an instance).
    """
    import pdb

    from temporalio.workflow._sandbox import _sandbox_unrestricted

    class _WorkflowPdb(pdb.Pdb):
        # typeshed disagrees with itself about `interaction` across Python
        # versions: on 3.10-3.12 the second parameter is
        # `traceback: TracebackType | None`; from 3.13 it is `tb_or_exc`
        # and may also be a `BaseException`. One signature cannot match
        # both stubs, hence the ignored override check.
        def interaction(  # type: ignore[override]
            self,
            frame: FrameType | None,
            tb_or_exc: TracebackType | BaseException | None,
        ) -> None:
            # The flag may not have been set yet; treat a missing
            # attribute as "restricted".
            saved_flag = getattr(_sandbox_unrestricted, "value", False)
            _sandbox_unrestricted.value = True
            try:
                super().interaction(frame, tb_or_exc)  # type: ignore[arg-type]
            finally:
                _sandbox_unrestricted.value = saved_flag

        # Stock pdb raises `BdbQuit` on quit. Inside a workflow that
        # exception escapes workflow.run as an uncaught error, fails the
        # workflow task, and triggers a server retry storm during
        # teardown. In a debug session "quit" nearly always means "stop
        # debugging and let the workflow finish", which is `continue`;
        # aborting for real is done with Ctrl-C in the outer shell.
        def do_quit(self, arg: str) -> bool | None:
            self.message(
                "[Temporal] 'q'/Ctrl-D continues the workflow. "
                "Ctrl-C the outer shell to abort."
            )
            return self.do_continue(arg)

        # Route every quit spelling, plus EOF, through the override above.
        do_q = do_exit = do_EOF = do_quit

    return _WorkflowPdb


def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object:
    """Process-wide ``sys.breakpointhook`` that dispatches ``breakpoint()``.

    Three cases, checked in this order:

    1. Called on a workflow worker thread (``debug_mode`` off): raise a
       clear ``RuntimeError`` instead of hanging silently.
    2. Called inside a workflow activation (``debug_mode`` on): open the
       custom Pdb at the frame that called ``breakpoint()``.
    3. Called anywhere else: delegate to the hook saved in
       ``_ORIGINAL_BREAKPOINTHOOK``.

    Args:
        *args: Forwarded unchanged to the saved hook in case 3; unused
            otherwise.
        **kwargs: Forwarded unchanged to the saved hook in case 3; unused
            otherwise.

    Returns:
        The saved hook's return value in case 3, ``None`` in case 2.

    Raises:
        RuntimeError: In case 1, i.e. when the current thread's name starts
            with ``_WORKFLOW_THREAD_NAME_PREFIX``.
    """
    on_pool_thread = threading.current_thread().name.startswith(
        _WORKFLOW_THREAD_NAME_PREFIX
    )
    if on_pool_thread:
        raise RuntimeError(
            "breakpoint() / pdb.set_trace() inside workflow code requires "
            "debug_mode=True (or the TEMPORAL_DEBUG environment variable) on "
            "the Worker. Without it the workflow runs on a thread pool and "
            "pdb's interactive REPL cannot read stdin."
        )

    if temporalio.workflow.in_workflow():
        # Stop in the caller's frame (the workflow's `run` method, where
        # breakpoint() was written), not inside this hook. This must be
        # read here, directly in the hook, so that depth 1 is the caller.
        workflow_frame = sys._getframe(1)

        # Modules pdb must never stop in: this hook and the contextlib /
        # sandbox plumbing around it. Without the skip list pdb's first
        # step lands at the `with` teardown instead of the user's next
        # workflow line.
        skipped_modules = [
            "temporalio.worker._debugger",
            "temporalio.workflow._sandbox",
            "contextlib",
        ]

        # Going straight to our own Pdb (instead of the configured hook)
        # also sidesteps pytest's pdb wrapper, which assumes a test-code
        # context and touches sandbox-restricted internals during its
        # terminal-writer setup. `sandbox_unrestricted()` lifts member
        # checks so pdb's own initialization (readline, etc.) is not
        # blocked.
        with temporalio.workflow.unsafe.sandbox_unrestricted():
            debugger = _build_workflow_pdb_class()(skip=skipped_modules)
            debugger.set_trace(workflow_frame)
        return None

    # Not inside a workflow activation: let pytest's wrapper, ipdb, or
    # whatever else was configured handle it.
    return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs)


def _install_workflow_breakpoint_hook() -> None:
    """Make the workflow hook the active ``sys.breakpointhook``.

    Does nothing when the workflow hook is already the active hook, so
    calling it repeatedly is harmless.
    """
    if sys.breakpointhook is _temporal_workflow_breakpoint_hook:
        return
    sys.breakpointhook = _temporal_workflow_breakpoint_hook


def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner:
    """Return a runner whose sandbox no longer rejects ``breakpoint()``.

    The sandbox flags ``breakpoint`` as non-deterministic by default, so the
    call would raise before the worker's breakpoint hook gets a chance to
    run. Only that single builtin is unblocked here; the hook itself enters
    ``sandbox_unrestricted()`` for the length of the debugger session, so
    pdb's internals (readline, os.environ, etc.) are not blocked either and
    no other sandbox check is dropped for the rest of workflow execution.

    Args:
        workflow_runner: The runner configured on the worker.

    Returns:
        ``workflow_runner`` itself when it is not a
        ``SandboxedWorkflowRunner`` or when ``breakpoint`` is not among its
        restricted ``__builtins__`` members; otherwise a copy built with
        ``dataclasses.replace`` whose restrictions omit ``breakpoint``. The
        input runner is never mutated.
    """
    if not isinstance(workflow_runner, SandboxedWorkflowRunner):
        return workflow_runner

    current = workflow_runner.restrictions
    invalid_members = current.invalid_module_members
    builtins_rules = invalid_members.children.get("__builtins__")

    # Nothing to relax: no builtins matcher, or it does not list breakpoint.
    if builtins_rules is None:
        return workflow_runner
    if "breakpoint" not in builtins_rules.use:
        return workflow_runner

    # Rebuild the nested restriction objects from the inside out, copying
    # at each level instead of editing in place.
    relaxed_children = dict(invalid_members.children)
    relaxed_children["__builtins__"] = dataclasses.replace(
        builtins_rules,
        use={name for name in builtins_rules.use if name != "breakpoint"},
    )
    relaxed_invalid = dataclasses.replace(invalid_members, children=relaxed_children)
    relaxed_restrictions = dataclasses.replace(
        current, invalid_module_members=relaxed_invalid
    )
    return dataclasses.replace(workflow_runner, restrictions=relaxed_restrictions)
Loading