From ba07f23c80d8560e81590a0e7d330d7df91f8db7 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 23 Jan 2026 15:47:01 +0000 Subject: [PATCH 1/2] Recover runs that failed to dequeue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There’s an edge case that means runs can end up in the currentConcurrency set when they’re not in the correct state for execution. This means they will be permanently stuck in queued. Given an environmentId this will fix those runs. This is a temporary fix while we permanently fix the issue. --- scripts/recover-stuck-runs.ts | 355 ++++++++++++++++++++++++++++++++++ 1 file changed, 355 insertions(+) create mode 100755 scripts/recover-stuck-runs.ts diff --git a/scripts/recover-stuck-runs.ts b/scripts/recover-stuck-runs.ts new file mode 100755 index 0000000000..e0bce9821e --- /dev/null +++ b/scripts/recover-stuck-runs.ts @@ -0,0 +1,355 @@ +#!/usr/bin/env tsx + +/** + * Recovery script for runs stuck in currentConcurrency with QUEUED execution status + * + * PROBLEM: + * During high database load, runs can get dequeued from Redis (added to currentConcurrency) + * but fail to update their execution status in the database. This leaves them stuck in an + * inconsistent state where they won't be re-dequeued because they're marked as "in progress" + * in Redis, but their database state still shows QUEUED. + * + * SOLUTION: + * This script identifies and recovers these stuck runs by: + * 1. Reading from the environment currentConcurrency Redis set + * 2. Checking which runs have QUEUED execution status (inconsistent state) + * 3. Re-adding them to their specific queue sorted sets + * 4. Removing them from the queue-specific currentConcurrency sets + * + * SAFETY: + * - Dry-run mode when no write Redis URL is provided (read-only, no writes) + * - Uses separate Redis connections for reads and writes + * - Write connection only created when redisWriteUrl is provided + * + * ARGUMENTS: + * The Trigger.dev environment ID (e.g., env_abc123) + * PostgreSQL connection string + * Redis connection string for reads (redis:// or rediss://) + * [redisWriteUrl] Optional Redis connection string for writes (omit for dry-run) + * + * USAGE: + * tsx scripts/recover-stuck-runs.ts [redisWriteUrl] + * + * EXAMPLES: + * + * Dry-run mode (safe, no writes): + * tsx scripts/recover-stuck-runs.ts env_1234567890 \ + * "postgresql://user:pass@localhost:5432/triggerdev" \ + * "redis://readonly.example.com:6379" + * + * Execute mode (makes actual changes): + * tsx scripts/recover-stuck-runs.ts env_1234567890 \ + * "postgresql://user:pass@localhost:5432/triggerdev" \ + * "redis://readonly.example.com:6379" \ + * "redis://writeonly.example.com:6379" + */ + +import { PrismaClient, TaskRunExecutionStatus } from "@trigger.dev/database"; +import { createRedisClient } from "@internal/redis"; + +interface StuckRun { + runId: string; + orgId: string; + projectId: string; + environmentId: string; + queue: string; + concurrencyKey: string | null; + executionStatus: TaskRunExecutionStatus; + snapshotCreatedAt: Date; + taskIdentifier: string; +} + +interface RedisOperation { + type: "ZADD" | "SREM"; + key: string; + args: (string | number)[]; + description: string; +} + +async function main() { + const [environmentId, postgresUrl, redisReadUrl, redisWriteUrl] = process.argv.slice(2); + + if (!environmentId || !postgresUrl || !redisReadUrl) { + console.error("Usage: tsx scripts/recover-stuck-runs.ts [redisWriteUrl]"); + console.error(""); + console.error("Dry-run mode when no redisWriteUrl is provided (read-only)."); + console.error("Execute mode when redisWriteUrl is provided (makes actual changes)."); + console.error(""); + console.error("Example (dry-run):"); + console.error(' tsx scripts/recover-stuck-runs.ts env_1234567890 \\'); + console.error(' "postgresql://user:pass@localhost:5432/triggerdev" \\'); + console.error(' "redis://readonly.example.com:6379"'); + console.error(""); + console.error("Example (execute):"); + console.error(' tsx scripts/recover-stuck-runs.ts env_1234567890 \\'); + console.error(' "postgresql://user:pass@localhost:5432/triggerdev" \\'); + console.error(' "redis://readonly.example.com:6379" \\'); + console.error(' "redis://writeonly.example.com:6379"'); + process.exit(1); + } + + const executeMode = !!redisWriteUrl; + + if (executeMode) { + console.log("⚠️ EXECUTE MODE - Changes will be made to Redis\n"); + } else { + console.log("πŸ” DRY RUN MODE - No changes will be made to Redis\n"); + } + + console.log(`πŸ” Scanning for stuck runs in environment: ${environmentId}`); + + // Create Prisma client with the provided connection URL + const prisma = new PrismaClient({ + datasources: { + db: { + url: postgresUrl, + }, + }, + }); + + try { + // Get environment details + const environment = await prisma.runtimeEnvironment.findUnique({ + where: { id: environmentId }, + include: { + organization: true, + project: true, + }, + }); + + if (!environment) { + console.error(`❌ Environment not found: ${environmentId}`); + process.exit(1); + } + + console.log(`πŸ“ Environment: ${environment.slug} (${environment.type})`); + console.log(`πŸ“ Organization: ${environment.organization.slug}`); + console.log(`πŸ“ Project: ${environment.project.slug}`); + + // Parse Redis read URL + const redisReadUrlObj = new URL(redisReadUrl); + const redisReadOptions = { + host: redisReadUrlObj.hostname, + port: parseInt(redisReadUrlObj.port || "6379"), + username: redisReadUrlObj.username || undefined, + password: redisReadUrlObj.password || undefined, + enableAutoPipelining: false, + ...(redisReadUrlObj.protocol === "rediss:" + ? { + tls: { + // If connecting via localhost tunnel to a remote Redis, disable cert verification + rejectUnauthorized: redisReadUrlObj.hostname === "localhost" ? false : true, + }, + } + : {}), + }; + + // Create Redis read client + const redisRead = createRedisClient(redisReadOptions); + + // Create Redis write client if redisWriteUrl is provided + let redisWrite = null; + if (redisWriteUrl) { + const redisWriteUrlObj = new URL(redisWriteUrl); + const redisWriteOptions = { + host: redisWriteUrlObj.hostname, + port: parseInt(redisWriteUrlObj.port || "6379"), + username: redisWriteUrlObj.username || undefined, + password: redisWriteUrlObj.password || undefined, + enableAutoPipelining: false, + ...(redisWriteUrlObj.protocol === "rediss:" + ? { + tls: { + // If connecting via localhost tunnel to a remote Redis, disable cert verification + rejectUnauthorized: redisWriteUrlObj.hostname === "localhost" ? false : true, + }, + } + : {}), + }; + redisWrite = createRedisClient(redisWriteOptions); + } + + try { + // Build the Redis key for environment-level currentConcurrency set + // Format: engine:runqueue:{org:X}:proj:Y:env:Z:currentConcurrency + const envConcurrencyKey = `engine:runqueue:{org:${environment.organizationId}}:proj:${environment.projectId}:env:${environmentId}:currentConcurrency`; + + console.log(`\nπŸ”‘ Checking Redis key: ${envConcurrencyKey}`); + + // Get all run IDs in the environment's currentConcurrency set + const runIds = await redisRead.smembers(envConcurrencyKey); + + if (runIds.length === 0) { + console.log(`βœ… No runs in currentConcurrency set`); + return; + } + + console.log(`πŸ“Š Found ${runIds.length} runs in currentConcurrency set`); + + // Query database for latest snapshots and queue info of these runs + const runInfo = await prisma.$queryRaw< + Array<{ + runId: string; + executionStatus: TaskRunExecutionStatus; + snapshotCreatedAt: Date; + organizationId: string; + projectId: string; + environmentId: string; + taskIdentifier: string; + queue: string; + concurrencyKey: string | null; + }> + >` + SELECT DISTINCT ON (s."runId") + s."runId", + s."executionStatus", + s."createdAt" as "snapshotCreatedAt", + r."organizationId", + r."projectId", + r."runtimeEnvironmentId" as "environmentId", + r."taskIdentifier", + r."queue", + r."concurrencyKey" + FROM "TaskRunExecutionSnapshot" s + INNER JOIN "TaskRun" r ON r.id = s."runId" + WHERE s."runId" = ANY(${runIds}) + AND s."isValid" = true + ORDER BY s."runId", s."createdAt" DESC + `; + + const stuckRuns: StuckRun[] = []; + + // Find runs with QUEUED execution status (inconsistent state) + for (const info of runInfo) { + if (info.executionStatus === "QUEUED") { + stuckRuns.push({ + runId: info.runId, + orgId: info.organizationId, + projectId: info.projectId, + environmentId: info.environmentId, + queue: info.queue, + concurrencyKey: info.concurrencyKey, + executionStatus: info.executionStatus, + snapshotCreatedAt: info.snapshotCreatedAt, + taskIdentifier: info.taskIdentifier, + }); + } + } + + if (stuckRuns.length === 0) { + console.log(`βœ… No stuck runs found (all runs have progressed beyond QUEUED state)`); + return; + } + + console.log(`\n⚠️ Found ${stuckRuns.length} stuck runs in QUEUED state:`); + console.log(`════════════════════════════════════════════════════════════════`); + + for (const run of stuckRuns) { + const age = Date.now() - run.snapshotCreatedAt.getTime(); + const ageMinutes = Math.floor(age / 1000 / 60); + console.log(` β€’ Run: ${run.runId}`); + console.log(` Task: ${run.taskIdentifier}`); + console.log(` Queue: ${run.queue}`); + console.log(` Concurrency Key: ${run.concurrencyKey || "(none)"}`); + console.log(` Status: ${run.executionStatus}`); + console.log(` Stuck for: ${ageMinutes} minutes`); + console.log(` Snapshot created: ${run.snapshotCreatedAt.toISOString()}`); + console.log(); + } + + // Prepare recovery operations + console.log(`\n⚑ ${executeMode ? "Executing" : "Planning"} recovery for ${stuckRuns.length} stuck runs`); + console.log(`This will:`); + console.log(` 1. Add each run back to its specific queue sorted set`); + console.log(` 2. Remove each run from the queue-specific currentConcurrency set`); + console.log(); + + let successCount = 0; + let failureCount = 0; + + const currentTimestamp = Date.now(); + + for (const run of stuckRuns) { + try { + // Build queue key: engine:runqueue:{org:X}:proj:Y:env:Z:queue:QUEUENAME + // Build queue currentConcurrency key: engine:runqueue:{org:X}:proj:Y:env:Z:queue:QUEUENAME:currentConcurrency + const queueKey = run.concurrencyKey + ? `engine:runqueue:{org:${run.orgId}}:proj:${run.projectId}:env:${run.environmentId}:queue:${run.queue}:ck:${run.concurrencyKey}` + : `engine:runqueue:{org:${run.orgId}}:proj:${run.projectId}:env:${run.environmentId}:queue:${run.queue}`; + + const queueConcurrencyKey = `${queueKey}:currentConcurrency`; + + const operations: RedisOperation[] = [ + { + type: "ZADD", + key: queueKey, + args: [currentTimestamp, run.runId], + description: `Add run to queue sorted set with score ${currentTimestamp}`, + }, + { + type: "SREM", + key: queueConcurrencyKey, + args: [run.runId], + description: `Remove run from queue currentConcurrency set`, + }, + ]; + + if (executeMode && redisWrite) { + // Execute operations using the write client + await redisWrite.zadd(queueKey, currentTimestamp, run.runId); + const removed = await redisWrite.srem(queueConcurrencyKey, run.runId); + + console.log(` βœ“ Recovered run ${run.runId} (${run.taskIdentifier})`); + if (removed === 0) { + console.log(` ⚠ Run was not in queue currentConcurrency set`); + } + successCount++; + } else { + // Dry run - just show what would be done + console.log(` πŸ“ Would recover run ${run.runId} (${run.taskIdentifier}):`); + for (const op of operations) { + console.log(` ${op.type} ${op.key}`); + console.log(` Args: ${JSON.stringify(op.args)}`); + console.log(` (${op.description})`); + } + successCount++; + } + } catch (error) { + console.error(` βœ— Failed to recover run ${run.runId}:`, error); + failureCount++; + } + } + + console.log(`\n═══════════════════════════════════════════════════════════════`); + if (executeMode) { + console.log(`βœ… Recovery complete!`); + console.log(` Recovered: ${successCount}`); + console.log(` Failed: ${failureCount}`); + console.log(); + console.log(`ℹ️ Note: The recovered runs should be automatically dequeued`); + console.log(` by the master queue consumers within a few seconds.`); + } else { + console.log(`πŸ“‹ Dry run complete - no changes were made`); + console.log(` Would recover: ${successCount}`); + console.log(` Would fail: ${failureCount}`); + console.log(); + console.log(`πŸ’‘ To execute these changes, run again with a redisWriteUrl argument`); + } + } finally { + await redisRead.quit(); + if (redisWrite) { + await redisWrite.quit(); + } + } + } catch (error) { + console.error("❌ Error during recovery:", error); + throw error; + } finally { + await prisma.$disconnect(); + } +} + +main().catch((error) => { + console.error("Fatal error:", error); + process.exit(1); +}); From 3367f37a4e1c4450794f5df86702cd864275dc69 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 16:17:37 +0000 Subject: [PATCH 2/2] fix: remove runs from env-level currentConcurrency set during recovery The recovery script was only removing runs from the queue-level currentConcurrency set but not from the environment-level set. This could leave runs stuck consuming environment-level concurrency capacity and blocking future dequeues. Co-authored-by: Eric Allam --- scripts/recover-stuck-runs.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/scripts/recover-stuck-runs.ts b/scripts/recover-stuck-runs.ts index e0bce9821e..15deeb899c 100755 --- a/scripts/recover-stuck-runs.ts +++ b/scripts/recover-stuck-runs.ts @@ -15,6 +15,7 @@ * 2. Checking which runs have QUEUED execution status (inconsistent state) * 3. Re-adding them to their specific queue sorted sets * 4. Removing them from the queue-specific currentConcurrency sets + * 5. Removing them from the environment-level currentConcurrency set * * SAFETY: * - Dry-run mode when no write Redis URL is provided (read-only, no writes) @@ -262,6 +263,7 @@ async function main() { console.log(`This will:`); console.log(` 1. Add each run back to its specific queue sorted set`); console.log(` 2. Remove each run from the queue-specific currentConcurrency set`); + console.log(` 3. Remove each run from the env-level currentConcurrency set`); console.log(); let successCount = 0; @@ -292,17 +294,27 @@ async function main() { args: [run.runId], description: `Remove run from queue currentConcurrency set`, }, + { + type: "SREM", + key: envConcurrencyKey, + args: [run.runId], + description: `Remove run from env currentConcurrency set`, + }, ]; if (executeMode && redisWrite) { // Execute operations using the write client await redisWrite.zadd(queueKey, currentTimestamp, run.runId); - const removed = await redisWrite.srem(queueConcurrencyKey, run.runId); + const removedFromQueue = await redisWrite.srem(queueConcurrencyKey, run.runId); + const removedFromEnv = await redisWrite.srem(envConcurrencyKey, run.runId); console.log(` βœ“ Recovered run ${run.runId} (${run.taskIdentifier})`); - if (removed === 0) { + if (removedFromQueue === 0) { console.log(` ⚠ Run was not in queue currentConcurrency set`); } + if (removedFromEnv === 0) { + console.log(` ⚠ Run was not in env currentConcurrency set`); + } successCount++; } else { // Dry run - just show what would be done