From b002752029597bf03d5aef1fc852948ff8c21bd3 Mon Sep 17 00:00:00 2001 From: Katharina Przybill <30441792+kathap@users.noreply.github.com> Date: Tue, 7 Apr 2026 14:04:36 +0200 Subject: [PATCH 1/3] Fix broker stuck in SYNCHRONIZING on DB error during rollback When a service broker update job fails and attempts to revert the broker state, a database connection failure could cause the job to crash without properly handling the original error. This left the broker stuck in SYNCHRONIZING state with a FAILED job. This change wraps the state rollback operation in error handling to catch database errors and allow the original exception to be raised and the job to be retried properly. Changes: - app/jobs/v3/services/update_broker_job.rb: Add error handling around ServiceBroker.where().update() call in rescue block to gracefully handle database disconnections during state rollback - spec/unit/jobs/v3/services/update_broker_job_spec.rb: Add test case for database disconnect during state rollback --- app/jobs/v3/services/update_broker_job.rb | 11 +++++--- .../v3/services/update_broker_job_spec.rb | 27 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb index f8e2c219d88..59f092acc19 100644 --- a/app/jobs/v3/services/update_broker_job.rb +++ b/app/jobs/v3/services/update_broker_job.rb @@ -66,13 +66,18 @@ def perform @warnings rescue StandardError => e - ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) + begin + ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) if update_request + rescue StandardError + # Swallow errors during state rollback (e.g., DB connection issues) + # so the original exception can be raised and the job retried + end raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed) - raise e + raise ensure - update_request.destroy + update_request.destroy if update_request end private diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb index 0d3727f4afa..de0ab4df244 100644 --- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb +++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb @@ -446,6 +446,33 @@ module V3 end end + context 'when database disconnects during state rollback' do + let(:catalog_error) { StandardError.new('Catalog fetch failed') } + + before do + allow_any_instance_of(VCAP::CloudController::V3::ServiceBrokerCatalogUpdater).to receive(:refresh).and_raise(catalog_error) + + mock_dataset = instance_double(Sequel::Postgres::Dataset) + allow(mock_dataset).to receive(:update).and_raise(Sequel::DatabaseDisconnectError.new('connection lost')) + + allow(ServiceBroker).to receive(:where).and_call_original + allow(ServiceBroker).to receive(:where).with(id: broker.id).and_return(mock_dataset) + end + + it 'swallows the database error and re-raises the original catalog error' do + expect { job.perform }.to raise_error(catalog_error) + end + + it 'does not raise a database connection error' do + expect { job.perform }.not_to raise_error(Sequel::DatabaseDisconnectError) + end + + it 'still cleans up the update request' do + expect { job.perform }.to raise_error(catalog_error) + expect(ServiceBrokerUpdateRequest.where(id: update_broker_request.id).all).to be_empty + end + end + context 'when the broker ceases to exist during the job' do it 'raises a ServiceBrokerGone error' do broker.destroy From 1c425fce49707d8e57a73317ad7bfe1a28ec4af1 Mon Sep 17 00:00:00 2001 From: Katharina Przybill <30441792+kathap@users.noreply.github.com> Date: Fri, 10 Apr 2026 08:51:56 +0200 Subject: [PATCH 2/3] Add failure recovery hook to prevent stuck brokers When UpdateBrokerJob exhausts retries and transitions to FAILED, invoke recover_from_failure to revert broker from SYNCHRONIZING back to previous state. This ensures brokers don't remain stuck when jobs fail during extended database outages. Changes: - UpdateBrokerJob: Add recover_from_failure method with conditional WHERE clause to safely revert SYNCHRONIZING brokers - PollableJobWrapper: Call recover_from_failure hook in failure method - Extract rollback_broker_state and destroy_update_request helpers for cleaner error handling - Add tests for recovery hook behavior and edge cases --- app/jobs/pollable_job_wrapper.rb | 7 +++ app/jobs/v3/services/update_broker_job.rb | 32 +++++++--- spec/unit/jobs/pollable_job_wrapper_spec.rb | 49 +++++++++++++++ .../v3/services/update_broker_job_spec.rb | 59 +++++++++++++++++++ 4 files changed, 140 insertions(+), 7 deletions(-) diff --git a/app/jobs/pollable_job_wrapper.rb b/app/jobs/pollable_job_wrapper.rb index eb1fe78fdab..6c1b20c1de1 100644 --- a/app/jobs/pollable_job_wrapper.rb +++ b/app/jobs/pollable_job_wrapper.rb @@ -70,6 +70,13 @@ def error(job, exception) end def failure(job) + begin + unwrapped_job = Enqueuer.unwrap_job(@handler) + unwrapped_job.recover_from_failure if unwrapped_job.respond_to?(:recover_from_failure) + rescue StandardError => e + logger.error("failure recovery failed for #{unwrapped_job.class.name}: #{e.class}: #{e.message}") + end + change_state(job, PollableJobModel::FAILED_STATE) end diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb index 59f092acc19..6463f6eb912 100644 --- a/app/jobs/v3/services/update_broker_job.rb +++ b/app/jobs/v3/services/update_broker_job.rb @@ -37,6 +37,14 @@ def display_name 'service_broker.update' end + def recover_from_failure + ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: previous_broker_state) + rescue StandardError => e + logger = Steno.logger('cc.background') + logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}") + end + private attr_reader :update_request_guid, :broker_guid, :previous_broker_state, :user_audit_info @@ -66,22 +74,32 @@ def perform @warnings rescue StandardError => e - begin - ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state) if update_request - rescue StandardError - # Swallow errors during state rollback (e.g., DB connection issues) - # so the original exception can be raised and the job retried - end + rollback_broker_state raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed) raise ensure - update_request.destroy if update_request + destroy_update_request end private + def rollback_broker_state + return unless update_request + + ServiceBroker.where(id: update_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: previous_broker_state) + rescue StandardError + # Best effort only; wrapper failure hook will retry + end + + def destroy_update_request + update_request&.destroy + rescue StandardError + # Don't mask original failure + end + def update_params params = {} params[:name] = update_request.name unless update_request.name.nil? diff --git a/spec/unit/jobs/pollable_job_wrapper_spec.rb b/spec/unit/jobs/pollable_job_wrapper_spec.rb index 0db1f850068..f09ca359c30 100644 --- a/spec/unit/jobs/pollable_job_wrapper_spec.rb +++ b/spec/unit/jobs/pollable_job_wrapper_spec.rb @@ -201,6 +201,55 @@ class BigException < StandardError execute_all_jobs(expected_successes: 0, expected_failures: 1) end end + + context 'when the job implements recover_from_failure' do + let(:broker) do + VCAP::CloudController::ServiceBroker.create( + name: 'recovery-test-broker', + broker_url: 'http://example.org/broker-url', + auth_username: 'username', + auth_password: 'password', + state: VCAP::CloudController::ServiceBrokerStateEnum::SYNCHRONIZING + ) + end + + let(:update_request) do + VCAP::CloudController::ServiceBrokerUpdateRequest.create( + name: 'new-name', + service_broker_id: broker.id + ) + end + + let(:user_audit_info) { instance_double(VCAP::CloudController::UserAuditInfo, { user_guid: Sham.guid }) } + let(:update_job) { VCAP::CloudController::V3::UpdateBrokerJob.new(update_request.guid, broker.guid, 'AVAILABLE', user_audit_info:) } + let(:pollable_job) { PollableJobWrapper.new(update_job) } + + before do + allow_any_instance_of(VCAP::CloudController::V3::UpdateBrokerJob::Perform).to receive(:perform).and_raise(StandardError.new('job failed')) + end + + it 'calls recover_from_failure and reverts broker state to AVAILABLE' do + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job) + VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING') + + execute_all_jobs(expected_successes: 0, expected_failures: 1) + + broker.reload + expect(broker.state).to eq('AVAILABLE') + end + end + + context 'when the job does not implement recover_from_failure' do + it 'still marks the job as FAILED without error' do + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job) + job_model = VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING') + + execute_all_jobs(expected_successes: 0, expected_failures: 1) + + job_model.reload + expect(job_model.state).to eq('FAILED') + end + end end end diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb index de0ab4df244..34e4a489163 100644 --- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb +++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb @@ -484,6 +484,65 @@ module V3 end end + describe '#recover_from_failure' do + let(:previous_state) { ServiceBrokerStateEnum::AVAILABLE } + + subject(:job) do + UpdateBrokerJob.new(update_broker_request.guid, broker.guid, previous_state, user_audit_info:) + end + + context 'when broker is in SYNCHRONIZING state' do + before do + broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) + end + + it 'reverts the broker to the previous state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + end + end + + context 'when broker is not in SYNCHRONIZING state' do + before do + broker.update(state: ServiceBrokerStateEnum::AVAILABLE) + end + + it 'does not change the broker state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + end + end + + context 'when broker state has changed to something else' do + before do + broker.update(state: ServiceBrokerStateEnum::DELETE_IN_PROGRESS) + end + + it 'does not overwrite the newer state' do + job.recover_from_failure + + broker.reload + expect(broker.state).to eq(ServiceBrokerStateEnum::DELETE_IN_PROGRESS) + end + end + + context 'when database error occurs during recovery' do + before do + broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) + allow(ServiceBroker).to receive(:where).and_raise(Sequel::DatabaseError.new('connection lost')) + allow(Steno).to receive(:logger).and_return(double(error: nil)) + end + + it 'logs the error and does not raise' do + expect { job.recover_from_failure }.not_to raise_error + end + end + end + def setup_broker_with_invalid_catalog catalog = instance_double(Services::ServiceBrokers::V2::Catalog) From c9dc2eeded82bb97f04e151507289ee7fc466b73 Mon Sep 17 00:00:00 2001 From: Katharina Przybill <30441792+kathap@users.noreply.github.com> Date: Tue, 14 Apr 2026 12:11:55 +0200 Subject: [PATCH 3/3] Add recover_from_failure hook to SynchronizeBrokerCatalogJob When a create-service-broker job fails during a database outage, the broker can get stuck in SYNCHRONIZING state. The failure occurs after the state was set but before the rescue block can revert it, leaving the broker permanently unusable. This adds a recover_from_failure hook that runs when the delayed_job transitions to FAILED state (after all retries are exhausted). At this point the database is likely back up, allowing the broker state to be set to SYNCHRONIZATION_FAILED. The WHERE clause ensures we only update brokers still in SYNCHRONIZING state, preventing overwrites of newer state changes. This mirrors the fix applied to UpdateBrokerJob and ensures both create and update operations handle database failures gracefully. --- app/jobs/pollable_job_wrapper.rb | 4 ++-- .../synchronize_broker_catalog_job.rb | 8 +++++++ app/jobs/v3/services/update_broker_job.rb | 2 +- app/jobs/wrapping_job.rb | 4 ++++ spec/unit/jobs/pollable_job_wrapper_spec.rb | 2 +- .../v3/services/update_broker_job_spec.rb | 4 ++-- spec/unit/jobs/wrapping_job_spec.rb | 22 +++++++++++++++++++ 7 files changed, 40 insertions(+), 6 deletions(-) diff --git a/app/jobs/pollable_job_wrapper.rb b/app/jobs/pollable_job_wrapper.rb index 6c1b20c1de1..4572198eca1 100644 --- a/app/jobs/pollable_job_wrapper.rb +++ b/app/jobs/pollable_job_wrapper.rb @@ -72,9 +72,9 @@ def error(job, exception) def failure(job) begin unwrapped_job = Enqueuer.unwrap_job(@handler) - unwrapped_job.recover_from_failure if unwrapped_job.respond_to?(:recover_from_failure) + unwrapped_job.recover_from_failure rescue StandardError => e - logger.error("failure recovery failed for #{unwrapped_job.class.name}: #{e.class}: #{e.message}") + logger.error("failure recovery failed: #{e.class}: #{e.message}") end change_state(job, PollableJobModel::FAILED_STATE) diff --git a/app/jobs/v3/services/synchronize_broker_catalog_job.rb b/app/jobs/v3/services/synchronize_broker_catalog_job.rb index 0660993da29..785105c281a 100644 --- a/app/jobs/v3/services/synchronize_broker_catalog_job.rb +++ b/app/jobs/v3/services/synchronize_broker_catalog_job.rb @@ -34,6 +34,14 @@ def display_name 'service_broker.catalog.synchronize' end + def recover_from_failure + ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING). + update(state: ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED) + rescue StandardError => e + logger = Steno.logger('cc.background') + logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}") + end + private attr_reader :broker_guid, :user_audit_info diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb index 6463f6eb912..fd090741742 100644 --- a/app/jobs/v3/services/update_broker_job.rb +++ b/app/jobs/v3/services/update_broker_job.rb @@ -39,7 +39,7 @@ def display_name def recover_from_failure ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING). - update(state: previous_broker_state) + update(state: ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED) rescue StandardError => e logger = Steno.logger('cc.background') logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}") diff --git a/app/jobs/wrapping_job.rb b/app/jobs/wrapping_job.rb index 4bb85144147..94e001fa8f0 100644 --- a/app/jobs/wrapping_job.rb +++ b/app/jobs/wrapping_job.rb @@ -51,6 +51,10 @@ def error(job, e) handler.error(job, e) if handler.respond_to?(:error) end + def recover_from_failure + handler.recover_from_failure if handler.respond_to?(:recover_from_failure) + end + def display_name handler.respond_to?(:display_name) ? handler.display_name : handler.class.name end diff --git a/spec/unit/jobs/pollable_job_wrapper_spec.rb b/spec/unit/jobs/pollable_job_wrapper_spec.rb index f09ca359c30..f26ec3a478b 100644 --- a/spec/unit/jobs/pollable_job_wrapper_spec.rb +++ b/spec/unit/jobs/pollable_job_wrapper_spec.rb @@ -235,7 +235,7 @@ class BigException < StandardError execute_all_jobs(expected_successes: 0, expected_failures: 1) broker.reload - expect(broker.state).to eq('AVAILABLE') + expect(broker.state).to eq('SYNCHRONIZATION_FAILED') end end diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb index 34e4a489163..f3d5dc1b30d 100644 --- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb +++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb @@ -496,11 +496,11 @@ module V3 broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING) end - it 'reverts the broker to the previous state' do + it 'sets the broker to SYNCHRONIZATION_FAILED' do job.recover_from_failure broker.reload - expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE) + expect(broker.state).to eq(ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED) end end diff --git a/spec/unit/jobs/wrapping_job_spec.rb b/spec/unit/jobs/wrapping_job_spec.rb index f6a5801bd72..a6a2ae1e04a 100644 --- a/spec/unit/jobs/wrapping_job_spec.rb +++ b/spec/unit/jobs/wrapping_job_spec.rb @@ -198,6 +198,28 @@ module Jobs end end end + + describe '#recover_from_failure' do + context 'when the wrapped job has the recover_from_failure method defined' do + it 'delegates to the handler' do + handler = double('Job', recover_from_failure: nil) + job = WrappingJob.new(handler) + + expect(handler).to receive(:recover_from_failure) + job.recover_from_failure + end + end + + context 'when the wrapped job does not have the recover_from_failure method defined' do + it 'does not raise an exception' do + handler = Object.new + job = WrappingJob.new(handler) + expect do + job.recover_from_failure + end.not_to raise_error + end + end + end end end end