Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion apps/webapp/app/v3/legacyRunEngineWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
import {
completeBatchTaskRunItemV3,
tryCompleteBatchV3,
} from "./services/batchTriggerV3.server";
import { prisma } from "~/db.server";
import { marqs } from "./marqs/index.server";

Expand Down Expand Up @@ -50,6 +53,16 @@ function initializeWorker() {
maxAttempts: 10,
},
},
tryCompleteBatchV3: {
schema: z.object({
batchId: z.string(),
scheduleResumeOnComplete: z.boolean(),
}),
visibilityTimeoutMs: 30_000,
retry: {
maxAttempts: 5,
},
},
scheduleRequeueMessage: {
schema: z.object({
messageId: z.string(),
Expand Down Expand Up @@ -85,6 +98,9 @@ function initializeWorker() {
attempt
);
},
tryCompleteBatchV3: async ({ payload }) => {
await tryCompleteBatchV3(payload.batchId, prisma, payload.scheduleResumeOnComplete);
},
scheduleRequeueMessage: async ({ payload }) => {
await marqs.requeueMessageById(payload.messageId);
},
Expand Down
172 changes: 87 additions & 85 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
TaskRunAttempt,
} from "@trigger.dev/database";
import { z } from "zod";
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
import { prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand Down Expand Up @@ -934,6 +934,69 @@ export class BatchTriggerV3Service extends BaseService {
}
}

export async function tryCompleteBatchV3(
batchId: string,
tx: PrismaClientOrTransaction,
scheduleResumeOnComplete: boolean
) {
const batch = await tx.batchTaskRun.findFirst({
where: { id: batchId },
select: {
id: true,
sealed: true,
status: true,
expectedCount: true,
dependentTaskAttemptId: true,
},
});

if (!batch) {
logger.debug("tryCompleteBatchV3: Batch not found", { batchId });
return;
}

if (batch.status === "COMPLETED") {
logger.debug("tryCompleteBatchV3: Already completed", { batchId });
return;
}

if (!batch.sealed) {
logger.debug("tryCompleteBatchV3: Not sealed yet", { batchId });
return;
}

// Count completed items (read-only, no contention)
const completedCount = await tx.batchTaskRunItem.count({
where: { batchTaskRunId: batchId, status: "COMPLETED" },
});

if (completedCount < batch.expectedCount) {
logger.debug("tryCompleteBatchV3: Not all items completed", {
batchId,
completedCount,
expectedCount: batch.expectedCount,
});
return;
}

// Mark batch COMPLETED (idempotent via status check)
const updated = await tx.batchTaskRun.updateMany({
where: { id: batchId, status: "PENDING" },
data: { status: "COMPLETED", completedAt: new Date(), completedCount },
});

if (updated.count === 0) {
logger.debug("tryCompleteBatchV3: Already transitioned", { batchId });
return;
}

logger.debug("tryCompleteBatchV3: Batch completed", { batchId, completedCount });

if (scheduleResumeOnComplete && batch.dependentTaskAttemptId) {
await ResumeBatchRunService.enqueue(batchId, true, tx);
}
}

export async function completeBatchTaskRunItemV3(
itemId: string,
batchTaskRunId: string,
Expand All @@ -953,86 +1016,32 @@ export async function completeBatchTaskRunItemV3(
isRetry,
});

if (isRetry) {
logger.debug("completeBatchTaskRunItemV3 retrying", {
itemId,
batchTaskRunId,
scheduleResumeOnComplete,
taskRunAttemptId,
retryAttempt,
});
}

try {
await $transaction(
tx,
"completeBatchTaskRunItemV3",
async (tx, span) => {
span?.setAttribute("batch_id", batchTaskRunId);

// Update the item to complete
const updated = await tx.batchTaskRunItem.updateMany({
where: {
id: itemId,
status: "PENDING",
},
data: {
status: "COMPLETED",
taskRunAttemptId,
},
});

if (updated.count === 0) {
return;
}

const updatedBatchRun = await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
completedCount: {
increment: 1,
},
},
select: {
sealed: true,
status: true,
completedCount: true,
expectedCount: true,
dependentTaskAttemptId: true,
},
});
// Update item to COMPLETED (no transaction needed, no contention)
const updated = await tx.batchTaskRunItem.updateMany({
where: { id: itemId, status: "PENDING" },
data: { status: "COMPLETED", taskRunAttemptId },
});

if (
updatedBatchRun.status === "PENDING" &&
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
updatedBatchRun.sealed
) {
await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});
if (updated.count === 0) {
logger.debug("completeBatchTaskRunItemV3: Item already completed", {
itemId,
batchTaskRunId,
});
return;
}

// We only need to resume the batch if it has a dependent task attempt ID
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
}
}
},
{
timeout: 10_000,
maxWait: 4_000,
}
);
// Schedule debounced completion check
// enqueue with same ID overwrites, resetting the 200ms timer (debounce behavior)
await legacyRunEngineWorker.enqueue({
id: `tryCompleteBatchV3:${batchTaskRunId}`,
job: "tryCompleteBatchV3",
payload: { batchId: batchTaskRunId, scheduleResumeOnComplete },
availableAt: new Date(Date.now() + 200),
});
} catch (error) {
if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) {
logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", {
logger.error("completeBatchTaskRunItemV3 failed, scheduling retry", {
itemId,
batchTaskRunId,
error,
Expand All @@ -1041,24 +1050,17 @@ export async function completeBatchTaskRunItemV3(
});

if (isRetry) {
//throwing this error will cause the Redis worker to retry the job
throw error;
} else {
//schedule a retry
await legacyRunEngineWorker.enqueue({
id: `completeBatchTaskRunItem:${itemId}`,
job: "completeBatchTaskRunItem",
payload: {
itemId,
batchTaskRunId,
scheduleResumeOnComplete,
taskRunAttemptId,
},
payload: { itemId, batchTaskRunId, scheduleResumeOnComplete, taskRunAttemptId },
availableAt: new Date(Date.now() + 2_000),
});
}
} else {
logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", {
logger.error("completeBatchTaskRunItemV3 failed with non-retriable error", {
itemId,
batchTaskRunId,
error,
Expand Down