Skip to content

Commit 85886b9

Browse files
authored
feat(webapp,supervisor): isolate scheduled runs on a dedicated worker queue (#3839)
## Summary Scheduled runs and their descendants can now be routed to a dedicated per-region worker queue, processed by a separate worker fleet, so a burst of scheduled crons no longer competes with standard and agent runs for the same queue and inflates their startup latency. It is off by default and enabled per organization via a feature flag (with a global default), so nothing changes until it is turned on. ## Design At trigger time, any run whose lineage originates from a schedule (`rootTriggerSource === "schedule"`, which already propagates from a scheduled run down to all of its children) gets its worker queue suffixed with `:scheduled`. The worker queue name is an opaque string persisted on the run and used verbatim by enqueue and dequeue, so this needs no Lua, message-envelope, or concurrency changes. Concurrency stays keyed by environment and queue, not by worker queue. On the consumer side, the dequeue endpoint gains an optional `queueClass` selector. A supervisor sends `queueClass: "scheduled"` and the server derives the actual queue from the worker's own group, so a token can only ever reach its own region's queues. A fleet picks its class with the `TRIGGER_WORKER_QUEUE_CLASS` env var (`default` or `scheduled`), so a dedicated scheduled fleet can run alongside the standard one. Verified end to end against a local managed-worker setup: scheduled runs route to the dedicated queue, are drained only by the scheduled fleet, and standard runs are left untouched.
1 parent 884bea6 commit 85886b9

14 files changed

Lines changed: 426 additions & 8 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Scheduled runs and their descendants can be routed to a dedicated worker queue and processed by a separate worker fleet, isolating standard and agent run startup latency from scheduled-cron bursts. Off by default, enabled per organization via a feature flag.

apps/supervisor/src/env.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ const Env = z
3434

3535
// Dequeue settings (provider mode)
3636
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true),
37+
// Which worker-queue class this supervisor fleet serves. "default" pulls the
38+
// region queue (standard/agent runs); "scheduled" pulls the dedicated
39+
// scheduled-lineage queue. Run a separate fleet per class for isolation.
40+
TRIGGER_WORKER_QUEUE_CLASS: z.enum(["default", "scheduled"]).default("default"),
3741
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
3842
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
3943
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ class ManagedSupervisor {
190190
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
191191
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
192192
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
193+
queueClass: env.TRIGGER_WORKER_QUEUE_CLASS,
193194
metricsRegistry: register,
194195
scaling: {
195196
strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,

apps/webapp/app/env.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,16 @@ const EnvironmentSchema = z
10591059
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
10601060
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
10611061

1062+
// Global default for the scheduled worker-queue split. When "1", runs in a
1063+
// scheduled lineage (rootTriggerSource === "schedule") are routed to a
1064+
// dedicated `<region>:scheduled` worker queue so a separate consumer fleet
1065+
// can dequeue them independently of standard/agent runs. The per-org
1066+
// `workerQueueScheduledSplitEnabled` feature flag overrides this default in
1067+
// BOTH directions (an org set to false stays on the single queue even when
1068+
// this is "1"; an org set to true splits even when this is "0"). Never
1069+
// applies to DEVELOPMENT environments.
1070+
TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED: z.string().default("0"),
1071+
10621072
TRIGGER_MOLLIFIER_ENABLED: z.string().default("0"),
10631073
// Separate switch for the drainer (consumer side) so it can be split
10641074
// off onto a dedicated worker service. Unset → inherits

apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.
77

88
export const action = createActionWorkerApiRoute(
99
{
10-
body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
10+
body: WorkerApiDequeueRequestBody,
1111
},
1212
async ({
1313
authenticatedWorker,
1414
runnerId,
15+
body,
1516
}): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
16-
return json(await authenticatedWorker.dequeue({ runnerId }));
17+
return json(await authenticatedWorker.dequeue({ runnerId, queueClass: body.queueClass }));
1718
}
1819
);
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import type { WorkerQueueClass } from "@trigger.dev/core/v3/workers";
2+
import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags";
3+
4+
/**
5+
* Suffix appended to a region's worker queue name to route scheduled-lineage
6+
* runs onto their own Redis list (e.g. `us-nyc-3` -> `us-nyc-3:scheduled`). A
7+
* dedicated consumer fleet dequeues the suffixed list so the top-of-hour
8+
* scheduled-cron herd can't starve standard/agent run startup. The worker queue
9+
* name is opaque everywhere downstream (it's only ever `:`-joined into a Redis
10+
* key and persisted on the run), so encoding the class in the suffix needs no
11+
* Lua, envelope, or resolver changes.
12+
*/
13+
export const SCHEDULED_WORKER_QUEUE_SUFFIX = ":scheduled";
14+
15+
/** `TriggerSource` value used for runs originating from a schedule. */
16+
const SCHEDULE_TRIGGER_SOURCE = "schedule";
17+
18+
/**
19+
* Resolve whether the scheduled worker-queue split is enabled for a run, reading
20+
* only the in-memory org feature-flags JSON (already loaded on the authenticated
21+
* environment) — never a DB query, so it is safe on the trigger hot path.
22+
*
23+
* Precedence: a per-org override wins in BOTH directions; the global default is
24+
* used only when the org has not set the flag.
25+
*/
26+
export function resolveScheduledQueueSplitEnabled({
27+
orgFeatureFlags,
28+
globalDefault,
29+
}: {
30+
orgFeatureFlags: Record<string, unknown> | null | undefined;
31+
globalDefault: boolean;
32+
}): boolean {
33+
const override = orgFeatureFlags?.[FEATURE_FLAG.workerQueueScheduledSplitEnabled];
34+
35+
if (override !== undefined) {
36+
const parsed =
37+
FeatureFlagCatalog[FEATURE_FLAG.workerQueueScheduledSplitEnabled].safeParse(override);
38+
39+
if (parsed.success) {
40+
return parsed.data;
41+
}
42+
}
43+
44+
return globalDefault;
45+
}
46+
47+
/**
48+
* Pick the worker queue a run should be enqueued onto. Runs in a scheduled
49+
* lineage (`rootTriggerSource === "schedule"`, which propagates from a scheduled
50+
* root down to every descendant) route to the suffixed list when the split is
51+
* enabled; everything else is unchanged. Idempotent — never double-suffixes.
52+
*/
53+
export function workerQueueForRun({
54+
workerQueue,
55+
rootTriggerSource,
56+
splitEnabled,
57+
}: {
58+
workerQueue: string;
59+
rootTriggerSource: string | undefined;
60+
splitEnabled: boolean;
61+
}): string {
62+
if (
63+
!splitEnabled ||
64+
rootTriggerSource !== SCHEDULE_TRIGGER_SOURCE ||
65+
workerQueue.endsWith(SCHEDULED_WORKER_QUEUE_SUFFIX)
66+
) {
67+
return workerQueue;
68+
}
69+
70+
return `${workerQueue}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;
71+
}
72+
73+
/**
74+
* Consumer-side counterpart to {@link workerQueueForRun}: given a worker's base
75+
* (region) queue and the requested queue class, return the worker queue to
76+
* dequeue from. `"scheduled"` targets the suffixed list; anything else is the
77+
* base queue. The server always derives this from the authenticated worker's
78+
* own `masterQueue`, so a token can only ever reach its own region's queues.
79+
* Idempotent — never double-suffixes.
80+
*/
81+
export function workerQueueForClass(
82+
masterQueue: string,
83+
queueClass: WorkerQueueClass | undefined
84+
): string {
85+
if (queueClass === "scheduled" && !masterQueue.endsWith(SCHEDULED_WORKER_QUEUE_SUFFIX)) {
86+
return `${masterQueue}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;
87+
}
88+
89+
return masterQueue;
90+
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ import {
3434
IdempotencyKeyConcern,
3535
type ClaimedIdempotency,
3636
} from "../concerns/idempotencyKeys.server";
37+
import {
38+
resolveScheduledQueueSplitEnabled,
39+
workerQueueForRun,
40+
} from "../concerns/workerQueueSplit.server";
3741
import {
3842
publishClaim as publishMollifierClaim,
3943
releaseClaim as releaseMollifierClaim,
@@ -351,7 +355,7 @@ export class RunEngineTriggerTaskService {
351355
environment,
352356
body.options?.region
353357
);
354-
const workerQueue = workerQueueResult?.masterQueue;
358+
const baseWorkerQueue = workerQueueResult?.masterQueue;
355359
const enableFastPath = workerQueueResult?.enableFastPath ?? false;
356360

357361
// Build annotations for this run
@@ -366,6 +370,30 @@ export class RunEngineTriggerTaskService {
366370
taskKind: taskKind ?? "STANDARD",
367371
};
368372

373+
// Route runs in a scheduled lineage (the scheduled run itself and every
374+
// descendant, via the propagated rootTriggerSource) to a dedicated
375+
// `<region>:scheduled` worker queue so a separate consumer fleet can
376+
// dequeue them independently of standard/agent runs. Gated per-org with
377+
// a global default, never applied to dev. Reads only the in-memory org
378+
// flags already on the environment — no DB query on the hot path.
379+
const scheduledQueueSplitEnabled =
380+
environment.type !== "DEVELOPMENT" &&
381+
resolveScheduledQueueSplitEnabled({
382+
orgFeatureFlags: environment.organization.featureFlags as Record<
383+
string,
384+
unknown
385+
> | null,
386+
globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1",
387+
});
388+
const workerQueue =
389+
baseWorkerQueue !== undefined
390+
? workerQueueForRun({
391+
workerQueue: baseWorkerQueue,
392+
rootTriggerSource: annotations.rootTriggerSource,
393+
splitEnabled: scheduledQueueSplitEnabled,
394+
})
395+
: baseWorkerQueue;
396+
369397
try {
370398
return await this.traceEventConcern.traceRun(
371399
triggerRequest,

apps/webapp/app/v3/featureFlags.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export const FEATURE_FLAG = {
99
hasComputeAccess: "hasComputeAccess",
1010
hasPrivateConnections: "hasPrivateConnections",
1111
mollifierEnabled: "mollifierEnabled",
12+
workerQueueScheduledSplitEnabled: "workerQueueScheduledSplitEnabled",
1213
} as const;
1314

1415
export const FeatureFlagCatalog = {
@@ -20,6 +21,7 @@ export const FeatureFlagCatalog = {
2021
[FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(),
2122
[FEATURE_FLAG.hasPrivateConnections]: z.coerce.boolean(),
2223
[FEATURE_FLAG.mollifierEnabled]: z.coerce.boolean(),
24+
[FEATURE_FLAG.workerQueueScheduledSplitEnabled]: z.coerce.boolean(),
2325
};
2426

2527
export type FeatureFlagKey = keyof typeof FeatureFlagCatalog;

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
TaskRunExecutionResult,
1111
} from "@trigger.dev/core/v3";
1212
import { fromFriendlyId } from "@trigger.dev/core/v3/isomorphic";
13-
import { WORKER_HEADERS } from "@trigger.dev/core/v3/workers";
13+
import { WORKER_HEADERS, type WorkerQueueClass } from "@trigger.dev/core/v3/workers";
1414
import {
1515
Prisma,
1616
RuntimeEnvironment,
@@ -27,6 +27,7 @@ import { defaultMachine } from "~/services/platform.v3.server";
2727
import { singleton } from "~/utils/singleton";
2828
import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
2929
import { machinePresetFromName } from "~/v3/machinePresets.server";
30+
import { workerQueueForClass } from "~/runEngine/concerns/workerQueueSplit.server";
3031
import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";
3132

3233
const authenticatedWorkerInstanceCache = singleton(
@@ -369,10 +370,18 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
369370
});
370371
}
371372

372-
async dequeue({ runnerId }: { runnerId?: string }): Promise<DequeuedMessage[]> {
373+
async dequeue({
374+
runnerId,
375+
queueClass,
376+
}: {
377+
runnerId?: string;
378+
queueClass?: WorkerQueueClass;
379+
}): Promise<DequeuedMessage[]> {
380+
// Derive the actual queue from this worker's own masterQueue + class, so a
381+
// token can only ever reach its own region's queues (default or :scheduled).
373382
return await this._engine.dequeueFromWorkerQueue({
374383
consumerId: this.workerInstanceId,
375-
workerQueue: this.masterQueue,
384+
workerQueue: workerQueueForClass(this.masterQueue, queueClass),
376385
workerId: this.workerInstanceId,
377386
runnerId,
378387
});

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,124 @@ describe("RunEngineTriggerTaskService", () => {
267267
await engine.quit();
268268
});
269269

270+
containerTest(
271+
"routes scheduled-lineage runs to a separate worker queue that dequeues independently",
272+
async ({ prisma, redisOptions }) => {
273+
const engine = new RunEngine({
274+
prisma,
275+
worker: {
276+
redis: redisOptions,
277+
workers: 1,
278+
tasksPerWorker: 10,
279+
pollIntervalMs: 100,
280+
},
281+
queue: {
282+
redis: redisOptions,
283+
// Disable the background master-queue consumers so our manual
284+
// processMasterQueueForEnvironment + dequeue calls are deterministic.
285+
masterQueueConsumersDisabled: true,
286+
processWorkerQueueDebounceMs: 50,
287+
},
288+
runLock: { redis: redisOptions },
289+
machines: {
290+
defaultMachine: "small-1x",
291+
machines: {
292+
"small-1x": {
293+
name: "small-1x" as const,
294+
cpu: 0.5,
295+
memory: 0.5,
296+
centsPerMs: 0.0001,
297+
},
298+
},
299+
baseCostInCents: 0.0005,
300+
},
301+
tracer: trace.getTracer("test", "0.0.0"),
302+
});
303+
304+
try {
305+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
306+
307+
// Turn the per-org split flag on in-memory — the resolver reads this
308+
// object directly (no DB round-trip on the trigger hot path).
309+
(authenticatedEnvironment.organization as { featureFlags?: unknown }).featureFlags = {
310+
workerQueueScheduledSplitEnabled: true,
311+
};
312+
313+
const taskIdentifier = "test-task";
314+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
315+
316+
const triggerTaskService = new RunEngineTriggerTaskService({
317+
engine,
318+
prisma,
319+
payloadProcessor: new MockPayloadProcessor(),
320+
queueConcern: new DefaultQueueManager(prisma, engine),
321+
idempotencyKeyConcern: new IdempotencyKeyConcern(
322+
prisma,
323+
engine,
324+
new MockTraceEventConcern()
325+
),
326+
validator: new MockTriggerTaskValidator(),
327+
traceEventConcern: new MockTraceEventConcern(),
328+
tracer: trace.getTracer("test", "0.0.0"),
329+
metadataMaximumSize: 1024 * 1024 * 1,
330+
});
331+
332+
// A standard run (default triggerSource) stays on the region queue.
333+
const standardResult = await triggerTaskService.call({
334+
taskId: taskIdentifier,
335+
environment: authenticatedEnvironment,
336+
body: { payload: { kind: "standard" } },
337+
});
338+
assertNonNullable(standardResult);
339+
340+
// A scheduled run routes to the `<region>:scheduled` queue. Descendants
341+
// would too, via rootTriggerSource propagation.
342+
const scheduledResult = await triggerTaskService.call({
343+
taskId: taskIdentifier,
344+
environment: authenticatedEnvironment,
345+
body: { payload: { kind: "scheduled" } },
346+
options: { triggerSource: "schedule" },
347+
});
348+
assertNonNullable(scheduledResult);
349+
350+
const standardRun = await prisma.taskRun.findUniqueOrThrow({
351+
where: { id: standardResult.run.id },
352+
});
353+
const scheduledRun = await prisma.taskRun.findUniqueOrThrow({
354+
where: { id: scheduledResult.run.id },
355+
});
356+
357+
// Producer routing: the persisted worker queue carries the class.
358+
const baseWorkerQueue = standardRun.workerQueue;
359+
expect(scheduledRun.workerQueue).toBe(`${baseWorkerQueue}:scheduled`);
360+
361+
// Move both runs from the env queue onto their respective worker queues.
362+
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 10);
363+
await setTimeout(500);
364+
365+
// Dequeue isolation: the scheduled queue yields only the scheduled run...
366+
const dequeuedScheduled = await engine.dequeueFromWorkerQueue({
367+
consumerId: "test-scheduled-consumer",
368+
workerQueue: `${baseWorkerQueue}:scheduled`,
369+
});
370+
expect(dequeuedScheduled.length).toBe(1);
371+
assertNonNullable(dequeuedScheduled[0]);
372+
expect(dequeuedScheduled[0].run.id).toBe(scheduledResult.run.id);
373+
374+
// ...and the base queue yields only the standard run.
375+
const dequeuedStandard = await engine.dequeueFromWorkerQueue({
376+
consumerId: "test-standard-consumer",
377+
workerQueue: baseWorkerQueue,
378+
});
379+
expect(dequeuedStandard.length).toBe(1);
380+
assertNonNullable(dequeuedStandard[0]);
381+
expect(dequeuedStandard[0].run.id).toBe(standardResult.run.id);
382+
} finally {
383+
await engine.quit();
384+
}
385+
}
386+
);
387+
270388
// The BatchQueue worker rebuilds body.options from Redis-stored items
271389
// (Record<string, unknown>), so the Phase-2 schema coercion doesn't apply
272390
// to in-flight items enqueued before the schema fix. The defensive

0 commit comments

Comments
 (0)