Skip to content

Commit 532b9ea

Browse files
committed
feat(realtime): preflight schema-compatibility check on startup
The socket service authorizes every connection with a full-row query against the workflow table. When a deploy ships a realtime image whose compiled schema is ahead of/behind the live DB (e.g. a column dropped by a migration the image predates), that query fails on every request and silently breaks persistence — yet the process stays up and the shallow /health probe keeps returning 200, so the deploy looks healthy while serving nothing. Run one representative workflow query before listen(): a schema mismatch throws, propagates to the entrypoint, and the task exits non-zero and never goes healthy, so CodeDeploy auto-rolls-back instead of shifting traffic onto broken tasks. Schema-class errors (undefined column/table/function) fail fast; connection-class errors retry with backoff so a cold DB at boot does not flap. Runs once at startup, never on the per-probe LB health check, to avoid a DB blip mass-terminating the fleet (cascading failure).
1 parent 62c48bf commit 532b9ea

3 files changed

Lines changed: 175 additions & 0 deletions

File tree

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest'
2+
3+
// Created through vi.hoisted so the mock factory for '@sim/db' below can
// reference it; each test programs this mock to decide how the query settles.
const { mockLimit } = vi.hoisted(() => ({
  mockLimit: vi.fn(),
}))

// Stand-in for the query chain the preflight issues: db.select().from(...).limit(...).
vi.mock('@sim/db', () => ({
  db: {
    select: () => ({
      from: () => ({
        limit: mockLimit,
      }),
    }),
  },
}))

// The table object is only passed through to the mocked `from()`, so an empty
// placeholder is enough.
vi.mock('@sim/db/schema', () => ({
  workflow: {},
}))

// Silence logging; every logger method is a no-op spy.
vi.mock('@sim/logger', () => ({
  createLogger: () => ({
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    debug: vi.fn(),
  }),
}))

// Resolve immediately so retry backoff never slows the suite down.
vi.mock('@sim/utils/helpers', () => ({
  sleep: vi.fn().mockResolvedValue(undefined),
}))
33+
34+
import { sleep } from '@sim/utils/helpers'
35+
import { assertSchemaCompatibility } from '@/database/preflight'
36+
37+
/**
 * Builds a Postgres-shaped error carrying a SQLSTATE `code`, as postgres.js throws.
 *
 * @param code - Value exposed on the returned error's `code` property.
 * @returns An `Error` whose message is `pg error <code>` and whose `code` is `code`.
 */
function pgError(code: string): Error & { code: string } {
  const base = new Error(`pg error ${code}`)
  return Object.assign(base, { code })
}
41+
42+
describe('assertSchemaCompatibility', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    // clearAllMocks only wipes recorded calls. Several tests install a
    // persistent mockRejectedValue on mockLimit, so reset its implementation
    // too; otherwise one test's rejection leaks into the next and the suite
    // becomes order-dependent.
    mockLimit.mockReset()
  })

  it('resolves when the representative schema query succeeds', async () => {
    mockLimit.mockResolvedValueOnce([])

    await expect(assertSchemaCompatibility()).resolves.toBeUndefined()

    expect(mockLimit).toHaveBeenCalledTimes(1)
  })

  // One case per SQLSTATE the preflight classifies as a schema mismatch.
  it.each([
    ['undefined-column', '42703'],
    ['undefined-table', '42P01'],
    ['undefined-function', '42883'],
  ])('throws immediately on an %s mismatch without retrying', async (_label, code) => {
    mockLimit.mockRejectedValue(pgError(code))

    await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/)

    expect(mockLimit).toHaveBeenCalledTimes(1)
    expect(sleep).not.toHaveBeenCalled()
  })

  it('retries transient connection errors and resolves once reachable', async () => {
    mockLimit
      .mockRejectedValueOnce(pgError('ECONNREFUSED'))
      .mockRejectedValueOnce(pgError('ECONNREFUSED'))
      .mockResolvedValueOnce([])

    await expect(assertSchemaCompatibility()).resolves.toBeUndefined()

    expect(mockLimit).toHaveBeenCalledTimes(3)
    expect(sleep).toHaveBeenCalledTimes(2)
  })

  it('throws after exhausting retries when the database stays unreachable', async () => {
    mockLimit.mockRejectedValue(pgError('ECONNREFUSED'))

    await expect(assertSchemaCompatibility()).rejects.toThrow(/database unreachable/)

    expect(mockLimit).toHaveBeenCalledTimes(5)
  })
})
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { db } from '@sim/db'
2+
import { workflow } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { getErrorMessage } from '@sim/utils/errors'
5+
import { sleep } from '@sim/utils/helpers'
6+
import { backoffWithJitter } from '@sim/utils/retry'
7+
8+
// Logger for this module, created under the name 'SocketPreflight'.
const logger = createLogger('SocketPreflight')
9+
10+
/**
 * Maximum attempts for the schema canary when the database is merely unreachable.
 * Connection-class failures are retried; schema-class failures fail immediately.
 */
const MAX_CONNECT_ATTEMPTS = 5
15+
16+
/**
 * Postgres SQLSTATE codes meaning the deployed image's compiled schema disagrees
 * with the live database (undefined column, table, or function). These never
 * self-heal, so retrying only delays an inevitable startup failure.
 */
const SCHEMA_MISMATCH_CODES = new Set(['42703', '42P01', '42883'])

/**
 * Reports whether `error`, or any error reachable through its `cause` chain,
 * carries one of the {@link SCHEMA_MISMATCH_CODES}.
 *
 * The chain is followed because a query layer may wrap the driver error and
 * expose the original (with its SQLSTATE `code`) only via `cause`; inspecting
 * just the outer error would then misclassify a schema mismatch as a retryable
 * failure. NOTE(review): recent drizzle-orm releases are believed to wrap
 * driver errors this way — confirm against the pinned version.
 *
 * @param error - Any thrown value; non-object and nullish values yield `false`.
 * @returns `true` when a string `code` in the chain is a schema-mismatch SQLSTATE.
 */
function isSchemaMismatch(error: unknown): boolean {
  // Bounded walk so a self-referential `cause` cannot loop forever.
  const maxCauseDepth = 5

  let current: unknown = error
  for (let depth = 0; depth < maxCauseDepth && current != null; depth++) {
    const { code, cause } = current as { code?: unknown; cause?: unknown }
    if (typeof code === 'string' && SCHEMA_MISMATCH_CODES.has(code)) {
      return true
    }
    current = cause
  }

  return false
}
27+
28+
/**
 * Verifies, before the server accepts traffic, that the deployed image's schema
 * is compatible with the live database — throwing if it is not.
 *
 * Every socket is authorized against the `workflow` table through a full-row
 * drizzle projection. If the image's compiled schema is ahead of (or behind) the
 * database — e.g. a column dropped by a migration the image predates — that query
 * fails on every request and silently breaks persistence, yet the process stays
 * up and the shallow `/health` probe keeps returning 200. The fleet looks healthy
 * while serving nothing.
 *
 * Running one representative query at startup turns that latent, per-request
 * failure into an immediate startup failure: the throw propagates to the server
 * entrypoint, the task exits non-zero and never becomes healthy, and the deploy's
 * health gate never flips — so CodeDeploy auto-rolls-back instead of shifting
 * traffic onto broken tasks.
 *
 * Deliberately invoked once at startup and never from the per-probe load-balancer
 * health check: a deep dependency check on every probe would let a transient
 * database blip mass-terminate the whole fleet (cascading failure).
 *
 * Failures classified as a schema mismatch are rethrown at once. Any other
 * failure is retried after a backoff delay, except on the final attempt, where
 * no delay is taken because nothing would run after it.
 *
 * @returns Resolves with no value once the representative query succeeds.
 * @throws when the schema is incompatible, or the database stays unreachable
 * across {@link MAX_CONNECT_ATTEMPTS} attempts.
 */
export async function assertSchemaCompatibility(): Promise<void> {
  let lastError: unknown

  for (let attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) {
    try {
      // Full-row select on `workflow`, limited to one row: succeeds only if the
      // database accepts the projection built from the compiled schema.
      await db.select().from(workflow).limit(1)
      logger.info('Schema-compatibility check passed')
      return
    } catch (error) {
      lastError = error

      if (isSchemaMismatch(error)) {
        throw new Error(
          `Deployed image is incompatible with the live database schema: ${getErrorMessage(error)}`
        )
      }

      // Out of attempts: skip the backoff. Sleeping here would only postpone
      // the startup failure, and a "retrying" log line would be untrue.
      if (attempt === MAX_CONNECT_ATTEMPTS) {
        break
      }

      const delay = backoffWithJitter(attempt, null)
      logger.warn(
        `Schema-compatibility check could not reach the database (attempt ${attempt}/${MAX_CONNECT_ATTEMPTS}), retrying in ${Math.round(delay)}ms`,
        getErrorMessage(error)
      )
      await sleep(delay)
    }
  }

  throw new Error(
    `Schema-compatibility check failed after ${MAX_CONNECT_ATTEMPTS} attempts — database unreachable: ${getErrorMessage(lastError)}`
  )
}

apps/realtime/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createServer } from 'http'
22
import { createLogger } from '@sim/logger'
33
import type { Server as SocketIOServer } from 'socket.io'
44
import { createSocketIOServer, shutdownSocketIOAdapter } from '@/config/socket'
5+
import { assertSchemaCompatibility } from '@/database/preflight'
56
import { env } from '@/env'
67
import { setupAllHandlers } from '@/handlers'
78
import { type AuthenticatedSocket, authenticateSocket } from '@/middleware/auth'
@@ -93,6 +94,8 @@ async function main() {
9394
setupAllHandlers(socket, roomManager)
9495
})
9596

97+
await assertSchemaCompatibility()
98+
9699
httpServer.listen(PORT, '0.0.0.0', () => {
97100
logger.info(`Socket.IO server running on port ${PORT}`)
98101
logger.info(`Health check available at: http://localhost:${PORT}/health`)

0 commit comments

Comments
 (0)