From 8b93ffbab3c3ed0cb0c9901e6879c383074e29ef Mon Sep 17 00:00:00 2001 From: Elizabeth Locke Date: Fri, 22 May 2026 15:00:09 -0400 Subject: [PATCH 1/2] Fix pdb / breakpoint() hang in workflow code (#1104) When debug_mode=True (or TEMPORAL_DEBUG=1), breakpoint() inside workflow code now opens an interactive pdb prompt -- including from a sandboxed workflow run under pytest. Four pieces: - Inline dispatch on the asyncio main thread (via loop.call_soon to avoid nesting inside the dispatch task's __step() and tripping Python 3.14's task-entry validation). - breakpoint removed from the sandbox's invalid builtins so the call reaches the worker hook. Nothing else is relaxed. - A Pdb subclass that lands at the workflow's own frame, suspends sandbox checks during each REPL interaction, and overrides q/Ctrl-D to continue the workflow instead of failing it with BdbQuit. - A defensive sys.breakpointhook that raises a clear RuntimeError when breakpoint() is called from a workflow worker thread without debug_mode, replacing the previous silent hang. When debug_mode is not set, the worker's dispatch and sandbox config are unchanged. Adds a README subsection on debugging workflows and five tests at tests/worker/test_breakpoint_hang.py. Verified on Python 3.13 and 3.14. Closes #1104. --- README.md | 74 +++++++++ temporalio/worker/_debugger.py | 165 ++++++++++++++++++++ temporalio/worker/_workflow.py | 137 ++++++++++++----- tests/worker/test_breakpoint_hang.py | 222 +++++++++++++++++++++++++++ 4 files changed, 556 insertions(+), 42 deletions(-) create mode 100644 temporalio/worker/_debugger.py create mode 100644 tests/worker/test_breakpoint_hang.py diff --git a/README.md b/README.md index e00de1e84..464463845 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ informal introduction to the features and their implementation. - [Customizing the Sandbox](#customizing-the-sandbox) - [Passthrough Modules](#passthrough-modules) - [Invalid Module Members](#invalid-module-members) + - [Debugging Workflows with `breakpoint()` / `pdb`](#debugging-workflows-with-breakpoint--pdb) - [Known Sandbox Issues](#known-sandbox-issues) - [Global Import/Builtins](#global-importbuiltins) - [Sandbox is not Secure](#sandbox-is-not-secure) @@ -1241,6 +1242,79 @@ my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_ See the API for more details on exact fields and their meaning. +##### Debugging Workflows with `breakpoint()` / `pdb` + +Setting `debug_mode=True` on the `Worker` (or `TEMPORAL_DEBUG=1` in the environment) routes workflow activations +onto the asyncio main thread instead of a worker thread pool. This lets `breakpoint()` and `pdb.set_trace()` +inside workflow code open an interactive REPL — without it, pdb hangs because its `input()` call would run on a +thread that does not own the controlling TTY. + +A minimal runnable example: + +```python +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker + + +@workflow.defn +class DebugMeWorkflow: + @workflow.run + async def run(self) -> str: + x = 42 + breakpoint() # interactive pdb prompt opens at this line + return f"x was {x}" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + async with Worker( + client, + task_queue="debug-me", + workflows=[DebugMeWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + DebugMeWorkflow.run, + id="debug-me-wf", + task_queue="debug-me", + task_timeout=timedelta(minutes=10), # see caveat below + ) + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Run with `python debug_me.py`, or under pytest with `pytest -s` (the `-s` flag disables pytest's stdin +capture). At the `(Pdb)` prompt you'll land at the line where `breakpoint()` was called, with workflow +locals in scope. Try `p x`, `n`, `c`, `q`. + +**Quitting cleanly.** Typing `q` or hitting Ctrl-D continues the workflow rather than raising `BdbQuit` +(which would fail the workflow task). To genuinely abort, kill the outer process with Ctrl-C. + +Two caveats when pausing at a breakpoint inside a workflow: + +1. **Workflow task timeout.** Temporal expires a workflow task after ~10 seconds by default. If you sit at the + `(Pdb)` prompt longer than that, the server reassigns the task and your workflow replays from the start when + you continue — re-hitting the breakpoint. Pass `task_timeout=timedelta(minutes=N)` to `execute_workflow` / + `start_workflow` to give yourself debugging headroom: + + ```python + await client.execute_workflow(MyWorkflow.run, ..., task_timeout=timedelta(minutes=10)) + ``` + +2. **Deterministic replay.** Workflows are deterministic and replay from history; any wall-clock pause violates + that contract. For post-mortem debugging without these caveats, use the [Replayer](#replayer) on a recorded + history instead of live debugging. + +A `breakpoint()` call from workflow code without `debug_mode` enabled raises a `RuntimeError` with a pointer to +this section, so the failure mode is loud rather than a silent hang. + ##### Known Sandbox Issues Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved. diff --git a/temporalio/worker/_debugger.py b/temporalio/worker/_debugger.py new file mode 100644 index 000000000..a1f0181e3 --- /dev/null +++ b/temporalio/worker/_debugger.py @@ -0,0 +1,165 @@ +"""Workflow debugger support. + +When ``debug_mode=True`` on the Worker (or the ``TEMPORAL_DEBUG`` env var +is set), the worker uses helpers from this module to make ``breakpoint()`` +inside workflow code open an interactive pdb prompt. The inline-dispatch +piece lives on the worker itself; everything else (sandbox relaxation, +breakpoint hook, custom Pdb subclass) lives here. +""" + +from __future__ import annotations + +import dataclasses +import sys +import threading +from types import FrameType, TracebackType + +import temporalio.workflow +from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner + +from ._workflow_instance import WorkflowRunner + +__all__ = [ + "_install_workflow_breakpoint_hook", + "_relax_sandbox_for_debugger", + "_temporal_workflow_breakpoint_hook", +] + +# Prefix used to detect threads in the workflow task ThreadPoolExecutor. +_WORKFLOW_THREAD_NAME_PREFIX = "temporal_workflow_" + +_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook + + +def _build_workflow_pdb_class() -> type: + """Build a Pdb subclass that suspends sandbox restrictions during the REPL. + + pdb's cmdloop touches ``readline.get_completer`` and other + sandbox-restricted internals each time it interacts with the user; we + bracket each interaction with ``_sandbox_unrestricted.value = True`` and + restore the previous value afterwards. Outside the REPL the sandbox + stays intact. + + ``pdb`` is imported lazily because it's a debug-only dependency that + pulls in ``cmd``/``bdb``/``linecache``; no reason to pay that cost at + worker import time. + """ + import pdb + + from temporalio.workflow._sandbox import _sandbox_unrestricted + + class _WorkflowPdb(pdb.Pdb): + # The `interaction` signature differs across Python versions: 3.10-3.12 + # typeshed names the second parameter `traceback: TracebackType | None`, + # while 3.13+ renames it `tb_or_exc` and widens the type to include + # `BaseException`. No single signature satisfies both stubs, so we + # suppress the override check. + def interaction( # type: ignore[override] + self, + frame: FrameType | None, + tb_or_exc: TracebackType | BaseException | None, + ) -> None: + prev = getattr(_sandbox_unrestricted, "value", False) + _sandbox_unrestricted.value = True + try: + super().interaction(frame, tb_or_exc) # type: ignore[arg-type] + finally: + _sandbox_unrestricted.value = prev + + # Override `q`/`quit`/`exit`/EOF (Ctrl-D) to behave like `continue`. + # Default pdb raises `BdbQuit`, which propagates as an uncaught + # exception out of workflow.run, fails the workflow task, and + # triggers a server retry storm during teardown. For a debug + # session the user almost always wants "stop debugging and let the + # workflow finish" — that's `continue`. Users who truly want to + # abort can Ctrl-C the outer shell. + def do_quit(self, arg: str) -> bool | None: + self.message( + "[Temporal] 'q'/Ctrl-D continues the workflow. " + "Ctrl-C the outer shell to abort." + ) + return self.do_continue(arg) + + do_q = do_exit = do_quit + do_EOF = do_quit + + return _WorkflowPdb + + +def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object: + """Process-wide ``sys.breakpointhook`` that handles ``breakpoint()`` calls. + + From a workflow worker thread without ``debug_mode``: raises a clear + ``RuntimeError`` (replacing the previous silent hang). From inside a + workflow activation (with ``debug_mode`` on): drops the user into a + custom Pdb at the workflow's own frame. From anywhere else: delegates + to whatever hook was previously installed. + """ + if threading.current_thread().name.startswith(_WORKFLOW_THREAD_NAME_PREFIX): + raise RuntimeError( + "breakpoint() / pdb.set_trace() inside workflow code requires " + "debug_mode=True (or the TEMPORAL_DEBUG environment variable) on " + "the Worker. Without it the workflow runs on a thread pool and " + "pdb's interactive REPL cannot read stdin." + ) + if not temporalio.workflow.in_workflow(): + # Not inside a workflow activation — let pytest's wrapper, ipdb, or + # whatever else is configured handle it. + return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs) + # Inside a workflow: drop the user into pdb at the caller's frame (the + # workflow's `run` method, where breakpoint() was actually written) rather + # than landing inside this hook. Bypassing the configured breakpoint hook + # also avoids pytest's pdb wrapper, which assumes a test-code context and + # touches sandbox-restricted internals during its terminal-writer setup. + # `sandbox_unrestricted()` lifts member checks for the duration of the + # REPL so pdb's own initialization (readline, etc.) isn't blocked. + # `skip` tells pdb not to stop in our hook frame or the contextlib + # plumbing — without it pdb's first step lands at the `with` teardown + # instead of the user's next workflow line. + caller_frame = sys._getframe(1) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + pdb_cls = _build_workflow_pdb_class() + pdb_cls( + skip=[ + "temporalio.worker._debugger", + "temporalio.workflow._sandbox", + "contextlib", + ] + ).set_trace(caller_frame) + return None + + +def _install_workflow_breakpoint_hook() -> None: + """Set ``sys.breakpointhook`` to the workflow hook if it isn't already.""" + if sys.breakpointhook is not _temporal_workflow_breakpoint_hook: + sys.breakpointhook = _temporal_workflow_breakpoint_hook + + +def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner: + """Allow ``breakpoint()`` past the sandbox so it can reach the worker hook. + + The sandbox flags ``breakpoint`` as non-deterministic by default; without + this relaxation the call raises before our breakpoint hook can run. + Once inside the hook, the hook itself enters ``sandbox_unrestricted()`` + for the duration of the debugger session, so pdb's internals (readline, + os.environ, etc.) aren't blocked either — without permanently dropping + sandbox checks for the rest of workflow execution. + """ + if not isinstance(workflow_runner, SandboxedWorkflowRunner): + return workflow_runner + + restrictions = workflow_runner.restrictions + invalid = restrictions.invalid_module_members + builtins_matcher = invalid.children.get("__builtins__") + if builtins_matcher is None or "breakpoint" not in builtins_matcher.use: + return workflow_runner + + new_use = set(builtins_matcher.use) - {"breakpoint"} + new_builtins = dataclasses.replace(builtins_matcher, use=new_use) + new_invalid = dataclasses.replace( + invalid, children={**invalid.children, "__builtins__": new_builtins} + ) + new_restrictions = dataclasses.replace( + restrictions, invalid_module_members=new_invalid + ) + return dataclasses.replace(workflow_runner, restrictions=new_restrictions) diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 9e2ac9c7b..01a352b18 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -32,6 +32,10 @@ from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner from . import _command_aware_visitor +from ._debugger import ( + _install_workflow_breakpoint_hook, + _relax_sandbox_for_debugger, +) from ._interceptor import ( Interceptor, WorkflowInboundInterceptor, @@ -49,6 +53,7 @@ # Set to true to log all activations and completions LOG_PROTOS = False + # Value was chosen abitrarily as a small number that allows some concurrency and prevents # large numbers of concurrent external storage operations causing resource contention. # This default limit is per workflow task activation and does not limit the total number @@ -111,6 +116,11 @@ def __init__( ), ) + # In debug mode, also lift the sandbox restriction on breakpoint() so + # pdb works in workflow code. + self._debug_mode = debug_mode + if self._debug_mode: + workflow_runner = _relax_sandbox_for_debugger(workflow_runner) self._workflow_runner = workflow_runner self._unsandboxed_workflow_runner = unsandboxed_workflow_runner @@ -145,7 +155,9 @@ def __init__( # If debug mode is enabled, disable deadlock detection # otherwise set to 2 seconds - self._deadlock_timeout_seconds = None if debug_mode else 2 + self._deadlock_timeout_seconds = None if self._debug_mode else 2 + + _install_workflow_breakpoint_hook() # Keep track of workflows that could not be evicted self._could_not_evict_count = 0 @@ -255,6 +267,34 @@ async def drain_poll_queue(self) -> None: except PollShutdownError: return + async def _activate_inline_for_debug( + self, + loop: asyncio.AbstractEventLoop, + workflow: _RunningWorkflow, + act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: + # Indirect through call_soon + a future so the activation runs outside + # the dispatch task's __step() context. Python 3.14 refuses to enter a + # task while another on the same thread is mid-step; suspending at the + # await below clears that state so workflow.activate can step its own + # task without collision. + future: asyncio.Future = loop.create_future() + + def run_inline() -> None: + # _run_once clears the running-loop registration on exit; restore + # the main loop so later code sees the right one. + main_loop = asyncio._get_running_loop() + try: + completion = workflow.activate(act) + future.set_result(completion) + except BaseException as e: + future.set_exception(e) + finally: + asyncio._set_running_loop(main_loop) + + loop.call_soon(run_inline) + return await future + async def _handle_activation( self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation ) -> None: @@ -344,35 +384,43 @@ async def _handle_activation( ) self._running_workflows[act.run_id] = workflow - # Run activation in separate thread so we can check if it's - # deadlocked - activate_task = asyncio.get_running_loop().run_in_executor( - self._workflow_task_executor, - workflow.activate, - act, - ) - - # Run activation task with deadlock timeout - try: - completion = await asyncio.wait_for( - activate_task, self._deadlock_timeout_seconds + if self._debug_mode: + # Inline on the main thread so pdb / breakpoint() can read + # stdin. The loop blocks during the activation — that's the + # intended single-stepping semantic. + completion = await self._activate_inline_for_debug( + asyncio.get_running_loop(), workflow, act ) - except asyncio.TimeoutError: - # Need to create the deadlock exception up here so it - # captures the trace now instead of later after we may have - # interrupted it - deadlock_exc = _DeadlockError.from_deadlocked_workflow( - workflow.instance, self._deadlock_timeout_seconds + else: + # Run activation in separate thread so we can check if it's + # deadlocked + activate_task = asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, ) - # When we deadlock, we will raise an exception to fail - # the task. But before we do that, we want to try to - # interrupt the thread and put this activation task on - # the workflow so that the successive eviction can wait - # on it before trying to evict. - workflow.attempt_deadlock_interruption() - # Set the task and raise - workflow.deadlocked_activation_task = activate_task - raise deadlock_exc from None + + # Run activation task with deadlock timeout + try: + completion = await asyncio.wait_for( + activate_task, self._deadlock_timeout_seconds + ) + except asyncio.TimeoutError: + # Need to create the deadlock exception up here so it + # captures the trace now instead of later after we may have + # interrupted it + deadlock_exc = _DeadlockError.from_deadlocked_workflow( + workflow.instance, self._deadlock_timeout_seconds + ) + # When we deadlock, we will raise an exception to fail + # the task. But before we do that, we want to try to + # interrupt the thread and put this activation task on + # the workflow so that the successive eviction can wait + # on it before trying to evict. + workflow.attempt_deadlock_interruption() + # Set the task and raise + workflow.deadlocked_activation_task = activate_task + raise deadlock_exc from None except Exception as err: if isinstance(err, _DeadlockError): @@ -590,22 +638,27 @@ async def _handle_cache_eviction( handle_eviction_task: asyncio.Future | None = None while True: try: - # We only create the eviction task if we haven't already or - # it is done. This is because if it already is running and - # timed out, it's still running (and holding on to a - # thread). But if did complete running but failed with - # another error, we want to re-create the task. - if not handle_eviction_task or handle_eviction_task.done(): - handle_eviction_task = ( - asyncio.get_running_loop().run_in_executor( - self._workflow_task_executor, - workflow.activate, - act, + if self._debug_mode: + await self._activate_inline_for_debug( + asyncio.get_running_loop(), workflow, act + ) + else: + # We only create the eviction task if we haven't already or + # it is done. This is because if it already is running and + # timed out, it's still running (and holding on to a + # thread). But if did complete running but failed with + # another error, we want to re-create the task. + if not handle_eviction_task or handle_eviction_task.done(): + handle_eviction_task = ( + asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, + ) ) + await asyncio.wait_for( + handle_eviction_task, self._deadlock_timeout_seconds ) - await asyncio.wait_for( - handle_eviction_task, self._deadlock_timeout_seconds - ) # Break if it succeeds break except BaseException as err: diff --git a/tests/worker/test_breakpoint_hang.py b/tests/worker/test_breakpoint_hang.py new file mode 100644 index 000000000..240bd8440 --- /dev/null +++ b/tests/worker/test_breakpoint_hang.py @@ -0,0 +1,222 @@ +"""Tests for the pdb / breakpoint() fix in workflow code (#1104).""" + +from __future__ import annotations + +import pdb +import sys +import threading +import uuid +from types import FrameType +from typing import Any +from unittest.mock import patch + +from temporalio import workflow +from temporalio.client import Client +from temporalio.worker import Worker +from temporalio.worker._debugger import _temporal_workflow_breakpoint_hook + + +@workflow.defn(sandboxed=False) +class ThreadCaptureWorkflow: + """Returns the name of the thread the workflow runs on. + + `sandboxed=False` so `threading.current_thread()` isn't intercepted — + these tests are about thread placement, not sandbox behavior. + """ + + @workflow.run + async def run(self) -> str: + return threading.current_thread().name + + +async def test_workflow_runs_on_pool_thread_without_debug_mode(client: Client): + """Production behavior unchanged: workflows run on `temporal_workflow_*`.""" + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ThreadCaptureWorkflow], + ): + thread_name = await client.execute_workflow( + ThreadCaptureWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + main_name = threading.main_thread().name + assert thread_name != main_name, ( + f"workflow ran on the main thread ({main_name!r}) — production behavior changed" + ) + assert thread_name.startswith("temporal_workflow_"), ( + f"expected pool thread, got {thread_name!r}" + ) + + +async def test_workflow_runs_on_main_thread_in_debug_mode(client: Client): + """debug_mode=True moves workflow activation to the asyncio main thread + so pdb's input() reaches the controlling TTY.""" + task_queue = f"tq-{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[ThreadCaptureWorkflow], + debug_mode=True, + ): + thread_name = await client.execute_workflow( + ThreadCaptureWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + main_name = threading.main_thread().name + assert thread_name == main_name, ( + f"expected workflow on main thread ({main_name!r}) in debug mode; " + f"got {thread_name!r}" + ) + + +@workflow.defn +class SandboxedBreakpointWorkflow: + """Sandboxed workflow that calls breakpoint() — verifies the fix works + without requiring users to switch to UnsandboxedWorkflowRunner.""" + + @workflow.run + async def run(self) -> str: + bird = "chicken" + breakpoint() + return f"bird was {bird}" + + +async def test_breakpoint_works_in_sandboxed_workflow_in_debug_mode(client: Client): + """breakpoint() inside a sandboxed workflow reaches the debugger when + debug_mode=True — no need to switch to UnsandboxedWorkflowRunner. + + Patches `pdb.Pdb.set_trace` with a stub so CI doesn't hang on an + interactive prompt. Reaching the stub on `MainThread` with the + workflow's `run` frame proves the full path (sandbox relaxation -> + our hook -> pdb) works through the sandbox. Also verifies workflow + locals (`bird`) are visible in the captured frame. + """ + captured: dict[str, object] = {} + + def stub_set_trace(_self: pdb.Pdb, frame: FrameType | None = None) -> None: + captured["thread"] = threading.current_thread().name + captured["frame_name"] = frame.f_code.co_name if frame else None + captured["bird"] = frame.f_locals.get("bird") if frame else None + captured["called"] = True + + task_queue = f"tq-{uuid.uuid4()}" + with patch.object(pdb.Pdb, "set_trace", stub_set_trace): + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "bird was chicken", ( + f"workflow did not complete; breakpoint() likely raised inside the sandbox: " + f"result={result!r}" + ) + assert captured.get("called"), "pdb.Pdb.set_trace was never reached" + assert captured["thread"] == threading.main_thread().name, ( + f"breakpoint landed on {captured['thread']!r}, not the main thread" + ) + assert captured["frame_name"] == "run", ( + f"breakpoint stopped at frame {captured['frame_name']!r}, " + f"expected the workflow's `run` method" + ) + assert captured["bird"] == "chicken", ( + f"workflow local `bird` not visible in pdb frame: got {captured['bird']!r}" + ) + + +async def test_breakpoint_quit_continues_workflow_in_debug_mode(client: Client): + """Typing `q` (or hitting Ctrl-D) in a workflow pdb session should + continue the workflow rather than failing the workflow task with + BdbQuit. The hook overrides `do_quit`/`do_EOF` to call `do_continue` + instead, so a debug session ends cleanly. + + Drives pdb via `cmdqueue` so no real stdin is needed. The first + iteration of cmdloop sees `q`, which dispatches to our overridden + `do_quit` -> `do_continue`. The workflow then completes normally. + """ + captured: dict[str, object] = {} + + class _AutoQuitPdb(pdb.Pdb): + """Pdb subclass that pre-queues `q` and captures frame state on + entry to `interaction`.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.cmdqueue = ["q"] + + def interaction( # type: ignore[override] + self, frame: FrameType | None, traceback: Any + ) -> Any: + if frame is not None: + captured["frame_name"] = frame.f_code.co_name + captured["bird"] = frame.f_locals.get("bird") + return super().interaction(frame, traceback) + + task_queue = f"tq-{uuid.uuid4()}" + with patch("pdb.Pdb", _AutoQuitPdb): + async with Worker( + client, + task_queue=task_queue, + workflows=[SandboxedBreakpointWorkflow], + debug_mode=True, + ): + result = await client.execute_workflow( + SandboxedBreakpointWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "bird was chicken", ( + f"workflow did not complete after `q`; `BdbQuit` likely propagated: " + f"result={result!r}" + ) + assert captured.get("frame_name") == "run", ( + f"pdb didn't stop in workflow.run frame: got {captured.get('frame_name')!r}" + ) + assert captured.get("bird") == "chicken", ( + f"workflow local `bird` not visible at pdb breakpoint: " + f"got {captured.get('bird')!r}" + ) + + +def test_breakpoint_hook_raises_on_workflow_thread(client: Client): + """The defensive hook fails loudly when breakpoint() is called from a + `temporal_workflow_*` thread without debug mode.""" + # Constructing a Worker installs the hook. We don't need to run anything. + Worker( + client, + workflows=[ThreadCaptureWorkflow], + task_queue=f"tq-{uuid.uuid4()}", + ) + assert sys.breakpointhook is _temporal_workflow_breakpoint_hook + + captured: list[BaseException] = [] + + def call_breakpoint_on_worker_thread() -> None: + try: + sys.breakpointhook() + except BaseException as e: + captured.append(e) + + t = threading.Thread( + target=call_breakpoint_on_worker_thread, + name="temporal_workflow__test", + ) + t.start() + t.join() + + assert len(captured) == 1 + assert isinstance(captured[0], RuntimeError) + assert "debug_mode=True" in str(captured[0]) From dbc6023487cfbc8cd1c162da65552e29e41ef9b3 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Mon, 8 Jun 2026 09:40:31 -0700 Subject: [PATCH 2/2] Build streamed OpenAI events before serializing them (#1586) * Fall back to model_dump_json for OpenAI payload serialization OpenAI response and stream event types whose pydantic serializer is a lazily-built MockValSer cannot be serialized by the generic any-schema serializer, raising PydanticSerializationError (e.g. when streaming via WorkflowStreamClient). The model's own model_dump_json() handles them. Fixes #1585 * Dispatch pydantic models to their own serializer OpenAI's BaseModel sets defer_build=True, so a model's serializer is a MockValSer placeholder until pydantic's lazy build runs. The generic any-schema serializer reaches for that placeholder directly without triggering the build and raises PydanticSerializationError. Route pydantic models through their own model_dump_json (which triggers the build) by type instead of catching the error; non-model values continue through the generic serializer unchanged. * Build streamed events at the source instead of in the converter Force the deferred pydantic build on each streamed event before it is published or returned, so it serializes regardless of build state. This also covers the activity's list return value, which the payload converter serializes generically and cannot build on its own. Drop the now-redundant to_payload override. --- temporalio/contrib/openai_agents/_invoke_model_activity.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/temporalio/contrib/openai_agents/_invoke_model_activity.py b/temporalio/contrib/openai_agents/_invoke_model_activity.py index a43f9aeaf..3f7a639dd 100644 --- a/temporalio/contrib/openai_agents/_invoke_model_activity.py +++ b/temporalio/contrib/openai_agents/_invoke_model_activity.py @@ -393,6 +393,9 @@ async def invoke_model_activity_streaming( conversation_id=input.get("conversation_id"), prompt=input.get("prompt"), ): + # OpenAI models set defer_build=True, so an event's pydantic + # schema may still be an unbuilt placeholder. + type(event).model_rebuild() events.append(event) events_topic.publish(event) except APIStatusError as e: