feat: add stop functionality for active agent sessions and improve handling of stop requests #5380
Summary of Changes

Hello @Soulter, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances user control over active agent sessions by implementing a robust stop functionality. Users can now interrupt ongoing agent processes via a new chat command or a dedicated UI button, which gracefully terminates the agent's execution while preserving any partial output. This improves the responsiveness and usability of the system, especially for long-running or unintended agent interactions. The backend changes ensure proper state management and history recording for these interrupted sessions, while the frontend provides a clear and accessible way to trigger the stop action.
Documentation Updates

1 document was updated by changes in this PR: pr4697的改动

@@ -131,7 +131,7 @@
### 6. Active Agent Session Stop Feature
#### Feature Description
-The active agent system lets users interrupt Agent task execution mid-conversation, covering both the built-in Agent Runner and third-party Agent Runners. The stop feature can be triggered in two ways: via the command line or via the UI.
+The active agent system lets users interrupt Agent task execution mid-conversation, covering both the built-in Agent Runner and third-party Agent Runners. The stop feature can be triggered in two ways: via the `/stop` command or via the Dashboard UI.
#### Usage
@@ -143,32 +143,43 @@
- After a successful stop, the number of stopped tasks is shown: "已请求停止 {count} 个运行中的任务。" ("Requested stop for {count} running task(s).")
- If the current session has no running tasks, it shows: "当前会话没有运行中的任务。" ("There are no running tasks in the current session.")
+The `/stop` command is one of the built-in conversation commands and, like `/reset`, `/his`, and others, can be used on any chat platform (Telegram, QQ, WeChat, Discord, etc.).
+
##### Dashboard Chat Stop Button
In the Dashboard chat interface, a stop button appears in the chat input box while the Agent is executing:
-- The stop button (red, mdi-stop icon) appears automatically while the Agent is running, replacing the send button
+- The stop button (mdi-stop icon) appears automatically while the Agent is running, replacing the send button
- Clicking the button interrupts the response generation in progress
-- The button uses a red/error color tone to clearly mark it as a stop action
+- After stopping, an API call is made to request aborting the current session's Agent tasks
-#### Stop Behavior
+#### Stop Mechanism
+
+The stop mechanism differs by Agent Runner type:
##### Built-in Agent Runner (Tool-Loop Agent Runner)
For the built-in Agent Runner, stop requests are graceful and preserve the partial output produced before the interruption:
+- Uses the `request_agent_stop_all()` method, which does not interrupt event propagation, so downstream steps (such as history saving) continue to run
- A system message is appended: "[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]"
- The Agent transitions to the DONE state and the on_agent_done hook is triggered
-- Conversation history and session state are preserved (unlike a hard event stop)
+- Conversation history and session state are preserved
- The response type is marked as "aborted"
+- During execution, the Agent periodically checks the `_stop_requested` flag and can exit gracefully
-##### Third-Party Agent Runner
-For third-party Agent Runners, a stop request fully interrupts event propagation via the `stop_all()` method.
+##### Third-Party Agent Runners (e.g., Dify, Coze)
+For third-party Agent Runners, a stop request fully interrupts event propagation:
+
+- Uses the `stop_all()` method to fully stop the event stream
+- Does not preserve intermediate state; task execution is terminated directly
#### Key Implementation Points
-- `request_agent_stop_all()` method (active_event_registry): used for graceful stops; does not interrupt event propagation, allowing downstream steps (such as history saving) to continue
-- `request_stop()` method (tool-loop Agent Runner): sets the stop flag
-- The Agent checks the `_stop_requested` flag during execution and can exit gracefully
-- Difference from `stop_all()`: `stop_all()` fully interrupts event propagation, while `request_agent_stop_all()` allows downstream steps to continue
+- `request_agent_stop_all()` method (active_event_registry): used for graceful stops; sets the stop flag without interrupting event propagation
+- `stop_all()` method (active_event_registry): used for hard stops; fully interrupts event propagation
+- `request_stop()` method (tool-loop Agent Runner): sets the `_stop_requested` flag
+- `was_aborted()` method (tool-loop Agent Runner): checks whether the task was actively aborted by the user
+- During Agent execution, a background monitoring task (`_watch_agent_stop_signal`) checks for the stop signal
+- The Dashboard triggers stop requests through the `/api/chat/stop` API endpoint
---
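The cooperative stop pattern documented above can be sketched roughly as follows. This is a minimal illustration, not AstrBot's actual runner code: only `request_stop()`, `was_aborted()`, and the `_stop_requested` flag come from the PR, while `ToolLoopRunnerSketch`, its `run()` method, and the watcher body are invented for the example.

```python
import asyncio


class ToolLoopRunnerSketch:
    """Hypothetical stand-in for a tool-loop runner (names other than
    request_stop/was_aborted/_stop_requested are invented)."""

    def __init__(self) -> None:
        self._stop_requested = False
        self._aborted = False
        self.partial_output: list[str] = []

    def request_stop(self) -> None:
        # Graceful stop: only set a flag; the running loop notices it later.
        self._stop_requested = True

    def was_aborted(self) -> bool:
        return self._aborted

    async def run(self, steps: int) -> str:
        for i in range(steps):
            if self._stop_requested:
                # Exit between iterations, keeping output produced so far.
                self._aborted = True
                break
            self.partial_output.append(f"chunk-{i}")
            await asyncio.sleep(0)  # yield control so a watcher task can run
        return "".join(self.partial_output)


async def main() -> tuple[bool, int]:
    runner = ToolLoopRunnerSketch()

    async def watcher() -> None:
        # Plays the role of a background stop watcher: fires the stop
        # request while the agent loop is still running.
        await asyncio.sleep(0)
        runner.request_stop()

    watch_task = asyncio.create_task(watcher())
    await runner.run(steps=100)
    await watch_task
    return runner.was_aborted(), len(runner.partial_output)


aborted, produced = asyncio.run(main())
print(aborted, produced)
```

The loop never gets interrupted mid-iteration: the watcher flips the flag from outside, and the runner notices it at the next iteration boundary, which is what lets downstream steps (history saving, hooks) still run on the partial output.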
Hey - I've found 5 issues, and left some high-level feedback:
- In `run_agent`, the stop logic is handled both via `_watch_agent_stop_signal` and per-iteration `_should_stop_agent` checks; consider consolidating this into a single mechanism to avoid redundant `request_stop()` calls and `continue` branches that can make the control flow harder to follow.
- In `_save_to_history`, the commented-out `user_aborted` history marker block adds noise; either remove it or implement a clear, explicit representation of user-aborted conversations if you intend to persist that state in history.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `run_agent`, the stop logic is handled both via `_watch_agent_stop_signal` and per-iteration `_should_stop_agent` checks; consider consolidating this into a single mechanism to avoid redundant `request_stop()` calls and `continue` branches that can make the control flow harder to follow.
- In `_save_to_history`, the commented-out `user_aborted` history marker block adds noise; either remove it or implement a clear, explicit representation of user-aborted conversations if you intend to persist that state in history.
## Individual Comments
### Comment 1
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:334-338` </location>
<code_context>
),
)
+ if self._stop_requested:
+ llm_resp_result = LLMResponse(
+ role="assistant",
+ completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
+ reasoning_content=llm_response.reasoning_content,
+ reasoning_signature=llm_response.reasoning_signature,
+ )
+ break
</code_context>
<issue_to_address>
**suggestion:** The user-interruption system message string is duplicated; consider centralizing it.
This interruption marker string is hard-coded both here and in the `_stop_requested` handling after the loop. Please extract it into a shared constant or helper (e.g., `make_user_abort_response(...)`) so the text stays consistent and easier to maintain across both paths.
Suggested implementation:
```python
if self._stop_requested:
llm_resp_result = LLMResponse(
role="assistant",
completion_text=USER_ABORT_SYSTEM_MESSAGE,
reasoning_content=llm_response.reasoning_content,
reasoning_signature=llm_response.reasoning_signature,
)
break
```
```python
if not llm_resp_result:
if self._stop_requested:
llm_resp_result = LLMResponse(
role="assistant",
completion_text=USER_ABORT_SYSTEM_MESSAGE,
)
else:
return
```
To fully implement the suggestion, also:
1. Define the shared constant `USER_ABORT_SYSTEM_MESSAGE` once in this module, for example near the top of `tool_loop_agent_runner.py`:
```python
USER_ABORT_SYSTEM_MESSAGE = (
"[SYSTEM: User actively interrupted the response generation. "
"Partial output before interruption is preserved.]"
)
```
2. If there are any other occurrences of the same interruption message string elsewhere in this file (or related runners), replace them with `USER_ABORT_SYSTEM_MESSAGE` so the text is centralized and consistent.
If you prefer a helper instead of a bare constant (e.g., to preserve reasoning fields when available), you can define a function like `make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse` in this module and have both call sites use it instead of constructing `LLMResponse` directly.
</issue_to_address>
### Comment 2
<location> `tests/test_tool_loop_agent_runner.py:419-420` </location>
<code_context>
assert fallback_provider.call_count == 1
+@pytest.mark.asyncio
+async def test_stop_signal_returns_aborted_and_persists_partial_message(
+ runner, provider_request, mock_tool_executor, mock_hooks
+):
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for non-streaming mode and for stop requested before any chunks are produced
The new test covers the streaming case after a partial chunk has been delivered, but two important cases are still untested:
1. **Non-streaming mode (`streaming=False`)**: Add a test using `MockAbortableStreamProvider` (or similar) with `streaming=False` and `runner.request_stop()` that asserts:
- An `aborted` response is yielded.
- `runner.was_aborted()` is `True` and `final_llm_resp` is set correctly.
- Partial output is preserved in `run_context.messages`.
2. **Stop before any chunks**: Add a test that calls `runner.request_stop()` *before* consuming from `step()` and asserts:
- The generator yields an `aborted` response (or otherwise terminates as designed).
- `final_llm_resp` matches the intended contract for the new branch (e.g. empty assistant message when `llm_resp_result` is missing).
- `run_context.messages` matches that behavior.
These will exercise the new stop logic for both non-streaming runs and immediate-stop scenarios.
Suggested implementation:
```python
@pytest.mark.asyncio
async def test_stop_signal_returns_aborted_and_persists_partial_message(
runner, provider_request, mock_tool_executor, mock_hooks
):
provider = MockAbortableStreamProvider()
await runner.reset(
provider=provider,
request=provider_request,
run_context=ContextWrapper(context=None),
tool_executor=mock_tool_executor,
agent_hooks=mock_hooks,
streaming=True,
)
# Start a streaming step and consume at least one partial chunk
step_gen = runner.step()
partial_steps = []
async for step in step_gen:
partial_steps.append(step)
# Simulate user requesting stop after first partial assistant chunk
if len(partial_steps) == 1:
runner.request_stop()
# Once stop has been requested and an aborted response is observed, break
if getattr(step, "status", None) == "aborted" or getattr(
getattr(step, "response", None), "status", None
) == "aborted":
break
# We should have observed at least one step before aborting
assert partial_steps
# Runner should record that it was aborted
assert getattr(runner, "was_aborted")() is True
# Final LLM response should be recorded
assert getattr(runner, "final_llm_resp", None) is not None
# Partial assistant output should be preserved in the run context messages
run_context = getattr(runner, "run_context", None)
assert run_context is not None
messages = getattr(run_context, "messages", [])
assert messages
# There should be at least one assistant message (partial content)
assert any(getattr(m, "role", None) == "assistant" for m in messages)
@pytest.mark.asyncio
async def test_stop_signal_non_streaming_persists_partial_message_and_sets_final_resp(
runner, provider_request, mock_tool_executor, mock_hooks
):
"""Non-streaming mode: stop requested during the run should yield an aborted result
and preserve partial assistant output in the run context messages.
"""
provider = MockAbortableStreamProvider()
await runner.reset(
provider=provider,
request=provider_request,
run_context=ContextWrapper(context=None),
tool_executor=mock_tool_executor,
agent_hooks=mock_hooks,
streaming=False,
)
step_gen = runner.step()
# Consume the first step to allow the provider to start producing output.
# This is where partial output may have been generated internally.
first_step = await step_gen.__anext__()
assert first_step is not None
# Request stop after the run has started in non-streaming mode.
runner.request_stop()
# Collect remaining steps; one of them should reflect the aborted state.
remaining_steps = [first_step]
async for step in step_gen:
remaining_steps.append(step)
# Runner should be marked as aborted
assert getattr(runner, "was_aborted")() is True
# Final LLM response should be set
assert getattr(runner, "final_llm_resp", None) is not None
# There should be an aborted outcome in the collected steps
assert any(
getattr(s, "status", None) == "aborted"
or getattr(getattr(s, "response", None), "status", None) == "aborted"
for s in remaining_steps
)
# Partial assistant output should be preserved in the run context messages
run_context = getattr(runner, "run_context", None)
assert run_context is not None
messages = getattr(run_context, "messages", [])
assert messages
assert any(getattr(m, "role", None) == "assistant" for m in messages)
@pytest.mark.asyncio
async def test_stop_requested_before_any_chunks_yields_aborted_and_empty_assistant(
runner, provider_request, mock_tool_executor, mock_hooks
):
"""Stop requested before any chunks are consumed should still yield an aborted
response and produce an appropriate final_llm_resp and messages.
"""
provider = MockAbortableStreamProvider()
await runner.reset(
provider=provider,
request=provider_request,
run_context=ContextWrapper(context=None),
tool_executor=mock_tool_executor,
agent_hooks=mock_hooks,
streaming=True,
)
# Create the generator but request stop before consuming any chunks
step_gen = runner.step()
runner.request_stop()
steps = [step async for step in step_gen]
# There should be at least one step representing the aborted outcome
assert steps
assert any(
getattr(s, "status", None) == "aborted"
or getattr(getattr(s, "response", None), "status", None) == "aborted"
for s in steps
)
# Runner should be marked aborted and final_llm_resp should be consistent
assert getattr(runner, "was_aborted")() is True
final_llm_resp = getattr(runner, "final_llm_resp", None)
assert final_llm_resp is not None
# When stop happens before any chunks, assistant content should be empty
# or equivalent to the "no output" contract in this codebase.
assistant_messages = [
m for m in getattr(runner.run_context, "messages", []) if getattr(m, "role", None) == "assistant"
]
if assistant_messages:
# If an assistant message exists, it should have empty/whitespace-only content
assert all(
not getattr(m, "content", "") or str(getattr(m, "content", "")).strip() == ""
for m in assistant_messages
)
else:
# Alternatively, no assistant messages at all is also an acceptable "empty" contract.
assert assistant_messages == []
```
These tests assume:
1. `runner.was_aborted()` is a callable method returning a bool, `runner.final_llm_resp` holds the final LLM response, and `runner.run_context.messages` is a list of message objects with `role` and `content` attributes.
2. The step objects yielded by `runner.step()` either:
- expose a `.status` attribute directly, or
- have a `.response` attribute with a `.status` field set to `"aborted"` on abort.
If your actual API differs (e.g., different attribute names or response shapes), you will need to:
- Update the `getattr(..., "status", ...)` checks to match your real step/response model.
- Adjust how messages are accessed from `run_context` (e.g., `runner.run_context.context.messages` instead of `runner.run_context.messages`).
- Align the “empty assistant output” assertions with the concrete structure of your `final_llm_resp` and message types.
You may also want to align docstrings and test names to your project’s existing test naming conventions if they differ.
</issue_to_address>
### Comment 3
<location> `tests/test_tool_loop_agent_runner.py:440-444` </location>
<code_context>
+
+ runner.request_stop()
+
+ rest_responses = []
+ async for response in step_iter:
+ rest_responses.append(response)
+
+ assert any(resp.type == "aborted" for resp in rest_responses)
+ assert runner.was_aborted() is True
+
</code_context>
<issue_to_address>
**suggestion (testing):** Tighten assertions on the aborted response and exercise hook behavior
Currently the test only asserts that an aborted response exists. To better lock in the expected behavior:
1. Explicitly locate the aborted response and assert on its payload (e.g., `MessageChain(type="aborted")`) so changes to the aborted response shape are caught.
2. Add an assertion that the appropriate hook (e.g., `mock_hooks.on_agent_done`) is called once with the final LLM response on abort, to ensure the stop path still runs cleanup/post-processing logic.
Suggested implementation:
```python
runner.request_stop()
# Collect all remaining responses from the stream after requesting stop
rest_responses = [first_resp]
async for response in step_iter:
rest_responses.append(response)
# Locate the explicit aborted response and validate its payload
aborted_resp = next(resp for resp in rest_responses if resp.type == "aborted")
assert getattr(aborted_resp, "output", None) is not None
# The aborted response should carry a MessageChain-style payload with type="aborted"
assert aborted_resp.output.type == "aborted"
# The runner should reflect that it was aborted
assert runner.was_aborted() is True
# The agent_done hook should still be called once with the final non-aborted response
non_aborted_responses = [resp for resp in rest_responses if resp.type != "aborted"]
final_response = non_aborted_responses[-1]
mock_hooks.on_agent_done.assert_called_once_with(final_response)
```
1. If `aborted_resp.output` is a `MessageChain` (or similar) type that is not yet imported in this test module, add the appropriate import at the top of the file, e.g.:
- `from inspect_ai.schema import MessageChain`
2. Ensure the `mock_hooks` fixture (or factory) passed into `agent_hooks` exposes `on_agent_done` as a `Mock`/`MagicMock`:
- e.g., `mock_hooks.on_agent_done = mocker.Mock()` or similar, if not already present.
3. If the actual attribute name for the payload differs from `output` (e.g., `message` or `data`), adjust `aborted_resp.output` accordingly to match the existing response schema.
4. If the hook’s expected signature is `(runner, final_response)` or includes additional parameters, update `assert_called_once_with` to match that exact signature (you can inspect other tests in this file that assert on `mock_hooks.on_agent_done` for consistency).
</issue_to_address>
### Comment 4
<location> `astrbot/core/astr_agent_run_util.py:55` </location>
<code_context>
)
)
+ stop_watcher = asyncio.create_task(
+ _watch_agent_stop_signal(agent_runner, astr_event),
+ )
</code_context>
<issue_to_address>
**issue (complexity):** Consider restructuring the stop handling so a single watcher owns `request_stop()` and shared helper logic cleans up `stop_watcher` in one place.
You can simplify the new stop logic without losing any behavior by:
1. Having exactly one place that calls `agent_runner.request_stop()`.
2. De-duplicating the `stop_watcher` cancellation logic.
### 1. Single authority for `request_stop`
Right now:
- `_watch_agent_stop_signal` calls `agent_runner.request_stop()`.
- The main loop also calls `agent_runner.request_stop()` based on `_should_stop_agent`.
That makes it harder to reason about the lifecycle. Pick one authority. For example: let the watcher be the only place that calls `request_stop()`, and let the loop only *observe* the stop state:
```python
async for resp in agent_runner.step():
# Only observe stop state; do not call request_stop() here
if _should_stop_agent(astr_event):
if resp.type == "aborted":
# special aborted handling
astr_event.set_extra("agent_user_aborted", True)
astr_event.set_extra("agent_stop_requested", False)
break
# cooperative stop: just stop consuming further responses
break
if resp.type == "aborted":
astr_event.set_extra("agent_user_aborted", True)
astr_event.set_extra("agent_stop_requested", False)
break
if resp.type == "tool_call_result":
...
```
And keep `request_stop()` only in the watcher:
```python
async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
while not agent_runner.done():
if _should_stop_agent(astr_event):
agent_runner.request_stop()
return
await asyncio.sleep(0.5)
```
This keeps the same semantics but makes it clear that the watcher owns the stop request, and the loop only reacts to the stop state.
### 2. Factor out repeated `stop_watcher` cancellation
The `stop_watcher` cancellation pattern is repeated in three places. Extract a tiny helper and/or use `try/finally` so it’s centralized.
Helper:
```python
async def _cancel_task_safely(task: asyncio.Task | None) -> None:
if not task or task.done():
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
```
Then in `run_agent`:
```python
stop_watcher: asyncio.Task | None = None
try:
stop_watcher = asyncio.create_task(
_watch_agent_stop_signal(agent_runner, astr_event),
)
async for resp in agent_runner.step():
...
if resp.type == "aborted":
astr_event.set_extra("agent_user_aborted", True)
astr_event.set_extra("agent_stop_requested", False)
break
...
if agent_runner.done():
...
break
except Exception as e:
await _cancel_task_safely(stop_watcher)
...
finally:
# ensure watcher is always cleaned up, even on normal completion
await _cancel_task_safely(stop_watcher)
```
This removes the per-branch duplication and makes the lifetime of `stop_watcher` obvious: created once, always cancelled/awaited in one place.
</issue_to_address>
### Comment 5
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:333` </location>
<code_context>
),
),
)
+ if self._stop_requested:
+ llm_resp_result = LLMResponse(
+ role="assistant",
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting helpers for building the interrupted LLM response and finalizing aborts so the stop/abort logic in step() is centralized and less repetitive.
You can centralize the stop/abort path and remove duplication by extracting two small helpers: one to construct the “interrupted” `LLMResponse`, and one to finalize the abort (state, stats, messages, hooks, yield). That keeps `step()`’s control flow simpler while preserving behavior.
### 1. Extract a helper to build the interrupted `LLMResponse`
Right now the “user interrupted” system string and normalization are duplicated in multiple places. Wrap that into a single helper:
```python
SYSTEM_INTERRUPTED_TEXT = (
"[SYSTEM: User actively interrupted the response generation. "
"Partial output before interruption is preserved.]"
)
def _build_interrupted_response(
self,
base_resp: LLMResponse | None,
) -> LLMResponse:
"""
Normalize an interrupted response into an assistant LLMResponse.
- If base_resp is provided, preserve reasoning fields / partial content.
- Ensure role='assistant'.
- Ensure completion_text carries the system interruption message
when appropriate.
"""
if base_resp is None:
return LLMResponse(role="assistant", completion_text="")
if base_resp.role != "assistant":
return LLMResponse(
role="assistant",
completion_text=SYSTEM_INTERRUPTED_TEXT,
reasoning_content=base_resp.reasoning_content,
reasoning_signature=base_resp.reasoning_signature,
)
# base_resp is already assistant; ensure completion_text has something
if not base_resp.completion_text:
return LLMResponse(
role="assistant",
completion_text=SYSTEM_INTERRUPTED_TEXT,
reasoning_content=base_resp.reasoning_content,
reasoning_signature=base_resp.reasoning_signature,
)
return base_resp
```
Now both the in-loop and post-loop stop branches can reuse this instead of hard-coding strings and logic.
### 2. Extract a focused abort finalizer
All the stop-path side effects (state, stats, messages, hook, and yielding `AgentResponse`) can move into a single private method:
```python
async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
logger.info("Agent execution was requested to stop by user.")
self.final_llm_resp = llm_resp
self._aborted = True
self._transition_state(AgentState.DONE)
self.stats.end_time = time.time()
parts: list[Part] = []
if llm_resp.reasoning_content or llm_resp.reasoning_signature:
parts.append(
ThinkPart(
think=llm_resp.reasoning_content,
encrypted=llm_resp.reasoning_signature,
)
)
if llm_resp.completion_text:
parts.append(TextPart(text=llm_resp.completion_text))
if parts:
self.run_context.messages.append(
Message(role="assistant", content=parts)
)
try:
await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
except Exception as e:
logger.error("Error in on_agent_done hook: %s", e, exc_info=True)
yield AgentResponse(
type="aborted",
data=AgentResponseData(chain=MessageChain(type="aborted")),
)
```
### 3. Simplify `step()` to a single authoritative abort path
With those helpers, `step()` only needs:
- A normalized `llm_resp_result` set when stop is requested (either in-loop or post-loop).
- A single early exit that delegates to `_finalize_abort`.
For example (showing just the relevant parts around your changes):
```python
# inside the streaming loop
for llm_response in llm_responses:
...
if self._stop_requested:
# Use the latest chunk to build an interrupted response
llm_resp_result = self._build_interrupted_response(llm_response)
break
continue
llm_resp_result = llm_response
...
# after the loop
if not llm_resp_result:
if self._stop_requested:
# No final chunk; still need a normalized interrupted response
llm_resp_result = self._build_interrupted_response(None)
else:
return
if self._stop_requested:
async for aborted_resp in self._finalize_abort(llm_resp_result):
yield aborted_resp
return
```
This keeps:
- The stop behavior and stats/state/messaging identical.
- A single, clearly visible abort branch.
- All “user interrupted” string and normalization logic centralized in `_build_interrupted_response`.
</issue_to_address>
Help me be more useful! Please react with 👍 or 👎 on each comment, and I'll use the feedback to improve future reviews.
assert final_llm_resp is not None
# When stop happens before any chunks, assistant content should be empty
# or equivalent to the "no output" contract in this codebase.
assistant_messages = [
m for m in getattr(runner.run_context, "messages", []) if getattr(m, "role", None) == "assistant"
]
if assistant_messages:
# If an assistant message exists, it should have empty/whitespace-only content
assert all(
not getattr(m, "content", "") or str(getattr(m, "content", "")).strip() == ""
for m in assistant_messages
)
else:
# Alternatively, no assistant messages at all is also an acceptable "empty" contract.
assert assistant_messages == []
```
These tests assume:
1. `runner.was_aborted()` is a callable method returning a bool, `runner.final_llm_resp` holds the final LLM response, and `runner.run_context.messages` is a list of message objects with `role` and `content` attributes.
2. The step objects yielded by `runner.step()` either:
- expose a `.status` attribute directly, or
- have a `.response` attribute with a `.status` field set to `"aborted"` on abort.
If your actual API differs (e.g., different attribute names or response shapes), you will need to:
- Update the `getattr(..., "status", ...)` checks to match your real step/response model.
- Adjust how messages are accessed from `run_context` (e.g., `runner.run_context.context.messages` instead of `runner.run_context.messages`).
- Align the “empty assistant output” assertions with the concrete structure of your `final_llm_resp` and message types.
You may also want to align docstrings and test names to your project’s existing test naming conventions if they differ.
</issue_to_address>
### Comment 3
<location> `tests/test_tool_loop_agent_runner.py:440-444` </location>
<code_context>
+
+    runner.request_stop()
+
+    rest_responses = []
+    async for response in step_iter:
+        rest_responses.append(response)
+
+    assert any(resp.type == "aborted" for resp in rest_responses)
+    assert runner.was_aborted() is True
+
</code_context>
<issue_to_address>
**suggestion (testing):** Tighten assertions on the aborted response and exercise hook behavior
Currently the test only asserts that an aborted response exists. To better lock in the expected behavior:
1. Explicitly locate the aborted response and assert on its payload (e.g., `MessageChain(type="aborted")`) so changes to the aborted response shape are caught.
2. Add an assertion that the appropriate hook (e.g., `mock_hooks.on_agent_done`) is called once with the final LLM response on abort, to ensure the stop path still runs cleanup/post-processing logic.
Suggested implementation:
```python
runner.request_stop()

# Collect all remaining responses from the stream after requesting stop
rest_responses = [first_resp]
async for response in step_iter:
    rest_responses.append(response)

# Locate the explicit aborted response and validate its payload
aborted_resp = next(resp for resp in rest_responses if resp.type == "aborted")
assert getattr(aborted_resp, "output", None) is not None
# The aborted response should carry a MessageChain-style payload with type="aborted"
assert aborted_resp.output.type == "aborted"

# The runner should reflect that it was aborted
assert runner.was_aborted() is True

# The agent_done hook should still be called once with the final non-aborted response
non_aborted_responses = [resp for resp in rest_responses if resp.type != "aborted"]
final_response = non_aborted_responses[-1]
mock_hooks.on_agent_done.assert_called_once_with(final_response)
```
1. If `aborted_resp.output` is a `MessageChain` (or similar) type that is not yet imported in this test module, add the appropriate import at the top of the file, e.g.:
- `from inspect_ai.schema import MessageChain`
2. Ensure the `mock_hooks` fixture (or factory) passed into `agent_hooks` exposes `on_agent_done` as a `Mock`/`MagicMock`:
- e.g., `mock_hooks.on_agent_done = mocker.Mock()` or similar, if not already present.
3. If the actual attribute name for the payload differs from `output` (e.g., `message` or `data`), adjust `aborted_resp.output` accordingly to match the existing response schema.
4. If the hook’s expected signature is `(runner, final_response)` or includes additional parameters, update `assert_called_once_with` to match that exact signature (you can inspect other tests in this file that assert on `mock_hooks.on_agent_done` for consistency).
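For reference, the hook assertion itself can be demonstrated in isolation with `unittest.mock.AsyncMock`; `run_and_finish` below is a hypothetical stub standing in for the runner's abort path, not real project code:

```python
import asyncio
from unittest.mock import AsyncMock

async def run_and_finish(hooks, final_response: str) -> None:
    # Trivial stand-in for the abort path: the cleanup hook still fires once.
    await hooks.on_agent_done(final_response)

hooks = AsyncMock()
asyncio.run(run_and_finish(hooks, "final-llm-response"))

# AsyncMock records awaited calls, so both assertion styles work:
hooks.on_agent_done.assert_called_once_with("final-llm-response")
hooks.on_agent_done.assert_awaited_once_with("final-llm-response")
```

Using `assert_awaited_once_with` (rather than only `assert_called_once_with`) additionally verifies the hook coroutine was actually awaited, not just created.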
</issue_to_address>
### Comment 4
<location> `astrbot/core/astr_agent_run_util.py:55` </location>
<code_context>
)
)
+ stop_watcher = asyncio.create_task(
+     _watch_agent_stop_signal(agent_runner, astr_event),
+ )
</code_context>
<issue_to_address>
**issue (complexity):** Consider restructuring the stop handling so a single watcher owns `request_stop()` and shared helper logic cleans up `stop_watcher` in one place.
You can simplify the new stop logic without losing any behavior by:
1. Having exactly one place that calls `agent_runner.request_stop()`.
2. De-duplicating the `stop_watcher` cancellation logic.
### 1. Single authority for `request_stop`
Right now:
- `_watch_agent_stop_signal` calls `agent_runner.request_stop()`.
- The main loop also calls `agent_runner.request_stop()` based on `_should_stop_agent`.
That makes it harder to reason about the lifecycle. Pick one authority. For example: let the watcher be the only place that calls `request_stop()`, and let the loop only *observe* the stop state:
```python
async for resp in agent_runner.step():
    # Only observe stop state; do not call request_stop() here
    if _should_stop_agent(astr_event):
        if resp.type == "aborted":
            # special aborted handling
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        # cooperative stop: just stop consuming further responses
        break
    if resp.type == "aborted":
        astr_event.set_extra("agent_user_aborted", True)
        astr_event.set_extra("agent_stop_requested", False)
        break
    if resp.type == "tool_call_result":
        ...
```
And keep `request_stop()` only in the watcher:
```python
async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
    while not agent_runner.done():
        if _should_stop_agent(astr_event):
            agent_runner.request_stop()
            return
        await asyncio.sleep(0.5)
```
This keeps the same semantics but makes it clear that the watcher owns the stop request, and the loop only reacts to the stop state.
### 2. Factor out repeated `stop_watcher` cancellation
The `stop_watcher` cancellation pattern is repeated in three places. Extract a tiny helper and/or use `try/finally` so it’s centralized.
Helper:
```python
async def _cancel_task_safely(task: asyncio.Task | None) -> None:
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
```
Then in `run_agent`:
```python
stop_watcher: asyncio.Task | None = None
try:
    stop_watcher = asyncio.create_task(
        _watch_agent_stop_signal(agent_runner, astr_event),
    )
    async for resp in agent_runner.step():
        ...
        if resp.type == "aborted":
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        ...
        if agent_runner.done():
            ...
            break
except Exception as e:
    await _cancel_task_safely(stop_watcher)
    ...
finally:
    # ensure watcher is always cleaned up, even on normal completion
    await _cancel_task_safely(stop_watcher)
```
This removes the per-branch duplication and makes the lifetime of `stop_watcher` obvious: created once, always cancelled/awaited in one place.
</issue_to_address>
### Comment 5
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:333` </location>
<code_context>
),
),
)
+ if self._stop_requested:
+     llm_resp_result = LLMResponse(
+         role="assistant",
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting helpers for building the interrupted LLM response and finalizing aborts so the stop/abort logic in step() is centralized and less repetitive.
You can centralize the stop/abort path and remove duplication by extracting two small helpers: one to construct the “interrupted” `LLMResponse`, and one to finalize the abort (state, stats, messages, hooks, yield). That keeps `step()`’s control flow simpler while preserving behavior.
### 1. Extract a helper to build the interrupted `LLMResponse`
Right now the “user interrupted” system string and normalization are duplicated in multiple places. Wrap that into a single helper:
```python
SYSTEM_INTERRUPTED_TEXT = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)

def _build_interrupted_response(
    self,
    base_resp: LLMResponse | None,
) -> LLMResponse:
    """
    Normalize an interrupted response into an assistant LLMResponse.

    - If base_resp is provided, preserve reasoning fields / partial content.
    - Ensure role='assistant'.
    - Ensure completion_text carries the system interruption message
      when appropriate.
    """
    if base_resp is None:
        return LLMResponse(role="assistant", completion_text="")
    if base_resp.role != "assistant":
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )
    # base_resp is already assistant; ensure completion_text has something
    if not base_resp.completion_text:
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )
    return base_resp
```
Now both the in-loop and post-loop stop branches can reuse this instead of hard-coding strings and logic.
### 2. Extract a focused abort finalizer
All the stop-path side effects (state, stats, messages, hook, and yielding `AgentResponse`) can move into a single private method:
```python
async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
    logger.info("Agent execution was requested to stop by user.")
    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()

    parts: list[Part] = []
    if llm_resp.reasoning_content or llm_resp.reasoning_signature:
        parts.append(
            ThinkPart(
                think=llm_resp.reasoning_content,
                encrypted=llm_resp.reasoning_signature,
            )
        )
    if llm_resp.completion_text:
        parts.append(TextPart(text=llm_resp.completion_text))
    if parts:
        self.run_context.messages.append(
            Message(role="assistant", content=parts)
        )

    try:
        await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
    except Exception as e:
        logger.error("Error in on_agent_done hook: %s", e, exc_info=True)

    yield AgentResponse(
        type="aborted",
        data=AgentResponseData(chain=MessageChain(type="aborted")),
    )
```
### 3. Simplify `step()` to a single authoritative abort path
With those helpers, `step()` only needs:
- A normalized `llm_resp_result` set when stop is requested (either in-loop or post-loop).
- A single early exit that delegates to `_finalize_abort`.
For example (showing just the relevant parts around your changes):
```python
# inside the streaming loop
for llm_response in llm_responses:
    ...
    if self._stop_requested:
        # Use the latest chunk to build an interrupted response
        llm_resp_result = self._build_interrupted_response(llm_response)
        break
    continue
    llm_resp_result = llm_response
    ...

# after the loop
if not llm_resp_result:
    if self._stop_requested:
        # No final chunk; still need a normalized interrupted response
        llm_resp_result = self._build_interrupted_response(None)
    else:
        return

if self._stop_requested:
    async for aborted_resp in self._finalize_abort(llm_resp_result):
        yield aborted_resp
    return
```
This keeps:
- The stop behavior and stats/state/messaging identical.
- A single, clearly visible abort branch.
- All “user interrupted” string and normalization logic centralized in `_build_interrupted_response`.
</issue_to_address>
```python
llm_resp_result = LLMResponse(
    role="assistant",
    completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
    reasoning_content=llm_response.reasoning_content,
    reasoning_signature=llm_response.reasoning_signature,
```
suggestion: The user-interruption system message string is duplicated; consider centralizing it.

This interruption marker string is hard-coded both here and in the `_stop_requested` handling after the loop. Please extract it into a shared constant or helper (e.g., `make_user_abort_response(...)`) so the text stays consistent and easier to maintain across both paths.

Suggested implementation:

```python
if self._stop_requested:
    llm_resp_result = LLMResponse(
        role="assistant",
        completion_text=USER_ABORT_SYSTEM_MESSAGE,
        reasoning_content=llm_response.reasoning_content,
        reasoning_signature=llm_response.reasoning_signature,
    )
    break
```

```python
if not llm_resp_result:
    if self._stop_requested:
        llm_resp_result = LLMResponse(
            role="assistant",
            completion_text=USER_ABORT_SYSTEM_MESSAGE,
        )
    else:
        return
```

To fully implement the suggestion, also:

- Define the shared constant `USER_ABORT_SYSTEM_MESSAGE` once in this module, for example near the top of `tool_loop_agent_runner.py`:

```python
USER_ABORT_SYSTEM_MESSAGE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)
```

- If there are any other occurrences of the same interruption message string elsewhere in this file (or related runners), replace them with `USER_ABORT_SYSTEM_MESSAGE` so the text is centralized and consistent.

If you prefer a helper instead of a bare constant (e.g., to preserve reasoning fields when available), you can define a function like `make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse` in this module and have both call sites use it instead of constructing `LLMResponse` directly.
| rest_responses = [] | ||
| async for response in step_iter: | ||
| rest_responses.append(response) | ||
|
|
||
| assert any(resp.type == "aborted" for resp in rest_responses) |
There was a problem hiding this comment.
suggestion (testing): Tighten assertions on the aborted response and exercise hook behavior

Currently the test only asserts that an aborted response exists. To better lock in the expected behavior:
- Explicitly locate the aborted response and assert on its payload (e.g., `MessageChain(type="aborted")`) so changes to the aborted response shape are caught.
- Add an assertion that the appropriate hook (e.g., `mock_hooks.on_agent_done`) is called once with the final LLM response on abort, to ensure the stop path still runs cleanup/post-processing logic.
Suggested implementation:

```python
runner.request_stop()

# Collect all remaining responses from the stream after requesting stop
rest_responses = [first_resp]
async for response in step_iter:
    rest_responses.append(response)

# Locate the explicit aborted response and validate its payload
aborted_resp = next(resp for resp in rest_responses if resp.type == "aborted")
assert getattr(aborted_resp, "output", None) is not None
# The aborted response should carry a MessageChain-style payload with type="aborted"
assert aborted_resp.output.type == "aborted"

# The runner should reflect that it was aborted
assert runner.was_aborted() is True

# The agent_done hook should still be called once with the final non-aborted response
non_aborted_responses = [resp for resp in rest_responses if resp.type != "aborted"]
final_response = non_aborted_responses[-1]
mock_hooks.on_agent_done.assert_called_once_with(final_response)
```

- If `aborted_resp.output` is a `MessageChain` (or similar) type that is not yet imported in this test module, add the appropriate import at the top of the file, e.g., `from inspect_ai.schema import MessageChain`.
- Ensure the `mock_hooks` fixture (or factory) passed into `agent_hooks` exposes `on_agent_done` as a `Mock`/`MagicMock` (e.g., `mock_hooks.on_agent_done = mocker.Mock()`, if not already present).
- If the actual attribute name for the payload differs from `output` (e.g., `message` or `data`), adjust `aborted_resp.output` accordingly to match the existing response schema.
- If the hook's expected signature is `(runner, final_response)` or includes additional parameters, update `assert_called_once_with` to match that exact signature (you can inspect other tests in this file that assert on `mock_hooks.on_agent_done` for consistency).
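For the `mock_hooks` point above, a minimal sketch of such a fixture is shown below; the factory name is hypothetical, and whether your hooks are sync or async is an assumption (here `on_agent_done` is modeled as async):

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_hooks():
    # Hypothetical factory: expose on_agent_done as an AsyncMock so the
    # abort-path test can await it and later use assert_called_once_with().
    hooks = MagicMock()
    hooks.on_agent_done = AsyncMock()
    return hooks
```

With this, `mock_hooks.on_agent_done.assert_called_once_with(final_response)` works after the runner awaits the hook.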
```python
stop_watcher = asyncio.create_task(
    _watch_agent_stop_signal(agent_runner, astr_event),
)
```
issue (complexity): Consider restructuring the stop handling so a single watcher owns request_stop() and shared helper logic cleans up stop_watcher in one place.

You can simplify the new stop logic without losing any behavior by:
- Having exactly one place that calls `agent_runner.request_stop()`.
- De-duplicating the `stop_watcher` cancellation logic.

1. Single authority for request_stop

Right now:
- `_watch_agent_stop_signal` calls `agent_runner.request_stop()`.
- The main loop also calls `agent_runner.request_stop()` based on `_should_stop_agent`.
That makes it harder to reason about the lifecycle. Pick one authority. For example: let the watcher be the only place that calls `request_stop()`, and let the loop only observe the stop state:

```python
async for resp in agent_runner.step():
    # Only observe stop state; do not call request_stop() here
    if _should_stop_agent(astr_event):
        if resp.type == "aborted":
            # special aborted handling
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        # cooperative stop: just stop consuming further responses
        break
    if resp.type == "aborted":
        astr_event.set_extra("agent_user_aborted", True)
        astr_event.set_extra("agent_stop_requested", False)
        break
    if resp.type == "tool_call_result":
        ...
```

And keep `request_stop()` only in the watcher:

```python
async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
    while not agent_runner.done():
        if _should_stop_agent(astr_event):
            agent_runner.request_stop()
            return
        await asyncio.sleep(0.5)
```

This keeps the same semantics but makes it clear that the watcher owns the stop request, and the loop only reacts to the stop state.
2. Factor out repeated stop_watcher cancellation

The `stop_watcher` cancellation pattern is repeated in three places. Extract a tiny helper and/or use try/finally so it's centralized.

Helper:

```python
async def _cancel_task_safely(task: asyncio.Task | None) -> None:
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
```

Then in `run_agent`:

```python
stop_watcher: asyncio.Task | None = None
try:
    stop_watcher = asyncio.create_task(
        _watch_agent_stop_signal(agent_runner, astr_event),
    )
    async for resp in agent_runner.step():
        ...
        if resp.type == "aborted":
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        ...
    if agent_runner.done():
        ...
except Exception as e:
    ...
finally:
    # ensure watcher is always cleaned up, even on normal completion
    await _cancel_task_safely(stop_watcher)
```

This removes the per-branch duplication and makes the lifetime of `stop_watcher` obvious: created once, always cancelled/awaited in one place.
```python
if self._stop_requested:
```
issue (complexity): Consider extracting helpers for building the interrupted LLM response and finalizing aborts so the stop/abort logic in step() is centralized and less repetitive.
You can centralize the stop/abort path and remove duplication by extracting two small helpers: one to construct the “interrupted” LLMResponse, and one to finalize the abort (state, stats, messages, hooks, yield). That keeps step()’s control flow simpler while preserving behavior.
1. Extract a helper to build the interrupted LLMResponse
Right now the “user interrupted” system string and normalization are duplicated in multiple places. Wrap that into a single helper:
```python
SYSTEM_INTERRUPTED_TEXT = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)


def _build_interrupted_response(
    self,
    base_resp: LLMResponse | None,
) -> LLMResponse:
    """
    Normalize an interrupted response into an assistant LLMResponse.

    - If base_resp is provided, preserve reasoning fields / partial content.
    - Ensure role='assistant'.
    - Ensure completion_text carries the system interruption message
      when appropriate.
    """
    if base_resp is None:
        return LLMResponse(role="assistant", completion_text="")
    if base_resp.role != "assistant":
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )
    # base_resp is already assistant; ensure completion_text has something
    if not base_resp.completion_text:
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )
    return base_resp
```

Now both the in-loop and post-loop stop branches can reuse this instead of hard-coding strings and logic.
2. Extract a focused abort finalizer
All the stop-path side effects (state, stats, messages, hook, and yielding AgentResponse) can move into a single private method:
```python
async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
    logger.info("Agent execution was requested to stop by user.")
    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()

    parts: list[Part] = []
    if llm_resp.reasoning_content or llm_resp.reasoning_signature:
        parts.append(
            ThinkPart(
                think=llm_resp.reasoning_content,
                encrypted=llm_resp.reasoning_signature,
            )
        )
    if llm_resp.completion_text:
        parts.append(TextPart(text=llm_resp.completion_text))
    if parts:
        self.run_context.messages.append(
            Message(role="assistant", content=parts)
        )

    try:
        await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
    except Exception as e:
        logger.error("Error in on_agent_done hook: %s", e, exc_info=True)

    yield AgentResponse(
        type="aborted",
        data=AgentResponseData(chain=MessageChain(type="aborted")),
    )
```

3. Simplify step() to a single authoritative abort path
With those helpers, step() only needs:
- A normalized `llm_resp_result` set when stop is requested (either in-loop or post-loop).
- A single early exit that delegates to `_finalize_abort`.
For example (showing just the relevant parts around your changes):

```python
# inside the streaming loop
for llm_response in llm_responses:
    ...
    if self._stop_requested:
        # Use the latest chunk to build an interrupted response
        llm_resp_result = self._build_interrupted_response(llm_response)
        break
    continue

llm_resp_result = llm_response
...

# after the loop
if not llm_resp_result:
    if self._stop_requested:
        # No final chunk; still need a normalized interrupted response
        llm_resp_result = self._build_interrupted_response(None)
    else:
        return

if self._stop_requested:
    async for aborted_resp in self._finalize_abort(llm_resp_result):
        yield aborted_resp
    return
```

This keeps:
- The stop behavior and stats/state/messaging identical.
- A single, clearly visible abort branch.
- All "user interrupted" string and normalization logic centralized in `_build_interrupted_response`.
Code Review
The pull request introduces a valuable 'stop' functionality for active agent sessions, allowing users to interrupt long-running tasks across the backend, utilities, and frontend. However, the 'stop' chat command has a critical security gap: it performs no permission checks, which could allow unauthorized users to disrupt other users' interactions in group chats. The implementation could also be improved by avoiding per-step creation of the stop-signal watcher task in run_agent, reducing the latency introduced by the watcher's 0.5s polling interval, and accumulating streaming chunks in the runner so that partial output is correctly preserved in the database history.
```python
llm_resp_result = LLMResponse(
    role="assistant",
    completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
    reasoning_content=llm_response.reasoning_content,
    reasoning_signature=llm_response.reasoning_signature,
)
```
The runner currently does not accumulate streaming chunks. When a stop is requested, llm_resp_result is created using only the data from the current chunk. Consequently, all text and reasoning content yielded in previous chunks will be missing from the final response saved to the conversation history. To correctly preserve partial output as intended, you should accumulate completion_text and reasoning_content in buffers throughout the streaming loop and use those buffers when constructing the aborted LLMResponse.
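The buffering described above can be sketched as follows; the `Chunk` stand-in and its field names mirror `LLMResponse` but are assumptions for illustration, not the project's real types:

```python
from dataclasses import dataclass


@dataclass
class Chunk:
    # Hypothetical stand-in for one streamed LLMResponse chunk.
    completion_text: str = ""
    reasoning_content: str = ""


def accumulate_chunks(chunks):
    """Accumulate partial text/reasoning across the streaming loop so an
    abort can preserve everything streamed so far, not just the last chunk."""
    text_parts, reasoning_parts = [], []
    for chunk in chunks:
        if chunk.completion_text:
            text_parts.append(chunk.completion_text)
        if chunk.reasoning_content:
            reasoning_parts.append(chunk.reasoning_content)
    return "".join(text_parts), "".join(reasoning_parts)
```

On abort, the buffered strings (rather than the final chunk's fields) would feed the interrupted `LLMResponse`.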
```python
async def stop(self, message: AstrMessageEvent) -> None:
    """Stop the Agent currently running in this session."""
    cfg = self.context.get_config(umo=message.unified_msg_origin)
    agent_runner_type = cfg["provider_settings"]["agent_runner_type"]
    umo = message.unified_msg_origin

    if agent_runner_type in THIRD_PARTY_AGENT_RUNNER_KEY:
        stopped_count = active_event_registry.stop_all(umo, exclude=message)
    else:
        stopped_count = active_event_registry.request_agent_stop_all(
            umo,
            exclude=message,
        )
```
The stop command lacks permission checks, allowing any user in a shared session (such as a group chat) to interrupt an active agent task initiated by another user. In contrast, other destructive or disruptive commands in the same file, such as reset and del_conv, implement permission checks that default to requiring administrator privileges in group settings. This inconsistency allows a regular member to perform a denial-of-service-like action against other members' interactions with the bot.
```python
stop_watcher = asyncio.create_task(
    _watch_agent_stop_signal(agent_runner, astr_event),
)
```
Creating and cancelling the stop_watcher task inside the while loop for every step is inefficient. If an agent run takes many steps, this results in frequent task creation and destruction. It is recommended to create a single watcher task for the entire duration of the run_agent execution and clean it up in a finally block.
```python
if _should_stop_agent(astr_event):
    agent_runner.request_stop()
    return
await asyncio.sleep(0.5)
```
```python
# if user_aborted:
#     message_to_save.append(
#         Message(
#             role="assistant",
#             content="[User aborted this request. Partial output before abort was preserved.]",
#         ).model_dump()
#     )
```
```python
umo = (
    f"{session.platform_id}:{message_type}:"
    f"{session.platform_id}!{username}!{session_id}"
)
```
The logic for constructing the umo (Unified Message Origin) string is duplicated here and in the delete_webchat_session method. This duplication is fragile; if the UMO format requirements change, it's easy to miss one of these locations. Consider extracting this logic into a helper method or a property on the session model.
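The suggested extraction could look like the following; whether it lives as a session property or a module-level helper, and the exact parameter names, are assumptions:

```python
def build_umo(platform_id: str, message_type: str, username: str, session_id: str) -> str:
    # Single source of truth for the UMO string format shared by
    # stop and delete_webchat_session.
    return f"{platform_id}:{message_type}:{platform_id}!{username}!{session_id}"
```

Both call sites would then delegate to this helper, so a future format change touches one place.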
```python
if not stop_watcher.done():
    stop_watcher.cancel()
    try:
        await stop_watcher
    except asyncio.CancelledError:
        pass
```
closes: #5328
Modifications

Screenshots or Test Results

Checklist

- [ ] I have ensured that no new dependencies are introduced, or, if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.