Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/realtime-runs-subscription-scalability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag
2 changes: 1 addition & 1 deletion apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"@kubernetes/client-node": "^1.0.0",
"@trigger.dev/core": "workspace:*",
"dockerode": "^4.0.6",
"ioredis": "^5.3.2",
"ioredis": "~5.6.0",
"p-limit": "^6.2.0",
"prom-client": "^15.1.0",
"socket.io": "4.7.4",
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
// Touch the sessions replication singleton at entry so it boots deterministically
// on webapp startup. The singleton's initializer wires start (gated on
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
Expand Down Expand Up @@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {

singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
// Attach the run-changed notifier delegations to the engine event bus.
// No-ops (registers nothing) unless REALTIME_NOTIFIER_ENABLED=1.
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);

// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
// duplicate copies of the processor — Sentry's processor list lives in
Expand Down
90 changes: 90 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,47 @@ const EnvironmentSchema = z
.int()
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds

// Master switch for the notifier-backed realtime feed.
// "0" (default) = the existing realtime path serves everything, publishes are
// no-ops, and no notifier Redis connections are opened (zero-overhead off).
// "1" = run-changed signals are published and the per-org `realtimeBackend`
// feature flag selects the backend per request.
REALTIME_NOTIFIER_ENABLED: z.string().default("0"),
// Backstop wait before a live notifier request refetches the run (ms). Matches
// Electric's ~20s live long-poll hold so the client polling cadence is unchanged
// across backends (a ±15% jitter is applied per request to avoid refetch herds).
REALTIME_NOTIFIER_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
// Hard cap on the tag-list snapshot size served by the notifier feed.
REALTIME_NOTIFIER_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
// Short-TTL coalescing cache for the multi-run (tag-list/batch) resolve+hydrate.
// Concurrent same-filter feeds share one ClickHouse resolve + Postgres hydrate
// within this window, so an env-wide wake doesn't fan out into per-feed queries.
// Staleness budget: a newly-matching run is visible within ~ttl + poll interval.
REALTIME_NOTIFIER_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
REALTIME_NOTIFIER_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
// Cap on the per-handle working-set cache (runId -> updatedAt) the notifier keeps
// for diffing multi-run live polls.
REALTIME_NOTIFIER_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
// Quantize the tag-list createdAt lower bound to this epoch-aligned bucket (ms) so
// same-tag feeds that pin their window within the same bucket share one
// resolve+hydrate cache entry. The bound is floored, so the window only ever widens,
// and by less than one bucket. 0 disables bucketing (each feed keeps its exact
// lower bound).
REALTIME_NOTIFIER_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
// Leading-edge throttle (ms) on the per-env wake channel: a busy env's run-change
// firehose is collapsed to at most one feed-wake per window, decoupling wake load
// from run throughput. Lossless because consumers refetch current state on a wake.
// 0 disables coalescing (every change wakes immediately).
REALTIME_NOTIFIER_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(100),
// When "1", a multi-run live poll woken by a change irrelevant to its filter keeps
// holding the long-poll (re-resolving cheaply) instead of returning an empty
// up-to-date response, which the client would immediately follow with a new request.
// "0" reverts to per-wake replies.
REALTIME_NOTIFIER_HOLD_ON_EMPTY: z.string().default("1"),
// Max concurrent fresh ClickHouse resolves (cache misses) per instance. Caps the
// distinct-filter reconnect stampede: a mass reconnect of N feeds on N different filters
// queues to this many concurrent CH queries instead of firing all N at once. Same-filter
// bursts collapse via the single-flight cache before taking a permit. 0 disables the gate.
REALTIME_NOTIFIER_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),

PUBSUB_REDIS_HOST: z
.string()
.optional()
Expand Down Expand Up @@ -332,6 +373,41 @@ const EnvironmentSchema = z
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

// Dedicated pub/sub Redis for the realtime runs feed's run-changed notifier, so
// its publish/subscribe traffic can run on its own instance. Each value falls
// back to the shared PUBSUB_REDIS_* (then REDIS_*) when unset, so the default is
// unchanged until explicitly pointed at a dedicated instance.
REALTIME_RUNS_PUBSUB_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_HOST ?? process.env.REDIS_HOST),
REALTIME_RUNS_PUBSUB_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => {
if (v !== undefined) return v;
const raw = process.env.PUBSUB_REDIS_PORT ?? process.env.REDIS_PORT;
return raw ? parseInt(raw) : undefined;
}),
REALTIME_RUNS_PUBSUB_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_USERNAME ?? process.env.REDIS_USERNAME),
REALTIME_RUNS_PUBSUB_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_PASSWORD ?? process.env.REDIS_PASSWORD),
REALTIME_RUNS_PUBSUB_REDIS_TLS_DISABLED: z
.string()
.default(process.env.PUBSUB_REDIS_TLS_DISABLED ?? process.env.REDIS_TLS_DISABLED ?? "false"),
REALTIME_RUNS_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
.string()
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) when in cluster mode, so a busy env's
// traffic stays on one shard instead of broadcasting to every node. Only takes
// effect alongside CLUSTER_MODE_ENABLED. "0" forces classic pub/sub on the cluster.
REALTIME_RUNS_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),

DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(300),
Expand Down Expand Up @@ -1608,6 +1684,20 @@ const EnvironmentSchema = z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
// ClickHouse client used by the realtime runs feed for tag/batch id resolution.
// Kept on its own URL + pool so the feed's reads can't contend with the main
// analytics client (CLICKHOUSE_URL). Falls back to the main URL when unset.
REALTIME_RUNS_CLICKHOUSE_URL: z
.string()
.optional()
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
REALTIME_RUNS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
REALTIME_RUNS_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
REALTIME_RUNS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),
Expand Down
21 changes: 19 additions & 2 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
return environment ? toAuthenticated(environment) : null;
}

// The authenticated environment plus the run scalars the realtime publish needs.
// Both come from one taskRun read — see findEnvironmentFromRun.
export type EnvironmentFromRun = {
// The run's runtime environment, converted via toAuthenticated().
environment: AuthenticatedEnvironment;
// The run's tags, copied as-is from the taskRun row.
runTags: string[];
// The run's batch id, copied as-is from the taskRun row; may be null.
batchId: string | null;
};

export async function findEnvironmentFromRun(
runId: string,
tx?: PrismaClientOrTransaction
): Promise<AuthenticatedEnvironment | null> {
): Promise<EnvironmentFromRun | null> {
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
// ride along for free — no extra query for the realtime publish to send a full record.
const taskRun = await (tx ?? $replica).taskRun.findFirst({
where: {
id: runId,
Expand All @@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
runtimeEnvironment: { include: authIncludeBase },
},
});
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
if (!taskRun?.runtimeEnvironment) {
return null;
}
return {
environment: toAuthenticated(taskRun.runtimeEnvironment),
runTags: taskRun.runTags,
batchId: taskRun.batchId,
};
}

export async function createNewSession(
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/common.server";
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
Expand Down Expand Up @@ -184,7 +185,10 @@ const { action } = createActionApiRoute(
return json({ error: "Internal Server Error" }, { status: 500 });
}
if (pgResult) {
return json(pgResult, { status: 200 });
// Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
// internal id (the router keys single-run feeds by it, not the friendly id from the URL).
publishChangeRecord({ runId: pgResult.runId, envId: env.id });
return json({ metadata: pgResult.metadata }, { status: 200 });
}

// PG miss. Target run is either buffered or genuinely absent.
Expand Down
8 changes: 8 additions & 0 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";

// Pull the existing tags out of a buffer entry's serialised payload so
Expand Down Expand Up @@ -90,6 +91,13 @@ export async function action({ request, params }: ActionFunctionArgs) {
},
data: { runTags: { push: newTags } },
});
// Publish a run-changed record with the NEW tag set so tag feeds reindex
// (no-op unless enabled).
publishChangeRecord({
runId: taskRun.id,
envId: env.id,
tags: existing.concat(newTags),
});
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
},
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
Expand Down
8 changes: 6 additions & 2 deletions apps/webapp/app/routes/realtime.v1.batches.$batchId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
import { anyResource, createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -33,7 +33,11 @@ export const loader = createLoaderApiRoute(
},
},
async ({ authentication, request, resource: batchRun, apiVersion }) => {
return realtimeClient.streamBatch(
// Pick the Electric proxy or the notifier-backed batch feed
// per org (defaults to Electric). Both implement streamBatch.
const client = await resolveRealtimeStreamClient(authentication.environment);

return client.streamBatch(
request.url,
authentication.environment,
batchRun.id,
Expand Down
9 changes: 7 additions & 2 deletions apps/webapp/app/routes/realtime.v1.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
import {
anyResource,
createLoaderApiRoute,
Expand Down Expand Up @@ -48,7 +48,12 @@ export const loader = createLoaderApiRoute(
},
},
async ({ authentication, request, resource: run, apiVersion }) => {
return realtimeClient.streamRun(
// Pick the Electric proxy or the notifier-backed shim per org (defaults to
// Electric; controlled by REALTIME_NOTIFIER_ENABLED + the realtimeBackend
// feature flag). Both implement the same streamRun contract.
const client = await resolveRealtimeStreamClient(authentication.environment);

return client.streamRun(
request.url,
authentication.environment,
run.id,
Expand Down
8 changes: 6 additions & 2 deletions apps/webapp/app/routes/realtime.v1.runs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { z } from "zod";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
import {
anyResource,
createLoaderApiRoute,
Expand Down Expand Up @@ -39,7 +39,11 @@ export const loader = createLoaderApiRoute(
},
},
async ({ searchParams, authentication, request, apiVersion }) => {
return realtimeClient.streamRuns(
// Pick the Electric proxy or the notifier-backed tag-list feed per org
// (defaults to Electric). Both implement streamRuns.
const client = await resolveRealtimeStreamClient(authentication.environment);

return client.streamRuns(
request.url,
authentication.environment,
searchParams,
Expand Down
49 changes: 48 additions & 1 deletion apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,36 @@ function initializeRunEngineClickhouseClient(): ClickHouse {
});
}

/**
 * ClickHouse client used by the realtime runs feed for tag/batch id resolution.
 *
 * Registered under the `realtimeClickhouseClient` singleton key and built by
 * `initializeRealtimeClickhouseClient`, which returns the default client when
 * `env.REALTIME_RUNS_CLICKHOUSE_URL` is empty and otherwise constructs a client
 * from the `REALTIME_RUNS_CLICKHOUSE_*` settings. Not exported — the factory
 * returns it for the "realtime" client type.
 */
const defaultRealtimeClickhouseClient = singleton(
"realtimeClickhouseClient",
initializeRealtimeClickhouseClient
);

/**
 * Builds the ClickHouse client for the realtime runs feed.
 *
 * @returns The shared default client when `env.REALTIME_RUNS_CLICKHOUSE_URL` is
 * empty; otherwise a new client named "realtime-runs-clickhouse" configured from
 * the `REALTIME_RUNS_CLICKHOUSE_*` environment settings.
 */
function initializeRealtimeClickhouseClient(): ClickHouse {
  const configuredUrl = env.REALTIME_RUNS_CLICKHOUSE_URL;

  // Nothing configured: reuse the default client rather than creating another one.
  if (!configuredUrl) {
    return defaultClickhouseClient;
  }

  // Drop the `secure` query parameter before passing the URL to the client.
  const clientUrl = new URL(configuredUrl);
  clientUrl.searchParams.delete("secure");

  // The "1"/"0" string flags from the environment become booleans here.
  const keepAlive = {
    enabled: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
    idleSocketTtl: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
  };
  const compression = {
    request: env.REALTIME_RUNS_CLICKHOUSE_COMPRESSION_REQUEST === "1",
  };

  return new ClickHouse({
    url: clientUrl.toString(),
    name: "realtime-runs-clickhouse",
    keepAlive,
    logLevel: env.REALTIME_RUNS_CLICKHOUSE_LOG_LEVEL,
    compression,
    maxOpenConnections: env.REALTIME_RUNS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
  });
}

/** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */
const defaultEventsClickhouseClient = singleton(
"eventsClickhouseClient",
Expand Down Expand Up @@ -257,7 +287,8 @@ export type ClientType =
| "logs"
| "query"
| "admin"
| "engine";
| "engine"
| "realtime";

function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse {
const parsed = new URL(url);
Expand Down Expand Up @@ -330,6 +361,20 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou
},
maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
case "realtime":
return new ClickHouse({
url: parsed.toString(),
name,
keepAlive: {
enabled: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
idleSocketTtl: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
},
logLevel: env.REALTIME_RUNS_CLICKHOUSE_LOG_LEVEL,
compression: {
request: env.REALTIME_RUNS_CLICKHOUSE_COMPRESSION_REQUEST === "1",
},
maxOpenConnections: env.REALTIME_RUNS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
case "standard":
case "query":
case "admin":
Expand Down Expand Up @@ -398,6 +443,8 @@ export class ClickhouseFactory {
return defaultAdminClickhouseClient;
case "engine":
return defaultRunEngineClickhouseClient;
case "realtime":
return defaultRealtimeClickhouseClient;
}
}

Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ export class UpdateMetadataService {

return {
metadata: newMetadata,
// Internal id, so callers can publish realtime records keyed how the router indexes feeds.
runId: taskRun.id,
};
}

Expand Down
Loading
Loading