Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions astrbot/builtin_stars/builtin_commands/commands/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,30 @@ async def reset(self, message: AstrMessageEvent) -> None:

message.set_result(MessageEventResult().message(ret))

async def stop(self, message: AstrMessageEvent) -> None:
"""停止当前会话正在运行的 Agent"""
cfg = self.context.get_config(umo=message.unified_msg_origin)
agent_runner_type = cfg["provider_settings"]["agent_runner_type"]
umo = message.unified_msg_origin

if agent_runner_type in THIRD_PARTY_AGENT_RUNNER_KEY:
stopped_count = active_event_registry.stop_all(umo, exclude=message)
else:
stopped_count = active_event_registry.request_agent_stop_all(
umo,
exclude=message,
)
Comment on lines +105 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The stop command lacks permission checks, allowing any user in a shared session (such as a group chat) to interrupt an active agent task initiated by another user. In contrast, other destructive or disruptive commands in the same file, such as reset and del_conv, implement permission checks that default to requiring administrator privileges in group settings. This inconsistency allows a regular member to perform a denial-of-service-like action against other members' interactions with the bot.


if stopped_count > 0:
message.set_result(
MessageEventResult().message(
f"已请求停止 {stopped_count} 个运行中的任务。"
)
)
return

message.set_result(MessageEventResult().message("当前会话没有运行中的任务。"))

async def his(self, message: AstrMessageEvent, page: int = 1) -> None:
"""查看对话记录"""
if not self.context.get_using_provider(message.unified_msg_origin):
Expand Down
5 changes: 5 additions & 0 deletions astrbot/builtin_stars/builtin_commands/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ async def reset(self, message: AstrMessageEvent) -> None:
"""重置 LLM 会话"""
await self.conversation_c.reset(message)

@filter.command("stop")
async def stop(self, message: AstrMessageEvent) -> None:
"""停止当前会话中正在运行的 Agent"""
await self.conversation_c.stop(message)

@filter.permission_type(filter.PermissionType.ADMIN)
@filter.command("model")
async def model_ls(
Expand Down
58 changes: 58 additions & 0 deletions astrbot/core/agent/runners/tool_loop_agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ async def reset(
self.tool_executor = tool_executor
self.agent_hooks = agent_hooks
self.run_context = run_context
self._stop_requested = False
self._aborted = False

# These two are used for tool schema mode handling
# We now have two modes:
Expand Down Expand Up @@ -328,6 +330,14 @@ async def step(self):
),
),
)
if self._stop_requested:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议提取构造“被中断的 LLM 响应”和“完成中止流程”的辅助函数,从而将 step() 中的停止/中止逻辑集中起来,减少重复。

你可以通过抽取两个小 helper 来集中停止/中止路径并消除重复:一个用于构造“中断”的 LLMResponse,另一个用于完成中止流程(更新状态、统计、消息、hooks,并产出最终的 AgentResponse)。这样可以在保留行为的前提下让 step() 的控制流更加简洁。

1. 抽取构造中断 LLMResponse 的 helper

目前“用户中断”的系统字符串和归一化逻辑在多个地方重复出现。可以将它们封装到一个 helper 中:

SYSTEM_INTERRUPTED_TEXT = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)

def _build_interrupted_response(
    self,
    base_resp: LLMResponse | None,
) -> LLMResponse:
    """
    Normalize an interrupted response into an assistant LLMResponse.

    - If base_resp is provided, preserve reasoning fields / partial content.
    - Ensure role='assistant'.
    - Ensure completion_text carries the system interruption message
      when appropriate.
    """
    if base_resp is None:
        return LLMResponse(role="assistant", completion_text="")

    if base_resp.role != "assistant":
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    # base_resp is already assistant; ensure completion_text has something
    if not base_resp.completion_text:
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    return base_resp

这样,循环内和循环后的停止分支都可以复用这个 helper,而不用在多处硬编码字符串和逻辑。

2. 抽取专职的中止收尾 helper

所有停止路径的副作用(状态、统计、消息、hook,以及最终产出的 AgentResponse)可以移动到一个私有方法中:

async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
    logger.info("Agent execution was requested to stop by user.")

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()

    parts: list[Part] = []
    if llm_resp.reasoning_content or llm_resp.reasoning_signature:
        parts.append(
            ThinkPart(
                think=llm_resp.reasoning_content,
                encrypted=llm_resp.reasoning_signature,
            )
        )
    if llm_resp.completion_text:
        parts.append(TextPart(text=llm_resp.completion_text))
    if parts:
        self.run_context.messages.append(
            Message(role="assistant", content=parts)
        )

    try:
        await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
    except Exception as e:
        logger.error("Error in on_agent_done hook: %s", e, exc_info=True)

    yield AgentResponse(
        type="aborted",
        data=AgentResponseData(chain=MessageChain(type="aborted")),
    )

3. 将 step() 简化为单一的中止路径

有了这些 helper,step() 只需要:

  • 在请求停止时(无论是循环内还是循环之后)构造并归一化一个 llm_resp_result
  • 在一个统一的早返回分支中调用 _finalize_abort

示例(只展示与你的修改相关的部分):

# inside the streaming loop
for llm_response in llm_responses:
    ...
    if self._stop_requested:
        # Use the latest chunk to build an interrupted response
        llm_resp_result = self._build_interrupted_response(llm_response)
        break

    continue

llm_resp_result = llm_response
...

# after the loop
if not llm_resp_result:
    if self._stop_requested:
        # No final chunk; still need a normalized interrupted response
        llm_resp_result = self._build_interrupted_response(None)
    else:
        return

if self._stop_requested:
    async for aborted_resp in self._finalize_abort(llm_resp_result):
        yield aborted_resp
    return

这样可以确保:

  • 停止行为以及状态/统计/消息的处理保持不变;
  • 只有一个清晰可见的中止分支;
  • 所有“用户中断”文案及归一化逻辑都集中在 _build_interrupted_response 中。
Original comment in English

issue (complexity): Consider extracting helpers for building the interrupted LLM response and finalizing aborts so the stop/abort logic in step() is centralized and less repetitive.

You can centralize the stop/abort path and remove duplication by extracting two small helpers: one to construct the “interrupted” LLMResponse, and one to finalize the abort (state, stats, messages, hooks, yield). That keeps step()’s control flow simpler while preserving behavior.

1. Extract a helper to build the interrupted LLMResponse

Right now the “user interrupted” system string and normalization are duplicated in multiple places. Wrap that into a single helper:

SYSTEM_INTERRUPTED_TEXT = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)

def _build_interrupted_response(
    self,
    base_resp: LLMResponse | None,
) -> LLMResponse:
    """
    Normalize an interrupted response into an assistant LLMResponse.

    - If base_resp is provided, preserve reasoning fields / partial content.
    - Ensure role='assistant'.
    - Ensure completion_text carries the system interruption message
      when appropriate.
    """
    if base_resp is None:
        return LLMResponse(role="assistant", completion_text="")

    if base_resp.role != "assistant":
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    # base_resp is already assistant; ensure completion_text has something
    if not base_resp.completion_text:
        return LLMResponse(
            role="assistant",
            completion_text=SYSTEM_INTERRUPTED_TEXT,
            reasoning_content=base_resp.reasoning_content,
            reasoning_signature=base_resp.reasoning_signature,
        )

    return base_resp

Now both the in-loop and post-loop stop branches can reuse this instead of hard-coding strings and logic.

2. Extract a focused abort finalizer

All the stop-path side effects (state, stats, messages, hook, and yielding AgentResponse) can move into a single private method:

async def _finalize_abort(self, llm_resp: LLMResponse) -> AsyncIterator[AgentResponse]:
    logger.info("Agent execution was requested to stop by user.")

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()

    parts: list[Part] = []
    if llm_resp.reasoning_content or llm_resp.reasoning_signature:
        parts.append(
            ThinkPart(
                think=llm_resp.reasoning_content,
                encrypted=llm_resp.reasoning_signature,
            )
        )
    if llm_resp.completion_text:
        parts.append(TextPart(text=llm_resp.completion_text))
    if parts:
        self.run_context.messages.append(
            Message(role="assistant", content=parts)
        )

    try:
        await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
    except Exception as e:
        logger.error("Error in on_agent_done hook: %s", e, exc_info=True)

    yield AgentResponse(
        type="aborted",
        data=AgentResponseData(chain=MessageChain(type="aborted")),
    )

3. Simplify step() to a single authoritative abort path

With those helpers, step() only needs:

  • A normalized llm_resp_result set when stop is requested (either in-loop or post-loop).
  • A single early exit that delegates to _finalize_abort.

For example (showing just the relevant parts around your changes):

# inside the streaming loop
for llm_response in llm_responses:
    ...
    if self._stop_requested:
        # Use the latest chunk to build an interrupted response
        llm_resp_result = self._build_interrupted_response(llm_response)
        break

    continue

llm_resp_result = llm_response
...

# after the loop
if not llm_resp_result:
    if self._stop_requested:
        # No final chunk; still need a normalized interrupted response
        llm_resp_result = self._build_interrupted_response(None)
    else:
        return

if self._stop_requested:
    async for aborted_resp in self._finalize_abort(llm_resp_result):
        yield aborted_resp
    return

This keeps:

  • The stop behavior and stats/state/messaging identical.
  • A single, clearly visible abort branch.
  • All “user interrupted” string and normalization logic centralized in _build_interrupted_response.

llm_resp_result = LLMResponse(
role="assistant",
completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
reasoning_content=llm_response.reasoning_content,
reasoning_signature=llm_response.reasoning_signature,
Comment on lines +334 to +338
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: 用户中断的系统消息字符串被重复使用了;建议将其集中管理。

这个中断标记字符串在这里以及循环结束后的 _stop_requested 分支中都是硬编码的。建议把它提取为一个共享常量或 helper(例如 make_user_abort_response(...)),这样两处路径中的文案可以保持一致,也更便于维护。

Suggested implementation:

                if self._stop_requested:
                    llm_resp_result = LLMResponse(
                        role="assistant",
                        completion_text=USER_ABORT_SYSTEM_MESSAGE,
                        reasoning_content=llm_response.reasoning_content,
                        reasoning_signature=llm_response.reasoning_signature,
                    )
                    break
        if not llm_resp_result:
            if self._stop_requested:
                llm_resp_result = LLMResponse(
                    role="assistant",
                    completion_text=USER_ABORT_SYSTEM_MESSAGE,
                )
            else:
                return

要完整实现这一建议,还需要:

  1. 在该模块中只定义一次共享常量 USER_ABORT_SYSTEM_MESSAGE,例如放在 tool_loop_agent_runner.py 顶部附近:
USER_ABORT_SYSTEM_MESSAGE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)
  1. 如果这个中断消息字符串在本文件(或相关的 runner)中还有其他出现位置,也替换为 USER_ABORT_SYSTEM_MESSAGE,让文案集中管理并保持一致。

如果你更倾向于用 helper 而不是裸常量(比如希望在有推理字段时保留下来),可以在本模块中定义一个类似 make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse 的函数,然后让这两个调用点都使用该函数,而不是直接构造 LLMResponse

Original comment in English

suggestion: The user-interruption system message string is duplicated; consider centralizing it.

This interruption marker string is hard-coded both here and in the _stop_requested handling after the loop. Please extract it into a shared constant or helper (e.g., make_user_abort_response(...)) so the text stays consistent and easier to maintain across both paths.

Suggested implementation:

                if self._stop_requested:
                    llm_resp_result = LLMResponse(
                        role="assistant",
                        completion_text=USER_ABORT_SYSTEM_MESSAGE,
                        reasoning_content=llm_response.reasoning_content,
                        reasoning_signature=llm_response.reasoning_signature,
                    )
                    break
        if not llm_resp_result:
            if self._stop_requested:
                llm_resp_result = LLMResponse(
                    role="assistant",
                    completion_text=USER_ABORT_SYSTEM_MESSAGE,
                )
            else:
                return

To fully implement the suggestion, also:

  1. Define the shared constant USER_ABORT_SYSTEM_MESSAGE once in this module, for example near the top of tool_loop_agent_runner.py:
USER_ABORT_SYSTEM_MESSAGE = (
    "[SYSTEM: User actively interrupted the response generation. "
    "Partial output before interruption is preserved.]"
)
  1. If there are any other occurrences of the same interruption message string elsewhere in this file (or related runners), replace them with USER_ABORT_SYSTEM_MESSAGE so the text is centralized and consistent.

If you prefer a helper instead of a bare constant (e.g., to preserve reasoning fields when available), you can define a function like make_user_abort_response(llm_response: LLMResponse | None) -> LLMResponse in this module and have both call sites use it instead of constructing LLMResponse directly.

)
Comment on lines +334 to +339
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The runner currently does not accumulate streaming chunks. When a stop is requested, llm_resp_result is created using only the data from the current chunk. Consequently, all text and reasoning content yielded in previous chunks will be missing from the final response saved to the conversation history. To correctly preserve partial output as intended, you should accumulate completion_text and reasoning_content in buffers throughout the streaming loop and use those buffers when constructing the aborted LLMResponse.

break
continue
llm_resp_result = llm_response

Expand All @@ -339,6 +349,48 @@ async def step(self):
break # got final response

if not llm_resp_result:
if self._stop_requested:
llm_resp_result = LLMResponse(role="assistant", completion_text="")
else:
return

if self._stop_requested:
logger.info("Agent execution was requested to stop by user.")
llm_resp = llm_resp_result
if llm_resp.role != "assistant":
llm_resp = LLMResponse(
role="assistant",
completion_text="[SYSTEM: User actively interrupted the response generation. Partial output before interruption is preserved.]",
)
self.final_llm_resp = llm_resp
self._aborted = True
self._transition_state(AgentState.DONE)
self.stats.end_time = time.time()

parts = []
if llm_resp.reasoning_content or llm_resp.reasoning_signature:
parts.append(
ThinkPart(
think=llm_resp.reasoning_content,
encrypted=llm_resp.reasoning_signature,
)
)
if llm_resp.completion_text:
parts.append(TextPart(text=llm_resp.completion_text))
if parts:
self.run_context.messages.append(
Message(role="assistant", content=parts)
)

try:
await self.agent_hooks.on_agent_done(self.run_context, llm_resp)
except Exception as e:
logger.error(f"Error in on_agent_done hook: {e}", exc_info=True)

yield AgentResponse(
type="aborted",
data=AgentResponseData(chain=MessageChain(type="aborted")),
)
return

# 处理 LLM 响应
Expand Down Expand Up @@ -848,5 +900,11 @@ def done(self) -> bool:
"""检查 Agent 是否已完成工作"""
return self._state in (AgentState.DONE, AgentState.ERROR)

def request_stop(self) -> None:
self._stop_requested = True

def was_aborted(self) -> bool:
return self._aborted

def get_final_llm_resp(self) -> LLMResponse | None:
return self.final_llm_resp
44 changes: 43 additions & 1 deletion astrbot/core/astr_agent_run_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
AgentRunner = ToolLoopAgentRunner[AstrAgentContext]


def _should_stop_agent(astr_event) -> bool:
return astr_event.is_stopped() or bool(astr_event.get_extra("agent_stop_requested"))


async def run_agent(
agent_runner: AgentRunner,
max_step: int = 30,
Expand Down Expand Up @@ -48,10 +52,28 @@ async def run_agent(
)
)

stop_watcher = asyncio.create_task(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 考虑重构停止处理逻辑,使只有一个 watcher 负责 request_stop(),并通过共享的 helper 在一个地方清理 stop_watcher

你可以在不改变行为的前提下简化新的停止逻辑:

  1. 只保留一个调用 agent_runner.request_stop() 的入口;
  2. 去重 stop_watcher 的取消逻辑。

1. request_stop 的单一权威

目前:

  • _watch_agent_stop_signal 会调用 agent_runner.request_stop()
  • 主循环也会根据 _should_stop_agent 调用 agent_runner.request_stop()

这会让生命周期更难推理。建议只保留一个“权威调用点”。例如:让 watcher 成为唯一调用 request_stop() 的地方,而主循环只 观察 停止状态:

async for resp in agent_runner.step():
    # Only observe stop state; do not call request_stop() here
    if _should_stop_agent(astr_event):
        if resp.type == "aborted":
            # special aborted handling
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        # cooperative stop: just stop consuming further responses
        break

    if resp.type == "aborted":
        astr_event.set_extra("agent_user_aborted", True)
        astr_event.set_extra("agent_stop_requested", False)
        break

    if resp.type == "tool_call_result":
        ...

并仅在 watcher 中调用 request_stop()

async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
    while not agent_runner.done():
        if _should_stop_agent(astr_event):
            agent_runner.request_stop()
            return
        await asyncio.sleep(0.5)

这样能保持语义不变,但会更清晰地表明:watcher 拥有停止请求的“所有权”,主循环只对停止状态作出反应。

2. 抽取重复的 stop_watcher 取消逻辑

stop_watcher 的取消模式在三处重复出现。可以抽取一个小 helper,和/或使用 try/finally 来集中处理。

Helper:

async def _cancel_task_safely(task: asyncio.Task | None) -> None:
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

然后在 run_agent 中:

stop_watcher: asyncio.Task | None = None
try:
    stop_watcher = asyncio.create_task(
        _watch_agent_stop_signal(agent_runner, astr_event),
    )

    async for resp in agent_runner.step():
        ...
        if resp.type == "aborted":
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        ...

    if agent_runner.done():
        ...
        break

except Exception as e:
    await _cancel_task_safely(stop_watcher)
    ...
finally:
    # ensure watcher is always cleaned up, even on normal completion
    await _cancel_task_safely(stop_watcher)

这样可以移除各个分支里的重复代码,使 stop_watcher 的生命周期非常明确:只创建一次,并在一个地方统一取消/等待完成。

Original comment in English

issue (complexity): Consider restructuring the stop handling so a single watcher owns request_stop() and shared helper logic cleans up stop_watcher in one place.

You can simplify the new stop logic without losing any behavior by:

  1. Having exactly one place that calls agent_runner.request_stop().
  2. De-duplicating the stop_watcher cancellation logic.

1. Single authority for request_stop

Right now:

  • _watch_agent_stop_signal calls agent_runner.request_stop().
  • The main loop also calls agent_runner.request_stop() based on _should_stop_agent.

That makes it harder to reason about the lifecycle. Pick one authority. For example: let the watcher be the only place that calls request_stop(), and let the loop only observe the stop state:

async for resp in agent_runner.step():
    # Only observe stop state; do not call request_stop() here
    if _should_stop_agent(astr_event):
        if resp.type == "aborted":
            # special aborted handling
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        # cooperative stop: just stop consuming further responses
        break

    if resp.type == "aborted":
        astr_event.set_extra("agent_user_aborted", True)
        astr_event.set_extra("agent_stop_requested", False)
        break

    if resp.type == "tool_call_result":
        ...

And keep request_stop() only in the watcher:

async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
    while not agent_runner.done():
        if _should_stop_agent(astr_event):
            agent_runner.request_stop()
            return
        await asyncio.sleep(0.5)

This keeps the same semantics but makes it clear that the watcher owns the stop request, and the loop only reacts to the stop state.

2. Factor out repeated stop_watcher cancellation

The stop_watcher cancellation pattern is repeated in three places. Extract a tiny helper and/or use try/finally so it’s centralized.

Helper:

async def _cancel_task_safely(task: asyncio.Task | None) -> None:
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

Then in run_agent:

stop_watcher: asyncio.Task | None = None
try:
    stop_watcher = asyncio.create_task(
        _watch_agent_stop_signal(agent_runner, astr_event),
    )

    async for resp in agent_runner.step():
        ...
        if resp.type == "aborted":
            astr_event.set_extra("agent_user_aborted", True)
            astr_event.set_extra("agent_stop_requested", False)
            break
        ...

    if agent_runner.done():
        ...
        break

except Exception as e:
    await _cancel_task_safely(stop_watcher)
    ...
finally:
    # ensure watcher is always cleaned up, even on normal completion
    await _cancel_task_safely(stop_watcher)

This removes the per-branch duplication and makes the lifetime of stop_watcher obvious: created once, always cancelled/awaited in one place.

_watch_agent_stop_signal(agent_runner, astr_event),
)
Comment on lines +55 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Creating and cancelling the stop_watcher task inside the while loop for every step is inefficient. If an agent run takes many steps, this results in frequent task creation and destruction. It is recommended to create a single watcher task for the entire duration of the run_agent execution and clean it up in a finally block.

try:
async for resp in agent_runner.step():
if astr_event.is_stopped():
if _should_stop_agent(astr_event):
agent_runner.request_stop()

if resp.type == "aborted":
if not stop_watcher.done():
stop_watcher.cancel()
try:
await stop_watcher
except asyncio.CancelledError:
pass
Comment on lines +64 to +69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This cleanup logic for the stop_watcher is repeated three times in this file (lines 64-69, 145-150, and 164-169). Extracting this into a small helper function would improve maintainability and reduce code duplication.

astr_event.set_extra("agent_user_aborted", True)
astr_event.set_extra("agent_stop_requested", False)
return

if _should_stop_agent(astr_event):
continue

if resp.type == "tool_call_result":
msg_chain = resp.data["chain"]

Expand Down Expand Up @@ -120,6 +142,12 @@ async def run_agent(
# display the reasoning content only when configured
continue
yield resp.data["chain"] # MessageChain
if not stop_watcher.done():
stop_watcher.cancel()
try:
await stop_watcher
except asyncio.CancelledError:
pass
if agent_runner.done():
# send agent stats to webchat
if astr_event.get_platform_name() == "webchat":
Expand All @@ -133,6 +161,12 @@ async def run_agent(
break

except Exception as e:
if "stop_watcher" in locals() and not stop_watcher.done():
stop_watcher.cancel()
try:
await stop_watcher
except asyncio.CancelledError:
pass
logger.error(traceback.format_exc())

err_msg = f"\n\nAstrBot 请求失败。\n错误类型: {type(e).__name__}\n错误信息: {e!s}\n\n请在平台日志查看和分享错误详情。\n"
Expand All @@ -155,6 +189,14 @@ async def run_agent(
return


async def _watch_agent_stop_signal(agent_runner: AgentRunner, astr_event) -> None:
while not agent_runner.done():
if _should_stop_agent(astr_event):
agent_runner.request_stop()
return
await asyncio.sleep(0.5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

A 0.5-second polling interval introduces a noticeable delay between the user clicking the stop button and the agent actually stopping. Reducing this interval to 0.1s would make the functionality feel much more responsive.

Suggested change
await asyncio.sleep(0.5)
await asyncio.sleep(0.1)



async def run_live_agent(
agent_runner: AgentRunner,
tts_provider: TTSProvider | None = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,16 @@ async def process(
yield

# 保存历史记录
if not event.is_stopped() and agent_runner.done():
if agent_runner.done() and (
not event.is_stopped() or agent_runner.was_aborted()
):
await self._save_to_history(
event,
req,
agent_runner.get_final_llm_resp(),
agent_runner.run_context.messages,
agent_runner.stats,
user_aborted=agent_runner.was_aborted(),
)

elif streaming_response and not stream_to_general:
Expand Down Expand Up @@ -308,13 +311,14 @@ async def process(
)

# 检查事件是否被停止,如果被停止则不保存历史记录
if not event.is_stopped():
if not event.is_stopped() or agent_runner.was_aborted():
await self._save_to_history(
event,
req,
final_resp,
agent_runner.run_context.messages,
agent_runner.stats,
user_aborted=agent_runner.was_aborted(),
)

asyncio.create_task(
Expand All @@ -340,16 +344,29 @@ async def _save_to_history(
llm_response: LLMResponse | None,
all_messages: list[Message],
runner_stats: AgentStats | None,
user_aborted: bool = False,
) -> None:
if (
not req
or not req.conversation
or not llm_response
or llm_response.role != "assistant"
):
if not req or not req.conversation:
return

if not llm_response.completion_text and not req.tool_calls_result:
if not llm_response and not user_aborted:
return

if llm_response and llm_response.role != "assistant":
if not user_aborted:
return
llm_response = LLMResponse(
role="assistant",
completion_text=llm_response.completion_text or "",
)
elif llm_response is None:
llm_response = LLMResponse(role="assistant", completion_text="")

if (
not llm_response.completion_text
and not req.tool_calls_result
and not user_aborted
):
logger.debug("LLM 响应为空,不保存记录。")
return

Expand All @@ -363,6 +380,14 @@ async def _save_to_history(
continue
message_to_save.append(message.model_dump())

# if user_aborted:
# message_to_save.append(
# Message(
# role="assistant",
# content="[User aborted this request. Partial output before abort was preserved.]",
# ).model_dump()
# )
Comment on lines +383 to +389
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of commented-out code should be removed to maintain a clean codebase.


token_usage = None
if runner_stats:
# token_usage = runner_stats.token_usage.total
Expand Down
17 changes: 17 additions & 0 deletions astrbot/core/utils/active_event_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,22 @@ def stop_all(
count += 1
return count

def request_agent_stop_all(
self,
umo: str,
exclude: AstrMessageEvent | None = None,
) -> int:
"""请求停止指定 UMO 的所有活跃事件中的 Agent 运行。

与 stop_all 不同,这里不会调用 event.stop_event(),
因此不会中断事件传播,后续流程(如历史记录保存)仍可继续。
"""
count = 0
for event in list(self._events.get(umo, [])):
if event is not exclude:
event.set_extra("agent_stop_requested", True)
count += 1
return count


active_event_registry = ActiveEventRegistry()
47 changes: 40 additions & 7 deletions astrbot/dashboard/routes/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from astrbot.core import logger, sp
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.db import BaseDatabase
from astrbot.core.platform.message_type import MessageType
from astrbot.core.platform.sources.webchat.webchat_queue_mgr import webchat_queue_mgr
from astrbot.core.utils.active_event_registry import active_event_registry
from astrbot.core.utils.astrbot_path import get_astrbot_data_path

from .route import Response, Route, RouteContext
Expand Down Expand Up @@ -41,6 +43,7 @@ def __init__(
"/chat/new_session": ("GET", self.new_session),
"/chat/sessions": ("GET", self.get_sessions),
"/chat/get_session": ("GET", self.get_session),
"/chat/stop": ("POST", self.stop_session),
"/chat/delete_session": ("GET", self.delete_webchat_session),
"/chat/update_session_display_name": (
"POST",
Expand Down Expand Up @@ -466,13 +469,13 @@ async def stream():
if tc_id in tool_calls:
tool_calls[tc_id]["result"] = tcr.get("result")
tool_calls[tc_id]["finished_ts"] = tcr.get("ts")
accumulated_parts.append(
{
"type": "tool_call",
"tool_calls": [tool_calls[tc_id]],
}
)
tool_calls.pop(tc_id, None)
accumulated_parts.append(
{
"type": "tool_call",
"tool_calls": [tool_calls[tc_id]],
}
)
tool_calls.pop(tc_id, None)
elif chain_type == "reasoning":
accumulated_reasoning += result_text
elif streaming:
Expand Down Expand Up @@ -603,6 +606,36 @@ async def stream():
response.timeout = None # fix SSE auto disconnect issue
return response

async def stop_session(self):
"""Stop active agent runs for a session."""
post_data = await request.json
if post_data is None:
return Response().error("Missing JSON body").__dict__

session_id = post_data.get("session_id")
if not session_id:
return Response().error("Missing key: session_id").__dict__

username = g.get("username", "guest")
session = await self.db.get_platform_session_by_id(session_id)
if not session:
return Response().error(f"Session {session_id} not found").__dict__
if session.creator != username:
return Response().error("Permission denied").__dict__

message_type = (
MessageType.GROUP_MESSAGE.value
if session.is_group
else MessageType.FRIEND_MESSAGE.value
)
umo = (
f"{session.platform_id}:{message_type}:"
f"{session.platform_id}!{username}!{session_id}"
)
Comment on lines +631 to +634
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for constructing the umo (Unified Message Origin) string is duplicated here and in the delete_webchat_session method. This duplication is fragile; if the UMO format requirements change, it's easy to miss one of these locations. Consider extracting this logic into a helper method or a property on the session model.

stopped_count = active_event_registry.request_agent_stop_all(umo)

return Response().ok(data={"stopped_count": stopped_count}).__dict__

async def delete_webchat_session(self):
"""Delete a Platform session and all its related data."""
session_id = request.args.get("session_id")
Expand Down
Loading