Skip to content

Commit acde71b

Browse files
refactor(tables): extract maxOrderKey + thread import append key
- Extract maxOrderKey(executor, tableId) helper; replaces three identical max(order_key) selects (single/batch insert append + import). - Import: read the append anchor once up front and thread each batch's last key forward (nextImportStartOrderKey + afterOrderKey) instead of re-scanning max(order_key) per batch over a growing table — one scan per import, not one per 1k-row batch.
1 parent e13e673 commit acde71b

2 files changed

Lines changed: 45 additions & 21 deletions

File tree

apps/sim/lib/table/import-runner.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
getTableById,
2424
markImportFailed,
2525
markImportReady,
26+
nextImportStartOrderKey,
2627
nextImportStartPosition,
2728
setTableSchemaForImport,
2829
updateImportProgress,
@@ -89,8 +90,10 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
8990
source = await downloadFileStream({ key: fileKey, context: 'workspace' })
9091

9192
// Append must continue after the existing rows; create/replace start empty. Read once up
92-
// front (the import is the table's sole writer) and assign contiguous positions from it.
93+
// front (the import is the table's sole writer) and assign contiguous positions / threaded
94+
// order keys from it.
9395
const basePosition = mode === 'append' ? await nextImportStartPosition(tableId) : 0
96+
let lastOrderKey = mode === 'append' ? await nextImportStartOrderKey(tableId) : null
9497

9598
// Count bytes as they flow so the row total can be extrapolated from byte progress.
9699
let bytesRead = 0
@@ -182,11 +185,20 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
182185
const owns = await updateImportProgress(tableId, inserted, importId)
183186
if (!owns) throw new ImportSupersededError()
184187
const coerced = coerceRowsForTable(rows, schema, headerToColumn)
185-
inserted += await bulkInsertImportBatch(
186-
{ tableId, workspaceId, userId, rows: coerced, startPosition: basePosition + inserted },
188+
const result = await bulkInsertImportBatch(
189+
{
190+
tableId,
191+
workspaceId,
192+
userId,
193+
rows: coerced,
194+
startPosition: basePosition + inserted,
195+
afterOrderKey: lastOrderKey,
196+
},
187197
{ ...table, schema },
188198
requestId
189199
)
200+
inserted += result.inserted
201+
lastOrderKey = result.lastOrderKey
190202
// Emit after the first batch, then every interval, so the bar appears early without flooding.
191203
if (
192204
inserted - lastReported >= PROGRESS_INTERVAL_ROWS ||

apps/sim/lib/table/service.ts

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,15 @@ export async function nextImportStartPosition(tableId: string): Promise<number>
177177
return maxPos + 1
178178
}
179179

180+
/**
181+
* Append anchor `order_key` for an import — `max(order_key)`, or null when empty. Read once,
182+
* unlocked, before streaming (the import worker is the table's sole writer); each batch threads
183+
* the previous batch's last key forward so no per-batch max scan is needed.
184+
*/
185+
export async function nextImportStartOrderKey(tableId: string): Promise<string | null> {
186+
return maxOrderKey(db, tableId)
187+
}
188+
180189
const TIMEOUT_CAP_MS = 10 * 60_000
181190

182191
/**
@@ -1030,6 +1039,15 @@ async function nextRowPosition(trx: DbTransaction, tableId: string): Promise<num
10301039
return maxPos + 1
10311040
}
10321041

1042+
/** Largest `order_key` for a table, or `null` when empty — the append anchor for new keys. */
1043+
async function maxOrderKey(executor: DbOrTx, tableId: string): Promise<string | null> {
1044+
const [{ maxKey }] = await executor
1045+
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1046+
.from(userTableRows)
1047+
.where(eq(userTableRows.tableId, tableId))
1048+
return maxKey ?? null
1049+
}
1050+
10331051
/** Shifts every row at or after `position` up by one (`position + 1`). */
10341052
async function shiftRowsUpFrom(trx: DbTransaction, tableId: string, position: number) {
10351053
await trx
@@ -1208,11 +1226,7 @@ async function resolveInsertOrderKey(
12081226
return r?.orderKey ?? null
12091227
}
12101228
if (requestedPosition === undefined) {
1211-
const [{ maxKey }] = await trx
1212-
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1213-
.from(userTableRows)
1214-
.where(eq(userTableRows.tableId, tableId))
1215-
return keyBetween(maxKey ?? null, null)
1229+
return keyBetween(await maxOrderKey(trx, tableId), null)
12161230
}
12171231
const lo = await orderKeyAtSlot(requestedPosition - 1)
12181232
const hi = await orderKeyAtSlot(requestedPosition)
@@ -1301,11 +1315,7 @@ async function resolveBatchInsertOrderKeys(
13011315
positions?: number[]
13021316
): Promise<string[]> {
13031317
if (!positions || positions.length === 0) {
1304-
const [{ maxKey }] = await trx
1305-
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1306-
.from(userTableRows)
1307-
.where(eq(userTableRows.tableId, tableId))
1308-
return nKeysBetween(maxKey ?? null, null, count)
1318+
return nKeysBetween(await maxOrderKey(trx, tableId), null, count)
13091319
}
13101320
const keys: string[] = []
13111321
for (const pos of positions) {
@@ -1707,6 +1717,8 @@ export interface BulkImportBatch {
17071717
rows: RowData[]
17081718
/** Position of the first row in this batch; rows get contiguous positions from here. */
17091719
startPosition: number
1720+
/** Previous batch's last `order_key` (the append anchor); null for the first batch / empty table. */
1721+
afterOrderKey?: string | null
17101722
}
17111723

17121724
/**
@@ -1727,7 +1739,7 @@ export async function bulkInsertImportBatch(
17271739
data: BulkImportBatch,
17281740
table: TableDefinition,
17291741
requestId: string
1730-
): Promise<number> {
1742+
): Promise<{ inserted: number; lastOrderKey: string | null }> {
17311743
for (let i = 0; i < data.rows.length; i++) {
17321744
const sizeValidation = validateRowSize(data.rows[i])
17331745
if (!sizeValidation.valid) {
@@ -1755,12 +1767,9 @@ export async function bulkInsertImportBatch(
17551767
}
17561768

17571769
const now = new Date()
1758-
// Import worker is the table's sole writer; append keys after the current max.
1759-
const [{ maxKey }] = await db
1760-
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1761-
.from(userTableRows)
1762-
.where(eq(userTableRows.tableId, data.tableId))
1763-
const orderKeys = nKeysBetween(maxKey ?? null, null, data.rows.length)
1770+
// Import worker is the table's sole writer; append keys after the anchor the caller threads
1771+
// from the previous batch's last key — no per-batch max(order_key) scan over a growing table.
1772+
const orderKeys = nKeysBetween(data.afterOrderKey ?? null, null, data.rows.length)
17641773
const rowsToInsert = data.rows.map((rowData, i) => ({
17651774
id: `row_${generateId().replace(/-/g, '')}`,
17661775
tableId: data.tableId,
@@ -1775,7 +1784,10 @@ export async function bulkInsertImportBatch(
17751784

17761785
await db.insert(userTableRows).values(rowsToInsert)
17771786
logger.info(`[${requestId}] Bulk-imported ${rowsToInsert.length} rows into table ${data.tableId}`)
1778-
return rowsToInsert.length
1787+
return {
1788+
inserted: rowsToInsert.length,
1789+
lastOrderKey: orderKeys[orderKeys.length - 1] ?? data.afterOrderKey ?? null,
1790+
}
17791791
}
17801792

17811793
/** Deletes every row of a table (set-based; the statement-level trigger zeroes `row_count`). */

0 commit comments

Comments
 (0)