diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts index 09fdbf886c..7a381063bd 100644 --- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -5,7 +5,10 @@ import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server"; -import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server"; +import { + completeBatchTaskRunItemV3, + tryCompleteBatchV3, +} from "./services/batchTriggerV3.server"; import { prisma } from "~/db.server"; import { marqs } from "./marqs/index.server"; @@ -50,6 +53,16 @@ function initializeWorker() { maxAttempts: 10, }, }, + tryCompleteBatchV3: { + schema: z.object({ + batchId: z.string(), + scheduleResumeOnComplete: z.boolean(), + }), + visibilityTimeoutMs: 30_000, + retry: { + maxAttempts: 5, + }, + }, scheduleRequeueMessage: { schema: z.object({ messageId: z.string(), @@ -85,6 +98,9 @@ function initializeWorker() { attempt ); }, + tryCompleteBatchV3: async ({ payload }) => { + await tryCompleteBatchV3(payload.batchId, prisma, payload.scheduleResumeOnComplete); + }, scheduleRequeueMessage: async ({ payload }) => { await marqs.requeueMessageById(payload.messageId); }, diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 337130f443..e4bc583b7c 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -14,7 +14,7 @@ import { TaskRunAttempt, } from "@trigger.dev/database"; import { z } from "zod"; -import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server"; +import { prisma, PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -934,6 +934,69 @@ export class BatchTriggerV3Service extends BaseService { } } +export async function tryCompleteBatchV3( + batchId: string, + tx: PrismaClientOrTransaction, + scheduleResumeOnComplete: boolean +) { + const batch = await tx.batchTaskRun.findFirst({ + where: { id: batchId }, + select: { + id: true, + sealed: true, + status: true, + expectedCount: true, + dependentTaskAttemptId: true, + }, + }); + + if (!batch) { + logger.debug("tryCompleteBatchV3: Batch not found", { batchId }); + return; + } + + if (batch.status === "COMPLETED") { + logger.debug("tryCompleteBatchV3: Already completed", { batchId }); + return; + } + + if (!batch.sealed) { + logger.debug("tryCompleteBatchV3: Not sealed yet", { batchId }); + return; + } + + // Count completed items (read-only, no contention) + const completedCount = await tx.batchTaskRunItem.count({ + where: { batchTaskRunId: batchId, status: "COMPLETED" }, + }); + + if (completedCount < batch.expectedCount) { + logger.debug("tryCompleteBatchV3: Not all items completed", { + batchId, + completedCount, + expectedCount: batch.expectedCount, + }); + return; + } + + // Mark batch COMPLETED (idempotent via status check) + const updated = await tx.batchTaskRun.updateMany({ + where: { id: batchId, status: "PENDING" }, + data: { status: "COMPLETED", completedAt: new Date(), completedCount }, + }); + + if (updated.count === 0) { + logger.debug("tryCompleteBatchV3: Already transitioned", { batchId }); + return; + } + + logger.debug("tryCompleteBatchV3: Batch completed", { batchId, completedCount }); + + if (scheduleResumeOnComplete && batch.dependentTaskAttemptId) { + await ResumeBatchRunService.enqueue(batchId, true, tx); + } +} + export async function completeBatchTaskRunItemV3( itemId: string, batchTaskRunId: string, @@ -953,86 +1016,32 @@ export async function completeBatchTaskRunItemV3( isRetry, }); - if (isRetry) { - logger.debug("completeBatchTaskRunItemV3 retrying", { - itemId, - batchTaskRunId, - scheduleResumeOnComplete, - taskRunAttemptId, - retryAttempt, - }); - } - try { - await $transaction( - tx, - "completeBatchTaskRunItemV3", - async (tx, span) => { - span?.setAttribute("batch_id", batchTaskRunId); - - // Update the item to complete - const updated = await tx.batchTaskRunItem.updateMany({ - where: { - id: itemId, - status: "PENDING", - }, - data: { - status: "COMPLETED", - taskRunAttemptId, - }, - }); - - if (updated.count === 0) { - return; - } - - const updatedBatchRun = await tx.batchTaskRun.update({ - where: { - id: batchTaskRunId, - }, - data: { - completedCount: { - increment: 1, - }, - }, - select: { - sealed: true, - status: true, - completedCount: true, - expectedCount: true, - dependentTaskAttemptId: true, - }, - }); + // Update item to COMPLETED (no transaction needed, no contention) + const updated = await tx.batchTaskRunItem.updateMany({ + where: { id: itemId, status: "PENDING" }, + data: { status: "COMPLETED", taskRunAttemptId }, + }); - if ( - updatedBatchRun.status === "PENDING" && - updatedBatchRun.completedCount === updatedBatchRun.expectedCount && - updatedBatchRun.sealed - ) { - await tx.batchTaskRun.update({ - where: { - id: batchTaskRunId, - }, - data: { - status: "COMPLETED", - completedAt: new Date(), - }, - }); + if (updated.count === 0) { + logger.debug("completeBatchTaskRunItemV3: Item already completed", { + itemId, + batchTaskRunId, + }); + return; + } - // We only need to resume the batch if it has a dependent task attempt ID - if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { - await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); - } - } - }, - { - timeout: 10_000, - maxWait: 4_000, - } - ); + // Schedule debounced completion check + // enqueue with same ID overwrites, resetting the 200ms timer (debounce behavior) + await legacyRunEngineWorker.enqueue({ + id: `tryCompleteBatchV3:${batchTaskRunId}`, + job: "tryCompleteBatchV3", + payload: { batchId: batchTaskRunId, scheduleResumeOnComplete }, + availableAt: new Date(Date.now() + 200), + }); } catch (error) { if (isPrismaRetriableError(error) || isPrismaRaceConditionError(error)) { - logger.error("completeBatchTaskRunItemV3 failed with a Prisma Error, scheduling a retry", { + logger.error("completeBatchTaskRunItemV3 failed, scheduling retry", { itemId, batchTaskRunId, error, @@ -1041,24 +1050,17 @@ export async function completeBatchTaskRunItemV3( }); if (isRetry) { - //throwing this error will cause the Redis worker to retry the job throw error; } else { - //schedule a retry await legacyRunEngineWorker.enqueue({ id: `completeBatchTaskRunItem:${itemId}`, job: "completeBatchTaskRunItem", - payload: { - itemId, - batchTaskRunId, - scheduleResumeOnComplete, - taskRunAttemptId, - }, + payload: { itemId, batchTaskRunId, scheduleResumeOnComplete, taskRunAttemptId }, availableAt: new Date(Date.now() + 2_000), }); } } else { - logger.error("completeBatchTaskRunItemV3 failed with a non-retriable error", { + logger.error("completeBatchTaskRunItemV3 failed with non-retriable error", { itemId, batchTaskRunId, error,