diff --git a/Gemfile.lock b/Gemfile.lock index ecb7ccf8..1f613581 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -193,6 +193,7 @@ PLATFORMS arm64-darwin-22 arm64-darwin-23 arm64-darwin-24 + arm64-darwin-25 x86_64-darwin-21 x86_64-darwin-23 x86_64-linux diff --git a/README.md b/README.md index b49f9c60..536c2f86 100644 --- a/README.md +++ b/README.md @@ -271,6 +271,19 @@ It is recommended to set this value less than or equal to the queue database's c - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. +### Scheduler polling interval + +The scheduler process checks for due recurring tasks and reloads dynamic tasks at a configurable interval. You can set this interval using the `polling_interval` key under the `scheduler` section in your `config/queue.yml`: + +```yaml +scheduler: + polling_interval: 5 # seconds +``` + +This controls how frequently the scheduler wakes up to enqueue due recurring jobs and reload dynamic tasks. + +> **Note:** The scheduler process always starts by default to support dynamic recurring tasks, even if no static tasks are configured in `config/recurring.yml`. If you don't use recurring tasks at all, you can disable the scheduler by setting `SOLID_QUEUE_SKIP_RECURRING=true` or passing `skip_recurring: true` in the configuration. + ### Queue order and priorities As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`. @@ -732,6 +745,48 @@ my_periodic_resque_job: and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. + +### Scheduling and Unscheduling Recurring Tasks Dynamically + +You can schedule and unschedule recurring tasks at runtime, without editing the configuration file. Use the following methods: + +#### Scheduling a recurring task + +```ruby +SolidQueue.schedule_task( + "my_dynamic_task", + class: "MyJob", + args: [1, 2], + schedule: "every 10 minutes" +) +``` + +This will create a dynamic recurring task with the given key, class, and schedule. The API accepts the same options as the YAML configuration: `class`, `args`, `command`, `schedule`, `queue`, `priority`, and `description`. + +#### Unscheduling a recurring task + +```ruby +SolidQueue.unschedule_task(key) +``` + +This will delete a dynamically scheduled recurring task by its key. If you attempt to unschedule a static (configuration-defined) recurring task, an error will be raised. + +> **Note:** Static recurring tasks (those defined in `config/recurring.yml`) cannot be unscheduled at runtime. Attempting to do so will raise an error. + +#### Example: Scheduling and unscheduling a recurring task + +```ruby +# Schedule a new dynamic recurring task +SolidQueue.schedule_task( + "cleanup_temp_files", + class: "TempFileCleanerJob", + schedule: "every day at 2am" +) + +# Unschedule the task later by key +SolidQueue.unschedule_task("cleanup_temp_files") +``` + ## Inspiration Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot. diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 2a776c8a..5de7775f 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -11,6 +11,7 @@ class RecurringTask < Record validate :existing_job_class scope :static, -> { where(static: true) } + scope :dynamic, -> { where(static: false) } has_many :recurring_executions, foreign_key: :task_key, primary_key: :key diff --git a/lib/generators/solid_queue/install/templates/config/queue.yml b/lib/generators/solid_queue/install/templates/config/queue.yml index 15691e9d..d7b0e6b9 100644 --- a/lib/generators/solid_queue/install/templates/config/queue.yml +++ b/lib/generators/solid_queue/install/templates/config/queue.yml @@ -7,6 +7,8 @@ default: &default threads: 3 processes: <%%= ENV.fetch("JOB_CONCURRENCY", 1) %> polling_interval: 0.1 + scheduler: + polling_interval: 1 development: <<: *default diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..11994b76 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -43,6 +43,18 @@ module SolidQueue delegate :on_start, :on_stop, :on_exit, to: Supervisor + + def schedule_task(key, **options) + RecurringTask.from_configuration(key, **options).tap do |task| + task.static = false + task.save! + end + end + + def unschedule_task(key) + RecurringTask.dynamic.find_by!(key:).destroy + end + [ Dispatcher, Scheduler, Worker ].each do |process| define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| process.on_start(&block) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 94169ca7..21f7ad5d 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -28,6 +28,10 @@ def instantiate concurrency_maintenance_interval: 600 } + SCHEDULER_DEFAULTS = { + polling_interval: 1 + } + DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" @@ -137,11 +141,12 @@ def dispatchers end def schedulers - if !skip_recurring_tasks? && recurring_tasks.any? - [ Process.new(:scheduler, recurring_tasks: recurring_tasks) ] - else - [] - end + return [] if skip_recurring_tasks? + + # Always start a scheduler (even with no static recurring tasks) to support + # dynamic tasks that may be added at runtime via SolidQueue.schedule_task. + # Use skip_recurring: true or SOLID_QUEUE_SKIP_RECURRING=true to disable. + [ Process.new(:scheduler, { recurring_tasks:, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ] end def workers_options @@ -154,6 +159,10 @@ def dispatchers_options .map { |options| options.dup.symbolize_keys } end + def scheduler_options + @scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys + end + def recurring_tasks @recurring_tasks ||= recurring_tasks_config.map do |id, options| RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule) @@ -162,9 +171,13 @@ def recurring_tasks def processes_config @processes_config ||= config_from \ - options.slice(:workers, :dispatchers).presence || options[:config_file], - keys: [ :workers, :dispatchers ], - fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] } + options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file], + keys: [ :workers, :dispatchers, :scheduler ], + fallback: { + workers: [ WORKER_DEFAULTS ], + dispatchers: [ DISPATCHER_DEFAULTS ], + scheduler: SCHEDULER_DEFAULTS + } end def recurring_tasks_config diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index c7428010..4edf03e3 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -59,5 +59,9 @@ def heartbeat self.process = nil wake_up end + + def refresh_registered_process + wrap_in_app_executor { process&.update_columns(metadata: metadata.compact) } + end end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 3cec90fa..920c5d3a 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -5,7 +5,7 @@ class Scheduler < Processes::Base include Processes::Runnable include LifecycleHooks - attr_reader :recurring_schedule + attr_reader :recurring_schedule, :polling_interval after_boot :run_start_hooks after_boot :schedule_recurring_tasks @@ -15,6 +15,8 @@ class Scheduler < Processes::Base def initialize(recurring_tasks:, **options) @recurring_schedule = RecurringSchedule.new(recurring_tasks) + options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + @polling_interval = options[:polling_interval] super(**options) end @@ -24,13 +26,18 @@ def metadata end private - SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks - def run loop do break if shutting_down? - interruptible_sleep(SLEEP_INTERVAL) + recurring_schedule.reload! + + if recurring_schedule.changed? + refresh_registered_process + recurring_schedule.clear_changes + end + + interruptible_sleep(polling_interval) end ensure SolidQueue.instrument(:shutdown_process, process: self) do diff --git a/lib/solid_queue/scheduler/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb index b765edf1..12c7af3d 100644 --- a/lib/solid_queue/scheduler/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -4,21 +4,26 @@ module SolidQueue class Scheduler::RecurringSchedule include AppExecutor - attr_reader :configured_tasks, :scheduled_tasks + attr_reader :scheduled_tasks def initialize(tasks) - @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) + @static_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) @scheduled_tasks = Concurrent::Hash.new + @changes = Concurrent::Hash.new + end + + def configured_tasks + static_tasks + dynamic_tasks.to_a end def empty? - configured_tasks.empty? + scheduled_tasks.empty? && dynamic_tasks.none? end def schedule_tasks wrap_in_app_executor do - persist_tasks - reload_tasks + persist_static_tasks + reload_static_tasks end configured_tasks.each do |task| @@ -36,17 +41,61 @@ def unschedule_tasks end def task_keys - configured_tasks.map(&:key) + static_task_keys + dynamic_tasks.pluck(:key) + end + + def reload! + wrap_in_app_executor do + { added_tasks: schedule_new_dynamic_tasks, + removed_tasks: unschedule_old_dynamic_tasks }.each do |key, values| + if values.any? + @changes[key] = values + else + @changes.delete(key) + end + end + end + end + + def changed? + @changes.any? + end + + def clear_changes + @changes.clear end private - def persist_tasks - SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all - SolidQueue::RecurringTask.create_or_update_all configured_tasks + attr_reader :static_tasks + + def dynamic_tasks + SolidQueue::RecurringTask.dynamic + end + + def static_task_keys + static_tasks.map(&:key) + end + + def schedule_new_dynamic_tasks + dynamic_tasks.where.not(key: scheduled_tasks.keys).each do |task| + schedule_task(task) + end + end + + def unschedule_old_dynamic_tasks + (scheduled_tasks.keys - SolidQueue::RecurringTask.pluck(:key)).each do |key| + scheduled_tasks[key].cancel + scheduled_tasks.delete(key) + end + end + + def persist_static_tasks + SolidQueue::RecurringTask.static.where.not(key: static_task_keys).delete_all + SolidQueue::RecurringTask.create_or_update_all static_tasks end - def reload_tasks - @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a + def reload_static_tasks + @static_tasks = SolidQueue::RecurringTask.static.where(key: static_task_keys).to_a end def schedule(task) diff --git a/test/solid_queue_test.rb b/test/solid_queue_test.rb index d6d61b57..542c3d82 100644 --- a/test/solid_queue_test.rb +++ b/test/solid_queue_test.rb @@ -4,4 +4,54 @@ class SolidQueueTest < ActiveSupport::TestCase test "it has a version number" do assert SolidQueue::VERSION end + + test "schedules recurring tasks" do + SolidQueue.schedule_task("test 1", command: "puts 1", schedule: "every hour") + SolidQueue.schedule_task("test 2", command: "puts 2", schedule: "every minute", static: true) + + assert SolidQueue::RecurringTask.exists?(key: "test 1", command: "puts 1", schedule: "every hour", static: false) + assert SolidQueue::RecurringTask.exists?(key: "test 2", command: "puts 2", schedule: "every minute", static: false) + end + + test "schedules recurring tasks with class and args (same keys as YAML config)" do + SolidQueue.schedule_task("test 3", class: "AddToBufferJob", args: [ 42 ], schedule: "every hour") + + task = SolidQueue::RecurringTask.find_by!(key: "test 3") + assert_equal "AddToBufferJob", task.class_name + assert_equal [ 42 ], task.arguments + assert_equal false, task.static + end + + test "unschedules recurring tasks" do + dynamic_task = SolidQueue::RecurringTask.create!( + key: "dynamic", command: "puts 'd'", schedule: "every day", static: false + ) + + static_task = SolidQueue::RecurringTask.create!( + key: "static", command: "puts 's'", schedule: "every week", static: true + ) + + SolidQueue.unschedule_task(dynamic_task.key) + + assert_raises(ActiveRecord::RecordNotFound) do + SolidQueue.unschedule_task(static_task.key) + end + + assert_not SolidQueue::RecurringTask.exists?(key: "dynamic", static: false) + assert SolidQueue::RecurringTask.exists?(key: "static", static: true) + end + + test "schedule_task with duplicate key raises error" do + SolidQueue.schedule_task("duplicate_test", command: "puts 1", schedule: "every hour") + + assert_raises(ActiveRecord::RecordNotUnique) do + SolidQueue.schedule_task("duplicate_test", command: "puts 2", schedule: "every minute") + end + end + + test "unschedule_task with nonexistent key raises RecordNotFound" do + assert_raises(ActiveRecord::RecordNotFound) do + SolidQueue.unschedule_task("nonexistent_key") + end + end end diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 11c2a5ff..14db95cb 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -21,7 +21,7 @@ class ConfigurationTest < ActiveSupport::TestCase test "default configuration when config given is empty" do configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration), recurring_schedule_file: config_file_path(:empty_configuration)) - assert_equal 2, configuration.configured_processes.count + assert_equal 3, configuration.configured_processes.count # includes scheduler for dynamic tasks assert_processes configuration, :worker, 1, queues: "*" assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] end @@ -134,12 +134,12 @@ class ConfigurationTest < ActiveSupport::TestCase configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_production_only)) assert configuration.valid? - assert_processes configuration, :scheduler, 0 + assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks assert_output(/Provided configuration file '[^']+' does not exist\./) do configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty)) assert configuration.valid? - assert_processes configuration, :scheduler, 0 + assert_processes configuration, :scheduler, 1 end # No processes diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index 3e838c50..882dcd72 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -3,7 +3,7 @@ class SchedulerTest < ActiveSupport::TestCase self.use_transactional_tests = false - test "recurring schedule" do + test "recurring schedule (only static)" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) @@ -17,6 +17,41 @@ class SchedulerTest < ActiveSupport::TestCase scheduler.stop end + test "recurring schedule (only dynamic)" do + SolidQueue::RecurringTask.create( + key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "dynamic_task" ] + ensure + scheduler.stop + end + + test "recurring schedule (static + dynamic)" do + SolidQueue::RecurringTask.create( + key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + + recurring_tasks = { static_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "static_task", "dynamic_task" ] + ensure + scheduler.stop + end + test "run more than one instance of the scheduler with recurring tasks" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } schedulers = 2.times.collect do @@ -37,4 +72,79 @@ class SchedulerTest < ActiveSupport::TestCase end end end + + test "dynamic task actually enqueues jobs" do + SolidQueue::RecurringTask.create!( + key: "dynamic_enqueue_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + wait_while_with_timeout(3.seconds) { SolidQueue::Job.count < 1 } + + skip_active_record_query_cache do + assert SolidQueue::Job.count >= 1, "Expected at least one job to be enqueued by the dynamic task" + assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count + end + ensure + scheduler&.stop + end + + test "updates metadata after adding dynamic task post-start" do + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + process = SolidQueue::Process.first + # initially there are no recurring_schedule keys + assert_empty process.metadata + + # now create a dynamic task after the scheduler has booted + SolidQueue::RecurringTask.create!( + key: "new_dynamic_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every second", + arguments: [ 42 ] + ) + + wait_while_with_timeout(3.seconds) { process.reload.metadata.empty? } + + # metadata should now include the new key + assert_metadata process, recurring_schedule: [ "new_dynamic_task" ] + end + ensure + scheduler&.stop + end + + test "updates metadata after removing dynamic task post-start" do + old_dynamic_task = SolidQueue::RecurringTask.create!( + key: "old_dynamic_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every second", + arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + process = SolidQueue::Process.first + # initially there is one recurring_schedule key + assert_metadata process, recurring_schedule: [ "old_dynamic_task" ] + + old_dynamic_task.destroy + + wait_while_with_timeout(3.seconds) { process.reload.metadata.present? } + + # The task is unscheduled after it's been removed, and it's reflected in the metadata + assert_empty process.metadata + end + ensure + scheduler&.stop + end end