Skip to content

Commit 21dae6f

Browse files
committed
Failed batch queue processing now creates a pre-failed run, better handles failures from queue length limit failures and also retries
1 parent 965721c commit 21dae6f

File tree

17 files changed

+728
-155
lines changed

17 files changed

+728
-155
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ const EnvironmentSchema = z
537537
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
538538
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(1_000), // 1 second
539539
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
540+
QUEUE_SIZE_CACHE_ENABLED: z.coerce.number().int().optional().default(1),
540541
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
541542
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
542543

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,18 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
126126
organizationSlug,
127127
runParam,
128128
spanParam,
129-
error,
129+
linkedRunId,
130+
error:
131+
error instanceof Error
132+
? {
133+
name: error.name,
134+
message: error.message,
135+
stack: error.stack,
136+
cause: error.cause instanceof Error
137+
? { name: error.cause.name, message: error.cause.message }
138+
: error.cause,
139+
}
140+
: error,
130141
});
131142
return redirectWithErrorMessage(
132143
v3RunPath(

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,10 @@ async function getCachedQueueSize(
423423
environment: AuthenticatedEnvironment,
424424
queueName: string
425425
): Promise<number> {
426+
if (!env.QUEUE_SIZE_CACHE_ENABLED) {
427+
return engine.lengthOfQueue(environment, queueName);
428+
}
429+
426430
const cacheKey = `${environment.id}:${queueName}`;
427431
const result = await queueSizeCache.queueSize.swr(cacheKey, async () => {
428432
return engine.lengthOfQueue(environment, queueName);

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 6 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
import {
2-
type BatchItemNDJSON,
32
type StreamBatchItemsResponse,
43
BatchItemNDJSON as BatchItemNDJSONSchema,
54
} from "@trigger.dev/core/v3";
6-
import { BatchId, sanitizeQueueName } from "@trigger.dev/core/v3/isomorphic";
5+
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
76
import type { BatchItem, RunEngine } from "@internal/run-engine";
87
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
98
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
109
import { logger } from "~/services/logger.server";
1110
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
1211
import { BatchPayloadProcessor } from "../concerns/batchPayloads.server";
13-
import { getMaximumSizeForEnvironment } from "../concerns/queues.server";
1412

1513
export type StreamBatchItemsServiceOptions = {
1614
maxItemBytes: number;
@@ -54,30 +52,6 @@ export class StreamBatchItemsService extends WithRunEngine {
5452
}
5553
}
5654

57-
/**
58-
* Resolve the queue name for a batch item.
59-
* Uses explicit queue name if provided, otherwise falls back to task default queue.
60-
*/
61-
private resolveQueueName(item: BatchItemNDJSON): string {
62-
// Check for explicit queue name in options
63-
const explicitQueue = item.options?.queue;
64-
if (explicitQueue) {
65-
// Handle both string and object forms
66-
if (typeof explicitQueue === "string") {
67-
return sanitizeQueueName(explicitQueue) || `task/${item.task}`;
68-
}
69-
if (typeof explicitQueue === "object" && "name" in explicitQueue) {
70-
const name = (explicitQueue as { name: unknown }).name;
71-
if (typeof name === "string") {
72-
return sanitizeQueueName(name) || `task/${item.task}`;
73-
}
74-
}
75-
}
76-
77-
// Default to task-based queue name
78-
return sanitizeQueueName(`task/${item.task}`) || `task/${item.task}`;
79-
}
80-
8155
/**
8256
* Process a stream of batch items from an async iterator.
8357
* Each item is validated and enqueued to the BatchQueue.
@@ -130,19 +104,8 @@ export class StreamBatchItemsService extends WithRunEngine {
130104
);
131105
}
132106

133-
// Get maximum queue size limit for this environment
134-
const maximumQueueSize = getMaximumSizeForEnvironment(environment);
135-
136-
// Track projected additions per queue for limit validation
137-
// Map of queue_name -> { currentSize: number, projectedAdditions: number }
138-
const queueSizeTracking = new Map<
139-
string,
140-
{ currentSize: number; projectedAdditions: number }
141-
>();
142-
143107
let itemsAccepted = 0;
144108
let itemsDeduplicated = 0;
145-
let itemsSkipped = 0;
146109
let lastIndex = -1;
147110

148111
// Process items from the stream
@@ -165,42 +128,6 @@ export class StreamBatchItemsService extends WithRunEngine {
165128
);
166129
}
167130

168-
// Validate queue size limit before enqueuing
169-
if (maximumQueueSize !== undefined) {
170-
const queueName = this.resolveQueueName(item);
171-
172-
// Get or initialize tracking for this queue
173-
let tracking = queueSizeTracking.get(queueName);
174-
if (!tracking) {
175-
// Fetch current queue size from Redis (first time seeing this queue)
176-
const currentSize = await this._engine.lengthOfQueue(environment, queueName);
177-
tracking = { currentSize, projectedAdditions: 0 };
178-
queueSizeTracking.set(queueName, tracking);
179-
}
180-
181-
// Check if adding this item would exceed the limit
182-
const projectedTotal =
183-
tracking.currentSize + tracking.projectedAdditions + 1;
184-
185-
if (projectedTotal > maximumQueueSize) {
186-
logger.warn("Skipping batch item due to queue size limit", {
187-
batchId: batchFriendlyId,
188-
queueName,
189-
currentSize: tracking.currentSize,
190-
projectedAdditions: tracking.projectedAdditions,
191-
maximumQueueSize,
192-
itemIndex: item.index,
193-
});
194-
195-
// Skip this item - don't enqueue it
196-
itemsSkipped++;
197-
continue;
198-
}
199-
200-
// Increment projected additions for this queue
201-
tracking.projectedAdditions++;
202-
}
203-
204131
// Get the original payload type
205132
const originalPayloadType = (item.options?.payloadType as string) ?? "application/json";
206133

@@ -239,19 +166,14 @@ export class StreamBatchItemsService extends WithRunEngine {
239166
// Get the actual enqueued count from Redis
240167
const enqueuedCount = await this._engine.getBatchEnqueuedCount(batchId);
241168

242-
// Calculate expected count accounting for skipped items
243-
const expectedAfterSkips = batch.runCount - itemsSkipped;
244-
245-
// Validate we received the expected number of items (minus skipped ones)
246-
if (enqueuedCount !== expectedAfterSkips) {
169+
// Validate we received the expected number of items
170+
if (enqueuedCount !== batch.runCount) {
247171
logger.warn("Batch item count mismatch", {
248172
batchId: batchFriendlyId,
249-
originalExpected: batch.runCount,
250-
expectedAfterSkips,
173+
expected: batch.runCount,
251174
received: enqueuedCount,
252175
itemsAccepted,
253176
itemsDeduplicated,
254-
itemsSkipped,
255177
});
256178

257179
// Don't seal the batch if count doesn't match
@@ -260,27 +182,13 @@ export class StreamBatchItemsService extends WithRunEngine {
260182
id: batchFriendlyId,
261183
itemsAccepted,
262184
itemsDeduplicated,
263-
itemsSkipped: itemsSkipped > 0 ? itemsSkipped : undefined,
264185
sealed: false,
265186
enqueuedCount,
266187
expectedCount: batch.runCount,
267188
runCount: batch.runCount,
268189
};
269190
}
270191

271-
// If items were skipped, update the batch's runCount to match actual enqueued count
272-
// This ensures the batch completes correctly with fewer runs
273-
if (itemsSkipped > 0) {
274-
await this._engine.updateBatchRunCount(batchId, enqueuedCount);
275-
276-
logger.info("Updated batch runCount due to skipped items", {
277-
batchId: batchFriendlyId,
278-
originalRunCount: batch.runCount,
279-
newRunCount: enqueuedCount,
280-
itemsSkipped,
281-
});
282-
}
283-
284192
// Seal the batch - use conditional update to prevent TOCTOU race
285193
// Another concurrent request may have already sealed this batch
286194
const now = new Date();
@@ -295,8 +203,6 @@ export class StreamBatchItemsService extends WithRunEngine {
295203
sealedAt: now,
296204
status: "PROCESSING",
297205
processingStartedAt: now,
298-
// Also update runCount in Postgres if items were skipped
299-
...(itemsSkipped > 0 ? { runCount: enqueuedCount } : {}),
300206
},
301207
});
302208

@@ -319,22 +225,19 @@ export class StreamBatchItemsService extends WithRunEngine {
319225
batchId: batchFriendlyId,
320226
itemsAccepted,
321227
itemsDeduplicated,
322-
itemsSkipped,
323228
envId: environment.id,
324229
});
325230

326231
span.setAttribute("itemsAccepted", itemsAccepted);
327232
span.setAttribute("itemsDeduplicated", itemsDeduplicated);
328-
span.setAttribute("itemsSkipped", itemsSkipped);
329233
span.setAttribute("sealedByConcurrentRequest", true);
330234

331235
return {
332236
id: batchFriendlyId,
333237
itemsAccepted,
334238
itemsDeduplicated,
335-
itemsSkipped: itemsSkipped > 0 ? itemsSkipped : undefined,
336239
sealed: true,
337-
runCount: itemsSkipped > 0 ? enqueuedCount : batch.runCount,
240+
runCount: batch.runCount,
338241
};
339242
}
340243

@@ -359,22 +262,19 @@ export class StreamBatchItemsService extends WithRunEngine {
359262
batchId: batchFriendlyId,
360263
itemsAccepted,
361264
itemsDeduplicated,
362-
itemsSkipped,
363265
totalEnqueued: enqueuedCount,
364266
envId: environment.id,
365267
});
366268

367269
span.setAttribute("itemsAccepted", itemsAccepted);
368270
span.setAttribute("itemsDeduplicated", itemsDeduplicated);
369-
span.setAttribute("itemsSkipped", itemsSkipped);
370271

371272
return {
372273
id: batchFriendlyId,
373274
itemsAccepted,
374275
itemsDeduplicated,
375-
itemsSkipped: itemsSkipped > 0 ? itemsSkipped : undefined,
376276
sealed: true,
377-
runCount: itemsSkipped > 0 ? enqueuedCount : batch.runCount,
277+
runCount: batch.runCount,
378278
};
379279
}
380280
);

0 commit comments

Comments
 (0)