From 4b76578b402d1e063e83d51289a504b03385efb4 Mon Sep 17 00:00:00 2001 From: ecanlar Date: Tue, 31 Mar 2026 11:10:04 +0200 Subject: [PATCH] Add offset parameter to get_items() for real pagination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing limit parameter only supports truncation (latest N items). This adds an offset parameter across all session backends to enable proper pagination over conversation history. All backends updated: SQLiteSession, AsyncSQLiteSession, SQLAlchemySession, RedisSession, DaprSession, AdvancedSQLiteSession, EncryptedSession, OpenAIResponsesCompactionSession, and OpenAIConversationsSession. Backward compatible — offset defaults to 0. Closes #2810 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../memory/advanced_sqlite_session.py | 21 ++++---- .../extensions/memory/async_sqlite_session.py | 15 ++++-- src/agents/extensions/memory/dapr_session.py | 11 ++++- .../extensions/memory/encrypt_session.py | 6 ++- src/agents/extensions/memory/redis_session.py | 22 ++++++--- .../extensions/memory/sqlalchemy_session.py | 18 ++++--- .../memory/openai_conversations_session.py | 4 +- .../openai_responses_compaction_session.py | 6 ++- src/agents/memory/session.py | 12 ++++- src/agents/memory/sqlite_session.py | 20 +++++--- tests/test_session.py | 48 +++++++++++++++++++ 11 files changed, 143 insertions(+), 40 deletions(-) diff --git a/src/agents/extensions/memory/advanced_sqlite_session.py b/src/agents/extensions/memory/advanced_sqlite_session.py index fcb4743cb3..df7a0a58aa 100644 --- a/src/agents/extensions/memory/advanced_sqlite_session.py +++ b/src/agents/extensions/memory/advanced_sqlite_session.py @@ -138,12 +138,15 @@ async def add_items(self, items: list[TResponseInputItem]) -> None: async def get_items( self, limit: int | None = None, + offset: int = 0, branch_id: str | None = None, ) -> list[TResponseInputItem]: """Get items from current or specified branch. Args: limit: Maximum number of items to return. If None, uses session_settings.limit. + offset: Number of items to skip (from the most recent) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. branch_id: Branch to get items from. If None, uses current branch. Returns: @@ -161,7 +164,7 @@ def _get_all_items_sync(): # TODO: Refactor SQLiteSession to use asyncio.Lock instead of threading.Lock and update this code # noqa: E501 with self._lock if self._is_memory_db else threading.Lock(): with closing(conn.cursor()) as cursor: - if session_limit is None: + if session_limit is None and offset == 0: cursor.execute( f""" SELECT m.message_data @@ -173,6 +176,7 @@ def _get_all_items_sync(): (self.session_id, branch_id), ) else: + sql_limit = session_limit if session_limit is not None else -1 cursor.execute( f""" SELECT m.message_data @@ -180,13 +184,13 @@ def _get_all_items_sync(): JOIN message_structure s ON m.id = s.message_id WHERE m.session_id = ? AND s.branch_id = ? ORDER BY s.sequence_number DESC - LIMIT ? + LIMIT ? OFFSET ? """, - (self.session_id, branch_id, session_limit), + (self.session_id, branch_id, sql_limit, offset), ) rows = cursor.fetchall() - if session_limit is not None: + if session_limit is not None or offset > 0: rows = list(reversed(rows)) items = [] @@ -207,7 +211,7 @@ def _get_items_sync(): with self._lock if self._is_memory_db else threading.Lock(): with closing(conn.cursor()) as cursor: # Get message IDs in correct order for this branch - if session_limit is None: + if session_limit is None and offset == 0: cursor.execute( f""" SELECT m.message_data @@ -219,6 +223,7 @@ def _get_items_sync(): (self.session_id, branch_id), ) else: + sql_limit = session_limit if session_limit is not None else -1 cursor.execute( f""" SELECT m.message_data @@ -226,13 +231,13 @@ def _get_items_sync(): JOIN message_structure s ON m.id = s.message_id WHERE m.session_id = ? AND s.branch_id = ? ORDER BY s.sequence_number DESC - LIMIT ? + LIMIT ? OFFSET ? """, - (self.session_id, branch_id, session_limit), + (self.session_id, branch_id, sql_limit, offset), ) rows = cursor.fetchall() - if session_limit is not None: + if session_limit is not None or offset > 0: rows = list(reversed(rows)) items = [] diff --git a/src/agents/extensions/memory/async_sqlite_session.py b/src/agents/extensions/memory/async_sqlite_session.py index 2eef596264..d77cb53156 100644 --- a/src/agents/extensions/memory/async_sqlite_session.py +++ b/src/agents/extensions/memory/async_sqlite_session.py @@ -102,19 +102,23 @@ async def _locked_connection(self) -> AsyncIterator[aiosqlite.Connection]: conn = await self._get_connection() yield conn - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the most recent) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history """ async with self._locked_connection() as conn: - if limit is None: + if limit is None and offset == 0: cursor = await conn.execute( f""" SELECT message_data FROM {self.messages_table} @@ -124,20 +128,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: (self.session_id,), ) else: + sql_limit = limit if limit is not None else -1 cursor = await conn.execute( f""" SELECT message_data FROM {self.messages_table} WHERE session_id = ? ORDER BY id DESC - LIMIT ? + LIMIT ? OFFSET ? """, - (self.session_id, limit), + (self.session_id, sql_limit, offset), ) rows = list(await cursor.fetchall()) await cursor.close() - if limit is not None: + if limit is not None or offset > 0: rows = rows[::-1] items: list[TResponseInputItem] = [] diff --git a/src/agents/extensions/memory/dapr_session.py b/src/agents/extensions/memory/dapr_session.py index ce6bf754a3..a8c875c085 100644 --- a/src/agents/extensions/memory/dapr_session.py +++ b/src/agents/extensions/memory/dapr_session.py @@ -232,12 +232,16 @@ async def _handle_concurrency_conflict(self, error: Exception, attempt: int) -> # Session protocol implementation # ------------------------------------------------------------------ - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, uses session_settings.limit. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the most recent) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history @@ -253,6 +257,11 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: ) messages = self._decode_messages(response.data) + if not messages: + return [] + # Apply offset: trim from the end + if offset > 0: + messages = messages[:-offset] if offset < len(messages) else [] if not messages: return [] if session_limit is not None: diff --git a/src/agents/extensions/memory/encrypt_session.py b/src/agents/extensions/memory/encrypt_session.py index d7f2e8edb9..d446c08e86 100644 --- a/src/agents/extensions/memory/encrypt_session.py +++ b/src/agents/extensions/memory/encrypt_session.py @@ -170,8 +170,10 @@ def _unwrap(self, item: TResponseInputItem | EncryptedEnvelope) -> TResponseInpu except (InvalidToken, KeyError): return None - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: - encrypted_items = await self.underlying_session.get_items(limit) + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: + encrypted_items = await self.underlying_session.get_items(limit, offset=offset) valid_items: list[TResponseInputItem] = [] for enc in encrypted_items: item = self._unwrap(enc) diff --git a/src/agents/extensions/memory/redis_session.py b/src/agents/extensions/memory/redis_session.py index 1eee549e11..7933ae420f 100644 --- a/src/agents/extensions/memory/redis_session.py +++ b/src/agents/extensions/memory/redis_session.py @@ -140,12 +140,16 @@ async def _set_ttl_if_configured(self, *keys: str) -> None: # Session protocol implementation # ------------------------------------------------------------------ - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, uses session_settings.limit. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the most recent) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history @@ -153,15 +157,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: session_limit = resolve_session_limit(limit, self.session_settings) async with self._lock: - if session_limit is None: + if session_limit is None and offset == 0: # Get all messages in chronological order raw_messages = await self._redis.lrange(self._messages_key, 0, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context - else: + elif session_limit is not None: if session_limit <= 0: return [] - # Get the latest N messages (Redis list is ordered chronologically) - # Use negative indices to get from the end - Redis uses -N to -1 for last N items - raw_messages = await self._redis.lrange(self._messages_key, -session_limit, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context + # Calculate Redis range indices from the end, accounting for offset + # e.g., limit=3, offset=2 on a list of 10 → indices -5 to -3 + end_idx = -(offset + 1) + start_idx = -(offset + session_limit) + raw_messages = await self._redis.lrange(self._messages_key, start_idx, end_idx) # type: ignore[misc] + else: + # offset > 0 but no limit: skip the last `offset` items + end_idx = -(offset + 1) + raw_messages = await self._redis.lrange(self._messages_key, 0, end_idx) # type: ignore[misc] items: list[TResponseInputItem] = [] for raw_msg in raw_messages: diff --git a/src/agents/extensions/memory/sqlalchemy_session.py b/src/agents/extensions/memory/sqlalchemy_session.py index 759ddaf5d5..8628f85d76 100644 --- a/src/agents/extensions/memory/sqlalchemy_session.py +++ b/src/agents/extensions/memory/sqlalchemy_session.py @@ -224,12 +224,16 @@ async def _ensure_tables(self) -> None: finally: self._init_lock.release() - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, uses session_settings.limit. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the most recent) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history @@ -239,7 +243,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: session_limit = resolve_session_limit(limit, self.session_settings) async with self._session_factory() as sess: - if session_limit is None: + if session_limit is None and offset == 0: stmt = ( select(self._messages.c.message_data) .where(self._messages.c.session_id == self.session_id) @@ -252,19 +256,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: stmt = ( select(self._messages.c.message_data) .where(self._messages.c.session_id == self.session_id) - # Use DESC + LIMIT to get the latest N - # then reverse later for chronological order. + # Use DESC + LIMIT/OFFSET to paginate from most recent, + # then reverse for chronological order. .order_by( self._messages.c.created_at.desc(), self._messages.c.id.desc(), ) - .limit(session_limit) + .offset(offset) ) + if session_limit is not None: + stmt = stmt.limit(session_limit) result = await sess.execute(stmt) rows: list[str] = [row[0] for row in result.all()] - if session_limit is not None: + if session_limit is not None or offset > 0: rows.reverse() items: list[TResponseInputItem] = [] diff --git a/src/agents/memory/openai_conversations_session.py b/src/agents/memory/openai_conversations_session.py index 4d4fbaf635..d8de1d2d73 100644 --- a/src/agents/memory/openai_conversations_session.py +++ b/src/agents/memory/openai_conversations_session.py @@ -70,7 +70,9 @@ async def _get_session_id(self) -> str: async def _clear_session_id(self) -> None: self._session_id = None - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: session_id = await self._get_session_id() session_limit = resolve_session_limit(limit, self.session_settings) diff --git a/src/agents/memory/openai_responses_compaction_session.py b/src/agents/memory/openai_responses_compaction_session.py index 4f8fbb37ae..7cdc43ccc0 100644 --- a/src/agents/memory/openai_responses_compaction_session.py +++ b/src/agents/memory/openai_responses_compaction_session.py @@ -239,8 +239,10 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None f"candidates={len(self._compaction_candidate_items)})" ) - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: - return await self.underlying_session.get_items(limit) + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: + return await self.underlying_session.get_items(limit, offset=offset) async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None: if self._deferred_response_id is not None: diff --git a/src/agents/memory/session.py b/src/agents/memory/session.py index 85a65a1690..0eb3532db9 100644 --- a/src/agents/memory/session.py +++ b/src/agents/memory/session.py @@ -21,12 +21,16 @@ class Session(Protocol): session_id: str session_settings: SessionSettings | None = None - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the end) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history @@ -68,12 +72,16 @@ class SessionABC(ABC): session_settings: SessionSettings | None = None @abstractmethod - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, retrieves all items. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the end) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history diff --git a/src/agents/memory/sqlite_session.py b/src/agents/memory/sqlite_session.py index 92c9630c9b..050db1b551 100644 --- a/src/agents/memory/sqlite_session.py +++ b/src/agents/memory/sqlite_session.py @@ -114,12 +114,16 @@ def _init_db_for_connection(self, conn: sqlite3.Connection) -> None: conn.commit() - async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: + async def get_items( + self, limit: int | None = None, offset: int = 0 + ) -> list[TResponseInputItem]: """Retrieve the conversation history for this session. Args: limit: Maximum number of items to retrieve. If None, uses session_settings.limit. When specified, returns the latest N items in chronological order. + offset: Number of items to skip (from the most recent) before applying the limit. + Defaults to 0. Combined with limit, enables pagination over history. Returns: List of input items representing the conversation history @@ -129,8 +133,8 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: def _get_items_sync(): conn = self._get_connection() with self._lock if self._is_memory_db else threading.Lock(): - if session_limit is None: - # Fetch all items in chronological order + if session_limit is None and offset == 0: + # Fast path: fetch all items in chronological order cursor = conn.execute( f""" SELECT message_data FROM {self.messages_table} @@ -140,21 +144,23 @@ def _get_items_sync(): (self.session_id,), ) else: - # Fetch the latest N items in chronological order + # Use DESC + LIMIT/OFFSET to paginate from most recent, + # then reverse to return in chronological order. + sql_limit = session_limit if session_limit is not None else -1 cursor = conn.execute( f""" SELECT message_data FROM {self.messages_table} WHERE session_id = ? ORDER BY id DESC - LIMIT ? + LIMIT ? OFFSET ? """, - (self.session_id, session_limit), + (self.session_id, sql_limit, offset), ) rows = cursor.fetchall() # Reverse to get chronological order when using DESC - if session_limit is not None: + if session_limit is not None or offset > 0: rows = list(reversed(rows)) items = [] diff --git a/tests/test_session.py b/tests/test_session.py index aaa80ec7aa..086fd66d86 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -368,6 +368,54 @@ async def test_sqlite_session_get_items_with_limit(): session.close() +@pytest.mark.asyncio +async def test_sqlite_session_get_items_with_offset(): + """Test SQLiteSession get_items with offset for pagination.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_offset.db" + session_id = "offset_test" + session = SQLiteSession(session_id, db_path) + + # Add 6 items: Message 1, Response 1, ... Message 3, Response 3 + items: list[TResponseInputItem] = [ + {"role": "user", "content": f"Message {i}"} + if i % 2 == 0 + else {"role": "assistant", "content": f"Response {i}"} + for i in range(6) + ] + await session.add_items(items) + + # Page 1: 2 most recent items (offset=0) + page1 = await session.get_items(limit=2, offset=0) + assert len(page1) == 2 + assert page1[0].get("content") == "Message 4" + assert page1[1].get("content") == "Response 5" + + # Page 2: next 2 items (offset=2) + page2 = await session.get_items(limit=2, offset=2) + assert len(page2) == 2 + assert page2[0].get("content") == "Message 2" + assert page2[1].get("content") == "Response 3" + + # Page 3: last 2 items (offset=4) + page3 = await session.get_items(limit=2, offset=4) + assert len(page3) == 2 + assert page3[0].get("content") == "Message 0" + assert page3[1].get("content") == "Response 1" + + # Offset beyond available items + empty = await session.get_items(limit=2, offset=10) + assert len(empty) == 0 + + # Offset without limit: skip the 2 most recent, return the rest + skipped = await session.get_items(offset=2) + assert len(skipped) == 4 + assert skipped[0].get("content") == "Message 0" + assert skipped[-1].get("content") == "Response 3" + + session.close() + + @pytest.mark.parametrize("runner_method", ["run", "run_sync", "run_streamed"]) @pytest.mark.asyncio async def test_session_memory_appends_list_input_by_default(runner_method):