Skip to content

Commit 723283f

Browse files
surfaiclaude
andcommitted
fix(runners): dispatch run_after_run_callback in _run_node_async (#5282)
Runner._run_node_async had a TODO at runners.py:427 for node runtime plugin lifecycle, and never dispatched plugin_manager.run_after_run_callback. Because both Workflow(BaseNode) and (post a3 refactor) LlmAgent roots funnel through _run_node_async, any BasePlugin subclass that overrides after_run_callback silently no-op'd — breaking the canonical pattern for memory persistence, metrics emission, and post-run audit hooks. This wires one dispatch call after _consume_event_queue drains, mirroring the legacy path in _exec_with_plugin at runners.py:1230 exactly: outside the finally block, so semantics match legacy (fires on natural drain only, skipped on error in the loop, skipped on caller-break via GeneratorExit). Also: - Narrows the TODO at runners.py:427 from "tracing and plugin lifecycle" to "tracing" since plugin lifecycle is now wired. - Incidental pyink reformat at runners.py:1451: pre-existing 82-char line on v2 HEAD, unrelated to #5282 but required by the pyink CI gate. Companion to test-only PR #5301 (which established the regression anchor). This PR ships the fix plus flips the anchor by replacing the WorkaroundRunner scaffolding with a direct positive assertion. Alternatives considered and rejected: - Wrap in finally to fire on error/break — changes legacy contract. - Lift dispatch to run_async call sites — requires plumbing ic out of _run_node_async, bigger blast radius. - Extract a shared _with_plugin_lifecycle context manager — right long-term shape, but refactors the legacy path too and expands scope. Fixes #5282 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0b28fb8 commit 723283f

File tree

2 files changed

+168
-2
lines changed

2 files changed

+168
-2
lines changed

src/google/adk/runners.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ async def _run_node_async(
424424
425425
Events flow through ic.event_queue via NodeRunner.
426426
427-
TODO: Add tracing and plugin lifecycle for the node runtime path.
427+
TODO: Add tracing for the node runtime path.
428428
"""
429429
from .workflow._node_runner_class import NodeRunner
430430

@@ -505,6 +505,9 @@ async def _drive_root_node():
505505
try:
506506
async for event in self._consume_event_queue(ic, done_sentinel):
507507
yield event
508+
# 5. Run the after_run callbacks. Mirrors _exec_with_plugin:1230.
509+
# This does NOT emit any event.
510+
await ic.plugin_manager.run_after_run_callback(invocation_context=ic)
508511
finally:
509512
await self._cleanup_root_task(task, self.agent.name)
510513

@@ -1448,7 +1451,9 @@ def _find_agent_to_run(
14481451
# type of the agent. e.g. a remote a2a agent may surface a credential
14491452
# request as a special long-running function tool call.
14501453
event = find_matching_function_call(session.events)
1451-
is_resumable = self.resumability_config and self.resumability_config.is_resumable
1454+
is_resumable = (
1455+
self.resumability_config and self.resumability_config.is_resumable
1456+
)
14521457
# Only route based on a past function response if resumability is enabled.
14531458
# In non-resumable scenarios, a turn ending with function call response
14541459
# shouldn't trap the next turn on that same agent if it's not transferable.
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Regression test for google/adk-python#5282.
16+
17+
Before the fix, Runner._run_node_async never dispatched
18+
plugin_manager.run_after_run_callback on the BaseNode path (runners.py:427
19+
TODO), so Plugin.after_run_callback silently no-op'd for Workflow roots.
20+
21+
These tests pin the post-fix behavior: both the pre-run/event hooks AND
22+
after_run_callback must fire on a Workflow root.
23+
"""
24+
25+
from __future__ import annotations
26+
27+
from dataclasses import dataclass
28+
from typing import Optional
29+
30+
from google.adk import Event
31+
from google.adk import Workflow
32+
from google.adk.agents.invocation_context import InvocationContext
33+
from google.adk.apps.app import App
34+
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
35+
from google.adk.plugins.base_plugin import BasePlugin
36+
from google.adk.runners import Runner
37+
from google.adk.sessions.in_memory_session_service import InMemorySessionService
38+
from google.genai import types
39+
import pytest
40+
41+
42+
@dataclass
43+
class CallbackCounts:
44+
on_user_message_callback: int = 0
45+
before_run_callback: int = 0
46+
on_event_callback: int = 0
47+
after_run_callback: int = 0
48+
49+
50+
class TracerPlugin(BasePlugin):
51+
"""Counts every Plugin lifecycle callback the Runner actually dispatches."""
52+
53+
def __init__(self) -> None:
54+
super().__init__(name='tracer')
55+
self.counts = CallbackCounts()
56+
57+
async def on_user_message_callback(
58+
self,
59+
*,
60+
invocation_context: InvocationContext,
61+
user_message: types.Content,
62+
) -> Optional[types.Content]:
63+
del invocation_context, user_message
64+
self.counts.on_user_message_callback += 1
65+
return None
66+
67+
async def before_run_callback(
68+
self, *, invocation_context: InvocationContext
69+
) -> Optional[types.Content]:
70+
del invocation_context
71+
self.counts.before_run_callback += 1
72+
return None
73+
74+
async def on_event_callback(
75+
self, *, invocation_context: InvocationContext, event: Event
76+
) -> Optional[Event]:
77+
del invocation_context, event
78+
self.counts.on_event_callback += 1
79+
return None
80+
81+
async def after_run_callback(
82+
self, *, invocation_context: InvocationContext
83+
) -> None:
84+
del invocation_context
85+
self.counts.after_run_callback += 1
86+
return None
87+
88+
89+
async def _terminal_node(ctx) -> Event:
90+
"""A single terminal node that yields a content-bearing Event.
91+
92+
Using content (not just state) ensures _consume_event_queue actually runs
93+
the on_event_callback path -- state-only events still flow through the
94+
queue, but content is the canonical case the plugin hook was designed for.
95+
"""
96+
del ctx
97+
return Event(
98+
content=types.Content(
99+
parts=[types.Part(text='done')],
100+
role='model',
101+
)
102+
)
103+
104+
105+
def _build_runner(plugin: TracerPlugin) -> Runner:
106+
workflow = Workflow(
107+
name='Issue5282Repro',
108+
edges=[('START', _terminal_node)],
109+
)
110+
app = App(name='issue_5282_repro', root_agent=workflow, plugins=[plugin])
111+
return Runner(
112+
app_name='issue_5282_repro',
113+
app=app,
114+
session_service=InMemorySessionService(),
115+
memory_service=InMemoryMemoryService(),
116+
)
117+
118+
119+
async def _drive_one_invocation(runner: Runner) -> None:
120+
session = await runner.session_service.create_session(
121+
app_name='issue_5282_repro', user_id='u1'
122+
)
123+
async for _ in runner.run_async(
124+
user_id='u1',
125+
session_id=session.id,
126+
new_message=types.Content(parts=[types.Part(text='hi')], role='user'),
127+
):
128+
pass
129+
130+
131+
@pytest.mark.asyncio
132+
async def test_workflow_root_dispatches_pre_run_and_event_hooks():
133+
"""Baseline: pre-run and per-event hooks fire on a Workflow root."""
134+
plugin = TracerPlugin()
135+
runner = _build_runner(plugin)
136+
137+
await _drive_one_invocation(runner)
138+
139+
assert plugin.counts.on_user_message_callback == 1
140+
assert plugin.counts.before_run_callback == 1
141+
assert plugin.counts.on_event_callback >= 1, (
142+
'on_event_callback should fire at least once via _consume_event_queue '
143+
'for the content-bearing terminal event'
144+
)
145+
146+
147+
@pytest.mark.asyncio
148+
async def test_workflow_root_dispatches_after_run_callback():
149+
"""Regression anchor for #5282: after_run_callback fires on Workflow roots.
150+
151+
Pre-fix, _run_node_async never dispatched run_after_run_callback on the
152+
BaseNode path, so this would have asserted 0 and been xfail-anchored.
153+
Post-fix, the dispatch mirrors _exec_with_plugin's legacy path and fires
154+
exactly once per successful invocation.
155+
"""
156+
plugin = TracerPlugin()
157+
runner = _build_runner(plugin)
158+
159+
await _drive_one_invocation(runner)
160+
161+
assert plugin.counts.after_run_callback == 1

0 commit comments

Comments
 (0)