diff --git a/packages/react-ui/public/locales/de/translation.json b/packages/react-ui/public/locales/de/translation.json index 508019580b7..3e97abcc1a5 100644 --- a/packages/react-ui/public/locales/de/translation.json +++ b/packages/react-ui/public/locales/de/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/en/translation.json b/packages/react-ui/public/locales/en/translation.json index ac327e46fdb..5e77df512ec 100644 --- a/packages/react-ui/public/locales/en/translation.json +++ b/packages/react-ui/public/locales/en/translation.json @@ -477,6 +477,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/es/translation.json b/packages/react-ui/public/locales/es/translation.json index 8332f429b98..32f20d37a17 100644 --- a/packages/react-ui/public/locales/es/translation.json +++ b/packages/react-ui/public/locales/es/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/fr/translation.json b/packages/react-ui/public/locales/fr/translation.json index 8332f429b98..32f20d37a17 100644 --- a/packages/react-ui/public/locales/fr/translation.json +++ b/packages/react-ui/public/locales/fr/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/ja/translation.json b/packages/react-ui/public/locales/ja/translation.json index 944802abcc4..2324c24f4b4 100644 --- a/packages/react-ui/public/locales/ja/translation.json +++ b/packages/react-ui/public/locales/ja/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/nl/translation.json b/packages/react-ui/public/locales/nl/translation.json index 508019580b7..3e97abcc1a5 100644 --- a/packages/react-ui/public/locales/nl/translation.json +++ b/packages/react-ui/public/locales/nl/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/pt/translation.json b/packages/react-ui/public/locales/pt/translation.json index 8332f429b98..32f20d37a17 100644 --- a/packages/react-ui/public/locales/pt/translation.json +++ b/packages/react-ui/public/locales/pt/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/zh-TW/translation.json b/packages/react-ui/public/locales/zh-TW/translation.json index 944802abcc4..2324c24f4b4 100644 --- a/packages/react-ui/public/locales/zh-TW/translation.json +++ b/packages/react-ui/public/locales/zh-TW/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/react-ui/public/locales/zh/translation.json b/packages/react-ui/public/locales/zh/translation.json index 944802abcc4..2324c24f4b4 100644 --- a/packages/react-ui/public/locales/zh/translation.json +++ b/packages/react-ui/public/locales/zh/translation.json @@ -479,6 +479,9 @@ "Invalid Access": "", "You tried to access a project that you do not have access to.": "", "New Table": "", + "No, cancel": "", + "Yes, and don't ask me again": "", + "Yes, proceed": "", "Response stopped": "", "Retry": "", "Reply...": "", diff --git a/packages/server/api/src/app/chat/chat-approval-gate.ts b/packages/server/api/src/app/chat/chat-approval-gate.ts new file mode 100644 index 00000000000..c5a737524b3 --- /dev/null +++ b/packages/server/api/src/app/chat/chat-approval-gate.ts @@ -0,0 +1,54 @@ +import { redisConnections } from '../database/redis-connections' +import { pubsub } from '../helper/pubsub' + +const GATE_TIMEOUT_MS = 5 * 60 * 1000 +const CHANNEL_PREFIX = 'tool-approval:' + +function channelName(gateId: string): string { + return `${CHANNEL_PREFIX}${gateId}` +} + +async function waitForApproval({ gateId }: { gateId: string }): Promise { + const channel = channelName(gateId) + const subscriber = await redisConnections.create() + + return new Promise((resolve) => { + let settled = false + + const cleanup = () => { + if (settled) return + settled = true + subscriber.unsubscribe(channel).then(() => subscriber.quit()).catch(() => undefined) + } + + const timeout = setTimeout(() => { + cleanup() + resolve(false) + }, GATE_TIMEOUT_MS) + + subscriber.on('message', (_ch, message) => { + if (_ch !== channel) return + clearTimeout(timeout) + cleanup() + try { + const parsed = JSON.parse(message) + resolve(parsed.approved === true) + } + catch { + resolve(false) + } + }) + + void subscriber.subscribe(channel) + }) +} + +async function resolveGate({ gateId, approved }: { gateId: string, approved: boolean }): Promise { + const channel = channelName(gateId) + await pubsub.publish(channel, JSON.stringify({ approved })) +} + +export const chatApprovalGate = { + waitForApproval, + resolveGate, +} diff --git a/packages/server/api/src/app/chat/chat-controller.ts b/packages/server/api/src/app/chat/chat-controller.ts index e0132f20154..32accf834a2 100644 --- a/packages/server/api/src/app/chat/chat-controller.ts +++ b/packages/server/api/src/app/chat/chat-controller.ts @@ -6,10 +6,12 @@ import { SetProjectContextRequest, UpdateChatConversationRequest, } from '@activepieces/shared' +import { pipeUIMessageStreamToResponse } from 'ai' import { FastifyPluginAsyncZod } from 'fastify-type-provider-zod' import { StatusCodes } from 'http-status-codes' import { z } from 'zod' import { securityAccess } from '../core/security/authorization/fastify-security' +import { chatApprovalGate } from './chat-approval-gate' import { chatService } from './chat-service' const CHAT_PRINCIPALS = [PrincipalType.USER] as const @@ -72,7 +74,7 @@ export const chatController: FastifyPluginAsyncZod = async (app) => { const { content, files } = request.body const log = request.log - const { result, closeMcpClient } = await chatService(log).sendMessage({ + const { stream, closeMcpClient } = await chatService(log).sendMessage({ conversationId: request.params.id, userId: request.principal.id, platformId: request.principal.platform.id, @@ -83,12 +85,16 @@ export const chatController: FastifyPluginAsyncZod = async (app) => { await reply.hijack() try { - result.pipeUIMessageStreamToResponse(reply.raw, { + pipeUIMessageStreamToResponse({ + response: reply.raw, + stream, headers: { 'X-Accel-Buffering': 'no', }, }) - await result.consumeStream() + await new Promise((resolve) => { + reply.raw.on('close', resolve) + }) } catch (err: unknown) { const isClientDisconnect = err instanceof Error && 'code' in err && err.code === 'ECONNRESET' @@ -104,6 +110,14 @@ export const chatController: FastifyPluginAsyncZod = async (app) => { } }) + app.post('/tool-approvals/:gateId', ToolApprovalRoute, async (request, reply) => { + await chatApprovalGate.resolveGate({ + gateId: request.params.gateId, + approved: request.body.approved, + }) + return reply.status(StatusCodes.OK).send({ success: true }) + }) + app.post('/conversations/:id/project-context', SetProjectContextRoute, async (request) => { return chatService(request.log).setProjectContext({ id: request.params.id, @@ -198,6 +212,18 @@ const SendMessageRoute = { }, } +const ToolApprovalRoute = { + config: { + security: securityAccess.publicPlatform(CHAT_PRINCIPALS), + }, + schema: { + tags: ['chat'], + security: [SERVICE_KEY_SECURITY_OPENAPI], + params: z.object({ gateId: z.string() }), + body: z.object({ approved: z.boolean() }), + }, +} + const SetProjectContextRoute = { config: { security: securityAccess.publicPlatform(CHAT_PRINCIPALS), diff --git a/packages/server/api/src/app/chat/chat-service.ts b/packages/server/api/src/app/chat/chat-service.ts index 1daa432b559..df156e1c167 100644 --- a/packages/server/api/src/app/chat/chat-service.ts +++ b/packages/server/api/src/app/chat/chat-service.ts @@ -1,4 +1,3 @@ -import { ServerResponse } from 'http' import { ActivepiecesError, AIProviderModelType, @@ -16,7 +15,7 @@ import { spreadIfDefined, UpdateChatConversationRequest, } from '@activepieces/shared' -import { LanguageModel, ModelMessage, stepCountIs, streamText } from 'ai' +import { createUIMessageStream, LanguageModel, ModelMessage, stepCountIs, streamText } from 'ai' import { FastifyBaseLogger } from 'fastify' import { aiProviderService } from '../ai/ai-provider-service' import { repoFactory } from '../core/db/repo-factory' @@ -164,6 +163,8 @@ export const chatService = (log: FastifyBaseLogger) => ({ const newUserMessage: ModelMessage = { role: 'user' as const, content: userContent } const allMessages = [...previousMessages, newUserMessage] + await conversationRepo().update(conversationId, { messages: allMessages }) + const compactionState = await resolveCompactionState({ conversation, allMessages, @@ -197,7 +198,6 @@ export const chatService = (log: FastifyBaseLogger) => ({ }, availableProjectIds: userProjects.map((p) => p.id), }) - const tools = { ...localTools, ...mcpToolSet } const closeMcpClient = async (): Promise => { if (mcpClient) { @@ -207,47 +207,50 @@ export const chatService = (log: FastifyBaseLogger) => ({ } } - try { - const result = streamText({ - model, - system: systemPrompt, - messages: messagesForLlm, - tools, - stopWhen: stepCountIs(MAX_STEPS), - onStepFinish: ({ finishReason, usage }) => { - log.debug({ conversationId, finishReason, usage }, 'Chat step finished') - }, - onFinish: async ({ response, usage }) => { - const updatedMessages = [...allMessages, ...response.messages] - try { - await conversationRepo().update(conversationId, { - messages: updatedMessages, - ...(pendingTitle ? { title: pendingTitle } : {}), - ...(isNil(conversation.modelName) ? { modelName } : {}), - }) - } - catch (saveErr) { - log.error({ err: saveErr, conversationId }, 'Failed to persist conversation messages') - } - - log.info({ - conversationId, - inputTokens: usage.inputTokens, - outputTokens: usage.outputTokens, - provider: providerConfig.provider, - }, 'Chat message completed') - }, - onError: ({ error }) => { - log.error({ err: error, conversationId }, 'Chat streamText error') - }, - }) + const stream = createUIMessageStream({ + execute: ({ writer }) => { + const gatedTools = chatMcp.withApprovalGates({ mcpToolSet, writer, log }) + const tools = { ...localTools, ...gatedTools } + + const textStream = streamText({ + model, + system: systemPrompt, + messages: messagesForLlm, + tools, + stopWhen: stepCountIs(MAX_STEPS), + onStepFinish: ({ finishReason, usage }) => { + log.debug({ conversationId, finishReason, usage }, 'Chat step finished') + }, + onFinish: async ({ response, usage }) => { + const updatedMessages = [...allMessages, ...response.messages] + try { + await conversationRepo().update(conversationId, { + messages: updatedMessages, + ...(pendingTitle ? { title: pendingTitle } : {}), + ...(isNil(conversation.modelName) ? { modelName } : {}), + }) + } + catch (saveErr) { + log.error({ err: saveErr, conversationId }, 'Failed to persist conversation messages') + } + + log.info({ + conversationId, + inputTokens: usage.inputTokens, + outputTokens: usage.outputTokens, + provider: providerConfig.provider, + }, 'Chat message completed') + }, + onError: ({ error }) => { + log.error({ err: error, conversationId }, 'Chat streamText error') + }, + }) - return { result, closeMcpClient } - } - catch (err) { - await closeMcpClient() - throw err - } + writer.merge(textStream.toUIMessageStream()) + }, + }) + + return { stream, closeMcpClient } }, }) @@ -386,9 +389,6 @@ type SendMessageParams = { } type SendMessageResult = { - result: { - pipeUIMessageStreamToResponse(response: ServerResponse, options?: Record): void - consumeStream(): PromiseLike - } + stream: ReadableStream closeMcpClient: () => Promise } diff --git a/packages/server/api/src/app/chat/mcp/chat-mcp.ts b/packages/server/api/src/app/chat/mcp/chat-mcp.ts index 1e27570c5be..2f3178545a8 100644 --- a/packages/server/api/src/app/chat/mcp/chat-mcp.ts +++ b/packages/server/api/src/app/chat/mcp/chat-mcp.ts @@ -1,9 +1,14 @@ -import { isNil, tryCatch } from '@activepieces/shared' +import { apId, isNil, tryCatch } from '@activepieces/shared' import { createMCPClient } from '@ai-sdk/mcp' import { FastifyBaseLogger } from 'fastify' import { system } from '../../helper/system/system' import { AppSystemProp } from '../../helper/system/system-props' import { mcpOAuthTokenService } from '../../mcp/oauth/token/mcp-oauth-token.service' +import { chatApprovalGate } from '../chat-approval-gate' + +type StreamWriter = { + write(part: Record): void +} async function getMcpCredentials({ platformId, userId, log }: { platformId: string @@ -46,6 +51,76 @@ async function connectMcpClient({ mcpCredentials, log }: { return { mcpClient: client, mcpToolSet } } +const AP_TOOLS_REQUIRING_APPROVAL = new Set([ + 'ap_delete_table', + 'ap_delete_step', + 'ap_delete_branch', + 'ap_delete_records', + 'ap_run_action', + 'ap_test_step', + 'ap_test_flow', + 'ap_change_flow_status', +]) + +function humanizeToolName(name: string): string { + return name + .replace(/^ap_/, '') + .replace(/_/g, ' ') + .replace(/\b\w/g, (c) => c.toUpperCase()) +} + +function requiresApproval(name: string): boolean { + return AP_TOOLS_REQUIRING_APPROVAL.has(name) || !name.startsWith('ap_') +} + +function hasExecute(tool: object): tool is object & { execute: (args: unknown) => Promise } { + return 'execute' in tool && typeof tool.execute === 'function' +} + +function withApprovalGates({ mcpToolSet, writer, log }: { + mcpToolSet: Record + writer: StreamWriter + log: FastifyBaseLogger +}): Record { + const result: Record = {} + + for (const [name, tool] of Object.entries(mcpToolSet)) { + if (!requiresApproval(name) || typeof tool !== 'object' || tool === null || !hasExecute(tool)) { + result[name] = tool + continue + } + + const originalExecute = tool.execute.bind(tool) + result[name] = Object.assign({}, tool, { + execute: async (args: unknown) => { + const gateId = apId() + const displayName = typeof args === 'object' && args !== null && 'displayName' in args && typeof args.displayName === 'string' + ? args.displayName + : humanizeToolName(name) + + writer.write({ + type: 'data-approval-request', + data: { gateId, toolName: name, displayName }, + transient: true, + }) + + log.info({ gateId, toolName: name }, 'Tool approval gate opened — waiting for user') + const approved = await chatApprovalGate.waitForApproval({ gateId }) + + if (!approved) { + log.info({ gateId, toolName: name }, 'Tool approval rejected or timed out') + return { content: [{ type: 'text', text: 'Action cancelled by user.' }] } + } + + log.info({ gateId, toolName: name }, 'Tool approval granted — executing') + return originalExecute(args) + }, + }) + } + + return result +} + type McpCredentials = { mcpServerUrl: string | null mcpToken: string | null @@ -59,4 +134,5 @@ type McpConnection = { export const chatMcp = { getCredentials: getMcpCredentials, connectClient: connectMcpClient, + withApprovalGates, } diff --git a/packages/server/api/src/app/mcp/tools/ap-delete-branch.ts b/packages/server/api/src/app/mcp/tools/ap-delete-branch.ts index 171680c521c..9e62a65460e 100644 --- a/packages/server/api/src/app/mcp/tools/ap-delete-branch.ts +++ b/packages/server/api/src/app/mcp/tools/ap-delete-branch.ts @@ -27,6 +27,7 @@ export const apDeleteBranchTool = (mcp: ProjectScopedMcpServer, log: FastifyBase flowId: z.string().describe('The id of the flow'), routerStepName: z.string().describe('The name of the ROUTER step. Use ap_flow_structure to get valid values.'), branchIndex: z.number().describe('The index of the branch to delete (0-based). Cannot delete the fallback/last branch.'), + displayName: z.string().optional().describe('Short approval prompt shown to the user (e.g. "Delete branch 2 from router"). Must include what the action does and the target name.'), }, annotations: { destructiveHint: true, openWorldHint: false }, execute: async (args) => { diff --git a/packages/server/api/src/app/mcp/tools/ap-delete-records.ts b/packages/server/api/src/app/mcp/tools/ap-delete-records.ts index 31ee586a99f..b46b1e48726 100644 --- a/packages/server/api/src/app/mcp/tools/ap-delete-records.ts +++ b/packages/server/api/src/app/mcp/tools/ap-delete-records.ts @@ -6,6 +6,7 @@ import { mcpUtils } from './mcp-utils' const deleteRecordsInput = z.object({ recordIds: z.array(z.string()).describe('Array of record IDs to delete. Use ap_find_records to find them.'), + displayName: z.string().optional().describe('Short approval prompt shown to the user (e.g. "Delete 3 records from Emails table"). Must include what the action does and the target name.'), }) export const apDeleteRecordsTool = (mcp: ProjectScopedMcpServer, log: FastifyBaseLogger): McpToolDefinition => { diff --git a/packages/server/api/src/app/mcp/tools/ap-delete-step.ts b/packages/server/api/src/app/mcp/tools/ap-delete-step.ts index f537f40d20a..bd4ffd5120f 100644 --- a/packages/server/api/src/app/mcp/tools/ap-delete-step.ts +++ b/packages/server/api/src/app/mcp/tools/ap-delete-step.ts @@ -26,6 +26,7 @@ export const apDeleteStepTool = (mcp: ProjectScopedMcpServer, log: FastifyBaseLo inputSchema: { flowId: z.string().describe('The id of the flow'), stepName: z.string().describe('The name of the step to delete. Use ap_flow_structure to get valid values.'), + displayName: z.string().optional().describe('Short approval prompt shown to the user (e.g. "Delete Send Email step"). Must include what the action does and the target name.'), }, annotations: { destructiveHint: true, openWorldHint: false }, execute: async (args) => { diff --git a/packages/server/api/src/app/mcp/tools/ap-delete-table.ts b/packages/server/api/src/app/mcp/tools/ap-delete-table.ts index 3564bc01782..1bbd5d43fd3 100644 --- a/packages/server/api/src/app/mcp/tools/ap-delete-table.ts +++ b/packages/server/api/src/app/mcp/tools/ap-delete-table.ts @@ -6,6 +6,7 @@ import { mcpUtils } from './mcp-utils' const deleteTableInput = z.object({ tableId: z.string().describe('The ID of the table to delete. Use ap_list_tables to find it.'), + displayName: z.string().optional().describe('Short approval prompt shown to the user (e.g. "Delete Customer Emails table"). Must include what the action does and the target name.'), }) export const apDeleteTableTool = (mcp: ProjectScopedMcpServer, log: FastifyBaseLogger): McpToolDefinition => { diff --git a/packages/server/api/src/app/mcp/tools/ap-test-flow.ts b/packages/server/api/src/app/mcp/tools/ap-test-flow.ts index 47cf27bd375..62503a9be0e 100644 --- a/packages/server/api/src/app/mcp/tools/ap-test-flow.ts +++ b/packages/server/api/src/app/mcp/tools/ap-test-flow.ts @@ -6,6 +6,7 @@ import { mcpUtils } from './mcp-utils' const testFlowInput = z.object({ flowId: z.string().describe('The ID of the flow to test. Use ap_list_flows to find it.'), + displayName: z.string().optional().describe('Short approval prompt shown to the user (e.g. "Test Send Welcome Email"). Must include what the action does and the target name.'), triggerTestData: z.record(z.string(), z.unknown()).optional().describe('Mock trigger output data. Saved as sample data before running the test. Useful when the trigger has no prior test data.'), }) @@ -15,7 +16,7 @@ export const apTestFlowTool = (mcp: ProjectScopedMcpServer, log: FastifyBaseLogg permission: Permission.WRITE_FLOW, description: 'Test a flow end-to-end in the test environment. Requires a configured trigger. Waits up to 120s. Pass triggerTestData to provide mock trigger output when no sample data exists.', inputSchema: testFlowInput.shape, - annotations: { destructiveHint: false, idempotentHint: false, openWorldHint: false }, + annotations: { destructiveHint: false, idempotentHint: false, openWorldHint: true }, execute: async (args) => { try { const { flowId, triggerTestData } = testFlowInput.parse(args) diff --git a/packages/server/api/src/app/mcp/tools/ap-test-step.ts b/packages/server/api/src/app/mcp/tools/ap-test-step.ts index 978090a65e1..c5a36d4c024 100644 --- a/packages/server/api/src/app/mcp/tools/ap-test-step.ts +++ b/packages/server/api/src/app/mcp/tools/ap-test-step.ts @@ -7,6 +7,7 @@ import { mcpUtils } from './mcp-utils' const testStepInput = z.object({ flowId: z.string().describe('The ID of the flow containing the step. Use ap_list_flows to find it.'), stepName: z.string().describe('The name of the step to test (e.g., "step_1"). Use ap_flow_structure to find it.'), + displayName: z.string().optional().describe('Short approval prompt shown to the user (e.g. "Test Send Email step in Welcome Flow"). Must include what the action does and the target name.'), triggerTestData: z.record(z.string(), z.unknown()).optional().describe('Mock trigger output data. Saved as sample data before running the test. Useful when the trigger has no prior test data.'), }) @@ -16,7 +17,7 @@ export const apTestStepTool = (mcp: ProjectScopedMcpServer, log: FastifyBaseLogg permission: Permission.WRITE_FLOW, description: 'Test a single step within a flow. Runs all steps up to and including the specified step. The flow must have a configured trigger. Pass triggerTestData when no sample data exists.', inputSchema: testStepInput.shape, - annotations: { destructiveHint: false, idempotentHint: false, openWorldHint: false }, + annotations: { destructiveHint: false, idempotentHint: false, openWorldHint: true }, execute: async (args) => { try { const { flowId, stepName, triggerTestData } = testStepInput.parse(args) diff --git a/packages/server/api/src/app/workers/job-queue/job-broker.ts b/packages/server/api/src/app/workers/job-queue/job-broker.ts index e746cd20b77..8a16d819d84 100644 --- a/packages/server/api/src/app/workers/job-queue/job-broker.ts +++ b/packages/server/api/src/app/workers/job-queue/job-broker.ts @@ -89,6 +89,22 @@ async function tryDequeue(worker: BullMQWorker, queueName: string, log: FastifyB if (isNil(job)) { return null // waiting list empty — drainDelay provided backpressure } + + if (job.deferredFailure) { + log.warn( + { queueName, jobId: job.id, jobName: job.name, deferredFailure: job.deferredFailure }, + '[jobBroker#tryDequeue] Failing job with deferred failure (BullMQ stalled limit exceeded)', + ) + const { error: failError } = await tryCatch(() => job.moveToFailed(new Error(job.deferredFailure), token, false)) + if (failError) { + log.error( + { queueName, jobId: job.id, error: String(failError) }, + '[jobBroker#tryDequeue] Failed to fail deferred-failure job', + ) + } + return tryDequeue(worker, queueName, log) + } + log.info({ queueName, jobId: job.id, jobName: job.name }, '[jobBroker#tryDequeue] Dequeued job') const originalSchemaVersion = (job.data as Record).schemaVersion diff --git a/packages/server/api/src/assets/prompts/chat-system-prompt.md b/packages/server/api/src/assets/prompts/chat-system-prompt.md index e193f8647d1..ca3d88329a1 100644 --- a/packages/server/api/src/assets/prompts/chat-system-prompt.md +++ b/packages/server/api/src/assets/prompts/chat-system-prompt.md @@ -26,8 +26,8 @@ You have access to tools for reading data, building automations, managing tables Tool risk levels: - **Read-only** (ap_list_flows, ap_list_connections, ap_find_records, ap_flow_structure, ap_list_runs, ap_get_run): Use freely. No confirmation needed. - **Write** (ap_create_flow, ap_add_step, ap_update_trigger, ap_insert_records, ap_manage_fields): Use after the user approves a proposal or explicitly requests the action. -- **Destructive** (ap_delete_step, ap_delete_table, ap_delete_records, ap_change_flow_status): Always confirm before executing. List what will be affected, show "Yes, proceed" / "Cancel" quick-replies, and wait. -- **Connection-bound** (ap_run_action, ap_test_step — anything that sends data through an external service): Always show a confirmation card first: what action, which connection, which project. +- **Destructive** (ap_delete_step, ap_delete_table, ap_delete_records, ap_change_flow_status): The system will automatically prompt the user for approval before executing. Do NOT add your own confirmation — just call the tool directly when the user asks. +- **Connection-bound** (ap_run_action, ap_test_step, ap_test_flow — anything that sends data through an external service): The system will automatically prompt the user for approval before executing. Do NOT add your own confirmation — just call the tool directly. Error handling: - If a tool call fails, retry ONCE silently. diff --git a/packages/server/api/test/integration/ce/workers/job-broker-seed-cause.test.ts b/packages/server/api/test/integration/ce/workers/job-broker-seed-cause.test.ts new file mode 100644 index 00000000000..d09ae633def --- /dev/null +++ b/packages/server/api/test/integration/ce/workers/job-broker-seed-cause.test.ts @@ -0,0 +1,205 @@ +import { FastifyInstance } from 'fastify' +import { setupTestEnvironment, teardownTestEnvironment } from '../../../helpers/test-setup' +import { mockAndSaveBasicSetup } from '../../../helpers/mocks' +import { jobBroker } from '../../../../src/app/workers/job-queue/job-broker' +import { jobQueue, JobType } from '../../../../src/app/workers/job-queue/job-queue' +import { redisConnections } from '../../../../src/app/database/redis-connections' +import { QueueName } from '../../../../src/app/workers/job' +import { + apId, + EngineResponseStatus, + LATEST_JOB_DATA_SCHEMA_VERSION, + TriggerHookType, + WorkerJobType, +} from '@activepieces/shared' + +let app: FastifyInstance + +beforeAll(async () => { + app = await setupTestEnvironment() + await jobBroker(app.log).init() +}) + +afterAll(async () => { + await jobBroker(app.log).close() + await teardownTestEnvironment() +}) + +const jobKey = (jobId: string): string => `bull:${QueueName.WORKER_JOBS}:${jobId}` +const lockKey = (jobId: string): string => `${jobKey(jobId)}:lock` +const activeKey = (): string => `bull:${QueueName.WORKER_JOBS}:active` +const completedKey = (): string => `bull:${QueueName.WORKER_JOBS}:completed` +const failedKey = (): string => `bull:${QueueName.WORKER_JOBS}:failed` + +/** + * Reproduces the seed cause behind the production "stuck-active" zombies: + * provisionFlowPieces (or any work between getNextJob and completeJob) takes + * longer than `lockDuration` (120s in prod). The Redis lock for the job + * expires. By the time the worker calls completeJob with its original token, + * BullMQ's moveToFinished Lua script returns `-2 Missing lock`, which the + * broker classifies as a "stalled job error" and silently swallows. The job + * never receives an LREM from the active list, attemptsMade never increments, + * and the job becomes a zombie that the BullMQ 5.61 stalled-scan loop + * recycles forever (until our fix in tryDequeue). + * + * This test simulates the > 120s gap by directly DELing the lock key after + * the dequeue, then calling completeJob with the (now stale) token. + */ +describe('jobBroker.completeJob — seed cause for stuck-active zombies', () => { + it('SEED: silently swallows "Missing lock" on moveToCompleted, leaving job in active', async () => { + const { mockPlatform, mockProject } = await mockAndSaveBasicSetup() + const requestId = apId() + + const jobData = { + jobType: WorkerJobType.EXECUTE_TRIGGER_HOOK, + platformId: mockPlatform.id, + projectId: mockProject.id, + schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION, + flowId: apId(), + flowVersionId: apId(), + test: false, + hookType: TriggerHookType.ON_ENABLE, + requestId, + webserverId: 'seed-cause-test', + } + + const jobId = apId() + await jobQueue(app.log).add({ + type: JobType.ONE_TIME, + id: jobId, + data: jobData, + }) + + const polledJob = await jobBroker(app.log).poll() + expect(polledJob).not.toBeNull() + expect(polledJob!.jobId).toBe(jobId) + + const redis = await redisConnections.useExisting() + + const lockBefore = await redis.get(lockKey(jobId)) + expect(lockBefore).toBe(polledJob!.token) + const activeBefore = await redis.lrange(activeKey(), 0, -1) + expect(activeBefore).toContain(jobId) + + // Simulate cold-cache provisioning (or any > lockDuration delay) that lets + // the lock TTL expire before completeJob runs. + const deleted = await redis.del(lockKey(jobId)) + expect(deleted).toBe(1) + + await expect( + jobBroker(app.log).completeJob({ + jobId, + token: polledJob!.token, + queueName: polledJob!.queueName, + status: EngineResponseStatus.LOG_SIZE_EXCEEDED, + logs: 'simulated large logs', + }), + ).resolves.toBeUndefined() + + const activeAfter = await redis.lrange(activeKey(), 0, -1) + const completedAfter = await redis.zrange(completedKey(), 0, -1) + const failedAfter = await redis.zrange(failedKey(), 0, -1) + const atm = await redis.hget(jobKey(jobId), 'atm') + + expect(activeAfter).toContain(jobId) + expect(completedAfter).not.toContain(jobId) + expect(failedAfter).not.toContain(jobId) + expect(atm == null || atm === '0').toBe(true) + + await redis.lrem(activeKey(), 0, jobId) + await redis.del(jobKey(jobId)) + }) + + it('SEED: same swallow happens on the INTERNAL_ERROR -> moveToFailed path', async () => { + const { mockPlatform, mockProject } = await mockAndSaveBasicSetup() + const requestId = apId() + + const jobData = { + jobType: WorkerJobType.EXECUTE_TRIGGER_HOOK, + platformId: mockPlatform.id, + projectId: mockProject.id, + schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION, + flowId: apId(), + flowVersionId: apId(), + test: false, + hookType: TriggerHookType.ON_ENABLE, + requestId, + webserverId: 'seed-cause-test-2', + } + + const jobId = apId() + await jobQueue(app.log).add({ + type: JobType.ONE_TIME, + id: jobId, + data: jobData, + }) + + const polledJob = await jobBroker(app.log).poll() + expect(polledJob).not.toBeNull() + + const redis = await redisConnections.useExisting() + await redis.del(lockKey(jobId)) + + await expect( + jobBroker(app.log).completeJob({ + jobId, + token: polledJob!.token, + queueName: polledJob!.queueName, + status: EngineResponseStatus.INTERNAL_ERROR, + errorMessage: 'sandbox died during cold-cache provision', + }), + ).resolves.toBeUndefined() + + const activeAfter = await redis.lrange(activeKey(), 0, -1) + const failedAfter = await redis.zrange(failedKey(), 0, -1) + const atm = await redis.hget(jobKey(jobId), 'atm') + + expect(activeAfter).toContain(jobId) + expect(failedAfter).not.toContain(jobId) + expect(atm == null || atm === '0').toBe(true) + + await redis.lrem(activeKey(), 0, jobId) + await redis.del(jobKey(jobId)) + }) + + it('CONTROL: when the lock IS still valid, completeJob removes the job from active and increments atm', async () => { + const { mockPlatform, mockProject } = await mockAndSaveBasicSetup() + const requestId = apId() + + const jobData = { + jobType: WorkerJobType.EXECUTE_TRIGGER_HOOK, + platformId: mockPlatform.id, + projectId: mockProject.id, + schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION, + flowId: apId(), + flowVersionId: apId(), + test: false, + hookType: TriggerHookType.ON_ENABLE, + requestId, + webserverId: 'seed-cause-control', + } + + const jobId = apId() + await jobQueue(app.log).add({ + type: JobType.ONE_TIME, + id: jobId, + data: jobData, + }) + + const polledJob = await jobBroker(app.log).poll() + expect(polledJob).not.toBeNull() + + const redis = await redisConnections.useExisting() + + await jobBroker(app.log).completeJob({ + jobId, + token: polledJob!.token, + queueName: polledJob!.queueName, + status: EngineResponseStatus.OK, + response: { ok: true }, + }) + + const activeAfter = await redis.lrange(activeKey(), 0, -1) + expect(activeAfter).not.toContain(jobId) + }) +}) diff --git a/packages/server/api/test/unit/app/workers/job-queue/job-broker.test.ts b/packages/server/api/test/unit/app/workers/job-queue/job-broker.test.ts index 4cfc3a1f834..d552d9bb290 100644 --- a/packages/server/api/test/unit/app/workers/job-queue/job-broker.test.ts +++ b/packages/server/api/test/unit/app/workers/job-queue/job-broker.test.ts @@ -41,13 +41,15 @@ const mockLog: FastifyBaseLogger = { level: 'info', } as unknown as FastifyBaseLogger -function createMockJob(id: string, data?: Record): Job { +function createMockJob(id: string, data?: Record, deferredFailure?: string): Job { return { id, name: `job-name-${id}`, data: { projectId: 'proj-1', platformId: 'plat-1', ...data }, attemptsMade: 0, + deferredFailure, moveToDelayed: vi.fn().mockResolvedValue(undefined), + moveToFailed: vi.fn().mockResolvedValue(undefined), changePriority: vi.fn().mockResolvedValue(undefined), updateData: vi.fn().mockResolvedValue(undefined), } as unknown as Job @@ -157,6 +159,45 @@ describe('tryDequeue', () => { expect(jobA.changePriority).toHaveBeenCalledWith({ priority: 10 }) }) + it('should fail job with deferredFailure and skip interceptors', async () => { + const zombieJob = createMockJob('zombie-1', undefined, 'job stalled more than allowable limit') + + vi.mocked(mockWorker.getNextJob) + .mockResolvedValueOnce(zombieJob) + .mockResolvedValueOnce(undefined as unknown as Job) + + const result = await tryDequeue(mockWorker, 'test-queue', mockLog) + + expect(result).toBeNull() + expect(zombieJob.moveToFailed).toHaveBeenCalledTimes(1) + const moveToFailedCall = vi.mocked(zombieJob.moveToFailed).mock.calls[0] + expect(moveToFailedCall[0]).toBeInstanceOf(Error) + expect((moveToFailedCall[0] as Error).message).toBe('job stalled more than allowable limit') + expect(moveToFailedCall[1]).toMatch(/^token-/) + expect(moveToFailedCall[2]).toBe(false) + expect(mockPreDispatch).not.toHaveBeenCalled() + expect(mockOnJobFinished).not.toHaveBeenCalled() + expect(mockWorker.getNextJob).toHaveBeenCalledTimes(2) + }) + + it('should keep draining when moveToFailed throws on a deferred-failure job', async () => { + const zombieJob = createMockJob('zombie-2', undefined, 'job stalled more than allowable limit') + vi.mocked(zombieJob.moveToFailed).mockRejectedValueOnce(new Error('Missing lock')) + const liveJob = createMockJob('live-1') + + vi.mocked(mockWorker.getNextJob) + .mockResolvedValueOnce(zombieJob) + .mockResolvedValueOnce(liveJob) + + mockPreDispatch.mockResolvedValueOnce({ verdict: InterceptorVerdict.ALLOW }) + + const result = await tryDequeue(mockWorker, 'test-queue', mockLog) + + expect(result).not.toBeNull() + expect(result!.jobId).toBe('live-1') + expect(mockWorker.getNextJob).toHaveBeenCalledTimes(2) + }) + it('should return null when queue is empty (no jobs at all)', async () => { vi.mocked(mockWorker.getNextJob).mockResolvedValueOnce(undefined as unknown as Job) diff --git a/packages/shared/package.json b/packages/shared/package.json index b2410234171..e2e4bcc5414 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -1,6 +1,6 @@ { "name": "@activepieces/shared", - "version": "0.71.2", + "version": "0.71.3", "type": "commonjs", "sideEffects": false, "main": "./dist/src/index.js", diff --git a/packages/web/public/locales/en/translation.json b/packages/web/public/locales/en/translation.json index e0afc5fbdd9..83d19387e40 100644 --- a/packages/web/public/locales/en/translation.json +++ b/packages/web/public/locales/en/translation.json @@ -1582,6 +1582,9 @@ "Private Chat": "Private Chat", "Ready to use": "Ready to use", "Regenerate": "Regenerate", + "No, cancel": "No, cancel", + "Yes, and don't ask me again": "Yes, and don't ask me again", + "Yes, proceed": "Yes, proceed", "Response stopped": "Response stopped", "Sandbox not configured": "Sandbox not configured", "Set up an AI provider to get started": "Set up an AI provider to get started", diff --git a/packages/web/src/app/routes/chat-with-ai/ai-chat-box.tsx b/packages/web/src/app/routes/chat-with-ai/ai-chat-box.tsx index ece7f9053ea..f807b59c18f 100644 --- a/packages/web/src/app/routes/chat-with-ai/ai-chat-box.tsx +++ b/packages/web/src/app/routes/chat-with-ai/ai-chat-box.tsx @@ -1,8 +1,8 @@ -import { AIProviderName } from '@activepieces/shared'; +import { AIProviderName, ProjectType } from '@activepieces/shared'; import { t } from 'i18next'; import { AlertTriangle, RefreshCw, Square } from 'lucide-react'; import { motion } from 'motion/react'; -import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { ChatContainerContent, @@ -12,6 +12,7 @@ import { import { ScrollButton } from '@/components/prompt-kit/scroll-button'; import { Button } from '@/components/ui/button'; import { useAgentChat } from '@/features/chat/lib/use-chat'; +import { useToolApproval } from '@/features/chat/lib/use-tool-approval'; import { aiProviderQueries } from '@/features/platform-admin'; import { projectCollectionUtils } from '@/features/projects'; @@ -27,6 +28,7 @@ import { ChatModelSelector } from './components/chat-model-selector'; import { ChatProjectSelector } from './components/chat-project-selector'; import { QuickReplies } from './components/message-content'; import { MultiQuestionForm } from './components/multi-question-form'; +import { ToolApprovalForm } from './components/tool-approval-form'; import { getTextFromParts, parseMultiQuestion, @@ -80,6 +82,7 @@ function ChatBoxContent({ setConversationId, setModelName, setProjectContext, + pendingApprovalRequest, } = useAgentChat({ onTitleUpdate, onConversationCreated }); const { data: allProjects } = projectCollectionUtils.useAll(); const projects = allProjects ?? []; @@ -95,6 +98,23 @@ function ChatBoxContent({ new Set(), ); + const didAutoSelectProjectRef = useRef(false); + useEffect(() => { + if ( + didAutoSelectProjectRef.current || + selectedProjectId !== null || + initialConversationId + ) + return; + const personalProject = projects.find( + (p) => p.type === ProjectType.PERSONAL, + ); + if (personalProject) { + didAutoSelectProjectRef.current = true; + void setProjectContext(personalProject.id); + } + }, [projects, selectedProjectId, initialConversationId, setProjectContext]); + useEffect(() => { if (initialConversationId) { void setConversationId(initialConversationId); @@ -130,6 +150,16 @@ function ChatBoxContent({ activeQuestions.length > 0 && !!lastMessage && !dismissedFormIds.has(lastMessage.id); + + const { + hasActiveApproval, + approvalDisplayName, + approve, + approveAndRemember, + reject, + dismiss: dismissApproval, + } = useToolApproval({ pendingApprovalRequest }); + const isEmpty = messages.length === 0 && !isLoadingHistory && !isStreaming; if (isEmpty) { @@ -243,7 +273,16 @@ function ChatBoxContent({
- {hasActiveForm ? ( + {hasActiveApproval ? ( + + ) : hasActiveForm ? ( p.id === selectedProjectId) : null; - const selectedColor = selectedProject - ? PROJECT_COLOR_PALETTE[selectedProject.icon.color] - : null; - return ( @@ -48,18 +51,13 @@ export function ChatProjectSelector({ > {selectedProject ? ( <> - - {selectedProject.displayName.charAt(0).toUpperCase()} - - - {selectedProject.displayName} - + +
+ +
+ + {APPROVAL_OPTIONS.map((option, i) => { + const id = `${fieldId}-opt-${i}`; + const isHovered = hoveredIndex === i; + const prevHovered = hoveredIndex === i - 1; + return ( + + {i > 0 && ( +
+ +
+ )} + +
+ ); + })} +
+
+ + ); +} diff --git a/packages/web/src/app/routes/chat-with-ai/components/tool-call-group.tsx b/packages/web/src/app/routes/chat-with-ai/components/tool-call-group.tsx index e86daef7bd5..227d830b13a 100644 --- a/packages/web/src/app/routes/chat-with-ai/components/tool-call-group.tsx +++ b/packages/web/src/app/routes/chat-with-ai/components/tool-call-group.tsx @@ -8,11 +8,9 @@ import { ChainOfThoughtStep, ChainOfThoughtTrigger, } from '@/components/prompt-kit/chain-of-thought'; -import { - extractToolContext, - ToolCallCard, -} from '@/features/chat/components/tool-call-card'; +import { ToolCallCard } from '@/features/chat/components/tool-call-card'; import { ChatUIMessage, DynamicToolPart } from '@/features/chat/lib/chat-types'; +import { chatUtils } from '@/features/chat/lib/chat-utils'; const PENDING_STATES = new Set([ 'input-streaming', @@ -131,7 +129,7 @@ function describeToolParts(parts: DynamicToolPart[]): string { for (const part of parts) { const name = (part.title ?? part.toolName).toLowerCase(); - const ctx = extractToolContext({ + const ctx = chatUtils.extractToolContext({ input: isObject(part.input) ? part.input : undefined, }); if (ctx && !contexts.includes(ctx)) contexts.push(ctx); diff --git a/packages/web/src/components/ui/sidebar-shadcn.tsx b/packages/web/src/components/ui/sidebar-shadcn.tsx index 508a0b23fe4..bbbe2d75165 100644 --- a/packages/web/src/components/ui/sidebar-shadcn.tsx +++ b/packages/web/src/components/ui/sidebar-shadcn.tsx @@ -819,6 +819,7 @@ export { SidebarMenuSub, SidebarMenuSubButton, SidebarMenuSubItem, + SidebarContext, SidebarProvider, SidebarRail, SidebarSeparator, diff --git a/packages/web/src/features/chat/components/tool-call-card.tsx b/packages/web/src/features/chat/components/tool-call-card.tsx index faf2f8f85bc..5c5f19fe0e5 100644 --- a/packages/web/src/features/chat/components/tool-call-card.tsx +++ b/packages/web/src/features/chat/components/tool-call-card.tsx @@ -10,55 +10,11 @@ import { CollapsibleTrigger, } from '@/components/ui/collapsible'; import { DynamicToolPart } from '@/features/chat/lib/chat-types'; -import { formatUtils } from '@/lib/format-utils'; +import { chatUtils } from '@/features/chat/lib/chat-utils'; import { cn } from '@/lib/utils'; type ToolStatus = 'running' | 'completed' | 'failed' | 'stopped'; -function humanizePieceName(raw: string): string { - return formatUtils.convertEnumToHumanReadable( - raw.replace(/^@activepieces\/piece-/, '').replace(/-/g, '_'), - ); -} - -export function extractToolContext({ - input, -}: { - input: Record | undefined; -}): string | null { - if (!input) return null; - const parts: string[] = []; - - if (typeof input.pieceName === 'string') { - parts.push(humanizePieceName(input.pieceName)); - } - if (typeof input.actionName === 'string' && input.actionName) { - parts.push(formatUtils.convertEnumToHumanReadable(input.actionName)); - } else if (typeof input.displayName === 'string' && input.displayName) { - parts.push(input.displayName); - } - if (typeof input.triggerName === 'string' && input.triggerName) { - parts.push(formatUtils.convertEnumToHumanReadable(input.triggerName)); - } - if (typeof input.flowId === 'string' && parts.length === 0) { - parts.push(input.flowId.slice(0, 8)); - } - if (typeof input.query === 'string' && parts.length === 0) { - parts.push( - `"${input.query.slice(0, 30)}${input.query.length > 30 ? '…' : ''}"`, - ); - } - if ( - isObject(input.settings) && - typeof input.settings.pieceName === 'string' && - parts.length === 0 - ) { - parts.push(humanizePieceName(input.settings.pieceName)); - } - - return parts.length > 0 ? parts.join(' ') : null; -} - function deriveStatus(part: DynamicToolPart): ToolStatus { if (part.state === 'output-available') return 'completed'; if (part.state === 'output-error') return 'failed'; @@ -78,20 +34,6 @@ function extractOutput(part: DynamicToolPart): string | undefined { return undefined; } -function formatToolLabel({ part }: { part: DynamicToolPart }): string { - const raw = part.title ?? part.toolName; - const mcpMatch = /^mcp__[^_]+__(.+)$/.exec(raw); - const name = mcpMatch ? mcpMatch[1] : raw; - const baseName = formatUtils.convertEnumToHumanReadable( - name.replace(/^ap_/, ''), - ); - - const input = isObject(part.input) ? part.input : undefined; - const context = extractToolContext({ input }); - if (!context) return baseName; - return `${baseName} — ${context}`; -} - function StatusIcon({ status }: { status: ToolStatus }) { switch (status) { case 'running': @@ -122,7 +64,7 @@ export function ToolCallCard({ toolPart }: { toolPart: DynamicToolPart }) { const status = deriveStatus(toolPart); const output = extractOutput(toolPart); const input = isObject(toolPart.input) ? toolPart.input : undefined; - const displayName = formatToolLabel({ part: toolPart }); + const displayName = chatUtils.formatToolLabel({ part: toolPart }); const hasInput = input && Object.keys(input).length > 0; const hasOutput = Boolean(output); const hasContent = hasInput || hasOutput; diff --git a/packages/web/src/features/chat/lib/chat-utils.ts b/packages/web/src/features/chat/lib/chat-utils.ts new file mode 100644 index 00000000000..21c476009f6 --- /dev/null +++ b/packages/web/src/features/chat/lib/chat-utils.ts @@ -0,0 +1,79 @@ +import { isObject } from '@activepieces/shared'; + +import { formatUtils } from '@/lib/format-utils'; + +import { DynamicToolPart } from './chat-types'; + +function humanizePieceName(raw: string): string { + return formatUtils.convertEnumToHumanReadable( + raw.replace(/^@activepieces\/piece-/, '').replace(/-/g, '_'), + ); +} + +function formatToolName({ + part, + includeContext = true, +}: { + part: DynamicToolPart; + includeContext?: boolean; +}): string { + const raw = part.title ?? part.toolName; + const mcpMatch = /^mcp__[^_]+__(.+)$/.exec(raw); + const name = mcpMatch ? mcpMatch[1] : raw; + const baseName = formatUtils.convertEnumToHumanReadable( + name.replace(/^ap_/, ''), + ); + + if (!includeContext) return baseName; + + const input = isObject(part.input) ? part.input : undefined; + const context = extractToolContext({ input }); + if (!context) return baseName; + return `${baseName} — ${context}`; +} + +function extractToolContext({ + input, +}: { + input: Record | undefined; +}): string | null { + if (!input) return null; + const parts: string[] = []; + + if (typeof input.pieceName === 'string') { + parts.push(humanizePieceName(input.pieceName)); + } + if (typeof input.actionName === 'string' && input.actionName) { + parts.push(formatUtils.convertEnumToHumanReadable(input.actionName)); + } else if (typeof input.displayName === 'string' && input.displayName) { + parts.push(input.displayName); + } + if (typeof input.triggerName === 'string' && input.triggerName) { + parts.push(formatUtils.convertEnumToHumanReadable(input.triggerName)); + } + if (typeof input.flowId === 'string' && parts.length === 0) { + parts.push(input.flowId.slice(0, 8)); + } + if (typeof input.query === 'string' && parts.length === 0) { + parts.push( + `"${input.query.slice(0, 30)}${input.query.length > 30 ? '…' : ''}"`, + ); + } + if ( + isObject(input.settings) && + typeof input.settings.pieceName === 'string' && + parts.length === 0 + ) { + parts.push(humanizePieceName(input.settings.pieceName)); + } + + return parts.length > 0 ? parts.join(' ') : null; +} + +export const chatUtils = { + formatToolLabel: ({ part }: { part: DynamicToolPart }) => + formatToolName({ part }), + formatToolActionName: ({ part }: { part: DynamicToolPart }) => + formatToolName({ part, includeContext: false }), + extractToolContext, +}; diff --git a/packages/web/src/features/chat/lib/use-chat.ts b/packages/web/src/features/chat/lib/use-chat.ts index 2bb1456c488..d79445bd91a 100644 --- a/packages/web/src/features/chat/lib/use-chat.ts +++ b/packages/web/src/features/chat/lib/use-chat.ts @@ -187,6 +187,11 @@ export function useAgentChat({ selectedProjectIdRef.current = value; _setSelectedProjectId(value); }, []); + const [pendingApprovalRequest, setPendingApprovalRequest] = useState<{ + gateId: string; + toolName: string; + displayName: string; + } | null>(null); const cancelledRef = useRef(false); const messageCountRef = useRef(0); const onTitleUpdateRef = useRef(onTitleUpdate); @@ -234,17 +239,33 @@ export function useAgentChat({ } = useChat({ transport, onData: (dataPart) => { + const data = dataPart.data; if ( dataPart.type === 'data-session-title' && - typeof dataPart.data === 'object' && - dataPart.data !== null && - typeof (dataPart.data as Record)['title'] === 'string' + typeof data === 'object' && + data !== null && + typeof (data as Record)['title'] === 'string' ) { onTitleUpdateRef.current?.( - (dataPart.data as Record)['title'] as string, + (data as Record)['title'] as string, conversationIdRef.current ?? undefined, ); } + if ( + dataPart.type === 'data-approval-request' && + typeof data === 'object' && + data !== null + ) { + const d = data as Record; + if (typeof d.gateId === 'string' && typeof d.toolName === 'string') { + setPendingApprovalRequest({ + gateId: d.gateId, + toolName: d.toolName, + displayName: + typeof d.displayName === 'string' ? d.displayName : d.toolName, + }); + } + } }, onError: () => { setPendingMessages([]); @@ -396,6 +417,7 @@ export function useAgentChat({ cancelledRef.current = false; setLocalError(null); setWasCancelled(false); + setPendingApprovalRequest(null); const fileNames = files?.map((f) => f.name) ?? []; lastSentFileNamesRef.current = fileNames; @@ -523,5 +545,6 @@ export function useAgentChat({ setConversationId, setModelName, setProjectContext, + pendingApprovalRequest, }; } diff --git a/packages/web/src/features/chat/lib/use-tool-approval.ts b/packages/web/src/features/chat/lib/use-tool-approval.ts new file mode 100644 index 00000000000..6c6e04501bb --- /dev/null +++ b/packages/web/src/features/chat/lib/use-tool-approval.ts @@ -0,0 +1,97 @@ +import { useCallback, useEffect, useRef, useState } from 'react'; + +import { API_URL } from '@/lib/api'; +import { authenticationSession } from '@/lib/authentication-session'; + +type ApprovalRequest = { + gateId: string; + toolName: string; + displayName: string; +}; + +async function sendApprovalDecision({ + gateId, + approved, +}: { + gateId: string; + approved: boolean; +}): Promise { + const token = authenticationSession.getToken(); + await fetch(`${API_URL}/v1/chat/tool-approvals/${gateId}`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ approved }), + }); +} + +export function useToolApproval({ + pendingApprovalRequest, +}: { + pendingApprovalRequest: ApprovalRequest | null; +}) { + const autoApproveRef = useRef(false); + const [dismissed, setDismissed] = useState(false); + + useEffect(() => { + if (!pendingApprovalRequest) return; + setDismissed(false); + if (autoApproveRef.current) { + void sendApprovalDecision({ + gateId: pendingApprovalRequest.gateId, + approved: true, + }); + } + }, [pendingApprovalRequest]); + + const hasActiveApproval = + pendingApprovalRequest !== null && !autoApproveRef.current && !dismissed; + + const approve = useCallback(() => { + if (!pendingApprovalRequest) return; + setDismissed(true); + void sendApprovalDecision({ + gateId: pendingApprovalRequest.gateId, + approved: true, + }); + }, [pendingApprovalRequest]); + + const approveAndRemember = useCallback(() => { + if (!pendingApprovalRequest) return; + setDismissed(true); + autoApproveRef.current = true; + void sendApprovalDecision({ + gateId: pendingApprovalRequest.gateId, + approved: true, + }); + }, [pendingApprovalRequest]); + + const reject = useCallback(() => { + if (!pendingApprovalRequest) return; + setDismissed(true); + void sendApprovalDecision({ + gateId: pendingApprovalRequest.gateId, + approved: false, + }); + }, [pendingApprovalRequest]); + + const dismiss = useCallback(() => { + if (!pendingApprovalRequest) return; + setDismissed(true); + void sendApprovalDecision({ + gateId: pendingApprovalRequest.gateId, + approved: false, + }); + }, [pendingApprovalRequest]); + + return { + hasActiveApproval, + approvalDisplayName: pendingApprovalRequest?.displayName ?? null, + approve, + approveAndRemember, + reject, + dismiss, + }; +} diff --git a/packages/web/src/features/projects/components/ap-project-display.tsx b/packages/web/src/features/projects/components/ap-project-display.tsx index b661b5f8c40..c184c3dbb09 100644 --- a/packages/web/src/features/projects/components/ap-project-display.tsx +++ b/packages/web/src/features/projects/components/ap-project-display.tsx @@ -5,9 +5,10 @@ import { ProjectType, } from '@activepieces/shared'; import { User } from 'lucide-react'; +import { useContext } from 'react'; import { Avatar } from '@/components/ui/avatar'; -import { useSidebar } from '@/components/ui/sidebar-shadcn'; +import { SidebarContext } from '@/components/ui/sidebar-shadcn'; import { Tooltip, TooltipContent, @@ -16,11 +17,17 @@ import { } from '@/components/ui/tooltip'; import { cn } from '@/lib/utils'; +function useSidebarSafe(): string { + const context = useContext(SidebarContext); + return context?.state ?? 'expanded'; +} + type ApProjectDisplayProps = { title: string; icon?: ProjectIcon; containerClassName?: string; titleClassName?: string; + iconClassName?: string; maxLengthToNotShowTooltip?: number; projectType: ProjectType; inSidebar?: boolean; @@ -31,15 +38,19 @@ export const ApProjectDisplay = ({ icon, containerClassName = '', titleClassName = '', + iconClassName, maxLengthToNotShowTooltip = 30, projectType, inSidebar = false, }: ApProjectDisplayProps) => { - const { state } = useSidebar(); + const sidebarState = useSidebarSafe(); const projectAvatar = isNil(icon) ? null : projectType === ProjectType.TEAM ? ( ) : ( - + ); const shouldShowTooltip = title.length > maxLengthToNotShowTooltip; @@ -59,7 +72,7 @@ export const ApProjectDisplay = ({ const content = (
{projectAvatar} - {((inSidebar && state === 'expanded') || !inSidebar) && ( + {((inSidebar && sidebarState === 'expanded') || !inSidebar) && ( {displayText} )}