Skip to content

Releases: boringnode/queue

All types are imported from /types subpath

15 Jan 17:39
v0.3.1
59556a0

Choose a tag to compare

Breaking Change (patch due to 0.3.0 being released < 10 minutes ago)

Types are no longer exported from the main entry point. Import them from the /types subpath instead:

- import type { AdapterFactory } from '@boringnode/queue'
+ import type { AdapterFactory } from '@boringnode/queue/types'

Also added missing type exports: AdapterFactory, DispatchManyResult, JobRecord, JobRetention, JobStatus.

Job Retention, Bulk Dispatch & Storage Redesign

15 Jan 16:40
v0.3.0
1fe1515

Choose a tag to compare

Breaking Changes

Redis + Knex storage layout

This release introduces a new storage layout for both Redis and Knex adapters. Existing data is incompatible and requires migration.

The storage architecture has been completely redesigned to support new features like job retention and status tracking. This change affects both Redis and Knex adapters.


Redis adapter migration

New storage layout

The Redis adapter now stores job payloads in a dedicated hash and tracks queue state in separate sets/hashes:

Key Type Description
jobs::<queue>::data Hash jobId -> job payload JSON
jobs::<queue>::pending Sorted Set jobId -> priority score
jobs::<queue>::delayed Sorted Set jobId -> executeAt timestamp
jobs::<queue>::active Hash jobId -> { workerId, acquiredAt }
jobs::<queue>::completed Hash jobId -> { finishedAt }
jobs::<queue>::completed::index Sorted Set jobId -> finishedAt (for pruning)
jobs::<queue>::failed Hash jobId -> { finishedAt, error }
jobs::<queue>::failed::index Sorted Set jobId -> finishedAt (for pruning)

Migration steps

Option 1: Flush existing data (recommended for alpha users)

redis-cli KEYS "jobs::*" | xargs redis-cli DEL

Option 2: Wait for existing jobs to complete

  1. Stop pushing new jobs
  2. Wait for all workers to drain existing queues
  3. Deploy new version
  4. Clean up old keys:
# Remove old format keys (adjust pattern to your prefix)
redis-cli KEYS "jobs::*" | grep -v "::data\|::pending\|::delayed\|::active\|::completed\|::failed" | xargs redis-cli DEL

Knex adapter migration

Schema changes

The Knex adapter now persists completed/failed job state. Existing tables need these changes:

Column Type Description
finished_at BIGINT (nullable) Completion/failure timestamp
error TEXT (nullable) Error message for failed jobs
status ENUM Add completed and failed values

New index for pruning queries:

  • (queue, status, finished_at)

Migration SQL

PostgreSQL:

-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add new enum values (PostgreSQL specific)
ALTER TYPE queue_jobs_status ADD VALUE 'completed';
ALTER TYPE queue_jobs_status ADD VALUE 'failed';

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);

MySQL:

-- Add new columns
ALTER TABLE queue_jobs ADD COLUMN finished_at BIGINT UNSIGNED NULL;
ALTER TABLE queue_jobs ADD COLUMN error TEXT NULL;

-- Modify enum to include new values
ALTER TABLE queue_jobs MODIFY COLUMN status ENUM('pending', 'active', 'delayed', 'completed', 'failed') NOT NULL;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);

SQLite:

-- Add new columns (SQLite doesn't enforce enum, so status just works)
ALTER TABLE queue_jobs ADD COLUMN finished_at INTEGER;
ALTER TABLE queue_jobs ADD COLUMN error TEXT;

-- Add index for pruning
CREATE INDEX idx_queue_jobs_finished ON queue_jobs (queue, status, finished_at);

Fresh install

For new installations, drop and recreate the table to get the new schema automatically:

DROP TABLE IF EXISTS queue_jobs;
-- Table will be recreated on first adapter use

New Features

Bulk job dispatch with dispatchMany

Jobs can now be dispatched in batches using Job.dispatchMany(). This is more efficient than calling dispatch() multiple times as it uses optimized batch operations (Redis MULTI/EXEC transaction, SQL batch insert).

// Dispatch multiple jobs at once
const { jobIds } = await SendEmailJob.dispatchMany([
  { to: 'user1@example.com', subject: 'Newsletter' },
  { to: 'user2@example.com', subject: 'Newsletter' },
  { to: 'user3@example.com', subject: 'Newsletter' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)
  .run()

console.log(`Dispatched ${jobIds.length} jobs`)

Use cases

  • Newsletters: Send thousands of emails in a single batch operation
  • Bulk exports: Create export jobs for multiple users
  • Data migrations: Queue many transformation jobs at once
  • Notifications: Dispatch notifications to many recipients

API

The JobBatchDispatcher supports the same fluent API as JobDispatcher:

await SendEmailJob.dispatchMany(payloads)
  .toQueue('emails')      // Target queue
  .group('batch-123')     // Group all jobs together
  .priority(1)            // Set priority for all jobs
  .with('redis')          // Use specific adapter
  .run()

Adapter API

For low-level access, adapters now support pushMany() and pushManyOn():

await adapter.pushManyOn('emails', [
  { id: 'uuid1', name: 'SendEmailJob', payload: {...}, attempts: 0 },
  { id: 'uuid2', name: 'SendEmailJob', payload: {...}, attempts: 0 },
])

Job grouping with groupId

Jobs can now be assigned to a group using the groupId option. This allows organizing related jobs together for easier monitoring and filtering in UIs.

// Assign jobs to a group
await SendEmailJob.dispatch({ to: 'user@example.com' })
  .group('newsletter-jan-2025')
  .run()

// Combine with other options
await ExportJob.dispatch({ userId: 1 })
  .group('batch-export-123')
  .toQueue('exports')
  .priority(2)
  .run()

Use cases

  • Batch operations: Group all jobs from a newsletter send, bulk export, or data migration
  • Monitoring: Filter and view related jobs together in queue UIs
  • Debugging: Easily find all jobs related to a specific operation

API

The groupId is stored with the job data and can be accessed via:

const record = await adapter.getJob('job-id', 'queue-name')
console.log(record.data.groupId) // 'newsletter-jan-2025'

Job retention

Jobs can now be kept in history after completion or failure using the removeOnComplete and removeOnFail options:

// Global configuration
await QueueManager.init({
  // ...
  defaultJobOptions: {
    removeOnComplete: false, // Keep all completed jobs
    removeOnFail: { count: 100 }, // Keep last 100 failed jobs
  },
})

// Per-queue configuration
await QueueManager.init({
  // ...
  queues: {
    critical: {
      defaultJobOptions: {
        removeOnFail: { age: '7d', count: 1000 }, // Keep for 7 days, max 1000
      },
    },
  },
})

// Per-job configuration (via Job class)
class MyJob extends Job {
  static options = {
    removeOnComplete: { count: 50 },
  }
}

Retention options

  • true (default): Remove job immediately after completion/failure
  • false: Keep job in history indefinitely
  • { age?: Duration, count?: number }: Keep with pruning by age and/or count

Job status API

A new getJob method allows retrieving job status and data:

const adapter = QueueManager.use()
const record = await adapter.getJob('job-id', 'queue-name')

if (record) {
  console.log(record.status) // 'pending' | 'active' | 'delayed' | 'completed' | 'failed'
  console.log(record.data) // Original job data
  console.log(record.finishedAt) // Timestamp (for completed/failed)
  console.log(record.error) // Error message (for failed)
}

Commits

  • docs: refactor README with new features and better structure (a0bc28c)
  • fix: prevent double-claim on SQLite with status guard (1a089dd)
  • feat: add dispatchMany for bulk job dispatch (9bc5fa8)
  • feat: add groupId support for job grouping (b2594d2)
  • docs(examples): add job retention options (bd3b8d1)
  • chore(tests): remove unnecessary locations config and debug logs (046faf1)
  • feat: add job history and retention (1db9f85)
  • feat: add job retention types (839f42e)
  • chore: add funding file (69f9f39)
  • docs: use markdown note (3f33166)

Full Changelog: v0.2.0...v0.3.0

DI-first Job Architecture & Simplified Naming

03 Jan 09:00
v0.2.0
9b17531

Choose a tag to compare

Breaking Changes

Job constructor reserved for dependency injection

The Job constructor no longer accepts payload and context parameters. These are now provided via the internal $hydrate() method called by the worker.

Before:

class SendEmailJob extends Job<SendEmailPayload> {
  constructor(payload: SendEmailPayload, context: JobContext) {
    super(payload, context)
  }
}

After:

class SendEmailJob extends Job<SendEmailPayload> {
  // Constructor for DI only
  constructor(private mailer: MailerService) {
    super()
  }
}

// Or simply omit the constructor if no DI needed
class SendEmailJob extends Job<SendEmailPayload> {
  async execute() { ... }
}

JobFactory signature simplified

The jobFactory now only receives JobClass. The worker handles payload/context via $hydrate().

Before: jobFactory: (JobClass, payload, context) => ...
After: jobFactory: (JobClass) => container.make(JobClass)

AbortSignal moved to this.signal

The signal parameter has been removed from execute(). Access it via this.signal instead.

Before: async execute(signal?: AbortSignal)
After: async execute() + use this.signal

static jobName replaced by options.name

Job name now defaults to the class name. The static jobName property is removed.

Before:

static readonly jobName = 'SendEmailJob'
static options = { queue: 'emails' }

After:

static options = { queue: 'emails' }
// name defaults to class name, or override with: name: 'CustomName'

Warning

If minifying code in production, always specify name explicitly.

Schedule jobNamename

ScheduleConfig.jobName and ScheduleData.jobName renamed to name. Database column renamed from job_name to name.

Warning

Existing databases require manual migration or table recreation.

New Features

  • this.signal - Access abort signal anywhere in the job instance for timeout handling
  • $hydrate() method - Internal method for worker to provide runtime data (payload, context, signal)

Commits

  • refactor!: rename jobName to name in Schedule types and database (00446c3)
  • refactor!: replace static jobName with options.name (ef3e218)
  • refactor!: separate dependency injection from job hydration (e1bc7b5)
  • docs: improve JSDoc documentation for types with default values (c12654a)

Full Changelog: v0.1.0...v0.2.0

Persistent Job Scheduling

02 Jan 12:10
v0.1.0
5283833

Choose a tag to compare

New Feature: Persistent Job Scheduling

Schedule jobs to run on a recurring basis using cron expressions or fixed intervals. Schedules are persisted in the database and survive worker restarts.

Creating Schedules

// Interval-based schedule (ID defaults to job name)
await MetricsJob.schedule({ endpoint: '/api/health' })
  .every('10s')
  .run()
  
// Cron-based schedule with custom ID
await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *')
  .timezone('Europe/Paris')
  .run()
  
// With constraints
await ReportJob.schedule({ type: 'weekly' })
  .cron('0 9 * * MON')
  .from(new Date('2024-01-01'))
  .to(new Date('2024-12-31'))
  .limit(52)
  .run()

Managing Schedules

import { Schedule } from '@boringnode/queue'

const schedule = await Schedule.find('MetricsJob')
await schedule.pause()
await schedule.resume()
await schedule.trigger()  // Immediate run
await schedule.delete()

// List schedules
const active = await Schedule.list({ status: 'active' })

Schedule Options

Method Description
.id(string) Unique identifier (defaults to job name)
.every(duration) Fixed interval ('5s', '1m', '1h', '1d')
.cron(expression) Cron schedule
.timezone(tz) Timezone for cron (default: 'UTC')
.from(date) / .to(date) Date boundaries
.between(from, to) Shorthand for date range
.limit(n) Maximum number of runs

How It Works

  • Schedules are persisted via the adapter (Redis, Knex)
  • Worker automatically polls and dispatches due schedules
  • Multiple workers can run concurrently - atomic claiming prevents duplicates
  • Failed jobs don't affect the schedule - next run still occurs

Job Context & Dependency Injection"

01 Jan 21:20
v0.0.1-alpha.4
a1cdb72

Choose a tag to compare

Breaking Changes

Job Constructor Signature

The Job constructor now requires a context parameter in addition to payload:

// Before
class MyJob extends Job<Payload> {
  constructor(payload: Payload) {
    super(payload)
  }
}

// After
import type { JobContext } from '@boringnode/queue/types'

class MyJob extends Job<Payload> {
  constructor(payload: Payload, context: JobContext) {
    super(payload, context)
  }
}

Type Imports

Types must now be imported from @boringnode/queue/types:

// Before
import type { JobContext } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types/main'

// After
import type { JobContext, JobOptions } from '@boringnode/queue/types'

Worker Config Rename

The pollingInterval option has been renamed to idleDelay:

// Before
worker: {
  pollingInterval: '2s'
}

// After
worker: {
  idleDelay: '2s'
}

New Features

Job Context

Jobs now have access to execution metadata via this.context:

class MyJob extends Job<Payload> {
  async execute() {
    console.log(`Job ID: ${this.context.jobId}`)
    console.log(`Attempt: ${this.context.attempt}`) // 1, 2, 3...
    console.log(`Queue: ${this.context.queue}`)
    
    if (this.context.attempt > 1) {
      console.log('This is a retry!')
    }
  }
}

Available context properties:

Property Type Description
jobId string Unique job identifier
name string Job class name
attempt number Current attempt (1-based)
queue string Queue name
priority number Job priority
acquiredAt Date When job was acquired
stalledCount number Stalled recovery count

Dependency Injection Support

Custom job instantiation via jobFactory enables IoC container integration:

await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis() },
  jobFactory: async (JobClass, payload, context) => {
    return app.container.make(JobClass, [payload, context])
  }
})

Graceful Shutdown

Workers now support graceful shutdown with configurable behavior:

const worker = new Worker({
  // ...
  worker: {
    gracefulShutdown: true, // default: true
    onShutdownSignal: async () => {
      console.log('Shutdown signal received, cleaning up...')
    }
  }
})
  • gracefulShutdown - Enable/disable automatic SIGINT/SIGTERM handling (default: true)
  • onShutdownSignal - Optional callback invoked when a shutdown signal is received

Custom Logger

You can now provide a custom logger (pino-compatible interface):

import { pino } from 'pino'

await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis() },
  logger: pino()
})

The logger is used for warnings (e.g., when no jobs are found for configured locations) and errors.

Named Errors Export

All error classes are now exported via the errors namespace:

import { errors } from '@boringnode/queue'

try {
  await job.dispatch()
} catch (e) {
  if (e instanceof errors.E_JOB_NOT_FOUND) {
    // handle job not found
  }
  if (e instanceof errors.E_QUEUE_NOT_INITIALIZED) {
    // QueueManager.init() was not called
  }
}

Available errors:

  • E_JOB_NOT_FOUND - Job class not registered
  • E_JOB_TIMEOUT - Job exceeded timeout
  • E_JOB_MAX_ATTEMPTS_REACHED - Job exhausted all retries
  • E_QUEUE_NOT_INITIALIZED - QueueManager not initialized
  • E_ADAPTER_INIT_ERROR - Adapter factory threw an error
  • E_CONFIGURATION_ERROR - Invalid configuration

Commits

  • refactor!: consolidate type exports under /types path (5cec963)
  • feat!: expose job context to job instances (26d8043)
  • feat: add jobFactory option for dependency injection support (c1563b5)
  • docs: add JSDoc documentation to public APIs (592f4b9)
  • refactor!: extract magic numbers into named constants (df0688c)
  • fix: improve graceful shutdown in Worker (12c46f6)
  • chore: update lock file (ccdc019)
  • ci: add GitHub Actions workflow for checks (5bf7db3)
  • chore: add logger information (e23fbeb)
  • fix: improve error handling in QueueManager (658488d)
  • feat: add injectable logger system (e89f662)
  • feat: export errors and types for better DX (892fcf8)
  • test: add worker concurrency tests to prevent job duplication (d736521)
  • refactor: make locations optional in QueueManagerConfig (6d3be20)
  • fix(knex): prevent race condition with FOR UPDATE SKIP LOCKED (379dc03)
  • feat: add stalled jobs recovery mechanism (5b0970f)
  • fix(redis): remove unsafe popAndWait method with race condition (dd2bfa8)
  • chore: add drivers as peer deps (109bc08)

Do not use import path

12 Dec 22:50
v0.0.1-alpha.3
83900a7

Choose a tag to compare

Pre-release

Commits

  • refactor: do not use import path (ac2d041)

Full Changelog: v0.0.1-alpha.2...v0.0.1-alpha.3

Use .js to import files

12 Dec 22:11
v0.0.1-alpha.2
57ed42a

Choose a tag to compare

Pre-release

Commits

  • refactor: migrate to .js import (0696d20)
  • docs(readme): add knex information (c942cc4)

Full Changelog: v0.0.1-alpha.1...v0.0.1-alpha.2

Knex adapter & connection management

07 Dec 23:36
v0.0.1-alpha.1
d07c587

Choose a tag to compare

Pre-release

This release adds a Knex adapter, bringing SQL database support alongside Redis.
Connection handling has been improved: Redis now only closes connections it owns, making it safer to use shared/external connections. Adapter instances are now cached to avoid unnecessary re-instantiation.
On the performance side, pool filling now uses parallelized pop operations for faster job retrieval.

Commits

  • feat(adapter): add knex adapter (c41fc8b)
  • fix(manager): cache adapter instances (8cd76b8)
  • feat(redis): only quit the connection when we own it (89cb921)
  • docs(readme): add benchmark value with 100k jobs (45579d9)
  • docs(readme): update benchmark results with real stuff (b998045)
  • refactor(benchmark): allow to take duration as a params (df13753)
  • refactor(worker): parallelize pop for filling the pool (15cdc86)

Full Changelog: v0.0.1-alpha.0...v0.0.1-alpha.1

Concurrency overhaul & reliability fixes

07 Dec 21:18
v0.0.1-alpha.0
4c91774

Choose a tag to compare

This release introduces a JobPool system for proper concurrency management, along with worker timeouts via AbortSignal. The Redis adapter has been refactored to use atomic Lua scripts, replacing the previous lock-based approach for better performance.
Workers now ensure pool completion before shutdown, making graceful termination more predictable.

Commits

  • docs(readme): add more features (784f410)
  • docs(readme): add benchmark section (b12d59f)
  • test(adapter): improve active tracking cleanup (c7c785f)
  • refactor(redis): remove verrou in favor of atomic lua script (ba69213)
  • refactor(redis): do not check for delayed jobs every pop (648163e)
  • chore: add some benchmark (458763e)
  • refactor(worker): use AbortSignal for timeout (1da0daa)
  • fix(worker): remove console.log (7830615)
  • fix(worker): avoid memory leak with job initialization fail (47442eb)
  • fix(worker): avoid memory leak when job class is not found (eed85f5)
  • fix(worker): avoid memory leak by releasing the lease when job fail (db5fdd3)
  • style: lint (62f34d6)
  • fix(worker): ensure pool completion before shutting down (690cf06)
  • style: lint fix (8e89885)
  • feat(worker): add timeout possibility (8ac461f)
  • feat: use job pool to ensure proper concurrency (38fd333)
  • feat: add JobPool system (34bbbc8)
  • chore: add basic readme (96ee73d)

Full Changelog: v0.0.1-alpha...v0.0.1-alpha.0

v0.0.1-alpha

21 Nov 15:08
v0.0.1-alpha
3b0517c

Choose a tag to compare

v0.0.1-alpha Pre-release
Pre-release

Commits

  • feat: setup build process (284c174)
  • fix: correct typescript typing (d7539fc)
  • chore: initial commit (d2548ce)

Full Changelog: https://github.com/boringnode/queue/commits/v0.0.1-alpha