Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ describe("JsonEventEmitter streaming deltas", () => {

emitter.emitCommandOutputChunk("line1\n")
emitter.emitCommandOutputChunk("line1\nline2\n")
emitter.emitCommandOutputDone(17)
emitter.markCommandOutputExited(17)

// This completion say is expected from the extension, but should be suppressed
// because we already streamed and completed via commandExecutionStatus.
// This completion say is expected from the extension and should finalize
// the status-driven command_output stream without duplicating content.
emitMessage(emitter, {
ts: 999,
type: "say",
Expand Down Expand Up @@ -343,4 +343,47 @@ describe("JsonEventEmitter streaming deltas", () => {
done: true,
})
})

it("flushes remaining output on final say completion after fast status:exited", () => {
const { stdout, lines } = createMockStdout()
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
const commandId = 606

emitMessage(
emitter,
createAskMessage({
ts: commandId,
ask: "command",
partial: false,
text: "aws sts get-caller-identity",
}),
)

emitter.emitCommandOutputChunk("{\n")
emitter.markCommandOutputExited(0)

emitMessage(emitter, {
ts: 607,
type: "say",
say: "command_output",
partial: false,
text: '{\n "Account": "123"\n}\n',
} as ClineMessage)

const output = lines()
expect(output).toHaveLength(3)
expect(output[1]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: "{\n" },
})
expect(output[2]).toMatchObject({
type: "tool_result",
id: commandId,
subtype: "command",
tool_result: { name: "execute_command", output: ' "Account": "123"\n}\n', exitCode: 0 },
done: true,
})
})
})
76 changes: 66 additions & 10 deletions apps/cli/src/agent/json-event-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ const SKIP_SAY_TYPES = new Set([

/** Key offset for reasoning content to avoid collision with text content delta tracking */
const REASONING_KEY_OFFSET = 1_000_000_000
/** Grace period to wait for final say:command_output after status:exited */
const COMMAND_OUTPUT_EXIT_GRACE_MS = 250

export class JsonEventEmitter {
private mode: "json" | "stream-json"
Expand All @@ -115,8 +117,8 @@ export class JsonEventEmitter {
private statusDrivenCommandOutputIds = new Set<number>()
// Track command ids that already emitted a terminal command_output done event.
private completedCommandOutputIds = new Set<number>()
// Suppress the next say:command_output completion message after status-driven streaming.
private suppressNextCommandOutputSay = false
// Track exited commands awaiting final say:command_output completion.
private pendingCommandCompletionByToolUseId = new Map<number, { exitCode?: number; timer: NodeJS.Timeout }>()
// Track the completion result content
private completionResultContent: string | undefined
// Track the latest assistant text as a fallback for result.content.
Expand Down Expand Up @@ -338,6 +340,7 @@ export class JsonEventEmitter {

if (isDone) {
event.done = true
this.clearPendingCommandCompletion(commandId)
this.previousCommandOutputByToolUseId.delete(commandId)
this.statusDrivenCommandOutputIds.delete(commandId)
this.completedCommandOutputIds.add(commandId)
Expand Down Expand Up @@ -368,6 +371,7 @@ export class JsonEventEmitter {
})

if (isDone) {
this.clearPendingCommandCompletion(commandId)
this.previousCommandOutputByToolUseId.delete(commandId)
this.statusDrivenCommandOutputIds.delete(commandId)
this.completedCommandOutputIds.add(commandId)
Expand All @@ -387,17 +391,47 @@ export class JsonEventEmitter {
this.emitCommandOutputEvent(commandId, outputSnapshot, false)
}

public markCommandOutputExited(exitCode?: number): void {
const commandId = this.activeCommandToolUseId
if (commandId === undefined) {
return
}

this.statusDrivenCommandOutputIds.add(commandId)
this.clearPendingCommandCompletion(commandId)

const timer = setTimeout(() => {
// Fallback close if final say:command_output never arrives.
if (!this.pendingCommandCompletionByToolUseId.has(commandId)) {
return
}
this.pendingCommandCompletionByToolUseId.delete(commandId)
this.emitCommandOutputEvent(commandId, undefined, true, exitCode)
}, COMMAND_OUTPUT_EXIT_GRACE_MS)
timer.unref?.()

this.pendingCommandCompletionByToolUseId.set(commandId, { exitCode, timer })
}

public emitCommandOutputDone(exitCode?: number): void {
const commandId = this.activeCommandToolUseId
if (commandId === undefined) {
return
}

this.statusDrivenCommandOutputIds.add(commandId)
this.suppressNextCommandOutputSay = true
this.emitCommandOutputEvent(commandId, undefined, true, exitCode)
}

private clearPendingCommandCompletion(commandId: number): void {
const pending = this.pendingCommandCompletionByToolUseId.get(commandId)
if (!pending) {
return
}
clearTimeout(pending.timer)
this.pendingCommandCompletionByToolUseId.delete(commandId)
}

/**
* Get content to send for a message (delta for streaming, full for json mode).
*/
Expand Down Expand Up @@ -624,9 +658,19 @@ export class JsonEventEmitter {
const toolInfo = parseToolInfo(msg.text)

if (subtype === "command") {
if (this.activeCommandToolUseId !== undefined && this.activeCommandToolUseId !== msg.ts) {
const previousCommandId = this.activeCommandToolUseId
const pending = this.pendingCommandCompletionByToolUseId.get(previousCommandId)
if (pending) {
clearTimeout(pending.timer)
this.pendingCommandCompletionByToolUseId.delete(previousCommandId)
this.emitCommandOutputEvent(previousCommandId, undefined, true, pending.exitCode)
}
}

this.activeCommandToolUseId = msg.ts
this.completedCommandOutputIds.delete(msg.ts)
this.suppressNextCommandOutputSay = false
this.clearPendingCommandCompletion(msg.ts)

if (isStreamingPartial) {
const commandDelta = this.computeStructuredDelta(msg.ts, msg.text)
Expand Down Expand Up @@ -707,17 +751,26 @@ export class JsonEventEmitter {
}

private handleCommandOutputMessage(msg: ClineMessage, isDone: boolean): void {
if (this.suppressNextCommandOutputSay) {
if (isDone) {
this.suppressNextCommandOutputSay = false
const commandId = this.activeCommandToolUseId ?? msg.ts
if (this.completedCommandOutputIds.has(commandId)) {
return
}

const pending = this.pendingCommandCompletionByToolUseId.get(commandId)
if (pending) {
if (!isDone) {
return
}
clearTimeout(pending.timer)
this.pendingCommandCompletionByToolUseId.delete(commandId)
this.emitCommandOutputEvent(commandId, msg.text, true, pending.exitCode)
return
}

const commandId = this.activeCommandToolUseId ?? msg.ts
if (this.statusDrivenCommandOutputIds.has(commandId) || this.completedCommandOutputIds.has(commandId)) {
if (this.statusDrivenCommandOutputIds.has(commandId)) {
return
}

this.emitCommandOutputEvent(commandId, msg.text, isDone)
}

Expand Down Expand Up @@ -841,7 +894,10 @@ export class JsonEventEmitter {
this.previousCommandOutputByToolUseId.clear()
this.statusDrivenCommandOutputIds.clear()
this.completedCommandOutputIds.clear()
this.suppressNextCommandOutputSay = false
for (const pending of this.pendingCommandCompletionByToolUseId.values()) {
clearTimeout(pending.timer)
}
this.pendingCommandCompletionByToolUseId.clear()
this.completionResultContent = undefined
this.lastAssistantText = undefined
this.expectPromptEchoAsUser = true
Expand Down
18 changes: 12 additions & 6 deletions apps/cli/src/commands/cli/stdin-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -432,16 +432,22 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
return
}

if (
parsedStatus.status === "exited" ||
parsedStatus.status === "timeout" ||
parsedStatus.status === "fallback"
) {
if (parsedStatus.status === "exited") {
const exitCode =
parsedStatus.status === "exited" && typeof parsedStatus.exitCode === "number"
? parsedStatus.exitCode
: undefined
jsonEmitter.emitCommandOutputDone(exitCode)

if (typeof parsedStatus.output === "string") {
jsonEmitter.emitCommandOutputChunk(parsedStatus.output)
}

jsonEmitter.markCommandOutputExited(exitCode)
return
}

if (parsedStatus.status === "timeout" || parsedStatus.status === "fallback") {
jsonEmitter.emitCommandOutputDone(undefined)
return
}

Expand Down
Loading