Skip to content

Commit 8a539f6

Browse files
fix(workspace-events): keyset-paginate the watched-workflow scan
The 500-row LIMIT silently and deterministically excluded high-id workflows from no_activity coverage in watch-everything subscriptions on large workspaces. The scan now pages by workflow id, mirroring the subscription scan; per-workflow checks move into a helper so the pagination loop stays flat.

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 233f541 commit 8a539f6

2 files changed

Lines changed: 122 additions & 54 deletions

File tree

apps/sim/lib/workspace-events/no-activity.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ vi.mock('@/lib/workspace-events/rules', () => ({
3535

3636
import {
3737
NO_ACTIVITY_SUBSCRIPTION_PAGE_SIZE,
38+
NO_ACTIVITY_WORKFLOW_PAGE_SIZE,
3839
pollNoActivityEvents,
3940
} from '@/lib/workspace-events/no-activity'
4041
import type { SimSubscriptionConfig } from '@/lib/workspace-events/types'
@@ -238,6 +239,37 @@ describe('pollNoActivityEvents', () => {
238239
)
239240
})
240241

242+
it('pages through watched workflows past the page size with a keyset cursor (no lost coverage)', async () => {
243+
// Full first page of watched workflows all inside their cooldown (skipped
244+
// without activity queries), then a second page holding the quiet
245+
// workflow that must still be reached.
246+
mockReadLastFiredAt.mockImplementation((_wf: string, _block: string, scopeKey: string) =>
247+
Promise.resolve(scopeKey.startsWith('wf-p1-') ? new Date() : null)
248+
)
249+
const firstPage = Array.from({ length: NO_ACTIVITY_WORKFLOW_PAGE_SIZE }, (_, i) => ({
250+
id: `wf-p1-${i}`,
251+
name: `Workflow ${i}`,
252+
}))
253+
dbChainMockFns.limit
254+
.mockResolvedValueOnce([makeSubscriptionRow(makeConfig())])
255+
.mockResolvedValueOnce(firstPage)
256+
.mockResolvedValueOnce([{ id: 'wf-quiet', name: 'Quiet Workflow' }])
257+
.mockResolvedValueOnce([])
258+
259+
const result = await pollNoActivityEvents()
260+
261+
expect(result.checked).toBe(NO_ACTIVITY_WORKFLOW_PAGE_SIZE + 1)
262+
expect(result.skipped).toBe(NO_ACTIVITY_WORKFLOW_PAGE_SIZE)
263+
expect(result.fired).toBe(1)
264+
expect(mockDispatchSimEvent.mock.calls[0][1]).toMatchObject({ workflowId: 'wf-quiet' })
265+
expect(allWhereConditions()).toContainEqual(
266+
expect.objectContaining({
267+
type: 'gt',
268+
right: `wf-p1-${NO_ACTIVITY_WORKFLOW_PAGE_SIZE - 1}`,
269+
})
270+
)
271+
})
272+
241273
it('excludes the subscriber workflow in SQL (before the LIMIT)', async () => {
242274
dbChainMockFns.limit
243275
.mockResolvedValueOnce([makeSubscriptionRow(makeConfig())])

apps/sim/lib/workspace-events/no-activity.ts

Lines changed: 90 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ const logger = createLogger('WorkspaceEventNoActivity')
1818
*/
1919
export const NO_ACTIVITY_SUBSCRIPTION_PAGE_SIZE = 500
2020

21-
/** Bound on watched workflows checked per subscription per poll. */
22-
const MAX_WORKFLOWS_PER_SUBSCRIPTION = 500
21+
/**
22+
* Page size for the keyset-paginated watched-workflow scan. Every watched
23+
* workflow is visited each poll — pagination bounds memory, not total work.
24+
*/
25+
export const NO_ACTIVITY_WORKFLOW_PAGE_SIZE = 500
2326

2427
export interface NoActivityPollResult {
2528
subscriptions: number
@@ -72,21 +75,23 @@ async function fetchNoActivitySubscriptionPage(
7275
}
7376

7477
/**
75-
* Resolves the workflows a no_activity subscription watches: deployed, active
76-
* workflows in the subscriber's workspace, minus the subscriber itself,
77-
* narrowed to the explicit selection when one is set (empty selection watches
78-
* everything). Deployed-only keeps never-runnable draft workflows from
79-
* alerting forever.
78+
* Fetches one page of the workflows a no_activity subscription watches:
79+
* deployed, active workflows in the subscriber's workspace, minus the
80+
* subscriber itself, narrowed to the explicit selection when one is set
81+
* (empty selection watches everything). Deployed-only keeps never-runnable
82+
* draft workflows from alerting forever. Keyset-paginated by workflow id so
83+
* watch-everything subscriptions in large workspaces never silently lose
84+
* coverage past a cap.
8085
*/
81-
async function fetchWatchedWorkflows(
86+
async function fetchWatchedWorkflowPage(
8287
workspaceId: string,
8388
subscriberWorkflowId: string,
84-
config: SimSubscriptionConfig
89+
config: SimSubscriptionConfig,
90+
afterWorkflowId: string | null
8591
): Promise<Array<{ id: string; name: string }>> {
8692
// Subscriber exclusion and the explicit selection must be SQL conditions:
87-
// filtering in memory after an unordered LIMIT could permanently drop an
88-
// explicitly watched workflow in workspaces above the cap. The ORDER BY
89-
// keeps the capped scan deterministic across polls.
93+
// filtering in memory after the LIMIT could drop an explicitly watched
94+
// workflow. The ORDER BY drives the keyset cursor.
9095
const conditions = [
9196
eq(workflow.workspaceId, workspaceId),
9297
eq(workflow.isDeployed, true),
@@ -96,13 +101,16 @@ async function fetchWatchedWorkflows(
96101
if (config.workflowIds.length > 0) {
97102
conditions.push(inArray(workflow.id, config.workflowIds))
98103
}
104+
if (afterWorkflowId !== null) {
105+
conditions.push(gt(workflow.id, afterWorkflowId))
106+
}
99107

100108
return db
101109
.select({ id: workflow.id, name: workflow.name })
102110
.from(workflow)
103111
.where(and(...conditions))
104112
.orderBy(asc(workflow.id))
105-
.limit(MAX_WORKFLOWS_PER_SUBSCRIPTION)
113+
.limit(NO_ACTIVITY_WORKFLOW_PAGE_SIZE)
106114
}
107115

108116
/** True when the workflow had at least one qualifying execution inside the window. */
@@ -136,9 +144,66 @@ function noActivityCooldownMs(config: SimSubscriptionConfig): number {
136144
return Math.max(SIM_RULE_COOLDOWN_HOURS, config.inactivityHours) * 60 * 60 * 1000
137145
}
138146

147+
/**
148+
* Checks one watched workflow and fires when it has gone quiet, accumulating
149+
* counts into `result`.
150+
*/
151+
async function checkWatchedWorkflow(
152+
subscription: SimSubscription,
153+
config: SimSubscriptionConfig,
154+
sourceWorkflow: { id: string; name: string },
155+
result: NoActivityPollResult
156+
): Promise<void> {
157+
result.checked++
158+
159+
const blockKey = subscription.webhook.blockId ?? subscription.webhook.path
160+
const cooldownMs = noActivityCooldownMs(config)
161+
162+
const lastFiredAt = await readLastFiredAt(
163+
subscription.webhook.workflowId,
164+
blockKey,
165+
sourceWorkflow.id
166+
)
167+
if (isWithinCooldown(lastFiredAt, cooldownMs)) {
168+
result.skipped++
169+
return
170+
}
171+
172+
if (await hasRecentActivity(sourceWorkflow.id, config)) {
173+
result.skipped++
174+
return
175+
}
176+
177+
const claimed = await claimCooldown(
178+
subscription.webhook.workflowId,
179+
blockKey,
180+
sourceWorkflow.id,
181+
cooldownMs
182+
)
183+
if (!claimed) {
184+
result.skipped++
185+
return
186+
}
187+
188+
const payload = buildNoActivityEventPayload({
189+
workflowId: sourceWorkflow.id,
190+
workflowName: sourceWorkflow.name,
191+
})
192+
193+
await dispatchSimEvent(subscription, payload)
194+
result.fired++
195+
196+
logger.info(`no_activity event fired for workflow ${sourceWorkflow.id}`, {
197+
subscriberWorkflowId: subscription.webhook.workflowId,
198+
inactivityHours: config.inactivityHours,
199+
})
200+
}
201+
139202
/**
140203
* Checks a single no_activity subscription's watched workflows and fires
141-
* events for the inactive ones, accumulating counts into `result`.
204+
* events for the inactive ones, accumulating counts into `result`. The
205+
* watched-workflow scan is keyset-paginated, so coverage is complete even in
206+
* workspaces with more workflows than one page.
142207
*/
143208
async function checkSubscription(
144209
subscription: SimSubscription,
@@ -150,52 +215,23 @@ async function checkSubscription(
150215
const workspaceId = subscription.workflow.workspaceId
151216
if (!workspaceId) return
152217

153-
const blockKey = subscription.webhook.blockId ?? subscription.webhook.path
154-
const cooldownMs = noActivityCooldownMs(config)
155-
156-
const watched = await fetchWatchedWorkflows(workspaceId, subscription.webhook.workflowId, config)
157-
158-
for (const sourceWorkflow of watched) {
159-
result.checked++
160-
161-
const lastFiredAt = await readLastFiredAt(
218+
let cursor: string | null = null
219+
while (true) {
220+
const page = await fetchWatchedWorkflowPage(
221+
workspaceId,
162222
subscription.webhook.workflowId,
163-
blockKey,
164-
sourceWorkflow.id
223+
config,
224+
cursor
165225
)
166-
if (isWithinCooldown(lastFiredAt, cooldownMs)) {
167-
result.skipped++
168-
continue
169-
}
226+
if (page.length === 0) break
170227

171-
if (await hasRecentActivity(sourceWorkflow.id, config)) {
172-
result.skipped++
173-
continue
174-
}
228+
cursor = page[page.length - 1].id
175229

176-
const claimed = await claimCooldown(
177-
subscription.webhook.workflowId,
178-
blockKey,
179-
sourceWorkflow.id,
180-
cooldownMs
181-
)
182-
if (!claimed) {
183-
result.skipped++
184-
continue
230+
for (const sourceWorkflow of page) {
231+
await checkWatchedWorkflow(subscription, config, sourceWorkflow, result)
185232
}
186233

187-
const payload = buildNoActivityEventPayload({
188-
workflowId: sourceWorkflow.id,
189-
workflowName: sourceWorkflow.name,
190-
})
191-
192-
await dispatchSimEvent(subscription, payload)
193-
result.fired++
194-
195-
logger.info(`no_activity event fired for workflow ${sourceWorkflow.id}`, {
196-
subscriberWorkflowId: subscription.webhook.workflowId,
197-
inactivityHours: config.inactivityHours,
198-
})
234+
if (page.length < NO_ACTIVITY_WORKFLOW_PAGE_SIZE) break
199235
}
200236
}
201237

0 commit comments

Comments
 (0)