Skip to content

Commit 272bad9

Browse files
authored
feat(realtime): preflight schema-compatibility check on startup (#4940)
* feat(realtime): preflight schema-compatibility check on startup The socket service authorizes every connection with a full-row query against the workflow table. When a deploy ships a realtime image whose compiled schema is ahead of/behind the live DB (e.g. a column dropped by a migration the image predates), that query fails on every request and silently breaks persistence — yet the process stays up and the shallow /health probe keeps returning 200, so the deploy looks healthy while serving nothing. Run one representative workflow query before listen(): a schema mismatch throws, propagates to the entrypoint, and the task exits non-zero and never goes healthy, so CodeDeploy auto-rolls-back instead of shifting traffic onto broken tasks. Schema-class errors (undefined column/table/function) fail fast; connection-class errors retry with backoff so a cold DB at boot does not flap. Runs once at startup, never on the per-probe LB health check, to avoid a DB blip mass- terminating the fleet (cascading failure). * fix(realtime): unwrap cause for schema codes, drop sleep after final attempt - isSchemaMismatch now walks the error.cause chain — drizzle wraps the driver error, so the SQLSTATE often lives on the inner cause, not the outer throw. Without this a wrapped 42703/42P01 was retried 5x and mis-reported as "database unreachable" instead of failing fast. - No longer sleeps after the final failed attempt (~6-10s of dead wait that undermined the fail-fast contract); sleep now only happens between attempts. - Tests: assert sleep is called exactly 4 times on exhaustion, and add a wrapped-cause fail-fast case.
1 parent b2a485e commit 272bad9

3 files changed

Lines changed: 207 additions & 0 deletions

File tree

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest'
2+
3+
const { mockLimit } = vi.hoisted(() => ({
4+
mockLimit: vi.fn(),
5+
}))
6+
7+
vi.mock('@sim/db', () => ({
8+
db: {
9+
select: () => ({
10+
from: () => ({
11+
limit: mockLimit,
12+
}),
13+
}),
14+
},
15+
}))
16+
17+
vi.mock('@sim/db/schema', () => ({
18+
workflow: {},
19+
}))
20+
21+
vi.mock('@sim/logger', () => ({
22+
createLogger: () => ({
23+
info: vi.fn(),
24+
warn: vi.fn(),
25+
error: vi.fn(),
26+
debug: vi.fn(),
27+
}),
28+
}))
29+
30+
vi.mock('@sim/utils/helpers', () => ({
31+
sleep: vi.fn().mockResolvedValue(undefined),
32+
}))
33+
34+
import { sleep } from '@sim/utils/helpers'
35+
import { assertSchemaCompatibility } from '@/database/preflight'
36+
37+
/** Builds a Postgres-shaped error carrying a SQLSTATE `code`, as postgres.js throws. */
38+
function pgError(code: string): Error & { code: string } {
39+
return Object.assign(new Error(`pg error ${code}`), { code })
40+
}
41+
42+
/** Mirrors how drizzle wraps the driver error: the SQLSTATE lives on `cause`, not the outer error. */
43+
function wrappedPgError(code: string): Error {
44+
return new Error('Failed query', { cause: pgError(code) })
45+
}
46+
47+
describe('assertSchemaCompatibility', () => {
48+
beforeEach(() => {
49+
vi.clearAllMocks()
50+
})
51+
52+
it('resolves when the representative schema query succeeds', async () => {
53+
mockLimit.mockResolvedValueOnce([])
54+
55+
await expect(assertSchemaCompatibility()).resolves.toBeUndefined()
56+
57+
expect(mockLimit).toHaveBeenCalledTimes(1)
58+
})
59+
60+
it('throws immediately on an undefined-column mismatch without retrying', async () => {
61+
mockLimit.mockRejectedValue(pgError('42703'))
62+
63+
await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/)
64+
65+
expect(mockLimit).toHaveBeenCalledTimes(1)
66+
expect(sleep).not.toHaveBeenCalled()
67+
})
68+
69+
it('throws immediately on an undefined-table mismatch', async () => {
70+
mockLimit.mockRejectedValue(pgError('42P01'))
71+
72+
await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/)
73+
74+
expect(mockLimit).toHaveBeenCalledTimes(1)
75+
})
76+
77+
it('detects a schema mismatch wrapped in error.cause and fails fast', async () => {
78+
mockLimit.mockRejectedValue(wrappedPgError('42703'))
79+
80+
await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/)
81+
82+
expect(mockLimit).toHaveBeenCalledTimes(1)
83+
expect(sleep).not.toHaveBeenCalled()
84+
})
85+
86+
it('retries transient connection errors and resolves once reachable', async () => {
87+
mockLimit
88+
.mockRejectedValueOnce(pgError('ECONNREFUSED'))
89+
.mockRejectedValueOnce(pgError('ECONNREFUSED'))
90+
.mockResolvedValueOnce([])
91+
92+
await expect(assertSchemaCompatibility()).resolves.toBeUndefined()
93+
94+
expect(mockLimit).toHaveBeenCalledTimes(3)
95+
expect(sleep).toHaveBeenCalledTimes(2)
96+
})
97+
98+
it('throws after exhausting retries when the database stays unreachable', async () => {
99+
mockLimit.mockRejectedValue(pgError('ECONNREFUSED'))
100+
101+
await expect(assertSchemaCompatibility()).rejects.toThrow(/database unreachable/)
102+
103+
expect(mockLimit).toHaveBeenCalledTimes(5)
104+
expect(sleep).toHaveBeenCalledTimes(4)
105+
})
106+
})
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { db } from '@sim/db'
2+
import { workflow } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { getErrorMessage } from '@sim/utils/errors'
5+
import { sleep } from '@sim/utils/helpers'
6+
import { backoffWithJitter } from '@sim/utils/retry'
7+
8+
const logger = createLogger('SocketPreflight')
9+
10+
/**
11+
* Maximum attempts for the schema canary when the database is merely unreachable.
12+
* Connection-class failures are retried; schema-class failures fail immediately.
13+
*/
14+
const MAX_CONNECT_ATTEMPTS = 5
15+
16+
/**
17+
* Postgres SQLSTATE codes meaning the deployed image's compiled schema disagrees
18+
* with the live database (undefined column, table, or function). These never
19+
* self-heal, so retrying only delays an inevitable startup failure.
20+
*/
21+
const SCHEMA_MISMATCH_CODES = new Set(['42703', '42P01', '42883'])
22+
23+
/**
24+
* Walks the `cause` chain so a SQLSTATE code is found even when drizzle wraps the
25+
* driver error (the code commonly lives on the inner `cause`, not the outer throw).
26+
*/
27+
function isSchemaMismatch(error: unknown): boolean {
28+
const seen = new Set<unknown>()
29+
let current: unknown = error
30+
while (current && typeof current === 'object' && !seen.has(current)) {
31+
seen.add(current)
32+
const code = (current as { code?: unknown }).code
33+
if (typeof code === 'string' && SCHEMA_MISMATCH_CODES.has(code)) {
34+
return true
35+
}
36+
current = (current as { cause?: unknown }).cause
37+
}
38+
return false
39+
}
40+
41+
/**
42+
* Verifies, before the server accepts traffic, that the deployed image's schema
43+
* is compatible with the live database — throwing if it is not.
44+
*
45+
* Every socket is authorized against the `workflow` table through a full-row
46+
* drizzle projection. If the image's compiled schema is ahead of (or behind) the
47+
* database — e.g. a column dropped by a migration the image predates — that query
48+
* fails on every request and silently breaks persistence, yet the process stays
49+
* up and the shallow `/health` probe keeps returning 200. The fleet looks healthy
50+
* while serving nothing.
51+
*
52+
* Running one representative query at startup turns that latent, per-request
53+
* failure into an immediate startup failure: the throw propagates to the server
54+
* entrypoint, the task exits non-zero and never becomes healthy, and the deploy's
55+
* health gate never flips — so CodeDeploy auto-rolls-back instead of shifting
56+
* traffic onto broken tasks.
57+
*
58+
* Deliberately invoked once at startup and never from the per-probe load-balancer
59+
* health check: a deep dependency check on every probe would let a transient
60+
* database blip mass-terminate the whole fleet (cascading failure).
61+
*
62+
* @throws when the schema is incompatible, or the database stays unreachable
63+
* across {@link MAX_CONNECT_ATTEMPTS} attempts.
64+
*/
65+
export async function assertSchemaCompatibility(): Promise<void> {
66+
let lastError: unknown
67+
68+
for (let attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) {
69+
try {
70+
await db.select().from(workflow).limit(1)
71+
logger.info('Schema-compatibility check passed')
72+
return
73+
} catch (error) {
74+
lastError = error
75+
76+
if (isSchemaMismatch(error)) {
77+
throw new Error(
78+
`Deployed image is incompatible with the live database schema: ${getErrorMessage(error)}`
79+
)
80+
}
81+
82+
if (attempt === MAX_CONNECT_ATTEMPTS) {
83+
break
84+
}
85+
86+
const delay = backoffWithJitter(attempt, null)
87+
logger.warn(
88+
`Schema-compatibility check could not reach the database (attempt ${attempt}/${MAX_CONNECT_ATTEMPTS}), retrying in ${Math.round(delay)}ms`,
89+
getErrorMessage(error)
90+
)
91+
await sleep(delay)
92+
}
93+
}
94+
95+
throw new Error(
96+
`Schema-compatibility check failed after ${MAX_CONNECT_ATTEMPTS} attempts — database unreachable: ${getErrorMessage(lastError)}`
97+
)
98+
}

apps/realtime/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createServer } from 'http'
22
import { createLogger } from '@sim/logger'
33
import type { Server as SocketIOServer } from 'socket.io'
44
import { createSocketIOServer, shutdownSocketIOAdapter } from '@/config/socket'
5+
import { assertSchemaCompatibility } from '@/database/preflight'
56
import { env } from '@/env'
67
import { setupAllHandlers } from '@/handlers'
78
import { type AuthenticatedSocket, authenticateSocket } from '@/middleware/auth'
@@ -93,6 +94,8 @@ async function main() {
9394
setupAllHandlers(socket, roomManager)
9495
})
9596

97+
await assertSchemaCompatibility()
98+
9699
httpServer.listen(PORT, '0.0.0.0', () => {
97100
logger.info(`Socket.IO server running on port ${PORT}`)
98101
logger.info(`Health check available at: http://localhost:${PORT}/health`)

0 commit comments

Comments
 (0)