
feat: add stop functionality for active agent sessions and improve handling of stop requests #5380

Merged
Soulter merged 3 commits into master from feat/stop-agent
Feb 23, 2026

Conversation

@Soulter
Member

@Soulter Soulter commented Feb 23, 2026

closes: #5328

Modifications

  • This is NOT a breaking change.

Screenshots or Test Results


Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, or if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Feb 23, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello @Soulter, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances user control over active agent sessions by implementing a robust stop functionality. Users can now interrupt ongoing agent processes via a new chat command or a dedicated UI button, which gracefully terminates the agent's execution while preserving any partial output. This improves the responsiveness and usability of the system, especially for long-running or unintended agent interactions. The backend changes ensure proper state management and history recording for these interrupted sessions, while the frontend provides a clear and accessible way to trigger the stop action.

Highlights

  • New Stop Functionality for Agent Sessions: Introduced a new /stop command and a corresponding UI button to allow users to terminate active agent sessions gracefully. This prevents agents from continuing to process requests when no longer needed.
  • Improved Agent Stop Handling: The agent runner now includes mechanisms to detect and respond to stop requests, ensuring that partial outputs are preserved and the agent's state is correctly updated upon interruption.
  • Enhanced Conversation History Saving: The logic for saving conversation history has been updated to correctly handle scenarios where an agent run is aborted by the user, ensuring that any partial responses are still recorded.
  • Frontend Integration for Stopping Agents: The web dashboard's chat interface now dynamically displays a 'stop' button instead of the 'send' button when an agent is actively running, providing an intuitive way for users to interrupt ongoing operations.
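The cooperative-stop contract these highlights describe can be sketched roughly as follows. This is a minimal illustration only: `request_stop()` and `was_aborted()` mirror the method names from the PR, but the chunk source, response dicts, and `watch_stop_signal` helper are assumptions for the sketch; the real runner also finalizes LLM responses, records history, and fires `on_agent_done` hooks.

```python
import asyncio

ABORT_NOTE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)


class ToolLoopRunnerSketch:
    """Illustrative stand-in for the tool-loop runner's stop contract."""

    def __init__(self, chunks: list[str]) -> None:
        self._chunks = chunks
        self._stop_requested = False
        self._aborted = False
        self.partial: list[str] = []  # output preserved across an interrupt

    def request_stop(self) -> None:
        # External components (watcher task, /stop command) signal here
        self._stop_requested = True

    def was_aborted(self) -> bool:
        return self._aborted

    async def step(self):
        for chunk in self._chunks:
            if self._stop_requested:
                # Graceful exit: keep partial output, mark run as aborted
                self._aborted = True
                yield {"type": "aborted", "text": ABORT_NOTE}
                return
            self.partial.append(chunk)
            yield {"type": "chunk", "text": chunk}


async def watch_stop_signal(runner: ToolLoopRunnerSketch, should_stop) -> None:
    # Background poller, analogous in spirit to the PR's stop watcher
    while not runner.was_aborted():
        if should_stop():
            runner.request_stop()
            return
        await asyncio.sleep(0.01)
```

The key property is that the stop is observed between chunks, so whatever was streamed before the signal stays in `partial` rather than being discarded.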
Changelog
  • astrbot/builtin_stars/builtin_commands/commands/conversation.py
    • Added stop asynchronous method to handle agent session termination requests.
  • astrbot/builtin_stars/builtin_commands/main.py
    • Registered the new stop command to be accessible via the chat interface.
  • astrbot/core/agent/runners/tool_loop_agent_runner.py
    • Initialized _stop_requested and _aborted flags to manage agent state during termination.
    • Implemented logic to check for _stop_requested during agent steps and break the loop if a stop is requested.
    • Added comprehensive handling for _stop_requested to finalize LLM responses, set the agent to an aborted state, and trigger on_agent_done hooks.
    • Introduced request_stop and was_aborted methods to allow external components to signal and check for agent termination.
  • astrbot/core/astr_agent_run_util.py
    • Added _should_stop_agent utility function to check for agent stop signals.
    • Integrated a stop_watcher task to continuously monitor for agent stop signals during run_agent execution.
    • Modified run_agent to call agent_runner.request_stop() when a stop signal is detected and to handle aborted responses.
    • Ensured stop_watcher is cancelled and awaited in various exit paths of run_agent to prevent resource leaks.
    • Introduced _watch_agent_stop_signal asynchronous function to actively poll for agent stop requests.
  • astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py
    • Updated history saving conditions to include agent_runner.was_aborted() to ensure partial results are saved.
    • Modified _save_to_history to accept a user_aborted flag and adjust LLM response handling for aborted sessions.
  • astrbot/core/utils/active_event_registry.py
    • Added request_agent_stop_all method to signal a stop request to all active agent events for a given UMO without fully stopping the event propagation.
  • astrbot/dashboard/routes/chat.py
    • Imported MessageType and active_event_registry for new functionality.
    • Added a new POST route /chat/stop to handle requests for stopping active chat sessions.
    • Implemented stop_session method to process stop requests, validate session ownership, and call active_event_registry.request_agent_stop_all.
  • dashboard/src/components/chat/Chat.vue
    • Passed is-running prop to ChatInput to indicate if an agent is active.
    • Added @stop event listener to ChatInput to trigger handleStopMessage.
  • dashboard/src/components/chat/ChatInput.vue
    • Modified the send button to conditionally display a 'stop' button with an error color when isRunning is true.
    • Added isRunning to the component's props interface.
    • Added stop to the component's emitted events.
  • dashboard/src/components/chat/StandaloneChat.vue
    • Passed is-running prop to ChatInput for standalone chat.
    • Added @stop event listener to ChatInput for standalone chat to trigger handleStopMessage.
  • dashboard/src/composables/useMessages.ts
    • Added new reactive references: currentRequestController, currentReader, currentRunningSessionId, and userStopRequested to manage stop state.
    • Updated sendMessage to set userStopRequested to false and currentRunningSessionId before sending.
    • Integrated AbortController to allow cancellation of fetch requests.
    • Stored the response.body reader in currentReader for potential cancellation.
    • Modified error handling in sendMessage to suppress console errors if userStopRequested is true.
    • Ensured cleanup of currentReader, currentRequestController, currentRunningSessionId, and userStopRequested in the finally block of sendMessage.
    • Implemented stopMessage function to send a stop request to the backend, cancel the current reader, and abort the fetch request.
    • Exported stopMessage from the composable.
  • dashboard/src/i18n/locales/en-US/features/chat.json
    • Added 'stopGenerating' translation key.
  • dashboard/src/i18n/locales/zh-CN/features/chat.json
    • Added '停止生成' (Stop generating) translation key.
  • tests/test_tool_loop_agent_runner.py
    • Added MockAbortableStreamProvider to simulate an LLM provider that can be aborted during streaming.
    • Added test_stop_signal_returns_aborted_and_persists_partial_message to verify agent stop functionality, ensuring partial messages are saved and the agent is marked as aborted.
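The registry-level signaling listed above (`active_event_registry.request_agent_stop_all`) can be sketched as a flag set on every active event for a session, without tearing down event propagation. The method name and the `agent_stop_requested` extra come from the changelog and review comments; the `FakeEvent` shape and UMO strings below are illustrative stand-ins, not AstrBot's real classes.

```python
class FakeEvent:
    """Stand-in for an active event carrying per-event extras."""

    def __init__(self) -> None:
        self.extras: dict[str, bool] = {}

    def set_extra(self, key: str, value: bool) -> None:
        self.extras[key] = value


class ActiveEventRegistrySketch:
    def __init__(self) -> None:
        self._events: dict[str, list[FakeEvent]] = {}

    def register(self, umo: str, event: FakeEvent) -> None:
        self._events.setdefault(umo, []).append(event)

    def request_agent_stop_all(self, umo: str) -> int:
        """Flag each active event for this UMO; a background watcher later
        observes the flag and calls the runner's request_stop(). Event
        propagation is left intact so history saving can still run.
        Returns how many events were signaled."""
        events = self._events.get(umo, [])
        for event in events:
            event.set_extra("agent_stop_requested", True)
        return len(events)
```

Returning the count matches the user-facing message in the docs ("Requested stop for {count} running task(s).").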
Activity
  • The pull request introduces a new feature to stop active agent sessions.
  • The author has confirmed that the changes are not breaking.
  • The author has provided a summary of the modifications in the PR description.
  • The PR includes a checklist for review, covering aspects like new features discussion, testing, dependency management, and malicious code checks.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check its output and use code with caution.

@dosubot dosubot bot added area:core The bug / feature is about astrbot's core, backend feature:chatui The bug / feature is about astrbot's chatui, webchat labels Feb 23, 2026
@dosubot

dosubot bot commented Feb 23, 2026

Documentation Updates

1 document was updated by changes in this PR:

Changes for pr4697
View Changes
@@ -131,7 +131,7 @@
 ### 6. Active agent session stop feature
 
 #### Feature description
-The active agent system lets users actively interrupt Agent task execution mid-conversation, covering both the built-in Agent Runner and third-party Agent Runners. The stop feature can be triggered in two ways: via the command line or the UI.
+The active agent system lets users actively interrupt Agent task execution mid-conversation, covering both the built-in Agent Runner and third-party Agent Runners. The stop feature can be triggered in two ways: via the `/stop` command or the Dashboard UI.
 
 #### Usage
 
@@ -143,32 +143,43 @@
 - On success, the number of stopped tasks is shown: "已请求停止 {count} 个运行中的任务。" ("Requested stop for {count} running task(s).")
 - If the current session has no running tasks, it shows: "当前会话没有运行中的任务。" ("The current session has no running tasks.")
 
+The `/stop` command is part of the built-in conversation commands; like `/reset` and `/his`, it can be used on any chat platform (Telegram, QQ, WeChat, Discord, etc.).
+
 ##### Dashboard chat stop button
 In the Dashboard chat interface, while an Agent is running, a stop button is shown in the chat input box:
 
-- The stop button (red, mdi-stop icon) appears automatically while the Agent is running, replacing the send button
+- The stop button (mdi-stop icon) appears automatically while the Agent is running, replacing the send button
 - Clicking the button interrupts the in-progress response generation
-- The button uses a red/error color tone, clearly marking it as a stop action
+- After stopping, an API call requests aborting the current session's Agent task
 
-#### Stop behavior
+#### Stop mechanism
+
+The stop mechanism differs by Agent Runner type:
 
 ##### Built-in Agent Runner (tool-loop Agent Runner)
 For the built-in Agent Runner, the stop request is graceful and preserves partial output produced before the interruption:
 
+- Uses the `request_agent_stop_all()` method, which does not interrupt event propagation, allowing follow-up steps (such as history saving) to continue
 - System message: "[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]"
 - The Agent transitions to the DONE state and triggers the on_agent_done hook
-- Conversation history and session state are preserved (unlike a hard event stop)
+- Conversation history and session state are preserved
 - The response type is marked as "aborted"
+- During execution the Agent periodically checks the `_stop_requested` flag and can exit gracefully
 
-##### Third-party Agent Runner
-For third-party Agent Runners, a stop request fully interrupts event propagation via the `stop_all()` method.
+##### Third-party Agent Runners (e.g., Dify, Coze)
+For third-party Agent Runners, a stop request fully interrupts event propagation:
+
+- Uses the `stop_all()` method, which fully stops the event flow
+- Intermediate state is not preserved; task execution is terminated directly
 
 #### Key implementation points
 
-- `request_agent_stop_all()` method (active_event_registry): graceful stop that does not interrupt event propagation, allowing follow-up steps (such as history saving) to continue
-- `request_stop()` method (tool-loop Agent Runner): sets the stop flag
-- During execution the Agent checks the `_stop_requested` flag and can exit gracefully
-- Difference from `stop_all()`: `stop_all()` fully interrupts event propagation, while `request_agent_stop_all()` allows follow-up steps to continue
+- `request_agent_stop_all()` method (active_event_registry): graceful stop; sets the stop flag without interrupting event propagation
+- `stop_all()` method (active_event_registry): hard stop; fully interrupts event propagation
+- `request_stop()` method (tool-loop Agent Runner): sets the `_stop_requested` flag
+- `was_aborted()` method (tool-loop Agent Runner): checks whether the task was actively aborted by the user
+- During execution, a background watcher task (`_watch_agent_stop_signal`) checks for the stop signal
+- The Dashboard triggers stop requests via the `/api/chat/stop` API endpoint
 
 ---
 
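The `/api/chat/stop` flow described in the diff can be sketched as a small async handler. The route purpose and the `request_agent_stop_all()` call come from the PR; the session store, ownership check, and response shape below are illustrative assumptions, not AstrBot's actual dashboard code.

```python
import asyncio


class FakeRegistry:
    """Stand-in for active_event_registry; pretends one task is running."""

    def request_agent_stop_all(self, umo: str) -> int:
        return 1


active_event_registry = FakeRegistry()

# session_id -> owner and unified message origin (UMO); assumed shapes
_sessions = {"sess-1": {"owner": "alice", "umo": "webchat:alice:sess-1"}}


async def stop_session(payload: dict, username: str) -> dict:
    session_id = payload.get("session_id")
    if not session_id:
        return {"status": "error", "message": "session_id is required"}
    session = _sessions.get(session_id)
    # Validate ownership: a user may only stop their own sessions
    if session is None or session["owner"] != username:
        return {"status": "error", "message": "session not found"}
    count = active_event_registry.request_agent_stop_all(session["umo"])
    return {"status": "ok", "stopped": count}
```

Signaling the registry (rather than killing the event outright) is what lets the pipeline still save the partial conversation history after the stop.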

How did I do? Any feedback? Join Discord

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I found 5 issues and left some high-level feedback:

  • In run_agent, the stop logic is handled both via _watch_agent_stop_signal and per-iteration _should_stop_agent checks; consider consolidating this into a single mechanism to avoid redundant request_stop() calls and multiple continue branches that make the control flow harder to follow.
  • In _save_to_history, the commented-out user_aborted history-marker block adds noise; either remove the commented code or implement a clear, explicit representation of user-aborted conversations if you intend to persist that state in history.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `run_agent`, the stop logic is handled both via `_watch_agent_stop_signal` and per-iteration `_should_stop_agent` checks; consider consolidating this into a single mechanism to avoid redundant `request_stop()` calls and `continue` branches that can make the control flow harder to follow.
- In `_save_to_history`, the commented-out `user_aborted` history marker block adds noise; either remove it or implement a clear, explicit representation of user-aborted conversations if you intend to persist that state in history.

## Individual Comments

### Comment 1
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:334-338` </location>
<code_context>
                         ),
                     )
+                if self._stop_requested:
+                    llm_resp_result = LLMResponse(
+                        role="assistant",
+                        completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
+                        reasoning_content=llm_response.reasoning_content,
+                        reasoning_signature=llm_response.reasoning_signature,
+                    )
+                    break
</code_context>

<issue_to_address>
**suggestion:** The user-interruption system message string is duplicated; consider centralizing it.

This interruption marker string is hard-coded both here and in the `_stop_requested` handling after the loop. Please extract it into a shared constant or helper (e.g., `make_user_abort_response(...)`) so the text stays consistent and easier to maintain across both paths.

Suggested implementation:

```python
                if self._stop_requested:
                    llm_resp_result = LLMResponse(
                        role="assistant",
                        completion_text=USER_ABORT_SYSTEM_MESSAGE,
                        reasoning_content=llm_response.reasoning_content,
                        reasoning_signature=llm_response.reasoning_signature,
                    )
                    break

```

```python
        if not llm_resp_result:
            if self._stop_requested:
                llm_resp_result = LLMResponse(
                    role="assistant",
                    completion_text=USER_ABORT_SYSTEM_MESSAGE,
                )
            else:
                return

```

To fully implement the suggestion, also:

1. Define the shared constant `USER_ABORT_SYSTEM_MESSAGE` once in this module, for example near the top of `tool_loop_agent_runner.py`:

```python
USER_ABORT_SYSTEM_MESSAGE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)
```

2. If there are any other occurrences of the same interruption message string elsewhere in this file (or related runners), replace them with `USER_ABORT_SYSTEM_MESSAGE` so the text is centralized and consistent.

If you prefer a helper instead of a bare constant (e.g., to preserve reasoning fields when available), you can define a function like `make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse` in this module and have both call sites use it instead of constructing `LLMResponse` directly.
</issue_to_address>

### Comment 2
<location> `tests/test_tool_loop_agent_runner.py:419-420` </location>
<code_context>
     assert fallback_provider.call_count == 1


+@pytest.mark.asyncio
+async def test_stop_signal_returns_aborted_and_persists_partial_message(
+    runner, provider_request, mock_tool_executor, mock_hooks
+):
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for non-streaming mode and for stop requested before any chunks are produced

The new test covers the streaming case after a partial chunk has been delivered, but two important cases are still untested:

1. **Non-streaming mode (`streaming=False`)**: Add a test using `MockAbortableStreamProvider` (or similar) with `streaming=False` and `runner.request_stop()` that asserts:
   - An `aborted` response is yielded.
   - `runner.was_aborted()` is `True` and `final_llm_resp` is set correctly.
   - Partial output is preserved in `run_context.messages`.

2. **Stop before any chunks**: Add a test that calls `runner.request_stop()` *before* consuming from `step()` and asserts:
   - The generator yields an `aborted` response (or otherwise terminates as designed).
   - `final_llm_resp` matches the intended contract for the new branch (e.g. empty assistant message when `llm_resp_result` is missing).
   - `run_context.messages` matches that behavior.

These will exercise the new stop logic for both non-streaming runs and immediate-stop scenarios.

Suggested implementation:

```python
@pytest.mark.asyncio
async def test_stop_signal_returns_aborted_and_persists_partial_message(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Start a streaming step and consume at least one partial chunk
    step_gen = runner.step()
    partial_steps = []
    async for step in step_gen:
        partial_steps.append(step)
        # Simulate user requesting stop after first partial assistant chunk
        if len(partial_steps) == 1:
            runner.request_stop()
        # Once stop has been requested and an aborted response is observed, break
        if getattr(step, "status", None) == "aborted" or getattr(
            getattr(step, "response", None), "status", None
        ) == "aborted":
            break

    # We should have observed at least one step before aborting
    assert partial_steps
    # Runner should record that it was aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be recorded
    assert getattr(runner, "final_llm_resp", None) is not None

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    # There should be at least one assistant message (partial content)
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_signal_non_streaming_persists_partial_message_and_sets_final_resp(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Non-streaming mode: stop requested during the run should yield an aborted result
    and preserve partial assistant output in the run context messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=False,
    )

    step_gen = runner.step()

    # Consume the first step to allow the provider to start producing output.
    # This is where partial output may have been generated internally.
    first_step = await step_gen.__anext__()
    assert first_step is not None

    # Request stop after the run has started in non-streaming mode.
    runner.request_stop()

    # Collect remaining steps; one of them should reflect the aborted state.
    remaining_steps = [first_step]
    async for step in step_gen:
        remaining_steps.append(step)

    # Runner should be marked as aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be set
    assert getattr(runner, "final_llm_resp", None) is not None

    # There should be an aborted outcome in the collected steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in remaining_steps
    )

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_requested_before_any_chunks_yields_aborted_and_empty_assistant(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Stop requested before any chunks are consumed should still yield an aborted
    response and produce an appropriate final_llm_resp and messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Create the generator but request stop before consuming any chunks
    step_gen = runner.step()
    runner.request_stop()

    steps = [step async for step in step_gen]

    # There should be at least one step representing the aborted outcome
    assert steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in steps
    )

    # Runner should be marked aborted and final_llm_resp should be consistent
    assert getattr(runner, "was_aborted")() is True
    final_llm_resp = getattr(runner, "final_llm_resp", None)
    assert final_llm_resp is not None

    # When stop happens before any chunks, assistant content should be empty
    # or equivalent to the "no output" contract in this codebase.
    assistant_messages = [
        m for m in getattr(runner.run_context, "messages", []) if getattr(m, "role", None) == "assistant"
    ]
    if assistant_messages:
        # If an assistant message exists, it should have empty/whitespace-only content
        assert all(
            not getattr(m, "content", "") or str(getattr(m, "content", "")).strip() == ""
            for m in assistant_messages
        )
    else:
        # Alternatively, no assistant messages at all is also an acceptable "empty" contract.
        assert assistant_messages == []

```

These tests assume:

1. `runner.was_aborted()` is a callable method returning a bool, `runner.final_llm_resp` holds the final LLM response, and `runner.run_context.messages` is a list of message objects with `role` and `content` attributes.
2. The step objects yielded by `runner.step()` either:
   - expose a `.status` attribute directly, or
   - have a `.response` attribute with a `.status` field set to `"aborted"` on abort.

If your actual API differs (e.g., different attribute names or response shapes), you will need to:
- Update the `getattr(..., "status", ...)` checks to match your real step/response model.
- Adjust how messages are accessed from `run_context` (e.g., `runner.run_context.context.messages` instead of `runner.run_context.messages`).
- Align the “empty assistant output” assertions with the concrete structure of your `final_llm_resp` and message types.

You may also want to align docstrings and test names to your project’s existing test naming conventions if they differ.
</issue_to_address>

### Comment 3
<location> `tests/test_tool_loop_agent_runner.py:440-444` </location>
<code_context>
+
+    runner.request_stop()
+
+    rest_responses = []
+    async for response in step_iter:
+        rest_responses.append(response)
+
+    assert any(resp.type == "aborted" for resp in rest_responses)
+    assert runner.was_aborted() is True
+
</code_context>

<issue_to_address>
**suggestion (testing):** Tighten assertions on the aborted response and exercise hook behavior

Currently the test only asserts that an aborted response exists. To better lock in the expected behavior:

1. Explicitly locate the aborted response and assert on its payload (e.g., `MessageChain(type="aborted")`) so changes to the aborted response shape are caught.
2. Add an assertion that the appropriate hook (e.g., `mock_hooks.on_agent_done`) is called once with the final LLM response on abort, to ensure the stop path still runs cleanup/post-processing logic.

Suggested implementation:

```python
    runner.request_stop()

    # Collect all remaining responses from the stream after requesting stop
    rest_responses = [first_resp]
    async for response in step_iter:
        rest_responses.append(response)

    # Locate the explicit aborted response and validate its payload
    aborted_resp = next(resp for resp in rest_responses if resp.type == "aborted")
    assert getattr(aborted_resp, "output", None) is not None
    # The aborted response should carry a MessageChain-style payload with type="aborted"
    assert aborted_resp.output.type == "aborted"

    # The runner should reflect that it was aborted
    assert runner.was_aborted() is True

    # The agent_done hook should still be called once with the final non-aborted response
    non_aborted_responses = [resp for resp in rest_responses if resp.type != "aborted"]
    final_response = non_aborted_responses[-1]
    mock_hooks.on_agent_done.assert_called_once_with(final_response)

```

1. If `aborted_resp.output` is a `MessageChain` (or similar) type that is not yet imported in this test module, add the appropriate import at the top of the file, e.g.:
   - `from inspect_ai.schema import MessageChain`
2. Ensure the `mock_hooks` fixture (or factory) passed into `agent_hooks` exposes `on_agent_done` as a `Mock`/`MagicMock`:
   - e.g., `mock_hooks.on_agent_done = mocker.Mock()` or similar, if not already present.
3. If the actual attribute name for the payload differs from `output` (e.g., `message` or `data`), adjust `aborted_resp.output` accordingly to match the existing response schema.
4. If the hook’s expected signature is `(runner, final_response)` or includes additional parameters, update `assert_called_once_with` to match that exact signature (you can inspect other tests in this file that assert on `mock_hooks.on_agent_done` for consistency).
</issue_to_address>

### Comment 4
<location> `astrbot/core/astr_agent_run_util.py:55` </location>
<code_context>
                     )
                 )

+        stop_watcher = asyncio.create_task(
+            _watch_agent_stop_signal(agent_runner, astr_event),
+        )
</code_context>

<issue_to_address>
**issue (complexity):** Consider restructuring the stop handling so a single watcher owns `request_stop()` and shared helper logic cleans up `stop_watcher` in one place.

You can simplify the new stop logic without losing any behavior by:

1. Having exactly one place that calls `agent_runner.request_stop()`.
2. De-duplicating the `stop_watcher` cancellation logic.

### 1. Single authority for `request_stop`

Right now:

- `_watch_agent_stop_signal` calls `agent_runner.request_stop()`.
- The main loop also calls `agent_runner.request_stop()` based on `_should_stop_agent`.

That makes it harder to reason about the lifecycle. Pick one authority. For example: let the watcher be the only place that calls `request_stop()`, and let the loop only *observe* the stop state:

```python
async for resp in agent_runner.step():
    # Only observe stop state; do not call request_stop() here
    if _should_stop_agent(astr_event):
        if resp.type == "aborted":
            # special aborted handling
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        # cooperative stop: just stop consuming further responses
        break

    if resp.type == "aborted":
        astr_event.set_extra("agent_user_aborted", True)
        astr_event.set_extra("agent_stop_requested", False)
        break

    if resp.type == "tool_call_result":
        ...
```

And keep `request_stop()` only in the watcher:

```python
async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
    while not agent_runner.done():
        if _should_stop_agent(astr_event):
            agent_runner.request_stop()
            return
        await asyncio.sleep(0.5)
```

This keeps the same semantics but makes it clear that the watcher owns the stop request, and the loop only reacts to the stop state.

### 2. Factor out repeated `stop_watcher` cancellation

The `stop_watcher` cancellation pattern is repeated in three places. Extract a tiny helper and/or use `try/finally` so it’s centralized.

Helper:

```python
async def _cancel_task_safely(task: asyncio.Task | None) -> None:
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
```

Then in `run_agent`:

```python
stop_watcher: asyncio.Task | None = None
try:
    stop_watcher = asyncio.create_task(
        _watch_agent_stop_signal(agent_runner, astr_event),
    )

    async for resp in agent_runner.step():
        ...
        if resp.type == "aborted":
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        ...

    if agent_runner.done():
        ...
        break

except Exception as e:
    await _cancel_task_safely(stop_watcher)
    ...
finally:
    # ensure watcher is always cleaned up, even on normal completion
    await _cancel_task_safely(stop_watcher)
```

This removes the per-branch duplication and makes the lifetime of `stop_watcher` obvious: created once, always cancelled/awaited in one place.
</issue_to_address>

### Comment 5
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:333` </location>
<code_context>
                             ),
                         ),
                     )
+                if self._stop_requested:
+                    llm_resp_result = LLMResponse(
+                        role="assistant",
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting helpers for building the interrupted LLM response and finalizing aborts so the stop/abort logic in step() is centralized and less repetitive.

You can centralize the stop/abort path and remove duplication by extracting two small helpers: one to construct the “interrupted” `LLMResponse`, and one to finalize the abort (state, stats, messages, hooks, yield). That keeps `step()`’s control flow simpler while preserving behavior.

### 1. Extract a helper to build the interrupted `LLMResponse`

Right now the “user interrupted” system string and normalization are duplicated in multiple places. Wrap that into a single helper:

```python
SYSTEM_INTERRUPTED_TEXT = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)

def _build_interrupted_response(
    self,
    base_resp: LLMResponse | None,
) -> LLMResponse:
    """
    Normalize an interrupted response into an assistant LLMResponse.

    - If base_resp is provided, preserve reasoning fields / partial content.
    - Ensure role='assistant'.
    - Ensure completion_text carries the system interruption message
      when appropriate.
    """
    if base_resp is None:
        return LLMResponse(role="assistant", completion_text="")

    if base_resp.role != "assistant":
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    # base_resp is already assistant; ensure completion_text has something
    if not base_resp.completion_text:
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    return base_resp
```

Now both the in-loop and post-loop stop branches can reuse this instead of hard-coding strings and logic.

### 2. Extract a focused abort finalizer

All the stop-path side effects (state, stats, messages, hook, and yielding `AgentResponse`) can move into a single private method:

```python
async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
    logger.info("Agent execution was requested to stop by user.")

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()

    parts: list[Part] = []
    if llm_resp.reasoning_content or llm_resp.reasoning_signature:
        parts.append(
            ThinkPart(
                think=llm_resp.reasoning_content,
                encrypted=llm_resp.reasoning_signature,
            )
        )
    if llm_resp.completion_text:
        parts.append(TextPart(text=llm_resp.completion_text))
    if parts:
        self.run_context.messages.append(
            Message(role="assistant", content=parts)
        )

    try:
        await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
    except Exception as e:
        logger.error("Error in on_agent_done hook: %s", e, exc_info=True)

    yield AgentResponse(
        type="aborted",
        data=AgentResponseData(chain=MessageChain(type="aborted")),
    )
```

### 3. Simplify `step()` to a single authoritative abort path

With those helpers, `step()` only needs:

- A normalized `llm_resp_result` set when stop is requested (either in-loop or post-loop).
- A single early exit that delegates to `_finalize_abort`.

For example (showing just the relevant parts around your changes):

```python
# inside the streaming loop
for llm_response in llm_responses:
    ...
    if self._stop_requested:
        # Use the latest chunk to build an interrupted response
        llm_resp_result = self._build_interrupted_response(llm_response)
        break

    continue

llm_resp_result = llm_response
...

# after the loop
if not llm_resp_result:
    if self._stop_requested:
        # No final chunk; still need a normalized interrupted response
        llm_resp_result = self._build_interrupted_response(None)
    else:
        return

if self._stop_requested:
    async for aborted_resp in self._finalize_abort(llm_resp_result):
        yield aborted_resp
    return
```

This keeps:

- The stop behavior and stats/state/messaging identical.
- A single, clearly visible abort branch.
- All “user interrupted” string and normalization logic centralized in `_build_interrupted_response`.
</issue_to_address>
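Stripped of AstrBot specifics, the cooperative-stop contract the review describes boils down to a flag checked between chunks of an async generator. A toy sketch, with all names hypothetical:

```python
import asyncio


class ToyRunner:
    # Toy model of the runner's cooperative-stop contract: request_stop()
    # sets a flag, step() observes it between chunks and yields "aborted".
    def __init__(self, chunks: list[str]) -> None:
        self._chunks = chunks
        self._stop_requested = False
        self._aborted = False

    def request_stop(self) -> None:
        self._stop_requested = True

    def was_aborted(self) -> bool:
        return self._aborted

    async def step(self):
        for chunk in self._chunks:
            if self._stop_requested:
                self._aborted = True
                yield "aborted"
                return
            yield chunk
            await asyncio.sleep(0)  # let the event loop breathe


async def main() -> tuple[list[str], bool]:
    runner = ToyRunner(["a", "b", "c"])
    seen: list[str] = []
    async for resp in runner.step():
        seen.append(resp)
        if len(seen) == 1:
            runner.request_stop()  # stop after the first chunk
    return seen, runner.was_aborted()


seen, aborted = asyncio.run(main())
print(seen, aborted)  # ['a', 'aborted'] True
```

The stop is observed at the next chunk boundary rather than immediately, which matches the "partial output is preserved" behavior described above.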

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Original comment in English

Hey - I've found 5 issues, and left some high level feedback:

  • In run_agent, the stop logic is handled both via _watch_agent_stop_signal and per-iteration _should_stop_agent checks; consider consolidating this into a single mechanism to avoid redundant request_stop() calls and continue branches that can make the control flow harder to follow.
  • In _save_to_history, the commented-out user_aborted history marker block adds noise; either remove it or implement a clear, explicit representation of user-aborted conversations if you intend to persist that state in history.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `run_agent`, the stop logic is handled both via `_watch_agent_stop_signal` and per-iteration `_should_stop_agent` checks; consider consolidating this into a single mechanism to avoid redundant `request_stop()` calls and `continue` branches that can make the control flow harder to follow.
- In `_save_to_history`, the commented-out `user_aborted` history marker block adds noise; either remove it or implement a clear, explicit representation of user-aborted conversations if you intend to persist that state in history.

## Individual Comments

### Comment 1
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:334-338` </location>
<code_context>
                         ),
                     )
+                if self._stop_requested:
+                    llm_resp_result = LLMResponse(
+                        role="assistant",
+                        completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
+                        reasoning_content=llm_response.reasoning_content,
+                        reasoning_signature=llm_response.reasoning_signature,
+                    )
+                    break
</code_context>

<issue_to_address>
**suggestion:** The user-interruption system message string is duplicated; consider centralizing it.

This interruption marker string is hard-coded both here and in the `_stop_requested` handling after the loop. Please extract it into a shared constant or helper (e.g., `make_user_abort_response(...)`) so the text stays consistent and easier to maintain across both paths.

Suggested implementation:

```python
                if self._stop_requested:
                    llm_resp_result = LLMResponse(
                        role="assistant",
                        completion_text=USER_ABORT_SYSTEM_MESSAGE,
                        reasoning_content=llm_response.reasoning_content,
                        reasoning_signature=llm_response.reasoning_signature,
                    )
                    break

```

```python
        if not llm_resp_result:
            if self._stop_requested:
                llm_resp_result = LLMResponse(
                    role="assistant",
                    completion_text=USER_ABORT_SYSTEM_MESSAGE,
                )
            else:
                return

```

To fully implement the suggestion, also:

1. Define the shared constant `USER_ABORT_SYSTEM_MESSAGE` once in this module, for example near the top of `tool_loop_agent_runner.py`:

```python
USER_ABORT_SYSTEM_MESSAGE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)
```

2. If there are any other occurrences of the same interruption message string elsewhere in this file (or related runners), replace them with `USER_ABORT_SYSTEM_MESSAGE` so the text is centralized and consistent.

If you prefer a helper instead of a bare constant (e.g., to preserve reasoning fields when available), you can define a function like `make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse` in this module and have both call sites use it instead of constructing `LLMResponse` directly.
</issue_to_address>

### Comment 2
<location> `tests/test_tool_loop_agent_runner.py:419-420` </location>
<code_context>
     assert fallback_provider.call_count == 1


+@pytest.mark.asyncio
+async def test_stop_signal_returns_aborted_and_persists_partial_message(
+    runner, provider_request, mock_tool_executor, mock_hooks
+):
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for non-streaming mode and for stop requested before any chunks are produced

The new test covers the streaming case after a partial chunk has been delivered, but two important cases are still untested:

1. **Non-streaming mode (`streaming=False`)**: Add a test using `MockAbortableStreamProvider` (or similar) with `streaming=False` and `runner.request_stop()` that asserts:
   - An `aborted` response is yielded.
   - `runner.was_aborted()` is `True` and `final_llm_resp` is set correctly.
   - Partial output is preserved in `run_context.messages`.

2. **Stop before any chunks**: Add a test that calls `runner.request_stop()` *before* consuming from `step()` and asserts:
   - The generator yields an `aborted` response (or otherwise terminates as designed).
   - `final_llm_resp` matches the intended contract for the new branch (e.g. empty assistant message when `llm_resp_result` is missing).
   - `run_context.messages` matches that behavior.

These will exercise the new stop logic for both non-streaming runs and immediate-stop scenarios.

Suggested implementation:

```python
@pytest.mark.asyncio
async def test_stop_signal_returns_aborted_and_persists_partial_message(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Start a streaming step and consume at least one partial chunk
    step_gen = runner.step()
    partial_steps = []
    async for step in step_gen:
        partial_steps.append(step)
        # Simulate user requesting stop after first partial assistant chunk
        if len(partial_steps) == 1:
            runner.request_stop()
        # Once stop has been requested and an aborted response is observed, break
        if getattr(step, "status", None) == "aborted" or getattr(
            getattr(step, "response", None), "status", None
        ) == "aborted":
            break

    # We should have observed at least one step before aborting
    assert partial_steps
    # Runner should record that it was aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be recorded
    assert getattr(runner, "final_llm_resp", None) is not None

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    # There should be at least one assistant message (partial content)
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_signal_non_streaming_persists_partial_message_and_sets_final_resp(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Non-streaming mode: stop requested during the run should yield an aborted result
    and preserve partial assistant output in the run context messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=False,
    )

    step_gen = runner.step()

    # Consume the first step to allow the provider to start producing output.
    # This is where partial output may have been generated internally.
    first_step = await step_gen.__anext__()
    assert first_step is not None

    # Request stop after the run has started in non-streaming mode.
    runner.request_stop()

    # Collect remaining steps; one of them should reflect the aborted state.
    remaining_steps = [first_step]
    async for step in step_gen:
        remaining_steps.append(step)

    # Runner should be marked as aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be set
    assert getattr(runner, "final_llm_resp", None) is not None

    # There should be an aborted outcome in the collected steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in remaining_steps
    )

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_requested_before_any_chunks_yields_aborted_and_empty_assistant(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Stop requested before any chunks are consumed should still yield an aborted
    response and produce an appropriate final_llm_resp and messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Create the generator but request stop before consuming any chunks
    step_gen = runner.step()
    runner.request_stop()

    steps = [step async for step in step_gen]

    # There should be at least one step representing the aborted outcome
    assert steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in steps
    )

    # Runner should be marked aborted and final_llm_resp should be consistent
    assert getattr(runner, "was_aborted")() is True
    final_llm_resp = getattr(runner, "final_llm_resp", None)
    assert final_llm_resp is not None

    # When stop happens before any chunks, assistant content should be empty
    # or equivalent to the "no output" contract in this codebase.
    assistant_messages = [
        m for m in getattr(runner.run_context, "messages", []) if getattr(m, "role", None) == "assistant"
    ]
    if assistant_messages:
        # If an assistant message exists, it should have empty/whitespace-only content
        assert all(
            not getattr(m, "content", "") or str(getattr(m, "content", "")).strip() == ""
            for m in assistant_messages
        )
    else:
        # Alternatively, no assistant messages at all is also an acceptable "empty" contract.
        assert assistant_messages == []

```

These tests assume:

1. `runner.was_aborted()` is a callable method returning a bool, `runner.final_llm_resp` holds the final LLM response, and `runner.run_context.messages` is a list of message objects with `role` and `content` attributes.
2. The step objects yielded by `runner.step()` either:
   - expose a `.status` attribute directly, or
   - have a `.response` attribute with a `.status` field set to `"aborted"` on abort.

If your actual API differs (e.g., different attribute names or response shapes), you will need to:
- Update the `getattr(..., "status", ...)` checks to match your real step/response model.
- Adjust how messages are accessed from `run_context` (e.g., `runner.run_context.context.messages` instead of `runner.run_context.messages`).
- Align the “empty assistant output” assertions with the concrete structure of your `final_llm_resp` and message types.

You may also want to align docstrings and test names to your project’s existing test naming conventions if they differ.
</issue_to_address>

### Comment 3
<location> `tests/test_tool_loop_agent_runner.py:440-444` </location>
<code_context>
+
+    runner.request_stop()
+
+    rest_responses = []
+    async for response in step_iter:
+        rest_responses.append(response)
+
+    assert any(resp.type == "aborted" for resp in rest_responses)
+    assert runner.was_aborted() is True
+
</code_context>

<issue_to_address>
**suggestion (testing):** Tighten assertions on the aborted response and exercise hook behavior

Currently the test only asserts that an aborted response exists. To better lock in the expected behavior:

1. Explicitly locate the aborted response and assert on its payload (e.g., `MessageChain(type="aborted")`) so changes to the aborted response shape are caught.
2. Add an assertion that the appropriate hook (e.g., `mock_hooks.on_agent_done`) is called once with the final LLM response on abort, to ensure the stop path still runs cleanup/post-processing logic.

Suggested implementation:

```python
    runner.request_stop()

    # Collect all remaining responses from the stream after requesting stop
    rest_responses = [first_resp]
    async for response in step_iter:
        rest_responses.append(response)

    # Locate the explicit aborted response and validate its payload
    aborted_resp = next(resp for resp in rest_responses if resp.type == "aborted")
    assert getattr(aborted_resp, "output", None) is not None
    # The aborted response should carry a MessageChain-style payload with type="aborted"
    assert aborted_resp.output.type == "aborted"

    # The runner should reflect that it was aborted
    assert runner.was_aborted() is True

    # The agent_done hook should still be called once with the final non-aborted response
    non_aborted_responses = [resp for resp in rest_responses if resp.type != "aborted"]
    final_response = non_aborted_responses[-1]
    mock_hooks.on_agent_done.assert_called_once_with(final_response)

```

1. If `aborted_resp.output` is a `MessageChain` (or similar) type that is not yet imported in this test module, add the appropriate import at the top of the file, e.g.:
   - `from inspect_ai.schema import MessageChain`
2. Ensure the `mock_hooks` fixture (or factory) passed into `agent_hooks` exposes `on_agent_done` as a `Mock`/`MagicMock`:
   - e.g., `mock_hooks.on_agent_done = mocker.Mock()` or similar, if not already present.
3. If the actual attribute name for the payload differs from `output` (e.g., `message` or `data`), adjust `aborted_resp.output` accordingly to match the existing response schema.
4. If the hook’s expected signature is `(runner, final_response)` or includes additional parameters, update `assert_called_once_with` to match that exact signature (you can inspect other tests in this file that assert on `mock_hooks.on_agent_done` for consistency).
</issue_to_address>
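On point 2, if the hooks fixture is built on `unittest.mock.AsyncMock`, the abort-path hook call can be locked in directly. A standalone sketch with hypothetical names:

```python
import asyncio
from unittest.mock import AsyncMock


async def finalize_abort(hooks: AsyncMock, final_resp: str) -> None:
    # Stand-in for the runner's abort path: the done hook must still
    # fire even when the user stops the agent early.
    await hooks.on_agent_done(final_resp)


hooks = AsyncMock()
asyncio.run(finalize_abort(hooks, "final-response"))

# AsyncMock records awaits, so the stop path can be asserted precisely:
hooks.on_agent_done.assert_awaited_once_with("final-response")
print(hooks.on_agent_done.await_count)  # 1
```

Adjust the asserted arguments to the real hook signature (the runner code above awaits `on_agent_done(self.run_context, llm_resp)`).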


Comment on lines +334 to +338
llm_resp_result = LLMResponse(
role="assistant",
completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
reasoning_content=llm_response.reasoning_content,
reasoning_signature=llm_response.reasoning_signature,


suggestion: The user-interruption system message string is duplicated; consider centralizing it.

This interruption marker string is hard-coded both here and in the _stop_requested handling after the loop. Please extract it into a shared constant or helper (e.g., make_user_abort_response(...)) so the text stays consistent and easier to maintain across both paths.

Suggested implementation:

                if self._stop_requested:
                    llm_resp_result = LLMResponse(
                        role="assistant",
                        completion_text=USER_ABORT_SYSTEM_MESSAGE,
                        reasoning_content=llm_response.reasoning_content,
                        reasoning_signature=llm_response.reasoning_signature,
                    )
                    break
        if not llm_resp_result:
            if self._stop_requested:
                llm_resp_result = LLMResponse(
                    role="assistant",
                    completion_text=USER_ABORT_SYSTEM_MESSAGE,
                )
            else:
                return

To fully implement the suggestion, also:

  1. Define the shared constant USER_ABORT_SYSTEM_MESSAGE once in this module, for example near the top of tool_loop_agent_runner.py:
USER_ABORT_SYSTEM_MESSAGE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)
  1. If there are any other occurrences of the same interruption message string elsewhere in this file (or related runners), replace them with USER_ABORT_SYSTEM_MESSAGE so the text is centralized and consistent.

If you prefer a helper instead of a bare constant (e.g., to preserve reasoning fields when available), you can define a function like make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse in this module and have both call sites use it instead of constructing LLMResponse directly.

Comment on lines +419 to +420
@pytest.mark.asyncio
async def test_stop_signal_returns_aborted_and_persists_partial_message(

suggestion (testing): Add tests for non-streaming mode and for a stop requested before any chunks are produced

The new test covers the streaming case where a partial chunk has already been delivered before the interrupt, but two important scenarios are still untested:

  1. Non-streaming mode (streaming=False): add a test that uses MockAbortableStreamProvider (or similar) with streaming=False and calls runner.request_stop(), asserting that:

    • an aborted response is yielded;
    • runner.was_aborted() is True and final_llm_resp is set correctly;
    • partial output is preserved in run_context.messages.
  2. Stop before any chunks: add a test that calls runner.request_stop() before consuming from step(), asserting that:

    • the generator yields an aborted response (or otherwise terminates as designed);
    • final_llm_resp matches the contract of the new branch (e.g. an empty assistant message when llm_resp_result is missing);
    • run_context.messages is consistent with that behavior.

These tests exercise the new stop logic in both the non-streaming and immediate-stop scenarios.

Suggested implementation:

@pytest.mark.asyncio
async def test_stop_signal_returns_aborted_and_persists_partial_message(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Start a streaming step and consume at least one partial chunk
    step_gen = runner.step()
    partial_steps = []
    async for step in step_gen:
        partial_steps.append(step)
        # Simulate user requesting stop after first partial assistant chunk
        if len(partial_steps) == 1:
            runner.request_stop()
        # Once stop has been requested and an aborted response is observed, break
        if getattr(step, "status", None) == "aborted" or getattr(
            getattr(step, "response", None), "status", None
        ) == "aborted":
            break

    # We should have observed at least one step before aborting
    assert partial_steps
    # Runner should record that it was aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be recorded
    assert getattr(runner, "final_llm_resp", None) is not None

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    # There should be at least one assistant message (partial content)
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_signal_non_streaming_persists_partial_message_and_sets_final_resp(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Non-streaming mode: stop requested during the run should yield an aborted result
    and preserve partial assistant output in the run context messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=False,
    )

    step_gen = runner.step()

    # Consume the first step to allow the provider to start producing output.
    # This is where partial output may have been generated internally.
    first_step = await step_gen.__anext__()
    assert first_step is not None

    # Request stop after the run has started in non-streaming mode.
    runner.request_stop()

    # Collect remaining steps; one of them should reflect the aborted state.
    remaining_steps = [first_step]
    async for step in step_gen:
        remaining_steps.append(step)

    # Runner should be marked as aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be set
    assert getattr(runner, "final_llm_resp", None) is not None

    # There should be an aborted outcome in the collected steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in remaining_steps
    )

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_requested_before_any_chunks_yields_aborted_and_empty_assistant(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Stop requested before any chunks are consumed should still yield an aborted
    response and produce an appropriate final_llm_resp and messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Create the generator but request stop before consuming any chunks
    step_gen = runner.step()
    runner.request_stop()

    steps = [step async for step in step_gen]

    # There should be at least one step representing the aborted outcome
    assert steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in steps
    )

    # Runner should be marked aborted and final_llm_resp should be consistent
    assert getattr(runner, "was_aborted")() is True
    final_llm_resp = getattr(runner, "final_llm_resp", None)
    assert final_llm_resp is not None

    # When stop happens before any chunks, assistant content should be empty
    # or equivalent to the "no output" contract in this codebase.
    assistant_messages = [
        m for m in getattr(runner.run_context, "messages", []) if getattr(m, "role", None) == "assistant"
    ]
    if assistant_messages:
        # If an assistant message exists, it should have empty/whitespace-only content
        assert all(
            not getattr(m, "content", "") or str(getattr(m, "content", "")).strip() == ""
            for m in assistant_messages
        )
    else:
        # Alternatively, no assistant messages at all is also an acceptable "empty" contract.
        assert assistant_messages == []

These tests assume:

  1. runner.was_aborted() is a callable method returning a bool, runner.final_llm_resp holds the final LLM response, and runner.run_context.messages is a list of message objects with role and content attributes.
  2. The step objects yielded by runner.step() either:
    • expose a .status attribute directly, or
    • have a .response attribute with a .status field set to "aborted" on abort.

If your actual API differs (e.g., different attribute names or response shapes), you will need to:

  • Update the getattr(..., "status", ...) checks to match your real step/response model.
  • Adjust how messages are accessed from run_context (e.g., runner.run_context.context.messages instead of runner.run_context.messages).
  • Align the “empty assistant output” assertions with the concrete structure of your final_llm_resp and message types.

You may also want to align docstrings and test names to your project’s existing test naming conventions if they differ.

Original comment in English

suggestion (testing): Add tests for non-streaming mode and for stop requested before any chunks are produced

The new test covers the streaming case after a partial chunk has been delivered, but two important cases are still untested:

  1. Non-streaming mode (streaming=False): Add a test using MockAbortableStreamProvider (or similar) with streaming=False and runner.request_stop() that asserts:

    • An aborted response is yielded.
    • runner.was_aborted() is True and final_llm_resp is set correctly.
    • Partial output is preserved in run_context.messages.
  2. Stop before any chunks: Add a test that calls runner.request_stop() before consuming from step() and asserts:

    • The generator yields an aborted response (or otherwise terminates as designed).
    • final_llm_resp matches the intended contract for the new branch (e.g. empty assistant message when llm_resp_result is missing).
    • run_context.messages matches that behavior.

These will exercise the new stop logic for both non-streaming runs and immediate-stop scenarios.

Suggested implementation:

@pytest.mark.asyncio
async def test_stop_signal_returns_aborted_and_persists_partial_message(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Start a streaming step and consume at least one partial chunk
    step_gen = runner.step()
    partial_steps = []
    async for step in step_gen:
        partial_steps.append(step)
        # Simulate user requesting stop after first partial assistant chunk
        if len(partial_steps) == 1:
            runner.request_stop()
        # Once stop has been requested and an aborted response is observed, break
        if getattr(step, "status", None) == "aborted" or getattr(
            getattr(step, "response", None), "status", None
        ) == "aborted":
            break

    # We should have observed at least one step before aborting
    assert partial_steps
    # Runner should record that it was aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be recorded
    assert getattr(runner, "final_llm_resp", None) is not None

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    # There should be at least one assistant message (partial content)
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_signal_non_streaming_persists_partial_message_and_sets_final_resp(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Non-streaming mode: stop requested during the run should yield an aborted result
    and preserve partial assistant output in the run context messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=False,
    )

    step_gen = runner.step()

    # Consume the first step to allow the provider to start producing output.
    # This is where partial output may have been generated internally.
    first_step = await step_gen.__anext__()
    assert first_step is not None

    # Request stop after the run has started in non-streaming mode.
    runner.request_stop()

    # Collect remaining steps; one of them should reflect the aborted state.
    remaining_steps = [first_step]
    async for step in step_gen:
        remaining_steps.append(step)

    # Runner should be marked as aborted
    assert getattr(runner, "was_aborted")() is True
    # Final LLM response should be set
    assert getattr(runner, "final_llm_resp", None) is not None

    # There should be an aborted outcome in the collected steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in remaining_steps
    )

    # Partial assistant output should be preserved in the run context messages
    run_context = getattr(runner, "run_context", None)
    assert run_context is not None
    messages = getattr(run_context, "messages", [])
    assert messages
    assert any(getattr(m, "role", None) == "assistant" for m in messages)


@pytest.mark.asyncio
async def test_stop_requested_before_any_chunks_yields_aborted_and_empty_assistant(
    runner, provider_request, mock_tool_executor, mock_hooks
):
    """Stop requested before any chunks are consumed should still yield an aborted
    response and produce an appropriate final_llm_resp and messages.
    """
    provider = MockAbortableStreamProvider()

    await runner.reset(
        provider=provider,
        request=provider_request,
        run_context=ContextWrapper(context=None),
        tool_executor=mock_tool_executor,
        agent_hooks=mock_hooks,
        streaming=True,
    )

    # Create the generator but request stop before consuming any chunks
    step_gen = runner.step()
    runner.request_stop()

    steps = [step async for step in step_gen]

    # There should be at least one step representing the aborted outcome
    assert steps
    assert any(
        getattr(s, "status", None) == "aborted"
        or getattr(getattr(s, "response", None), "status", None) == "aborted"
        for s in steps
    )

    # Runner should be marked aborted and final_llm_resp should be consistent
    assert getattr(runner, "was_aborted")() is True
    final_llm_resp = getattr(runner, "final_llm_resp", None)
    assert final_llm_resp is not None

    # When stop happens before any chunks, assistant content should be empty
    # or equivalent to the "no output" contract in this codebase.
    assistant_messages = [
        m for m in getattr(runner.run_context, "messages", []) if getattr(m, "role", None) == "assistant"
    ]
    if assistant_messages:
        # If an assistant message exists, it should have empty/whitespace-only content
        assert all(
            not getattr(m, "content", "") or str(getattr(m, "content", "")).strip() == ""
            for m in assistant_messages
        )
    else:
        # Alternatively, no assistant messages at all is also an acceptable "empty" contract.
        assert assistant_messages == []

These tests assume:

  1. runner.was_aborted() is a callable method returning a bool, runner.final_llm_resp holds the final LLM response, and runner.run_context.messages is a list of message objects with role and content attributes.
  2. The step objects yielded by runner.step() either:
    • expose a .status attribute directly, or
    • have a .response attribute with a .status field set to "aborted" on abort.

If your actual API differs (e.g., different attribute names or response shapes), you will need to:

  • Update the getattr(..., "status", ...) checks to match your real step/response model.
  • Adjust how messages are accessed from run_context (e.g., runner.run_context.context.messages instead of runner.run_context.messages).
  • Align the “empty assistant output” assertions with the concrete structure of your final_llm_resp and message types.

You may also want to align docstrings and test names to your project’s existing test naming conventions if they differ.

Comment on lines +440 to +444
rest_responses = []
async for response in step_iter:
    rest_responses.append(response)

assert any(resp.type == "aborted" for resp in rest_responses)
suggestion (testing): Tighten assertions on the aborted response and exercise hook behavior

Currently the test only asserts that an aborted response exists. To better lock in the expected behavior:

  1. Explicitly locate the aborted response and assert on its payload (e.g., MessageChain(type="aborted")) so changes to the aborted response shape are caught.
  2. Add an assertion that the appropriate hook (e.g., mock_hooks.on_agent_done) is called once with the final LLM response on abort, to ensure the stop path still runs cleanup/post-processing logic.

Suggested implementation:

    runner.request_stop()

    # Collect all remaining responses from the stream after requesting stop
    rest_responses = [first_resp]
    async for response in step_iter:
        rest_responses.append(response)

    # Locate the explicit aborted response and validate its payload
    aborted_resp = next(resp for resp in rest_responses if resp.type == "aborted")
    assert getattr(aborted_resp, "output", None) is not None
    # The aborted response should carry a MessageChain-style payload with type="aborted"
    assert aborted_resp.output.type == "aborted"

    # The runner should reflect that it was aborted
    assert runner.was_aborted() is True

    # The agent_done hook should still be called once with the final non-aborted response
    non_aborted_responses = [resp for resp in rest_responses if resp.type != "aborted"]
    final_response = non_aborted_responses[-1]
    mock_hooks.on_agent_done.assert_called_once_with(final_response)
  1. If aborted_resp.output is a MessageChain (or similar) type that is not yet imported in this test module, add the appropriate import at the top of the file, e.g.:
    • from inspect_ai.schema import MessageChain
  2. Ensure the mock_hooks fixture (or factory) passed into agent_hooks exposes on_agent_done as a Mock/MagicMock:
    • e.g., mock_hooks.on_agent_done = mocker.Mock() or similar, if not already present.
  3. If the actual attribute name for the payload differs from output (e.g., message or data), adjust aborted_resp.output accordingly to match the existing response schema.
  4. If the hook’s expected signature is (runner, final_response) or includes additional parameters, update assert_called_once_with to match that exact signature (you can inspect other tests in this file that assert on mock_hooks.on_agent_done for consistency).
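The fixture setup in point 2 can be sketched as follows. The factory name and the (run_context, final_response) call signature are assumptions for illustration, not the project's actual fixture:

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock

def make_mock_hooks():
    # Hypothetical factory mirroring what a mock_hooks fixture might return.
    # on_agent_done is awaited by the runner, so AsyncMock both records the
    # call and remains awaitable, keeping assert_called_once_with usable.
    hooks = MagicMock()
    hooks.on_agent_done = AsyncMock()
    return hooks

async def demo() -> int:
    hooks = make_mock_hooks()
    # The (run_context, final_response) argument shape is an assumption;
    # match it to the real hook signature in your codebase.
    await hooks.on_agent_done("run_context", "final_response")
    hooks.on_agent_done.assert_called_once_with("run_context", "final_response")
    return hooks.on_agent_done.call_count

call_count = asyncio.run(demo())
```

With this in place, the abort-path test can assert on hooks.on_agent_done exactly as suggested above.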

)
)

stop_watcher = asyncio.create_task(
issue (complexity): Consider restructuring the stop handling so a single watcher owns request_stop() and shared helper logic cleans up stop_watcher in one place.

You can simplify the new stop logic without losing any behavior by:

  1. Having exactly one place that calls agent_runner.request_stop().
  2. De-duplicating the stop_watcher cancellation logic.

1. Single authority for request_stop

Right now:

  • _watch_agent_stop_signal calls agent_runner.request_stop().
  • The main loop also calls agent_runner.request_stop() based on _should_stop_agent.

That makes it harder to reason about the lifecycle. Pick one authority. For example: let the watcher be the only place that calls request_stop(), and let the loop only observe the stop state:

async for resp in agent_runner.step():
    # Only observe stop state; do not call request_stop() here
    if _should_stop_agent(astr_event):
        if resp.type == "aborted":
            # special aborted handling
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        # cooperative stop: just stop consuming further responses
        break

    if resp.type == "aborted":
        astr_event.set_extra("agent_user_aborted", True)
        astr_event.set_extra("agent_stop_requested", False)
        break

    if resp.type == "tool_call_result":
        ...

And keep request_stop() only in the watcher:

async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
    while not agent_runner.done():
        if _should_stop_agent(astr_event):
            agent_runner.request_stop()
            return
        await asyncio.sleep(0.5)

This keeps the same semantics but makes it clear that the watcher owns the stop request, and the loop only reacts to the stop state.

2. Factor out repeated stop_watcher cancellation

The stop_watcher cancellation pattern is repeated in three places. Extract a tiny helper and/or use try/finally so it’s centralized.

Helper:

async def _cancel_task_safely(task: asyncio.Task | None) -> None:
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

Then in run_agent:

stop_watcher: asyncio.Task | None = None
try:
    stop_watcher = asyncio.create_task(
        _watch_agent_stop_signal(agent_runner, astr_event),
    )

    async for resp in agent_runner.step():
        ...
        if resp.type == "aborted":
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        ...

    if agent_runner.done():
        ...
        break

except Exception as e:
    await _cancel_task_safely(stop_watcher)
    ...
finally:
    # ensure watcher is always cleaned up, even on normal completion
    await _cancel_task_safely(stop_watcher)

This removes the per-branch duplication and makes the lifetime of stop_watcher obvious: created once, always cancelled/awaited in one place.

),
),
)
if self._stop_requested:
issue (complexity): Consider extracting helpers for building the interrupted LLM response and finalizing aborts so the stop/abort logic in step() is centralized and less repetitive.

You can centralize the stop/abort path and remove duplication by extracting two small helpers: one to construct the “interrupted” LLMResponse, and one to finalize the abort (state, stats, messages, hooks, yield). That keeps step()’s control flow simpler while preserving behavior.

1. Extract a helper to build the interrupted LLMResponse

Right now the “user interrupted” system string and normalization are duplicated in multiple places. Wrap that into a single helper:

SYSTEM_INTERRUPTED_TEXT = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)

def _build_interrupted_response(
    self,
    base_resp: LLMResponse | None,
) -> LLMResponse:
    """
    Normalize an interrupted response into an assistant LLMResponse.

    - If base_resp is provided, preserve reasoning fields / partial content.
    - Ensure role='assistant'.
    - Ensure completion_text carries the system interruption message
      when appropriate.
    """
    if base_resp is None:
        return LLMResponse(role="assistant", completion_text="")

    if base_resp.role != "assistant":
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    # base_resp is already assistant; ensure completion_text has something
    if not base_resp.completion_text:
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    return base_resp

Now both the in-loop and post-loop stop branches can reuse this instead of hard-coding strings and logic.

2. Extract a focused abort finalizer

All the stop-path side effects (state, stats, messages, hook, and yielding AgentResponse) can move into a single private method:

async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
    logger.info("Agent execution was requested to stop by user.")

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()

    parts: list[Part] = []
    if llm_resp.reasoning_content or llm_resp.reasoning_signature:
        parts.append(
            ThinkPart(
                think=llm_resp.reasoning_content,
                encrypted=llm_resp.reasoning_signature,
            )
        )
    if llm_resp.completion_text:
        parts.append(TextPart(text=llm_resp.completion_text))
    if parts:
        self.run_context.messages.append(
            Message(role="assistant", content=parts)
        )

    try:
        await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
    except Exception as e:
        logger.error("Error in on_agent_done hook: %s", e, exc_info=True)

    yield AgentResponse(
        type="aborted",
        data=AgentResponseData(chain=MessageChain(type="aborted")),
    )

3. Simplify step() to a single authoritative abort path

With those helpers, step() only needs:

  • A normalized llm_resp_result set when stop is requested (either in-loop or post-loop).
  • A single early exit that delegates to _finalize_abort.

For example (showing just the relevant parts around your changes):

# inside the streaming loop
for llm_response in llm_responses:
    ...
    if self._stop_requested:
        # Use the latest chunk to build an interrupted response
        llm_resp_result = self._build_interrupted_response(llm_response)
        break

    # Normal path: keep tracking the latest chunk
    llm_resp_result = llm_response
...

# after the loop
if not llm_resp_result:
    if self._stop_requested:
        # No final chunk; still need a normalized interrupted response
        llm_resp_result = self._build_interrupted_response(None)
    else:
        return

if self._stop_requested:
    async for aborted_resp in self._finalize_abort(llm_resp_result):
        yield aborted_resp
    return

This keeps:

  • The stop behavior and stats/state/messaging identical.
  • A single, clearly visible abort branch.
  • All “user interrupted” string and normalization logic centralized in _build_interrupted_response.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

The pull request introduces a valuable stop functionality for active agent sessions, allowing users to interrupt long-running tasks across the backend, utilities, and frontend. However, the stop chat command lacks permission checks, a security issue that could allow unauthorized users to disrupt interactions in group chats. The implementation could also be improved by creating the stop-signal watcher task once per run instead of once per step in run_agent, by reducing the latency introduced by the watcher's 0.5s polling interval, and by accumulating streaming chunks in the runner so that partial output is correctly preserved in the database history.

Comment on lines +334 to +339
llm_resp_result = LLMResponse(
    role="assistant",
    completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
    reasoning_content=llm_response.reasoning_content,
    reasoning_signature=llm_response.reasoning_signature,
)

high

The runner currently does not accumulate streaming chunks. When a stop is requested, llm_resp_result is created using only the data from the current chunk. Consequently, all text and reasoning content yielded in previous chunks will be missing from the final response saved to the conversation history. To correctly preserve partial output as intended, you should accumulate completion_text and reasoning_content in buffers throughout the streaming loop and use those buffers when constructing the aborted LLMResponse.
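A minimal sketch of the suggested accumulation, using a stand-in dataclass for LLMResponse (the field names mirror the snippet quoted above; the function name is hypothetical):

```python
from dataclasses import dataclass


# Minimal stand-in for the real LLMResponse; field names mirror the quoted diff.
@dataclass
class LLMResponse:
    role: str = "assistant"
    completion_text: str = ""
    reasoning_content: str = ""
    reasoning_signature: str = ""


def accumulate_and_interrupt(chunks, stop_after):
    """Accumulate streaming chunks in buffers, then build the aborted response
    from the buffers rather than the last chunk alone, so text yielded in
    earlier chunks survives into the saved history."""
    text_buf, reasoning_buf = [], []
    for i, chunk in enumerate(chunks):
        text_buf.append(chunk.completion_text)
        reasoning_buf.append(chunk.reasoning_content)
        if i + 1 == stop_after:  # simulate the user's stop request mid-stream
            break
    return LLMResponse(
        role="assistant",
        completion_text="".join(text_buf),
        reasoning_content="".join(reasoning_buf),
    )
```

With this shape, the aborted-response branch reads from the buffers instead of the current chunk's fields.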

Comment on lines +105 to +117
async def stop(self, message: AstrMessageEvent) -> None:
    """停止当前会话正在运行的 Agent"""
    cfg = self.context.get_config(umo=message.unified_msg_origin)
    agent_runner_type = cfg["provider_settings"]["agent_runner_type"]
    umo = message.unified_msg_origin

    if agent_runner_type in THIRD_PARTY_AGENT_RUNNER_KEY:
        stopped_count = active_event_registry.stop_all(umo, exclude=message)
    else:
        stopped_count = active_event_registry.request_agent_stop_all(
            umo,
            exclude=message,
        )

security-medium medium

The stop command lacks permission checks, allowing any user in a shared session (such as a group chat) to interrupt an active agent task initiated by another user. In contrast, other destructive or disruptive commands in the same file, such as reset and del_conv, implement permission checks that default to requiring administrator privileges in group settings. This inconsistency allows a regular member to perform a denial-of-service-like action against other members' interactions with the bot.
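The review compares /stop with reset and del_conv; as a sketch only (I have not verified AstrBot's actual permission helpers), the policy could be factored into a small predicate that the command consults before stopping another user's task:

```python
def can_stop(sender_is_admin: bool, is_group_chat: bool) -> bool:
    """Hypothetical policy check matching the review's suggestion: in group
    chats only administrators may interrupt an agent task started by another
    member; in private chats the user may always stop their own task."""
    if is_group_chat:
        return sender_is_admin
    return True
```

The stop handler would then reply with a permission error and return early when can_stop is False, mirroring how the review says reset and del_conv behave.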

Comment on lines +55 to +57
stop_watcher = asyncio.create_task(
    _watch_agent_stop_signal(agent_runner, astr_event),
)

medium

Creating and cancelling the stop_watcher task inside the while loop for every step is inefficient. If an agent run takes many steps, this results in frequent task creation and destruction. It is recommended to create a single watcher task for the entire duration of the run_agent execution and clean it up in a finally block.
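A sketch of the suggested restructuring, with placeholder coroutines standing in for the real run_agent internals (the names run_steps and watch_stop are assumptions, not the PR's API):

```python
import asyncio


async def run_agent_with_watcher(run_steps, watch_stop):
    """Create the stop watcher once for the whole run and tear it down in a
    finally block, instead of creating and cancelling a task on every step."""
    stop_watcher = asyncio.create_task(watch_stop())
    try:
        await run_steps()  # the per-step while loop lives in here
    finally:
        if not stop_watcher.done():
            stop_watcher.cancel()
            try:
                await stop_watcher
            except asyncio.CancelledError:
                pass
```

This keeps exactly one watcher task alive per run, regardless of how many steps the agent takes.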

if _should_stop_agent(astr_event):
    agent_runner.request_stop()
    return
await asyncio.sleep(0.5)

medium

A 0.5-second polling interval introduces a noticeable delay between the user clicking the stop button and the agent actually stopping. Reducing this interval to 0.1s would make the functionality feel much more responsive.

Suggested change
-    await asyncio.sleep(0.5)
+    await asyncio.sleep(0.1)
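Beyond shortening the interval, the polling delay can be removed entirely by waiting on an asyncio.Event that the stop request sets. This is a sketch of that alternative, not the PR's implementation; the Event plumbing is an assumption:

```python
import asyncio


async def watch_stop_signal(stop_event: asyncio.Event, agent_runner):
    """Wake the moment the stop event is set, instead of polling on a timer."""
    await stop_event.wait()
    agent_runner.request_stop()
```

The stop command would call stop_event.set(), and the watcher reacts with no polling latency at all.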

Comment on lines +383 to +389
# if user_aborted:
#     message_to_save.append(
#         Message(
#             role="assistant",
#             content="[User aborted this request. Partial output before abort was preserved.]",
#         ).model_dump()
#     )

medium

This block of commented-out code should be removed to maintain a clean codebase.

Comment on lines +631 to +634
umo = (
    f"{session.platform_id}:{message_type}:"
    f"{session.platform_id}!{username}!{session_id}"
)

medium

The logic for constructing the umo (Unified Message Origin) string is duplicated here and in the delete_webchat_session method. This duplication is fragile; if the UMO format requirements change, it's easy to miss one of these locations. Consider extracting this logic into a helper method or a property on the session model.
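A sketch of such a helper, with the session fields passed explicitly (the function name is hypothetical; the format string is taken verbatim from the quoted code):

```python
def build_webchat_umo(platform_id: str, message_type: str,
                      username: str, session_id: str) -> str:
    """Build the Unified Message Origin string in one place, so both call
    sites stay in sync if the UMO format ever changes."""
    return f"{platform_id}:{message_type}:{platform_id}!{username}!{session_id}"
```

Both this method and delete_webchat_session would then call the helper instead of repeating the f-string.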

Comment on lines +64 to +69
if not stop_watcher.done():
    stop_watcher.cancel()
    try:
        await stop_watcher
    except asyncio.CancelledError:
        pass

medium

This cleanup logic for the stop_watcher is repeated three times in this file (lines 64-69, 145-150, and 164-169). Extracting this into a small helper function would improve maintainability and reduce code duplication.
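The repeated block could collapse into a helper along these lines (a sketch; the name cancel_task is hypothetical):

```python
import asyncio


async def cancel_task(task: asyncio.Task) -> None:
    """Cancel a task if still running and await it, swallowing the
    CancelledError that the await re-raises. Shared by all three call sites."""
    if not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
```

Each of the three sites then becomes a single `await cancel_task(stop_watcher)`.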

@Soulter Soulter merged commit e357d9d into master Feb 23, 2026
6 checks passed
astrbot-doc-agent bot pushed a commit to AstrBotDevs/AstrBot-docs that referenced this pull request Feb 23, 2026
@astrbot-doc-agent

Generated docs update PR (pending manual review):
AstrBotDevs/AstrBot-docs#135
Trigger: PR merged


AI change summary:

  • Added a /stop command description to zh/use/command.md and en/use/command.md, for interrupting the current Agent task.
  • Added a chat section to zh/use/webui.md and en/use/webui.md describing the stop button on the right side of the input box.
  • The Chinese and English docs have been updated in sync.

Experimental bot notice:

  • This output is generated by AstrBot-Doc-Agent for review only.
  • It does not represent the final documentation form.


Labels

area:core The bug / feature is about astrbot's core, backend feature:chatui The bug / feature is about astrbot's chatui, webchat size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Continuous agent execution cannot be interrupted; no way to interrupt agent execution has been found so far.

1 participant