diff --git a/.server-changes/scheduled-run-region-display.md b/.server-changes/scheduled-run-region-display.md new file mode 100644 index 00000000000..dca41c4341e --- /dev/null +++ b/.server-changes/scheduled-run-region-display.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Scheduled runs now show under their correct region in the dashboard, run details, and the API, and match region filters, instead of appearing under a separate region. diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 3f102b4f41e..00290b36203 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -15,6 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; +import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { findRunByIdWithMollifierFallback, @@ -519,7 +520,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, metadata, - region: run.workerQueue || undefined, + region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined, }; } diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index ffa8d5df91e..8d1b8c13883 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -11,6 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; +import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; @@ -259,7 +260,7 @@ export class NextRunListPresenter { name: run.queue.replace("task/", ""), type: run.queue.startsWith("task/") ? "task" : "custom", }, - region: run.workerQueue ? run.workerQueue : undefined, + region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined, taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; }), diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 47ae27bd17c..bd0ac5c540a 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -15,6 +15,7 @@ import { getUserProvidedIdempotencyKey, } from "@trigger.dev/core/v3/serverOnly"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; +import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; import { logger } from "~/services/logger.server"; import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; @@ -302,7 +303,7 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { - masterQueue: run.workerQueue, + masterQueue: baseWorkerQueue(run.workerQueue), }, }); diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 507d3cc706f..631bd5ece52 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -18,6 +18,7 @@ import { type SyntheticReplayTaskRun, } from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; +import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; import { ReplayRunData } from "~/v3/replayTask"; @@ -209,7 +210,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxAttempts: run.maxAttempts, maxDurationSeconds: run.maxDurationInSeconds, machinePreset: run.machinePreset, - region: environment.type === "DEVELOPMENT" ? undefined : run.workerQueue, + region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts index 8dbea7ffdc8..af0d4a819a1 100644 --- a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts +++ b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts @@ -12,6 +12,27 @@ import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags"; */ export const SCHEDULED_WORKER_QUEUE_SUFFIX = ":scheduled"; +/** + * Recover the base region a worker queue belongs to by stripping any split + * suffix (e.g. `us-nyc-3:scheduled` -> `us-nyc-3`). Region/masterQueue names are + * either `` or `-` and never contain a colon, so the + * region is everything before the first `:`. Use this wherever a worker queue is + * read as a region — for display, filtering, or as a region override — so + * scheduled-split runs group under their real region instead of a phantom one. + * Idempotent; returns the input unchanged when there's no suffix. A nullish + * worker queue (e.g. from a synthetic run snapshot) passes straight through. + */ +export function baseWorkerQueue(workerQueue: string): string; +export function baseWorkerQueue(workerQueue: string | null | undefined): string | null | undefined; +export function baseWorkerQueue(workerQueue: string | null | undefined): string | null | undefined { + if (workerQueue == null) { + return workerQueue; + } + + const colon = workerQueue.indexOf(":"); + return colon === -1 ? workerQueue : workerQueue.slice(0, colon); +} + /** `TriggerSource` value used for runs originating from a schedule. */ const SCHEDULE_TRIGGER_SOURCE = "schedule"; diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 39bfd379ecb..d6055c21b17 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -38,6 +38,7 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; import { isClickHouseJsonParseError, parseRowNumberFromError, @@ -1121,7 +1122,7 @@ export class RunsReplicationService { event === "delete" ? 1 : 0, // _is_deleted run.concurrencyKey ?? "", // concurrency_key run.bulkActionGroupIds ?? [], // bulk_action_group_ids - run.masterQueue ?? "", // worker_queue + baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`) run.maxDurationInSeconds ?? null, // max_duration_in_seconds annotations?.triggerSource ?? "", // trigger_source annotations?.rootTriggerSource ?? "", // root_trigger_source diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 836611b3610..7975694b5e4 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -6,6 +6,7 @@ import { } from "@trigger.dev/core/v3"; import { type TaskRun } from "@trigger.dev/database"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; +import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; @@ -65,7 +66,9 @@ export class ReplayTaskRunService extends BaseService { existingTaskRun.engine === "V1" || existingEnvironment.type === "DEVELOPMENT" || authenticatedEnvironment.type === "DEVELOPMENT"; - const region = ignoreRegion ? undefined : overrideOptions.region ?? existingTaskRun.workerQueue; + const region = ignoreRegion + ? undefined + : overrideOptions.region ?? baseWorkerQueue(existingTaskRun.workerQueue); try { const taskQueue = await this._prisma.taskQueue.findFirst({ diff --git a/apps/webapp/test/workerQueueSplit.test.ts b/apps/webapp/test/workerQueueSplit.test.ts index 6386fa6e449..b80af06f9b9 100644 --- a/apps/webapp/test/workerQueueSplit.test.ts +++ b/apps/webapp/test/workerQueueSplit.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "vitest"; import { + baseWorkerQueue, resolveScheduledQueueSplitEnabled, workerQueueForRun, workerQueueForClass, @@ -98,6 +99,47 @@ describe("workerQueueForRun", () => { }); }); +describe("baseWorkerQueue", () => { + const region = "us-nyc-3"; + const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`; + + it("strips the scheduled split suffix back to the base region", () => { + expect(baseWorkerQueue(scheduled)).toBe(region); + }); + + it("leaves a base region untouched (idempotent)", () => { + expect(baseWorkerQueue(region)).toBe(region); + expect(baseWorkerQueue(baseWorkerQueue(scheduled))).toBe(region); + }); + + it("strips any future `:` suffix, not just `:scheduled`", () => { + expect(baseWorkerQueue("us-nyc-3:priority")).toBe(region); + expect(baseWorkerQueue("us-nyc-3:a:b")).toBe(region); + }); + + it("handles the project-scoped masterQueue shape (`-`)", () => { + expect(baseWorkerQueue("proj_abc123-main:scheduled")).toBe("proj_abc123-main"); + }); + + it("returns an empty string unchanged", () => { + expect(baseWorkerQueue("")).toBe(""); + }); + + it("passes a nullish worker queue straight through (synthetic run snapshots)", () => { + expect(baseWorkerQueue(null)).toBeNull(); + expect(baseWorkerQueue(undefined)).toBeUndefined(); + }); + + it("round-trips with workerQueueForRun: the split queue strips back to the region it came from", () => { + const enqueued = workerQueueForRun({ + workerQueue: region, + rootTriggerSource: "schedule", + splitEnabled: true, + }); + expect(baseWorkerQueue(enqueued)).toBe(region); + }); +}); + describe("workerQueueForClass", () => { const region = "us-nyc-3"; const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;