From c40e2a30756964321a7c0127e0ef5c7ca8e81cda Mon Sep 17 00:00:00 2001 From: trongtruong110-ux <262946671+trongtruong110-ux@users.noreply.github.com> Date: Wed, 20 May 2026 05:54:00 +0000 Subject: [PATCH] fix(export): stream large database dumps with keyset pagination (#59) Co-Authored-By: Claude Opus 4.7 --- src/export/dump.test.ts | 171 +++++++++++++++++--------- src/export/dump.ts | 259 ++++++++++++++++++++++++++++++++++------ 2 files changed, 339 insertions(+), 91 deletions(-) diff --git a/src/export/dump.test.ts b/src/export/dump.test.ts index ca65b43..59c6e5b 100644 --- a/src/export/dump.test.ts +++ b/src/export/dump.test.ts @@ -1,7 +1,6 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { dumpDatabaseRoute } from './dump' import { executeOperation } from '.' -import { createResponse } from '../utils' import type { DataSource } from '../types' import type { StarbaseDBConfiguration } from '../handler' @@ -19,6 +18,10 @@ vi.mock('../utils', () => ({ ), })) +// Internal alias dump.ts attaches to keyset-paginated rows. Mock row pages +// include it so the cursor logic is exercised the same way as in production. +const ROWID = '__starbasedb_export_rowid__' + let mockDataSource: DataSource let mockConfig: StarbaseDBConfiguration @@ -26,8 +29,7 @@ beforeEach(() => { vi.clearAllMocks() mockDataSource = { - source: 'external', - external: { dialect: 'sqlite' }, + source: 'internal', rpc: { executeQuery: vi.fn() }, } as any @@ -39,22 +41,22 @@ beforeEach(() => { }) describe('Database Dump Module', () => { - it('should return a database dump when tables exist', async () => { + it('streams a dump with header, schema and quoted INSERTs', async () => { vi.mocked(executeOperation) .mockResolvedValueOnce([{ name: 'users' }, { name: 'orders' }]) .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, + { sql: 'CREATE TABLE users (id INTEGER, name TEXT)' }, ]) .mockResolvedValueOnce([ - { id: 1, name: 'Alice' }, - { id: 2, name: 'Bob' }, + { id: 1, name: 'Alice', [ROWID]: 1 }, + { id: 2, name: 'Bob', [ROWID]: 2 }, ]) .mockResolvedValueOnce([ - { sql: 'CREATE TABLE orders (id INTEGER, total REAL);' }, + { sql: 'CREATE TABLE orders (id INTEGER, total REAL)' }, ]) .mockResolvedValueOnce([ - { id: 1, total: 99.99 }, - { id: 2, total: 49.5 }, + { id: 1, total: 99.99, [ROWID]: 1 }, + { id: 2, total: 49.5, [ROWID]: 2 }, ]) const response = await dumpDatabaseRoute(mockDataSource, mockConfig) @@ -67,71 +69,132 @@ describe('Database Dump Module', () => { 'attachment; filename="database_dump.sql"' ) - const dumpText = await response.text() - expect(dumpText).toContain( - 'CREATE TABLE users (id INTEGER, name TEXT);' + const dump = await response.text() + expect(dump.startsWith('SQLite format 3\0')).toBe(true) + expect(dump).toContain('CREATE TABLE users (id INTEGER, name TEXT);') + expect(dump).toContain(`INSERT INTO "users" VALUES (1, 'Alice');`) + expect(dump).toContain(`INSERT INTO "users" VALUES (2, 'Bob');`) + expect(dump).toContain('CREATE TABLE orders (id INTEGER, total REAL);') + expect(dump).toContain('INSERT INTO "orders" VALUES (1, 99.99);') + expect(dump).toContain('INSERT INTO "orders" VALUES (2, 49.5);') + // the internal rowid cursor must never leak into the dump + expect(dump).not.toContain(ROWID) + }) + + it('paginates large tables with keyset pagination (no OFFSET)', async () => { + const pageSize = 2 + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'items' }]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE items (id INTEGER)' }]) + // a full page -> another fetch must follow + .mockResolvedValueOnce([ + { id: 10, [ROWID]: 10 }, + { id: 20, [ROWID]: 20 }, + ]) + // a partial page -> last page + .mockResolvedValueOnce([{ id: 30, [ROWID]: 30 }]) + + const response = await dumpDatabaseRoute( + mockDataSource, + mockConfig, + pageSize ) - expect(dumpText).toContain("INSERT INTO users VALUES (1, 'Alice');") - expect(dumpText).toContain("INSERT INTO users VALUES (2, 'Bob');") - expect(dumpText).toContain( - 'CREATE TABLE orders (id INTEGER, total REAL);' + const dump = await response.text() + + expect(dump).toContain('INSERT INTO "items" VALUES (10);') + expect(dump).toContain('INSERT INTO "items" VALUES (20);') + expect(dump).toContain('INSERT INTO "items" VALUES (30);') + + // 1 (tables) + 1 (schema) + 2 (row pages) = 4 calls + expect(executeOperation).toHaveBeenCalledTimes(4) + + // the second row page must continue via a keyset cursor, not OFFSET + const secondPage = vi.mocked(executeOperation).mock.calls[3][0][0] + expect(secondPage.sql).toContain('_rowid_ > ?') + expect(secondPage.sql).not.toContain('OFFSET') + expect(secondPage.params).toEqual([20, pageSize]) + }) + + it('encodes NULL, numbers, booleans, blobs and escapes quotes', async () => { + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 't' }]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE t (a, b, c, d, e)' }]) + .mockResolvedValueOnce([ + { + a: null, + b: 42, + c: true, + d: "O'Brien", + e: new Uint8Array([0xde, 0xad]).buffer, + [ROWID]: 1, + }, + ]) + + const dump = await ( + await dumpDatabaseRoute(mockDataSource, mockConfig) + ).text() + + expect(dump).toContain( + `INSERT INTO "t" VALUES (NULL, 42, 1, 'O''Brien', X'dead');` ) - expect(dumpText).toContain('INSERT INTO orders VALUES (1, 99.99);') - expect(dumpText).toContain('INSERT INTO orders VALUES (2, 49.5);') }) - it('should handle empty databases (no tables)', async () => { + it('handles an empty database (header only)', async () => { vi.mocked(executeOperation).mockResolvedValueOnce([]) - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + const dump = await ( + await dumpDatabaseRoute(mockDataSource, mockConfig) + ).text() - expect(response).toBeInstanceOf(Response) - expect(response.headers.get('Content-Type')).toBe( - 'application/x-sqlite3' - ) - const dumpText = await response.text() - expect(dumpText).toBe('SQLite format 3\0') + expect(dump).toBe('SQLite format 3\0') }) - it('should handle databases with tables but no data', async () => { + it('handles a table with no rows', async () => { vi.mocked(executeOperation) .mockResolvedValueOnce([{ name: 'users' }]) - .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, - ]) + .mockResolvedValueOnce([{ sql: 'CREATE TABLE users (id INTEGER)' }]) .mockResolvedValueOnce([]) - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + const dump = await ( + await dumpDatabaseRoute(mockDataSource, mockConfig) + ).text() - expect(response).toBeInstanceOf(Response) - const dumpText = await response.text() - expect(dumpText).toContain( - 'CREATE TABLE users (id INTEGER, name TEXT);' - ) - expect(dumpText).not.toContain('INSERT INTO users VALUES') + expect(dump).toContain('CREATE TABLE users (id INTEGER);') + expect(dump).not.toContain('INSERT INTO') }) - it('should escape single quotes properly in string values', async () => { + it('falls back to OFFSET pagination for WITHOUT ROWID tables', async () => { + const pageSize = 2 vi.mocked(executeOperation) - .mockResolvedValueOnce([{ name: 'users' }]) + .mockResolvedValueOnce([{ name: 'kv' }]) .mockResolvedValueOnce([ - { sql: 'CREATE TABLE users (id INTEGER, bio TEXT);' }, + { + sql: 'CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT) WITHOUT ROWID', + }, ]) - .mockResolvedValueOnce([{ id: 1, bio: "Alice's adventure" }]) - - const response = await dumpDatabaseRoute(mockDataSource, mockConfig) + .mockResolvedValueOnce([ + { k: 'a', v: '1' }, + { k: 'b', v: '2' }, + ]) + .mockResolvedValueOnce([{ k: 'c', v: '3' }]) - expect(response).toBeInstanceOf(Response) - const dumpText = await response.text() - expect(dumpText).toContain( - "INSERT INTO users VALUES (1, 'Alice''s adventure');" + const response = await dumpDatabaseRoute( + mockDataSource, + mockConfig, + pageSize ) + const dump = await response.text() + + expect(dump).toContain(`INSERT INTO "kv" VALUES ('a', '1');`) + expect(dump).toContain(`INSERT INTO "kv" VALUES ('c', '3');`) + + const firstPage = vi.mocked(executeOperation).mock.calls[2][0][0] + expect(firstPage.sql).toContain('OFFSET') + expect(firstPage.sql).not.toContain('_rowid_') }) - it('should return a 500 response when an error occurs', async () => { - const consoleErrorMock = vi - .spyOn(console, 'error') - .mockImplementation(() => {}) + it('returns 500 when the database cannot be read', async () => { + vi.spyOn(console, 'error').mockImplementation(() => {}) vi.mocked(executeOperation).mockRejectedValue( new Error('Database Error') ) @@ -139,7 +202,7 @@ describe('Database Dump Module', () => { const response = await dumpDatabaseRoute(mockDataSource, mockConfig) expect(response.status).toBe(500) - const jsonResponse: { error: string } = await response.json() - expect(jsonResponse.error).toBe('Failed to create database dump') + const body = (await response.json()) as { error: string } + expect(body.error).toBe('Failed to create database dump') }) }) diff --git a/src/export/dump.ts b/src/export/dump.ts index 91a2e89..5a307fd 100644 --- a/src/export/dump.ts +++ b/src/export/dump.ts @@ -3,67 +3,252 @@ import { StarbaseDBConfiguration } from '../handler' import { DataSource } from '../types' import { createResponse } from '../utils' -export async function dumpDatabaseRoute( +// Rows fetched per page while streaming a table. Caps how much data is held +// in memory at once, independent of the table's total size. +const EXPORT_PAGE_SIZE = 1000 + +// SQLite database file magic string, emitted first for parity with the +// original implementation. +const SQLITE_HEADER = 'SQLite format 3\0' + +// Alias used to carry each row's rowid alongside its columns for keyset +// pagination. Deliberately unlikely to collide with a real column name. +const ROWID_ALIAS = '__starbasedb_export_rowid__' + +// Quote a SQL identifier so unusual table names and reserved words are safe. +function quoteIdentifier(name: string): string { + return `"${name.replace(/"/g, '""')}"` +} + +// Render a value returned by the driver as a SQL literal for an INSERT. +function toSqlLiteral(value: unknown): string { + if (value === null || value === undefined) { + return 'NULL' + } + if (typeof value === 'number' || typeof value === 'bigint') { + return String(value) + } + if (typeof value === 'boolean') { + return value ? '1' : '0' + } + if (value instanceof ArrayBuffer) { + const hex = Array.from(new Uint8Array(value)) + .map((b) => b.toString(16).padStart(2, '0')) + .join('') + return `X'${hex}'` + } + return `'${String(value).replace(/'/g, "''")}'` +} + +// List every user table in the database. +async function listTables( dataSource: DataSource, config: StarbaseDBConfiguration -): Promise { - try { - // Get all table names - const tablesResult = await executeOperation( - [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }], - dataSource, - config - ) +): Promise { + const rows = await executeOperation( + [ + { + sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name;", + }, + ], + dataSource, + config + ) - const tables = tablesResult.map((row: any) => row.name) - let dumpContent = 'SQLite format 3\0' // SQLite file header + return rows.map((row: any) => row.name as string) +} - // Iterate through all tables - for (const table of tables) { - // Get table schema - const schemaResult = await executeOperation( - [ - { - sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`, - }, - ], +// Yield a table's rows one bounded page at a time. +// +// Ordinary tables use keyset pagination on the implicit `_rowid_` +// (`WHERE _rowid_ > ?`), which stays O(n) across the whole table. Plain +// `LIMIT/OFFSET` re-walks every skipped row on each page — O(n²) — which is +// what makes dumping a large database unusably slow today. +// +// `WITHOUT ROWID` tables have no `_rowid_`, so they fall back to `LIMIT/OFFSET`. +async function* streamTableRows( + table: string, + keyset: boolean, + pageSize: number, + dataSource: DataSource, + config: StarbaseDBConfiguration +): AsyncGenerator[]> { + const quoted = quoteIdentifier(table) + + if (keyset) { + let cursor: unknown = undefined + + while (true) { + const sql = + cursor === undefined + ? `SELECT *, _rowid_ AS ${ROWID_ALIAS} FROM ${quoted} ORDER BY _rowid_ LIMIT ?;` + : `SELECT *, _rowid_ AS ${ROWID_ALIAS} FROM ${quoted} WHERE _rowid_ > ? ORDER BY _rowid_ LIMIT ?;` + const params = + cursor === undefined ? [pageSize] : [cursor, pageSize] + + const rows = await executeOperation( + [{ sql, params }], dataSource, config ) + if (!rows.length) { + return + } + + const nextCursor = rows[rows.length - 1][ROWID_ALIAS] + for (const row of rows) { + delete row[ROWID_ALIAS] + } - if (schemaResult.length) { - const schema = schemaResult[0].sql - dumpContent += `\n-- Table: ${table}\n${schema};\n\n` + yield rows + + if (rows.length < pageSize || nextCursor === undefined) { + return } + cursor = nextCursor + } + } else { + let offset = 0 - // Get table data - const dataResult = await executeOperation( - [{ sql: `SELECT * FROM ${table};` }], + while (true) { + const rows = await executeOperation( + [ + { + sql: `SELECT * FROM ${quoted} LIMIT ? OFFSET ?;`, + params: [pageSize, offset], + }, + ], dataSource, config ) + if (!rows.length) { + return + } - for (const row of dataResult) { - const values = Object.values(row).map((value) => - typeof value === 'string' - ? `'${value.replace(/'/g, "''")}'` - : value - ) - dumpContent += `INSERT INTO ${table} VALUES (${values.join(', ')});\n` + yield rows + + if (rows.length < pageSize) { + return } + offset += rows.length + } + } +} + +// Produce the dump as a sequence of text chunks, fetching only one page of +// rows from the database at any given moment. +async function* generateDump( + tables: string[], + pageSize: number, + dataSource: DataSource, + config: StarbaseDBConfiguration +): AsyncGenerator { + yield SQLITE_HEADER + + for (const table of tables) { + const schemaRows = await executeOperation( + [ + { + sql: "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;", + params: [table], + }, + ], + dataSource, + config + ) + + const schema: string | undefined = schemaRows[0]?.sql + if (!schema) { + continue + } + + yield `\n-- Table: ${table}\n${schema};\n\n` - dumpContent += '\n' + const quoted = quoteIdentifier(table) + const keyset = !/without\s+rowid/i.test(schema) + + for await (const rows of streamTableRows( + table, + keyset, + pageSize, + dataSource, + config + )) { + let chunk = '' + for (const row of rows) { + const values = Object.values(row).map(toSqlLiteral) + chunk += `INSERT INTO ${quoted} VALUES (${values.join(', ')});\n` + } + yield chunk } - // Create a Blob from the dump content - const blob = new Blob([dumpContent], { type: 'application/x-sqlite3' }) + yield '\n' + } +} + +// Adapt an async iterable of strings into a pull-based byte stream. Because +// the stream is `pull`-driven, the runtime only asks the generator for the +// next page once the previous chunk has been flushed downstream — this is +// what gives the dump real backpressure. +function toReadableStream( + chunks: AsyncGenerator +): ReadableStream { + const encoder = new TextEncoder() + + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await chunks.next() + if (done) { + controller.close() + return + } + if (value) { + controller.enqueue(encoder.encode(value)) + } + } catch (error) { + controller.error(error) + } + }, + async cancel() { + await chunks.return(undefined) + }, + }) +} + +// Stream a full SQL dump of the database. +// +// The previous implementation concatenated every row of every table into a +// single in-memory string before responding, so a large database would +// exceed the isolate's memory limit and the request would fail outright. +// +// This implementation streams the dump instead: +// - rows are read one bounded page at a time using keyset pagination (O(n)), +// - each page is encoded, enqueued, and then released, +// - the response body is a `pull`-driven `ReadableStream`, so the runtime +// paces production to match how fast the client consumes it. +// +// Peak memory is therefore proportional to one page, not to the database size. +export async function dumpDatabaseRoute( + dataSource: DataSource, + config: StarbaseDBConfiguration, + pageSize: number = EXPORT_PAGE_SIZE +): Promise { + try { + // Resolve the table list eagerly so a connection or permission error + // surfaces as a clean 500 before the streaming response has started. + const tables = await listTables(dataSource, config) const headers = new Headers({ 'Content-Type': 'application/x-sqlite3', 'Content-Disposition': 'attachment; filename="database_dump.sql"', }) - return new Response(blob, { headers }) + const body = toReadableStream( + generateDump(tables, pageSize, dataSource, config) + ) + + return new Response(body, { headers }) } catch (error: any) { console.error('Database Dump Error:', error) return createResponse(undefined, 'Failed to create database dump', 500)