Skip to content

Commit b74a56d

Browse files
fix(db-part-4): enforce consistent cross-resource lock ordering (#5027)
1 parent 65c7029 commit b74a56d

12 files changed

Lines changed: 343 additions & 29 deletions

File tree

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ async function recoverStaleDatabaseScheduleJobs(now: Date): Promise<void> {
426426

427427
await db.transaction(async (tx) => {
428428
const [lock] = await tx.execute<{ acquired: boolean }>(
429-
sql`SELECT pg_try_advisory_xact_lock(hashtext(${SCHEDULE_EXECUTION_QUEUE_NAME})) AS acquired`
429+
sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired`
430430
)
431431
if (!lock?.acquired) {
432432
logger.info(
@@ -527,7 +527,7 @@ async function tryStartDatabaseScheduleJob(jobId: string): Promise<DatabaseSched
527527

528528
return db.transaction(async (tx) => {
529529
const [lock] = await tx.execute<{ acquired: boolean }>(
530-
sql`SELECT pg_try_advisory_xact_lock(hashtext(${SCHEDULE_EXECUTION_QUEUE_NAME})) AS acquired`
530+
sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired`
531531
)
532532
if (!lock?.acquired) return 'capacity_full'
533533

apps/sim/app/api/workspaces/[id]/byok-keys/route.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/per
2121

2222
const logger = createLogger('WorkspaceBYOKKeysAPI')
2323

24+
/**
25+
* Bounds the per-provider BYOK advisory-lock wait so a stuck holder fails fast
26+
* (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a
27+
* server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`.
28+
*/
29+
const WORKSPACE_BYOK_LOCK_TIMEOUT_MS = 5_000
30+
2431
function maskApiKey(key: string): string {
2532
if (key.length <= 8) {
2633
return '•'.repeat(8)
@@ -203,7 +210,10 @@ export const POST = withRouteHandler(
203210

204211
const newKey = await db.transaction(async (tx) => {
205212
await tx.execute(
206-
sql`SELECT pg_advisory_xact_lock(hashtext(${`byok:${workspaceId}:${providerId}`}))`
213+
sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_BYOK_LOCK_TIMEOUT_MS}ms`}, true)`
214+
)
215+
await tx.execute(
216+
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`byok:${workspaceId}:${providerId}`}, 0))`
207217
)
208218

209219
const [{ keyCount }] = await tx

apps/sim/app/api/workspaces/[id]/environment/route.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ import {
3333

3434
const logger = createLogger('WorkspaceEnvironmentAPI')
3535

36+
/**
37+
* Bounds the workspace-environment advisory-lock wait so a stuck holder fails
38+
* fast (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a
39+
* server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`.
40+
*/
41+
const WORKSPACE_ENV_LOCK_TIMEOUT_MS = 5_000
42+
3643
/**
3744
* Restricts decrypted workspace env values to administrators. Members (including
3845
* read-only) receive the variable names with empty values so editor autocomplete
@@ -200,7 +207,10 @@ export const PUT = withRouteHandler(
200207
).then((entries) => Object.fromEntries(entries))
201208

202209
const { existingEncrypted, merged } = await db.transaction(async (tx) => {
203-
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${workspaceId}))`)
210+
await tx.execute(
211+
sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_ENV_LOCK_TIMEOUT_MS}ms`}, true)`
212+
)
213+
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${workspaceId}, 0))`)
204214

205215
const [existingRow] = await tx
206216
.select()
@@ -328,7 +338,10 @@ export const DELETE = withRouteHandler(
328338
}
329339

330340
const result = await db.transaction(async (tx) => {
331-
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${workspaceId}))`)
341+
await tx.execute(
342+
sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_ENV_LOCK_TIMEOUT_MS}ms`}, true)`
343+
)
344+
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${workspaceId}, 0))`)
332345

333346
const [existingRow] = await tx
334347
.select()
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* @vitest-environment node
3+
*
4+
* Lock-order regression guard: the paid-org join billing transaction must lock
5+
* the personal Pro subscription BEFORE userStats, matching
6+
* restoreUserProSubscription's subscription → userStats order. Snapshotting
7+
* userStats before locking the subscription inverts that pair and deadlocks a
8+
* concurrent Pro restore for the same user.
9+
*/
10+
import { subscription as subscriptionTable, userStats } from '@sim/db/schema'
11+
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
12+
import { beforeEach, describe, expect, it, vi } from 'vitest'
13+
import { reapplyPaidOrgJoinBillingForExistingMember } from '@/lib/billing/organizations/membership'
14+
15+
vi.mock('@sim/db', () => dbChainMock)
16+
17+
vi.mock('@/lib/core/outbox/service', () => ({
18+
enqueueOutboxEvent: vi.fn(),
19+
}))
20+
21+
/**
22+
* A superset row that satisfies every read in the join path: a paid org sub, a
23+
* still-active personal Pro to pause, non-zero usage to snapshot, and zero
24+
* storage (so the conditional org storage-transfer write is skipped — the org
25+
* lock under test is the canonical pre-lock, not the storage update).
26+
*/
27+
const GENERIC_ROW = {
28+
id: 'sub-1',
29+
plan: 'team',
30+
referenceId: 'user-1',
31+
status: 'active',
32+
cancelAtPeriodEnd: false,
33+
stripeSubscriptionId: 'stripe-1',
34+
currentPeriodCost: '5',
35+
proPeriodCostSnapshot: '0',
36+
storageUsedBytes: 0,
37+
}
38+
39+
type LockOp = { op: 'lock' | 'update' | 'insert'; table: unknown }
40+
41+
function createRecordingTx() {
42+
const ops: LockOp[] = []
43+
const select = () => {
44+
const ctx: { table: unknown } = { table: undefined }
45+
const chain = {
46+
from: (table: unknown) => {
47+
ctx.table = table
48+
return chain
49+
},
50+
where: () => chain,
51+
for: () => {
52+
ops.push({ op: 'lock', table: ctx.table })
53+
return chain
54+
},
55+
limit: async () => [GENERIC_ROW],
56+
}
57+
return chain
58+
}
59+
const tx = {
60+
select,
61+
update: (table: unknown) => ({
62+
set: () => ({
63+
where: async () => {
64+
ops.push({ op: 'update', table })
65+
},
66+
}),
67+
}),
68+
insert: (table: unknown) => ({
69+
values: async () => {
70+
ops.push({ op: 'insert', table })
71+
},
72+
}),
73+
execute: async () => [],
74+
}
75+
return { tx, ops }
76+
}
77+
78+
describe('paid-org join billing lock ordering', () => {
79+
beforeEach(() => {
80+
vi.clearAllMocks()
81+
resetDbChainMock()
82+
})
83+
84+
it('locks the personal subscription before mutating userStats', async () => {
85+
const { tx, ops } = createRecordingTx()
86+
dbChainMockFns.transaction.mockImplementation(async (cb: (t: unknown) => unknown) => cb(tx))
87+
88+
await reapplyPaidOrgJoinBillingForExistingMember('user-1', 'org-1')
89+
90+
const firstUserStatsUpdate = ops.findIndex((o) => o.op === 'update' && o.table === userStats)
91+
const subscriptionLock = ops.findIndex((o) => o.op === 'lock' && o.table === subscriptionTable)
92+
93+
expect(firstUserStatsUpdate).toBeGreaterThanOrEqual(0)
94+
expect(subscriptionLock).toBeGreaterThanOrEqual(0)
95+
expect(subscriptionLock).toBeLessThan(firstUserStatsUpdate)
96+
})
97+
})

apps/sim/lib/billing/organizations/membership.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,17 @@ async function applyPaidOrgJoinBillingTx(
676676
.limit(1)
677677

678678
if (personalPro && !personalPro.cancelAtPeriodEnd) {
679+
// Lock the personal Pro subscription before userStats (snapshotted below) so
680+
// this matches restoreUserProSubscription's subscription → userStats order
681+
// and cannot deadlock against a concurrent Pro restore for the same user.
682+
// The cancel update further down re-locks this row (no-op).
683+
await tx
684+
.select({ id: subscriptionTable.id })
685+
.from(subscriptionTable)
686+
.where(eq(subscriptionTable.id, personalPro.id))
687+
.for('update')
688+
.limit(1)
689+
679690
const [userStatsRow] = await tx
680691
.select({ currentPeriodCost: userStats.currentPeriodCost })
681692
.from(userStats)
@@ -1025,13 +1036,6 @@ export async function removeUserFromOrganization(
10251036
const captureDepartedUsage = async () => {
10261037
if (skipBillingLogic) return 0
10271038

1028-
await tx
1029-
.select({ id: organization.id })
1030-
.from(organization)
1031-
.where(eq(organization.id, organizationId))
1032-
.for('update')
1033-
.limit(1)
1034-
10351039
const [departingUserStats] = await tx
10361040
.select({ currentPeriodCost: userStats.currentPeriodCost })
10371041
.from(userStats)

apps/sim/lib/billing/webhooks/invoices.test.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,12 @@ describe('invoice billing recovery', () => {
230230
expect(mockBlockOrgMembers).not.toHaveBeenCalled()
231231
})
232232

233-
it('coordinates org usage reset with owner tracker and organization locks', async () => {
234-
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] })
235-
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] })
236-
queueSelectResponse({ limitResult: [{ id: 'org-1' }] })
237-
queueSelectResponse({ whereResult: [{ userId: 'owner-1' }, { userId: 'member-1' }] })
233+
it('locks member userStats before the organization row during usage reset', async () => {
234+
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) // owner member row
235+
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) // owner userStats
236+
queueSelectResponse({ whereResult: [{ userId: 'owner-1' }, { userId: 'member-1' }] }) // member ids
237+
queueSelectResponse({ whereResult: [] }) // all-member userStats FOR UPDATE (pre-org lock)
238+
queueSelectResponse({ limitResult: [{ id: 'org-1' }] }) // organization
238239
queueSelectResponse({
239240
whereResult: [
240241
{ userId: 'owner-1', current: '125', currentCopilot: '10' },
@@ -248,9 +249,17 @@ describe('invoice billing recovery', () => {
248249

249250
expect(dbChainMockFns.transaction).toHaveBeenCalledTimes(1)
250251
expect(dbChainMockFns.update).toHaveBeenCalledTimes(2)
251-
expect(Object.keys(dbChainMockFns.select.mock.calls[0][0] ?? {})).toEqual(['userId'])
252-
expect(Object.keys(dbChainMockFns.select.mock.calls[1][0] ?? {})).toEqual(['userId'])
253-
expect(Object.keys(dbChainMockFns.select.mock.calls[2][0] ?? {})).toEqual(['id'])
252+
253+
const whereArgs = dbChainMockFns.where.mock.calls.map(
254+
(call) => call[0] as { type?: string; column?: string; left?: string }
255+
)
256+
const allMemberStatsLockIndex = whereArgs.findIndex(
257+
(arg) => arg?.type === 'inArray' && arg?.column === 'userId'
258+
)
259+
const orgLockIndex = whereArgs.findIndex((arg) => arg?.type === 'eq' && arg?.left === 'id')
260+
expect(allMemberStatsLockIndex).toBeGreaterThanOrEqual(0)
261+
expect(orgLockIndex).toBeGreaterThanOrEqual(0)
262+
expect(allMemberStatsLockIndex).toBeLessThan(orgLockIndex)
254263

255264
const statsReset = dbChainMockFns.set.mock.calls[0][0] as Record<string, unknown>
256265
expect(statsReset.currentPeriodCost).not.toBe('0')

apps/sim/lib/billing/webhooks/invoices.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -457,19 +457,32 @@ export async function resetUsageForSubscription(sub: {
457457
.limit(1)
458458
}
459459

460-
await tx
461-
.select({ id: organization.id })
462-
.from(organization)
463-
.where(eq(organization.id, sub.referenceId))
464-
.for('update')
465-
.limit(1)
466-
467460
const membersRows = await tx
468461
.select({ userId: member.userId })
469462
.from(member)
470463
.where(eq(member.organizationId, sub.referenceId))
471464

472465
const memberIds = membersRows.map((row) => row.userId)
466+
467+
// Lock every member's userStats before the organization row so this path
468+
// follows the canonical userStats → organization order shared by the
469+
// join, remove, threshold-billing, and storage-transfer paths. Locking
470+
// organization first would invert against them and risk an AB-BA
471+
// deadlock. The per-member UPDATE below re-locks these rows (no-op).
472+
if (memberIds.length > 0) {
473+
await tx
474+
.select({ userId: userStats.userId })
475+
.from(userStats)
476+
.where(inArray(userStats.userId, memberIds))
477+
.for('update')
478+
}
479+
480+
await tx
481+
.select({ id: organization.id })
482+
.from(organization)
483+
.where(eq(organization.id, sub.referenceId))
484+
.for('update')
485+
.limit(1)
473486
if (memberIds.length > 0) {
474487
const memberStatsRows = await tx
475488
.select({
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* @vitest-environment node
3+
*
4+
* Lock-order regression guard: `updateDocument` must lock the document's
5+
* embedding rows BEFORE the document row when cascading tag updates, matching
6+
* the embedding → document order every chunk-mutation path uses
7+
* (chunks/service.ts). The opposite order deadlocks a document tag edit against
8+
* a concurrent chunk edit of the same document.
9+
*/
10+
import { document, embedding } from '@sim/db/schema'
11+
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
12+
import { beforeEach, describe, expect, it, vi } from 'vitest'
13+
import { updateDocument } from '@/lib/knowledge/documents/service'
14+
15+
vi.mock('@sim/db', () => dbChainMock)
16+
17+
/** invocationCallOrder of the first `tx.update(table)` call. */
18+
function updateOrderForTable(table: unknown): number {
19+
const { calls, invocationCallOrder } = dbChainMockFns.update.mock
20+
for (let i = 0; i < calls.length; i++) {
21+
if (calls[i][0] === table) return invocationCallOrder[i]
22+
}
23+
return -1
24+
}
25+
26+
describe('updateDocument lock ordering', () => {
27+
beforeEach(() => {
28+
vi.clearAllMocks()
29+
resetDbChainMock()
30+
// Post-transaction re-read of the updated document must return a row.
31+
dbChainMockFns.limit.mockResolvedValue([{ id: 'doc-1', knowledgeBaseId: 'kb-1' }])
32+
})
33+
34+
it('updates embeddings before the document row when cascading tag changes', async () => {
35+
await updateDocument('doc-1', { tag1: 'priority' }, 'req-1')
36+
37+
const embeddingWriteOrder = updateOrderForTable(embedding)
38+
const documentWriteOrder = updateOrderForTable(document)
39+
40+
expect(embeddingWriteOrder).toBeGreaterThan(0)
41+
expect(documentWriteOrder).toBeGreaterThan(0)
42+
expect(embeddingWriteOrder).toBeLessThan(documentWriteOrder)
43+
})
44+
})

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1902,8 +1902,6 @@ export async function updateDocument(
19021902
})
19031903

19041904
await db.transaction(async (tx) => {
1905-
await tx.update(document).set(dbUpdateData).where(eq(document.id, documentId))
1906-
19071905
const hasTagUpdates = ALL_TAG_SLOTS.some((field) => typedUpdateData[field] !== undefined)
19081906

19091907
if (hasTagUpdates) {
@@ -1921,6 +1919,8 @@ export async function updateDocument(
19211919
.set(embeddingUpdateData)
19221920
.where(eq(embedding.documentId, documentId))
19231921
}
1922+
1923+
await tx.update(document).set(dbUpdateData).where(eq(document.id, documentId))
19241924
})
19251925

19261926
const updatedDocument = await db

0 commit comments

Comments
 (0)