Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions src/agents/extensions/memory/advanced_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,26 +133,15 @@ async def add_items(self, items: list[TResponseInputItem]) -> None:
def _add_items_sync():
"""Synchronous helper to add items and structure metadata together."""
with self._locked_connection() as conn:
# Keep both writes in one critical section so message IDs and metadata stay aligned.
self._insert_items(conn, items)
conn.commit()
# Both writes are in one transaction so a metadata failure rolls
# back the message rows too and the caller gets a clean exception.
try:
self._insert_items(conn, items)
self._insert_structure_metadata(conn, items)
conn.commit()
except Exception as e:
except Exception:
conn.rollback()
self._logger.error(
f"Failed to add structure metadata for session {self.session_id}: {e}"
)
try:
deleted_count = self._cleanup_orphaned_messages_sync(conn)
if deleted_count:
conn.commit()
else:
conn.rollback()
except Exception as cleanup_error:
conn.rollback()
self._logger.error(f"Failed to cleanup orphaned messages: {cleanup_error}")
raise

await asyncio.to_thread(_add_items_sync)

Expand Down
40 changes: 40 additions & 0 deletions tests/extensions/memory/test_advanced_sqlite_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,46 @@ async def add_batch(worker_id: int) -> list[str]:
session.close()


async def test_add_items_is_atomic_on_structure_metadata_failure():
"""Regression: add_items must be fully atomic.

If _insert_structure_metadata raises, the message rows must roll back so that
callers observe a clean failure rather than a half-written state where get_items()
returns nothing but the messages table contains orphaned rows.
"""
session = AdvancedSQLiteSession(session_id="atomic_test", create_tables=True)

items: list[TResponseInputItem] = [{"role": "user", "content": "hello"}]

original_insert_metadata = session._insert_structure_metadata

def failing_insert_structure_metadata(conn, items): # type: ignore[override]
raise RuntimeError("simulated metadata failure")

session._insert_structure_metadata = failing_insert_structure_metadata # type: ignore[method-assign]

with pytest.raises(RuntimeError, match="simulated metadata failure"):
await session.add_items(items)

session._insert_structure_metadata = original_insert_metadata

# Messages table must be empty; the transaction was rolled back.
with session._locked_connection() as conn:
count = conn.execute(
f"SELECT COUNT(*) FROM {session.messages_table} WHERE session_id = ?",
(session.session_id,),
).fetchone()[0]
assert count == 0, "orphaned message rows must not remain after a failed add_items call"

# A subsequent successful call must work normally.
await session.add_items(items)
result = await session.get_items()
assert len(result) == 1
assert result[0]["content"] == "hello" # type: ignore[index]

session.close()


async def test_output_tokens_details_persisted_when_input_details_missing():
"""Regression: output_tokens_details must persist even if input_tokens_details is None.

Expand Down
Loading