Skip to content

Commit 5357b82

Browse files
committed
fix(db-part-1): eliminate pool self-deadlock from nested checkouts inside transactions
1 parent bc55fc3 commit 5357b82

34 files changed

Lines changed: 813 additions & 241 deletions

File tree

apps/realtime/src/database/operations.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit'
22
import * as schema from '@sim/db'
3-
import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@sim/db'
3+
import {
4+
instrumentPoolClient,
5+
workflow,
6+
workflowBlocks,
7+
workflowEdges,
8+
workflowSubflows,
9+
} from '@sim/db'
410
import { createLogger } from '@sim/logger'
511
import {
612
BLOCK_OPERATIONS,
@@ -27,13 +33,16 @@ const logger = createLogger('SocketDatabase')
2733

2834
const connectionString = env.DATABASE_URL
2935
const socketDb = drizzle(
30-
postgres(connectionString, {
31-
prepare: false,
32-
idle_timeout: 10,
33-
connect_timeout: 20,
34-
max: 15,
35-
onnotice: () => {},
36-
}),
36+
instrumentPoolClient(
37+
postgres(connectionString, {
38+
prepare: false,
39+
idle_timeout: 10,
40+
connect_timeout: 20,
41+
max: 15,
42+
onnotice: () => {},
43+
}),
44+
'socketDb'
45+
),
3746
{ schema }
3847
)
3948

apps/sim/app/api/credential-sets/[id]/members/route.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,16 +185,24 @@ export const DELETE = withRouteHandler(
185185

186186
const requestId = generateId().slice(0, 8)
187187

188-
// Use transaction to ensure member deletion + webhook sync are atomic
189-
await db.transaction(async (tx) => {
190-
await tx.delete(credentialSetMember).where(eq(credentialSetMember.id, memberId))
191-
192-
const syncResult = await syncAllWebhooksForCredentialSet(id, requestId, tx)
188+
await db.delete(credentialSetMember).where(eq(credentialSetMember.id, memberId))
189+
190+
// Runs after the deletion commits: the sync performs external HTTP
191+
// (OAuth refresh, provider unsubscribe) and must not hold a pooled
192+
// connection. A sync failure must not fail the committed mutation —
193+
// it self-heals on the next membership change/deploy.
194+
try {
195+
const syncResult = await syncAllWebhooksForCredentialSet(id, requestId)
193196
logger.info('Synced webhooks after member removed', {
194197
credentialSetId: id,
195198
...syncResult,
196199
})
197-
})
200+
} catch (syncError) {
201+
logger.error('Webhook sync failed after member removal', {
202+
credentialSetId: id,
203+
error: syncError,
204+
})
205+
}
198206

199207
logger.info('Removed member from credential set', {
200208
credentialSetId: id,

apps/sim/app/api/credential-sets/invite/[token]/route.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,27 @@ export const POST = withRouteHandler(
194194
)
195195
)
196196
}
197+
})
197198

199+
// Runs after the membership commits: the sync performs external HTTP
200+
// (OAuth refresh, provider unsubscribe) and must not hold a pooled
201+
// connection. A sync failure must not fail the committed mutation —
202+
// it self-heals on the next membership change/deploy.
203+
try {
198204
const syncResult = await syncAllWebhooksForCredentialSet(
199205
invitation.credentialSetId,
200-
requestId,
201-
tx
206+
requestId
202207
)
203208
logger.info('Synced webhooks after member joined', {
204209
credentialSetId: invitation.credentialSetId,
205210
...syncResult,
206211
})
207-
})
212+
} catch (syncError) {
213+
logger.error('Webhook sync failed after invitation accept', {
214+
credentialSetId: invitation.credentialSetId,
215+
error: syncError,
216+
})
217+
}
208218

209219
logger.info('Accepted credential set invitation', {
210220
invitationId: invitation.id,

apps/sim/app/api/credential-sets/memberships/route.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ export const DELETE = withRouteHandler(async (req: NextRequest) => {
7474
try {
7575
const requestId = generateId().slice(0, 8)
7676

77-
// Use transaction to ensure revocation + webhook sync are atomic
7877
await db.transaction(async (tx) => {
7978
// Find and verify membership
8079
const [membership] = await tx
@@ -104,15 +103,26 @@ export const DELETE = withRouteHandler(async (req: NextRequest) => {
104103
updatedAt: new Date(),
105104
})
106105
.where(eq(credentialSetMember.id, membership.id))
106+
})
107107

108-
// Sync webhooks to remove this user's credential webhooks
109-
const syncResult = await syncAllWebhooksForCredentialSet(credentialSetId, requestId, tx)
108+
// Runs after the revocation commits: the sync performs external HTTP
109+
// (OAuth refresh, provider unsubscribe) and must not hold a pooled
110+
// connection. A sync failure must not fail the committed mutation —
111+
// it self-heals on the next membership change/deploy.
112+
try {
113+
const syncResult = await syncAllWebhooksForCredentialSet(credentialSetId, requestId)
110114
logger.info('Synced webhooks after member left', {
111115
credentialSetId,
112116
userId: session.user.id,
113117
...syncResult,
114118
})
115-
})
119+
} catch (syncError) {
120+
logger.error('Webhook sync failed after member left', {
121+
credentialSetId,
122+
userId: session.user.id,
123+
error: syncError,
124+
})
125+
}
116126

117127
logger.info('User left credential set', {
118128
credentialSetId,

apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import {
4747
adminV1ListWorkspaceMembersContract,
4848
} from '@/lib/api/contracts/v1/admin'
4949
import { parseRequest } from '@/lib/api/server'
50+
import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription'
5051
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
5152
import { revokeWorkspaceCredentialMembershipsTx } from '@/lib/credentials/access'
5253
import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment'
@@ -247,7 +248,12 @@ export const POST = withRouteHandler(
247248
updatedAt: now,
248249
})
249250

250-
await applyWorkspaceAutoAddGroup(db, workspaceId, userId)
251+
await applyWorkspaceAutoAddGroup(
252+
db,
253+
workspaceId,
254+
userId,
255+
await isWorkspaceOnEnterprisePlan(workspaceId)
256+
)
251257

252258
logger.info(`Admin API: Added user ${userId} to workspace ${workspaceId}`, {
253259
permissions: permissionLevel,

apps/sim/app/api/workspaces/[id]/permissions/route.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { type NextRequest, NextResponse } from 'next/server'
88
import { updateWorkspacePermissionsContract } from '@/lib/api/contracts/workspaces'
99
import { parseRequest } from '@/lib/api/server'
1010
import { getSession } from '@/lib/auth'
11+
import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription'
1112
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1213
import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment'
1314
import { applyWorkspaceAutoAddGroup } from '@/lib/permission-groups/auto-add'
@@ -159,6 +160,12 @@ export const PATCH = withRouteHandler(
159160
existingPerms.map((p) => [p.userId, { permission: p.permissionType, email: p.email }])
160161
)
161162

163+
// Resolved before the transaction: the entitlement check reads billing
164+
// tables on the global pool and must not run while the tx holds a
165+
// pooled connection.
166+
const hasNewMembers = body.updates.some((update) => !permLookup.has(update.userId))
167+
const autoAddEntitled = hasNewMembers ? await isWorkspaceOnEnterprisePlan(workspaceId) : false
168+
162169
await db.transaction(async (tx) => {
163170
for (const update of body.updates) {
164171
const isNew = !permLookup.has(update.userId)
@@ -184,7 +191,7 @@ export const PATCH = withRouteHandler(
184191
})
185192

186193
if (isNew) {
187-
await applyWorkspaceAutoAddGroup(tx, workspaceId, update.userId)
194+
await applyWorkspaceAutoAddGroup(tx, workspaceId, update.userId, autoAddEntitled)
188195
}
189196
}
190197
})

apps/sim/lib/billing/core/usage-log.ts

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,7 @@ interface UsageEntry {
7777
metadata?: UsageLogMetadata
7878
}
7979

80-
/**
81-
* Parameters for the central recordUsage function.
82-
* This is the single entry point for all billing mutations.
83-
*/
84-
export interface RecordUsageParams {
80+
interface RecordUsageBaseParams {
8581
/** The user being charged */
8682
userId: string
8783
/** One or more usage_log entries to record. Total cost is derived from these. */
@@ -92,19 +88,37 @@ export interface RecordUsageParams {
9288
workflowId?: string
9389
/** Execution context */
9490
executionId?: string
95-
/** Billing entity scope, resolved by caller when already known. */
96-
billingEntity?: BillingEntity
97-
/** Billing period bounds, resolved by caller when already known. */
98-
billingPeriod?: { start: Date; end: Date }
99-
/**
100-
* Optional transaction to run the ledger INSERT in. Callers that reconcile a
101-
* read-then-insert under a lock (e.g. the per-execution advisory lock in the
102-
* workflow completion path) pass their tx so the insert participates in the
103-
* same locked transaction. Defaults to the pooled db.
104-
*/
105-
tx?: DbOrTx
10691
}
10792

93+
/**
94+
* Parameters for the central recordUsage function.
95+
* This is the single entry point for all billing mutations.
96+
*
97+
* Callers that pass `tx` (e.g. the per-execution advisory-lock reconciliation
98+
* in the workflow completion path) must pre-resolve the billing context before
99+
* opening the transaction: resolving it inside would run the subscription
100+
* lookups on the global pool while the tx already holds a pooled connection,
101+
* starving the pool under load (see recordCumulativeUsage for the history).
102+
*/
103+
export type RecordUsageParams = RecordUsageBaseParams &
104+
(
105+
| {
106+
/** Transaction the ledger INSERT participates in. */
107+
tx: DbOrTx
108+
/** Billing entity scope, resolved before the transaction opened. */
109+
billingEntity: BillingEntity
110+
/** Billing period bounds, resolved before the transaction opened. */
111+
billingPeriod: { start: Date; end: Date }
112+
}
113+
| {
114+
tx?: undefined
115+
/** Billing entity scope, resolved by caller when already known. */
116+
billingEntity?: BillingEntity
117+
/** Billing period bounds, resolved by caller when already known. */
118+
billingPeriod?: { start: Date; end: Date }
119+
}
120+
)
121+
108122
export function stableEventKey(parts: Record<string, unknown>): string {
109123
const payload = Object.keys(parts)
110124
.sort()

apps/sim/lib/invitations/core.test.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const {
1515
mockSyncUsageLimitsFromSubscription,
1616
mockSyncWorkspaceEnvCredentials,
1717
mockApplyWorkspaceAutoAddGroup,
18+
mockIsWorkspaceOnEnterprisePlan,
1819
mockFeatureFlags,
1920
} = vi.hoisted(() => ({
2021
mockEnsureUserInOrganization: vi.fn(),
@@ -27,6 +28,7 @@ const {
2728
mockSyncUsageLimitsFromSubscription: vi.fn(),
2829
mockSyncWorkspaceEnvCredentials: vi.fn(),
2930
mockApplyWorkspaceAutoAddGroup: vi.fn(),
31+
mockIsWorkspaceOnEnterprisePlan: vi.fn(async () => true),
3032
mockFeatureFlags: { isBillingEnabled: true },
3133
}))
3234

@@ -60,6 +62,10 @@ vi.mock('@/lib/auth/active-organization', () => ({
6062
setActiveOrganizationForCurrentSession: mockSetActiveOrganizationForCurrentSession,
6163
}))
6264

65+
vi.mock('@/lib/billing/core/subscription', () => ({
66+
isWorkspaceOnEnterprisePlan: mockIsWorkspaceOnEnterprisePlan,
67+
}))
68+
6369
vi.mock('@/lib/billing/core/usage', () => ({
6470
syncUsageLimitsFromSubscription: mockSyncUsageLimitsFromSubscription,
6571
}))

apps/sim/lib/invitations/core.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { createLogger } from '@sim/logger'
1717
import { generateId } from '@sim/utils/id'
1818
import { and, eq, inArray, lte } from 'drizzle-orm'
1919
import { setActiveOrganizationForCurrentSession } from '@/lib/auth/active-organization'
20+
import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription'
2021
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
2122
import {
2223
acquireOrgMembershipLock,
@@ -433,6 +434,14 @@ export async function acceptInvitation(
433434

434435
const acceptedWorkspaceIds: string[] = []
435436

437+
// Resolved before the transaction: the entitlement check reads billing
438+
// tables on the global pool, which must not run while the tx below holds a
439+
// pooled connection and the org-membership advisory lock.
440+
const autoAddEntitlementByWorkspace = new Map<string, boolean>()
441+
for (const workspaceId of new Set(inv.grants.map((grant) => grant.workspaceId))) {
442+
autoAddEntitlementByWorkspace.set(workspaceId, await isWorkspaceOnEnterprisePlan(workspaceId))
443+
}
444+
436445
try {
437446
await db.transaction(async (tx) => {
438447
/**
@@ -502,7 +511,12 @@ export async function acceptInvitation(
502511
})
503512
}
504513

505-
await applyWorkspaceAutoAddGroup(tx, grant.workspaceId, input.userId)
514+
await applyWorkspaceAutoAddGroup(
515+
tx,
516+
grant.workspaceId,
517+
input.userId,
518+
autoAddEntitlementByWorkspace.get(grant.workspaceId) ?? false
519+
)
506520

507521
acceptedWorkspaceIds.push(grant.workspaceId)
508522
}

0 commit comments

Comments
 (0)