Skip to content

Commit d273a41

Browse files
committed
Fix mothership block req lifecycle
1 parent b087aba commit d273a41

File tree

6 files changed

+489
-53
lines changed

6 files changed

+489
-53
lines changed

apps/sim/app/api/mothership/execute/route.ts

Lines changed: 128 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { checkInternalAuth } from '@/lib/auth/hybrid'
55
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
66
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
77
import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless'
8+
import { requestExplicitStreamAbort } from '@/lib/copilot/request/session/explicit-abort'
89
import { generateId } from '@/lib/core/utils/uuid'
910
import {
1011
assertActiveWorkspaceAccess,
@@ -26,8 +27,16 @@ const ExecuteRequestSchema = z.object({
2627
workspaceId: z.string().min(1, 'workspaceId is required'),
2728
userId: z.string().min(1, 'userId is required'),
2829
chatId: z.string().optional(),
30+
messageId: z.string().optional(),
31+
requestId: z.string().optional(),
32+
workflowId: z.string().optional(),
33+
executionId: z.string().optional(),
2934
})
3035

36+
function isAbortError(error: unknown): boolean {
37+
return error instanceof Error && error.name === 'AbortError'
38+
}
39+
3140
/**
3241
* POST /api/mothership/execute
3342
*
@@ -37,6 +46,7 @@ const ExecuteRequestSchema = z.object({
3746
*/
3847
export async function POST(req: NextRequest) {
3948
let messageId: string | undefined
49+
let requestId: string | undefined
4050

4151
try {
4252
const auth = await checkInternalAuth(req, { requireWorkflowId: false })
@@ -45,14 +55,29 @@ export async function POST(req: NextRequest) {
4555
}
4656

4757
const body = await req.json()
48-
const { messages, responseFormat, workspaceId, userId, chatId } =
49-
ExecuteRequestSchema.parse(body)
58+
const {
59+
messages,
60+
responseFormat,
61+
workspaceId,
62+
userId,
63+
chatId,
64+
messageId: providedMessageId,
65+
requestId: providedRequestId,
66+
workflowId,
67+
executionId,
68+
} = ExecuteRequestSchema.parse(body)
5069

5170
await assertActiveWorkspaceAccess(workspaceId, userId)
5271

5372
const effectiveChatId = chatId || generateId()
54-
messageId = generateId()
55-
const reqLogger = logger.withMetadata({ messageId })
73+
messageId = providedMessageId || generateId()
74+
requestId = providedRequestId || generateId()
75+
const reqLogger = logger.withMetadata({
76+
messageId,
77+
requestId,
78+
workflowId,
79+
executionId,
80+
})
5681
const [workspaceContext, integrationTools, userPermission] = await Promise.all([
5782
generateWorkspaceContext(workspaceId, userId),
5883
buildIntegrationToolSchemas(userId, messageId),
@@ -72,52 +97,96 @@ export async function POST(req: NextRequest) {
7297
...(userPermission ? { userPermission } : {}),
7398
}
7499

75-
const result = await runHeadlessCopilotLifecycle(requestPayload, {
76-
userId,
77-
workspaceId,
78-
chatId: effectiveChatId,
79-
goRoute: '/api/mothership/execute',
80-
autoExecuteTools: true,
81-
interactive: false,
82-
})
100+
let allowExplicitAbort = true
101+
let explicitAbortRequest: Promise<void> | undefined
102+
const onAbort = () => {
103+
if (!allowExplicitAbort || explicitAbortRequest || !messageId) {
104+
return
105+
}
83106

84-
if (!result.success) {
85-
logger.error(
86-
messageId
87-
? `Mothership execute failed [messageId:${messageId}]`
88-
: 'Mothership execute failed',
89-
{
90-
error: result.error,
91-
errors: result.errors,
92-
}
93-
)
94-
return NextResponse.json(
95-
{
96-
error: result.error || 'Mothership execution failed',
97-
content: result.content || '',
98-
},
99-
{ status: 500 }
100-
)
107+
explicitAbortRequest = requestExplicitStreamAbort({
108+
streamId: messageId,
109+
userId,
110+
chatId: effectiveChatId,
111+
}).catch((error) => {
112+
reqLogger.warn('Failed to send explicit abort for mothership execution', {
113+
error: error instanceof Error ? error.message : String(error),
114+
})
115+
})
101116
}
102117

103-
const clientToolNames = new Set(integrationTools.map((t) => t.name))
104-
const clientToolCalls = (result.toolCalls || []).filter(
105-
(tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-')
106-
)
118+
if (req.signal.aborted) {
119+
onAbort()
120+
} else {
121+
req.signal.addEventListener('abort', onAbort, { once: true })
122+
}
123+
124+
try {
125+
const result = await runHeadlessCopilotLifecycle(requestPayload, {
126+
userId,
127+
workspaceId,
128+
chatId: effectiveChatId,
129+
workflowId,
130+
executionId,
131+
simRequestId: requestId,
132+
goRoute: '/api/mothership/execute',
133+
autoExecuteTools: true,
134+
interactive: false,
135+
abortSignal: req.signal,
136+
})
137+
138+
allowExplicitAbort = false
139+
140+
if (req.signal.aborted) {
141+
reqLogger.info('Mothership execute aborted after lifecycle completion')
142+
return NextResponse.json({ error: 'Mothership execution aborted' }, { status: 499 })
143+
}
107144

108-
return NextResponse.json({
109-
content: result.content,
110-
model: 'mothership',
111-
tokens: result.usage
112-
? {
113-
prompt: result.usage.prompt,
114-
completion: result.usage.completion,
115-
total: (result.usage.prompt || 0) + (result.usage.completion || 0),
145+
if (!result.success) {
146+
logger.error(
147+
messageId
148+
? `Mothership execute failed [messageId:${messageId}]`
149+
: 'Mothership execute failed',
150+
{
151+
requestId,
152+
workflowId,
153+
executionId,
154+
error: result.error,
155+
errors: result.errors,
116156
}
117-
: {},
118-
cost: result.cost || undefined,
119-
toolCalls: clientToolCalls,
120-
})
157+
)
158+
return NextResponse.json(
159+
{
160+
error: result.error || 'Mothership execution failed',
161+
content: result.content || '',
162+
},
163+
{ status: 500 }
164+
)
165+
}
166+
167+
const clientToolNames = new Set(integrationTools.map((t) => t.name))
168+
const clientToolCalls = (result.toolCalls || []).filter(
169+
(tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-')
170+
)
171+
172+
return NextResponse.json({
173+
content: result.content,
174+
model: 'mothership',
175+
tokens: result.usage
176+
? {
177+
prompt: result.usage.prompt,
178+
completion: result.usage.completion,
179+
total: (result.usage.prompt || 0) + (result.usage.completion || 0),
180+
}
181+
: {},
182+
cost: result.cost || undefined,
183+
toolCalls: clientToolCalls,
184+
})
185+
} finally {
186+
allowExplicitAbort = false
187+
req.signal.removeEventListener('abort', onAbort)
188+
await explicitAbortRequest
189+
}
121190
} catch (error) {
122191
if (error instanceof z.ZodError) {
123192
return NextResponse.json(
@@ -126,9 +195,23 @@ export async function POST(req: NextRequest) {
126195
)
127196
}
128197

198+
if (req.signal.aborted || isAbortError(error)) {
199+
logger.info(
200+
messageId
201+
? `Mothership execute aborted [messageId:${messageId}]`
202+
: 'Mothership execute aborted',
203+
{
204+
requestId,
205+
}
206+
)
207+
208+
return NextResponse.json({ error: 'Mothership execution aborted' }, { status: 499 })
209+
}
210+
129211
logger.error(
130212
messageId ? `Mothership execute error [messageId:${messageId}]` : 'Mothership execute error',
131213
{
214+
requestId,
132215
error: error instanceof Error ? error.message : 'Unknown error',
133216
}
134217
)

0 commit comments

Comments
 (0)