Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion apps/sim/app/api/webhooks/agentmail/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,51 @@ import {
webhookSvixHeadersSchema,
} from '@/lib/api/contracts/webhooks'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import {
assertContentLengthWithinLimit,
isPayloadSizeLimitError,
readStreamToBufferWithLimit,
} from '@/lib/core/utils/stream-limits'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { executeInboxTask } from '@/lib/mothership/inbox/executor'
import type { AgentMailWebhookPayload, RejectionReason } from '@/lib/mothership/inbox/types'
import { WEBHOOK_MAX_BODY_BYTES } from '@/lib/webhooks/constants'

const logger = createLogger('AgentMailWebhook')

const AUTOMATED_SENDERS = ['mailer-daemon@', 'noreply@', 'no-reply@', 'postmaster@']
const MAX_EMAILS_PER_HOUR = 20

const AGENTMAIL_BODY_LABEL = 'AgentMail webhook body'

/**
* Bound the unauthenticated AgentMail webhook body before buffering it for Svix
* signature verification, so an oversized payload cannot exhaust pod memory.
*/
async function readAgentMailBody(req: Request): Promise<string> {
assertContentLengthWithinLimit(req.headers, WEBHOOK_MAX_BODY_BYTES, AGENTMAIL_BODY_LABEL)
const buffer = await readStreamToBufferWithLimit(req.body, {
maxBytes: WEBHOOK_MAX_BODY_BYTES,
label: AGENTMAIL_BODY_LABEL,
})
return new TextDecoder().decode(buffer)
}

export const POST = withRouteHandler(async (req: Request) => {
try {
const rawBody = await req.text()
let rawBody: string
try {
rawBody = await readAgentMailBody(req)
} catch (bodyError) {
if (isPayloadSizeLimitError(bodyError)) {
logger.warn('Rejected oversized AgentMail webhook body', {
maxBytes: WEBHOOK_MAX_BODY_BYTES,
observedBytes: bodyError.observedBytes,
})
return NextResponse.json({ error: 'Request body too large' }, { status: 413 })
}
throw bodyError
}
const headersResult = webhookSvixHeadersSchema.safeParse({
'svix-id': req.headers.get('svix-id'),
'svix-timestamp': req.headers.get('svix-timestamp'),
Expand Down
1 change: 1 addition & 0 deletions apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ export const env = createEnv({
ADMISSION_GATE_MAX_INFLIGHT: z.string().optional().default('500'), // Max concurrent in-flight execution requests per pod
API_MAX_JSON_BODY_BYTES: z.string().optional().default('52428800'),// Default max JSON request body size for contract routes (50 MB)
CHAT_MAX_REQUEST_BYTES: z.string().optional().default('230686720'),// Max request body size for the public deployed-chat endpoint (220 MB; covers 15 base64 file attachments)
WEBHOOK_MAX_REQUEST_BYTES: z.string().optional().default('10485760'),// Max request body size for public webhook receiver endpoints (10 MB; provider payloads rarely exceed a few MB)

// Rate Limiting Configuration
RATE_LIMIT_WINDOW_MS: z.string().optional().default('60000'), // Rate limit window duration in milliseconds (default: 1 minute)
Expand Down
12 changes: 12 additions & 0 deletions apps/sim/lib/webhooks/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { env } from '@/lib/core/config/env'

/**
* Maximum size of a webhook request body read into memory. The webhook receivers
* are public and unauthenticated, so the body must be bounded before it is
* buffered to prevent a memory-exhaustion DoS. Provider payloads rarely exceed a
* few MB; defaults to 10 MB and is overridable via `WEBHOOK_MAX_REQUEST_BYTES`.
*
* Shared by every public webhook receiver so the cap is a single source of truth.
*/
export const WEBHOOK_MAX_BODY_BYTES =
Number.parseInt(env.WEBHOOK_MAX_REQUEST_BYTES, 10) || 10 * 1024 * 1024
24 changes: 22 additions & 2 deletions apps/sim/lib/webhooks/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ import { tryAdmit } from '@/lib/core/admission/gate'
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
import { isProd } from '@/lib/core/config/feature-flags'
import {
assertContentLengthWithinLimit,
isPayloadSizeLimitError,
readStreamToBufferWithLimit,
} from '@/lib/core/utils/stream-limits'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { WEBHOOK_MAX_BODY_BYTES } from '@/lib/webhooks/constants'
import {
getPendingWebhookVerification,
matchesPendingWebhookVerificationProbe,
Expand Down Expand Up @@ -71,19 +77,33 @@ async function verifyCredentialSetBilling(credentialSetId: string): Promise<{
return { valid: true }
}

const WEBHOOK_BODY_LABEL = 'Webhook request body'

export async function parseWebhookBody(
request: NextRequest,
requestId: string
): Promise<{ body: unknown; rawBody: string } | NextResponse> {
let rawBody: string | null = null
try {
const requestClone = request.clone()
rawBody = await requestClone.text()
assertContentLengthWithinLimit(request.headers, WEBHOOK_MAX_BODY_BYTES, WEBHOOK_BODY_LABEL)

const buffer = await readStreamToBufferWithLimit(request.clone().body, {
maxBytes: WEBHOOK_MAX_BODY_BYTES,
label: WEBHOOK_BODY_LABEL,
})
rawBody = new TextDecoder().decode(buffer)

if (!rawBody || rawBody.length === 0) {
return { body: {}, rawBody: '' }
}
} catch (bodyError) {
if (isPayloadSizeLimitError(bodyError)) {
logger.warn(`[${requestId}] Rejected oversized webhook body`, {
maxBytes: WEBHOOK_MAX_BODY_BYTES,
observedBytes: bodyError.observedBytes,
})
return new NextResponse('Request body too large', { status: 413 })
}
logger.error(`[${requestId}] Failed to read request body`, {
error: toError(bodyError).message,
})
Expand Down
Loading