Skip to content

Commit 64bd8ee

Browse files
committed
Fix subagent streaming and parent tracking for nested agents
🤖 Generated with Codebuff Co-Authored-By: Codebuff <noreply@codebuff.com>
1 parent 42cb2e5 commit 64bd8ee

File tree

3 files changed

+161
-67
lines changed

3 files changed

+161
-67
lines changed

backend/src/run-programmatic-step.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,15 @@ export async function runProgrammaticStep(
273273
role: 'assistant' as const,
274274
content: toolCallString,
275275
})
276-
state.sendSubagentChunk({
277-
userInputId,
278-
agentId: state.agentState.agentId,
279-
agentType: state.agentState.agentType!,
280-
chunk: toolCallString,
281-
})
276+
if (!state.agentState.parentId) {
277+
// Top-level agents have no parent streaming helper, so emit chunks for subagent traces only in that case.
278+
state.sendSubagentChunk?.({
279+
userInputId,
280+
agentId: state.agentState.agentId,
281+
agentType: state.agentState.agentType!,
282+
chunk: toolCallString,
283+
})
284+
}
282285
}
283286

284287
// Execute the tool synchronously and get the result immediately

backend/src/tools/handlers/tool/spawn-agents.ts

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ export const handleSpawnAgents = ((
9191
)
9292
}
9393

94+
const lastSentSubagentChunkByAgent = new Map<string, string>()
95+
9496
const {
9597
ws,
9698
fingerprintId,
@@ -147,53 +149,74 @@ export const handleSpawnAgents = ((
147149
isOnlyChild: agents.length === 1,
148150
parentSystemPrompt,
149151
onResponseChunk: (chunk: string | PrintModeEvent) => {
150-
if (typeof chunk === 'string') {
152+
const sendDedupedSubagentChunk = (textChunk: string) => {
153+
const lastChunk = lastSentSubagentChunkByAgent.get(
154+
subAgentState.agentId,
155+
)
156+
157+
if (lastChunk === textChunk) {
158+
return
159+
}
160+
161+
lastSentSubagentChunkByAgent.set(
162+
subAgentState.agentId,
163+
textChunk,
164+
)
165+
151166
sendSubagentChunk({
152167
userInputId,
153168
agentId: subAgentState.agentId,
154169
agentType,
155-
chunk,
170+
chunk: textChunk,
156171
prompt,
157172
})
173+
}
174+
175+
if (typeof chunk === 'string') {
176+
sendDedupedSubagentChunk(chunk)
158177
return
159178
}
160179

180+
if (chunk.type === 'text') {
181+
const textChunk = chunk.text
182+
if (textChunk) {
183+
sendDedupedSubagentChunk(textChunk)
184+
}
185+
}
186+
161187
// For nested agent events, add parentAgentId to enable proper nesting in UI
162188
if (
163189
chunk.type === 'subagent_start' ||
164190
chunk.type === 'subagent_finish'
165191
) {
166-
logger.debug(
167-
{
168-
eventType: chunk.type,
169-
agentId: chunk.agentId,
170-
parentId: subAgentState.agentId,
171-
parentAgentId: subAgentState.agentId,
172-
},
173-
`spawn-agents: Adding parentAgentId to ${chunk.type} event`,
174-
)
175-
writeToClient({
176-
...chunk,
177-
parentAgentId: subAgentState.agentId,
178-
})
192+
const ensuredParentAgentId =
193+
chunk.parentAgentId ??
194+
subAgentState.parentId ??
195+
parentAgentState?.agentId
196+
const eventWithParent =
197+
ensuredParentAgentId !== undefined
198+
? { ...chunk, parentAgentId: ensuredParentAgentId }
199+
: chunk
200+
if (
201+
chunk.type === 'subagent_finish' &&
202+
chunk.agentId &&
203+
lastSentSubagentChunkByAgent.has(chunk.agentId)
204+
) {
205+
lastSentSubagentChunkByAgent.delete(chunk.agentId)
206+
}
207+
writeToClient(eventWithParent)
179208
return
180209
}
181210

182211
// For tool calls and results from nested agents, preserve the agentId but add parentAgentId
183212
if (chunk.type === 'tool_call' || chunk.type === 'tool_result') {
184-
logger.debug(
185-
{
186-
eventType: chunk.type,
187-
agentId: (chunk as any).agentId,
188-
parentId: subAgentState.agentId,
189-
parentAgentId: subAgentState.agentId,
190-
},
191-
`spawn-agents: Adding parentAgentId to ${chunk.type} event`,
192-
)
193-
writeToClient({
194-
...chunk,
195-
parentAgentId: subAgentState.agentId,
196-
})
213+
const ensuredParentAgentId =
214+
(chunk as any).parentAgentId ?? subAgentState.agentId
215+
const eventWithParent =
216+
ensuredParentAgentId !== undefined
217+
? { ...chunk, parentAgentId: ensuredParentAgentId }
218+
: chunk
219+
writeToClient(eventWithParent)
197220
return
198221
}
199222

npm-app/src/client.ts

Lines changed: 101 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ export class Client {
209209
private responseComplete: boolean = false
210210
private userInputId: string | undefined
211211
private currentOnChunk: ((chunk: string | PrintModeEvent) => void) | undefined
212+
private onlyChildAgents: Set<string> = new Set()
213+
private emitPromptChunkToParser: ((textChunk: string) => boolean) | undefined
212214

213215
public usageData: UsageData = {
214216
usage: 0,
@@ -957,6 +959,10 @@ export class Client {
957959
// Store the chunk locally
958960
storeSubagentChunk({ agentId, agentType, chunk, prompt })
959961

962+
if (this.onlyChildAgents.has(agentId) && this.emitPromptChunkToParser) {
963+
this.emitPromptChunkToParser(chunk)
964+
}
965+
960966
// Refresh display if we're currently viewing this agent
961967
refreshSubagentDisplay(agentId)
962968
})
@@ -1340,6 +1346,51 @@ export class Client {
13401346

13411347
this.userInputId = userInputId
13421348
this.currentOnChunk = onChunk
1349+
this.onlyChildAgents.clear()
1350+
1351+
const clearStreamingState = () => {
1352+
this.emitPromptChunkToParser = undefined
1353+
this.onlyChildAgents.clear()
1354+
}
1355+
1356+
const emitChunkToParser = (textChunk: string) => {
1357+
rawChunkBuffer.push(textChunk)
1358+
1359+
const trimmed = textChunk.trim()
1360+
for (const tag of ONE_TIME_TAGS) {
1361+
if (
1362+
trimmed.startsWith(`<${tag}>`) &&
1363+
trimmed.endsWith(closeXml(tag))
1364+
) {
1365+
if (this.oneTimeFlags[tag]) {
1366+
return true
1367+
}
1368+
Spinner.get().stop()
1369+
const warningMessage = trimmed
1370+
.replace(`<${tag}>`, '')
1371+
.replace(closeXml(tag), '')
1372+
process.stdout.write(yellow(`\n\n${warningMessage}\n\n`))
1373+
this.oneTimeFlags[tag as (typeof ONE_TIME_LABELS)[number]] = true
1374+
return true
1375+
}
1376+
}
1377+
1378+
try {
1379+
xmlStreamParser.write(textChunk, 'utf8')
1380+
} catch (e) {
1381+
logger.error(
1382+
{
1383+
errorMessage: e instanceof Error ? e.message : String(e),
1384+
errorStack: e instanceof Error ? e.stack : undefined,
1385+
chunk: textChunk,
1386+
},
1387+
'Error writing chunk to XML stream parser',
1388+
)
1389+
}
1390+
1391+
return false
1392+
}
1393+
this.emitPromptChunkToParser = emitChunkToParser
13431394

13441395
const stopResponse = () => {
13451396
responseStopped = true
@@ -1349,6 +1400,7 @@ export class Client {
13491400
this.currentOnChunk = undefined
13501401

13511402
xmlStreamParser.destroy()
1403+
clearStreamingState()
13521404

13531405
const additionalMessages = prompt
13541406
? [
@@ -1394,45 +1446,60 @@ export class Client {
13941446

13951447
unsubscribeChunks = this.webSocket.subscribe('response-chunk', (a) => {
13961448
if (a.userInputId !== userInputId) return
1397-
if (typeof a.chunk === 'string') {
1398-
const { chunk } = a
1399-
1400-
rawChunkBuffer.push(chunk)
1401-
1402-
const trimmed = chunk.trim()
1403-
for (const tag of ONE_TIME_TAGS) {
1404-
if (
1405-
trimmed.startsWith(`<${tag}>`) &&
1406-
trimmed.endsWith(closeXml(tag))
1407-
) {
1408-
if (this.oneTimeFlags[tag]) {
1409-
return
1410-
}
1411-
Spinner.get().stop()
1412-
const warningMessage = trimmed
1413-
.replace(`<${tag}>`, '')
1414-
.replace(closeXml(tag), '')
1415-
process.stdout.write(yellow(`\n\n${warningMessage}\n\n`))
1416-
this.oneTimeFlags[tag as (typeof ONE_TIME_LABELS)[number]] = true
1449+
const incomingChunk = a.chunk
1450+
1451+
if (typeof incomingChunk === 'string') {
1452+
emitChunkToParser(incomingChunk)
1453+
return
1454+
}
1455+
1456+
if (incomingChunk.type === 'text') {
1457+
printModeLog(incomingChunk)
1458+
1459+
if (incomingChunk.agentId) {
1460+
if (this.onlyChildAgents.has(incomingChunk.agentId)) {
1461+
// For only-child subagents we rely on subagent-response-chunk strings
14171462
return
14181463
}
1464+
// Skip nested subagent text from streaming output
1465+
return
14191466
}
14201467

1421-
try {
1422-
xmlStreamParser.write(chunk, 'utf8')
1423-
} catch (e) {
1424-
logger.error(
1425-
{
1426-
errorMessage: e instanceof Error ? e.message : String(e),
1427-
errorStack: e instanceof Error ? e.stack : undefined,
1428-
chunk,
1429-
},
1430-
'Error writing chunk to XML stream parser',
1431-
)
1468+
emitChunkToParser(incomingChunk.text)
1469+
return
1470+
}
1471+
1472+
if (incomingChunk.type === 'error') {
1473+
const errorText = `${yellow(incomingChunk.message)}\n`
1474+
const handler = this.currentOnChunk
1475+
if (handler) {
1476+
handler(errorText)
1477+
} else {
1478+
Spinner.get().stop()
1479+
DiffManager.receivedResponse()
1480+
process.stdout.write(errorText)
14321481
}
1433-
} else {
1434-
onChunk(a.chunk)
14351482
}
1483+
1484+
if (
1485+
(incomingChunk.type === 'subagent_start' ||
1486+
incomingChunk.type === 'subagent-start') &&
1487+
incomingChunk.agentId
1488+
) {
1489+
if (incomingChunk.onlyChild) {
1490+
this.onlyChildAgents.add(incomingChunk.agentId)
1491+
} else {
1492+
this.onlyChildAgents.delete(incomingChunk.agentId)
1493+
}
1494+
} else if (
1495+
(incomingChunk.type === 'subagent_finish' ||
1496+
incomingChunk.type === 'subagent-finish') &&
1497+
incomingChunk.agentId
1498+
) {
1499+
this.onlyChildAgents.delete(incomingChunk.agentId)
1500+
}
1501+
1502+
onChunk(incomingChunk)
14361503
})
14371504

14381505
let stepsCount = 0
@@ -1552,6 +1619,7 @@ Go to https://www.codebuff.com/config for more information.`) +
15521619

15531620
unsubscribeChunks()
15541621
unsubscribeComplete()
1622+
clearStreamingState()
15551623

15561624
// Clear the onChunk callback when response is complete
15571625
this.currentOnChunk = undefined

0 commit comments

Comments
 (0)