From 42d1878d7d432a21cb078878ac0c44a22d319a1e Mon Sep 17 00:00:00 2001 From: hongbin9 Date: Tue, 9 Jun 2026 16:17:32 +0800 Subject: [PATCH] fix(cli): flush run parts after json stream idle --- packages/opencode/src/cli/cmd/run.ts | 157 +++++++++++++++++---------- 1 file changed, 98 insertions(+), 59 deletions(-) diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index 18d033dadb3c..ab893d7eecb6 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -20,7 +20,7 @@ import { UI } from "../ui" import { effectCmd } from "../effect-cmd" import { EOL } from "os" import { Filesystem } from "@/util/filesystem" -import { createOpencodeClient, type OpencodeClient, type ToolPart } from "@opencode-ai/sdk/v2" +import { createOpencodeClient, type OpencodeClient, type Part as SessionPart, type ToolPart } from "@opencode-ai/sdk/v2" import { FormatError, FormatUnknownError } from "../error" import { INTERACTIVE_INPUT_ERROR, resolveInteractiveStdin } from "./run/runtime.stdin" @@ -629,83 +629,120 @@ export const RunCommand = effectCmd({ // to stdout/UI. `client` is passed explicitly because attach mode may // rebind the SDK to the session's directory after the subscription is // created, and replies issued from inside the loop must use that client. + let flushParts: (parts?: SessionPart[]) => Promise = async () => {} async function loop(client: OpencodeClient, events: Awaited>) { const toggles = new Map() + const emitted = new Set() let error: string | undefined - for await (const event of events.stream) { + const mark = (part: SessionPart) => { + emitted.add(part.id) + } + + async function handlePart(part: SessionPart) { + if (part.sessionID !== sessionID) return + + if (part.type === "tool" && (part.state.status === "completed" || part.state.status === "error")) { + if (emit("tool_use", { part })) { + mark(part) + return + } + if (part.state.status === "completed") { + await tool(part) + mark(part) + return + } + await toolError(part) + UI.error(part.state.error) + mark(part) + } + if ( - event.type === "message.updated" && - event.properties.sessionID === sessionID && - event.properties.info.role === "assistant" && - args.format !== "json" && - toggles.get("start") !== true + part.type === "tool" && + part.tool === "task" && + part.state.status === "running" && + args.format !== "json" ) { - UI.empty() - UI.println(`> ${event.properties.info.agent} · ${event.properties.info.modelID}`) - UI.empty() - toggles.set("start", true) + if (toggles.get(part.id) === true) return + await tool(part) + toggles.set(part.id, true) } - if (event.type === "message.part.updated") { - const part = event.properties.part - if (part.sessionID !== sessionID) continue - - if (part.type === "tool" && (part.state.status === "completed" || part.state.status === "error")) { - if (emit("tool_use", { part })) continue - if (part.state.status === "completed") { - await tool(part) - continue - } - await toolError(part) - UI.error(part.state.error) + if (part.type === "step-start") { + if (emit("step_start", { part })) { + mark(part) + return } + } - if ( - part.type === "tool" && - part.tool === "task" && - part.state.status === "running" && - args.format !== "json" - ) { - if (toggles.get(part.id) === true) continue - await tool(part) - toggles.set(part.id, true) + if (part.type === "step-finish") { + if (emit("step_finish", { part })) { + mark(part) + return } + } - if (part.type === "step-start") { - if (emit("step_start", { part })) continue + if (part.type === "text" && part.time?.end) { + if (emit("text", { part })) { + mark(part) + return } - - if (part.type === "step-finish") { - if (emit("step_finish", { part })) continue + const text = part.text.trim() + if (!text) return + if (!process.stdout.isTTY) { + process.stdout.write(text + EOL) + mark(part) + return } + UI.empty() + UI.println(text) + UI.empty() + mark(part) + } - if (part.type === "text" && part.time?.end) { - if (emit("text", { part })) continue - const text = part.text.trim() - if (!text) continue - if (!process.stdout.isTTY) { - process.stdout.write(text + EOL) - continue - } + if (part.type === "reasoning" && part.time?.end && thinking) { + if (emit("reasoning", { part })) { + mark(part) + return + } + const text = part.text.trim() + if (!text) return + const line = `Thinking: ${text}` + if (process.stdout.isTTY) { UI.empty() - UI.println(text) + UI.println(`${UI.Style.TEXT_DIM}\u001b[3m${line}\u001b[0m${UI.Style.TEXT_NORMAL}`) UI.empty() + mark(part) + return } + process.stdout.write(line + EOL) + mark(part) + } + } - if (part.type === "reasoning" && part.time?.end && thinking) { - if (emit("reasoning", { part })) continue - const text = part.text.trim() - if (!text) continue - const line = `Thinking: ${text}` - if (process.stdout.isTTY) { - UI.empty() - UI.println(`${UI.Style.TEXT_DIM}\u001b[3m${line}\u001b[0m${UI.Style.TEXT_NORMAL}`) - UI.empty() - continue - } - process.stdout.write(line + EOL) - } + flushParts = async (parts = []) => { + for (const part of parts) { + if (emitted.has(part.id)) continue + await handlePart(part) + } + } + + for await (const event of events.stream) { + if ( + event.type === "message.updated" && + event.properties.sessionID === sessionID && + event.properties.info.role === "assistant" && + args.format !== "json" && + toggles.get("start") !== true + ) { + UI.empty() + UI.println(`> ${event.properties.info.agent} · ${event.properties.info.modelID}`) + UI.empty() + toggles.set("start", true) + } + + if (event.type === "message.part.updated") { + await handlePart(event.properties.part) } if (event.type === "session.error") { @@ -787,6 +824,7 @@ export const RunCommand = effectCmd({ return } await finish() + if (!args.attach) await flushParts(result.data?.parts) return } @@ -804,6 +842,7 @@ export const RunCommand = effectCmd({ return } await finish() + if (!args.attach) await flushParts(result.data?.parts) return }