diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 45c12b219f..8f6322761f 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -200,10 +200,21 @@ async def join(self) -> None: async def close(self) -> None: self.gc_safe_close() - await self._rtt_monitor.close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - await self._reset_connection() + # Run rtt_monitor.close() and self._pool.close() independently so a + # failure in rtt cleanup does not skip the monitor pool's close, which + # would otherwise orphan its conn deque. + rtt_error: Optional[BaseException] = None + try: + await self._rtt_monitor.close() + except BaseException as exc: + rtt_error = exc + # Close the monitor pool. This both closes the conn in the deque + # and marks the pool CLOSED, so any in-flight check_once that checks + # the conn back in will close it via close_conn(POOL_CLOSED) rather + # than returning it to the deque of a pool that is about to be GC'd. + await self._pool.close() + if rtt_error is not None: + raise rtt_error async def _reset_connection(self) -> None: # Clear our pooled connection. @@ -456,9 +467,10 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool async def close(self) -> None: self.gc_safe_close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - await self._pool.reset() + # Close the RTT monitor pool. If the executor task has the socket + # checked out, checkin will close it via close_conn(POOL_CLOSED) + # because the pool is now CLOSED. + await self._pool.close() async def add_sample(self, sample: float) -> None: """Add a RTT sample.""" diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a5d5b28990..bc9fc0a71f 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -181,6 +181,16 @@ def __init__( # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None + def __del__(self) -> None: + # Ensure all async connections are properly cleaned up on GC : + if not _IS_SYNC and not self.closed: + try: + transport = self.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" if timeout == self.last_timeout: @@ -565,6 +575,19 @@ async def close_conn(self, reason: Optional[str]) -> None: async def _close_conn(self) -> None: """Close this connection.""" + # Force-abort the underlying transport first so the socket fd is + # released even if a previous _close_conn already set self.closed + # but didn't reach transport.abort() (e.g. cancelled mid-close), or + # if the graceful close path below raises. transport.abort() is + # idempotent — _SelectorTransport._force_close returns early when + # _conn_lost is already set. + if not _IS_SYNC: + transport = self.conn.get_conn.transport + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass if self.closed: return self.closed = True @@ -847,6 +870,23 @@ async def _reset( for context in self.active_contexts: context.cancel() + # Synchronously abort the transports of all snapshotted conns. This + # releases the socket fd and schedules _call_connection_lost before + # any await. If the gather below is cancelled (e.g. by test + # teardown propagating a CancelledError into _reset), the inner + # close_conn() Tasks are cancelled before their body runs, so the + # transport.abort() inside _close_conn() never fires and the + # snapshotted conns leak. transport.abort() is idempotent — the + # close_conn coroutines below remain safe to run. + if not _IS_SYNC: + for conn in sockets: + try: + transport = conn.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after* @@ -969,18 +1009,31 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = await self.connect() - close_conn = False - async with self.lock: - # Close connection and return if the pool was reset during - # socket creation or while acquiring the pool lock. - if self.gen.get_overall() != reference_generation: - close_conn = True - if not close_conn: - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) - if close_conn: - await conn.close_conn(ConnectionClosedReason.STALE) - return + try: + close_conn = False + async with self.lock: + # Close connection and return if the pool was reset during + # socket creation or while acquiring the pool lock. + if self.gen.get_overall() != reference_generation: + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + await conn.close_conn(ConnectionClosedReason.STALE) + return + except BaseException: + # If cancellation or any other exception lands between + # connect() returning and the conn being appended to the + # deque (or closed via stale-generation), the conn would + # otherwise be orphaned. Close it explicitly to release + # the underlying socket. + if not conn.closed: + try: + await conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise finally: if incremented: # Notify after adding the socket to the pool. @@ -1064,35 +1117,58 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A _raise_connection_failure(self.address, error, timeout_details=details) raise - conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - async with self.lock: - self.active_contexts.add(conn.cancel_context) - self.active_contexts.discard(tmp_context) - if tmp_context.cancelled: - conn.cancel_context.cancel() - completed_hello = False try: - if not self.is_sdam: - await conn.hello() - completed_hello = True - self.is_writable = conn.is_writable - if handler: - handler.contribute_socket(conn, completed_handshake=False) - - await conn.authenticate() - # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as e: + conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] + except BaseException: + # Release the networking_interface's transport if AsyncConnection + # construction failed, since no AsyncConnection exists yet to + # close_conn() through the outer cleanup path. + transport = getattr(networking_interface.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass async with self.lock: - self.active_contexts.discard(conn.cancel_context) - if not completed_hello: - self._handle_connection_error(e) - await conn.close_conn(ConnectionClosedReason.ERROR) + self.active_contexts.discard(tmp_context) raise + try: + async with self.lock: + self.active_contexts.add(conn.cancel_context) + self.active_contexts.discard(tmp_context) + if tmp_context.cancelled: + conn.cancel_context.cancel() + completed_hello = False + try: + if not self.is_sdam: + await conn.hello() + completed_hello = True + self.is_writable = conn.is_writable + if handler: + handler.contribute_socket(conn, completed_handshake=False) + + await conn.authenticate() + # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. + except BaseException as e: + async with self.lock: + self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) + await conn.close_conn(ConnectionClosedReason.ERROR) + raise - if handler: - await handler.client._topology.receive_cluster_time(conn._cluster_time) + if handler: + await handler.client._topology.receive_cluster_time(conn._cluster_time) - return conn + return conn + # Catch cancellations that interrupt outside the inner try block above + except BaseException: + if not conn.closed: + try: + await conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise @contextlib.asynccontextmanager async def checkout( @@ -1129,21 +1205,21 @@ async def checkout( conn = await self._get_conn(checkout_started_time, handler=handler) - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id, duration) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) try: + duration = time.monotonic() - checkout_started_time + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_connection_checked_out(self.address, conn.id, duration) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) async with self.lock: self.active_contexts.add(conn.cancel_context) yield conn @@ -1160,7 +1236,20 @@ async def checkout( exc_type, exc_val, _ = sys.exc_info() await handler.handle(exc_type, exc_val) if not pinned and conn.active: - await self.checkin(conn) + try: + await self.checkin(conn) + except BaseException: + # If checkin is interrupted (e.g., cancellation during a + # lock acquire), the conn is left neither in the deque + # nor closed. Force-abort the transport so the socket fd + # is released instead of leaking on GC. + transport = getattr(conn.conn.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + raise raise if conn.pinned_txn: async with self.lock: diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f212306174..029f649277 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -114,8 +114,17 @@ async def close(self) -> None: serverPort=self._description.address[1], ) - await self._monitor.close() + # Run monitor.close() and pool.close() independently so a failure in + # one (e.g. monitor's rtt_monitor.close() raising) does not skip the + # other and orphan the server pool's connections. + monitor_error: Optional[BaseException] = None + try: + await self._monitor.close() + except BaseException as exc: + monitor_error = exc await self._pool.close() + if monitor_error is not None: + raise monitor_error def request_check(self) -> None: """Check the server's state soon.""" diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 01e346bfa8..afcdca0ac7 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -722,8 +722,16 @@ async def close(self) -> None: """ async with self._lock: old_td = self._description + first_close_error: Optional[BaseException] = None for server in self._servers.values(): - await server.close() + # Each server.close must run independently. A failure on one + # server (e.g. its monitor's cleanup raising) must not skip + # close() on the remaining servers, or their pool conns leak. + try: + await server.close() + except BaseException as exc: + if first_close_error is None: + first_close_error = exc if not _IS_SYNC: self._monitor_tasks.append(server._monitor) @@ -783,6 +791,11 @@ async def close(self) -> None: await self.__events_executor.join(1) process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] + # Re-raise the first server.close() error (if any) after we've done + # everything we can to clean up the rest of the topology. + if first_close_error is not None: + raise first_close_error + @property def description(self) -> TopologyDescription: return self._description @@ -981,7 +994,14 @@ async def _update_servers(self) -> None: for address, server in list(self._servers.items()): if not self._description.has_server(address): - await server.close() + # Each server.close must run independently. If one server's + # cleanup raises, we still need to remove it from _servers and + # add its monitor to _monitor_tasks, and we must process the + # remaining servers, otherwise their pool conns leak. + try: + await server.close() + except BaseException: # noqa: S110 + pass if not _IS_SYNC: self._monitor_tasks.append(server._monitor) self._servers.pop(address) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index a6f434885b..2e4522ea3f 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -207,6 +207,7 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s sock = socket.socket(af, socktype, proto) # Fallback when SOCK_CLOEXEC isn't available. _set_non_inheritable_non_atomic(sock.fileno()) + sock_returned = False try: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # CSOT: apply timeout to socket connect. @@ -223,14 +224,18 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout ) sock.settimeout(timeout) + # Set immediately before return. Do not insert an await between this and the return + sock_returned = True return sock except asyncio.TimeoutError as e: - sock.close() err = socket.timeout("timed out") err.__cause__ = e except OSError as e: - sock.close() err = e # type: ignore[assignment] + finally: + # Always close the socket if it wasn't returned to avoid leaks. + if not sock_returned: + sock.close() if err is not None: raise err @@ -309,46 +314,58 @@ async def _configured_protocol_interface( sock = await _async_create_connection(address, options) ssl_context = options._ssl_context timeout = options.socket_timeout - - if ssl_context is None: - return AsyncNetworkingInterface( - await asyncio.get_running_loop().create_connection( - lambda: PyMongoProtocol(timeout=timeout), sock=sock - ) - ) - - host = address[0] + # Create the Protocol early to prevent asyncio resource leaks during cleanup path + protocol = PyMongoProtocol(timeout=timeout) + sock_adopted = False try: - # We have to pass hostname / ip address to wrap_socket - # to use SSLContext.check_hostname. - transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] - lambda: PyMongoProtocol(timeout=timeout), - sock=sock, - server_hostname=host, - ssl=ssl_context, - ) - except _CertificateError: - # Raise _CertificateError directly like we do after match_hostname - # below. - raise - except (OSError, *SSLErrors) as exc: - # We raise AutoReconnect for transient and permanent SSL handshake - # failures alike. Permanent handshake failures, like protocol - # mismatch, will be turned into ServerSelectionTimeoutErrors later. - details = _get_timeout_details(options) - _raise_connection_failure(address, exc, "SSL handshake failed: ", timeout_details=details) - if ( - ssl_context.verify_mode - and not ssl_context.check_hostname - and not options.tls_allow_invalid_hostnames - ): + if ssl_context is None: + result = await asyncio.get_running_loop().create_connection(lambda: protocol, sock=sock) + sock_adopted = True + return AsyncNetworkingInterface(result) + + host = address[0] try: - ssl.match_hostname(transport.get_extra_info("peercert"), hostname=host) # type:ignore[attr-defined,unused-ignore] + # We have to pass hostname / ip address to wrap_socket + # to use SSLContext.check_hostname. + transport, _ = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] + lambda: protocol, + sock=sock, + server_hostname=host, + ssl=ssl_context, + ) + sock_adopted = True except _CertificateError: - transport.abort() + # Raise _CertificateError directly like we do after match_hostname + # below. raise - - return AsyncNetworkingInterface((transport, protocol)) + except (OSError, *SSLErrors) as exc: + # We raise AutoReconnect for transient and permanent SSL handshake + # failures alike. Permanent handshake failures, like protocol + # mismatch, will be turned into ServerSelectionTimeoutErrors later. + details = _get_timeout_details(options) + _raise_connection_failure( + address, exc, "SSL handshake failed: ", timeout_details=details + ) + if ( + ssl_context.verify_mode + and not ssl_context.check_hostname + and not options.tls_allow_invalid_hostnames + ): + try: + ssl.match_hostname(transport.get_extra_info("peercert"), hostname=host) # type:ignore[attr-defined,unused-ignore] + except _CertificateError: + transport.abort() + raise + + return AsyncNetworkingInterface((transport, protocol)) + finally: + if not sock_adopted: + # If the protocol owns the transport, it also adopted the socket and needs to be cleaned up from the transport + if protocol.transport is not None: + protocol.transport.abort() + # Otherwise the socket was never adopted, close it directly + else: + sock.close() def _create_connection(address: _Address, options: PoolOptions) -> socket.socket: diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index f395588814..3feecc042e 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -198,10 +198,21 @@ def join(self) -> None: def close(self) -> None: self.gc_safe_close() - self._rtt_monitor.close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - self._reset_connection() + # Run rtt_monitor.close() and self._pool.close() independently so a + # failure in rtt cleanup does not skip the monitor pool's close, which + # would otherwise orphan its conn deque. + rtt_error: Optional[BaseException] = None + try: + self._rtt_monitor.close() + except BaseException as exc: + rtt_error = exc + # Close the monitor pool. This both closes the conn in the deque + # and marks the pool CLOSED, so any in-flight check_once that checks + # the conn back in will close it via close_conn(POOL_CLOSED) rather + # than returning it to the deque of a pool that is about to be GC'd. + self._pool.close() + if rtt_error is not None: + raise rtt_error def _reset_connection(self) -> None: # Clear our pooled connection. @@ -454,9 +465,10 @@ def __init__(self, topology: Topology, topology_settings: TopologySettings, pool def close(self) -> None: self.gc_safe_close() - # Increment the generation and maybe close the socket. If the executor - # thread has the socket checked out, it will be closed when checked in. - self._pool.reset() + # Close the RTT monitor pool. If the executor task has the socket + # checked out, checkin will close it via close_conn(POOL_CLOSED) + # because the pool is now CLOSED. + self._pool.close() def add_sample(self, sample: float) -> None: """Add a RTT sample.""" diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 25f2d08fe7..efca68ec3a 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -181,6 +181,16 @@ def __init__( # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None + def __del__(self) -> None: + # Ensure all async connections are properly cleaned up on GC : + if not _IS_SYNC and not self.closed: + try: + transport = self.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" if timeout == self.last_timeout: @@ -563,6 +573,19 @@ def close_conn(self, reason: Optional[str]) -> None: def _close_conn(self) -> None: """Close this connection.""" + # Force-abort the underlying transport first so the socket fd is + # released even if a previous _close_conn already set self.closed + # but didn't reach transport.abort() (e.g. cancelled mid-close), or + # if the graceful close path below raises. transport.abort() is + # idempotent — _SelectorTransport._force_close returns early when + # _conn_lost is already set. + if not _IS_SYNC: + transport = self.conn.get_conn.transport + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass if self.closed: return self.closed = True @@ -845,6 +868,23 @@ def _reset( for context in self.active_contexts: context.cancel() + # Synchronously abort the transports of all snapshotted conns. This + # releases the socket fd and schedules _call_connection_lost before + # any await. If the gather below is cancelled (e.g. by test + # teardown propagating a CancelledError into _reset), the inner + # close_conn() Tasks are cancelled before their body runs, so the + # transport.abort() inside _close_conn() never fires and the + # snapshotted conns leak. transport.abort() is idempotent — the + # close_conn coroutines below remain safe to run. + if not _IS_SYNC: + for conn in sockets: + try: + transport = conn.conn.get_conn.transport + if transport is not None: + transport.abort() + except Exception: # noqa: S110 + pass + listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after* @@ -965,18 +1005,31 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self._pending += 1 incremented = True conn = self.connect() - close_conn = False - with self.lock: - # Close connection and return if the pool was reset during - # socket creation or while acquiring the pool lock. - if self.gen.get_overall() != reference_generation: - close_conn = True - if not close_conn: - self.conns.appendleft(conn) - self.active_contexts.discard(conn.cancel_context) - if close_conn: - conn.close_conn(ConnectionClosedReason.STALE) - return + try: + close_conn = False + with self.lock: + # Close connection and return if the pool was reset during + # socket creation or while acquiring the pool lock. + if self.gen.get_overall() != reference_generation: + close_conn = True + if not close_conn: + self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) + if close_conn: + conn.close_conn(ConnectionClosedReason.STALE) + return + except BaseException: + # If cancellation or any other exception lands between + # connect() returning and the conn being appended to the + # deque (or closed via stale-generation), the conn would + # otherwise be orphaned. Close it explicitly to release + # the underlying socket. + if not conn.closed: + try: + conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise finally: if incremented: # Notify after adding the socket to the pool. @@ -1060,35 +1113,58 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect _raise_connection_failure(self.address, error, timeout_details=details) raise - conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] - with self.lock: - self.active_contexts.add(conn.cancel_context) - self.active_contexts.discard(tmp_context) - if tmp_context.cancelled: - conn.cancel_context.cancel() - completed_hello = False try: - if not self.is_sdam: - conn.hello() - completed_hello = True - self.is_writable = conn.is_writable - if handler: - handler.contribute_socket(conn, completed_handshake=False) - - conn.authenticate() - # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException as e: + conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] + except BaseException: + # Release the networking_interface's transport if Connection + # construction failed, since no Connection exists yet to + # close_conn() through the outer cleanup path. + transport = getattr(networking_interface.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass with self.lock: - self.active_contexts.discard(conn.cancel_context) - if not completed_hello: - self._handle_connection_error(e) - conn.close_conn(ConnectionClosedReason.ERROR) + self.active_contexts.discard(tmp_context) raise + try: + with self.lock: + self.active_contexts.add(conn.cancel_context) + self.active_contexts.discard(tmp_context) + if tmp_context.cancelled: + conn.cancel_context.cancel() + completed_hello = False + try: + if not self.is_sdam: + conn.hello() + completed_hello = True + self.is_writable = conn.is_writable + if handler: + handler.contribute_socket(conn, completed_handshake=False) + + conn.authenticate() + # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. + except BaseException as e: + with self.lock: + self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) + conn.close_conn(ConnectionClosedReason.ERROR) + raise - if handler: - handler.client._topology.receive_cluster_time(conn._cluster_time) + if handler: + handler.client._topology.receive_cluster_time(conn._cluster_time) - return conn + return conn + # Catch cancellations that interrupt outside the inner try block above + except BaseException: + if not conn.closed: + try: + conn.close_conn(ConnectionClosedReason.ERROR) + except BaseException: # noqa: S110 + pass + raise @contextlib.contextmanager def checkout( @@ -1125,21 +1201,21 @@ def checkout( conn = self._get_conn(checkout_started_time, handler=handler) - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id, duration) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) try: + duration = time.monotonic() - checkout_started_time + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_connection_checked_out(self.address, conn.id, duration) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) with self.lock: self.active_contexts.add(conn.cancel_context) yield conn @@ -1156,7 +1232,20 @@ def checkout( exc_type, exc_val, _ = sys.exc_info() handler.handle(exc_type, exc_val) if not pinned and conn.active: - self.checkin(conn) + try: + self.checkin(conn) + except BaseException: + # If checkin is interrupted (e.g., cancellation during a + # lock acquire), the conn is left neither in the deque + # nor closed. Force-abort the transport so the socket fd + # is released instead of leaking on GC. + transport = getattr(conn.conn.get_conn, "transport", None) + if transport is not None: + try: + transport.abort() + except Exception: # noqa: S110 + pass + raise raise if conn.pinned_txn: with self.lock: diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918b..887849b7f8 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -114,8 +114,17 @@ def close(self) -> None: serverPort=self._description.address[1], ) - self._monitor.close() + # Run monitor.close() and pool.close() independently so a failure in + # one (e.g. monitor's rtt_monitor.close() raising) does not skip the + # other and orphan the server pool's connections. + monitor_error: Optional[BaseException] = None + try: + self._monitor.close() + except BaseException as exc: + monitor_error = exc self._pool.close() + if monitor_error is not None: + raise monitor_error def request_check(self) -> None: """Check the server's state soon.""" diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index ec1615f0c6..60c7c4427f 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -720,8 +720,16 @@ def close(self) -> None: """ with self._lock: old_td = self._description + first_close_error: Optional[BaseException] = None for server in self._servers.values(): - server.close() + # Each server.close must run independently. A failure on one + # server (e.g. its monitor's cleanup raising) must not skip + # close() on the remaining servers, or their pool conns leak. + try: + server.close() + except BaseException as exc: + if first_close_error is None: + first_close_error = exc if not _IS_SYNC: self._monitor_tasks.append(server._monitor) @@ -781,6 +789,11 @@ def close(self) -> None: self.__events_executor.join(1) process_events_queue(weakref.ref(self._events)) # type: ignore[arg-type] + # Re-raise the first server.close() error (if any) after we've done + # everything we can to clean up the rest of the topology. + if first_close_error is not None: + raise first_close_error + @property def description(self) -> TopologyDescription: return self._description @@ -979,7 +992,14 @@ def _update_servers(self) -> None: for address, server in list(self._servers.items()): if not self._description.has_server(address): - server.close() + # Each server.close must run independently. If one server's + # cleanup raises, we still need to remove it from _servers and + # add its monitor to _monitor_tasks, and we must process the + # remaining servers, otherwise their pool conns leak. + try: + server.close() + except BaseException: # noqa: S110 + pass if not _IS_SYNC: self._monitor_tasks.append(server._monitor) self._servers.pop(address) diff --git a/pyproject.toml b/pyproject.toml index 9b3287834a..dd8a4955d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,11 +108,6 @@ filterwarnings = [ # pytest-asyncio known issue: https://github.com/pytest-dev/pytest-asyncio/issues/1032 "module:.*WindowsSelectorEventLoopPolicy:DeprecationWarning", "module:.*et_event_loop_policy:DeprecationWarning", - # TODO: Remove as part of PYTHON-3923. - "module:unclosed