Skip to content

Commit 2c75a4a

Browse files
fix(tables): per-batch delete-job commits, real trigger.dev retries, post-index ANALYZE guard (#4997)
* fix(tables): per-batch delete-job commits, real trigger.dev retries, post-index ANALYZE guard
* fix(tables): resume job progress across retries, rethrow root cause for clean failure messages
1 parent 53fdcab commit 2c75a4a

9 files changed

Lines changed: 16651 additions & 51 deletions

File tree

apps/sim/app/api/table/[tableId]/delete-async/route.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
88
import { runDetached } from '@/lib/core/utils/background'
99
import { generateRequestId } from '@/lib/core/utils/request'
1010
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11-
import { runTableDelete } from '@/lib/table/delete-runner'
11+
import { markTableDeleteFailed, runTableDelete } from '@/lib/table/delete-runner'
1212
import { markTableJobRunning, releaseJobClaim } from '@/lib/table/service'
1313
import type { TableDeleteJobPayload } from '@/lib/table/types'
1414
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'
@@ -110,6 +110,10 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
110110
filter,
111111
excludeRowIds,
112112
cutoff,
113+
}).catch(async (error) => {
114+
// No retry machinery on the detached path — fail the job immediately.
115+
await markTableDeleteFailed(tableId, jobId, error)
116+
throw error
113117
})
114118
)
115119
}

apps/sim/background/table-delete.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { task } from '@trigger.dev/sdk'
2-
import { runTableDelete, type TableDeletePayload } from '@/lib/table/delete-runner'
2+
import {
3+
markTableDeleteFailed,
4+
runTableDelete,
5+
type TableDeletePayload,
6+
} from '@/lib/table/delete-runner'
37

48
/**
59
* `TableDeletePayload` with the cutoff as an ISO string — task payloads cross a JSON boundary, so
@@ -10,10 +14,12 @@ export interface TableDeleteTaskPayload extends Omit<TableDeletePayload, 'cutoff
1014
}
1115

1216
/**
13-
* Trigger.dev wrapper around `runTableDelete`. Retry-safe: the worker keysets by id with a
14-
* `created_at <= cutoff` floor and pages are committed independently, so a retried attempt simply
15-
* re-walks and deletes whatever remains. The `table_jobs` ownership gate stops a retried run that
16-
* lost the job (canceled / janitor-failed) within one page.
17+
* Trigger.dev wrapper around `runTableDelete`. Errors propagate out of `run` so the retry policy
18+
* actually fires; the job is marked failed only in `onFailure`, after the final attempt.
19+
* Retry-safe: the worker keysets by id with a `created_at <= cutoff` floor and batches are
20+
* committed independently, so a retried attempt simply re-walks and deletes whatever remains. The
21+
* `table_jobs` ownership gate stops a retried run that lost the job (canceled / janitor-failed)
22+
* within one page.
1723
*/
1824
export const tableDeleteTask = task({
1925
id: 'table-delete',
@@ -26,4 +32,7 @@ export const tableDeleteTask = task({
2632
run: async (payload: TableDeleteTaskPayload) => {
2733
await runTableDelete({ ...payload, cutoff: new Date(payload.cutoff) })
2834
},
35+
onFailure: async ({ payload, error }) => {
36+
await markTableDeleteFailed(payload.tableId, payload.jobId, error)
37+
},
2938
})

apps/sim/lib/table/constants.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ export const TABLE_LIMITS = {
2626
MAX_BULK_OPERATION_SIZE: 1000,
2727
/** Maximum rows a single clipboard copy/cut serializes; beyond this the user is steered to Export. */
2828
MAX_COPY_ROWS: 50000,
29-
/** Rows selected + deleted per page in the async background delete-job loop. Each page is one
30-
* transaction (chunked into DELETE_BATCH_SIZE statements inside it); the page is also the
31-
* cancel/ownership-check granularity. */
29+
/** Rows selected + deleted per page in the async background delete-job loop. Each
30+
* DELETE_BATCH_SIZE chunk inside the page commits in its own transaction; the page is the
31+
* keyset-select and cancel/ownership-check granularity. */
3232
DELETE_PAGE_SIZE: 10000,
3333
/** Row count above which an export runs as a background job instead of a synchronous stream.
3434
* Matches the default per-table row cap, so non-enterprise tables keep instant downloads. */

apps/sim/lib/table/delete-runner.test.ts

Lines changed: 87 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
55

66
const {
77
mockGetTableById,
8+
mockGetJobProgress,
89
mockSelectRowIdPage,
910
mockDeletePageByIds,
1011
mockUpdateJobProgress,
@@ -14,6 +15,7 @@ const {
1415
mockBuildFilterClause,
1516
} = vi.hoisted(() => ({
1617
mockGetTableById: vi.fn(),
18+
mockGetJobProgress: vi.fn(),
1719
mockSelectRowIdPage: vi.fn(),
1820
mockDeletePageByIds: vi.fn(),
1921
mockUpdateJobProgress: vi.fn(),
@@ -25,6 +27,7 @@ const {
2527

2628
vi.mock('@/lib/table/service', () => ({
2729
getTableById: mockGetTableById,
30+
getJobProgress: mockGetJobProgress,
2831
selectRowIdPage: mockSelectRowIdPage,
2932
deletePageByIds: mockDeletePageByIds,
3033
updateJobProgress: mockUpdateJobProgress,
@@ -38,7 +41,7 @@ vi.mock('@/lib/table/constants', () => ({
3841
USER_TABLE_ROWS_SQL_NAME: 'user_table_rows',
3942
}))
4043

41-
import { runTableDelete } from '@/lib/table/delete-runner'
44+
import { markTableDeleteFailed, runTableDelete } from '@/lib/table/delete-runner'
4245

4346
const table = { id: 'tbl_1', workspaceId: 'ws_1', schema: { columns: [] } }
4447
const cutoff = new Date('2026-06-05T00:00:00Z')
@@ -51,6 +54,7 @@ describe('runTableDelete', () => {
5154
beforeEach(() => {
5255
vi.clearAllMocks()
5356
mockGetTableById.mockResolvedValue(table)
57+
mockGetJobProgress.mockResolvedValue(0)
5458
mockUpdateJobProgress.mockResolvedValue(true)
5559
mockMarkJobReady.mockResolvedValue(true)
5660
mockMarkJobFailed.mockResolvedValue(undefined)
@@ -103,17 +107,57 @@ describe('runTableDelete', () => {
103107
)
104108
})
105109

106-
it('marks the job failed and emits a failed event on error', async () => {
110+
it('rethrows unexpected errors without failing the job (caller retries decide)', async () => {
107111
mockSelectRowIdPage.mockRejectedValue(new Error('boom'))
108112

113+
await expect(runTableDelete(basePayload())).rejects.toThrow('boom')
114+
115+
expect(mockMarkJobFailed).not.toHaveBeenCalled()
116+
expect(mockAppendTableEvent).not.toHaveBeenCalledWith(
117+
expect.objectContaining({ status: 'failed' })
118+
)
119+
})
120+
121+
it('returns quietly when superseded mid-run without failing the job', async () => {
122+
mockSelectRowIdPage.mockResolvedValue(['a', 'b'])
123+
mockUpdateJobProgress.mockResolvedValueOnce(true).mockResolvedValueOnce(false)
124+
125+
await expect(runTableDelete(basePayload())).resolves.toBeUndefined()
126+
127+
expect(mockMarkJobFailed).not.toHaveBeenCalled()
128+
})
129+
130+
it('rethrows the root cause so the clean message survives serialization', async () => {
131+
const cause = new Error('canceling statement due to statement timeout')
132+
mockSelectRowIdPage.mockRejectedValue(new Error('Failed query: delete ...', { cause }))
133+
134+
await expect(runTableDelete(basePayload())).rejects.toThrow(
135+
'canceling statement due to statement timeout'
136+
)
137+
})
138+
139+
it('resumes cumulative progress on retry instead of resetting to zero', async () => {
140+
mockGetJobProgress.mockResolvedValue(7)
141+
mockSelectRowIdPage.mockResolvedValueOnce(['a', 'b']).mockResolvedValueOnce([])
142+
109143
await runTableDelete(basePayload())
110144

111-
expect(mockMarkJobFailed).toHaveBeenCalledWith('tbl_1', 'job_1', 'boom')
145+
expect(mockUpdateJobProgress).toHaveBeenNthCalledWith(1, 'tbl_1', 7, 'job_1')
112146
expect(mockAppendTableEvent).toHaveBeenCalledWith(
113-
expect.objectContaining({ kind: 'job', type: 'delete', status: 'failed', error: 'boom' })
147+
expect.objectContaining({ status: 'ready', progress: 9 })
114148
)
115149
})
116150

151+
it('stops at the seed read when the job is no longer owned', async () => {
152+
mockGetJobProgress.mockResolvedValue(null)
153+
154+
await expect(runTableDelete(basePayload())).resolves.toBeUndefined()
155+
156+
expect(mockSelectRowIdPage).not.toHaveBeenCalled()
157+
expect(mockDeletePageByIds).not.toHaveBeenCalled()
158+
expect(mockMarkJobFailed).not.toHaveBeenCalled()
159+
})
160+
117161
it('passes the cutoff and filter clause through to the page query', async () => {
118162
mockSelectRowIdPage.mockResolvedValueOnce([])
119163

@@ -129,3 +173,42 @@ describe('runTableDelete', () => {
129173
)
130174
})
131175
})
176+
177+
describe('markTableDeleteFailed', () => {
178+
beforeEach(() => {
179+
vi.clearAllMocks()
180+
mockMarkJobFailed.mockResolvedValue(undefined)
181+
})
182+
183+
it('marks the job failed and emits the failed event', async () => {
184+
await markTableDeleteFailed('tbl_1', 'job_1', new Error('boom'))
185+
186+
expect(mockMarkJobFailed).toHaveBeenCalledWith('tbl_1', 'job_1', 'boom')
187+
expect(mockAppendTableEvent).toHaveBeenCalledWith(
188+
expect.objectContaining({ kind: 'job', type: 'delete', status: 'failed', error: 'boom' })
189+
)
190+
})
191+
192+
it('prefers the error cause over a verbose wrapper message', async () => {
193+
const cause = new Error('canceling statement due to statement timeout')
194+
const wrapper = new Error(`Failed query: delete from x where id in (${'$1,'.repeat(5000)})`, {
195+
cause,
196+
})
197+
198+
await markTableDeleteFailed('tbl_1', 'job_1', wrapper)
199+
200+
expect(mockMarkJobFailed).toHaveBeenCalledWith(
201+
'tbl_1',
202+
'job_1',
203+
'canceling statement due to statement timeout'
204+
)
205+
})
206+
207+
it('truncates oversized messages', async () => {
208+
await markTableDeleteFailed('tbl_1', 'job_1', new Error('x'.repeat(2000)))
209+
210+
const [, , message] = mockMarkJobFailed.mock.calls[0]
211+
expect(message).toHaveLength(503)
212+
expect(message.endsWith('...')).toBe(true)
213+
})
214+
})

apps/sim/lib/table/delete-runner.ts

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { createLogger } from '@sim/logger'
2-
import { getErrorMessage } from '@sim/utils/errors'
2+
import { getErrorMessage, toError } from '@sim/utils/errors'
33
import { generateId } from '@sim/utils/id'
4+
import { truncate } from '@sim/utils/string'
45
import type { Filter } from '@/lib/table'
56
import { TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from '@/lib/table/constants'
67
import { appendTableEvent } from '@/lib/table/events'
78
import {
89
deletePageByIds,
10+
getJobProgress,
911
getTableById,
1012
markJobFailed,
1113
markJobReady,
@@ -38,12 +40,17 @@ export interface TableDeletePayload {
3840
}
3941

4042
/**
41-
* Background worker for large filtered row deletes. Runs detached on the web container (see the
42-
* delete-async kickoff route). Deletes in keyset-paginated pages — `created_at <= cutoff` spares
43-
* rows inserted while the job runs, and `excludeRowIds` spares specific rows (the
44-
* "select all then deselect a few" case). Ownership-gated per page so a cancel/supersede stops
45-
* it within one page; committed pages are never rolled back. Progress and the terminal state are
46-
* surfaced via the table-events SSE stream.
43+
* Background worker for large filtered row deletes (trigger.dev task, or detached on the web
44+
* container when trigger.dev is disabled — see the delete-async kickoff route). Deletes in
45+
* keyset-paginated pages — `created_at <= cutoff` spares rows inserted while the job runs, and
46+
* `excludeRowIds` spares specific rows (the "select all then deselect a few" case).
47+
* Ownership-gated per page so a cancel/supersede stops it within one page; committed batches are
48+
* never rolled back. Progress and the terminal state are surfaced via the table-events SSE
49+
* stream.
50+
*
51+
* Unexpected errors are rethrown so the caller's retry machinery sees them — the caller marks
52+
* the job failed via `markTableDeleteFailed` once it gives up. A superseded run (cancel, or a
53+
* newer job took the table) returns quietly.
4754
*/
4855
export async function runTableDelete(payload: TableDeletePayload): Promise<void> {
4956
const { jobId, tableId, workspaceId, filter, excludeRowIds, cutoff } = payload
@@ -58,8 +65,14 @@ export async function runTableDelete(payload: TableDeletePayload): Promise<void>
5865
: undefined
5966
const excluded = new Set(excludeRowIds ?? [])
6067

61-
let processed = 0
62-
let lastReported = 0
68+
// Resume the persisted count: a retried attempt's earlier batches are already committed,
69+
// so starting at zero would overwrite cumulative progress with this attempt's smaller
70+
// number. Doubles as the initial ownership gate.
71+
const resumed = await getJobProgress(tableId, jobId)
72+
if (resumed === null) throw new JobSupersededError()
73+
74+
let processed = resumed
75+
let lastReported = resumed
6376
let afterId: string | undefined
6477

6578
while (true) {
@@ -128,19 +141,37 @@ export async function runTableDelete(payload: TableDeletePayload): Promise<void>
128141
} catch (err) {
129142
if (err instanceof JobSupersededError) {
130143
logger.info(`[${requestId}] Delete superseded by a newer run; stopping`, { tableId, jobId })
131-
} else {
132-
const message = getErrorMessage(err, 'Delete failed')
133-
logger.error(`[${requestId}] Delete failed for table ${tableId}:`, err)
134-
// Scoped to jobId — a no-op if a newer job has taken over.
135-
await markJobFailed(tableId, jobId, message).catch(() => {})
136-
void appendTableEvent({
137-
kind: 'job',
138-
type: 'delete',
139-
tableId,
140-
jobId,
141-
status: 'failed',
142-
error: message,
143-
})
144+
return
144145
}
146+
// Rethrow the root cause, not the wrapper: drizzle query errors embed the full SQL + params
147+
// list (tens of KB for a batch delete) in `message`, and `cause` does not survive
148+
// trigger.dev's serialization between the failed `run` and `onFailure` — the clean message
149+
// must already be the thrown error's own `message`.
150+
const cause = toError(err).cause
151+
const error = cause ? toError(cause) : toError(err)
152+
logger.error(`[${requestId}] Delete failed for table ${tableId}:`, error)
153+
throw error
145154
}
146155
}
156+
157+
/**
158+
* Marks the delete job failed and emits the failed SSE event. Called once the caller gives up on
159+
* the run: the trigger.dev task's `onFailure` (after retries are exhausted) or the detached
160+
* web-container fallback (no retries). Scoped to jobId — a no-op if a newer job has taken over.
161+
*/
162+
export async function markTableDeleteFailed(
163+
tableId: string,
164+
jobId: string,
165+
error: unknown
166+
): Promise<void> {
167+
const message = truncate(getErrorMessage(toError(error).cause ?? error, 'Delete failed'), 500)
168+
await markJobFailed(tableId, jobId, message).catch(() => {})
169+
void appendTableEvent({
170+
kind: 'job',
171+
type: 'delete',
172+
tableId,
173+
jobId,
174+
status: 'failed',
175+
error: message,
176+
})
177+
}

0 commit comments

Comments
 (0)