23 commits
9fa67f6
[Cosmos] Honor max_item_count for query_items(feed_range=...)
tvaron3 Apr 22, 2026
2b36cc9
Suppress continuation token when feed_range page is truncated
tvaron3 Apr 22, 2026
3492e4d
[Cosmos] Fix /pkranges drain loop for containers with >8K PK ranges
tvaron3 May 30, 2026
6dbf1ab
Merge remote-tracking branch 'upstream/main' into users/tomasvaron/fi…
tvaron3 May 30, 2026
3a616b3
Port pkranges drain fix to sync path, add safety-bound 503, add pagin…
tvaron3 May 30, 2026
256bcd1
Cosmos: align A-IM header with peer SDKs + pagination integration tests
tvaron3 May 30, 2026
f70ed10
Cosmos: bump to 4.16.1 with pkranges hotfix; loosen timeout test bound
tvaron3 May 30, 2026
ff76a5b
Merge remote-tracking branch 'upstream/main' into users/tomasvaron/fi…
tvaron3 May 30, 2026
8930381
Cosmos: shorten 4.16.1 pkranges changelog entry
tvaron3 May 30, 2026
df1b062
Match peer SDKs: terminate /pkranges drain on literal HTTP 304
tvaron3 May 30, 2026
b555d5d
Restore defensive drain fallbacks for status-blind callers; add gap-c…
tvaron3 May 31, 2026
a54764d
Log warning when drain falls back to status-blind termination
tvaron3 May 31, 2026
a1e27a5
Remove status-blind drain fallback; tighten status_code contract
tvaron3 May 31, 2026
b541152
test(cosmos): relax pkranges drain integration assertion and add AAD …
tvaron3 May 31, 2026
b46fbec
fix(cosmos): address review feedback and wire status sidecar in split…
tvaron3 May 31, 2026
b8153ca
test(cosmos): remove gateway-incompatible drain integration tests; co…
tvaron3 May 31, 2026
f3a4b00
fix(cosmos): widen evaluate_drain_page status_code to Optional[int] t…
tvaron3 May 31, 2026
dae43f8
test(cosmos): make routing-map drain-loop unit-test mocks compatible …
tvaron3 May 31, 2026
c5b78b3
refactor(cosmos): address PR review feedback from @simorenoh
tvaron3 May 31, 2026
d428848
test: pin stale-etag cleanup contract instead of brittle call-count
tvaron3 May 31, 2026
1fc7493
chore: set 4.16.1 release date to 2026-05-31
tvaron3 May 31, 2026
a1028cc
test: rename hdrs -> request_headers to satisfy cspell
tvaron3 May 31, 2026
4220f58
Move AVG breaking change in changelog; drop brittle post-fallback dra…
tvaron3 Jun 1, 2026
7 changes: 6 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -1,5 +1,10 @@
## Release History

### 4.16.1 (2026-05-31)

#### Bugs Fixed
* Fixed a bug in the sync and async `/pkranges` change-feed refresh where some containers could fail to build a complete routing map. See [PR 47245](https://github.com/Azure/azure-sdk-for-python/pull/47245).

### 4.16.0 (2026-05-29)

#### Features Added
@@ -8,14 +13,14 @@

#### Breaking Changes
* `CosmosItemPaged.get_response_headers()` and `CosmosAsyncItemPaged.get_response_headers()` now return a single `CaseInsensitiveDict` (the latest page) instead of `List[CaseInsensitiveDict]` (introduced in 4.16.0b1); `get_last_response_headers()` has been removed. This avoids unbounded memory growth on large queries. **Migration:** code that previously accessed `headers[i]['x-ms-request-charge']` should switch to `headers['x-ms-request-charge']` for the latest page, or pass `response_hook=` to the query method to receive per-page headers as they arrive. See [PR 47172](https://github.com/Azure/azure-sdk-for-python/pull/47172).
* `SELECT VALUE AVG(...)` queries spanning multiple physical partitions now raise `ValueError` instead of returning a mathematically incorrect merged value from client-side aggregation. **Migration:** rewrite cross-partition `AVG` queries as `SUM(...) / COUNT(...)` (both of which merge correctly across partitions), or scope the query to a single partition via `partition_key=`. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105).
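
To illustrate why a client-side merge of per-partition `AVG` results can be wrong while `SUM(...) / COUNT(...)` merges correctly, here is a minimal arithmetic sketch. The partition contents and helper names are hypothetical, and the "average of averages" merge is only one plausible way such a merge can go wrong, not necessarily what the SDK did internally.

```python
from typing import List


def merged_avg_of_avgs(partitions: List[List[float]]) -> float:
    """Naive merge: average the per-partition averages (wrong when sizes differ)."""
    per_partition = [sum(values) / len(values) for values in partitions]
    return sum(per_partition) / len(per_partition)


def merged_sum_over_count(partitions: List[List[float]]) -> float:
    """Correct merge: sums and counts are both additive across partitions."""
    total = sum(sum(values) for values in partitions)
    count = sum(len(values) for values in partitions)
    return total / count


# Hypothetical data: two physical partitions holding different numbers of items.
partitions = [[10.0, 20.0, 30.0], [100.0]]

naive = merged_avg_of_avgs(partitions)       # weights each partition equally
correct = merged_sum_over_count(partitions)  # weights each item equally
print(naive, correct)
```

The two results differ whenever partitions hold different numbers of items, which is why the sum and the count have to be merged separately before dividing.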

#### Bugs Fixed
* Fixed bug where the `Content-Length` HTTP request header was computed from the character count of the request body instead of its UTF-8 byte count. See [PR 47008](https://github.com/Azure/azure-sdk-for-python/pull/47008)
* Added an opt-in fallback for invalid UTF-8 in response bodies. Default behavior is unchanged (strict decode). Setting `AZURE_COSMOS_CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT` to `REPLACE` or `IGNORE` enables a permissive decode so reads, queries, and change-feed iteration can make progress past corrupt payloads. See [PR 47008](https://github.com/Azure/azure-sdk-for-python/pull/47008)
* Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)
* Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937)
* Fixed a bug in `query_items(feed_range=...)` where pagination could return incorrect results after a partition split caused the supplied feed range to overlap multiple physical partitions. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105)
* Fixed bug where `SELECT VALUE AVG(...)` queries spanning multiple physical partitions returned mathematically incorrect merged values from client-side aggregation. These queries now raise `ValueError`. See [PR 47105](https://github.com/Azure/azure-sdk-for-python/pull/47105)
* Fixed bug where a `ValueError("Ranges overlap")` or an `AssertionError("code bug: returned overlapping ranges ... is empty")` from the partition key range cache could escape to the caller when the `/pkranges` response contained a transiently inconsistent snapshot (overlap or gap). See [PR 47091](https://github.com/Azure/azure-sdk-for-python/pull/47091)
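
The `Content-Length` and invalid UTF-8 entries above both come down to the difference between characters and bytes. The sketch below uses only the Python standard library; the sample payloads are made up, and the mapping of `REPLACE` / `IGNORE` onto Python's `errors="replace"` / `errors="ignore"` handlers is an assumption for illustration, not a statement about how the SDK implements the setting.

```python
# Hypothetical JSON body containing non-ASCII characters.
body = '{"name": "caf\u00e9 \u2615"}'

char_count = len(body)                  # number of Unicode code points
byte_count = len(body.encode("utf-8"))  # what Content-Length must report
print(char_count, byte_count)

# Hypothetical response payload with one invalid UTF-8 byte (0xFF).
bad_payload = b"ok\xffok"

try:
    bad_payload.decode("utf-8")  # strict decode, the default
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True

# Assumed analogues of the permissive modes:
replaced = bad_payload.decode("utf-8", errors="replace")  # inserts U+FFFD
ignored = bad_payload.decode("utf-8", errors="ignore")    # drops the bad byte
```

Any body that contains multi-byte characters has a byte count larger than its character count, so a header computed from `len(body)` under-reports the payload size.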

#### Other Changes
@@ -171,6 +171,7 @@ def _handle_transient_snapshot_retry_decision(
)
raise CosmosHttpResponseError(
status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE,
sub_status=http_constants.SubStatusCodes.ROUTING_MAP_SNAPSHOT_INCONSISTENT,
message=(
"Routing-map fetch for collection '{}' returned overlapping "
"or gapped ranges on {} attempt(s)."
@@ -295,6 +296,86 @@ def _resolve_endpoint(client: Any) -> str:
return f"__unknown_{id(client)}__"




# ---------------------------------------------------------------------------
# /pkranges change-feed drain helpers (shared by sync + async providers)
# ---------------------------------------------------------------------------
#
# These helpers hoist the *pure decision logic* of the routing-map change-feed
# drain out of the sync and async providers so a future bug-fix lands in one
# place. The providers still own the I/O-shaped parts that genuinely differ:
# - sync uses ``ranges.extend(list(generator))``
# - async uses ``async for item in generator: ...``
# Everything else (per-page state transitions) lives here.


class _DrainPageDecision:
"""Outcome of evaluating a single /pkranges drain page."""

CONTINUE = "continue"
STOP_DRAINED = "stop_drained"


def evaluate_drain_page(
*,
page_new_etag: Optional[str],
current_if_none_match: Optional[str],
new_etag: Optional[str],
seen_any_etag: bool,
status_code: Optional[int],
) -> Tuple[str, Optional[str], Optional[str], bool]:
"""Decide whether to keep draining the /pkranges change feed.

Pure function: no I/O. The sole termination signal is literal HTTP
``304 Not Modified`` (matching Java, .NET v3, and Go). ``status_code``
is required: production callers wire it via the
``_internal_response_status_capture`` sidecar populated by
``_synchronized_request`` / ``_asynchronous_request`` before any
return, so it is always a concrete int by the time we land here.
There is intentionally no secondary safety net (e.g. a page cap)
here -- peer SDKs (.NET v3, Java, Go) all rely solely on the 304
termination predicate and we mirror that contract.

:keyword page_new_etag: ETag header from the current page response, if any.
:paramtype page_new_etag: str or None
:keyword current_if_none_match: The ``If-None-Match`` we sent for this page.
:paramtype current_if_none_match: str or None
:keyword new_etag: Running accumulator for the final etag to publish.
:paramtype new_etag: str or None
:keyword bool seen_any_etag: Whether the service has ever surfaced an ETag
across the drain so far.
:keyword status_code: HTTP status code of the page response. Required at runtime;
``None`` indicates the response-status sidecar was not wired by the caller and
raises ``RuntimeError``. Typed as ``Optional[int]`` so callers that read the
status from a sidecar list typed as ``List[Optional[int]]`` (whose first slot
is ``None`` until populated by ``_synchronized_request`` /
``_asynchronous_request``) satisfy mypy without an extra cast.
:paramtype status_code: int or None

:returns: ``(decision, new_etag, next_if_none_match, seen_any_etag)``.
``next_if_none_match`` is only meaningful when ``decision == CONTINUE``.
:rtype: tuple
"""
if status_code is None:
raise RuntimeError(
"evaluate_drain_page invoked with status_code=None. The /pkranges "
"drain loop requires the _internal_response_status_capture sidecar "
"to be wired by the caller; this indicates a programming error in "
"the routing-map provider."
)

if page_new_etag:
seen_any_etag = True
new_etag = page_new_etag

if status_code == http_constants.StatusCodes.NOT_MODIFIED:
return (_DrainPageDecision.STOP_DRAINED, new_etag, current_if_none_match, seen_any_etag)

next_inm = page_new_etag if page_new_etag else current_if_none_match
return (_DrainPageDecision.CONTINUE, new_etag, next_inm, seen_any_etag)
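
The per-page state transitions above can be exercised without any I/O. What follows is a self-contained sketch, not the provider code: `evaluate_page` is a simplified stand-in for `evaluate_drain_page`, `FAKE_PAGES` is a hypothetical sequence of `(status, etag, ranges)` responses, and `drain` is an illustrative loop that records the `If-None-Match` value it would send for each page.

```python
from typing import List, Optional, Tuple

NOT_MODIFIED = 304  # stand-in for http_constants.StatusCodes.NOT_MODIFIED


def evaluate_page(
    page_etag: Optional[str],
    current_inm: Optional[str],
    new_etag: Optional[str],
    seen_any_etag: bool,
    status_code: Optional[int],
) -> Tuple[str, Optional[str], Optional[str], bool]:
    """Simplified stand-in for evaluate_drain_page (same decision rules)."""
    if status_code is None:
        raise RuntimeError("status_code was not captured")
    if page_etag:
        seen_any_etag = True
        new_etag = page_etag
    if status_code == NOT_MODIFIED:
        return ("stop_drained", new_etag, current_inm, seen_any_etag)
    return ("continue", new_etag, page_etag or current_inm, seen_any_etag)


# Hypothetical service responses: two pages of ranges, then a literal 304.
FAKE_PAGES = [
    (200, "etag-1", ["r0", "r1"]),
    (200, "etag-2", ["r2"]),
    (304, "etag-2", []),
]


def drain(pages) -> Tuple[List[str], Optional[str], List[Optional[str]]]:
    ranges: List[str] = []
    sent_if_none_match: List[Optional[str]] = []
    current_inm: Optional[str] = None  # no previous routing map in this sketch
    new_etag: Optional[str] = None
    seen_any_etag = False
    for status, etag, page_ranges in pages:
        sent_if_none_match.append(current_inm)  # header sent for this page
        ranges.extend(page_ranges)
        decision, new_etag, current_inm, seen_any_etag = evaluate_page(
            etag, current_inm, new_etag, seen_any_etag, status
        )
        if decision == "stop_drained":
            break
    # Publish an etag only if the service ever surfaced one.
    return ranges, (new_etag if seen_any_etag else None), sent_if_none_match


ranges, final_etag, sent = drain(FAKE_PAGES)
print(ranges, final_etag, sent)
```

In this sketch each page's ETag becomes the next request's `If-None-Match`, and the loop ends only on the literal 304, matching the contract described in the docstring.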


class _IncrementalMergeFailed(Exception):
"""Private exception type raised by :func:`process_fetched_ranges` when the
incremental update cannot resolve all partition key ranges.
@@ -41,6 +41,8 @@
_OverlapDetected,
_GapDetected,
_handle_transient_snapshot_retry_decision,
_DrainPageDecision,
evaluate_drain_page,
)


@@ -334,6 +336,7 @@ async def get_routing_map(
return self._collection_routing_map_by_item.get(collection_id)


# pylint: disable=too-many-statements,too-many-locals
async def _fetch_routing_map(
self,
collection_link: str,
@@ -377,35 +380,84 @@ async def _fetch_routing_map(
inconsistency_attempt_count = 0

while True:
request_kwargs = dict(kwargs)
response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
request_kwargs['_internal_response_headers_capture'] = response_headers

# Prepare sanitised options and headers for the PK-range fetch.
ranges: List[Dict[str, Any]] = []
# Start the change-feed drain at the previous map's etag (if any).
# On subsequent drain pages we advance this with the etag returned
# for the previous page so the service returns "what's new since X"
# until it eventually responds with 304 / no new ranges, mirroring
# the .NET and Go SDK behaviour.
current_if_none_match = (
current_previous_map.change_feed_etag if current_previous_map else None
)
new_etag = current_if_none_match
# Track whether the service ever surfaced an ETag header during this
# drain attempt. If it never did, we want ``process_fetched_ranges``
# to surface the "no ETag" observability warning rather than
# silently treating ``current_if_none_match`` as the fresh etag.
seen_any_etag = False

# Hoist: ``prepare_fetch_options_and_headers`` is loop-invariant
# for this drain attempt -- ``change_feed_options`` depends only on
# ``feed_options`` and the headers it builds depend only on
# ``current_previous_map.change_feed_etag``, neither of which
# change inside the inner drain loop. Compute them once here; the
# only per-page mutation is the ``If-None-Match`` override below.
base_kwargs_for_headers: Dict[str, Any] = dict(kwargs)
change_feed_options = prepare_fetch_options_and_headers(
current_previous_map, feed_options, request_kwargs
current_previous_map, feed_options, base_kwargs_for_headers
)
base_headers: Dict[str, Any] = base_kwargs_for_headers['headers']

ranges: List[Dict[str, Any]] = []
try:
pk_range_generator = self._document_client._ReadPartitionKeyRanges(
collection_link,
change_feed_options,
**request_kwargs
)
async for item in pk_range_generator:
ranges.append(item)

except CosmosHttpResponseError as e:
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug,do-not-log-raised-errors
"Failed to read partition key ranges for collection '%s': %s", collection_link, e)
raise
while True:
request_kwargs = dict(kwargs)
# Shallow-copy ``base_headers`` so the per-iter
# ``If-None-Match`` override does not bleed across iterations.
request_kwargs['headers'] = dict(base_headers)
response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
request_kwargs['_internal_response_headers_capture'] = response_headers
# Sidecar list -- populated by _Request with the raw wire
# status. Lets us terminate on literal 304 (matching peer
# SDKs) instead of inferring it from an empty page.
status_capture: List[Optional[int]] = [None]
request_kwargs['_internal_response_status_capture'] = status_capture

# Override If-None-Match with the running etag from the drain
# so each page advances. ``prepare_fetch_options_and_headers``
# only sets it from ``current_previous_map.change_feed_etag``
# which never advances during this drain.
drain_headers = request_kwargs['headers']
if current_if_none_match:
drain_headers[http_constants.HttpHeaders.IfNoneMatch] = current_if_none_match
else:
drain_headers.pop(http_constants.HttpHeaders.IfNoneMatch, None)

new_etag = response_headers.get(http_constants.HttpHeaders.ETag)
try:
pk_range_generator = self._document_client._ReadPartitionKeyRanges(
collection_link,
change_feed_options,
**request_kwargs
)
ranges.extend([item async for item in pk_range_generator])
except CosmosHttpResponseError as e:
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug,do-not-log-raised-errors
"Failed to read partition key ranges for collection '%s': %s",
collection_link, e)
raise

decision, new_etag, current_if_none_match, seen_any_etag = evaluate_drain_page(
page_new_etag=response_headers.get(http_constants.HttpHeaders.ETag),
current_if_none_match=current_if_none_match,
new_etag=new_etag,
seen_any_etag=seen_any_etag,
status_code=status_capture[0],
)
if decision == _DrainPageDecision.STOP_DRAINED:
break

try:
effective_new_etag = new_etag if seen_any_etag else None
return process_fetched_ranges(
ranges, current_previous_map, collection_id, collection_link, new_etag
ranges, current_previous_map, collection_id, collection_link, effective_new_etag
)
except _IncrementalMergeFailed:
if current_previous_map is not None and incomplete_attempt_count < _INCOMPLETE_ROUTING_MAP_MAX_RETRIES: