Skip to content

Commit 94d9554

Browse files
committed
refactor(webapp,run-engine): route the realtime runs feed through one per-env change channel
Rework the new realtime runs backend (still behind its feature flag, with the existing backend as the default) so every feed is a predicate over one self-describing change record, published once per run change to a single per-environment channel. A per-instance router indexes the currently-held feeds by run, tag, and batch. On a change it hydrates the affected runs once and serializes them once, then fans the result to every matching feed, so one hot shared tag costs a single query and serialize no matter how many feeds watch it. Newly triggered runs surface immediately (the run-created event now carries tags and batch) rather than waiting for a status change. An admission gate bounds how many cold ClickHouse resolves run concurrently, so a mass reconnect across distinct filters queues instead of stampeding the database. Live long-polls hold for about 20 seconds to match the existing backend cadence.
1 parent 812a881 commit 94d9554

24 files changed

Lines changed: 1648 additions & 677 deletions

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,10 @@ const EnvironmentSchema = z
306306
// "1" = run-changed signals are published and the per-org `realtimeBackend`
307307
// feature flag selects the backend per request.
308308
REALTIME_NOTIFIER_ENABLED: z.string().default("0"),
309-
// Backstop wait before a live notifier request refetches the run (ms).
310-
REALTIME_NOTIFIER_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(5_000),
309+
// Backstop wait before a live notifier request refetches the run (ms). Matches
310+
// Electric's ~20s live long-poll hold so the client polling cadence is unchanged
311+
// across backends (a ±15% jitter is applied per request to avoid refetch herds).
312+
REALTIME_NOTIFIER_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
311313
// Hard cap on the tag-list snapshot size served by the notifier feed.
312314
REALTIME_NOTIFIER_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
313315
// Short-TTL coalescing cache for the multi-run (tag-list/batch) resolve+hydrate.
@@ -333,6 +335,11 @@ const EnvironmentSchema = z
333335
// holding the long-poll (re-resolving cheaply) instead of returning an empty
334336
// up-to-date the client would immediately re-issue. "0" reverts to per-wake replies.
335337
REALTIME_NOTIFIER_HOLD_ON_EMPTY: z.string().default("1"),
338+
// Max concurrent fresh ClickHouse resolves (cache misses) per instance. Caps the
339+
// distinct-filter reconnect stampede: a mass reconnect of N feeds on N different filters
340+
// queues to this many concurrent CH queries instead of firing all N at once. Same-filter
341+
// bursts collapse via the single-flight cache before taking a permit. 0 disables the gate.
342+
REALTIME_NOTIFIER_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),
336343

337344
PUBSUB_REDIS_HOST: z
338345
.string()

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
237237
return environment ? toAuthenticated(environment) : null;
238238
}
239239

240+
// The authenticated environment plus the run scalars the realtime publish needs.
241+
// Both come from one taskRun read — see findEnvironmentFromRun.
242+
export type EnvironmentFromRun = {
243+
environment: AuthenticatedEnvironment;
244+
runTags: string[];
245+
batchId: string | null;
246+
};
247+
240248
export async function findEnvironmentFromRun(
241249
runId: string,
242250
tx?: PrismaClientOrTransaction
243-
): Promise<AuthenticatedEnvironment | null> {
251+
): Promise<EnvironmentFromRun | null> {
252+
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
253+
// ride along for free — no extra query for the realtime publish to send a full record.
244254
const taskRun = await (tx ?? $replica).taskRun.findFirst({
245255
where: {
246256
id: runId,
@@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
249259
runtimeEnvironment: { include: authIncludeBase },
250260
},
251261
});
252-
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
262+
if (!taskRun?.runtimeEnvironment) {
263+
return null;
264+
}
265+
return {
266+
environment: toAuthenticated(taskRun.runtimeEnvironment),
267+
runTags: taskRun.runTags,
268+
batchId: taskRun.batchId,
269+
};
253270
}
254271

255272
export async function createNewSession(

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1212
import { authenticateApiRequest } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
1414
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
15+
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
1516
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1617
import { ServiceValidationError } from "~/v3/services/common.server";
1718
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
@@ -184,6 +185,11 @@ const { action } = createActionApiRoute(
184185
return json({ error: "Internal Server Error" }, { status: 500 });
185186
}
186187
if (pgResult) {
188+
// Mid-run metadata flush succeeded: publish a run-changed record so a live single-run
189+
// feed reflects metadata.set() without waiting for the next lifecycle event (this
190+
// path doesn't otherwise touch the engine event bus). envId is free; partial record,
191+
// matched by runId. No-op when disabled.
192+
publishChangeRecord({ runId, envId: env.id });
187193
return json(pgResult, { status: 200 });
188194
}
189195

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
77
import { authenticateApiRequest } from "~/services/apiAuth.server";
88
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
99
import { logger } from "~/services/logger.server";
10-
import { publishRunChanged } from "~/services/realtime/runChangeNotifierInstance.server";
10+
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
1111
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
1212

1313
// Pull the existing tags out of a buffer entry's serialised payload so
@@ -91,8 +91,13 @@ export async function action({ request, params }: ActionFunctionArgs) {
9191
},
9292
data: { runTags: { push: newTags } },
9393
});
94-
// Delegate a run-changed notify (no-op unless enabled).
95-
publishRunChanged({ runId: taskRun.id, environmentId: env.id });
94+
// Publish a run-changed record with the NEW tag set so tag feeds reindex
95+
// (no-op unless enabled).
96+
publishChangeRecord({
97+
runId: taskRun.id,
98+
envId: env.id,
99+
tags: existing.concat(newTags),
100+
});
96101
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
97102
},
98103
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates

apps/webapp/app/services/realtime/electricStreamProtocol.server.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,26 @@ export function buildRowsBody(changes: RowChange[], skipColumns: string[] = []):
273273
return JSON.stringify(messages);
274274
}
275275

276+
/** A row change whose wire `value` was already serialized (once, shared across feeds by
277+
* the EnvChangeRouter); the per-feed `operation` is applied here. */
278+
export type SerializedRowChange = {
279+
runId: string;
280+
value: Record<string, string | null>;
281+
operation: "insert" | "update";
282+
};
283+
284+
/** Like `buildRowsBody`, but from values serialized once per (runId, columnSet) upstream,
285+
* so a run matching many feeds is serialized once and reused across their bodies. */
286+
export function buildRowsBodyFromSerialized(changes: SerializedRowChange[]): string {
287+
const messages: ShapeMessage[] = changes.map((change) => ({
288+
key: runShapeKey(change.runId),
289+
value: change.value,
290+
headers: { operation: change.operation },
291+
}));
292+
messages.push(UP_TO_DATE);
293+
return JSON.stringify(messages);
294+
}
295+
276296
export const INITIAL_OFFSET = "-1";
277297

278298
/**

0 commit comments

Comments
 (0)