Skip to content

Commit 78c82d2

Browse files
committed
perf(webapp): scale the realtime runs feed under high concurrency
Keep a single busy environment from overwhelming the realtime runs feed: - Coalesce per-environment wake notifications to a bounded rate, so a high-throughput environment wakes its subscribers at a steady cap instead of once per run change. - Hold a multi-run live poll open on an empty result instead of returning an immediate up-to-date, cutting wasted round-trips when a wake does not match a subscriber's filter. - Support Redis Cluster sharded pub/sub (SSUBSCRIBE/SPUBLISH) so the feed's pub/sub scales horizontally across shards by environment/run. All behind the existing feature flag and tunable env vars. Bumps ioredis to 5.6.x across the workspace (required for cluster sharded pub/sub).
1 parent 08c80c6 commit 78c82d2

12 files changed

Lines changed: 490 additions & 77 deletions

apps/supervisor/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"@kubernetes/client-node": "^1.0.0",
1919
"@trigger.dev/core": "workspace:*",
2020
"dockerode": "^4.0.6",
21-
"ioredis": "^5.3.2",
21+
"ioredis": "~5.6.0",
2222
"p-limit": "^6.2.0",
2323
"prom-client": "^15.1.0",
2424
"socket.io": "4.7.4",

apps/webapp/app/env.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,15 @@ const EnvironmentSchema = z
324324
// hydrate cache entry. Floored, so the window only ever widens by < bucket. 0
325325
// disables bucketing (each feed keeps its exact lower bound).
326326
REALTIME_NOTIFIER_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
327+
// Leading-edge throttle (ms) on the per-env wake channel: a busy env's run-change
328+
// firehose is collapsed to at most one feed-wake per window, decoupling wake load
329+
// from run throughput. Lossless because consumers refetch current state on a wake.
330+
// 0 disables coalescing (every change wakes immediately).
331+
REALTIME_NOTIFIER_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(100),
332+
// When "1", a multi-run live poll woken by a change irrelevant to its filter keeps
333+
// holding the long-poll (re-resolving cheaply) instead of returning an empty
334+
// up-to-date the client would immediately re-issue. "0" reverts to per-wake replies.
335+
REALTIME_NOTIFIER_HOLD_ON_EMPTY: z.string().default("1"),
327336

328337
PUBSUB_REDIS_HOST: z
329338
.string()
@@ -387,6 +396,10 @@ const EnvironmentSchema = z
387396
REALTIME_RUNS_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
388397
.string()
389398
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
399+
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) when in cluster mode, so a busy env's
400+
// traffic stays on one shard instead of broadcasting to every node. Only takes
401+
// effect alongside CLUSTER_MODE_ENABLED. "0" forces classic pub/sub on the cluster.
402+
REALTIME_RUNS_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),
390403

391404
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
392405
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),

apps/webapp/app/services/realtime/notifierRealtimeClient.server.ts

Lines changed: 104 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ export type NotifierRealtimeClientOptions = {
9191
* same-tag feeds pinned within the same bucket share a cache entry. Defaults to
9292
* 60000. 0 disables bucketing. */
9393
runSetCreatedAtBucketMs?: number;
94+
/** When true (default), a multi-run live poll woken by a change irrelevant to its
95+
* filter keeps holding the long-poll (re-resolving cheaply on each wake) instead of
96+
* returning an empty up-to-date the client would immediately re-issue. The empty
97+
* response is the dominant cost under a busy per-env wake channel. */
98+
holdOnEmpty?: boolean;
9499
/** Observability hook: why a live request woke (notify vs timeout vs abort). */
95100
onWakeup?: (reason: WakeupReason) => void;
96101
/** Observability hook: whether a multi-run resolve hit the cache, coalesced onto
@@ -417,55 +422,93 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
417422
signal: AbortSignal | undefined
418423
): Promise<Response> {
419424
return this.#withConcurrencySlot(environment, async () => {
420-
// One env-scoped subscription per feed (not one per run): any run change in
421-
// the env wakes us, then we re-resolve the filter.
422-
const reason = await this.#waitForEnvChange(environment.id, signal);
423-
this.options.onWakeup?.(reason);
424-
425-
const cached = this.#workingSetCache.get(handle);
426425
const offsetFloorMs = parseOffsetUpdatedAtMs(offset);
427-
const seq = this.#nextSeq();
426+
// Total time to hold this long-poll, jittered to avoid synchronized refetch herds.
427+
const deadline = Date.now() + this.#jitteredTimeout();
428+
const holdOnEmpty = this.options.holdOnEmpty ?? true;
429+
430+
// Working set we diff against: seeded from the cache (or the offset floor on a
431+
// miss) and advanced on each refetch within this held request.
432+
let prevSeen = this.#workingSetCache.get(handle);
433+
434+
// The per-env channel wakes this feed on ANY run change in the environment, but
435+
// most changes don't match this feed's filter. Rather than return an empty
436+
// up-to-date the client would immediately re-issue (the dominant cost under a
437+
// busy env), we hold the connection and only respond when THIS feed has a real
438+
// delta or the backstop elapses. Each wake re-resolves via the coalesced +
439+
// short-TTL cache, so an env-wide wake never fans out into per-feed CH+PG queries.
440+
while (true) {
441+
const remaining = deadline - Date.now();
442+
// One env-scoped subscription per wait (not one per run); re-subscribed each
443+
// loop until a relevant delta or the budget runs out.
444+
const reason =
445+
remaining > 0 ? await this.#waitForEnvChange(environment.id, signal, remaining) : "timeout";
446+
this.options.onWakeup?.(reason);
447+
448+
if (reason === "abort") {
449+
// Client disconnected; the response is discarded. Skip the refetch.
450+
return this.#buildResponse(buildUpToDateBody(), apiVersion, clientVersion, {
451+
offset,
452+
handle,
453+
cursor: String(this.#nextSeq()),
454+
});
455+
}
428456

429-
// ClickHouse resolves the (possibly stale) membership; Postgres hydrates the
430-
// authoritative current rows, so status is always fresh even if CH lags. The
431-
// resolve+hydrate is coalesced + short-TTL cached so a single env-wide wake
432-
// doesn't fan out into one CH+PG query per concurrent same-filter feed.
433-
const rows = await this.#resolveAndHydrate(environment, filter, skipColumns);
434-
435-
// Diff against what the client already has, using the hydrated updatedAt:
436-
// cache hit => per-row (new = insert, advanced = update); miss => anything
437-
// newer than the offset floor as a merge-safe update.
438-
const changes: RowChange[] = [];
439-
const seen: WorkingSet = new Map();
440-
let maxUpdatedAt = offsetFloorMs;
441-
for (const row of rows) {
442-
const updatedAtMs = row.updatedAt.getTime();
443-
seen.set(row.id, updatedAtMs);
444-
maxUpdatedAt = Math.max(maxUpdatedAt, updatedAtMs);
445-
446-
if (cached) {
447-
const prior = cached.get(row.id);
448-
if (prior === undefined) {
449-
changes.push({ row, operation: "insert" });
450-
} else if (updatedAtMs > prior) {
457+
// ClickHouse resolves the (possibly stale) membership; Postgres hydrates the
458+
// authoritative current rows, so status is always fresh even if CH lags. We
459+
// refetch on every wake AND on the final timeout, so a wake missed during the
460+
// brief re-subscribe gap is still caught by the backstop.
461+
const rows = await this.#resolveAndHydrate(environment, filter, skipColumns);
462+
463+
// Diff against what the client already has, using the hydrated updatedAt:
464+
// prior working set => per-row (new = insert, advanced = update); miss =>
465+
// anything newer than the offset floor as a merge-safe update.
466+
const changes: RowChange[] = [];
467+
const seen: WorkingSet = new Map();
468+
let maxUpdatedAt = offsetFloorMs;
469+
for (const row of rows) {
470+
const updatedAtMs = row.updatedAt.getTime();
471+
seen.set(row.id, updatedAtMs);
472+
maxUpdatedAt = Math.max(maxUpdatedAt, updatedAtMs);
473+
474+
if (prevSeen) {
475+
const prior = prevSeen.get(row.id);
476+
if (prior === undefined) {
477+
changes.push({ row, operation: "insert" });
478+
} else if (updatedAtMs > prior) {
479+
changes.push({ row, operation: "update" });
480+
}
481+
} else if (updatedAtMs > offsetFloorMs) {
451482
changes.push({ row, operation: "update" });
452483
}
453-
} else if (updatedAtMs > offsetFloorMs) {
454-
changes.push({ row, operation: "update" });
455484
}
456-
}
457-
458-
// Refresh the working set so runs that left the filter stop being tracked
459-
// (the client keeps showing them; the SDK never applies deletes).
460-
this.#workingSetCache.set(handle, seen);
461485

462-
const body = changes.length === 0 ? buildUpToDateBody() : buildRowsBody(changes, skipColumns);
486+
// Refresh the working set so runs that left the filter stop being tracked
487+
// (the client keeps showing them; the SDK never applies deletes).
488+
this.#workingSetCache.set(handle, seen);
489+
prevSeen = seen;
490+
491+
if (changes.length > 0) {
492+
const seq = this.#nextSeq();
493+
return this.#buildResponse(buildRowsBody(changes, skipColumns), apiVersion, clientVersion, {
494+
offset: encodeOffset(maxUpdatedAt, seq),
495+
handle,
496+
cursor: String(seq),
497+
});
498+
}
463499

464-
return this.#buildResponse(body, apiVersion, clientVersion, {
465-
offset: encodeOffset(maxUpdatedAt, seq),
466-
handle,
467-
cursor: String(seq),
468-
});
500+
// Empty diff. With hold-on-empty (default) keep waiting until a real delta or
501+
// the budget elapses; otherwise fall back to the legacy per-wake up-to-date.
502+
if (reason === "timeout" || !holdOnEmpty) {
503+
const seq = this.#nextSeq();
504+
return this.#buildResponse(buildUpToDateBody(), apiVersion, clientVersion, {
505+
offset: encodeOffset(maxUpdatedAt, seq),
506+
handle,
507+
cursor: String(seq),
508+
});
509+
}
510+
// reason === "notify" with an empty diff: keep holding (loop, re-subscribe).
511+
}
469512
});
470513
}
471514

@@ -654,21 +697,33 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
654697
}
655698
}
656699

657-
#waitForChange(runId: string, signal?: AbortSignal): Promise<WakeupReason> {
658-
return this.#waitForSubscription(this.options.notifier.subscribeToRunChanges(runId), signal);
700+
#waitForChange(runId: string, signal?: AbortSignal, timeoutMs?: number): Promise<WakeupReason> {
701+
return this.#waitForSubscription(
702+
this.options.notifier.subscribeToRunChanges(runId),
703+
signal,
704+
timeoutMs
705+
);
659706
}
660707

661-
#waitForEnvChange(environmentId: string, signal?: AbortSignal): Promise<WakeupReason> {
708+
#waitForEnvChange(
709+
environmentId: string,
710+
signal?: AbortSignal,
711+
timeoutMs?: number
712+
): Promise<WakeupReason> {
662713
return this.#waitForSubscription(
663714
this.options.notifier.subscribeToEnvChanges(environmentId),
664-
signal
715+
signal,
716+
timeoutMs
665717
);
666718
}
667719

668-
/** Race a notifier subscription against the backstop timeout and the abort signal. */
720+
/** Race a notifier subscription against a timeout (the jittered backstop by default,
721+
* or an explicit remaining budget when a live request holds across wakes) and the
722+
* abort signal. */
669723
async #waitForSubscription(
670724
subscription: RunChangeSubscription,
671-
signal?: AbortSignal
725+
signal?: AbortSignal,
726+
timeoutMs?: number
672727
): Promise<WakeupReason> {
673728
if (signal?.aborted) {
674729
subscription.unsubscribe();
@@ -682,7 +737,7 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
682737
return await new Promise<WakeupReason>((resolve) => {
683738
subscription.changed.then(() => resolve("notify")).catch(() => resolve("timeout"));
684739

685-
timer = setTimeout(() => resolve("timeout"), this.#jitteredTimeout());
740+
timer = setTimeout(() => resolve("timeout"), timeoutMs ?? this.#jitteredTimeout());
686741

687742
if (signal) {
688743
onAbort = () => resolve("abort");

apps/webapp/app/services/realtime/notifierRealtimeClientInstance.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ function initializeNotifierRealtimeClient(): NotifierRealtimeClient {
7777
runSetResolveCacheMaxEntries: env.REALTIME_NOTIFIER_RUNSET_CACHE_MAX_ENTRIES,
7878
listCacheMaxEntries: env.REALTIME_NOTIFIER_WORKING_SET_MAX_ENTRIES,
7979
runSetCreatedAtBucketMs: env.REALTIME_NOTIFIER_RUNSET_CREATED_AT_BUCKET_MS,
80+
holdOnEmpty: env.REALTIME_NOTIFIER_HOLD_ON_EMPTY === "1",
8081
onWakeup: (reason) => wakeups.inc({ reason }),
8182
onRunSetResolve: (result) => runSetResolves.inc({ result }),
8283
onRunSetQuery: (stage, ms) => runSetQueryMs.observe({ stage }, ms),

0 commit comments

Comments
 (0)