Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/de/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/en/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/es/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/fr/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/ja/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/nl/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/pt/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/zh-TW/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
3 changes: 3 additions & 0 deletions packages/react-ui/public/locales/zh/translation.json
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@
"Invalid Access": "",
"You tried to access a project that you do not have access to.": "",
"New Table": "",
"No, cancel": "",
"Yes, and don't ask me again": "",
"Yes, proceed": "",
"Response stopped": "",
"Retry": "",
"Reply...": "",
Expand Down
54 changes: 54 additions & 0 deletions packages/server/api/src/app/chat/chat-approval-gate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { redisConnections } from '../database/redis-connections'
import { pubsub } from '../helper/pubsub'

const GATE_TIMEOUT_MS = 5 * 60 * 1000
const CHANNEL_PREFIX = 'tool-approval:'

function channelName(gateId: string): string {
return `${CHANNEL_PREFIX}${gateId}`
}

async function waitForApproval({ gateId }: { gateId: string }): Promise<boolean> {
const channel = channelName(gateId)
const subscriber = await redisConnections.create()

return new Promise<boolean>((resolve) => {
let settled = false

const cleanup = () => {
if (settled) return
settled = true
subscriber.unsubscribe(channel).then(() => subscriber.quit()).catch(() => undefined)
}

const timeout = setTimeout(() => {
cleanup()
resolve(false)
}, GATE_TIMEOUT_MS)

subscriber.on('message', (_ch, message) => {
if (_ch !== channel) return
clearTimeout(timeout)
cleanup()
try {
const parsed = JSON.parse(message)
resolve(parsed.approved === true)
}
catch {
resolve(false)
}
})

void subscriber.subscribe(channel)
})
}

async function resolveGate({ gateId, approved }: { gateId: string, approved: boolean }): Promise<void> {
const channel = channelName(gateId)
await pubsub.publish(channel, JSON.stringify({ approved }))
}

export const chatApprovalGate = {
waitForApproval,
resolveGate,
}
32 changes: 29 additions & 3 deletions packages/server/api/src/app/chat/chat-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import {
SetProjectContextRequest,
UpdateChatConversationRequest,
} from '@activepieces/shared'
import { pipeUIMessageStreamToResponse } from 'ai'
import { FastifyPluginAsyncZod } from 'fastify-type-provider-zod'
import { StatusCodes } from 'http-status-codes'
import { z } from 'zod'
import { securityAccess } from '../core/security/authorization/fastify-security'
import { chatApprovalGate } from './chat-approval-gate'
import { chatService } from './chat-service'

const CHAT_PRINCIPALS = [PrincipalType.USER] as const
Expand Down Expand Up @@ -72,7 +74,7 @@ export const chatController: FastifyPluginAsyncZod = async (app) => {
const { content, files } = request.body
const log = request.log

const { result, closeMcpClient } = await chatService(log).sendMessage({
const { stream, closeMcpClient } = await chatService(log).sendMessage({
conversationId: request.params.id,
userId: request.principal.id,
platformId: request.principal.platform.id,
Expand All @@ -83,12 +85,16 @@ export const chatController: FastifyPluginAsyncZod = async (app) => {
await reply.hijack()

try {
result.pipeUIMessageStreamToResponse(reply.raw, {
pipeUIMessageStreamToResponse({
response: reply.raw,
stream,
headers: {
'X-Accel-Buffering': 'no',
},
})
await result.consumeStream()
await new Promise<void>((resolve) => {
reply.raw.on('close', resolve)
})
}
catch (err: unknown) {
const isClientDisconnect = err instanceof Error && 'code' in err && err.code === 'ECONNRESET'
Expand All @@ -104,6 +110,14 @@ export const chatController: FastifyPluginAsyncZod = async (app) => {
}
})

app.post('/tool-approvals/:gateId', ToolApprovalRoute, async (request, reply) => {
await chatApprovalGate.resolveGate({
gateId: request.params.gateId,
approved: request.body.approved,
})
return reply.status(StatusCodes.OK).send({ success: true })
})

app.post('/conversations/:id/project-context', SetProjectContextRoute, async (request) => {
return chatService(request.log).setProjectContext({
id: request.params.id,
Expand Down Expand Up @@ -198,6 +212,18 @@ const SendMessageRoute = {
},
}

const ToolApprovalRoute = {
config: {
security: securityAccess.publicPlatform(CHAT_PRINCIPALS),
},
schema: {
tags: ['chat'],
security: [SERVICE_KEY_SECURITY_OPENAPI],
params: z.object({ gateId: z.string() }),
body: z.object({ approved: z.boolean() }),
},
}

const SetProjectContextRoute = {
config: {
security: securityAccess.publicPlatform(CHAT_PRINCIPALS),
Expand Down
94 changes: 47 additions & 47 deletions packages/server/api/src/app/chat/chat-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { ServerResponse } from 'http'
import {
ActivepiecesError,
AIProviderModelType,
Expand All @@ -16,7 +15,7 @@ import {
spreadIfDefined,
UpdateChatConversationRequest,
} from '@activepieces/shared'
import { LanguageModel, ModelMessage, stepCountIs, streamText } from 'ai'
import { createUIMessageStream, LanguageModel, ModelMessage, stepCountIs, streamText } from 'ai'
import { FastifyBaseLogger } from 'fastify'
import { aiProviderService } from '../ai/ai-provider-service'
import { repoFactory } from '../core/db/repo-factory'
Expand Down Expand Up @@ -164,6 +163,8 @@ export const chatService = (log: FastifyBaseLogger) => ({
const newUserMessage: ModelMessage = { role: 'user' as const, content: userContent }
const allMessages = [...previousMessages, newUserMessage]

await conversationRepo().update(conversationId, { messages: allMessages })

const compactionState = await resolveCompactionState({
conversation,
allMessages,
Expand Down Expand Up @@ -197,7 +198,6 @@ export const chatService = (log: FastifyBaseLogger) => ({
},
availableProjectIds: userProjects.map((p) => p.id),
})
const tools = { ...localTools, ...mcpToolSet }

const closeMcpClient = async (): Promise<void> => {
if (mcpClient) {
Expand All @@ -207,47 +207,50 @@ export const chatService = (log: FastifyBaseLogger) => ({
}
}

try {
const result = streamText({
model,
system: systemPrompt,
messages: messagesForLlm,
tools,
stopWhen: stepCountIs(MAX_STEPS),
onStepFinish: ({ finishReason, usage }) => {
log.debug({ conversationId, finishReason, usage }, 'Chat step finished')
},
onFinish: async ({ response, usage }) => {
const updatedMessages = [...allMessages, ...response.messages]
try {
await conversationRepo().update(conversationId, {
messages: updatedMessages,
...(pendingTitle ? { title: pendingTitle } : {}),
...(isNil(conversation.modelName) ? { modelName } : {}),
})
}
catch (saveErr) {
log.error({ err: saveErr, conversationId }, 'Failed to persist conversation messages')
}

log.info({
conversationId,
inputTokens: usage.inputTokens,
outputTokens: usage.outputTokens,
provider: providerConfig.provider,
}, 'Chat message completed')
},
onError: ({ error }) => {
log.error({ err: error, conversationId }, 'Chat streamText error')
},
})
const stream = createUIMessageStream({
execute: ({ writer }) => {
const gatedTools = chatMcp.withApprovalGates({ mcpToolSet, writer, log })
const tools = { ...localTools, ...gatedTools }

const textStream = streamText({
model,
system: systemPrompt,
messages: messagesForLlm,
tools,
stopWhen: stepCountIs(MAX_STEPS),
onStepFinish: ({ finishReason, usage }) => {
log.debug({ conversationId, finishReason, usage }, 'Chat step finished')
},
onFinish: async ({ response, usage }) => {
const updatedMessages = [...allMessages, ...response.messages]
try {
await conversationRepo().update(conversationId, {
messages: updatedMessages,
...(pendingTitle ? { title: pendingTitle } : {}),
...(isNil(conversation.modelName) ? { modelName } : {}),
})
}
catch (saveErr) {
log.error({ err: saveErr, conversationId }, 'Failed to persist conversation messages')
}

log.info({
conversationId,
inputTokens: usage.inputTokens,
outputTokens: usage.outputTokens,
provider: providerConfig.provider,
}, 'Chat message completed')
},
onError: ({ error }) => {
log.error({ err: error, conversationId }, 'Chat streamText error')
},
})

return { result, closeMcpClient }
}
catch (err) {
await closeMcpClient()
throw err
}
writer.merge(textStream.toUIMessageStream())
},
})

return { stream, closeMcpClient }
},

})
Expand Down Expand Up @@ -386,9 +389,6 @@ type SendMessageParams = {
}

type SendMessageResult = {
result: {
pipeUIMessageStreamToResponse(response: ServerResponse, options?: Record<string, unknown>): void
consumeStream(): PromiseLike<void>
}
stream: ReadableStream
closeMcpClient: () => Promise<void>
}
Loading
Loading