diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 421bc35f..e993b393 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -141,16 +141,16 @@ def __init__(self, connection: "Connection", timeout: int = 0) -> None: False ] # Indicates if last_executed_stmt was prepared by ddbc shim. # Is a list instead of a bool coz bools in Python are immutable. - - # Initialize attributes that may be defined later to avoid pylint warnings - # Note: _original_fetch* methods are not initialized here as they need to be - # conditionally set based on hasattr() checks # Hence, we can't pass around bools by reference & modify them. # Therefore, it must be a list with exactly one bool element. self._rownumber = -1 # DB-API extension: last returned row index, -1 before first + # Column-name -> index map for the current result set. For catalog/metadata + # result sets this also carries lowercase and friendly aliases (see + # _prepare_metadata_result_set). self._cached_column_map = None + self._cached_column_map_lower = None self._cached_converter_map = None self._uuid_str_indices = None # Pre-computed UUID column indices for str conversion # Cache the effective native_uuid setting for this cursor's connection. @@ -1395,16 +1395,6 @@ def execute( # pylint: disable=too-many-locals,too-many-branches,too-many-state # Log the actual query being executed logger.debug("Executing query: %s", operation) - # Restore original fetch methods if they exist - if hasattr(self, "_original_fetchone"): - logger.debug("execute: Restoring original fetch methods") - self.fetchone = self._original_fetchone - self.fetchmany = self._original_fetchmany - self.fetchall = self._original_fetchall - del self._original_fetchone - del self._original_fetchmany - del self._original_fetchall - self._check_closed() # Check if the cursor is closed if reset_cursor: if self.hstmt: @@ -1598,8 +1588,8 @@ def _prepare_metadata_result_set( # pylint: disable=too-many-statements Prepares a metadata result set by: 1. Retrieving column metadata if not provided 2. Initializing the description attribute - 3. Setting up column name mappings - 4. Creating wrapper fetch methods with column mapping support + 3. Setting up column name mappings (including any specialized aliases) + so the standard fetch methods return rows keyed by catalog columns Args: column_metadata (list, optional): Pre-fetched column metadata. @@ -1632,94 +1622,37 @@ def _prepare_metadata_result_set( # pylint: disable=too-many-statements if not self.description and fallback_description: self.description = fallback_description - # Define column names in ODBC standard order - self._column_map = {} # pylint: disable=attribute-defined-outside-init - for i, (name, *_) in enumerate(self.description): - # Add standard name - self._column_map[name] = i - # Add lowercase alias - self._column_map[name.lower()] = i - - # If specialized mapping is provided, handle it differently + # Build the column-name -> index map for this metadata result set. + # Both the exact name and its lowercase alias are stored so that rows + # support case-insensitive lookup regardless of the global ``lowercase`` + # setting (catalog column names are well-known and case-stable). + # ``self.description`` is None when DDBCSQLDescribeCol failed above and no + # fallback_description was supplied (e.g. getTypeInfo()); guard with ``or ()`` + # so the cursor simply yields no rows instead of raising TypeError here. + column_map = {} + for i, (name, *_) in enumerate(self.description or ()): + column_map[name] = i + column_map[name.lower()] = i + + # Some catalog helpers expose additional friendly aliases (e.g. ODBC 2.x + # vs 3.x column names) via a specialized mapping. Merge them in so they + # resolve to the same column indices. if specialized_mapping: - # Define specialized fetch methods that use the custom mapping - def fetchone_with_specialized_mapping(): - row = self._original_fetchone() - if row is not None: - merged_map = getattr(row, "_column_map", {}).copy() - merged_map.update(specialized_mapping) - row._column_map = merged_map - return row - - def fetchmany_with_specialized_mapping(size=None): - rows = self._original_fetchmany(size) - for row in rows: - merged_map = getattr(row, "_column_map", {}).copy() - merged_map.update(specialized_mapping) - row._column_map = merged_map - return rows - - def fetchall_with_specialized_mapping(): - rows = self._original_fetchall() - for row in rows: - merged_map = getattr(row, "_column_map", {}).copy() - merged_map.update(specialized_mapping) - row._column_map = merged_map - return rows - - # Save original fetch methods - if not hasattr(self, "_original_fetchone"): - self._original_fetchone = ( - self.fetchone - ) # pylint: disable=attribute-defined-outside-init - self._original_fetchmany = ( - self.fetchmany - ) # pylint: disable=attribute-defined-outside-init - self._original_fetchall = ( - self.fetchall - ) # pylint: disable=attribute-defined-outside-init - - # Use specialized mapping methods - self.fetchone = fetchone_with_specialized_mapping - self.fetchmany = fetchmany_with_specialized_mapping - self.fetchall = fetchall_with_specialized_mapping - else: - # Standard column mapping - # Remember original fetch methods (store only once) - if not hasattr(self, "_original_fetchone"): - self._original_fetchone = ( - self.fetchone - ) # pylint: disable=attribute-defined-outside-init - self._original_fetchmany = ( - self.fetchmany - ) # pylint: disable=attribute-defined-outside-init - self._original_fetchall = ( - self.fetchall - ) # pylint: disable=attribute-defined-outside-init - - # Create wrapper fetch methods that add column mappings - def fetchone_with_mapping(): - row = self._original_fetchone() - if row is not None: - row._column_map = self._column_map - return row - - def fetchmany_with_mapping(size=None): - rows = self._original_fetchmany(size) - for row in rows: - row._column_map = self._column_map - return rows - - def fetchall_with_mapping(): - rows = self._original_fetchall() - for row in rows: - row._column_map = self._column_map - return rows - - # Replace fetch methods - self.fetchone = fetchone_with_mapping - self.fetchmany = fetchmany_with_mapping - self.fetchall = fetchall_with_mapping + column_map.update(specialized_mapping) + + # Route the map through the shared fetch path. The standard fetchone/ + # fetchmany/fetchall methods read these cached maps when constructing Row + # objects, so metadata rows pick up the catalog column names without any + # per-call method reassignment (the previous approach broke static type + # checking - see GH #620). + self._cached_column_map = column_map + # Mirror execute(): when ``lowercase`` is enabled the description names are + # already lowercased, so build the lowercase lookup map (including any + # specialized aliases) to keep case-insensitive access working for catalog + # rows (e.g. row.TABLE_NAME when lowercase=True). + self._cached_column_map_lower = ( + {k.lower(): v for k, v in column_map.items()} if get_settings().lowercase else None + ) # Initialize rownumber tracking so fetchone() and iteration work self._reset_rownumber() diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 1065d164..9b36c210 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -10578,6 +10578,73 @@ def test_setinputsizes_sql_injection_protection(db_connection): cursor.execute("DROP TABLE #test_sql_injection") +def test_fetch_methods_not_shadowed_on_instance(cursor): + """Regression test for GH #620. + + The fetch methods (fetchone/fetchmany/fetchall) must remain regular class + methods and never be reassigned as instance attributes. The previous + implementation swapped them for closures on the instance while preparing + catalog/metadata result sets, which produced a union type that broke static + type checkers (e.g. ``ty`` reported a spurious missing ``self`` argument). + """ + fetch_methods = ("fetchone", "fetchmany", "fetchall") + + # Pristine cursor: methods come from the class, not the instance. + for name in fetch_methods: + assert name not in cursor.__dict__, f"{name} should not be an instance attribute" + + # A catalog helper historically reassigned the fetch methods. Make sure it + # no longer shadows them on the instance. + cursor.getTypeInfo().fetchall() + for name in fetch_methods: + assert name not in cursor.__dict__, f"{name} was shadowed after getTypeInfo()" + + # A normal execute must also leave the class methods intact, and the + # column-name cache populated by the earlier getTypeInfo() call must be + # rebuilt so catalog column names do not leak into an ordinary SELECT. + cursor.execute("SELECT 1 AS one") + rows = cursor.fetchall() + assert rows == [[1]] + row = rows[0] + assert row.one == 1 + # "TYPE_NAME" belonged to the getTypeInfo() result set. If the cache leaked, + # these would resolve to column 0 (returning 1) instead of raising. + with pytest.raises(AttributeError): + _ = row.TYPE_NAME + with pytest.raises(KeyError): + _ = row["TYPE_NAME"] + for name in fetch_methods: + assert name not in cursor.__dict__, f"{name} was shadowed after execute()" + + +def test_metadata_case_insensitive_access_when_lowercase(db_connection): + """Regression test for GH #620 follow-up. + + Catalog result sets must keep case-insensitive column access even when the + global ``lowercase`` setting is enabled. With lowercase=True the description + names are lowercased, so the cursor must build a lowercase lookup map for + metadata rows; otherwise original-cased ODBC names like ``TABLE_NAME`` stop + resolving. + """ + original_lowercase = mssql_python.lowercase + try: + mssql_python.lowercase = True + cursor = db_connection.cursor() + try: + row = cursor.getTypeInfo().fetchone() + assert row is not None, "getTypeInfo() should return at least one row" + # Lowercase access (the stored casing) must work... + lower_value = row.type_name + # ...and so must the original ODBC casing, via the lowercase map. + assert row.TYPE_NAME == lower_value + assert row["TYPE_NAME"] == lower_value + assert row["type_name"] == lower_value + finally: + cursor.close() + finally: + mssql_python.lowercase = original_lowercase + + def test_gettypeinfo_all_types(cursor): """Test getTypeInfo with no arguments returns all data types""" # Get all type information