-
Notifications
You must be signed in to change notification settings - Fork 42
FIX: Fixing test that does not work properly / Failing in connection pool #493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -51,7 +51,13 @@ Connection::Connection(const std::wstring& conn_str, bool use_pool) | |||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Connection::~Connection() { | ||||||||||||||||||||||||||||
| disconnect(); // fallback if user forgets to disconnect | ||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||
| disconnect(); // fallback if user forgets to disconnect | ||||||||||||||||||||||||||||
| } catch (...) { | ||||||||||||||||||||||||||||
| // Never throw from a destructor — doing so during stack unwinding | ||||||||||||||||||||||||||||
| // causes std::terminate(). Log and swallow. | ||||||||||||||||||||||||||||
| LOG_ERROR("Exception suppressed in ~Connection destructor"); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Allocates connection handle | ||||||||||||||||||||||||||||
|
|
@@ -99,23 +105,22 @@ void Connection::disconnect() { | |||||||||||||||||||||||||||
| // When we free the DBC handle below, the ODBC driver will automatically free | ||||||||||||||||||||||||||||
| // all child STMT handles. We need to tell the SqlHandle objects about this | ||||||||||||||||||||||||||||
| // so they don't try to free the handles again during their destruction. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // THREAD-SAFETY: Lock mutex to safely access _childStatementHandles | ||||||||||||||||||||||||||||
| // This protects against concurrent allocStatementHandle() calls or GC finalizers | ||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> lock(_childHandlesMutex); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // First compact: remove expired weak_ptrs (they're already destroyed) | ||||||||||||||||||||||||||||
| size_t originalSize = _childStatementHandles.size(); | ||||||||||||||||||||||||||||
| _childStatementHandles.erase( | ||||||||||||||||||||||||||||
| std::remove_if(_childStatementHandles.begin(), _childStatementHandles.end(), | ||||||||||||||||||||||||||||
| [](const std::weak_ptr<SqlHandle>& wp) { return wp.expired(); }), | ||||||||||||||||||||||||||||
| _childStatementHandles.end()); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| LOG("Compacted child handles: %zu -> %zu (removed %zu expired)", | ||||||||||||||||||||||||||||
| originalSize, _childStatementHandles.size(), | ||||||||||||||||||||||||||||
| originalSize - _childStatementHandles.size()); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| LOG("Compacted child handles: %zu -> %zu (removed %zu expired)", originalSize, | ||||||||||||||||||||||||||||
| _childStatementHandles.size(), originalSize - _childStatementHandles.size()); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| LOG("Marking %zu child statement handles as implicitly freed", | ||||||||||||||||||||||||||||
| _childStatementHandles.size()); | ||||||||||||||||||||||||||||
| for (auto& weakHandle : _childStatementHandles) { | ||||||||||||||||||||||||||||
|
|
@@ -124,8 +129,10 @@ void Connection::disconnect() { | |||||||||||||||||||||||||||
| // This is guaranteed by allocStatementHandle() which only creates STMT handles | ||||||||||||||||||||||||||||
| // If this assertion fails, it indicates a serious bug in handle tracking | ||||||||||||||||||||||||||||
| if (handle->type() != SQL_HANDLE_STMT) { | ||||||||||||||||||||||||||||
| LOG_ERROR("CRITICAL: Non-STMT handle (type=%d) found in _childStatementHandles. " | ||||||||||||||||||||||||||||
| "This will cause a handle leak!", handle->type()); | ||||||||||||||||||||||||||||
| LOG_ERROR( | ||||||||||||||||||||||||||||
| "CRITICAL: Non-STMT handle (type=%d) found in _childStatementHandles. " | ||||||||||||||||||||||||||||
| "This will cause a handle leak!", | ||||||||||||||||||||||||||||
| handle->type()); | ||||||||||||||||||||||||||||
| continue; // Skip marking to prevent leak | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| handle->markImplicitlyFreed(); | ||||||||||||||||||||||||||||
|
|
@@ -136,8 +143,13 @@ void Connection::disconnect() { | |||||||||||||||||||||||||||
| } // Release lock before potentially slow SQLDisconnect call | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| SQLRETURN ret = SQLDisconnect_ptr(_dbcHandle->get()); | ||||||||||||||||||||||||||||
| checkError(ret); | ||||||||||||||||||||||||||||
| // triggers SQLFreeHandle via destructor, if last owner | ||||||||||||||||||||||||||||
| if (!SQL_SUCCEEDED(ret)) { | ||||||||||||||||||||||||||||
| // Log the error but do NOT throw — disconnect must be safe to call | ||||||||||||||||||||||||||||
| // from destructors, reset() failure paths, and pool cleanup. | ||||||||||||||||||||||||||||
| // Throwing here during stack unwinding causes std::terminate(). | ||||||||||||||||||||||||||||
| LOG_ERROR("SQLDisconnect failed (ret=%d), forcing handle cleanup", ret); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| LOG_ERROR("SQLDisconnect failed (ret=%d), forcing handle cleanup", ret); | |
| LOG_ERROR("SQLDisconnect failed (ret=%d), forcing handle cleanup", ret); | |
| // Best-effort: retrieve and log ODBC diagnostics for debuggability. | |
| // This must not throw, to keep disconnect noexcept-safe. | |
| try { | |
| ErrorInfo err = SQLCheckError_Wrap(SQL_HANDLE_DBC, _dbcHandle, ret); | |
| std::string diagMsg = WideToUTF8(err.ddbcErrorMsg); | |
| LOG_ERROR("SQLDisconnect diagnostics: %s", diagMsg.c_str()); | |
| } catch (...) { | |
| // Swallow all exceptions: cleanup paths must not throw. | |
| LOG_ERROR("SQLDisconnect: failed to retrieve ODBC diagnostics"); | |
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -21,20 +21,26 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::wstring& connStr, | |||||
| auto now = std::chrono::steady_clock::now(); | ||||||
| size_t before = _pool.size(); | ||||||
|
|
||||||
| LOG("ConnectionPool::acquire: pool_size=%zu, max_size=%zu, idle_timeout=%d", before, | ||||||
| _max_size, _idle_timeout_secs); | ||||||
|
|
||||||
| // Phase 1: Remove stale connections, collect for later disconnect | ||||||
| _pool.erase(std::remove_if(_pool.begin(), _pool.end(), | ||||||
| [&](const std::shared_ptr<Connection>& conn) { | ||||||
| auto idle_time = | ||||||
| std::chrono::duration_cast<std::chrono::seconds>( | ||||||
| now - conn->lastUsed()) | ||||||
| .count(); | ||||||
| if (idle_time > _idle_timeout_secs) { | ||||||
| to_disconnect.push_back(conn); | ||||||
| return true; | ||||||
| } | ||||||
| return false; | ||||||
| }), | ||||||
| _pool.end()); | ||||||
| _pool.erase( | ||||||
| std::remove_if( | ||||||
| _pool.begin(), _pool.end(), | ||||||
| [&](const std::shared_ptr<Connection>& conn) { | ||||||
| auto idle_time = | ||||||
| std::chrono::duration_cast<std::chrono::seconds>(now - conn->lastUsed()) | ||||||
| .count(); | ||||||
| LOG("ConnectionPool::acquire: checking conn idle_time=%lld vs timeout=%d", | ||||||
| (long long)idle_time, _idle_timeout_secs); | ||||||
|
Comment on lines
+35
to
+36
|
||||||
| LOG("ConnectionPool::acquire: checking conn idle_time=%lld vs timeout=%d", | |
| (long long)idle_time, _idle_timeout_secs); |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -278,81 +278,95 @@ def try_overflow(): | |||||||||||||||||||
| c.close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @pytest.mark.skip("Flaky test - idle timeout behavior needs investigation") | ||||||||||||||||||||
| def test_pool_idle_timeout_removes_connections(conn_str): | ||||||||||||||||||||
| """Test that idle_timeout removes connections from the pool after the timeout.""" | ||||||||||||||||||||
| pooling(max_size=2, idle_timeout=1) | ||||||||||||||||||||
| conn1 = connect(conn_str) | ||||||||||||||||||||
| spid_list = [] | ||||||||||||||||||||
| cursor1 = conn1.cursor() | ||||||||||||||||||||
| cursor1.execute("SELECT @@SPID") | ||||||||||||||||||||
| spid1 = cursor1.fetchone()[0] | ||||||||||||||||||||
| spid_list.append(spid1) | ||||||||||||||||||||
| # Use connection_id (a GUID unique per physical connection) instead of @@SPID, | ||||||||||||||||||||
| # because SQL Server can reassign the same SPID to a new connection. | ||||||||||||||||||||
| cursor1.execute("SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID") | ||||||||||||||||||||
| conn_id1 = cursor1.fetchone()[0] | ||||||||||||||||||||
| conn1.close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Wait for longer than idle_timeout | ||||||||||||||||||||
| time.sleep(3) | ||||||||||||||||||||
| # Wait well beyond the idle_timeout to account for slow CI and integer-second granularity | ||||||||||||||||||||
| time.sleep(5) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Get a new connection, which should not reuse the previous SPID | ||||||||||||||||||||
| # Get a new connection — the idle one should have been evicted during acquire() | ||||||||||||||||||||
| conn2 = connect(conn_str) | ||||||||||||||||||||
| cursor2 = conn2.cursor() | ||||||||||||||||||||
| cursor2.execute("SELECT @@SPID") | ||||||||||||||||||||
| spid2 = cursor2.fetchone()[0] | ||||||||||||||||||||
| spid_list.append(spid2) | ||||||||||||||||||||
| cursor2.execute("SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID") | ||||||||||||||||||||
| conn_id2 = cursor2.fetchone()[0] | ||||||||||||||||||||
| conn2.close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| assert spid1 != spid2, "Idle timeout did not remove connection from pool" | ||||||||||||||||||||
| assert ( | ||||||||||||||||||||
| conn_id1 != conn_id2 | ||||||||||||||||||||
| ), "Idle timeout did not remove connection from pool — same connection_id reused" | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| # ============================================================================= | ||||||||||||||||||||
| # Error Handling and Recovery Tests | ||||||||||||||||||||
| # ============================================================================= | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @pytest.mark.skip( | ||||||||||||||||||||
| "Test causes fatal crash - forcibly closing underlying connection leads to undefined behavior" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| def test_pool_removes_invalid_connections(conn_str): | ||||||||||||||||||||
| """Test that the pool removes connections that become invalid (simulate by closing underlying connection).""" | ||||||||||||||||||||
| """Test that the pool removes connections that become invalid and recovers gracefully. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| This test simulates a connection being returned to the pool in a dirty state | ||||||||||||||||||||
| (with an open transaction) by calling _conn.close() directly, bypassing the | ||||||||||||||||||||
| normal Python close() which does a rollback. The pool's acquire() should detect | ||||||||||||||||||||
| the bad connection during reset(), discard it, and create a fresh one. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| pooling(max_size=1, idle_timeout=30) | ||||||||||||||||||||
| conn = connect(conn_str) | ||||||||||||||||||||
| cursor = conn.cursor() | ||||||||||||||||||||
| cursor.execute("SELECT 1") | ||||||||||||||||||||
| # Simulate invalidation by forcibly closing the connection at the driver level | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| # Try to access a private attribute or method to forcibly close the underlying connection | ||||||||||||||||||||
| # This is implementation-specific; if not possible, skip | ||||||||||||||||||||
| if hasattr(conn, "_conn") and hasattr(conn._conn, "close"): | ||||||||||||||||||||
| conn._conn.close() | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| pytest.skip("Cannot forcibly close underlying connection for this driver") | ||||||||||||||||||||
| except Exception: | ||||||||||||||||||||
| pass | ||||||||||||||||||||
| # Safely close the connection, ignoring errors due to forced invalidation | ||||||||||||||||||||
| cursor.fetchone() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Record the connection_id of the original connection | ||||||||||||||||||||
| cursor.execute("SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID") | ||||||||||||||||||||
| original_conn_id = cursor.fetchone()[0] | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Force-return the connection to the pool WITHOUT rollback. | ||||||||||||||||||||
| # This leaves the pooled connection in a dirty state (open implicit transaction) | ||||||||||||||||||||
| # which will cause reset() to fail on next acquire(). | ||||||||||||||||||||
| conn._conn.close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Python close() will fail since the underlying handle is already gone | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| conn.close() | ||||||||||||||||||||
| except RuntimeError as e: | ||||||||||||||||||||
| if "not initialized" not in str(e): | ||||||||||||||||||||
| raise | ||||||||||||||||||||
| # Now, get a new connection from the pool and ensure it works | ||||||||||||||||||||
| except RuntimeError: | ||||||||||||||||||||
| pass | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Now get a new connection — the pool should discard the dirty one and create fresh | ||||||||||||||||||||
| new_conn = connect(conn_str) | ||||||||||||||||||||
| new_cursor = new_conn.cursor() | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| new_cursor.execute("SELECT 1") | ||||||||||||||||||||
| result = new_cursor.fetchone() | ||||||||||||||||||||
| assert result is not None and result[0] == 1, "Pool did not remove invalid connection" | ||||||||||||||||||||
| finally: | ||||||||||||||||||||
| new_conn.close() | ||||||||||||||||||||
| new_cursor.execute("SELECT 1") | ||||||||||||||||||||
| result = new_cursor.fetchone() | ||||||||||||||||||||
| assert result is not None and result[0] == 1, "Pool did not recover from invalid connection" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Verify it's a different physical connection | ||||||||||||||||||||
| new_cursor.execute( | ||||||||||||||||||||
| "SELECT connection_id FROM sys.dm_exec_connections WHERE session_id = @@SPID" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| new_conn_id = new_cursor.fetchone()[0] | ||||||||||||||||||||
| assert ( | ||||||||||||||||||||
| original_conn_id != new_conn_id | ||||||||||||||||||||
| ), "Expected a new physical connection after pool discarded the dirty one" | ||||||||||||||||||||
|
Comment on lines
+348
to
+355
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| new_conn.close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def test_pool_recovery_after_failed_connection(conn_str): | ||||||||||||||||||||
| """Test that the pool recovers after a failed connection attempt.""" | ||||||||||||||||||||
| pooling(max_size=1, idle_timeout=30) | ||||||||||||||||||||
| # First, try to connect with a bad password (should fail) | ||||||||||||||||||||
| if "Pwd=" in conn_str: | ||||||||||||||||||||
| bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword") | ||||||||||||||||||||
| elif "Password=" in conn_str: | ||||||||||||||||||||
| bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword") | ||||||||||||||||||||
| import re | ||||||||||||||||||||
|
|
||||||||||||||||||||
| pwd_match = re.search(r"(Pwd|Password)=", conn_str, re.IGNORECASE) | ||||||||||||||||||||
| if pwd_match: | ||||||||||||||||||||
| key = pwd_match.group(0) # e.g. "PWD=" or "Pwd=" or "Password=" | ||||||||||||||||||||
| bad_conn_str = conn_str.replace(key, key + "wrongpassword") | ||||||||||||||||||||
| else: | ||||||||||||||||||||
|
Comment on lines
+366
to
370
|
||||||||||||||||||||
| pwd_match = re.search(r"(Pwd|Password)=", conn_str, re.IGNORECASE) | |
| if pwd_match: | |
| key = pwd_match.group(0) # e.g. "PWD=" or "Pwd=" or "Password=" | |
| bad_conn_str = conn_str.replace(key, key + "wrongpassword") | |
| else: | |
| # Replace the value of the first Pwd/Password key-value pair with "wrongpassword" | |
| pattern = re.compile(r"(?i)(Pwd|Password\s*=\s*)([^;]*)") | |
| bad_conn_str, num_subs = pattern.subn(lambda m: m.group(1) + "wrongpassword", conn_str, count=1) | |
| if num_subs == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
~Connectionnow swallows all exceptions (good), but the log message loses the original error context. Consider catchingconst std::exception&first and loggingex.what()(then a catch-all for non-std exceptions), so debugging unexpected disconnect failures is possible without rethrowing.