Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 36 additions & 103 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ def __init__(self, connection: "Connection", timeout: int = 0) -> None:
False
] # Indicates if last_executed_stmt was prepared by ddbc shim.
# Is a list instead of a bool coz bools in Python are immutable.

# Initialize attributes that may be defined later to avoid pylint warnings
# Note: _original_fetch* methods are not initialized here as they need to be
# conditionally set based on hasattr() checks
# Hence, we can't pass around bools by reference & modify them.
# Therefore, it must be a list with exactly one bool element.

self._rownumber = -1 # DB-API extension: last returned row index, -1 before first

# Column-name -> index map for the current result set. For catalog/metadata
# result sets this also carries lowercase and friendly aliases (see
# _prepare_metadata_result_set).
self._cached_column_map = None
self._cached_column_map_lower = None
self._cached_converter_map = None
self._uuid_str_indices = None # Pre-computed UUID column indices for str conversion
# Cache the effective native_uuid setting for this cursor's connection.
Expand Down Expand Up @@ -1395,16 +1395,6 @@ def execute( # pylint: disable=too-many-locals,too-many-branches,too-many-state
# Log the actual query being executed
logger.debug("Executing query: %s", operation)

# Restore original fetch methods if they exist
if hasattr(self, "_original_fetchone"):
logger.debug("execute: Restoring original fetch methods")
self.fetchone = self._original_fetchone
self.fetchmany = self._original_fetchmany
self.fetchall = self._original_fetchall
del self._original_fetchone
del self._original_fetchmany
del self._original_fetchall

self._check_closed() # Check if the cursor is closed
if reset_cursor:
if self.hstmt:
Expand Down Expand Up @@ -1598,8 +1588,8 @@ def _prepare_metadata_result_set( # pylint: disable=too-many-statements
Prepares a metadata result set by:
1. Retrieving column metadata if not provided
2. Initializing the description attribute
3. Setting up column name mappings
4. Creating wrapper fetch methods with column mapping support
3. Setting up column name mappings (including any specialized aliases)
so the standard fetch methods return rows keyed by catalog columns

Args:
column_metadata (list, optional): Pre-fetched column metadata.
Expand Down Expand Up @@ -1632,94 +1622,37 @@ def _prepare_metadata_result_set( # pylint: disable=too-many-statements
if not self.description and fallback_description:
self.description = fallback_description

# Define column names in ODBC standard order
self._column_map = {} # pylint: disable=attribute-defined-outside-init
for i, (name, *_) in enumerate(self.description):
# Add standard name
self._column_map[name] = i
# Add lowercase alias
self._column_map[name.lower()] = i

# If specialized mapping is provided, handle it differently
# Build the column-name -> index map for this metadata result set.
# Both the exact name and its lowercase alias are stored so that rows
# support case-insensitive lookup regardless of the global ``lowercase``
# setting (catalog column names are well-known and case-stable).
# ``self.description`` is None when DDBCSQLDescribeCol failed above and no
# fallback_description was supplied (e.g. getTypeInfo()); guard with ``or ()``
# so the cursor simply yields no rows instead of raising TypeError here.
column_map = {}
for i, (name, *_) in enumerate(self.description or ()):
column_map[name] = i
column_map[name.lower()] = i

# Some catalog helpers expose additional friendly aliases (e.g. ODBC 2.x
# vs 3.x column names) via a specialized mapping. Merge them in so they
# resolve to the same column indices.
if specialized_mapping:
# Define specialized fetch methods that use the custom mapping
def fetchone_with_specialized_mapping():
row = self._original_fetchone()
if row is not None:
merged_map = getattr(row, "_column_map", {}).copy()
merged_map.update(specialized_mapping)
row._column_map = merged_map
return row

def fetchmany_with_specialized_mapping(size=None):
rows = self._original_fetchmany(size)
for row in rows:
merged_map = getattr(row, "_column_map", {}).copy()
merged_map.update(specialized_mapping)
row._column_map = merged_map
return rows

def fetchall_with_specialized_mapping():
rows = self._original_fetchall()
for row in rows:
merged_map = getattr(row, "_column_map", {}).copy()
merged_map.update(specialized_mapping)
row._column_map = merged_map
return rows

# Save original fetch methods
if not hasattr(self, "_original_fetchone"):
self._original_fetchone = (
self.fetchone
) # pylint: disable=attribute-defined-outside-init
self._original_fetchmany = (
self.fetchmany
) # pylint: disable=attribute-defined-outside-init
self._original_fetchall = (
self.fetchall
) # pylint: disable=attribute-defined-outside-init

# Use specialized mapping methods
self.fetchone = fetchone_with_specialized_mapping
self.fetchmany = fetchmany_with_specialized_mapping
self.fetchall = fetchall_with_specialized_mapping
else:
# Standard column mapping
# Remember original fetch methods (store only once)
if not hasattr(self, "_original_fetchone"):
self._original_fetchone = (
self.fetchone
) # pylint: disable=attribute-defined-outside-init
self._original_fetchmany = (
self.fetchmany
) # pylint: disable=attribute-defined-outside-init
self._original_fetchall = (
self.fetchall
) # pylint: disable=attribute-defined-outside-init

# Create wrapper fetch methods that add column mappings
def fetchone_with_mapping():
row = self._original_fetchone()
if row is not None:
row._column_map = self._column_map
return row

def fetchmany_with_mapping(size=None):
rows = self._original_fetchmany(size)
for row in rows:
row._column_map = self._column_map
return rows

def fetchall_with_mapping():
rows = self._original_fetchall()
for row in rows:
row._column_map = self._column_map
return rows

# Replace fetch methods
self.fetchone = fetchone_with_mapping
self.fetchmany = fetchmany_with_mapping
self.fetchall = fetchall_with_mapping
column_map.update(specialized_mapping)

# Route the map through the shared fetch path. The standard fetchone/
# fetchmany/fetchall methods read these cached maps when constructing Row
# objects, so metadata rows pick up the catalog column names without any
# per-call method reassignment (the previous approach broke static type
# checking - see GH #620).
self._cached_column_map = column_map
# Mirror execute(): when ``lowercase`` is enabled the description names are
# already lowercased, so build the lowercase lookup map (including any
# specialized aliases) to keep case-insensitive access working for catalog
# rows (e.g. row.TABLE_NAME when lowercase=True).
self._cached_column_map_lower = (
{k.lower(): v for k, v in column_map.items()} if get_settings().lowercase else None
)
Comment thread
gargsaumya marked this conversation as resolved.

# Initialize rownumber tracking so fetchone() and iteration work
self._reset_rownumber()
Expand Down
67 changes: 67 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10578,6 +10578,73 @@ def test_setinputsizes_sql_injection_protection(db_connection):
cursor.execute("DROP TABLE #test_sql_injection")


def test_fetch_methods_not_shadowed_on_instance(cursor):
"""Regression test for GH #620.

The fetch methods (fetchone/fetchmany/fetchall) must remain regular class
methods and never be reassigned as instance attributes. The previous
implementation swapped them for closures on the instance while preparing
catalog/metadata result sets, which produced a union type that broke static
type checkers (e.g. ``ty`` reported a spurious missing ``self`` argument).
"""
fetch_methods = ("fetchone", "fetchmany", "fetchall")

# Pristine cursor: methods come from the class, not the instance.
for name in fetch_methods:
assert name not in cursor.__dict__, f"{name} should not be an instance attribute"

# A catalog helper historically reassigned the fetch methods. Make sure it
# no longer shadows them on the instance.
cursor.getTypeInfo().fetchall()
for name in fetch_methods:
assert name not in cursor.__dict__, f"{name} was shadowed after getTypeInfo()"

# A normal execute must also leave the class methods intact, and the
# column-name cache populated by the earlier getTypeInfo() call must be
# rebuilt so catalog column names do not leak into an ordinary SELECT.
cursor.execute("SELECT 1 AS one")
rows = cursor.fetchall()
assert rows == [[1]]
row = rows[0]
assert row.one == 1
# "TYPE_NAME" belonged to the getTypeInfo() result set. If the cache leaked,
# these would resolve to column 0 (returning 1) instead of raising.
with pytest.raises(AttributeError):
_ = row.TYPE_NAME
with pytest.raises(KeyError):
_ = row["TYPE_NAME"]
for name in fetch_methods:
assert name not in cursor.__dict__, f"{name} was shadowed after execute()"


def test_metadata_case_insensitive_access_when_lowercase(db_connection):
"""Regression test for GH #620 follow-up.

Catalog result sets must keep case-insensitive column access even when the
global ``lowercase`` setting is enabled. With lowercase=True the description
names are lowercased, so the cursor must build a lowercase lookup map for
metadata rows; otherwise original-cased ODBC names like ``TABLE_NAME`` stop
resolving.
"""
original_lowercase = mssql_python.lowercase
try:
mssql_python.lowercase = True
cursor = db_connection.cursor()
try:
row = cursor.getTypeInfo().fetchone()
assert row is not None, "getTypeInfo() should return at least one row"
# Lowercase access (the stored casing) must work...
lower_value = row.type_name
# ...and so must the original ODBC casing, via the lowercase map.
assert row.TYPE_NAME == lower_value
assert row["TYPE_NAME"] == lower_value
assert row["type_name"] == lower_value
finally:
cursor.close()
finally:
mssql_python.lowercase = original_lowercase


def test_gettypeinfo_all_types(cursor):
"""Test getTypeInfo with no arguments returns all data types"""
# Get all type information
Expand Down
Loading