Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/scheduled-run-region-display.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Scheduled runs now show under their correct region in the dashboard, run details, and the API, and match region filters, instead of appearing under a separate region.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
Expand Down Expand Up @@ -519,7 +520,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
triggerFunction: resolveTriggerFunction(run),
batchId: run.batch?.friendlyId,
metadata,
region: run.workerQueue || undefined,
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
};
}

Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { getTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Expand Down Expand Up @@ -259,7 +260,7 @@ export class NextRunListPresenter {
name: run.queue.replace("task/", ""),
type: run.queue.startsWith("task/") ? "task" : "custom",
},
region: run.workerQueue ? run.workerQueue : undefined,
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
};
}),
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
getUserProvidedIdempotencyKey,
} from "@trigger.dev/core/v3/serverOnly";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { logger } from "~/services/logger.server";
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
Expand Down Expand Up @@ -302,7 +303,7 @@ export class SpanPresenter extends BasePresenter {
location: true,
},
where: {
masterQueue: run.workerQueue,
masterQueue: baseWorkerQueue(run.workerQueue),
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
type SyntheticReplayTaskRun,
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
import parseDuration from "parse-duration";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
import { ReplayRunData } from "~/v3/replayTask";
Expand Down Expand Up @@ -209,7 +210,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
maxAttempts: run.maxAttempts,
maxDurationSeconds: run.maxDurationInSeconds,
machinePreset: run.machinePreset,
region: environment.type === "DEVELOPMENT" ? undefined : run.workerQueue,
region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue),
Comment thread
ericallam marked this conversation as resolved.
regions: regionsResult.regions,
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
idempotencyKey: run.idempotencyKey,
Expand Down
21 changes: 21 additions & 0 deletions apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@ import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags";
*/
export const SCHEDULED_WORKER_QUEUE_SUFFIX = ":scheduled";

/**
* Recover the base region a worker queue belongs to by stripping any split
* suffix (e.g. `us-nyc-3:scheduled` -> `us-nyc-3`). Region/masterQueue names are
* either `<name>` or `<projectId>-<name>` and never contain a colon, so the
* region is everything before the first `:`. Use this wherever a worker queue is
* read as a region — for display, filtering, or as a region override — so
* scheduled-split runs group under their real region instead of a phantom one.
* Idempotent; returns the input unchanged when there's no suffix. A nullish
* worker queue (e.g. from a synthetic run snapshot) passes straight through.
*/
export function baseWorkerQueue(workerQueue: string): string;
export function baseWorkerQueue(workerQueue: string | null | undefined): string | null | undefined;
export function baseWorkerQueue(workerQueue: string | null | undefined): string | null | undefined {
if (workerQueue == null) {
return workerQueue;
}

const colon = workerQueue.indexOf(":");
return colon === -1 ? workerQueue : workerQueue.slice(0, colon);
}

/** `TriggerSource` value used for runs originating from a schedule. */
const SCHEDULE_TRIGGER_SOURCE = "schedule";

Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import EventEmitter from "node:events";
import pLimit from "p-limit";
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import {
isClickHouseJsonParseError,
parseRowNumberFromError,
Expand Down Expand Up @@ -1121,7 +1122,7 @@ export class RunsReplicationService {
event === "delete" ? 1 : 0, // _is_deleted
run.concurrencyKey ?? "", // concurrency_key
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
run.masterQueue ?? "", // worker_queue
baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`)
Comment thread
ericallam marked this conversation as resolved.
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
annotations?.triggerSource ?? "", // trigger_source
annotations?.rootTriggerSource ?? "", // root_trigger_source
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from "@trigger.dev/core/v3";
import { type TaskRun } from "@trigger.dev/database";
import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { logger } from "~/services/logger.server";
import { BaseService } from "./baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
Expand Down Expand Up @@ -65,7 +66,9 @@ export class ReplayTaskRunService extends BaseService {
existingTaskRun.engine === "V1" ||
existingEnvironment.type === "DEVELOPMENT" ||
authenticatedEnvironment.type === "DEVELOPMENT";
const region = ignoreRegion ? undefined : overrideOptions.region ?? existingTaskRun.workerQueue;
const region = ignoreRegion
? undefined
: overrideOptions.region ?? baseWorkerQueue(existingTaskRun.workerQueue);
Comment thread
ericallam marked this conversation as resolved.

try {
const taskQueue = await this._prisma.taskQueue.findFirst({
Expand Down
42 changes: 42 additions & 0 deletions apps/webapp/test/workerQueueSplit.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import {
baseWorkerQueue,
resolveScheduledQueueSplitEnabled,
workerQueueForRun,
workerQueueForClass,
Expand Down Expand Up @@ -98,6 +99,47 @@ describe("workerQueueForRun", () => {
});
});

describe("baseWorkerQueue", () => {
const region = "us-nyc-3";
const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;

it("strips the scheduled split suffix back to the base region", () => {
expect(baseWorkerQueue(scheduled)).toBe(region);
});

it("leaves a base region untouched (idempotent)", () => {
expect(baseWorkerQueue(region)).toBe(region);
expect(baseWorkerQueue(baseWorkerQueue(scheduled))).toBe(region);
});

it("strips any future `:<class>` suffix, not just `:scheduled`", () => {
expect(baseWorkerQueue("us-nyc-3:priority")).toBe(region);
expect(baseWorkerQueue("us-nyc-3:a:b")).toBe(region);
});

it("handles the project-scoped masterQueue shape (`<projectId>-<name>`)", () => {
expect(baseWorkerQueue("proj_abc123-main:scheduled")).toBe("proj_abc123-main");
});

it("returns an empty string unchanged", () => {
expect(baseWorkerQueue("")).toBe("");
});

it("passes a nullish worker queue straight through (synthetic run snapshots)", () => {
expect(baseWorkerQueue(null)).toBeNull();
expect(baseWorkerQueue(undefined)).toBeUndefined();
});

it("round-trips with workerQueueForRun: the split queue strips back to the region it came from", () => {
const enqueued = workerQueueForRun({
workerQueue: region,
rootTriggerSource: "schedule",
splitEnabled: true,
});
expect(baseWorkerQueue(enqueued)).toBe(region);
});
});

describe("workerQueueForClass", () => {
const region = "us-nyc-3";
const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;
Expand Down