From 812af117dc3c00ea7afaf0a0cac07bf0479a3725 Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Fri, 15 May 2026 10:41:30 +0200 Subject: [PATCH 1/4] perf(redis): use sorted set index for O(log N) schedule claiming --- src/drivers/redis_adapter.ts | 214 ++++++++++++++++++++++++----------- 1 file changed, 148 insertions(+), 66 deletions(-) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 27fe11d..12e6ba8 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -16,6 +16,7 @@ import { resolveRetention } from '../utils.js' const redisKey = 'jobs' const schedulesKey = 'schedules' const schedulesIndexKey = 'schedules::index' +const schedulesDueKey = 'schedules::due' type RedisConfig = Redis | RedisOptions /** @@ -352,76 +353,98 @@ const GET_JOB_SCRIPT = ` ` /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + if #candidates == 0 then + return nil + end + + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end - - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end - - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end - - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) - - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil + + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end + + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end + + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' end + + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) + end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) end end end end - - return nil ` /** @@ -700,10 +723,11 @@ export class RedisAdapter implements Adapter { const id = config.id ?? randomUUID() const now = Date.now() const scheduleKey = `${schedulesKey}::${id}` - const [existingRunCount, existingCreatedAt] = await this.#connection.hmget( + const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget( scheduleKey, 'run_count', - 'created_at' + 'created_at', + 'next_run_at' ) const scheduleData: Record = { @@ -722,13 +746,17 @@ export class RedisAdapter implements Adapter { if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString() if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString() - // Upsert schedule and clear stale optional fields from previous config. - await this.#connection + const multi = this.#connection .multi() .hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit') .hset(scheduleKey, scheduleData) .sadd(schedulesIndexKey, id) - .exec() + + if (existingNextRunAt) { + multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id) + } + + await multi.exec() return id } @@ -804,14 +832,34 @@ export class RedisAdapter implements Adapter { } if (updates.runCount !== undefined) data.run_count = updates.runCount.toString() - if (Object.keys(data).length > 0) { - await this.#connection.hset(scheduleKey, data) + if (Object.keys(data).length === 0) return + + const multi = this.#connection.multi().hset(scheduleKey, data) + + if (updates.nextRunAt) { + multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id) + } else if (updates.nextRunAt === null || updates.status === 'paused') { + multi.zrem(schedulesDueKey, id) + } + + if (updates.status === 'active' && updates.nextRunAt === undefined) { + const existing = await this.#connection.hget(scheduleKey, 'next_run_at') + if (existing) { + multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id) + } } + + await multi.exec() } async deleteSchedule(id: string): Promise { const scheduleKey = `${schedulesKey}::${id}` - await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec() + await this.#connection + .multi() + .del(scheduleKey) + .srem(schedulesIndexKey, id) + .zrem(schedulesDueKey, id) + .exec() } async claimDueSchedule(): Promise { @@ -819,7 +867,7 @@ export class RedisAdapter implements Adapter { const result = await this.#connection.eval( CLAIM_SCHEDULE_SCRIPT, 2, - schedulesIndexKey, + schedulesDueKey, `${schedulesKey}::`, now.toString() ) @@ -841,7 +889,6 @@ export class RedisAdapter implements Adapter { }) const nextRun = cron.next().toDate().getTime() - // Check limits before updating const runCount = Number.parseInt(data.run_count || '0', 10) + 1 const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null @@ -854,16 +901,51 @@ export class RedisAdapter implements Adapter { newNextRunAt = '' } - await this.#connection.hset( - `${schedulesKey}::${data.id}`, - 'next_run_at', - newNextRunAt.toString() - ) + const scheduleKey = `${schedulesKey}::${data.id}` + const multi = this.#connection + .multi() + .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()) + + if (typeof newNextRunAt === 'number') { + multi.zadd(schedulesDueKey, newNextRunAt, data.id) + } else { + multi.zrem(schedulesDueKey, data.id) + } + + await multi.exec() } return this.#hashToScheduleData(data) } + async backfillDueIndex(): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey) + if (ids.length === 0) return 0 + + const pipeline = this.#connection.pipeline() + for (const id of ids) { + pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status') + } + const results = await pipeline.exec() + if (!results) return 0 + + const addPipeline = this.#connection.pipeline() + let count = 0 + + for (let i = 0; i < ids.length; i++) { + const [err, values] = results[i] + if (err || !values) continue + const [nextRunAt, status] = values as [string | null, string | null] + if (nextRunAt && status === 'active') { + addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i]) + count++ + } + } + + if (count > 0) await addPipeline.exec() + return count + } + #hashToScheduleData(data: Record): ScheduleData { return { id: data.id, From dd967dba82cf88a689295a6e1567da958e62fd16 Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Tue, 2 Jun 2026 13:53:43 +0200 Subject: [PATCH 2/4] fix(redis): auto-backfill due index on first schedule claim Existing users upgrading will have schedules in the legacy format (hashes + SET) but not in the new ZSET. Run backfillDueIndex() once per worker process on the first claimDueSchedule() call so schedules keep firing without manual intervention. --- src/drivers/redis_adapter.ts | 9 +++++++++ tests/adapter.spec.ts | 3 +++ 2 files changed, 12 insertions(+) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 12e6ba8..509ce5c 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -480,6 +480,7 @@ export class RedisAdapter implements Adapter { readonly #connection: Redis readonly #ownsConnection: boolean #workerId: string = '' + #dueIndexReady = false constructor(connection: Redis, ownsConnection: boolean = false) { this.#connection = connection @@ -862,7 +863,15 @@ export class RedisAdapter implements Adapter { .exec() } + async #ensureDueIndex(): Promise { + if (this.#dueIndexReady) return + await this.backfillDueIndex() + this.#dueIndexReady = true + } + async claimDueSchedule(): Promise { + await this.#ensureDueIndex() + const now = Date.now() const result = await this.#connection.eval( CLAIM_SCHEDULE_SCRIPT, diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index a276625..869dc75 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -99,6 +99,9 @@ test.group('Adapter | Redis', (group) => { await adapter.updateSchedule(id, { nextRunAt: futureRunAt }) } + // Warm the due-index backfill so it doesn't count against the spy + await adapter.claimDueSchedule() + const { result: claimed, writes } = await withRedisWriteSpy({ connection, run: () => adapter.claimDueSchedule(), From f3c85cd18b8e37575da976cba3d6d2f484454cdf Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:29:12 +0200 Subject: [PATCH 3/4] perf(redis): update claim schedule script to use sorted set index --- src/drivers/redis_scripts.ts | 140 ++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/src/drivers/redis_scripts.ts b/src/drivers/redis_scripts.ts index e0ff6dd..2f8cedf 100644 --- a/src/drivers/redis_scripts.ts +++ b/src/drivers/redis_scripts.ts @@ -1,4 +1,4 @@ -import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js' +import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js'; /** * Lua script for pushing a job to the queue. @@ -18,7 +18,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', pending_key, score, job_id) return 1 -` +`; /** * Lua script for pushing a dedup job. @@ -80,7 +80,7 @@ ${REDIS_DEDUP_LUA} redis.call('PEXPIRE', dedup_key, ttl) end return {'added', job_id} -` +`; /** * Lua script for pushing a delayed job. @@ -100,7 +100,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', delayed_key, execute_at, job_id) return 1 -` +`; /** * Lua script for atomic job acquisition. @@ -158,7 +158,7 @@ ${REDIS_JOB_STORAGE_LUA} return encode_job_result(job_data, overlay_key, job_id, { acquiredAt = now }) -` +`; /** * Lua script for removing a job completely (no history). @@ -193,7 +193,7 @@ ${REDIS_JOB_STORAGE_LUA} delete_job_data(data_key, overlay_key, job_id) return 1 -` +`; /** * Lua script for finalizing a job in history. @@ -277,7 +277,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -` +`; /** * Lua script for retrying a job. @@ -330,7 +330,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -` +`; /** * Lua script for recovering stalled jobs. @@ -399,7 +399,7 @@ ${REDIS_JOB_STORAGE_LUA} end return recovered -` +`; /** * Lua script for getting a job record with its status. @@ -458,77 +458,99 @@ ${REDIS_JOB_STORAGE_LUA} finishedAt = finished_at, error = error_msg }) -` +`; /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ export const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) + + if #candidates == 0 then + return nil + end - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end - - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil + + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' + end - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) end end end end - - return nil -` +`; From 408b18e3bc6ebffbdba2b6596a42666050c91455 Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Mon, 8 Jun 2026 10:07:00 +0200 Subject: [PATCH 4/4] feat(adapter): add migrate() to interface, validate hash in claim script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace auto-backfill with an explicit migrate() lifecycle method on the Adapter interface. Remove #ensureDueIndex() and #dueIndexReady from RedisAdapter — users call migrate() once after upgrading to populate the schedules::due ZSET from pre-existing data. The Lua claim script now validates the hash's next_run_at before claiming, repairing stale ZSET scores on sight. This keeps the hash canonical and the ZSET as a derived index. --- src/contracts/adapter.ts | 10 + src/drivers/fake_adapter.ts | 4 + src/drivers/knex_adapter.ts | 2 + src/drivers/redis_adapter.ts | 1312 +++++++++++++++----------------- src/drivers/redis_scripts.ts | 31 +- src/drivers/sync_adapter.ts | 4 + tests/_mocks/memory_adapter.ts | 4 + tests/adapter.spec.ts | 89 ++- 8 files changed, 753 insertions(+), 703 deletions(-) diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index 8bd8e12..c7a7fdd 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -205,6 +205,16 @@ export interface Adapter { */ destroy(): Promise + /** + * Run adapter-specific migrations needed after a major version upgrade. + * + * This method is idempotent — it is always safe to call multiple times. + * Adapters that have no pending migrations return immediately. + * + * Call this once during your deployment process before starting workers. + */ + migrate(): Promise + /** * Create or update a schedule. * diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index 3671cc4..1b98825 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -385,6 +385,10 @@ export class FakeAdapter implements Adapter { return Promise.resolve() } + migrate(): Promise { + return Promise.resolve() + } + async upsertSchedule(config: ScheduleConfig): Promise { const id = config.id ?? randomUUID() const existing = this.#schedules.get(id) diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index adeb610..7c43b03 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -88,6 +88,8 @@ export class KnexAdapter implements Adapter { } } + async migrate(): Promise {} + async pop(): Promise { return this.popFrom('default') } diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 8f384a6..178bb2c 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -1,40 +1,37 @@ -import { randomUUID } from 'node:crypto'; -import { Redis, type RedisOptions } from 'ioredis'; -import { DEFAULT_PRIORITY } from '../constants.js'; -import { calculateScore } from '../utils.js'; -import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js'; -import type { DedupOutcome } from '../types/main.js'; +import { randomUUID } from 'node:crypto' +import { Redis, type RedisOptions } from 'ioredis' +import { DEFAULT_PRIORITY } from '../constants.js' +import { calculateScore } from '../utils.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' +import type { DedupOutcome } from '../types/main.js' import type { - JobData, - JobRecord, - JobRetention, - ScheduleConfig, - ScheduleData, - ScheduleListOptions, -} from '../types/main.js'; -import { resolveRetention } from '../utils.js'; + JobData, + JobRecord, + JobRetention, + ScheduleConfig, + ScheduleData, + ScheduleListOptions, +} from '../types/main.js' +import { resolveRetention } from '../utils.js' +import { encodeRedisJobPayloadOverlay, hydrateRedisJob } from './redis_job_storage.js' import { - encodeRedisJobPayloadOverlay, - hydrateRedisJob, -} from './redis_job_storage.js'; -import { - ACQUIRE_JOB_SCRIPT, - CLAIM_SCHEDULE_SCRIPT, - FINALIZE_JOB_SCRIPT, - GET_JOB_SCRIPT, - PUSH_DEDUP_JOB_SCRIPT, - PUSH_DELAYED_JOB_SCRIPT, - PUSH_JOB_SCRIPT, - RECOVER_STALLED_JOBS_SCRIPT, - REMOVE_JOB_SCRIPT, - RETRY_JOB_SCRIPT, -} from './redis_scripts.js'; - -const redisKey = 'jobs'; -const schedulesKey = 'schedules'; -const schedulesIndexKey = 'schedules::index'; -const schedulesDueKey = 'schedules::due'; -type RedisConfig = Redis | RedisOptions; + ACQUIRE_JOB_SCRIPT, + CLAIM_SCHEDULE_SCRIPT, + FINALIZE_JOB_SCRIPT, + GET_JOB_SCRIPT, + PUSH_DEDUP_JOB_SCRIPT, + PUSH_DELAYED_JOB_SCRIPT, + PUSH_JOB_SCRIPT, + RECOVER_STALLED_JOBS_SCRIPT, + REMOVE_JOB_SCRIPT, + RETRY_JOB_SCRIPT, +} from './redis_scripts.js' + +const redisKey = 'jobs' +const schedulesKey = 'schedules' +const schedulesIndexKey = 'schedules::index' +const schedulesDueKey = 'schedules::due' +type RedisConfig = Redis | RedisOptions /** * Create a new Redis adapter factory. @@ -47,676 +44,613 @@ type RedisConfig = Redis | RedisOptions; * managing the connection lifecycle. */ export function redis(config?: RedisConfig) { - return () => { - if (config instanceof Redis) { - return new RedisAdapter(config, false); - } - - const options: RedisOptions = { - host: 'localhost', - port: 6379, - keyPrefix: 'boringnode::queue::', - db: 0, - ...config, - }; - - const connection = new Redis(options); - return new RedisAdapter(connection, true); - }; + return () => { + if (config instanceof Redis) { + return new RedisAdapter(config, false) + } + + const options: RedisOptions = { + host: 'localhost', + port: 6379, + keyPrefix: 'boringnode::queue::', + db: 0, + ...config, + } + + const connection = new Redis(options) + return new RedisAdapter(connection, true) + } } export class RedisAdapter implements Adapter { - readonly #connection: Redis; - readonly #ownsConnection: boolean; - #workerId: string = ''; - #dueIndexReady = false; - - constructor(connection: Redis, ownsConnection: boolean = false) { - this.#connection = connection; - this.#ownsConnection = ownsConnection; - } - - #getKeys(queue: string) { - return { - data: `${redisKey}::${queue}::data`, - pending: `${redisKey}::${queue}::pending`, - delayed: `${redisKey}::${queue}::delayed`, - active: `${redisKey}::${queue}::active`, - overlay: `${redisKey}::${queue}::metadata`, - completed: `${redisKey}::${queue}::completed`, - completedIndex: `${redisKey}::${queue}::completed::index`, - failed: `${redisKey}::${queue}::failed`, - failedIndex: `${redisKey}::${queue}::failed::index`, - }; - } - - #getDedupKey(queue: string, dedupId: string): string { - return `${this.#getDedupPrefix(queue)}${dedupId}`; - } - - #getDedupPrefix(queue: string): string { - return `${redisKey}::${queue}::dedup::`; - } - - setWorkerId(workerId: string): void { - this.#workerId = workerId; - } - - async destroy(): Promise { - if (this.#ownsConnection) { - await this.#connection.quit(); - } - } - - pop(): Promise { - return this.popFrom('default'); - } - - async popFrom(queue: string): Promise { - const keys = this.#getKeys(queue); - const now = Date.now(); - - const result = await this.#connection.eval( - ACQUIRE_JOB_SCRIPT, - 5, - keys.data, - keys.pending, - keys.active, - keys.delayed, - keys.overlay, - this.#workerId, - now.toString(), - ); - - if (!result) { - return null; - } - - const { data, overlay, acquiredAt } = JSON.parse(result as string) as { - data: string; - overlay?: string; - acquiredAt: number; - }; - - return { ...hydrateRedisJob(data, overlay), acquiredAt }; - } - - async completeJob( - jobId: string, - queue: string, - removeOnComplete?: JobRetention, - ): Promise { - const keys = this.#getKeys(queue); - const dedupPrefix = this.#getDedupPrefix(queue); - const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete); - - if (!keep) { - await this.#connection.eval( - REMOVE_JOB_SCRIPT, - 3, - keys.data, - keys.active, - keys.overlay, - jobId, - dedupPrefix, - ); - return; - } - - await this.#connection.eval( - FINALIZE_JOB_SCRIPT, - 5, - keys.data, - keys.active, - keys.completed, - keys.completedIndex, - keys.overlay, - jobId, - Date.now().toString(), - maxAge.toString(), - maxCount.toString(), - '', - dedupPrefix, - ); - } - - async failJob( - jobId: string, - queue: string, - error?: Error, - removeOnFail?: JobRetention, - ): Promise { - const keys = this.#getKeys(queue); - const dedupPrefix = this.#getDedupPrefix(queue); - const { keep, maxAge, maxCount } = resolveRetention(removeOnFail); - - if (!keep) { - await this.#connection.eval( - REMOVE_JOB_SCRIPT, - 3, - keys.data, - keys.active, - keys.overlay, - jobId, - dedupPrefix, - ); - return; - } - - await this.#connection.eval( - FINALIZE_JOB_SCRIPT, - 5, - keys.data, - keys.active, - keys.failed, - keys.failedIndex, - keys.overlay, - jobId, - Date.now().toString(), - maxAge.toString(), - maxCount.toString(), - error?.message || '', - dedupPrefix, - ); - } - - async retryJob(jobId: string, queue: string, retryAt?: Date): Promise { - const keys = this.#getKeys(queue); - const now = Date.now(); - - await this.#connection.eval( - RETRY_JOB_SCRIPT, - 5, - keys.data, - keys.active, - keys.pending, - keys.delayed, - keys.overlay, - jobId, - retryAt ? retryAt.getTime().toString() : '0', - now.toString(), - ); - } - - async getJob(jobId: string, queue: string): Promise { - const keys = this.#getKeys(queue); - - const result = await this.#connection.eval( - GET_JOB_SCRIPT, - 7, - keys.data, - keys.pending, - keys.delayed, - keys.active, - keys.completed, - keys.failed, - keys.overlay, - jobId, - ); - - if (!result) { - return null; - } - - const record = JSON.parse(result as string) as Omit & { - data: string; - overlay?: string; - }; - - return { ...record, data: hydrateRedisJob(record.data, record.overlay) }; - } - - push(jobData: JobData): Promise { - return this.pushOn('default', jobData); - } - - pushLater(jobData: JobData, delay: number): Promise { - return this.pushLaterOn('default', jobData, delay); - } - - async pushLaterOn( - queue: string, - jobData: JobData, - delay: number, - ): Promise { - const keys = this.#getKeys(queue); - const executeAt = Date.now() + delay; - - if (jobData.dedup) { - const dedupKey = this.#getDedupKey(queue, jobData.dedup.id); - const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay( - jobData.payload, - ); - const result = (await this.#connection.eval( - PUSH_DEDUP_JOB_SCRIPT, - 5, - keys.data, - keys.delayed, - dedupKey, - keys.pending, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - executeAt.toString(), - (jobData.dedup.ttl ?? 0).toString(), - jobData.dedup.extend ? '1' : '0', - jobData.dedup.replace ? '1' : '0', - payloadData, - payloadIsUndefined, - )) as [string, string]; - return { outcome: result[0] as DedupOutcome, jobId: result[1] }; - } - + readonly #connection: Redis + readonly #ownsConnection: boolean + #workerId: string = '' + constructor(connection: Redis, ownsConnection: boolean = false) { + this.#connection = connection + this.#ownsConnection = ownsConnection + } + + #getKeys(queue: string) { + return { + data: `${redisKey}::${queue}::data`, + pending: `${redisKey}::${queue}::pending`, + delayed: `${redisKey}::${queue}::delayed`, + active: `${redisKey}::${queue}::active`, + overlay: `${redisKey}::${queue}::metadata`, + completed: `${redisKey}::${queue}::completed`, + completedIndex: `${redisKey}::${queue}::completed::index`, + failed: `${redisKey}::${queue}::failed`, + failedIndex: `${redisKey}::${queue}::failed::index`, + } + } + + #getDedupKey(queue: string, dedupId: string): string { + return `${this.#getDedupPrefix(queue)}${dedupId}` + } + + #getDedupPrefix(queue: string): string { + return `${redisKey}::${queue}::dedup::` + } + + setWorkerId(workerId: string): void { + this.#workerId = workerId + } + + async destroy(): Promise { + if (this.#ownsConnection) { + await this.#connection.quit() + } + } + + pop(): Promise { + return this.popFrom('default') + } + + async popFrom(queue: string): Promise { + const keys = this.#getKeys(queue) + const now = Date.now() + + const result = await this.#connection.eval( + ACQUIRE_JOB_SCRIPT, + 5, + keys.data, + keys.pending, + keys.active, + keys.delayed, + keys.overlay, + this.#workerId, + now.toString() + ) + + if (!result) { + return null + } + + const { data, overlay, acquiredAt } = JSON.parse(result as string) as { + data: string + overlay?: string + acquiredAt: number + } + + return { ...hydrateRedisJob(data, overlay), acquiredAt } + } + + async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise { + const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) + const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete) + + if (!keep) { await this.#connection.eval( - PUSH_DELAYED_JOB_SCRIPT, - 3, - keys.data, - keys.delayed, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - executeAt.toString(), - ); - } - - async pushOn(queue: string, jobData: JobData): Promise { - const keys = this.#getKeys(queue); - const priority = jobData.priority ?? DEFAULT_PRIORITY; - const timestamp = Date.now(); - const score = calculateScore(priority, timestamp); - - if (jobData.dedup) { - const dedupKey = this.#getDedupKey(queue, jobData.dedup.id); - const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay( - jobData.payload, - ); - const result = (await this.#connection.eval( - PUSH_DEDUP_JOB_SCRIPT, - 5, - keys.data, - keys.pending, - dedupKey, - keys.delayed, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - score.toString(), - (jobData.dedup.ttl ?? 0).toString(), - jobData.dedup.extend ? '1' : '0', - jobData.dedup.replace ? '1' : '0', - payloadData, - payloadIsUndefined, - )) as [string, string]; - return { outcome: result[0] as DedupOutcome, jobId: result[1] }; - } - + REMOVE_JOB_SCRIPT, + 3, + keys.data, + keys.active, + keys.overlay, + jobId, + dedupPrefix + ) + return + } + + await this.#connection.eval( + FINALIZE_JOB_SCRIPT, + 5, + keys.data, + keys.active, + keys.completed, + keys.completedIndex, + keys.overlay, + jobId, + Date.now().toString(), + maxAge.toString(), + maxCount.toString(), + '', + dedupPrefix + ) + } + + async failJob( + jobId: string, + queue: string, + error?: Error, + removeOnFail?: JobRetention + ): Promise { + const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) + const { keep, maxAge, maxCount } = resolveRetention(removeOnFail) + + if (!keep) { await this.#connection.eval( - PUSH_JOB_SCRIPT, - 3, - keys.data, - keys.pending, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - score.toString(), - ); - } - - pushMany(jobs: JobData[]): Promise { - return this.pushManyOn('default', jobs); - } - - async pushManyOn(queue: string, jobs: JobData[]): Promise { - if (jobs.length === 0) return; - - if (jobs.some((j) => j.dedup)) { - throw new Error( - 'dedup is not supported in batch dispatch; use single dispatch', - ); - } - - const keys = this.#getKeys(queue); - const now = Date.now(); - const multi = this.#connection.multi(); - - for (const job of jobs) { - const priority = job.priority ?? DEFAULT_PRIORITY; - const score = calculateScore(priority, now); - multi.hdel(keys.overlay, job.id); - multi.hset(keys.data, job.id, JSON.stringify(job)); - multi.zadd(keys.pending, score, job.id); - } - - await multi.exec(); - } - - size(): Promise { - return this.sizeOf('default'); - } - - sizeOf(queue: string): Promise { - const keys = this.#getKeys(queue); - return this.#connection.zcard(keys.pending); - } - - async recoverStalledJobs( - queue: string, - stalledThreshold: number, - maxStalledCount: number, - ): Promise { - const keys = this.#getKeys(queue); - const now = Date.now(); - - const recovered = await this.#connection.eval( - RECOVER_STALLED_JOBS_SCRIPT, - 4, - keys.data, - keys.active, - keys.pending, - keys.overlay, - now.toString(), - stalledThreshold.toString(), - maxStalledCount.toString(), - this.#getDedupPrefix(queue), - ); - - return recovered as number; - } - - async upsertSchedule(config: ScheduleConfig): Promise { - const id = config.id ?? randomUUID(); - const now = Date.now(); - const scheduleKey = `${schedulesKey}::${id}`; - const [existingRunCount, existingCreatedAt, existingNextRunAt] = - await this.#connection.hmget( - scheduleKey, - 'run_count', - 'created_at', - 'next_run_at', - ); - - const scheduleData: Record = { - id, - name: config.name, - payload: JSON.stringify(config.payload), - timezone: config.timezone, - status: 'active', - run_count: existingRunCount ?? '0', - created_at: existingCreatedAt ?? now.toString(), - }; - - if (config.cronExpression !== undefined) - scheduleData.cron_expression = config.cronExpression; - if (config.everyMs !== undefined) - scheduleData.every_ms = config.everyMs.toString(); - if (config.from !== undefined) - scheduleData.from_date = config.from.getTime().toString(); - if (config.to !== undefined) - scheduleData.to_date = config.to.getTime().toString(); - if (config.limit !== undefined) - scheduleData.run_limit = config.limit.toString(); - - const multi = this.#connection - .multi() - .hdel( - scheduleKey, - 'cron_expression', - 'every_ms', - 'from_date', - 'to_date', - 'run_limit', - ) - .hset(scheduleKey, scheduleData) - .sadd(schedulesIndexKey, id); - - if (existingNextRunAt) { - multi.zadd( - schedulesDueKey, - Number.parseInt(existingNextRunAt, 10), - id, - ); - } - - await multi.exec(); - - return id; - } - - /** - * @deprecated Use `upsertSchedule` instead. - */ - createSchedule(config: ScheduleConfig): Promise { - return this.upsertSchedule(config); - } - - async getSchedule(id: string): Promise { - const scheduleKey = `${schedulesKey}::${id}`; - const data = await this.#connection.hgetall(scheduleKey); - + REMOVE_JOB_SCRIPT, + 3, + keys.data, + keys.active, + keys.overlay, + jobId, + dedupPrefix + ) + return + } + + await this.#connection.eval( + FINALIZE_JOB_SCRIPT, + 5, + keys.data, + keys.active, + keys.failed, + keys.failedIndex, + keys.overlay, + jobId, + Date.now().toString(), + maxAge.toString(), + maxCount.toString(), + error?.message || '', + dedupPrefix + ) + } + + async retryJob(jobId: string, queue: string, retryAt?: Date): Promise { + const keys = this.#getKeys(queue) + const now = Date.now() + + await this.#connection.eval( + RETRY_JOB_SCRIPT, + 5, + keys.data, + keys.active, + keys.pending, + keys.delayed, + keys.overlay, + jobId, + retryAt ? retryAt.getTime().toString() : '0', + now.toString() + ) + } + + async getJob(jobId: string, queue: string): Promise { + const keys = this.#getKeys(queue) + + const result = await this.#connection.eval( + GET_JOB_SCRIPT, + 7, + keys.data, + keys.pending, + keys.delayed, + keys.active, + keys.completed, + keys.failed, + keys.overlay, + jobId + ) + + if (!result) { + return null + } + + const record = JSON.parse(result as string) as Omit & { + data: string + overlay?: string + } + + return { ...record, data: hydrateRedisJob(record.data, record.overlay) } + } + + push(jobData: JobData): Promise { + return this.pushOn('default', jobData) + } + + pushLater(jobData: JobData, delay: number): Promise { + return this.pushLaterOn('default', jobData, delay) + } + + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const keys = this.#getKeys(queue) + const executeAt = Date.now() + delay + + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay(jobData.payload) + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 5, + keys.data, + keys.delayed, + dedupKey, + keys.pending, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + executeAt.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0', + payloadData, + payloadIsUndefined + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } + + await this.#connection.eval( + PUSH_DELAYED_JOB_SCRIPT, + 3, + keys.data, + keys.delayed, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + executeAt.toString() + ) + } + + async pushOn(queue: string, jobData: JobData): Promise { + const keys = this.#getKeys(queue) + const priority = jobData.priority ?? DEFAULT_PRIORITY + const timestamp = Date.now() + const score = calculateScore(priority, timestamp) + + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay(jobData.payload) + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 5, + keys.data, + keys.pending, + dedupKey, + keys.delayed, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + score.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0', + payloadData, + payloadIsUndefined + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } + + await this.#connection.eval( + PUSH_JOB_SCRIPT, + 3, + keys.data, + keys.pending, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + score.toString() + ) + } + + pushMany(jobs: JobData[]): Promise { + return this.pushManyOn('default', jobs) + } + + async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.length === 0) return + + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + + const keys = this.#getKeys(queue) + const now = Date.now() + const multi = this.#connection.multi() + + for (const job of jobs) { + const priority = job.priority ?? DEFAULT_PRIORITY + const score = calculateScore(priority, now) + multi.hdel(keys.overlay, job.id) + multi.hset(keys.data, job.id, JSON.stringify(job)) + multi.zadd(keys.pending, score, job.id) + } + + await multi.exec() + } + + size(): Promise { + return this.sizeOf('default') + } + + sizeOf(queue: string): Promise { + const keys = this.#getKeys(queue) + return this.#connection.zcard(keys.pending) + } + + async recoverStalledJobs( + queue: string, + stalledThreshold: number, + maxStalledCount: number + ): Promise { + const keys = this.#getKeys(queue) + const now = Date.now() + + const recovered = await this.#connection.eval( + RECOVER_STALLED_JOBS_SCRIPT, + 4, + keys.data, + keys.active, + keys.pending, + keys.overlay, + now.toString(), + stalledThreshold.toString(), + maxStalledCount.toString(), + this.#getDedupPrefix(queue) + ) + + return recovered as number + } + + async upsertSchedule(config: ScheduleConfig): Promise { + const id = config.id ?? randomUUID() + const now = Date.now() + const scheduleKey = `${schedulesKey}::${id}` + const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget( + scheduleKey, + 'run_count', + 'created_at', + 'next_run_at' + ) + + const scheduleData: Record = { + id, + name: config.name, + payload: JSON.stringify(config.payload), + timezone: config.timezone, + status: 'active', + run_count: existingRunCount ?? '0', + created_at: existingCreatedAt ?? now.toString(), + } + + if (config.cronExpression !== undefined) scheduleData.cron_expression = config.cronExpression + if (config.everyMs !== undefined) scheduleData.every_ms = config.everyMs.toString() + if (config.from !== undefined) scheduleData.from_date = config.from.getTime().toString() + if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString() + if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString() + + const multi = this.#connection + .multi() + .hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit') + .hset(scheduleKey, scheduleData) + .sadd(schedulesIndexKey, id) + + if (existingNextRunAt) { + multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id) + } + + await multi.exec() + + return id + } + + /** + * @deprecated Use `upsertSchedule` instead. + */ + createSchedule(config: ScheduleConfig): Promise { + return this.upsertSchedule(config) + } + + async getSchedule(id: string): Promise { + const scheduleKey = `${schedulesKey}::${id}` + const data = await this.#connection.hgetall(scheduleKey) + + if (!data || Object.keys(data).length === 0) { + return null + } + + return this.#hashToScheduleData(data) + } + + async listSchedules(options?: ScheduleListOptions): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey) + if (ids.length === 0) { + return [] + } + + const pipeline = this.#connection.pipeline() + + for (const id of ids) { + pipeline.hgetall(`${schedulesKey}::${id}`) + } + + const results = await pipeline.exec() + if (!results) { + return [] + } + + const schedules: ScheduleData[] = [] + + for (const [, data] of results) { if (!data || Object.keys(data).length === 0) { - return null; - } - - return this.#hashToScheduleData(data); - } - - async listSchedules(options?: ScheduleListOptions): Promise { - const ids = await this.#connection.smembers(schedulesIndexKey); - if (ids.length === 0) { - return []; - } - - const pipeline = this.#connection.pipeline(); - - for (const id of ids) { - pipeline.hgetall(`${schedulesKey}::${id}`); + continue } - const results = await pipeline.exec(); - if (!results) { - return []; - } - - const schedules: ScheduleData[] = []; - - for (const [, data] of results) { - if (!data || Object.keys(data).length === 0) { - continue; - } + const schedule = this.#hashToScheduleData(data as Record) - const schedule = this.#hashToScheduleData( - data as Record, - ); - - // Filter by status if provided - if (options?.status && schedule.status !== options.status) { - continue; - } - - schedules.push(schedule); - } - - return schedules; - } - - async updateSchedule( - id: string, - updates: Partial< - Pick - >, - ): Promise { - const scheduleKey = `${schedulesKey}::${id}`; - const data: Record = {}; - - if (updates.status !== undefined) data.status = updates.status; - if (updates.nextRunAt !== undefined) { - data.next_run_at = updates.nextRunAt - ? updates.nextRunAt.getTime().toString() - : ''; + // Filter by status if provided + if (options?.status && schedule.status !== options.status) { + continue } - if (updates.lastRunAt !== undefined) { - data.last_run_at = updates.lastRunAt - ? updates.lastRunAt.getTime().toString() - : ''; - } - if (updates.runCount !== undefined) - data.run_count = updates.runCount.toString(); - - if (Object.keys(data).length === 0) return; - - const multi = this.#connection.multi().hset(scheduleKey, data); - if (updates.nextRunAt) { - multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id); - } else if (updates.nextRunAt === null || updates.status === 'paused') { - multi.zrem(schedulesDueKey, id); + schedules.push(schedule) + } + + return schedules + } + + async updateSchedule( + id: string, + updates: Partial> + ): Promise { + const scheduleKey = `${schedulesKey}::${id}` + const data: Record = {} + + if (updates.status !== undefined) data.status = updates.status + if (updates.nextRunAt !== undefined) { + data.next_run_at = updates.nextRunAt ? updates.nextRunAt.getTime().toString() : '' + } + if (updates.lastRunAt !== undefined) { + data.last_run_at = updates.lastRunAt ? updates.lastRunAt.getTime().toString() : '' + } + if (updates.runCount !== undefined) data.run_count = updates.runCount.toString() + + if (Object.keys(data).length === 0) return + + const multi = this.#connection.multi().hset(scheduleKey, data) + + if (updates.nextRunAt) { + multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id) + } else if (updates.nextRunAt === null || updates.status === 'paused') { + multi.zrem(schedulesDueKey, id) + } + + if (updates.status === 'active' && updates.nextRunAt === undefined) { + const existing = await this.#connection.hget(scheduleKey, 'next_run_at') + if (existing) { + multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id) } - - if (updates.status === 'active' && updates.nextRunAt === undefined) { - const existing = await this.#connection.hget( - scheduleKey, - 'next_run_at', - ); - if (existing) { - multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id); - } + } + + await multi.exec() + } + + async deleteSchedule(id: string): Promise { + const scheduleKey = `${schedulesKey}::${id}` + await this.#connection + .multi() + .del(scheduleKey) + .srem(schedulesIndexKey, id) + .zrem(schedulesDueKey, id) + .exec() + } + + async migrate(): Promise { + await this.backfillDueIndex() + } + + async claimDueSchedule(): Promise { + const now = Date.now() + const result = await this.#connection.eval( + CLAIM_SCHEDULE_SCRIPT, + 2, + schedulesDueKey, + `${schedulesKey}::`, + now.toString() + ) + + if (!result) { + return null + } + + const data = JSON.parse(result as string) as Record + + // If cron expression, we need to recalculate next_run_at properly. + // The Lua script only handles simple interval; cron needs JS cron-parser. + // This is safe because the schedule is already claimed (run_count incremented). + if (data.cron_expression) { + const { CronExpressionParser } = await import('cron-parser') + const cron = CronExpressionParser.parse(data.cron_expression, { + currentDate: new Date(now), + tz: data.timezone || 'UTC', + }) + const nextRun = cron.next().toDate().getTime() + + const runCount = Number.parseInt(data.run_count || '0', 10) + 1 + const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null + const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null + + let newNextRunAt: number | string = nextRun + + if (runLimit !== null && runCount >= runLimit) { + newNextRunAt = '' + } else if (toDate && nextRun > toDate) { + newNextRunAt = '' } - await multi.exec(); - } - - async deleteSchedule(id: string): Promise { - const scheduleKey = `${schedulesKey}::${id}`; - await this.#connection - .multi() - .del(scheduleKey) - .srem(schedulesIndexKey, id) - .zrem(schedulesDueKey, id) - .exec(); - } - - async #ensureDueIndex(): Promise { - if (this.#dueIndexReady) return; - await this.backfillDueIndex(); - this.#dueIndexReady = true; - } - - async claimDueSchedule(): Promise { - await this.#ensureDueIndex(); - - const now = Date.now(); - const result = await this.#connection.eval( - CLAIM_SCHEDULE_SCRIPT, - 2, - schedulesDueKey, - `${schedulesKey}::`, - now.toString(), - ); - - if (!result) { - return null; - } + const scheduleKey = `${schedulesKey}::${data.id}` + const multi = this.#connection + .multi() + .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()) - const data = JSON.parse(result as string) as Record; - - // If cron expression, we need to recalculate next_run_at properly. - // The Lua script only handles simple interval; cron needs JS cron-parser. - // This is safe because the schedule is already claimed (run_count incremented). - if (data.cron_expression) { - const { CronExpressionParser } = await import('cron-parser'); - const cron = CronExpressionParser.parse(data.cron_expression, { - currentDate: new Date(now), - tz: data.timezone || 'UTC', - }); - const nextRun = cron.next().toDate().getTime(); - - const runCount = Number.parseInt(data.run_count || '0', 10) + 1; - const runLimit = data.run_limit - ? Number.parseInt(data.run_limit, 10) - : null; - const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null; - - let newNextRunAt: number | string = nextRun; - - if (runLimit !== null && runCount >= runLimit) { - newNextRunAt = ''; - } else if (toDate && nextRun > toDate) { - newNextRunAt = ''; - } - - const scheduleKey = `${schedulesKey}::${data.id}`; - const multi = this.#connection - .multi() - .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()); - - if (typeof newNextRunAt === 'number') { - multi.zadd(schedulesDueKey, newNextRunAt, data.id); - } else { - multi.zrem(schedulesDueKey, data.id); - } - - await multi.exec(); + if (typeof newNextRunAt === 'number') { + multi.zadd(schedulesDueKey, newNextRunAt, data.id) + } else { + multi.zrem(schedulesDueKey, data.id) } - return this.#hashToScheduleData(data); - } - - async backfillDueIndex(): Promise { - const ids = await this.#connection.smembers(schedulesIndexKey); - if (ids.length === 0) return 0; - - const pipeline = this.#connection.pipeline(); - for (const id of ids) { - pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status'); - } - const results = await pipeline.exec(); - if (!results) return 0; - - const addPipeline = this.#connection.pipeline(); - let count = 0; - - for (let i = 0; i < ids.length; i++) { - const [err, values] = results[i]; - if (err || !values) continue; - const [nextRunAt, status] = values as [string | null, string | null]; - if (nextRunAt && status === 'active') { - addPipeline.zadd( - schedulesDueKey, - Number.parseInt(nextRunAt, 10), - ids[i], - ); - count++; - } + await multi.exec() + } + + return this.#hashToScheduleData(data) + } + + async backfillDueIndex(): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey) + if (ids.length === 0) return 0 + + const pipeline = this.#connection.pipeline() + for (const id of ids) { + pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status') + } + const results = await pipeline.exec() + if (!results) return 0 + + const addPipeline = this.#connection.pipeline() + let count = 0 + + for (let i = 0; i < ids.length; i++) { + const [err, values] = results[i] + if (err || !values) continue + const [nextRunAt, status] = values as [string | null, string | null] + if (nextRunAt && status === 'active') { + addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i]) + count++ } - - if (count > 0) await addPipeline.exec(); - return count; - } - - #hashToScheduleData(data: Record): ScheduleData { - return { - id: data.id, - name: data.name, - payload: JSON.parse(data.payload || '{}'), - cronExpression: data.cron_expression || null, - everyMs: data.every_ms ? Number.parseInt(data.every_ms, 10) : null, - timezone: data.timezone || 'UTC', - from: data.from_date - ? new Date(Number.parseInt(data.from_date, 10)) - : null, - to: data.to_date ? new Date(Number.parseInt(data.to_date, 10)) : null, - limit: data.run_limit ? Number.parseInt(data.run_limit, 10) : null, - runCount: Number.parseInt(data.run_count || '0', 10), - nextRunAt: data.next_run_at - ? new Date(Number.parseInt(data.next_run_at, 10)) - : null, - lastRunAt: data.last_run_at - ? new Date(Number.parseInt(data.last_run_at, 10)) - : null, - status: (data.status as 'active' | 'paused') || 'active', - createdAt: data.created_at - ? new Date(Number.parseInt(data.created_at, 10)) - : new Date(), - }; - } + } + + if (count > 0) await addPipeline.exec() + return count + } + + #hashToScheduleData(data: Record): ScheduleData { + return { + id: data.id, + name: data.name, + payload: JSON.parse(data.payload || '{}'), + cronExpression: data.cron_expression || null, + everyMs: data.every_ms ? Number.parseInt(data.every_ms, 10) : null, + timezone: data.timezone || 'UTC', + from: data.from_date ? new Date(Number.parseInt(data.from_date, 10)) : null, + to: data.to_date ? new Date(Number.parseInt(data.to_date, 10)) : null, + limit: data.run_limit ? Number.parseInt(data.run_limit, 10) : null, + runCount: Number.parseInt(data.run_count || '0', 10), + nextRunAt: data.next_run_at ? new Date(Number.parseInt(data.next_run_at, 10)) : null, + lastRunAt: data.last_run_at ? new Date(Number.parseInt(data.last_run_at, 10)) : null, + status: (data.status as 'active' | 'paused') || 'active', + createdAt: data.created_at ? new Date(Number.parseInt(data.created_at, 10)) : new Date(), + } + } } diff --git a/src/drivers/redis_scripts.ts b/src/drivers/redis_scripts.ts index 2f8cedf..82e00cb 100644 --- a/src/drivers/redis_scripts.ts +++ b/src/drivers/redis_scripts.ts @@ -1,4 +1,4 @@ -import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js'; +import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js' /** * Lua script for pushing a job to the queue. @@ -18,7 +18,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', pending_key, score, job_id) return 1 -`; +` /** * Lua script for pushing a dedup job. @@ -80,7 +80,7 @@ ${REDIS_DEDUP_LUA} redis.call('PEXPIRE', dedup_key, ttl) end return {'added', job_id} -`; +` /** * Lua script for pushing a delayed job. @@ -100,7 +100,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', delayed_key, execute_at, job_id) return 1 -`; +` /** * Lua script for atomic job acquisition. @@ -158,7 +158,7 @@ ${REDIS_JOB_STORAGE_LUA} return encode_job_result(job_data, overlay_key, job_id, { acquiredAt = now }) -`; +` /** * Lua script for removing a job completely (no history). @@ -193,7 +193,7 @@ ${REDIS_JOB_STORAGE_LUA} delete_job_data(data_key, overlay_key, job_id) return 1 -`; +` /** * Lua script for finalizing a job in history. @@ -277,7 +277,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -`; +` /** * Lua script for retrying a job. @@ -330,7 +330,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -`; +` /** * Lua script for recovering stalled jobs. @@ -399,7 +399,7 @@ ${REDIS_JOB_STORAGE_LUA} end return recovered -`; +` /** * Lua script for getting a job record with its status. @@ -458,7 +458,7 @@ ${REDIS_JOB_STORAGE_LUA} finishedAt = finished_at, error = error_msg }) -`; +` /** * Lua script for atomically claiming a due schedule using a sorted set index. @@ -505,6 +505,14 @@ export const CLAIM_SCHEDULE_SCRIPT = ` if schedule.status ~= 'active' then redis.call('ZREM', due_key, id) else + -- Hash is the source of truth for next_run_at. + -- If the ZSET score is stale, repair it and skip this candidate. + local hash_nra = schedule.next_run_at + if not hash_nra or hash_nra == '' then + redis.call('ZREM', due_key, id) + elseif tonumber(hash_nra) > now then + redis.call('ZADD', due_key, tonumber(hash_nra), id) + else local run_count = tonumber(schedule.run_count or '0') local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil local to_date = schedule.to_date and tonumber(schedule.to_date) or nil @@ -550,7 +558,8 @@ export const CLAIM_SCHEDULE_SCRIPT = ` -- Return the schedule data (before update) as JSON return cjson.encode(schedule) end + end end end end -`; +` diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index d97fa55..e94aa7b 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -118,6 +118,10 @@ export class SyncAdapter implements Adapter { return Promise.resolve() } + migrate(): Promise { + return Promise.resolve() + } + upsertSchedule(_config: ScheduleConfig): Promise { // No-op: schedules don't make sense for sync adapter // Return a fake ID so code doesn't break in dev diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index fa64e31..40f40e5 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -293,6 +293,10 @@ export class MemoryAdapter implements Adapter { return Promise.resolve() } + migrate(): Promise { + return Promise.resolve() + } + async upsertSchedule(config: ScheduleConfig): Promise { const id = config.id ?? randomUUID() const existing = this.#schedules.get(id) diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index e061869..5e3209a 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -99,9 +99,6 @@ test.group('Adapter | Redis', (group) => { await adapter.updateSchedule(id, { nextRunAt: futureRunAt }) } - // Warm the due-index backfill so it doesn't count against the spy - await adapter.claimDueSchedule() - const { result: claimed, writes } = await withRedisWriteSpy({ connection, run: () => adapter.claimDueSchedule(), @@ -607,6 +604,92 @@ test.group('Adapter | Redis', (group) => { assert.isNull(await connection.hget(metadataKey, 'metadata-stalled-uuid-1')) assert.isNull(await adapter.getJob('metadata-stalled-uuid-1', queue)) }) + + test('backfillDueIndex populates ZSET for pre-existing schedules', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + + // Simulate pre-upgrade schedule data: write hash + index directly, skip ZSET + const id = 'pre-existing-schedule' + const pastRunAt = (Date.now() - 5_000).toString() + await connection + .multi() + .hset(`schedules::${id}`, { + id, + name: 'LegacyJob', + payload: '{}', + status: 'active', + every_ms: '60000', + timezone: 'UTC', + next_run_at: pastRunAt, + last_run_at: '', + run_count: '0', + created_at: Date.now().toString(), + }) + .sadd('schedules::index', id) + .exec() + + // Without backfill, ZSET has no entry so claim returns null + const beforeBackfill = await adapter.claimDueSchedule() + assert.isNull(beforeBackfill) + + await adapter.backfillDueIndex() + + const afterBackfill = await adapter.claimDueSchedule() + assert.isNotNull(afterBackfill) + assert.equal(afterBackfill!.id, id) + }) + + test('backfillDueIndex is idempotent', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + + await adapter.upsertSchedule({ + id: 'idempotent-schedule', + name: 'TestJob', + payload: {}, + everyMs: 60_000, + timezone: 'UTC', + }) + await adapter.updateSchedule('idempotent-schedule', { + nextRunAt: new Date(Date.now() + 30_000), + }) + + // Clear the ZSET so backfill has work to do + await connection.del('schedules::due') + + const first = await adapter.backfillDueIndex() + const second = await adapter.backfillDueIndex() + + assert.isAbove(first, 0) + assert.equal(second, first) + + const score = await connection.zscore('schedules::due', 'idempotent-schedule') + assert.isNotNull(score) + }) + + test('stale ZSET score is self-healed during claim', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + const id = 'stale-score-schedule' + const futureRunAt = Date.now() + 60_000 + + await adapter.upsertSchedule({ + id, + name: 'StaleJob', + payload: {}, + everyMs: 60_000, + timezone: 'UTC', + }) + await adapter.updateSchedule(id, { nextRunAt: new Date(futureRunAt) }) + + // Corrupt the ZSET score to a past value while hash still says future + await connection.zadd('schedules::due', Date.now() - 10_000, id) + + const claimed = await adapter.claimDueSchedule() + assert.isNull(claimed, 'should not claim when hash says schedule is not due yet') + + // ZSET score should have been repaired to match the hash + const repairedScore = await connection.zscore('schedules::due', id) + assert.equal(Number(repairedScore), futureRunAt) + }) }) test.group('Adapter | Knex (SQLite)', (group) => {