Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/agents/extensions/memory/advanced_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,15 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
async def get_items(
self,
limit: int | None = None,
offset: int = 0,
branch_id: str | None = None,
) -> list[TResponseInputItem]:
"""Get items from current or specified branch.

Args:
limit: Maximum number of items to return. If None, uses session_settings.limit.
offset: Number of items to skip (from the most recent) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.
branch_id: Branch to get items from. If None, uses current branch.

Returns:
Expand All @@ -161,7 +164,7 @@ def _get_all_items_sync():
# TODO: Refactor SQLiteSession to use asyncio.Lock instead of threading.Lock and update this code # noqa: E501
with self._lock if self._is_memory_db else threading.Lock():
with closing(conn.cursor()) as cursor:
if session_limit is None:
if session_limit is None and offset == 0:
cursor.execute(
f"""
SELECT m.message_data
Expand All @@ -173,20 +176,21 @@ def _get_all_items_sync():
(self.session_id, branch_id),
)
else:
sql_limit = session_limit if session_limit is not None else -1
cursor.execute(
f"""
SELECT m.message_data
FROM {self.messages_table} m
JOIN message_structure s ON m.id = s.message_id
WHERE m.session_id = ? AND s.branch_id = ?
ORDER BY s.sequence_number DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, branch_id, session_limit),
(self.session_id, branch_id, sql_limit, offset),
)

rows = cursor.fetchall()
if session_limit is not None:
if session_limit is not None or offset > 0:
rows = list(reversed(rows))

items = []
Expand All @@ -207,7 +211,7 @@ def _get_items_sync():
with self._lock if self._is_memory_db else threading.Lock():
with closing(conn.cursor()) as cursor:
# Get message IDs in correct order for this branch
if session_limit is None:
if session_limit is None and offset == 0:
cursor.execute(
f"""
SELECT m.message_data
Expand All @@ -219,20 +223,21 @@ def _get_items_sync():
(self.session_id, branch_id),
)
else:
sql_limit = session_limit if session_limit is not None else -1
cursor.execute(
f"""
SELECT m.message_data
FROM {self.messages_table} m
JOIN message_structure s ON m.id = s.message_id
WHERE m.session_id = ? AND s.branch_id = ?
ORDER BY s.sequence_number DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, branch_id, session_limit),
(self.session_id, branch_id, sql_limit, offset),
)

rows = cursor.fetchall()
if session_limit is not None:
if session_limit is not None or offset > 0:
rows = list(reversed(rows))

items = []
Expand Down
15 changes: 10 additions & 5 deletions src/agents/extensions/memory/async_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,23 @@ async def _locked_connection(self) -> AsyncIterator[aiosqlite.Connection]:
conn = await self._get_connection()
yield conn

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the most recent) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
"""

async with self._locked_connection() as conn:
if limit is None:
if limit is None and offset == 0:
cursor = await conn.execute(
f"""
SELECT message_data FROM {self.messages_table}
Expand All @@ -124,20 +128,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
(self.session_id,),
)
else:
sql_limit = limit if limit is not None else -1
cursor = await conn.execute(
f"""
SELECT message_data FROM {self.messages_table}
WHERE session_id = ?
ORDER BY id DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, limit),
(self.session_id, sql_limit, offset),
)

rows = list(await cursor.fetchall())
await cursor.close()

if limit is not None:
if limit is not None or offset > 0:
rows = rows[::-1]

items: list[TResponseInputItem] = []
Expand Down
11 changes: 10 additions & 1 deletion src/agents/extensions/memory/dapr_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,16 @@ async def _handle_concurrency_conflict(self, error: Exception, attempt: int) ->
# Session protocol implementation
# ------------------------------------------------------------------

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the most recent) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
Expand All @@ -253,6 +257,11 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
)

messages = self._decode_messages(response.data)
if not messages:
return []
# Apply offset: trim from the end
if offset > 0:
messages = messages[:-offset] if offset < len(messages) else []
if not messages:
return []
if session_limit is not None:
Expand Down
6 changes: 4 additions & 2 deletions src/agents/extensions/memory/encrypt_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ def _unwrap(self, item: TResponseInputItem | EncryptedEnvelope) -> TResponseInpu
except (InvalidToken, KeyError):
return None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
encrypted_items = await self.underlying_session.get_items(limit)
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
encrypted_items = await self.underlying_session.get_items(limit, offset=offset)
valid_items: list[TResponseInputItem] = []
for enc in encrypted_items:
item = self._unwrap(enc)
Expand Down
22 changes: 16 additions & 6 deletions src/agents/extensions/memory/redis_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,38 @@ async def _set_ttl_if_configured(self, *keys: str) -> None:
# Session protocol implementation
# ------------------------------------------------------------------

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the most recent) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
"""
session_limit = resolve_session_limit(limit, self.session_settings)

async with self._lock:
if session_limit is None:
if session_limit is None and offset == 0:
# Get all messages in chronological order
raw_messages = await self._redis.lrange(self._messages_key, 0, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
else:
elif session_limit is not None:
if session_limit <= 0:
return []
# Get the latest N messages (Redis list is ordered chronologically)
# Use negative indices to get from the end - Redis uses -N to -1 for last N items
raw_messages = await self._redis.lrange(self._messages_key, -session_limit, -1) # type: ignore[misc] # Redis library returns Union[Awaitable[T], T] in async context
# Calculate Redis range indices from the end, accounting for offset
# e.g., limit=3, offset=2 on a list of 10 → indices -5 to -3
end_idx = -(offset + 1)
start_idx = -(offset + session_limit)
raw_messages = await self._redis.lrange(self._messages_key, start_idx, end_idx) # type: ignore[misc]
else:
# offset > 0 but no limit: skip the last `offset` items
end_idx = -(offset + 1)
raw_messages = await self._redis.lrange(self._messages_key, 0, end_idx) # type: ignore[misc]
Comment on lines +173 to +174
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Clamp Redis LRANGE indices for large offsets

The Redis pagination math passes raw negative indices to LRANGE; when offset exceeds the list length, Redis normalizes out-of-range indices instead of erroring, so ranges like 0, -(offset+1) can still return the oldest element(s) instead of an empty list. That makes get_items(offset>=len(history)) return stale data and breaks page boundaries.

Useful? React with 👍 / 👎.


items: list[TResponseInputItem] = []
for raw_msg in raw_messages:
Expand Down
18 changes: 12 additions & 6 deletions src/agents/extensions/memory/sqlalchemy_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,16 @@ async def _ensure_tables(self) -> None:
finally:
self._init_lock.release()

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the most recent) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
Expand All @@ -239,7 +243,7 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
session_limit = resolve_session_limit(limit, self.session_settings)

async with self._session_factory() as sess:
if session_limit is None:
if session_limit is None and offset == 0:
stmt = (
select(self._messages.c.message_data)
.where(self._messages.c.session_id == self.session_id)
Expand All @@ -252,19 +256,21 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
stmt = (
select(self._messages.c.message_data)
.where(self._messages.c.session_id == self.session_id)
# Use DESC + LIMIT to get the latest N
# then reverse later for chronological order.
# Use DESC + LIMIT/OFFSET to paginate from most recent,
# then reverse for chronological order.
.order_by(
self._messages.c.created_at.desc(),
self._messages.c.id.desc(),
)
.limit(session_limit)
.offset(offset)
)
if session_limit is not None:
stmt = stmt.limit(session_limit)

result = await sess.execute(stmt)
rows: list[str] = [row[0] for row in result.all()]

if session_limit is not None:
if session_limit is not None or offset > 0:
rows.reverse()

items: list[TResponseInputItem] = []
Expand Down
4 changes: 3 additions & 1 deletion src/agents/memory/openai_conversations_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ async def _get_session_id(self) -> str:
async def _clear_session_id(self) -> None:
self._session_id = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
Comment on lines +74 to +75
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Honor offset in OpenAIConversationsSession.get_items

get_items now advertises an offset argument, but this method never applies it in either query path, so get_items(offset=...) returns the same page as offset=0. This breaks pagination specifically for the OpenAI-backed session and can cause duplicate pages when callers iterate through history using offsets.

Useful? React with 👍 / 👎.

session_id = await self._get_session_id()

session_limit = resolve_session_limit(limit, self.session_settings)
Expand Down
6 changes: 4 additions & 2 deletions src/agents/memory/openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
f"candidates={len(self._compaction_candidate_items)})"
)

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit)
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit, offset=offset)

async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None:
if self._deferred_response_id is not None:
Expand Down
12 changes: 10 additions & 2 deletions src/agents/memory/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ class Session(Protocol):
session_id: str
session_settings: SessionSettings | None = None

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the end) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
Expand Down Expand Up @@ -68,12 +72,16 @@ class SessionABC(ABC):
session_settings: SessionSettings | None = None

@abstractmethod
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, retrieves all items.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the end) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
Expand Down
20 changes: 13 additions & 7 deletions src/agents/memory/sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ def _init_db_for_connection(self, conn: sqlite3.Connection) -> None:

conn.commit()

async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
async def get_items(
self, limit: int | None = None, offset: int = 0
) -> list[TResponseInputItem]:
"""Retrieve the conversation history for this session.

Args:
limit: Maximum number of items to retrieve. If None, uses session_settings.limit.
When specified, returns the latest N items in chronological order.
offset: Number of items to skip (from the most recent) before applying the limit.
Defaults to 0. Combined with limit, enables pagination over history.

Returns:
List of input items representing the conversation history
Expand All @@ -129,8 +133,8 @@ async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
def _get_items_sync():
conn = self._get_connection()
with self._lock if self._is_memory_db else threading.Lock():
if session_limit is None:
# Fetch all items in chronological order
if session_limit is None and offset == 0:
# Fast path: fetch all items in chronological order
cursor = conn.execute(
f"""
SELECT message_data FROM {self.messages_table}
Expand All @@ -140,21 +144,23 @@ def _get_items_sync():
(self.session_id,),
)
else:
# Fetch the latest N items in chronological order
# Use DESC + LIMIT/OFFSET to paginate from most recent,
# then reverse to return in chronological order.
sql_limit = session_limit if session_limit is not None else -1
cursor = conn.execute(
f"""
SELECT message_data FROM {self.messages_table}
WHERE session_id = ?
ORDER BY id DESC
LIMIT ?
LIMIT ? OFFSET ?
""",
(self.session_id, session_limit),
(self.session_id, sql_limit, offset),
)

rows = cursor.fetchall()

# Reverse to get chronological order when using DESC
if session_limit is not None:
if session_limit is not None or offset > 0:
rows = list(reversed(rows))

items = []
Expand Down
Loading