diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 59bf2055b2..f078f334e3 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -1180,6 +1180,8 @@ class OP: COHERE_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.cohere" COHERE_EMBEDDINGS_CREATE = "ai.embeddings.create.cohere" DB = "db" + DB_CURSOR_ITERATOR = "db.cursor.iter" + DB_CURSOR_FETCH = "db.cursor.fetch" DB_REDIS = "db.redis" EVENT_DJANGO = "event.django" FUNCTION = "function" diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index a48ec7dbeb..f914967f9e 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -15,17 +15,29 @@ record_sql_queries, ) from sentry_sdk.utils import ( + ContextVar, capture_internal_exceptions, parse_version, ) try: import asyncpg # type: ignore[import-not-found] - from asyncpg.cursor import BaseCursor # type: ignore + from asyncpg.cursor import ( # type: ignore + BaseCursor, + Cursor, + CursorIterator, + ) except ImportError: raise DidNotEnable("asyncpg not installed.") +_asyncpg_cursor_iterator_is_invoked = ContextVar( + "asyncpg_cursor_iterator_is_invoked", default=False +) +_asyncpg_cursor_fetch_is_invoked = ContextVar( + "asyncpg_cursor_fetch_is_invoked", default=False +) + class AsyncPGIntegration(Integration): identifier = "asyncpg" @@ -53,6 +65,10 @@ def setup_once() -> None: ) asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare) + CursorIterator.__anext__ = _wrap_cursor_iterator_method( + CursorIterator.__anext__ + ) + Cursor.fetch = _wrap_cursor_fetch_method(Cursor.fetch) BaseCursor._bind_exec = _wrap_cursor_method(BaseCursor._bind_exec) BaseCursor._exec = _wrap_cursor_method(BaseCursor._exec) @@ -161,6 +177,38 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return _inner +def _wrap_cursor_fetch_method( + f: "Callable[..., Awaitable[T]]", +) -> "Callable[..., Awaitable[T]]": + async def _inner(*args: "Any", **kwargs: "Any") -> "T": + if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: + return await f(*args, **kwargs) + + _asyncpg_cursor_fetch_is_invoked.set(True) + try: + return await f(*args, **kwargs) + finally: + _asyncpg_cursor_fetch_is_invoked.set(False) + + return _inner + + +def _wrap_cursor_iterator_method( + f: "Callable[..., Awaitable[T]]", +) -> "Callable[..., Awaitable[T]]": + async def _inner(*args: "Any", **kwargs: "Any") -> "T": + if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: + return await f(*args, **kwargs) + + _asyncpg_cursor_iterator_is_invoked.set(True) + try: + return await f(*args, **kwargs) + finally: + _asyncpg_cursor_iterator_is_invoked.set(False) + + return _inner + + def _wrap_cursor_method( f: "Callable[..., Awaitable[T]]", ) -> "Callable[..., Awaitable[T]]": @@ -168,6 +216,13 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: return await f(*args, **kwargs) + if _asyncpg_cursor_iterator_is_invoked.get(): + span_op_override_value = OP.DB_CURSOR_ITERATOR + elif _asyncpg_cursor_fetch_is_invoked.get(): + span_op_override_value = OP.DB_CURSOR_FETCH + else: + span_op_override_value = None + cursor = args[0] query = _normalize_query(cursor._query) with record_sql_queries( @@ -178,6 +233,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": executemany=False, record_cursor_repr=True, span_origin=AsyncPGIntegration.origin, + span_op_override_value=span_op_override_value, ) as span: _set_db_data(span, cursor._connection) res = await f(*args, **kwargs) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 3b44bad0bb..411f9923ad 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -134,6 +134,7 @@ def record_sql_queries( executemany: bool, record_cursor_repr: bool = False, span_origin: str = "manual", + span_op_override_value: "Optional[str]" = None, ) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]": # TODO: Bring back capturing of params by default client = sentry_sdk.get_client() @@ -167,13 +168,15 @@ def record_sql_queries( name="" if query is None else query, attributes={ "sentry.origin": span_origin, - "sentry.op": OP.DB, + "sentry.op": span_op_override_value + if span_op_override_value + else OP.DB, }, ) as span: yield span else: with sentry_sdk.start_span( - op=OP.DB, + op=span_op_override_value if span_op_override_value is not None else OP.DB, name=query, origin=span_origin, ) as span: diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index fee791c338..f4f638e8af 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -21,7 +21,7 @@ import sentry_sdk from sentry_sdk import capture_message, start_transaction -from sentry_sdk.consts import SPANDATA +from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations.asyncpg import AsyncPGIntegration from sentry_sdk.tracing_utils import record_sql_queries from tests.conftest import ApproxDict @@ -1423,7 +1423,7 @@ async def test_cursor__bind_exec_creates_spans( assert segment["name"] == "test_segment" assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg" - assert bind_exec_span["attributes"]["sentry.op"] == "db" + assert bind_exec_span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR assert bind_exec_span["attributes"]["db.system.name"] == "postgresql" assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg" assert bind_exec_span["attributes"]["server.address"] == PG_HOST @@ -1487,6 +1487,173 @@ async def test_cursor__bind_exec_creates_spans( ) +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_cursor_iteration_creates_db_cursor_iter_spans( + sentry_init, capture_events, capture_items, span_streaming +) -> None: + """ + Regression test for https://github.com/getsentry/sentry-python/issues/6576 + + When iterating a server-side cursor with a small prefetch, asyncpg fetches + rows in batches. Each batch triggers BaseCursor._bind_exec (on first query) and + BaseCursor._exec (second query onwards) through CursorIterator.__anext__, which creates a + span with the same query description. The resulting burst of identical spans + causes Sentry's N+1 query detector to raise a false positive. + + To mitigate, we set the "op"/"sentry.op" to `db.cursor.iter` instead of `db` + so that the sentry backend can exclude these spans from n+1 detection. + """ + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + + with sentry_sdk.traces.start_span(name="test_segment"): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)], + ) + + async with conn.transaction(): + async for _record in conn.cursor("SELECT * FROM users", prefetch=5): + pass + + await conn.close() + + sentry_sdk.flush() + + cursor_iter_spans = [ + item.payload + for item in items + if item.payload.get("name") == "SELECT * FROM users" + ] + + assert len(cursor_iter_spans) == 5 + for span in cursor_iter_spans: + assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)], + ) + + async with conn.transaction(): + async for _record in conn.cursor("SELECT * FROM users", prefetch=5): + pass + + await conn.close() + + (event,) = events + + cursor_iter_spans = [ + s for s in event["spans"] if s.get("description") == "SELECT * FROM users" + ] + + assert len(cursor_iter_spans) == 5 + for span in cursor_iter_spans: + assert span["op"] == OP.DB_CURSOR_ITERATOR + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_cursor_fetch_creates_db_cursor_fetch_spans( + sentry_init, capture_events, capture_items, span_streaming +) -> None: + """ + Regression test for https://github.com/getsentry/sentry-python/issues/6576 + + When a user invokes "connection.stream" within SQLAlchemy, SQLAlchemy's dialect + for asyncpg uses asyncpg's "Cursor.fetch" method instead of the "CursorIterator.__anext__" + method. + + Because the "fetch" methods use `_exec` (and our patch for it) under the hood, it makes it appear that + the same query is being executed many times when it is in fact iterating over a result set. + + This results in an accidental trigger of our n+1 detection. + + To mitigate, we set the "op"/"sentry.op" to `db.cursor.fetch` instead of `db` + so that the sentry backend can exclude these spans from n+1 detection. + """ + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + + with sentry_sdk.traces.start_span(name="test_segment"): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)], + ) + + async with conn.transaction(): + cur = await conn.cursor("SELECT * FROM users") + await cur.fetch(10) + await cur.fetch(10) + + await conn.close() + + sentry_sdk.flush() + + cursor_fetch_spans = [ + item.payload + for item in items + if item.payload.get("name") == "SELECT * FROM users" + ] + + assert len(cursor_fetch_spans) == 2 + for span in cursor_fetch_spans: + assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_FETCH + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)], + ) + + async with conn.transaction(): + cur = await conn.cursor("SELECT * FROM users") + await cur.fetch(10) + await cur.fetch(10) + + await conn.close() + + (event,) = events + + cursor_fetch_spans = [ + s for s in event["spans"] if s.get("description") == "SELECT * FROM users" + ] + + assert len(cursor_fetch_spans) == 2 + for span in cursor_fetch_spans: + assert span["op"] == OP.DB_CURSOR_FETCH + + @pytest.mark.asyncio async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> None: sentry_init( @@ -1543,6 +1710,7 @@ async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> assert span["data"]["db.cursor"] is not None assert span["data"]["db.system"] == "postgresql" assert span["data"]["db.driver.name"] == "asyncpg" + assert span["op"] == OP.DB assert span["origin"] == "auto.db.asyncpg" _assert_query_source( span,