Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/packet-v2-packets-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Large run outputs can use the new API which allows switching object storage providers.
25 changes: 22 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,28 @@ POSTHOG_PROJECT_KEY=
# DEPOT_TOKEN=<Depot org token>
# DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://0.0.0.0:4318"
# These are needed for the object store (for handling large payloads/outputs)
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
# OBJECT_STORE_ACCESS_KEY_ID=
# OBJECT_STORE_SECRET_ACCESS_KEY=
#
# Default provider
# OBJECT_STORE_BASE_URL=http://localhost:9005
# OBJECT_STORE_BUCKET=packets
# OBJECT_STORE_ACCESS_KEY_ID=minioadmin
# OBJECT_STORE_SECRET_ACCESS_KEY=minioadmin
# OBJECT_STORE_REGION=us-east-1
# OBJECT_STORE_SERVICE=s3
#
# OBJECT_STORE_DEFAULT_PROTOCOL=s3 # Only specify this if you're going to migrate object storage and set protocol values below
# Named providers (protocol-prefixed data) - optional for multi-provider support
# OBJECT_STORE_S3_BASE_URL=https://s3.amazonaws.com
# OBJECT_STORE_S3_ACCESS_KEY_ID=
# OBJECT_STORE_S3_SECRET_ACCESS_KEY=
# OBJECT_STORE_S3_REGION=us-east-1
# OBJECT_STORE_S3_SERVICE=s3
#
# OBJECT_STORE_R2_BASE_URL=https://{bucket}.{accountId}.r2.cloudflarestorage.com
# OBJECT_STORE_R2_ACCESS_KEY_ID=
# OBJECT_STORE_R2_SECRET_ACCESS_KEY=
# OBJECT_STORE_R2_REGION=auto
# OBJECT_STORE_R2_SERVICE=s3
# CHECKPOINT_THRESHOLD_IN_MS=10000

# These control the server-side internal telemetry
Expand Down
6 changes: 6 additions & 0 deletions .server-changes/multi-provider-object-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Multi-provider object storage with protocol-based routing for zero-downtime migration
6 changes: 6 additions & 0 deletions .server-changes/object-store-iam-auth.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add IAM role-based auth support for object stores (no access keys required).
7 changes: 7 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,18 @@ const EnvironmentSchema = z
.default(60 * 1000 * 15), // 15 minutes

OBJECT_STORE_BASE_URL: z.string().optional(),
OBJECT_STORE_BUCKET: z.string().optional(),
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
OBJECT_STORE_SECRET_ACCESS_KEY: z.string().optional(),
OBJECT_STORE_REGION: z.string().optional(),
OBJECT_STORE_SERVICE: z.string().default("s3"),

// Protocol to use for new uploads (e.g., "s3", "r2"). Data without protocol uses default provider above.
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
ARTIFACTS_OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";

Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";

const ParamsSchema = z.object({
"*": z.string(),
Expand All @@ -29,7 +29,8 @@ export async function action({ request, params }: ActionFunctionArgs) {
authenticationResult.environment.project.externalRef,
authenticationResult.environment.slug,
filename,
"PUT"
"PUT",
{ forceNoPrefix: true }
);

if (!signed.success) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import {
type CompleteWaitpointTokenResponseBody,
conditionallyExportPacket,
stringifyIO,
} from "@trigger.dev/core/v3";
import { type CompleteWaitpointTokenResponseBody, stringifyIO } from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { z } from "zod";
import { $replica } from "~/db.server";
import { env } from "~/env.server";
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { verifyHttpCallbackHash } from "~/services/httpCallback.server";
import { logger } from "~/services/logger.server";
import { engine } from "~/v3/runEngine.server";
Expand Down Expand Up @@ -41,8 +39,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
},
include: {
environment: {
select: {
apiKey: true,
include: {
project: true,
organization: true,
orgMember: true,
parentEnvironment: {
select: {
apiKey: true,
Expand Down Expand Up @@ -77,9 +77,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
const body = await request.json().catch(() => ({}));

const stringifiedData = await stringifyIO(body);
const finalData = await conditionallyExportPacket(
const finalData = await processWaitpointCompletionPacket(
stringifiedData,
`${waitpointId}/waitpoint/http-callback`
waitpoint.environment,
`${WaitpointId.toFriendlyId(waitpointId)}/http-callback`
);

const result = await engine.completeWaitpoint({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { json } from "@remix-run/server-runtime";
import {
CompleteWaitpointTokenRequestBody,
type CompleteWaitpointTokenResponseBody,
conditionallyExportPacket,
stringifyIO,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { z } from "zod";
import { $replica } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { engine } from "~/v3/runEngine.server";

Expand Down Expand Up @@ -52,9 +52,10 @@ const { action, loader } = createActionApiRoute(
}

const stringifiedData = await stringifyIO(body.data);
const finalData = await conditionallyExportPacket(
const finalData = await processWaitpointCompletionPacket(
stringifiedData,
`${waitpointId}/waitpoint/token`
authentication.environment,
`${WaitpointId.toFriendlyId(waitpointId)}/token`
);

const result = await engine.completeWaitpoint({
Expand Down
45 changes: 45 additions & 0 deletions apps/webapp/app/routes/api.v2.packets.$.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";

const ParamsSchema = z.object({
"*": z.string(),
});

/**
* PUT-only presign for packet uploads (SDK offload). Uses OBJECT_STORE_DEFAULT_PROTOCOL for
* unprefixed keys; returns canonical storagePath for IOPacket.data. GET presigns use v1.
*/
export async function action({ request, params }: ActionFunctionArgs) {
if (request.method.toUpperCase() !== "PUT") {
return { status: 405, body: "Method Not Allowed" };
}

const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const parsedParams = ParamsSchema.parse(params);
const filename = parsedParams["*"];

const signed = await generatePresignedUrl(
authenticationResult.environment.project.externalRef,
authenticationResult.environment.slug,
filename,
"PUT"
);

if (!signed.success) {
return json({ error: `Failed to generate presigned URL: ${signed.error}` }, { status: 500 });
}

if (signed.storagePath === undefined) {
return json({ error: "Failed to resolve storage path for packet upload" }, { status: 500 });
}

return json({ presignedUrl: signed.url, storagePath: signed.storagePath });
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,7 @@ import { env } from "~/env.server";
import { parse } from "@conform-to/zod";
import { Form, useLocation, useNavigation, useSubmit } from "@remix-run/react";
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import {
conditionallyExportPacket,
IOPacket,
stringifyIO,
timeoutError,
WaitpointTokenStatus,
} from "@trigger.dev/core/v3";
import { stringifyIO, timeoutError, WaitpointTokenStatus } from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import type { Waitpoint } from "@trigger.dev/database";
import { useCallback, useRef } from "react";
Expand All @@ -24,6 +18,8 @@ import { $replica } from "~/db.server";
import { useOrganization } from "~/hooks/useOrganizations";
import { useProject } from "~/hooks/useProject";
import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/message.server";
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
import { logger } from "~/services/logger.server";
import { requireUserId } from "~/services/session.server";
import { EnvironmentParamSchema, ProjectParamSchema, v3RunsPath } from "~/utils/pathBuilder";
Expand Down Expand Up @@ -86,6 +82,7 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
const waitpoint = await $replica.waitpoint.findFirst({
select: {
projectId: true,
environmentId: true,
},
where: {
id: waitpointId,
Expand Down Expand Up @@ -150,11 +147,29 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
);
}

const environment = await findEnvironmentBySlug(project.id, envParam, userId);
if (!environment) {
return redirectWithErrorMessage(
submission.value.failureRedirect,
request,
"Environment not found"
);
}

if (environment.id !== waitpoint.environmentId) {
return redirectWithErrorMessage(
submission.value.failureRedirect,
request,
"No waitpoint found"
);
}

const data = submission.value.payload ? JSON.parse(submission.value.payload) : {};
const stringifiedData = await stringifyIO(data);
const finalData = await conditionallyExportPacket(
const finalData = await processWaitpointCompletionPacket(
stringifiedData,
`${waitpointId}/waitpoint/token`
environment,
`${WaitpointId.toFriendlyId(waitpointId)}/token`
);

const result = await engine.completeWaitpoint({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { basename } from "node:path";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireUserId } from "~/services/session.server";
import { generatePresignedRequest } from "~/v3/r2.server";
import { generatePresignedRequest } from "~/v3/objectStore.server";

const ParamSchema = z.object({
environmentId: z.string(),
Expand Down
28 changes: 17 additions & 11 deletions apps/webapp/app/runEngine/concerns/batchPayloads.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { hasObjectStoreClient, uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { startActiveSpan } from "~/v3/tracer.server";

export type BatchPayloadProcessResult = {
/** The processed payload - either the original or an R2 path */
Expand Down Expand Up @@ -31,7 +31,7 @@ export class BatchPayloadProcessor {
* If not available, large payloads will be stored inline (which may fail for very large payloads).
*/
isObjectStoreAvailable(): boolean {
return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined;
return hasObjectStoreClient();
}

/**
Expand Down Expand Up @@ -103,11 +103,17 @@ export class BatchPayloadProcessor {
};
}

// Upload to R2
// Upload to object store
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(
filename,
packet.data,
packet.dataType,
environment,
env.OBJECT_STORE_DEFAULT_PROTOCOL
)
);

if (uploadError) {
Expand All @@ -125,18 +131,18 @@ export class BatchPayloadProcessor {
);
}

logger.debug("Batch item payload offloaded to R2", {
logger.debug("Batch item payload offloaded to object store", {
batchId,
itemIndex,
filename,
filename: uploadedFilename,
size,
});

span.setAttribute("wasOffloaded", true);
span.setAttribute("offloadPath", filename);
span.setAttribute("offloadPath", uploadedFilename);

return {
payload: filename,
payload: uploadedFilename!,
payloadType: "application/store",
wasOffloaded: true,
size,
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/runEngine/concerns/payloads.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/
import { PayloadProcessor, TriggerTaskRequest } from "../types";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore } from "~/v3/r2.server";
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { ServiceValidationError } from "~/v3/services/common.server";

export class DefaultPayloadProcessor implements PayloadProcessor {
Expand Down Expand Up @@ -31,16 +31,16 @@ export class DefaultPayloadProcessor implements PayloadProcessor {

const filename = `${request.friendlyId}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment, env.OBJECT_STORE_DEFAULT_PROTOCOL)
);

if (uploadError) {
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
}

return {
data: filename,
data: uploadedFilename!,
dataType: "application/store",
};
});
Expand Down
Loading
Loading