From 37f64dc0fb67bb2544ce4aca8943d755fcfd27bc Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 7 May 2026 16:20:35 -0400 Subject: [PATCH 01/10] PYTHON-5631 - test_direct_client_maintains_pool_to_arbiter waits instead of asserting --- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index ca150ca6df..9023112b53 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -2715,7 +2715,7 @@ async def test_direct_client_maintains_pool_to_arbiter(self): await listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 1) self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1) arbiter = c._topology.get_server_by_address(("c", 3)) - self.assertEqual(len(arbiter.pool.conns), 1) + await async_wait_until(lambda: len(arbiter.pool.conns) == 1, "create 1 pooled connection") # Arbiter pool is marked ready. self.assertEqual(listener.event_count(monitoring.PoolReadyEvent), 1) diff --git a/test/test_client.py b/test/test_client.py index 75d585fdad..fb400b6986 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -2670,7 +2670,7 @@ def test_direct_client_maintains_pool_to_arbiter(self): listener.wait_for_event(monitoring.ConnectionReadyEvent, 1) self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1) arbiter = c._topology.get_server_by_address(("c", 3)) - self.assertEqual(len(arbiter.pool.conns), 1) + wait_until(lambda: len(arbiter.pool.conns) == 1, "create 1 pooled connection") # Arbiter pool is marked ready. self.assertEqual(listener.event_count(monitoring.PoolReadyEvent), 1) From 32e391893b1f659205574f1d6352857015b4498c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 11 May 2026 11:39:04 -0400 Subject: [PATCH 02/10] PYTHON-3923 - Handle socket closures in tests to avoid ResourceWarning --- pymongo/asynchronous/pool.py | 57 ++++++++++++---------- pymongo/pool_shared.py | 89 ++++++++++++++++++++--------------- pymongo/synchronous/pool.py | 57 ++++++++++++---------- pyproject.toml | 5 -- test/__init__.py | 2 + test/asynchronous/__init__.py | 2 + 6 files changed, 122 insertions(+), 90 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..6f9aa3492d 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -1065,34 +1065,43 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A raise conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - async with self.lock: - self.active_contexts.add(conn.cancel_context) - self.active_contexts.discard(tmp_context) - if tmp_context.cancelled: - conn.cancel_context.cancel() - completed_hello = False try: - if not self.is_sdam: - await conn.hello() - completed_hello = True - self.is_writable = conn.is_writable - if handler: - handler.contribute_socket(conn, completed_handshake=False) - - await conn.authenticate() - # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as e: async with self.lock: - self.active_contexts.discard(conn.cancel_context) - if not completed_hello: - self._handle_connection_error(e) - await conn.close_conn(ConnectionClosedReason.ERROR) - raise + self.active_contexts.add(conn.cancel_context) + self.active_contexts.discard(tmp_context) + if tmp_context.cancelled: + conn.cancel_context.cancel() + completed_hello = False + try: + if not self.is_sdam: + await conn.hello() + completed_hello = True + self.is_writable = conn.is_writable + if handler: + handler.contribute_socket(conn, completed_handshake=False) + + await conn.authenticate() + # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. + except BaseException as e: + async with self.lock: + self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) + await conn.close_conn(ConnectionClosedReason.ERROR) + raise - if handler: - await handler.client._topology.receive_cluster_time(conn._cluster_time) + if handler: + await handler.client._topology.receive_cluster_time(conn._cluster_time) - return conn + return conn + # Catch cancellations that interrupt outside the inner try block above + except BaseException: + if not conn.closed: + try: + await conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise @contextlib.asynccontextmanager async def checkout( diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index a6f434885b..5de8d35455 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -207,6 +207,7 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s sock = socket.socket(af, socktype, proto) # Fallback when SOCK_CLOEXEC isn't available. _set_non_inheritable_non_atomic(sock.fileno()) + sock_returned = False try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # CSOT: apply timeout to socket connect. @@ -223,14 +224,17 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) sock.settimeout(timeout) + sock_returned = True return sock except asyncio.TimeoutError as e: - sock.close() err = socket.timeout("timed out") err.__cause__ = e except OSError as e: - sock.close() err = e # type: ignore[assignment] + finally: + # Always close the socket if it wasn't returned to avoid leaks. + if not sock_returned: + sock.close() if err is not None: raise err @@ -307,48 +311,59 @@ async def _configured_protocol_interface( Sets protocol's SSL and timeout options. """ sock = await _async_create_connection(address, options) - ssl_context = options._ssl_context - timeout = options.socket_timeout + sock_adopted = False + try: + ssl_context = options._ssl_context + timeout = options.socket_timeout - if ssl_context is None: - return AsyncNetworkingInterface( - await asyncio.get_running_loop().create_connection( + if ssl_context is None: + result = await asyncio.get_running_loop().create_connection( lambda: PyMongoProtocol(timeout=timeout), sock=sock ) - ) + sock_adopted = True + return AsyncNetworkingInterface(result) - host = address[0] - try: - # We have to pass hostname / ip address to wrap_socket - # to use SSLContext.check_hostname. - transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] - lambda: PyMongoProtocol(timeout=timeout), - sock=sock, - server_hostname=host, - ssl=ssl_context, - ) - except _CertificateError: - # Raise _CertificateError directly like we do after match_hostname - # below. - raise - except (OSError, *SSLErrors) as exc: - # We raise AutoReconnect for transient and permanent SSL handshake - # failures alike. Permanent handshake failures, like protocol - # mismatch, will be turned into ServerSelectionTimeoutErrors later. - details = _get_timeout_details(options) - _raise_connection_failure(address, exc, "SSL handshake failed: ", timeout_details=details) - if ( - ssl_context.verify_mode - and not ssl_context.check_hostname - and not options.tls_allow_invalid_hostnames - ): + host = address[0] try: - ssl.match_hostname(transport.get_extra_info("peercert"), hostname=host) # type:ignore[attr-defined,unused-ignore] + # We have to pass hostname / ip address to wrap_socket + # to use SSLContext.check_hostname. + transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] + lambda: PyMongoProtocol(timeout=timeout), + sock=sock, + server_hostname=host, + ssl=ssl_context, + ) + sock_adopted = True except _CertificateError: - transport.abort() + # Raise _CertificateError directly like we do after match_hostname + # below. raise - - return AsyncNetworkingInterface((transport, protocol)) + except (OSError, *SSLErrors) as exc: + # We raise AutoReconnect for transient and permanent SSL handshake + # failures alike. Permanent handshake failures, like protocol + # mismatch, will be turned into ServerSelectionTimeoutErrors later. + details = _get_timeout_details(options) + _raise_connection_failure( + address, exc, "SSL handshake failed: ", timeout_details=details + ) + if ( + ssl_context.verify_mode + and not ssl_context.check_hostname + and not options.tls_allow_invalid_hostnames + ): + try: + ssl.match_hostname(transport.get_extra_info("peercert"), hostname=host) # type:ignore[attr-defined,unused-ignore] + except _CertificateError: + transport.abort() + raise + + return AsyncNetworkingInterface((transport, protocol)) + finally: + # If cancellation or any exception lands between sock creation and + # transport adoption, asyncio.create_connection has not registered + # cleanup for the sock — close it ourselves so it doesn't leak. + if not sock_adopted: + sock.close() def _create_connection(address: _Address, options: PoolOptions) -> socket.socket: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..9fd0011b1f 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -1061,34 +1061,43 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect raise conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - with self.lock: - self.active_contexts.add(conn.cancel_context) - self.active_contexts.discard(tmp_context) - if tmp_context.cancelled: - conn.cancel_context.cancel() - completed_hello = False try: - if not self.is_sdam: - conn.hello() - completed_hello = True - self.is_writable = conn.is_writable - if handler: - handler.contribute_socket(conn, completed_handshake=False) - - conn.authenticate() - # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as e: with self.lock: - self.active_contexts.discard(conn.cancel_context) - if not completed_hello: - self._handle_connection_error(e) - conn.close_conn(ConnectionClosedReason.ERROR) - raise + self.active_contexts.add(conn.cancel_context) + self.active_contexts.discard(tmp_context) + if tmp_context.cancelled: + conn.cancel_context.cancel() + completed_hello = False + try: + if not self.is_sdam: + conn.hello() + completed_hello = True + self.is_writable = conn.is_writable + if handler: + handler.contribute_socket(conn, completed_handshake=False) + + conn.authenticate() + # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. + except BaseException as e: + with self.lock: + self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) + conn.close_conn(ConnectionClosedReason.ERROR) + raise - if handler: - handler.client._topology.receive_cluster_time(conn._cluster_time) + if handler: + handler.client._topology.receive_cluster_time(conn._cluster_time) - return conn + return conn + # Catch cancellations that interrupt outside the inner try block above + except BaseException: + if not conn.closed: + try: + conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise @contextlib.contextmanager def checkout( diff --git a/pyproject.toml b/pyproject.toml index 9b3287834a..dd8a4955d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,11 +108,6 @@ filterwarnings = [ # pytest-asyncio known issue: https://github.com/pytest-dev/pytest-asyncio/issues/1032 "module:.*WindowsSelectorEventLoopPolicy:DeprecationWarning", "module:.*et_event_loop_policy:DeprecationWarning", - # TODO: Remove as part of PYTHON-3923. - "module:unclosed Date: Mon, 18 May 2026 10:50:15 -0400 Subject: [PATCH 03/10] Clarify comment --- pymongo/pool_shared.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 5de8d35455..ebb84c3f87 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -224,6 +224,7 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) sock.settimeout(timeout) + # Set immediately before return. Do not insert an await between this and the return sock_returned = True return sock except asyncio.TimeoutError as e: From 20c5c6bfa972d34ece5a8270f4980db9c293482f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 18 May 2026 11:38:22 -0400 Subject: [PATCH 04/10] Comment clarity --- pymongo/pool_shared.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index ebb84c3f87..1ad84478da 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -360,9 +360,10 @@ async def _configured_protocol_interface( return AsyncNetworkingInterface((transport, protocol)) finally: - # If cancellation or any exception lands between sock creation and + # If cancellation or any exception lands between socket creation and # transport adoption, asyncio.create_connection has not registered - # cleanup for the sock — close it ourselves so it doesn't leak. + # cleanup for the sock. + # Close it ourselves to prevent leaks. if not sock_adopted: sock.close() From eceb69faa78938c7fad651c1919f0f7a53cd3ab4 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 18 May 2026 13:29:21 -0400 Subject: [PATCH 05/10] Fix pooling test leak --- test/asynchronous/test_pooling.py | 1 + test/test_pooling.py | 1 + 2 files changed, 2 insertions(+) diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 9db9b5ab3a..6ae84be6e6 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -172,6 +172,7 @@ async def create_pool(self, pair=None, *args, **kwargs): kwargs["server_api"] = pool_options.server_api pool = Pool(pair, PoolOptions(*args, **kwargs)) await pool.ready() + self.addAsyncCleanup(pool.close) return pool diff --git a/test/test_pooling.py b/test/test_pooling.py index 95558d00d5..ed5fa40020 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -172,6 +172,7 @@ def create_pool(self, pair=None, *args, **kwargs): kwargs["server_api"] = pool_options.server_api pool = Pool(pair, PoolOptions(*args, **kwargs)) pool.ready() + self.addCleanup(pool.close) return pool From b01931495a240f7e86b4f66ff53aef59b08b701b Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 18 May 2026 14:17:33 -0400 Subject: [PATCH 06/10] Fix asyncio transport socket leak --- pymongo/pool_shared.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 1ad84478da..2e4522ea3f 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -312,15 +312,14 @@ async def _configured_protocol_interface( Sets protocol's SSL and timeout options. """ sock = await _async_create_connection(address, options) + ssl_context = options._ssl_context + timeout = options.socket_timeout + # Create the Protocol early to prevent asyncio resource leaks during cleanup path + protocol = PyMongoProtocol(timeout=timeout) sock_adopted = False try: - ssl_context = options._ssl_context - timeout = options.socket_timeout - if ssl_context is None: - result = await asyncio.get_running_loop().create_connection( - lambda: PyMongoProtocol(timeout=timeout), sock=sock - ) + result = await asyncio.get_running_loop().create_connection(lambda: protocol, sock=sock) sock_adopted = True return AsyncNetworkingInterface(result) @@ -328,8 +327,8 @@ async def _configured_protocol_interface( try: # We have to pass hostname / ip address to wrap_socket # to use SSLContext.check_hostname. - transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] - lambda: PyMongoProtocol(timeout=timeout), + transport, _ = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] + lambda: protocol, sock=sock, server_hostname=host, ssl=ssl_context, @@ -360,12 +359,13 @@ async def _configured_protocol_interface( return AsyncNetworkingInterface((transport, protocol)) finally: - # If cancellation or any exception lands between socket creation and - # transport adoption, asyncio.create_connection has not registered - # cleanup for the sock. - # Close it ourselves to prevent leaks. if not sock_adopted: - sock.close() + # If the protocol owns the transport, it also adopted the socket and needs to be cleaned up from the transport + if protocol.transport is not None: + protocol.transport.abort() + # Otherwise the socket was never adopted, close it directly + else: + sock.close() def _create_connection(address: _Address, options: PoolOptions) -> socket.socket: From 29ba515060edce015bef5f5f50e3d4aeaae9c2e6 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 19 May 2026 12:56:39 -0400 Subject: [PATCH 07/10] Shield async connection closing --- pymongo/asynchronous/pool.py | 18 ++++++++++++------ pymongo/synchronous/pool.py | 18 ++++++++++++------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 6f9aa3492d..89157e9543 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -853,9 +853,12 @@ async def _reset( # publishing the PoolClearedEvent. if close: if not _IS_SYNC: - await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + await asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: @@ -890,9 +893,12 @@ async def _reset( interrupt_connections=interrupt_connections, ) if not _IS_SYNC: - await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + await asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 9fd0011b1f..a3790cd9c5 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -851,9 +851,12 @@ def _reset( # publishing the PoolClearedEvent. if close: if not _IS_SYNC: - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: @@ -888,9 +891,12 @@ def _reset( interrupt_connections=interrupt_connections, ) if not _IS_SYNC: - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, + # Shield the closing of connections to avoid leaks + asyncio.shield( + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, + ) ) else: for conn in sockets: From 467144c2900b418a9b321a1e633c99041d46ce41 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 19 May 2026 15:12:14 -0400 Subject: [PATCH 08/10] EG malloc --- .evergreen/run-tests.sh | 1 + pymongo/asynchronous/pool.py | 46 ++++++++++++++++++++++++------------ pymongo/synchronous/pool.py | 46 ++++++++++++++++++++++++------------ 3 files changed, 63 insertions(+), 30 deletions(-) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 0785bcf01d..0daf97c6d0 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -39,6 +39,7 @@ trap "cleanup_tests" SIGINT ERR # Start the test runner. echo "Running tests with UV_PYTHON=${UV_PYTHON:-}..." echo "UV_ARGS=${UV_ARGS}" +export PYTHONTRACEMALLOC=20 uv run ${UV_ARGS} --reinstall-package pymongo .evergreen/scripts/run_tests.py "$@" echo "Running tests with UV_PYTHON=${UV_PYTHON:-}... done." diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 89157e9543..65a6b4c8bc 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -573,8 +573,18 @@ async def _close_conn(self) -> None: # shutdown. try: await self.conn.close() - except Exception: # noqa: S110 - pass + except BaseException as exc: + # Force-abort the underlying transport so the socket fd is + # released even if the graceful close raised or was cancelled + # before reaching transport.abort(). + transport = getattr(self.conn.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + if not isinstance(exc, Exception): + raise def conn_closed(self) -> bool: """Return True if we know socket has been closed, False otherwise.""" @@ -853,12 +863,9 @@ async def _reset( # publishing the PoolClearedEvent. if close: if not _IS_SYNC: - # Shield the closing of connections to avoid leaks - await asyncio.shield( - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, - ) + await asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, ) else: for conn in sockets: @@ -893,12 +900,9 @@ async def _reset( interrupt_connections=interrupt_connections, ) if not _IS_SYNC: - # Shield the closing of connections to avoid leaks - await asyncio.shield( - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, - ) + await asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, ) else: for conn in sockets: @@ -1070,7 +1074,19 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A _raise_connection_failure(self.address, error, timeout_details=details) raise - conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] + try: + conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] + except BaseException: + # Release the networking_interface's transport if AsyncConnection + # construction failed, since no AsyncConnection exists yet to + # close_conn() through the outer cleanup path. + transport = getattr(networking_interface.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + raise try: async with self.lock: self.active_contexts.add(conn.cancel_context) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index a3790cd9c5..7ecfe53cfe 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -571,8 +571,18 @@ def _close_conn(self) -> None: # shutdown. try: self.conn.close() - except Exception: # noqa: S110 - pass + except BaseException as exc: + # Force-abort the underlying transport so the socket fd is + # released even if the graceful close raised or was cancelled + # before reaching transport.abort(). + transport = getattr(self.conn.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + if not isinstance(exc, Exception): + raise def conn_closed(self) -> bool: """Return True if we know socket has been closed, False otherwise.""" @@ -851,12 +861,9 @@ def _reset( # publishing the PoolClearedEvent. if close: if not _IS_SYNC: - # Shield the closing of connections to avoid leaks - asyncio.shield( - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, - ) + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, ) else: for conn in sockets: @@ -891,12 +898,9 @@ def _reset( interrupt_connections=interrupt_connections, ) if not _IS_SYNC: - # Shield the closing of connections to avoid leaks - asyncio.shield( - asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] - return_exceptions=True, - ) + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value] + return_exceptions=True, ) else: for conn in sockets: @@ -1066,7 +1070,19 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect _raise_connection_failure(self.address, error, timeout_details=details) raise - conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] + try: + conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] + except BaseException: + # Release the networking_interface's transport if Connection + # construction failed, since no Connection exists yet to + # close_conn() through the outer cleanup path. + transport = getattr(networking_interface.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + raise try: with self.lock: self.active_contexts.add(conn.cancel_context) From ffc93b85799ff3aee3494d1ccc5d9e8f37e29ee2 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 20 May 2026 12:06:55 -0400 Subject: [PATCH 09/10] Fix leaking monitor pools --- .evergreen/run-tests.sh | 16 +++ pymongo/asynchronous/monitor.py | 26 ++-- pymongo/asynchronous/pool.py | 209 +++++++++++++++++++++++++------ pymongo/asynchronous/server.py | 11 +- pymongo/asynchronous/topology.py | 24 +++- pymongo/synchronous/monitor.py | 26 ++-- pymongo/synchronous/pool.py | 109 ++++++++++------ pymongo/synchronous/server.py | 11 +- pymongo/synchronous/topology.py | 24 +++- 9 files changed, 357 insertions(+), 99 deletions(-) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 0daf97c6d0..fb66800157 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -40,6 +40,22 @@ trap "cleanup_tests" SIGINT ERR echo "Running tests with UV_PYTHON=${UV_PYTHON:-}..." echo "UV_ARGS=${UV_ARGS}" export PYTHONTRACEMALLOC=20 +# PYTHON-3923 leak diagnostic: log AsyncConnection lifecycle to a file that +# bypasses pytest stdio capture. Reset the file before each run so we only +# see traces from this run. +export PYMONGO_LEAK_TRACE_FILE=/tmp/pymongo_leak_trace.log +: > "$PYMONGO_LEAK_TRACE_FILE" || true + +# Always dump the trace file on script exit (pass or fail) so CI captures it. +dump_leak_trace() { + if [ -s "$PYMONGO_LEAK_TRACE_FILE" ]; then + echo "=================== PYMONGO_LEAK_TRACE BEGIN ===================" + cat "$PYMONGO_LEAK_TRACE_FILE" + echo "==================== PYMONGO_LEAK_TRACE END ====================" + fi +} +trap "dump_leak_trace" EXIT + uv run ${UV_ARGS} --reinstall-package pymongo .evergreen/scripts/run_tests.py "$@" echo "Running tests with UV_PYTHON=${UV_PYTHON:-}... done." diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 45c12b219f..8f6322761f 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -200,10 +200,21 @@ async def join(self) -> None: async def close(self) -> None: self.gc_safe_close() - await self._rtt_monitor.close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - await self._reset_connection() + # Run rtt_monitor.close() and self._pool.close() independently so a + # failure in rtt cleanup does not skip the monitor pool's close, which + # would otherwise orphan its conn deque. + rtt_error: Optional[BaseException] = None + try: + await self._rtt_monitor.close() + except BaseException as exc: + rtt_error = exc + # Close the monitor pool. This both closes the conn in the deque + # and marks the pool CLOSED, so any in-flight check_once that checks + # the conn back in will close it via close_conn(POOL_CLOSED) rather + # than returning it to the deque of a pool that is about to be GC'd. + await self._pool.close() + if rtt_error is not None: + raise rtt_error async def _reset_connection(self) -> None: # Clear our pooled connection. @@ -456,9 +467,10 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool async def close(self) -> None: self.gc_safe_close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - await self._pool.reset() + # Close the RTT monitor pool. If the executor task has the socket + # checked out, checkin will close it via close_conn(POOL_CLOSED) + # because the pool is now CLOSED. + await self._pool.close() async def add_sample(self, sample: float) -> None: """Add a RTT sample.""" diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 65a6b4c8bc..b964bfda2e 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -112,6 +112,22 @@ _IS_SYNC = False +# Diagnostic for PYTHON-3923: tag each AsyncConnection lifecycle event so we +# can identify which conns are created but never reach _close_conn. +# Writes to a file so pytest's stdio capture (per-test, only-on-failure) does +# not swallow the trace. Path is configurable via PYMONGO_LEAK_TRACE_FILE; +# default is "/tmp/pymongo_leak_trace.log". Remove once the leak is identified. +_LEAK_TRACE_PATH = os.environ.get("PYMONGO_LEAK_TRACE_FILE", "/tmp/pymongo_leak_trace.log") + + +def _leak_trace(msg: str) -> None: + try: + with open(_LEAK_TRACE_PATH, "a") as _f: # noqa: S108 + _f.write(f"PYMONGO_LEAK_TRACE: t={time.monotonic():.4f} pid={os.getpid()} {msg}\n") + except Exception: # noqa: S110 + pass + + class AsyncConnection: """Store a connection with some metadata. @@ -180,6 +196,48 @@ def __init__( self.creation_time = time.monotonic() # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None + # NOTE: the parameter `id` shadows the built-in id() in this scope, so + # we must not call id(self) here. Use repr() or skip obj_id. + _leak_trace( + f"created conn_id={self.id} addr={self.address} is_sdam={self.is_sdam}" + ) + + def __del__(self) -> None: + # Defense-in-depth: if an AsyncConnection is GC'd without + # close_conn() having run (e.g. its parent Pool was discarded + # before reset/close, or a Pool._reset gather was cancelled + # before any close_conn task got CPU), abort the transport so + # the socket fd is released and the unclosed-transport + # ResourceWarning is not raised. + try: + if not self.closed: + # Probe transport state so we can tell whether Part A's + # sync-abort in Pool._reset already released the fd + # (state=closing) or this is a genuine untouched leak + # (state=open). + state = "unknown" + try: + transport = self.conn.get_conn.transport + if transport is None: + state = "none" + elif transport.is_closing(): + state = "closing" + else: + state = "open" + except Exception: # noqa: S110 + pass + _leak_trace( + f"GC-WITHOUT-CLOSE conn_id={self.id} addr={self.address} " + f"is_sdam={self.is_sdam} transport={state}" + ) + try: + transport = self.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + except Exception: # noqa: S110 + pass def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -565,6 +623,23 @@ async def close_conn(self, reason: Optional[str]) -> None: async def _close_conn(self) -> None: """Close this connection.""" + _leak_trace( + f"_close_conn conn_id={self.id} addr={self.address} " + f"already_closed={self.closed} obj_id={id(self):x}" + ) + # Force-abort the underlying transport first so the socket fd is + # released even if a previous _close_conn already set self.closed + # but didn't reach transport.abort() (e.g. cancelled mid-close), or + # if the graceful close path below raises. transport.abort() is + # idempotent — _SelectorTransport._force_close returns early when + # _conn_lost is already set. + if not _IS_SYNC: + transport = self.conn.get_conn.transport + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass if self.closed: return self.closed = True @@ -573,18 +648,8 @@ async def _close_conn(self) -> None: # shutdown. try: await self.conn.close() - except BaseException as exc: - # Force-abort the underlying transport so the socket fd is - # released even if the graceful close raised or was cancelled - # before reaching transport.abort(). - transport = getattr(self.conn.get_conn, "transport", None) - if transport is not None: - try: - transport.abort() - except Exception: # noqa: S110 - pass - if not isinstance(exc, Exception): - raise + except Exception: # noqa: S110 + pass def conn_closed(self) -> bool: """Return True if we know socket has been closed, False otherwise.""" @@ -846,6 +911,12 @@ async def _reset( keep.append(conn) sockets = discard self.conns = keep + _leak_trace( + f"_reset close={close} addr={self.address} " + f"deque_size={len(sockets)} " + f"conn_ids={[c.id for c in sockets]} " + f"obj_ids={[hex(id(c))[2:] for c in sockets]}" + ) if close: self.state = PoolState.CLOSED @@ -857,6 +928,23 @@ async def _reset( for context in self.active_contexts: context.cancel() + # Synchronously abort the transports of all snapshotted conns. This + # releases the socket fd and schedules _call_connection_lost before + # any await. If the gather below is cancelled (e.g. by test + # teardown propagating a CancelledError into _reset), the inner + # close_conn() Tasks are cancelled before their body runs, so the + # transport.abort() inside _close_conn() never fires and the + # snapshotted conns leak. transport.abort() is idempotent — the + # close_conn coroutines below remain safe to run. + if not _IS_SYNC: + for conn in sockets: + try: + transport = conn.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after* @@ -979,18 +1067,31 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = await self.connect() - close_conn = False - async with self.lock: - # Close connection and return if the pool was reset during - # socket creation or while acquiring the pool lock. - if self.gen.get_overall() != reference_generation: - close_conn = True - if not close_conn: - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) - if close_conn: - await conn.close_conn(ConnectionClosedReason.STALE) - return + try: + close_conn = False + async with self.lock: + # Close connection and return if the pool was reset during + # socket creation or while acquiring the pool lock. + if self.gen.get_overall() != reference_generation: + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + await conn.close_conn(ConnectionClosedReason.STALE) + return + except BaseException: + # If cancellation or any other exception lands between + # connect() returning and the conn being appended to the + # deque (or closed via stale-generation), the conn would + # otherwise be orphaned. Close it explicitly to release + # the underlying socket. + if not conn.closed: + try: + await conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise finally: if incremented: # Notify after adding the socket to the pool. @@ -1086,6 +1187,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A transport.abort() except Exception: # noqa: S110 pass + async with self.lock: + self.active_contexts.discard(tmp_context) raise try: async with self.lock: @@ -1160,26 +1263,30 @@ async def checkout( conn = await self._get_conn(checkout_started_time, handler=handler) - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id, duration) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) try: + duration = time.monotonic() - checkout_started_time + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_connection_checked_out(self.address, conn.id, duration) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) async with self.lock: self.active_contexts.add(conn.cancel_context) yield conn # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as _checkout_exc: + _leak_trace( + f"checkout-except conn_id={conn.id} addr={self.address} " + f"obj_id={id(conn):x} exc={type(_checkout_exc).__name__}" + ) # Exception in caller. Ensure the connection gets returned. # Note that when pinned is True, the session owns the # connection and it is responsible for checking the connection @@ -1191,7 +1298,20 @@ async def checkout( exc_type, exc_val, _ = sys.exc_info() await handler.handle(exc_type, exc_val) if not pinned and conn.active: - await self.checkin(conn) + try: + await self.checkin(conn) + except BaseException: + # If checkin is interrupted (e.g., cancellation during a + # lock acquire), the conn is left neither in the deque + # nor closed. Force-abort the transport so the socket fd + # is released instead of leaking on GC. + transport = getattr(conn.conn.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + raise raise if conn.pinned_txn: async with self.lock: @@ -1364,6 +1484,10 @@ async def checkin(self, conn: AsyncConnection) -> None: :param conn: The connection to check into the pool. """ + _leak_trace( + f"checkin-enter conn_id={conn.id} addr={self.address} " + f"pool_closed={self.closed} conn_closed={conn.closed} obj_id={id(conn):x}" + ) txn = conn.pinned_txn cursor = conn.pinned_cursor conn.active = False @@ -1423,6 +1547,11 @@ async def checkin(self, conn: AsyncConnection) -> None: self._max_connecting_cond.notify() if close_conn: await conn.close_conn(ConnectionClosedReason.STALE) + else: + _leak_trace( + f"checkin-appendleft conn_id={conn.id} addr={self.address} " + f"obj_id={id(conn):x}" + ) async with self.size_cond: if txn: diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f212306174..029f649277 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -114,8 +114,17 @@ async def close(self) -> None: serverPort=self._description.address[1], ) - await self._monitor.close() + # Run monitor.close() and pool.close() independently so a failure in + # one (e.g. monitor's rtt_monitor.close() raising) does not skip the + # other and orphan the server pool's connections. + monitor_error: Optional[BaseException] = None + try: + await self._monitor.close() + except BaseException as exc: + monitor_error = exc await self._pool.close() + if monitor_error is not None: + raise monitor_error def request_check(self) -> None: """Check the server's state soon.""" diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 01e346bfa8..afcdca0ac7 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -722,8 +722,16 @@ async def close(self) -> None: """ async with self._lock: old_td = self._description + first_close_error: Optional[BaseException] = None for server in self._servers.values(): - await server.close() + # Each server.close must run independently. A failure on one + # server (e.g. its monitor's cleanup raising) must not skip + # close() on the remaining servers, or their pool conns leak. + try: + await server.close() + except BaseException as exc: + if first_close_error is None: + first_close_error = exc if not _IS_SYNC: self._monitor_tasks.append(server._monitor) @@ -783,6 +791,11 @@ async def close(self) -> None: await self.__events_executor.join(1) process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] + # Re-raise the first server.close() error (if any) after we've done + # everything we can to clean up the rest of the topology. + if first_close_error is not None: + raise first_close_error + @property def description(self) -> TopologyDescription: return self._description @@ -981,7 +994,14 @@ async def _update_servers(self) -> None: for address, server in list(self._servers.items()): if not self._description.has_server(address): - await server.close() + # Each server.close must run independently. If one server's + # cleanup raises, we still need to remove it from _servers and + # add its monitor to _monitor_tasks, and we must process the + # remaining servers, otherwise their pool conns leak. + try: + await server.close() + except BaseException: # noqa: S110 + pass if not _IS_SYNC: self._monitor_tasks.append(server._monitor) self._servers.pop(address) diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index f395588814..3feecc042e 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -198,10 +198,21 @@ def join(self) -> None: def close(self) -> None: self.gc_safe_close() - self._rtt_monitor.close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - self._reset_connection() + # Run rtt_monitor.close() and self._pool.close() independently so a + # failure in rtt cleanup does not skip the monitor pool's close, which + # would otherwise orphan its conn deque. + rtt_error: Optional[BaseException] = None + try: + self._rtt_monitor.close() + except BaseException as exc: + rtt_error = exc + # Close the monitor pool. This both closes the conn in the deque + # and marks the pool CLOSED, so any in-flight check_once that checks + # the conn back in will close it via close_conn(POOL_CLOSED) rather + # than returning it to the deque of a pool that is about to be GC'd. + self._pool.close() + if rtt_error is not None: + raise rtt_error def _reset_connection(self) -> None: # Clear our pooled connection. @@ -454,9 +465,10 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool def close(self) -> None: self.gc_safe_close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - self._pool.reset() + # Close the RTT monitor pool. If the executor task has the socket + # checked out, checkin will close it via close_conn(POOL_CLOSED) + # because the pool is now CLOSED. + self._pool.close() def add_sample(self, sample: float) -> None: """Add a RTT sample.""" diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 7ecfe53cfe..6d49f71959 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -563,6 +563,19 @@ def close_conn(self, reason: Optional[str]) -> None: def _close_conn(self) -> None: """Close this connection.""" + # Force-abort the underlying transport first so the socket fd is + # released even if a previous _close_conn already set self.closed + # but didn't reach transport.abort() (e.g. cancelled mid-close), or + # if the graceful close path below raises. transport.abort() is + # idempotent — _SelectorTransport._force_close returns early when + # _conn_lost is already set. + if not _IS_SYNC: + transport = self.conn.get_conn.transport + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass if self.closed: return self.closed = True @@ -571,18 +584,8 @@ def _close_conn(self) -> None: # shutdown. try: self.conn.close() - except BaseException as exc: - # Force-abort the underlying transport so the socket fd is - # released even if the graceful close raised or was cancelled - # before reaching transport.abort(). - transport = getattr(self.conn.get_conn, "transport", None) - if transport is not None: - try: - transport.abort() - except Exception: # noqa: S110 - pass - if not isinstance(exc, Exception): - raise + except Exception: # noqa: S110 + pass def conn_closed(self) -> bool: """Return True if we know socket has been closed, False otherwise.""" @@ -975,18 +978,31 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = self.connect() - close_conn = False - with self.lock: - # Close connection and return if the pool was reset during - # socket creation or while acquiring the pool lock. - if self.gen.get_overall() != reference_generation: - close_conn = True - if not close_conn: - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) - if close_conn: - conn.close_conn(ConnectionClosedReason.STALE) - return + try: + close_conn = False + with self.lock: + # Close connection and return if the pool was reset during + # socket creation or while acquiring the pool lock. + if self.gen.get_overall() != reference_generation: + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + conn.close_conn(ConnectionClosedReason.STALE) + return + except BaseException: + # If cancellation or any other exception lands between + # connect() returning and the conn being appended to the + # deque (or closed via stale-generation), the conn would + # otherwise be orphaned. Close it explicitly to release + # the underlying socket. + if not conn.closed: + try: + conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise finally: if incremented: # Notify after adding the socket to the pool. @@ -1082,6 +1098,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect transport.abort() except Exception: # noqa: S110 pass + with self.lock: + self.active_contexts.discard(tmp_context) raise try: with self.lock: @@ -1156,21 +1174,21 @@ def checkout( conn = self._get_conn(checkout_started_time, handler=handler) - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id, duration) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) try: + duration = time.monotonic() - checkout_started_time + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_connection_checked_out(self.address, conn.id, duration) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) with self.lock: self.active_contexts.add(conn.cancel_context) yield conn @@ -1187,7 +1205,20 @@ def checkout( exc_type, exc_val, _ = sys.exc_info() handler.handle(exc_type, exc_val) if not pinned and conn.active: - self.checkin(conn) + try: + self.checkin(conn) + except BaseException: + # If checkin is interrupted (e.g., cancellation during a + # lock acquire), the conn is left neither in the deque + # nor closed. Force-abort the transport so the socket fd + # is released instead of leaking on GC. + transport = getattr(conn.conn.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + raise raise if conn.pinned_txn: with self.lock: diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918b..887849b7f8 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -114,8 +114,17 @@ def close(self) -> None: serverPort=self._description.address[1], ) - self._monitor.close() + # Run monitor.close() and pool.close() independently so a failure in + # one (e.g. monitor's rtt_monitor.close() raising) does not skip the + # other and orphan the server pool's connections. + monitor_error: Optional[BaseException] = None + try: + self._monitor.close() + except BaseException as exc: + monitor_error = exc self._pool.close() + if monitor_error is not None: + raise monitor_error def request_check(self) -> None: """Check the server's state soon.""" diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index ec1615f0c6..60c7c4427f 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -720,8 +720,16 @@ def close(self) -> None: """ with self._lock: old_td = self._description + first_close_error: Optional[BaseException] = None for server in self._servers.values(): - server.close() + # Each server.close must run independently. A failure on one + # server (e.g. its monitor's cleanup raising) must not skip + # close() on the remaining servers, or their pool conns leak. + try: + server.close() + except BaseException as exc: + if first_close_error is None: + first_close_error = exc if not _IS_SYNC: self._monitor_tasks.append(server._monitor) @@ -781,6 +789,11 @@ def close(self) -> None: self.__events_executor.join(1) process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] + # Re-raise the first server.close() error (if any) after we've done + # everything we can to clean up the rest of the topology. + if first_close_error is not None: + raise first_close_error + @property def description(self) -> TopologyDescription: return self._description @@ -979,7 +992,14 @@ def _update_servers(self) -> None: for address, server in list(self._servers.items()): if not self._description.has_server(address): - server.close() + # Each server.close must run independently. If one server's + # cleanup raises, we still need to remove it from _servers and + # add its monitor to _monitor_tasks, and we must process the + # remaining servers, otherwise their pool conns leak. + try: + server.close() + except BaseException: # noqa: S110 + pass if not _IS_SYNC: self._monitor_tasks.append(server._monitor) self._servers.pop(address) From eabc4862cec05f8ccf135026130b3f6a16bece6a Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 21 May 2026 14:28:19 -0400 Subject: [PATCH 10/10] Cleanup --- .evergreen/run-tests.sh | 17 ------- pymongo/asynchronous/pool.py | 89 ++++-------------------------------- pymongo/synchronous/pool.py | 27 +++++++++++ 3 files changed, 36 insertions(+), 97 deletions(-) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index fb66800157..0785bcf01d 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -39,23 +39,6 @@ trap "cleanup_tests" SIGINT ERR # Start the test runner. echo "Running tests with UV_PYTHON=${UV_PYTHON:-}..." echo "UV_ARGS=${UV_ARGS}" -export PYTHONTRACEMALLOC=20 -# PYTHON-3923 leak diagnostic: log AsyncConnection lifecycle to a file that -# bypasses pytest stdio capture. Reset the file before each run so we only -# see traces from this run. -export PYMONGO_LEAK_TRACE_FILE=/tmp/pymongo_leak_trace.log -: > "$PYMONGO_LEAK_TRACE_FILE" || true - -# Always dump the trace file on script exit (pass or fail) so CI captures it. -dump_leak_trace() { - if [ -s "$PYMONGO_LEAK_TRACE_FILE" ]; then - echo "=================== PYMONGO_LEAK_TRACE BEGIN ===================" - cat "$PYMONGO_LEAK_TRACE_FILE" - echo "==================== PYMONGO_LEAK_TRACE END ====================" - fi -} -trap "dump_leak_trace" EXIT - uv run ${UV_ARGS} --reinstall-package pymongo .evergreen/scripts/run_tests.py "$@" echo "Running tests with UV_PYTHON=${UV_PYTHON:-}... done." diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index b964bfda2e..bc9fc0a71f 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -112,22 +112,6 @@ _IS_SYNC = False -# Diagnostic for PYTHON-3923: tag each AsyncConnection lifecycle event so we -# can identify which conns are created but never reach _close_conn. -# Writes to a file so pytest's stdio capture (per-test, only-on-failure) does -# not swallow the trace. Path is configurable via PYMONGO_LEAK_TRACE_FILE; -# default is "/tmp/pymongo_leak_trace.log". Remove once the leak is identified. -_LEAK_TRACE_PATH = os.environ.get("PYMONGO_LEAK_TRACE_FILE", "/tmp/pymongo_leak_trace.log") - - -def _leak_trace(msg: str) -> None: - try: - with open(_LEAK_TRACE_PATH, "a") as _f: # noqa: S108 - _f.write(f"PYMONGO_LEAK_TRACE: t={time.monotonic():.4f} pid={os.getpid()} {msg}\n") - except Exception: # noqa: S110 - pass - - class AsyncConnection: """Store a connection with some metadata. @@ -196,48 +180,16 @@ def __init__( self.creation_time = time.monotonic() # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None - # NOTE: the parameter `id` shadows the built-in id() in this scope, so - # we must not call id(self) here. Use repr() or skip obj_id. - _leak_trace( - f"created conn_id={self.id} addr={self.address} is_sdam={self.is_sdam}" - ) def __del__(self) -> None: - # Defense-in-depth: if an AsyncConnection is GC'd without - # close_conn() having run (e.g. its parent Pool was discarded - # before reset/close, or a Pool._reset gather was cancelled - # before any close_conn task got CPU), abort the transport so - # the socket fd is released and the unclosed-transport - # ResourceWarning is not raised. - try: - if not self.closed: - # Probe transport state so we can tell whether Part A's - # sync-abort in Pool._reset already released the fd - # (state=closing) or this is a genuine untouched leak - # (state=open). - state = "unknown" - try: - transport = self.conn.get_conn.transport - if transport is None: - state = "none" - elif transport.is_closing(): - state = "closing" - else: - state = "open" - except Exception: # noqa: S110 - pass - _leak_trace( - f"GC-WITHOUT-CLOSE conn_id={self.id} addr={self.address} " - f"is_sdam={self.is_sdam} transport={state}" - ) - try: - transport = self.conn.get_conn.transport - if transport is not None: - transport.abort() - except Exception: # noqa: S110 - pass - except Exception: # noqa: S110 - pass + # Ensure all async connections are properly cleaned up on GC : + if not _IS_SYNC and not self.closed: + try: + transport = self.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -623,10 +575,6 @@ async def close_conn(self, reason: Optional[str]) -> None: async def _close_conn(self) -> None: """Close this connection.""" - _leak_trace( - f"_close_conn conn_id={self.id} addr={self.address} " - f"already_closed={self.closed} obj_id={id(self):x}" - ) # Force-abort the underlying transport first so the socket fd is # released even if a previous _close_conn already set self.closed # but didn't reach transport.abort() (e.g. cancelled mid-close), or @@ -911,12 +859,6 @@ async def _reset( keep.append(conn) sockets = discard self.conns = keep - _leak_trace( - f"_reset close={close} addr={self.address} " - f"deque_size={len(sockets)} " - f"conn_ids={[c.id for c in sockets]} " - f"obj_ids={[hex(id(c))[2:] for c in sockets]}" - ) if close: self.state = PoolState.CLOSED @@ -1282,11 +1224,7 @@ async def checkout( self.active_contexts.add(conn.cancel_context) yield conn # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as _checkout_exc: - _leak_trace( - f"checkout-except conn_id={conn.id} addr={self.address} " - f"obj_id={id(conn):x} exc={type(_checkout_exc).__name__}" - ) + except BaseException: # Exception in caller. Ensure the connection gets returned. # Note that when pinned is True, the session owns the # connection and it is responsible for checking the connection @@ -1484,10 +1422,6 @@ async def checkin(self, conn: AsyncConnection) -> None: :param conn: The connection to check into the pool. """ - _leak_trace( - f"checkin-enter conn_id={conn.id} addr={self.address} " - f"pool_closed={self.closed} conn_closed={conn.closed} obj_id={id(conn):x}" - ) txn = conn.pinned_txn cursor = conn.pinned_cursor conn.active = False @@ -1547,11 +1481,6 @@ async def checkin(self, conn: AsyncConnection) -> None: self._max_connecting_cond.notify() if close_conn: await conn.close_conn(ConnectionClosedReason.STALE) - else: - _leak_trace( - f"checkin-appendleft conn_id={conn.id} addr={self.address} " - f"obj_id={id(conn):x}" - ) async with self.size_cond: if txn: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 6d49f71959..efca68ec3a 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -181,6 +181,16 @@ def __init__( # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None + def __del__(self) -> None: + # Ensure all async connections are properly cleaned up on GC : + if not _IS_SYNC and not self.closed: + try: + transport = self.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" if timeout == self.last_timeout: @@ -858,6 +868,23 @@ def _reset( for context in self.active_contexts: context.cancel() + # Synchronously abort the transports of all snapshotted conns. This + # releases the socket fd and schedules _call_connection_lost before + # any await. If the gather below is cancelled (e.g. by test + # teardown propagating a CancelledError into _reset), the inner + # close_conn() Tasks are cancelled before their body runs, so the + # transport.abort() inside _close_conn() never fires and the + # snapshotted conns leak. transport.abort() is idempotent — the + # close_conn coroutines below remain safe to run. + if not _IS_SYNC: + for conn in sockets: + try: + transport = conn.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after*