Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import re
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
Expand All @@ -19,6 +20,8 @@
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")


@dataclass
class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy):
Expand Down Expand Up @@ -57,6 +60,15 @@ def backoff_time(
header_value = None
if isinstance(response_or_exception, requests.Response):
header_value = get_numeric_value_from_header(response_or_exception, header, regex)
if header_value is not None:
logger.info(
"Rate limit header '%s' detected with value: %s (status code: %s)",
header,
header_value,
response_or_exception.status_code
if hasattr(response_or_exception, "status_code")
else "N/A",
)
if (
self.max_waiting_time_in_seconds
and header_value
Expand Down
27 changes: 27 additions & 0 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,18 @@ def _handle_error_resolution(

# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
self._logger.info(
"Rate limited: emitting RATE_LIMITED stream status for stream '%s' (status code: %s, url: %s, error: %s)",
self._name,
response.status_code if response is not None else "N/A",
request.url,
error_resolution.error_message
or (
self._error_message_parser.parse_response_error_message(response)
if response is not None
else str(exc)
),
)
# TODO: Update to handle with message repository when concurrent message repository is ready
reasons = [AirbyteStreamStatusReason(type=AirbyteStreamStatusReasonType.RATE_LIMITED)]
message = orjson.dumps(
Expand Down Expand Up @@ -524,6 +536,21 @@ def _handle_error_resolution(
if backoff_time:
user_defined_backoff_time = backoff_time
break
if user_defined_backoff_time is not None:
self._logger.info(
"Retrying with backoff: waiting %.2f seconds (attempt %d, status code: %s, action: %s, url: %s, error: %s)",
user_defined_backoff_time,
self._request_attempt_count[request],
response.status_code if response is not None else "N/A",
error_resolution.response_action.value,
request.url,
error_resolution.error_message
or (
self._error_message_parser.parse_response_error_message(response)
if response is not None
else str(exc)
),
)
error_message = (
error_resolution.error_message
or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
Expand Down
Loading