Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ PLATFORMS
arm64-darwin-22
arm64-darwin-23
arm64-darwin-24
arm64-darwin-25
x86_64-darwin-21
x86_64-darwin-23
x86_64-linux
Expand Down
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,19 @@ It is recommended to set this value less than or equal to the queue database's c
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.


### Scheduler polling interval

The scheduler process checks for due recurring tasks and reloads dynamic tasks at a configurable interval. You can set this interval using the `polling_interval` key under the `scheduler` section in your `config/queue.yml`:

```yaml
scheduler:
polling_interval: 5 # seconds
```

This controls how frequently the scheduler wakes up to enqueue due recurring jobs and reload dynamic tasks.

> **Note:** The scheduler process always starts by default to support dynamic recurring tasks, even if no static tasks are configured in `config/recurring.yml`. If you don't use recurring tasks at all, you can disable the scheduler by setting `SOLID_QUEUE_SKIP_RECURRING=true` or passing `skip_recurring: true` in the configuration.

### Queue order and priorities

As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`.
Expand Down Expand Up @@ -732,6 +745,48 @@ my_periodic_resque_job:

and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.


### Scheduling and Unscheduling Recurring Tasks Dynamically

You can schedule and unschedule recurring tasks at runtime, without editing the configuration file. Use the following methods:

#### Scheduling a recurring task

```ruby
SolidQueue.schedule_task(
"my_dynamic_task",
class: "MyJob",
args: [1, 2],
schedule: "every 10 minutes"
)
```

This will create a dynamic recurring task with the given key, class, and schedule. The API accepts the same options as the YAML configuration: `class`, `args`, `command`, `schedule`, `queue`, `priority`, and `description`.

#### Unscheduling a recurring task

```ruby
SolidQueue.unschedule_task(key)
```

This will delete a dynamically scheduled recurring task by its key. If you attempt to unschedule a static (configuration-defined) recurring task, an error will be raised.

> **Note:** Static recurring tasks (those defined in `config/recurring.yml`) cannot be unscheduled at runtime. Attempting to do so will raise an error.

#### Example: Scheduling and unscheduling a recurring task

```ruby
# Schedule a new dynamic recurring task
SolidQueue.schedule_task(
"cleanup_temp_files",
class: "TempFileCleanerJob",
schedule: "every day at 2am"
)

# Unschedule the task later by key
SolidQueue.unschedule_task("cleanup_temp_files")
```

## Inspiration

Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.
Expand Down
1 change: 1 addition & 0 deletions app/models/solid_queue/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RecurringTask < Record
validate :existing_job_class

scope :static, -> { where(static: true) }
scope :dynamic, -> { where(static: false) }

has_many :recurring_executions, foreign_key: :task_key, primary_key: :key

Expand Down
2 changes: 2 additions & 0 deletions lib/generators/solid_queue/install/templates/config/queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ default: &default
threads: 3
processes: <%%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.1
scheduler:
polling_interval: 1

development:
<<: *default
Expand Down
12 changes: 12 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ module SolidQueue

delegate :on_start, :on_stop, :on_exit, to: Supervisor


def schedule_task(key, **options)
RecurringTask.from_configuration(key, **options).tap do |task|
task.static = false
task.save!
end
end

def unschedule_task(key)
RecurringTask.dynamic.find_by!(key:).destroy
end

[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start(&block)
Expand Down
29 changes: 21 additions & 8 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def instantiate
concurrency_maintenance_interval: 600
}

SCHEDULER_DEFAULTS = {
polling_interval: 1
}

DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"

Expand Down Expand Up @@ -137,11 +141,12 @@ def dispatchers
end

def schedulers
if !skip_recurring_tasks? && recurring_tasks.any?
[ Process.new(:scheduler, recurring_tasks: recurring_tasks) ]
else
[]
end
return [] if skip_recurring_tasks?

# Always start a scheduler (even with no static recurring tasks) to support
# dynamic tasks that may be added at runtime via SolidQueue.schedule_task.
# Use skip_recurring: true or SOLID_QUEUE_SKIP_RECURRING=true to disable.
[ Process.new(:scheduler, { recurring_tasks:, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ]
end

def workers_options
Expand All @@ -154,6 +159,10 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def scheduler_options
@scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys
end

def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
Expand All @@ -162,9 +171,13 @@ def recurring_tasks

def processes_config
@processes_config ||= config_from \
options.slice(:workers, :dispatchers).presence || options[:config_file],
keys: [ :workers, :dispatchers ],
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file],
keys: [ :workers, :dispatchers, :scheduler ],
fallback: {
workers: [ WORKER_DEFAULTS ],
dispatchers: [ DISPATCHER_DEFAULTS ],
scheduler: SCHEDULER_DEFAULTS
}
end

def recurring_tasks_config
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,9 @@ def heartbeat
self.process = nil
wake_up
end

def refresh_registered_process
wrap_in_app_executor { process&.update_columns(metadata: metadata.compact) }
end
end
end
15 changes: 11 additions & 4 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class Scheduler < Processes::Base
include Processes::Runnable
include LifecycleHooks

attr_reader :recurring_schedule
attr_reader :recurring_schedule, :polling_interval

after_boot :run_start_hooks
after_boot :schedule_recurring_tasks
Expand All @@ -15,6 +15,8 @@ class Scheduler < Processes::Base

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
@polling_interval = options[:polling_interval]

super(**options)
end
Expand All @@ -24,13 +26,18 @@ def metadata
end

private
SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks

def run
loop do
break if shutting_down?

interruptible_sleep(SLEEP_INTERVAL)
recurring_schedule.reload!

if recurring_schedule.changed?
refresh_registered_process
recurring_schedule.clear_changes
end

interruptible_sleep(polling_interval)
end
ensure
SolidQueue.instrument(:shutdown_process, process: self) do
Expand Down
71 changes: 60 additions & 11 deletions lib/solid_queue/scheduler/recurring_schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ module SolidQueue
class Scheduler::RecurringSchedule
include AppExecutor

attr_reader :configured_tasks, :scheduled_tasks
attr_reader :scheduled_tasks

def initialize(tasks)
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@static_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@scheduled_tasks = Concurrent::Hash.new
@changes = Concurrent::Hash.new
end

def configured_tasks
static_tasks + dynamic_tasks.to_a
end

def empty?
configured_tasks.empty?
scheduled_tasks.empty? && dynamic_tasks.none?
end

def schedule_tasks
wrap_in_app_executor do
persist_tasks
reload_tasks
persist_static_tasks
reload_static_tasks
end

configured_tasks.each do |task|
Expand All @@ -36,17 +41,61 @@ def unschedule_tasks
end

def task_keys
configured_tasks.map(&:key)
static_task_keys + dynamic_tasks.pluck(:key)
end

def reload!
wrap_in_app_executor do
{ added_tasks: schedule_new_dynamic_tasks,
removed_tasks: unschedule_old_dynamic_tasks }.each do |key, values|
if values.any?
@changes[key] = values
else
@changes.delete(key)
end
end
end
end

def changed?
@changes.any?
end

def clear_changes
@changes.clear
end

private
def persist_tasks
SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all
SolidQueue::RecurringTask.create_or_update_all configured_tasks
attr_reader :static_tasks

def dynamic_tasks
SolidQueue::RecurringTask.dynamic
end

def static_task_keys
static_tasks.map(&:key)
end

def schedule_new_dynamic_tasks
dynamic_tasks.where.not(key: scheduled_tasks.keys).each do |task|
schedule_task(task)
end
end

def unschedule_old_dynamic_tasks
(scheduled_tasks.keys - SolidQueue::RecurringTask.pluck(:key)).each do |key|
scheduled_tasks[key].cancel
scheduled_tasks.delete(key)
end
end

def persist_static_tasks
SolidQueue::RecurringTask.static.where.not(key: static_task_keys).delete_all
SolidQueue::RecurringTask.create_or_update_all static_tasks
end

def reload_tasks
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a
def reload_static_tasks
@static_tasks = SolidQueue::RecurringTask.static.where(key: static_task_keys).to_a
end

def schedule(task)
Expand Down
50 changes: 50 additions & 0 deletions test/solid_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,54 @@ class SolidQueueTest < ActiveSupport::TestCase
test "it has a version number" do
assert SolidQueue::VERSION
end

test "schedules recurring tasks" do
SolidQueue.schedule_task("test 1", command: "puts 1", schedule: "every hour")
SolidQueue.schedule_task("test 2", command: "puts 2", schedule: "every minute", static: true)

assert SolidQueue::RecurringTask.exists?(key: "test 1", command: "puts 1", schedule: "every hour", static: false)
assert SolidQueue::RecurringTask.exists?(key: "test 2", command: "puts 2", schedule: "every minute", static: false)
end

test "schedules recurring tasks with class and args (same keys as YAML config)" do
SolidQueue.schedule_task("test 3", class: "AddToBufferJob", args: [ 42 ], schedule: "every hour")

task = SolidQueue::RecurringTask.find_by!(key: "test 3")
assert_equal "AddToBufferJob", task.class_name
assert_equal [ 42 ], task.arguments
assert_equal false, task.static
end

test "unschedules recurring tasks" do
dynamic_task = SolidQueue::RecurringTask.create!(
key: "dynamic", command: "puts 'd'", schedule: "every day", static: false
)

static_task = SolidQueue::RecurringTask.create!(
key: "static", command: "puts 's'", schedule: "every week", static: true
)

SolidQueue.unschedule_task(dynamic_task.key)

assert_raises(ActiveRecord::RecordNotFound) do
SolidQueue.unschedule_task(static_task.key)
end

assert_not SolidQueue::RecurringTask.exists?(key: "dynamic", static: false)
assert SolidQueue::RecurringTask.exists?(key: "static", static: true)
end

test "schedule_task with duplicate key raises error" do
SolidQueue.schedule_task("duplicate_test", command: "puts 1", schedule: "every hour")

assert_raises(ActiveRecord::RecordNotUnique) do
SolidQueue.schedule_task("duplicate_test", command: "puts 2", schedule: "every minute")
end
end

test "unschedule_task with nonexistent key raises RecordNotFound" do
assert_raises(ActiveRecord::RecordNotFound) do
SolidQueue.unschedule_task("nonexistent_key")
end
end
end
6 changes: 3 additions & 3 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigurationTest < ActiveSupport::TestCase
test "default configuration when config given is empty" do
configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration), recurring_schedule_file: config_file_path(:empty_configuration))

assert_equal 2, configuration.configured_processes.count
assert_equal 3, configuration.configured_processes.count # includes scheduler for dynamic tasks
assert_processes configuration, :worker, 1, queues: "*"
assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size]
end
Expand Down Expand Up @@ -134,12 +134,12 @@ class ConfigurationTest < ActiveSupport::TestCase

configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_production_only))
assert configuration.valid?
assert_processes configuration, :scheduler, 0
assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks

assert_output(/Provided configuration file '[^']+' does not exist\./) do
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty))
assert configuration.valid?
assert_processes configuration, :scheduler, 0
assert_processes configuration, :scheduler, 1
end

# No processes
Expand Down
Loading
Loading