diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py index 5cda96a4d..e4e44883e 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import logging import re from dataclasses import InitVar, dataclass from typing import Any, Mapping, Optional, Union @@ -19,6 +20,8 @@ from airbyte_cdk.sources.types import Config from airbyte_cdk.utils import AirbyteTracedException +logger = logging.getLogger("airbyte") + @dataclass class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy): @@ -57,6 +60,15 @@ def backoff_time( header_value = None if isinstance(response_or_exception, requests.Response): header_value = get_numeric_value_from_header(response_or_exception, header, regex) + if header_value is not None: + logger.info( + "Rate limit header '%s' detected with value: %s (status code: %s)", + header, + header_value, + response_or_exception.status_code + if hasattr(response_or_exception, "status_code") + else "N/A", + ) if ( self.max_waiting_time_in_seconds and header_value diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index e7a5715ac..aed16b9fd 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -440,6 +440,18 @@ def _handle_error_resolution( # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached if error_resolution.response_action == ResponseAction.RATE_LIMITED: + self._logger.info( + "Rate limited: emitting RATE_LIMITED stream status for stream '%s' (status code: %s, url: %s, error: %s)", + self._name, + response.status_code if response is not None else "N/A", + request.url, + error_resolution.error_message + or ( + self._error_message_parser.parse_response_error_message(response) + if response is not None + else str(exc) + ), + ) # TODO: Update to handle with message repository when concurrent message repository is ready reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)] message = orjson.dumps( @@ -524,6 +536,21 @@ def _handle_error_resolution( if backoff_time: user_defined_backoff_time = backoff_time break + if user_defined_backoff_time is not None: + self._logger.info( + "Retrying with backoff: waiting %.2f seconds (attempt %d, status code: %s, action: %s, url: %s, error: %s)", + user_defined_backoff_time, + self._request_attempt_count[request], + response.status_code if response is not None else "N/A", + error_resolution.response_action.value, + request.url, + error_resolution.error_message + or ( + self._error_message_parser.parse_response_error_message(response) + if response is not None + else str(exc) + ), + ) error_message = ( error_resolution.error_message or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."