Releases: boringnode/queue
All types are imported from /types subpath
Breaking Change (released as a patch because 0.3.0 shipped less than 10 minutes earlier)
Types are no longer exported from the main entry point. Import them from the /types subpath instead:
```diff
- import type { AdapterFactory } from '@boringnode/queue'
+ import type { AdapterFactory } from '@boringnode/queue/types'
```
Also added missing type exports: AdapterFactory, DispatchManyResult, JobRecord, JobRetention, JobStatus.
Job Retention, Bulk Dispatch & Storage Redesign
Breaking Changes
Redis + Knex storage layout
This release introduces a new storage layout for both the Redis and Knex adapters, redesigned to support features like job retention and status tracking. Existing data is incompatible and requires migration.
Redis adapter migration
New storage layout
The Redis adapter now stores job payloads in a dedicated hash and tracks queue state in separate sets/hashes:
| Key | Type | Description |
|---|---|---|
| `jobs::<queue>::data` | Hash | `jobId -> job payload JSON` |
| `jobs::<queue>::pending` | Sorted Set | `jobId -> priority score` |
| `jobs::<queue>::delayed` | Sorted Set | `jobId -> executeAt timestamp` |
| `jobs::<queue>::active` | Hash | `jobId -> { workerId, acquiredAt }` |
| `jobs::<queue>::completed` | Hash | `jobId -> { finishedAt }` |
| `jobs::<queue>::completed::index` | Sorted Set | `jobId -> finishedAt` (for pruning) |
| `jobs::<queue>::failed` | Hash | `jobId -> { finishedAt, error }` |
| `jobs::<queue>::failed::index` | Sorted Set | `jobId -> finishedAt` (for pruning) |
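For example, inspecting the new layout for a queue named `emails` (a minimal sketch assuming ioredis; key names follow the table above):

```ts
import { Redis } from 'ioredis'

const redis = new Redis()

// List pending job IDs, then fetch the first job's payload from the data hash
const pendingIds = await redis.zrange('jobs::emails::pending', 0, -1)
const firstPayload = pendingIds.length > 0
  ? await redis.hget('jobs::emails::data', pendingIds[0])
  : null

console.log(`${pendingIds.length} pending jobs`, firstPayload)
```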
Migration steps
Option 1: Flush existing data (recommended for alpha users)
```sh
redis-cli KEYS "jobs::*" | xargs redis-cli DEL
```
Option 2: Wait for existing jobs to complete
- Stop pushing new jobs
- Wait for all workers to drain existing queues
- Deploy new version
- Clean up old keys:
```sh
# Remove old format keys (adjust pattern to your prefix)
redis-cli KEYS "jobs::*" | grep -v "::data\|::pending\|::delayed\|::active\|::completed\|::failed" | xargs redis-cli DEL
```
Knex adapter migration
Schema changes
The Knex adapter now persists completed/failed job state. Existing tables need these changes:
| Column | Type | Description |
|---|---|---|
| `finished_at` | BIGINT (nullable) | Completion/failure timestamp |
| `error` | TEXT (nullable) | Error message for failed jobs |
| `status` | ENUM | Add `completed` and `failed` values |
New index for pruning queries: `(queue, status, finished_at)`
Migration SQL
PostgreSQL:
```sql
-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add new enum values (PostgreSQL specific)
ALTER TYPE queue_jobs_status ADD VALUE 'completed';
ALTER TYPE queue_jobs_status ADD VALUE 'failed';

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```
MySQL:
```sql
-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT UNSIGNED NULL;
ALTER TABLE queue_jobs ADD COLUMN error TEXT NULL;

-- Modify enum to include new values
ALTER TABLE queue_jobs MODIFY COLUMN status ENUM('pending', 'active', 'delayed', 'completed', 'failed') NOT NULL;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```
SQLite:
```sql
-- Add new columns (SQLite doesn't enforce enums, so status just works)
ALTER TABLE queue_jobs ADD COLUMN finished_at INTEGER;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);
```
Fresh install
For new installations, drop and recreate the table to get the new schema automatically:
```sql
DROP TABLE IF EXISTS queue_jobs;
-- Table will be recreated on first adapter use
```
New Features
Bulk job dispatch with dispatchMany
Jobs can now be dispatched in batches using Job.dispatchMany(). This is more efficient than calling dispatch() repeatedly because it uses batched operations (a Redis MULTI/EXEC transaction or a SQL batch insert).
```ts
// Dispatch multiple jobs at once
const { jobIds } = await SendEmailJob.dispatchMany([
  { to: 'user1@example.com', subject: 'Newsletter' },
  { to: 'user2@example.com', subject: 'Newsletter' },
  { to: 'user3@example.com', subject: 'Newsletter' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)
  .run()

console.log(`Dispatched ${jobIds.length} jobs`)
```
Use cases
- Newsletters: Send thousands of emails in a single batch operation
- Bulk exports: Create export jobs for multiple users
- Data migrations: Queue many transformation jobs at once
- Notifications: Dispatch notifications to many recipients
API
The JobBatchDispatcher supports the same fluent API as JobDispatcher:
```ts
await SendEmailJob.dispatchMany(payloads)
  .toQueue('emails')   // Target queue
  .group('batch-123')  // Group all jobs together
  .priority(1)         // Set priority for all jobs
  .with('redis')       // Use specific adapter
  .run()
```
Adapter API
For low-level access, adapters now support pushMany() and pushManyOn():
```ts
await adapter.pushManyOn('emails', [
  { id: 'uuid1', name: 'SendEmailJob', payload: {...}, attempts: 0 },
  { id: 'uuid2', name: 'SendEmailJob', payload: {...}, attempts: 0 },
])
```
Job grouping with groupId
Jobs can now be assigned to a group using the groupId option. This allows organizing related jobs together for easier monitoring and filtering in UIs.
```ts
// Assign jobs to a group
await SendEmailJob.dispatch({ to: 'user@example.com' })
  .group('newsletter-jan-2025')
  .run()

// Combine with other options
await ExportJob.dispatch({ userId: 1 })
  .group('batch-export-123')
  .toQueue('exports')
  .priority(2)
  .run()
```
Use cases
- Batch operations: Group all jobs from a newsletter send, bulk export, or data migration
- Monitoring: Filter and view related jobs together in queue UIs
- Debugging: Easily find all jobs related to a specific operation
API
The groupId is stored with the job data and can be accessed via:
```ts
const record = await adapter.getJob('job-id', 'queue-name')
console.log(record.data.groupId) // 'newsletter-jan-2025'
```
Job retention
Jobs can now be kept in history after completion or failure using the removeOnComplete and removeOnFail options:
```ts
// Global configuration
await QueueManager.init({
  // ...
  defaultJobOptions: {
    removeOnComplete: false,      // Keep all completed jobs
    removeOnFail: { count: 100 }, // Keep last 100 failed jobs
  },
})

// Per-queue configuration
await QueueManager.init({
  // ...
  queues: {
    critical: {
      defaultJobOptions: {
        removeOnFail: { age: '7d', count: 1000 }, // Keep for 7 days, max 1000
      },
    },
  },
})

// Per-job configuration (via Job class)
class MyJob extends Job {
  static options = {
    removeOnComplete: { count: 50 },
  }
}
```
Retention options
- `true` (default): Remove the job immediately after completion/failure
- `false`: Keep the job in history indefinitely
- `{ age?: Duration, count?: number }`: Keep the job, pruning by age and/or count
Job status API
A new getJob method allows retrieving job status and data:
```ts
const adapter = QueueManager.use()
const record = await adapter.getJob('job-id', 'queue-name')

if (record) {
  console.log(record.status)     // 'pending' | 'active' | 'delayed' | 'completed' | 'failed'
  console.log(record.data)       // Original job data
  console.log(record.finishedAt) // Timestamp (for completed/failed)
  console.log(record.error)      // Error message (for failed)
}
```
Commits
- docs: refactor README with new features and better structure (a0bc28c)
- fix: prevent double-claim on SQLite with status guard (1a089dd)
- feat: add dispatchMany for bulk job dispatch (9bc5fa8)
- feat: add groupId support for job grouping (b2594d2)
- docs(examples): add job retention options (bd3b8d1)
- chore(tests): remove unnecessary locations config and debug logs (046faf1)
- feat: add job history and retention (1db9f85)
- feat: add job retention types (839f42e)
- chore: add funding file (69f9f39)
- docs: use markdown note (3f33166)
Full Changelog: v0.2.0...v0.3.0
DI-first Job Architecture & Simplified Naming
Breaking Changes
Job constructor reserved for dependency injection
The Job constructor no longer accepts payload and context parameters. These are now provided via the internal $hydrate() method called by the worker.
Before:
```ts
class SendEmailJob extends Job<SendEmailPayload> {
  constructor(payload: SendEmailPayload, context: JobContext) {
    super(payload, context)
  }
}
```
After:
```ts
class SendEmailJob extends Job<SendEmailPayload> {
  // Constructor for DI only
  constructor(private mailer: MailerService) {
    super()
  }
}

// Or simply omit the constructor if no DI is needed
class SendEmailJob extends Job<SendEmailPayload> {
  async execute() { ... }
}
```
JobFactory signature simplified
The jobFactory now only receives JobClass. The worker handles payload/context via $hydrate().
Before: jobFactory: (JobClass, payload, context) => ...
After: jobFactory: (JobClass) => container.make(JobClass)
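For example, wiring in an IoC container (a sketch; `app.container` stands in for whatever container you use):

```ts
await QueueManager.init({
  // ...
  jobFactory: (JobClass) => app.container.make(JobClass),
})
```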
AbortSignal moved to this.signal
The signal parameter has been removed from execute(). Access it via this.signal instead.
Before: async execute(signal?: AbortSignal)
After: async execute() + use this.signal
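For example, forwarding the signal to an abortable operation so a worker timeout cancels it (a sketch; the endpoint is a placeholder, and the hydrated payload is shown as `this.payload`):

```ts
class SendEmailJob extends Job<SendEmailPayload> {
  async execute() {
    // this.signal is aborted when the worker times the job out
    await fetch('https://mail.example.com/send', {
      method: 'POST',
      body: JSON.stringify(this.payload),
      signal: this.signal,
    })
  }
}
```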
static jobName replaced by options.name
Job name now defaults to the class name. The static jobName property is removed.
Before:
```ts
static readonly jobName = 'SendEmailJob'
static options = { queue: 'emails' }
```
After:
```ts
static options = { queue: 'emails' }
// name defaults to the class name, or override with: name: 'CustomName'
```
Warning
If you minify code in production, always specify name explicitly, since class names get mangled.
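For example, pinning the name so minification cannot change it:

```ts
class SendEmailJob extends Job<SendEmailPayload> {
  static options = {
    name: 'SendEmailJob', // explicit name survives class-name mangling
    queue: 'emails',
  }
}
```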
Schedule jobName → name
ScheduleConfig.jobName and ScheduleData.jobName renamed to name. Database column renamed from job_name to name.
Warning
Existing databases require manual migration or table recreation.
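For Knex-backed schedules, the rename can be done with a small migration (a sketch; `queue_schedules` is an assumed table name, adjust to your schema):

```ts
import type { Knex } from 'knex'

// Hypothetical migration: rename job_name -> name
export async function up(knex: Knex): Promise<void> {
  await knex.schema.alterTable('queue_schedules', (table) => {
    table.renameColumn('job_name', 'name')
  })
}
```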
New Features
- `this.signal`: Access the abort signal anywhere in the job instance for timeout handling
- `$hydrate()` method: Internal method used by the worker to provide runtime data (payload, context, signal)
Commits
- refactor!: rename jobName to name in Schedule types and database (00446c3)
- refactor!: replace static jobName with options.name (ef3e218)
- refactor!: separate dependency injection from job hydration (e1bc7b5)
- docs: improve JSDoc documentation for types with default values (c12654a)
Full Changelog: v0.1.0...v0.2.0
Persistent Job Scheduling
New Feature: Persistent Job Scheduling
Schedule jobs to run on a recurring basis using cron expressions or fixed intervals. Schedules are persisted in the database and survive worker restarts.
Creating Schedules
```ts
// Interval-based schedule (ID defaults to job name)
await MetricsJob.schedule({ endpoint: '/api/health' })
  .every('10s')
  .run()

// Cron-based schedule with custom ID
await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *')
  .timezone('Europe/Paris')
  .run()

// With constraints
await ReportJob.schedule({ type: 'weekly' })
  .cron('0 9 * * MON')
  .from(new Date('2024-01-01'))
  .to(new Date('2024-12-31'))
  .limit(52)
  .run()
```
Managing Schedules
```ts
import { Schedule } from '@boringnode/queue'

const schedule = await Schedule.find('MetricsJob')
await schedule.pause()
await schedule.resume()
await schedule.trigger() // Immediate run
await schedule.delete()

// List schedules
const active = await Schedule.list({ status: 'active' })
```
Schedule Options
| Method | Description |
|---|---|
| .id(string) | Unique identifier (defaults to job name) |
| .every(duration) | Fixed interval ('5s', '1m', '1h', '1d') |
| .cron(expression) | Cron schedule |
| .timezone(tz) | Timezone for cron (default: 'UTC') |
| .from(date) / .to(date) | Date boundaries |
| .between(from, to) | Shorthand for date range |
| .limit(n) | Maximum number of runs |
How It Works
- Schedules are persisted via the adapter (Redis, Knex)
- Worker automatically polls and dispatches due schedules
- Multiple workers can run concurrently; atomic claiming prevents duplicates (see the sketch after this list)
- Failed jobs don't affect the schedule - next run still occurs
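A simplified illustration of the atomic-claim idea (not the library's implementation; table and column names are invented):

```ts
import { knex } from 'knex'

const db = knex({ client: 'pg', connection: process.env.DATABASE_URL })

// Only one worker's conditional UPDATE matches, so only one worker dispatches
async function tryClaim(scheduleId: string, workerId: string): Promise<boolean> {
  const claimed = await db('queue_schedules')
    .where({ id: scheduleId })
    .where('next_run_at', '<=', Date.now())
    .whereNull('claimed_by')
    .update({ claimed_by: workerId })

  return claimed === 1 // true: this worker won the claim
}
```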
Job Context & Dependency Injection
Breaking Changes
Job Constructor Signature
The Job constructor now requires a context parameter in addition to payload:
```ts
// Before
class MyJob extends Job<Payload> {
  constructor(payload: Payload) {
    super(payload)
  }
}

// After
import type { JobContext } from '@boringnode/queue/types'

class MyJob extends Job<Payload> {
  constructor(payload: Payload, context: JobContext) {
    super(payload, context)
  }
}
```
Type Imports
Types must now be imported from @boringnode/queue/types:
```ts
// Before
import type { JobContext } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types/main'

// After
import type { JobContext, JobOptions } from '@boringnode/queue/types'
```
Worker Config Rename
The pollingInterval option has been renamed to idleDelay:
```ts
// Before
worker: {
  pollingInterval: '2s'
}

// After
worker: {
  idleDelay: '2s'
}
```
New Features
Job Context
Jobs now have access to execution metadata via this.context:
```ts
class MyJob extends Job<Payload> {
  async execute() {
    console.log(`Job ID: ${this.context.jobId}`)
    console.log(`Attempt: ${this.context.attempt}`) // 1, 2, 3...
    console.log(`Queue: ${this.context.queue}`)

    if (this.context.attempt > 1) {
      console.log('This is a retry!')
    }
  }
}
```
Available context properties:
| Property | Type | Description |
|---|---|---|
| jobId | string | Unique job identifier |
| name | string | Job class name |
| attempt | number | Current attempt (1-based) |
| queue | string | Queue name |
| priority | number | Job priority |
| acquiredAt | Date | When job was acquired |
| stalledCount | number | Stalled recovery count |
Dependency Injection Support
Custom job instantiation via jobFactory enables IoC container integration:
```ts
await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis() },
  jobFactory: async (JobClass, payload, context) => {
    return app.container.make(JobClass, [payload, context])
  }
})
```
Graceful Shutdown
Workers now support graceful shutdown with configurable behavior:
```ts
const worker = new Worker({
  // ...
  worker: {
    gracefulShutdown: true, // default: true
    onShutdownSignal: async () => {
      console.log('Shutdown signal received, cleaning up...')
    }
  }
})
```
- `gracefulShutdown`: Enable/disable automatic SIGINT/SIGTERM handling (default: `true`)
- `onShutdownSignal`: Optional callback invoked when a shutdown signal is received
Custom Logger
You can now provide a custom logger (pino-compatible interface):
```ts
import { pino } from 'pino'

await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis() },
  logger: pino()
})
```
The logger is used for warnings (e.g., when no jobs are found for configured locations) and errors.
Named Errors Export
All error classes are now exported via the errors namespace:
```ts
import { errors } from '@boringnode/queue'

try {
  await job.dispatch()
} catch (e) {
  if (e instanceof errors.E_JOB_NOT_FOUND) {
    // handle job not found
  }
  if (e instanceof errors.E_QUEUE_NOT_INITIALIZED) {
    // QueueManager.init() was not called
  }
}
```
Available errors:
- `E_JOB_NOT_FOUND`: Job class not registered
- `E_JOB_TIMEOUT`: Job exceeded its timeout
- `E_JOB_MAX_ATTEMPTS_REACHED`: Job exhausted all retries
- `E_QUEUE_NOT_INITIALIZED`: QueueManager not initialized
- `E_ADAPTER_INIT_ERROR`: Adapter factory threw an error
- `E_CONFIGURATION_ERROR`: Invalid configuration
Commits
- refactor!: consolidate type exports under /types path (5cec963)
- feat!: expose job context to job instances (26d8043)
- feat: add jobFactory option for dependency injection support (c1563b5)
- docs: add JSDoc documentation to public APIs (592f4b9)
- refactor!: extract magic numbers into named constants (df0688c)
- fix: improve graceful shutdown in Worker (12c46f6)
- chore: update lock file (ccdc019)
- ci: add GitHub Actions workflow for checks (5bf7db3)
- chore: add logger information (e23fbeb)
- fix: improve error handling in QueueManager (658488d)
- feat: add injectable logger system (e89f662)
- feat: export errors and types for better DX (892fcf8)
- test: add worker concurrency tests to prevent job duplication (d736521)
- refactor: make locations optional in QueueManagerConfig (6d3be20)
- fix(knex): prevent race condition with FOR UPDATE SKIP LOCKED (379dc03)
- feat: add stalled jobs recovery mechanism (5b0970f)
- fix(redis): remove unsafe popAndWait method with race condition (dd2bfa8)
- chore: add drivers as peer deps (109bc08)
Do not use import path
Use the .js extension when importing files.
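For example (relative paths are placeholders):

```ts
// Before: extensionless relative import
import { Worker } from './worker'

// After: explicit .js extension, which Node's ESM resolver requires
import { Worker } from './worker.js'
```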
Commits
Full Changelog: v0.0.1-alpha.1...v0.0.1-alpha.2
Knex adapter & connection management
This release adds a Knex adapter, bringing SQL database support alongside Redis.
Connection handling has been improved: Redis now only closes connections it owns, making it safer to use shared/external connections. Adapter instances are now cached to avoid unnecessary re-instantiation.
On the performance side, pool filling now uses parallelized pop operations for faster job retrieval.
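Configuration presumably follows the same adapter-factory pattern as Redis; a hypothetical sketch (the `knex()` factory and its options are assumptions, not the confirmed API):

```ts
import { QueueManager } from '@boringnode/queue'

// Hypothetical: a knex() adapter factory analogous to redis()
// (its exact import path is not shown in these notes)
await QueueManager.init({
  default: 'knex',
  adapters: {
    knex: knex({ client: 'pg', connection: process.env.DATABASE_URL }),
  },
})
```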
Commits
- feat(adapter): add knex adapter (c41fc8b)
- fix(manager): cache adapter instances (8cd76b8)
- feat(redis): only quit the connection when we own it (89cb921)
- docs(readme): add benchmark value with 100k jobs (45579d9)
- docs(readme): update benchmark results with real stuff (b998045)
- refactor(benchmark): allow to take duration as a params (df13753)
- refactor(worker): parallelize pop for filling the pool (15cdc86)
Full Changelog: v0.0.1-alpha.0...v0.0.1-alpha.1
Concurrency overhaul & reliability fixes
This release introduces a JobPool system for proper concurrency management, along with worker timeouts via AbortSignal. The Redis adapter has been refactored to use atomic Lua scripts, replacing the previous lock-based approach for better performance.
Workers now ensure pool completion before shutdown, making graceful termination more predictable.
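The core idea behind a bounded job pool, in simplified form (an illustration only, not the library's JobPool):

```ts
// At most `limit` jobs run concurrently; drain() supports graceful shutdown
class BoundedPool {
  private active = new Set<Promise<void>>()

  constructor(private limit: number) {}

  async run(task: () => Promise<void>): Promise<void> {
    while (this.active.size >= this.limit) {
      await Promise.race(this.active) // wait until a slot frees up
    }
    const p: Promise<void> = task()
      .catch(() => {}) // job failures are handled elsewhere; keep the pool alive
      .finally(() => this.active.delete(p))
    this.active.add(p)
  }

  // Wait for all in-flight jobs before shutting down
  async drain(): Promise<void> {
    await Promise.all(this.active)
  }
}
```

A worker built on this shape calls run() for each popped job and drain() during shutdown, which mirrors the "ensure pool completion before shutting down" fix listed below.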
Commits
- docs(readme): add more features (784f410)
- docs(readme): add benchmark section (b12d59f)
- test(adapter): improve active tracking cleanup (c7c785f)
- refactor(redis): remove verrou in favor of atomic lua script (ba69213)
- refactor(redis): do not check for delayed jobs every pop (648163e)
- chore: add some benchmark (458763e)
- refactor(worker): use AbortSignal for timeout (1da0daa)
- fix(worker): remove console.log (7830615)
- fix(worker): avoid memory leak with job initialization fail (47442eb)
- fix(worker): avoid memory leak when job class is not found (eed85f5)
- fix(worker): avoid memory leak by releasing the lease when job fail (db5fdd3)
- style: lint (62f34d6)
- fix(worker): ensure pool completion before shutting down (690cf06)
- style: lint fix (8e89885)
- feat(worker): add timeout possibility (8ac461f)
- feat: use job pool to ensure proper concurrency (38fd333)
- feat: add JobPool system (34bbbc8)
- chore: add basic readme (96ee73d)
Full Changelog: v0.0.1-alpha...v0.0.1-alpha.0
v0.0.1-alpha
Commits
- feat: setup build process (284c174)
- fix: correct typescript typing (d7539fc)
- chore: initial commit (d2548ce)
Full Changelog: https://github.com/boringnode/queue/commits/v0.0.1-alpha