Skip to content

Commit 1fd5b0d

Browse files
committed
fix(data-drains): add stability window to time cursors so late-visible rows are never skipped
1 parent 8b6184f commit 1fd5b0d

6 files changed

Lines changed: 27 additions & 2 deletions

File tree

apps/sim/lib/data-drains/sources/audit-logs.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
encodeTimeCursor,
77
timeCursorOrderBy,
88
timeCursorPredicate,
9+
timeCursorStabilityBound,
910
} from '@/lib/data-drains/sources/cursor'
1011
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
1112
import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types'
@@ -38,7 +39,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<AuditLogRow[]> {
3839
const rows = await dbReplica
3940
.select()
4041
.from(auditLog)
41-
.where(and(scopeClause, cursorClause))
42+
.where(and(scopeClause, timeCursorStabilityBound(auditLog.createdAt), cursorClause))
4243
.orderBy(...timeCursorOrderBy(auditLog.createdAt, auditLog.id))
4344
.limit(input.chunkSize)
4445

apps/sim/lib/data-drains/sources/copilot-chats.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
encodeTimeCursor,
77
timeCursorOrderBy,
88
timeCursorPredicate,
9+
timeCursorStabilityBound,
910
} from '@/lib/data-drains/sources/cursor'
1011
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
1112
import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types'
@@ -58,7 +59,13 @@ async function* pages(input: SourcePageInput): AsyncIterable<CopilotChatRow[]> {
5859
const metaRows = await dbReplica
5960
.select(chatColumns)
6061
.from(copilotChats)
61-
.where(and(inArray(copilotChats.workspaceId, workspaceIds), cursorClause))
62+
.where(
63+
and(
64+
inArray(copilotChats.workspaceId, workspaceIds),
65+
timeCursorStabilityBound(copilotChats.createdAt),
66+
cursorClause
67+
)
68+
)
6269
.orderBy(...timeCursorOrderBy(copilotChats.createdAt, copilotChats.id))
6370
.limit(input.chunkSize)
6471

apps/sim/lib/data-drains/sources/copilot-runs.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
encodeTimeCursor,
77
timeCursorOrderBy,
88
timeCursorPredicate,
9+
timeCursorStabilityBound,
910
} from '@/lib/data-drains/sources/cursor'
1011
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
1112
import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types'
@@ -31,6 +32,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<CopilotRunRow[]> {
3132
and(
3233
inArray(copilotRuns.workspaceId, workspaceIds),
3334
isNotNull(copilotRuns.completedAt),
35+
timeCursorStabilityBound(copilotRuns.completedAt),
3436
cursorClause
3537
)
3638
)

apps/sim/lib/data-drains/sources/cursor.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,14 @@ export function timeCursorPredicate(
5454
export function timeCursorOrderBy(timestampCol: PgColumn, idCol: PgColumn): [SQL, SQL] {
5555
return [sql`date_trunc('milliseconds', ${timestampCol}) asc`, sql`${idCol} asc`]
5656
}
57+
58+
/**
59+
* Excludes rows newer than a short stability window. Timestamp cursors assume
60+
* rows become visible in timestamp order, but out-of-order commits and replica
61+
* lag can surface an earlier-stamped row after the cursor has advanced past it
62+
* — permanently skipping it. Leaving the freshest rows for the next run bounds
63+
* both.
64+
*/
65+
export function timeCursorStabilityBound(timestampCol: PgColumn): SQL {
66+
return sql`${timestampCol} <= now() - interval '5 minutes'`
67+
}

apps/sim/lib/data-drains/sources/job-logs.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
encodeTimeCursor,
77
timeCursorOrderBy,
88
timeCursorPredicate,
9+
timeCursorStabilityBound,
910
} from '@/lib/data-drains/sources/cursor'
1011
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
1112
import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types'
@@ -31,6 +32,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<JobLogRow[]> {
3132
and(
3233
inArray(jobExecutionLogs.workspaceId, workspaceIds),
3334
isNotNull(jobExecutionLogs.endedAt),
35+
timeCursorStabilityBound(jobExecutionLogs.endedAt),
3436
cursorClause
3537
)
3638
)

apps/sim/lib/data-drains/sources/workflow-logs.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
encodeTimeCursor,
88
timeCursorOrderBy,
99
timeCursorPredicate,
10+
timeCursorStabilityBound,
1011
} from '@/lib/data-drains/sources/cursor'
1112
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
1213
import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types'
@@ -40,6 +41,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<WorkflowLogRow[]> {
4041
and(
4142
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
4243
isNotNull(workflowExecutionLogs.endedAt),
44+
timeCursorStabilityBound(workflowExecutionLogs.endedAt),
4345
cursorClause
4446
)
4547
)

0 commit comments

Comments
 (0)