diff --git a/app/jobs/pollable_job_wrapper.rb b/app/jobs/pollable_job_wrapper.rb
index eb1fe78fdab..4572198eca1 100644
--- a/app/jobs/pollable_job_wrapper.rb
+++ b/app/jobs/pollable_job_wrapper.rb
@@ -70,6 +70,13 @@ def error(job, exception)
       end
 
       def failure(job)
+        begin
+          unwrapped_job = Enqueuer.unwrap_job(@handler)
+          unwrapped_job.recover_from_failure if unwrapped_job.respond_to?(:recover_from_failure)
+        rescue StandardError => e
+          logger.error("failure recovery failed: #{e.class}: #{e.message}")
+        end
+
         change_state(job, PollableJobModel::FAILED_STATE)
       end
 
diff --git a/app/jobs/v3/services/synchronize_broker_catalog_job.rb b/app/jobs/v3/services/synchronize_broker_catalog_job.rb
index 0660993da29..785105c281a 100644
--- a/app/jobs/v3/services/synchronize_broker_catalog_job.rb
+++ b/app/jobs/v3/services/synchronize_broker_catalog_job.rb
@@ -34,6 +34,14 @@ def display_name
         'service_broker.catalog.synchronize'
       end
 
+      def recover_from_failure
+        ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING).
+          update(state: ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
+      rescue StandardError => e
+        logger = Steno.logger('cc.background')
+        logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}")
+      end
+
       private
 
       attr_reader :broker_guid, :user_audit_info
diff --git a/app/jobs/v3/services/update_broker_job.rb b/app/jobs/v3/services/update_broker_job.rb
index f8e2c219d88..fd090741742 100644
--- a/app/jobs/v3/services/update_broker_job.rb
+++ b/app/jobs/v3/services/update_broker_job.rb
@@ -37,6 +37,14 @@ def display_name
         'service_broker.update'
       end
 
+      def recover_from_failure
+        ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING).
+          update(state: ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
+      rescue StandardError => e
+        logger = Steno.logger('cc.background')
+        logger.error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}")
+      end
+
       private
 
       attr_reader :update_request_guid, :broker_guid, :previous_broker_state, :user_audit_info
@@ -66,17 +74,32 @@ def perform
 
           @warnings
         rescue StandardError => e
-          ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state)
+          rollback_broker_state
 
           raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed)
 
-          raise e
+          raise
         ensure
-          update_request.destroy
+          destroy_update_request
         end
 
         private
 
+        def rollback_broker_state
+          return unless update_request
+
+          ServiceBroker.where(id: update_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING).
+            update(state: previous_broker_state)
+        rescue StandardError
+          # Best effort only; the wrapper's failure hook (recover_from_failure) marks a broker still SYNCHRONIZING as SYNCHRONIZATION_FAILED
+        end
+
+        def destroy_update_request
+          update_request&.destroy
+        rescue StandardError
+          # Don't mask original failure
+        end
+
         def update_params
           params = {}
           params[:name] = update_request.name unless update_request.name.nil?
diff --git a/app/jobs/wrapping_job.rb b/app/jobs/wrapping_job.rb
index 4bb85144147..94e001fa8f0 100644
--- a/app/jobs/wrapping_job.rb
+++ b/app/jobs/wrapping_job.rb
@@ -51,6 +51,10 @@ def error(job, e)
         handler.error(job, e) if handler.respond_to?(:error)
       end
 
+      def recover_from_failure
+        handler.recover_from_failure if handler.respond_to?(:recover_from_failure)
+      end
+
       def display_name
         handler.respond_to?(:display_name) ? handler.display_name : handler.class.name
       end
diff --git a/spec/unit/jobs/pollable_job_wrapper_spec.rb b/spec/unit/jobs/pollable_job_wrapper_spec.rb
index 0db1f850068..f26ec3a478b 100644
--- a/spec/unit/jobs/pollable_job_wrapper_spec.rb
+++ b/spec/unit/jobs/pollable_job_wrapper_spec.rb
@@ -201,6 +201,55 @@ class BigException < StandardError
             execute_all_jobs(expected_successes: 0, expected_failures: 1)
           end
         end
+
+        context 'when the job implements recover_from_failure' do
+          let(:broker) do
+            VCAP::CloudController::ServiceBroker.create(
+              name: 'recovery-test-broker',
+              broker_url: 'http://example.org/broker-url',
+              auth_username: 'username',
+              auth_password: 'password',
+              state: VCAP::CloudController::ServiceBrokerStateEnum::SYNCHRONIZING
+            )
+          end
+
+          let(:update_request) do
+            VCAP::CloudController::ServiceBrokerUpdateRequest.create(
+              name: 'new-name',
+              service_broker_id: broker.id
+            )
+          end
+
+          let(:user_audit_info) { instance_double(VCAP::CloudController::UserAuditInfo, { user_guid: Sham.guid }) }
+          let(:update_job) { VCAP::CloudController::V3::UpdateBrokerJob.new(update_request.guid, broker.guid, 'AVAILABLE', user_audit_info:) }
+          let(:pollable_job) { PollableJobWrapper.new(update_job) }
+
+          before do
+            allow_any_instance_of(VCAP::CloudController::V3::UpdateBrokerJob::Perform).to receive(:perform).and_raise(StandardError.new('job failed'))
+          end
+
+          it 'calls recover_from_failure and sets broker state to SYNCHRONIZATION_FAILED' do
+            enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job)
+            VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING')
+
+            execute_all_jobs(expected_successes: 0, expected_failures: 1)
+
+            broker.reload
+            expect(broker.state).to eq('SYNCHRONIZATION_FAILED')
+          end
+        end
+
+        context 'when the job does not implement recover_from_failure' do
+          it 'still marks the job as FAILED without error' do
+            enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new.enqueue(pollable_job)
+            job_model = VCAP::CloudController::PollableJobModel.make(delayed_job_guid: enqueued_job.guid, state: 'PROCESSING')
+
+            execute_all_jobs(expected_successes: 0, expected_failures: 1)
+
+            job_model.reload
+            expect(job_model.state).to eq('FAILED')
+          end
+        end
       end
     end
 
diff --git a/spec/unit/jobs/v3/services/update_broker_job_spec.rb b/spec/unit/jobs/v3/services/update_broker_job_spec.rb
index 0d3727f4afa..f3d5dc1b30d 100644
--- a/spec/unit/jobs/v3/services/update_broker_job_spec.rb
+++ b/spec/unit/jobs/v3/services/update_broker_job_spec.rb
@@ -446,6 +446,33 @@ module V3
           end
         end
 
+        context 'when database disconnects during state rollback' do
+          let(:catalog_error) { StandardError.new('Catalog fetch failed') }
+
+          before do
+            allow_any_instance_of(VCAP::CloudController::V3::ServiceBrokerCatalogUpdater).to receive(:refresh).and_raise(catalog_error)
+
+            mock_dataset = instance_double(Sequel::Dataset)
+            allow(mock_dataset).to receive(:update).and_raise(Sequel::DatabaseDisconnectError.new('connection lost'))
+
+            allow(ServiceBroker).to receive(:where).and_call_original
+            allow(ServiceBroker).to receive(:where).with(id: broker.id, state: ServiceBrokerStateEnum::SYNCHRONIZING).and_return(mock_dataset)
+          end
+
+          it 'swallows the database error and re-raises the original catalog error' do
+            expect { job.perform }.to raise_error(catalog_error)
+          end
+
+          it 'does not raise a database connection error' do
+            expect { job.perform }.to raise_error(StandardError) { |error| expect(error).not_to be_a(Sequel::DatabaseDisconnectError) }
+          end
+
+          it 'still cleans up the update request' do
+            expect { job.perform }.to raise_error(catalog_error)
+            expect(ServiceBrokerUpdateRequest.where(id: update_broker_request.id).all).to be_empty
+          end
+        end
+
         context 'when the broker ceases to exist during the job' do
           it 'raises a ServiceBrokerGone error' do
             broker.destroy
@@ -457,6 +484,65 @@ module V3
         end
       end
 
+      describe '#recover_from_failure' do
+        let(:previous_state) { ServiceBrokerStateEnum::AVAILABLE }
+
+        subject(:job) do
+          UpdateBrokerJob.new(update_broker_request.guid, broker.guid, previous_state, user_audit_info:)
+        end
+
+        context 'when broker is in SYNCHRONIZING state' do
+          before do
+            broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
+          end
+
+          it 'sets the broker to SYNCHRONIZATION_FAILED' do
+            job.recover_from_failure
+
+            broker.reload
+            expect(broker.state).to eq(ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
+          end
+        end
+
+        context 'when broker is not in SYNCHRONIZING state' do
+          before do
+            broker.update(state: ServiceBrokerStateEnum::AVAILABLE)
+          end
+
+          it 'does not change the broker state' do
+            job.recover_from_failure
+
+            broker.reload
+            expect(broker.state).to eq(ServiceBrokerStateEnum::AVAILABLE)
+          end
+        end
+
+        context 'when broker state has changed to something else' do
+          before do
+            broker.update(state: ServiceBrokerStateEnum::DELETE_IN_PROGRESS)
+          end
+
+          it 'does not overwrite the newer state' do
+            job.recover_from_failure
+
+            broker.reload
+            expect(broker.state).to eq(ServiceBrokerStateEnum::DELETE_IN_PROGRESS)
+          end
+        end
+
+        context 'when database error occurs during recovery' do
+          before do
+            broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
+            allow(ServiceBroker).to receive(:where).and_raise(Sequel::DatabaseError.new('connection lost'))
+            allow(Steno).to receive(:logger).and_return(double(error: nil))
+          end
+
+          it 'logs the error and does not raise' do
+            expect { job.recover_from_failure }.not_to raise_error
+          end
+        end
+      end
+
       def setup_broker_with_invalid_catalog
         catalog = instance_double(Services::ServiceBrokers::V2::Catalog)
 
diff --git a/spec/unit/jobs/wrapping_job_spec.rb b/spec/unit/jobs/wrapping_job_spec.rb
index f6a5801bd72..a6a2ae1e04a 100644
--- a/spec/unit/jobs/wrapping_job_spec.rb
+++ b/spec/unit/jobs/wrapping_job_spec.rb
@@ -198,6 +198,28 @@ module Jobs
           end
         end
       end
+
+      describe '#recover_from_failure' do
+        context 'when the wrapped job has the recover_from_failure method defined' do
+          it 'delegates to the handler' do
+            handler = double('Job', recover_from_failure: nil)
+            job = WrappingJob.new(handler)
+
+            expect(handler).to receive(:recover_from_failure)
+            job.recover_from_failure
+          end
+        end
+
+        context 'when the wrapped job does not have the recover_from_failure method defined' do
+          it 'does not raise an exception' do
+            handler = Object.new
+            job = WrappingJob.new(handler)
+            expect do
+              job.recover_from_failure
+            end.not_to raise_error
+          end
+        end
+      end
     end
   end
 end