Skip to content

Commit 328963e

Browse files
committed
improvement(db): route additional staleness-tolerant reads to the read replica
1 parent 9efb7b4 commit 328963e

7 files changed

Lines changed: 35 additions & 35 deletions

File tree

apps/sim/app/api/workspaces/[id]/metrics/executions/route.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { db, dbReplica } from '@sim/db'
22
import { pausedExecutions, permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { and, eq, gte, inArray, isNotNull, isNull, lte, or, type SQL, sql } from 'drizzle-orm'
@@ -60,7 +60,7 @@ export const GET = withRouteHandler(
6060
wfWhere.push(inArray(workflow.id, wfList))
6161
}
6262

63-
const workflows = await db
63+
const workflows = await dbReplica
6464
.select({ id: workflow.id, name: workflow.name })
6565
.from(workflow)
6666
.where(and(...wfWhere))
@@ -124,7 +124,7 @@ export const GET = withRouteHandler(
124124
}
125125

126126
if (isAllTime) {
127-
const boundsQuery = db
127+
const boundsQuery = dbReplica
128128
.select({
129129
minDate: sql<Date>`MIN(${workflowExecutionLogs.startedAt})`,
130130
maxDate: sql<Date>`MAX(${workflowExecutionLogs.startedAt})`,
@@ -168,7 +168,7 @@ export const GET = withRouteHandler(
168168
lte(workflowExecutionLogs.startedAt, end),
169169
]
170170

171-
const logs = await db
171+
const logs = await dbReplica
172172
.select({
173173
workflowId: workflowExecutionLogs.workflowId,
174174
level: workflowExecutionLogs.level,

apps/sim/lib/copilot/chat/process-contents.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { dbReplica } from '@sim/db'
22
import { knowledgeBase } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import {
@@ -454,7 +454,7 @@ async function processKnowledgeFromDb(
454454
if (currentWorkspaceId) {
455455
conditions.push(eq(knowledgeBase.workspaceId, currentWorkspaceId))
456456
}
457-
const kbRows = await db
457+
const kbRows = await dbReplica
458458
.select({
459459
id: knowledgeBase.id,
460460
name: knowledgeBase.name,
@@ -562,8 +562,8 @@ async function processExecutionLogFromDb(
562562
): Promise<AgentContext | null> {
563563
try {
564564
const { workflowExecutionLogs, workflow } = await import('@sim/db/schema')
565-
const { db } = await import('@sim/db')
566-
const rows = await db
565+
const { dbReplica } = await import('@sim/db')
566+
const rows = await dbReplica
567567
.select({
568568
id: workflowExecutionLogs.id,
569569
workflowId: workflowExecutionLogs.workflowId,

apps/sim/lib/copilot/chat/workspace-context.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { dbReplica } from '@sim/db'
22
import {
33
knowledgeBase,
44
knowledgeConnector,
@@ -302,7 +302,7 @@ export async function generateWorkspaceContext(
302302
] = await Promise.all([
303303
getUsersWithPermissions(workspaceId),
304304

305-
db
305+
dbReplica
306306
.select({
307307
id: workflow.id,
308308
name: workflow.name,
@@ -314,7 +314,7 @@ export async function generateWorkspaceContext(
314314
.from(workflow)
315315
.where(and(eq(workflow.workspaceId, workspaceId), isNull(workflow.archivedAt))),
316316

317-
db
317+
dbReplica
318318
.select({
319319
id: workflowFolder.id,
320320
name: workflowFolder.name,
@@ -323,7 +323,7 @@ export async function generateWorkspaceContext(
323323
.from(workflowFolder)
324324
.where(and(eq(workflowFolder.workspaceId, workspaceId), isNull(workflowFolder.archivedAt))),
325325

326-
db
326+
dbReplica
327327
.select({
328328
id: knowledgeBase.id,
329329
name: knowledgeBase.name,
@@ -332,7 +332,7 @@ export async function generateWorkspaceContext(
332332
.from(knowledgeBase)
333333
.where(and(eq(knowledgeBase.workspaceId, workspaceId), isNull(knowledgeBase.deletedAt))),
334334

335-
db
335+
dbReplica
336336
.select({
337337
id: userTableDefinitions.id,
338338
name: userTableDefinitions.name,
@@ -352,7 +352,7 @@ export async function generateWorkspaceContext(
352352

353353
listCustomTools({ userId, workspaceId }),
354354

355-
db
355+
dbReplica
356356
.select({
357357
id: mcpServers.id,
358358
name: mcpServers.name,
@@ -364,7 +364,7 @@ export async function generateWorkspaceContext(
364364

365365
listSkills({ workspaceId, includeBuiltins: false }),
366366

367-
db
367+
dbReplica
368368
.select({
369369
id: workflowSchedule.id,
370370
jobTitle: workflowSchedule.jobTitle,
@@ -388,7 +388,7 @@ export async function generateWorkspaceContext(
388388
tables.length > 0
389389
? await Promise.all(
390390
tables.map(async (t) => {
391-
const [row] = await db
391+
const [row] = await dbReplica
392392
.select({ count: count() })
393393
.from(userTableRows)
394394
.where(eq(userTableRows.tableId, t.id))
@@ -400,7 +400,7 @@ export async function generateWorkspaceContext(
400400
const kbIds = kbs.map((kb) => kb.id)
401401
const connectorRows =
402402
kbIds.length > 0
403-
? await db
403+
? await dbReplica
404404
.select({
405405
knowledgeBaseId: knowledgeConnector.knowledgeBaseId,
406406
connectorType: knowledgeConnector.connectorType,

apps/sim/lib/knowledge/chunks/service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { db, dbReplica } from '@sim/db'
22
import { document, embedding, knowledgeBase } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { sha256Hex } from '@sim/security/hash'
@@ -46,7 +46,7 @@ export async function queryChunks(
4646
conditions.push(ilike(embedding.content, `%${search}%`))
4747
}
4848

49-
const chunks = await db
49+
const chunks = await dbReplica
5050
.select({
5151
id: embedding.id,
5252
chunkIndex: embedding.chunkIndex,
@@ -82,7 +82,7 @@ export async function queryChunks(
8282
.limit(limit)
8383
.offset(offset)
8484

85-
const totalCount = await db
85+
const totalCount = await dbReplica
8686
.select({ count: sql`count(*)` })
8787
.from(embedding)
8888
.where(and(...conditions))

apps/sim/lib/knowledge/tags/service.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { db, dbReplica } from '@sim/db'
22
import { document, embedding, knowledgeBaseTagDefinitions } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { generateId } from '@sim/utils/id'
@@ -99,7 +99,7 @@ export async function getNextAvailableSlot(
9999
export async function getDocumentTagDefinitions(
100100
knowledgeBaseId: string
101101
): Promise<DocumentTagDefinition[]> {
102-
const definitions = await db
102+
const definitions = await dbReplica
103103
.select({
104104
id: knowledgeBaseTagDefinitions.id,
105105
knowledgeBaseId: knowledgeBaseTagDefinitions.knowledgeBaseId,
@@ -123,7 +123,7 @@ export async function getDocumentTagDefinitions(
123123
* Get all tag definitions for a knowledge base (alias for compatibility)
124124
*/
125125
export async function getTagDefinitions(knowledgeBaseId: string): Promise<TagDefinition[]> {
126-
const tagDefinitions = await db
126+
const tagDefinitions = await dbReplica
127127
.select({
128128
id: knowledgeBaseTagDefinitions.id,
129129
tagSlot: knowledgeBaseTagDefinitions.tagSlot,
@@ -655,7 +655,7 @@ export async function getTagUsage(
655655
whereConditions.push(sql`${sql.raw(tagSlot)} != ''`)
656656
}
657657

658-
const documentsWithTag = await db
658+
const documentsWithTag = await dbReplica
659659
.select({
660660
id: document.id,
661661
filename: document.filename,
@@ -703,7 +703,7 @@ export async function getTagUsageStats(
703703
const tagSlot = def.tagSlot
704704
validateTagSlot(tagSlot)
705705

706-
const docCountResult = await db
706+
const docCountResult = await dbReplica
707707
.select({ count: sql<number>`count(*)` })
708708
.from(document)
709709
.where(
@@ -716,7 +716,7 @@ export async function getTagUsageStats(
716716
)
717717
)
718718

719-
const chunkCountResult = await db
719+
const chunkCountResult = await dbReplica
720720
.select({ count: sql<number>`count(*)` })
721721
.from(embedding)
722722
.innerJoin(document, eq(embedding.documentId, document.id))

apps/sim/lib/workspace-events/no-activity.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { dbReplica } from '@sim/db'
22
import { webhook, workflow, workflowDeploymentVersion, workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { and, asc, eq, gt, gte, inArray, isNull, ne, or, sql } from 'drizzle-orm'
@@ -42,7 +42,7 @@ export interface NoActivityPollResult {
4242
async function fetchNoActivitySubscriptionPage(
4343
afterWebhookId: string | null
4444
): Promise<SimSubscription[]> {
45-
const rows = await db
45+
const rows = await dbReplica
4646
.select({ webhook, workflow })
4747
.from(webhook)
4848
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
@@ -105,7 +105,7 @@ async function fetchWatchedWorkflowPage(
105105
conditions.push(gt(workflow.id, afterWorkflowId))
106106
}
107107

108-
return db
108+
return dbReplica
109109
.select({ id: workflow.id, name: workflow.name })
110110
.from(workflow)
111111
.where(and(...conditions))
@@ -120,7 +120,7 @@ async function hasRecentActivity(
120120
): Promise<boolean> {
121121
const windowStart = new Date(Date.now() - config.inactivityHours * 60 * 60 * 1000)
122122

123-
const recentLogs = await db
123+
const recentLogs = await dbReplica
124124
.select({ id: workflowExecutionLogs.id })
125125
.from(workflowExecutionLogs)
126126
.where(

apps/sim/lib/workspace-events/rules.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { db } from '@sim/db'
1+
import { dbReplica } from '@sim/db'
22
import { workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { and, avg, count, desc, eq, gte, ne, type SQL, sql } from 'drizzle-orm'
@@ -22,7 +22,7 @@ export function excludeSimExecutionsCondition(): SQL {
2222
}
2323

2424
async function checkConsecutiveFailures(workflowId: string, threshold: number): Promise<boolean> {
25-
const recentLogs = await db
25+
const recentLogs = await dbReplica
2626
.select({ level: workflowExecutionLogs.level })
2727
.from(workflowExecutionLogs)
2828
.where(and(eq(workflowExecutionLogs.workflowId, workflowId), excludeSimExecutionsCondition()))
@@ -51,7 +51,7 @@ async function checkFailureRate(
5151

5252
// Single DB-side aggregate: the window is user-configured and this runs on
5353
// the execution-completion path, so never materialize the in-window rows.
54-
const result = await db
54+
const result = await dbReplica
5555
.select({
5656
total: count(),
5757
errors: count(sql`case when ${workflowExecutionLogs.level} = 'error' then 1 end`),
@@ -82,7 +82,7 @@ async function checkLatencySpike(
8282
): Promise<boolean> {
8383
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
8484

85-
const result = await db
85+
const result = await dbReplica
8686
.select({
8787
avgDuration: avg(workflowExecutionLogs.totalDurationMs),
8888
count: count(),
@@ -114,7 +114,7 @@ async function checkErrorCount(
114114
): Promise<boolean> {
115115
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
116116

117-
const result = await db
117+
const result = await dbReplica
118118
.select({ count: count() })
119119
.from(workflowExecutionLogs)
120120
.where(

0 commit comments

Comments
 (0)