Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 5 additions & 55 deletions pymongo/asynchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
)

from pymongo import _csot
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import (
OperationFailure,
)
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
from pymongo.lock import _async_create_lock

_IS_SYNC = False

Expand Down Expand Up @@ -76,11 +76,8 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
return cast(F, inner)


_MAX_RETRIES = 5
_BACKOFF_INITIAL = 0.1
_BACKOFF_MAX = 10
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
DEFAULT_RETRY_TOKEN_RETURN = 0.1


def _backoff(
Expand All @@ -90,79 +87,32 @@ def _backoff(
return jitter * min(initial_delay * (2**attempt), max_delay)


class _TokenBucket:
"""A token bucket implementation for rate limiting."""

def __init__(
self,
capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
):
self.lock = _async_create_lock()
self.capacity = capacity
self.tokens = capacity
self.return_rate = return_rate

async def consume(self) -> bool:
"""Consume a token from the bucket if available."""
async with self.lock:
if self.tokens >= 1:
self.tokens -= 1
return True
return False

async def deposit(self, retry: bool = False) -> None:
"""Deposit a token back into the bucket."""
retry_token = 1 if retry else 0
async with self.lock:
self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate)


class _RetryPolicy:
"""A retry limiter that performs exponential backoff with jitter.

When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
a prolonged outage or high load.
"""
"""A retry limiter that performs exponential backoff with jitter."""

def __init__(
self,
token_bucket: _TokenBucket,
attempts: int = _MAX_RETRIES,
attempts: int = MAX_ADAPTIVE_RETRIES,
backoff_initial: float = _BACKOFF_INITIAL,
backoff_max: float = _BACKOFF_MAX,
adaptive_retry: bool = False,
):
self.token_bucket = token_bucket
self.attempts = attempts
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max
self.adaptive_retry = adaptive_retry

async def record_success(self, retry: bool) -> None:
"""Record a successful operation."""
if self.adaptive_retry:
await self.token_bucket.deposit(retry)

def backoff(self, attempt: int) -> float:
"""Return the backoff duration for the given ."""
"""Return the backoff duration for the given attempt."""
return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max)

async def should_retry(self, attempt: int, delay: float) -> bool:
"""Return if we have budget to retry and how long to backoff."""
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
if attempt > self.attempts:
return False

# If the delay would exceed the deadline, bail early before consuming a token.
if _csot.get_timeout():
if time.monotonic() + delay > _csot.get_deadline():
return False

# Check token bucket last since we only want to consume a token if we actually retry.
if self.adaptive_retry and not await self.token_bucket.consume():
# DRIVERS-3246 Improve diagnostics when this case happens.
# We could add info to the exception and log.
return False
return True


Expand Down
23 changes: 9 additions & 14 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.helpers import (
_RetryPolicy,
_TokenBucket,
)
from pymongo.asynchronous.settings import TopologySettings
from pymongo.asynchronous.topology import Topology, _ErrorContext
Expand Down Expand Up @@ -615,17 +614,17 @@ def __init__(
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
details.

| **Adaptive retry options:**
| (If not enabled explicitly, adaptive retries will not be enabled.)
| **Overload retry options:**

- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
- `max_adaptive_retries`: (int) How many retries to allow for overload errors. Defaults to ``2``.
- `enable_overload_retargeting`: (boolean) Whether overload retargeting is enabled for this client.
If enabled, server overload errors will cause retry attempts to select a server that has not yet returned an overload error, if possible.
Defaults to ``False``.

.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.

.. versionchanged:: 4.17
Added the ``adaptive_retries`` URI and keyword argument.
Added the ``max_adaptive_retries`` and ``enable_overload_retargeting`` URI and keyword arguments.
Comment on lines +617 to +627
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description template is still unfilled (no summary of behavior changes, migration notes, or test plan). Given this PR changes retry semantics and public connection options, please update the PR description with the intended defaults, compatibility story (adaptiveRetries vs maxAdaptiveRetries), and how the change was validated.

Copilot uses AI. Check for mistakes.

.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.
Expand Down Expand Up @@ -894,9 +893,7 @@ def __init__(
self._options.read_concern,
)

self._retry_policy = _RetryPolicy(
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
)
self._retry_policy = _RetryPolicy(attempts=self._options.max_adaptive_retries)

self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name)

Expand Down Expand Up @@ -2822,7 +2819,6 @@ async def run(self) -> T:
self._check_last_error(check_csot=True)
try:
res = await self._read() if self._is_read else await self._write()
await self._retry_policy.record_success(self._attempt_number > 0)
# Track whether the transaction has completed a command.
# If we need to apply backpressure to the first command,
# we will need to revert back to starting state.
Expand Down Expand Up @@ -2930,10 +2926,9 @@ async def run(self) -> T:
transaction.set_starting()
transaction.attempt = 0

if (
self._server is not None
and self._client.topology_description.topology_type_name == "Sharded"
or exc.has_error_label("SystemOverloadedError")
if self._server is not None and (
self._client.topology_description.topology_type_name == "Sharded"
or (overloaded and self._client.options.enable_overload_retargeting)
):
self._deprioritized_servers.append(self._server)

Expand Down
29 changes: 21 additions & 8 deletions pymongo/client_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,15 @@ def __init__(
self.__server_monitoring_mode = options.get(
"servermonitoringmode", common.SERVER_MONITORING_MODE
)
self.__adaptive_retries = (
options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
if "adaptive_retries" in options
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
self.__max_adaptive_retries = (
options.get("max_adaptive_retries", common.MAX_ADAPTIVE_RETRIES)
if "max_adaptive_retries" in options
else options.get("maxadaptiveretries", common.MAX_ADAPTIVE_RETRIES)
)
self.__enable_overload_retargeting = (
options.get("enable_overload_retargeting", common.ENABLE_OVERLOAD_RETARGETING)
if "enable_overload_retargeting" in options
else options.get("enableoverloadretargeting", common.ENABLE_OVERLOAD_RETARGETING)
)

@property
Expand Down Expand Up @@ -353,9 +358,17 @@ def server_monitoring_mode(self) -> str:
return self.__server_monitoring_mode

@property
def adaptive_retries(self) -> bool:
"""The configured adaptiveRetries option.
def max_adaptive_retries(self) -> int:
"""The configured maxAdaptiveRetries option.

.. versionadded:: 4.17
"""
return self.__max_adaptive_retries

@property
def enable_overload_retargeting(self) -> bool:
"""The configured enableOverloadRetargeting option.

.. versionadded:: 4.XX
.. versionadded:: 4.17
"""
return self.__adaptive_retries
return self.__enable_overload_retargeting
13 changes: 9 additions & 4 deletions pymongo/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@
# Default value for serverMonitoringMode
SERVER_MONITORING_MODE = "auto" # poll/stream/auto

# Default value for adaptiveRetries
ADAPTIVE_RETRIES = False
# Default value for max adaptive retries
MAX_ADAPTIVE_RETRIES = 2

# Default value for enableOverloadRetargeting
ENABLE_OVERLOAD_RETARGETING = False

# Auth mechanism properties that must raise an error instead of warning if they invalidate.
_MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"]
Expand Down Expand Up @@ -741,7 +744,8 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
"srvmaxhosts": validate_non_negative_integer,
"timeoutms": validate_timeoutms,
"servermonitoringmode": validate_server_monitoring_mode,
"adaptiveretries": validate_boolean_or_string,
"maxadaptiveretries": validate_non_negative_integer,
"enableoverloadretargeting": validate_boolean_or_string,
}

# Dictionary where keys are the names of URI options specific to pymongo,
Expand Down Expand Up @@ -775,7 +779,8 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
"server_selector": validate_is_callable_or_none,
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
"authoidcallowedhosts": validate_list,
"adaptive_retries": validate_boolean_or_string,
"max_adaptive_retries": validate_non_negative_integer,
"enable_overload_retargeting": validate_boolean_or_string,
}

# Dictionary where keys are any URI option name, and values are the
Expand Down
60 changes: 5 additions & 55 deletions pymongo/synchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
)

from pymongo import _csot
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import (
OperationFailure,
)
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
from pymongo.lock import _create_lock

_IS_SYNC = True

Expand Down Expand Up @@ -76,11 +76,8 @@ def inner(*args: Any, **kwargs: Any) -> Any:
return cast(F, inner)


_MAX_RETRIES = 5
_BACKOFF_INITIAL = 0.1
_BACKOFF_MAX = 10
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
DEFAULT_RETRY_TOKEN_RETURN = 0.1


def _backoff(
Expand All @@ -90,79 +87,32 @@ def _backoff(
return jitter * min(initial_delay * (2**attempt), max_delay)


class _TokenBucket:
"""A token bucket implementation for rate limiting."""

def __init__(
self,
capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
):
self.lock = _create_lock()
self.capacity = capacity
self.tokens = capacity
self.return_rate = return_rate

def consume(self) -> bool:
"""Consume a token from the bucket if available."""
with self.lock:
if self.tokens >= 1:
self.tokens -= 1
return True
return False

def deposit(self, retry: bool = False) -> None:
"""Deposit a token back into the bucket."""
retry_token = 1 if retry else 0
with self.lock:
self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate)


class _RetryPolicy:
"""A retry limiter that performs exponential backoff with jitter.

When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
a prolonged outage or high load.
"""
"""A retry limiter that performs exponential backoff with jitter."""

def __init__(
self,
token_bucket: _TokenBucket,
attempts: int = _MAX_RETRIES,
attempts: int = MAX_ADAPTIVE_RETRIES,
backoff_initial: float = _BACKOFF_INITIAL,
backoff_max: float = _BACKOFF_MAX,
adaptive_retry: bool = False,
):
self.token_bucket = token_bucket
self.attempts = attempts
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max
self.adaptive_retry = adaptive_retry

def record_success(self, retry: bool) -> None:
"""Record a successful operation."""
if self.adaptive_retry:
self.token_bucket.deposit(retry)

def backoff(self, attempt: int) -> float:
"""Return the backoff duration for the given ."""
"""Return the backoff duration for the given attempt."""
return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max)

def should_retry(self, attempt: int, delay: float) -> bool:
"""Return if we have budget to retry and how long to backoff."""
"""Return if we have retry attempts remaining and the next backoff would not exceed a timeout."""
if attempt > self.attempts:
return False

# If the delay would exceed the deadline, bail early before consuming a token.
if _csot.get_timeout():
if time.monotonic() + delay > _csot.get_deadline():
return False

# Check token bucket last since we only want to consume a token if we actually retry.
if self.adaptive_retry and not self.token_bucket.consume():
# DRIVERS-3246 Improve diagnostics when this case happens.
# We could add info to the exception and log.
return False
return True


Expand Down
Loading
Loading