From 6ead71bab85808fcb5e1d1c788ef5ef3e8eb1784 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 14:22:46 +0200 Subject: [PATCH 1/9] add experimental file history provider --- .worktrees/devui_datastar | 1 + .worktrees/issue-4675-duplicate-telemetry | 1 + .worktrees/issue-4676-a2a-sdk-update | 1 + python/packages/core/AGENTS.md | 2 + .../packages/core/agent_framework/__init__.py | 2 + .../core/agent_framework/_feature_stage.py | 1 + .../core/agent_framework/_sessions.py | 220 +++++++++++++++++- .../packages/core/tests/core/test_sessions.py | 135 +++++++++++ .../samples/02-agents/conversations/README.md | 16 ++ .../conversations/file_history_provider.py | 150 ++++++++++++ 10 files changed, 528 insertions(+), 1 deletion(-) create mode 160000 .worktrees/devui_datastar create mode 160000 .worktrees/issue-4675-duplicate-telemetry create mode 160000 .worktrees/issue-4676-a2a-sdk-update create mode 100644 python/samples/02-agents/conversations/file_history_provider.py diff --git a/.worktrees/devui_datastar b/.worktrees/devui_datastar new file mode 160000 index 0000000000..bf8d9672e1 --- /dev/null +++ b/.worktrees/devui_datastar @@ -0,0 +1 @@ +Subproject commit bf8d9672e147c42696a5a17b0ed37878196b6715 diff --git a/.worktrees/issue-4675-duplicate-telemetry b/.worktrees/issue-4675-duplicate-telemetry new file mode 160000 index 0000000000..55cc6e85c0 --- /dev/null +++ b/.worktrees/issue-4675-duplicate-telemetry @@ -0,0 +1 @@ +Subproject commit 55cc6e85c08db4d7795a48e85261655efd895409 diff --git a/.worktrees/issue-4676-a2a-sdk-update b/.worktrees/issue-4676-a2a-sdk-update new file mode 160000 index 0000000000..c551983295 --- /dev/null +++ b/.worktrees/issue-4676-a2a-sdk-update @@ -0,0 +1 @@ +Subproject commit c5519832953763b847b7cacc515edb78cf50d28d diff --git a/python/packages/core/AGENTS.md b/python/packages/core/AGENTS.md index d6940289ac..30f946435a 100644 --- a/python/packages/core/AGENTS.md +++ b/python/packages/core/AGENTS.md @@ -63,6 +63,8 @@ 
agent_framework/ - **`SessionContext`** - Context object for session-scoped data during agent runs - **`ContextProvider`** - Base class for context providers (RAG, memory systems) - **`HistoryProvider`** - Base class for conversation history storage +- **`InMemoryHistoryProvider`** - Built-in session-state history provider for local runs +- **`FileHistoryProvider`** - JSON Lines file-backed history provider storing one file per session with one message record per line ### Skills (`_skills.py`) diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 497fc1496d..7475b1eb96 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -103,6 +103,7 @@ from ._sessions import ( AgentSession, ContextProvider, + FileHistoryProvider, HistoryProvider, InMemoryHistoryProvider, SessionContext, @@ -318,6 +319,7 @@ "FanInEdgeGroup", "FanOutEdgeGroup", "FileCheckpointStorage", + "FileHistoryProvider", "FinalT", "FinishReason", "FinishReasonLiteral", diff --git a/python/packages/core/agent_framework/_feature_stage.py b/python/packages/core/agent_framework/_feature_stage.py index 6fb698768c..1bda62b5d3 100644 --- a/python/packages/core/agent_framework/_feature_stage.py +++ b/python/packages/core/agent_framework/_feature_stage.py @@ -47,6 +47,7 @@ class ExperimentalFeature(str, Enum): """ EVALS = "EVALS" + FILE_HISTORY = "FILE_HISTORY" SKILLS = "SKILLS" diff --git a/python/packages/core/agent_framework/_sessions.py b/python/packages/core/agent_framework/_sessions.py index 55d1a10a18..d2d6f5c3fa 100644 --- a/python/packages/core/agent_framework/_sessions.py +++ b/python/packages/core/agent_framework/_sessions.py @@ -8,16 +8,22 @@ - HistoryProvider: Base class for history storage providers - AgentSession: Lightweight session state container - InMemoryHistoryProvider: Built-in in-memory history provider +- FileHistoryProvider: Built-in JSON Lines file history 
provider """ from __future__ import annotations +import asyncio import copy +import json import uuid from abc import abstractmethod +from base64 import urlsafe_b64encode from collections.abc import Awaitable, Callable, Mapping, Sequence -from typing import TYPE_CHECKING, Any, ClassVar, TypeGuard, cast +from pathlib import Path +from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias, TypeGuard, cast +from ._feature_stage import ExperimentalFeature, experimental from ._middleware import ChatContext, ChatMiddleware from ._types import AgentResponse, ChatResponse, Message, ResponseStream from .exceptions import ChatClientInvalidResponseException @@ -30,6 +36,17 @@ # Registry of known types for state deserialization _STATE_TYPE_REGISTRY: dict[str, type] = {} +JsonDumps: TypeAlias = Callable[[Any], str | bytes] +JsonLoads: TypeAlias = Callable[[str | bytes], Any] + + +def _default_json_dumps(value: Any) -> str: + return json.dumps(value, ensure_ascii=False) + + +def _default_json_loads(value: str | bytes) -> Any: + return json.loads(value) + def _is_middleware_sequence( middleware: MiddlewareTypes | Sequence[MiddlewareTypes], @@ -837,3 +854,204 @@ async def save_messages( return existing = state.get("messages", []) state["messages"] = [*existing, *messages] + + +@experimental(feature_id=ExperimentalFeature.FILE_HISTORY) +class FileHistoryProvider(HistoryProvider): + """File-backed history provider that stores one JSON Lines file per session. + + Each persisted message is written as a single JSON object per line. The + provider does not serialize full session snapshots into the file. By default + it uses the standard library ``json`` module, but callers can inject + alternative ``dumps`` and ``loads`` callables compatible with the JSON + Lines format. 
+ """ + + DEFAULT_SOURCE_ID: ClassVar[str] = "file_history" + DEFAULT_SESSION_FILE_STEM: ClassVar[str] = "default" + FILE_EXTENSION: ClassVar[str] = ".jsonl" + _ENCODED_SESSION_PREFIX: ClassVar[str] = "~session-" + _WINDOWS_RESERVED_FILE_STEMS: ClassVar[frozenset[str]] = frozenset({ + "CON", + "PRN", + "AUX", + "NUL", + "COM1", + "COM2", + "COM3", + "COM4", + "COM5", + "COM6", + "COM7", + "COM8", + "COM9", + "LPT1", + "LPT2", + "LPT3", + "LPT4", + "LPT5", + "LPT6", + "LPT7", + "LPT8", + "LPT9", + }) + + def __init__( + self, + storage_path: str | Path, + *, + source_id: str = DEFAULT_SOURCE_ID, + load_messages: bool = True, + store_inputs: bool = True, + store_context_messages: bool = False, + store_context_from: set[str] | None = None, + store_outputs: bool = True, + skip_excluded: bool = False, + dumps: JsonDumps | None = None, + loads: JsonLoads | None = None, + ) -> None: + """Initialize the file history provider. + + Args: + storage_path: Directory path where session history files will be stored. + + Keyword Args: + source_id: Unique identifier for this provider instance. + load_messages: Whether to load messages before invocation. + store_inputs: Whether to store input messages. + store_context_messages: Whether to store context from other providers. + store_context_from: If set, only store context from these source_ids. + store_outputs: Whether to store response messages. + skip_excluded: When True, ``get_messages`` omits messages whose + ``additional_properties["_excluded"]`` is truthy. + dumps: Callable that serializes a message payload dict to JSON text + or UTF-8 bytes. The returned JSON must fit on a single line. + loads: Callable that deserializes JSON text or bytes back to a + message payload dict. 
+ """ + super().__init__( + source_id=source_id, + load_messages=load_messages, + store_inputs=store_inputs, + store_context_messages=store_context_messages, + store_context_from=store_context_from, + store_outputs=store_outputs, + ) + self.storage_path = Path(storage_path) + self.storage_path.mkdir(parents=True, exist_ok=True) + self._storage_root = self.storage_path.resolve() + self.skip_excluded = skip_excluded + self.dumps = dumps or _default_json_dumps + self.loads = loads or _default_json_loads + + async def get_messages( + self, + session_id: str | None, + *, + state: dict[str, Any] | None = None, + **kwargs: Any, + ) -> list[Message]: + """Retrieve messages from the session's JSON Lines file.""" + del state, kwargs + file_path = self._session_file_path(session_id) + + def _read_messages() -> list[Message]: + if not file_path.exists(): + return [] + + messages: list[Message] = [] + with file_path.open(encoding="utf-8") as file_handle: + for line_number, line in enumerate(file_handle, start=1): + serialized = line.strip() + if not serialized: + continue + try: + payload = self.loads(serialized) + except (TypeError, ValueError) as exc: + raise ValueError( + f"Failed to deserialize history line {line_number} from '{file_path}'." + ) from exc + if not isinstance(payload, Mapping): + raise ValueError( + f"History line {line_number} in '{file_path}' did not deserialize to a mapping." + ) + + try: + message = Message.from_dict(dict(cast(Mapping[str, Any], payload))) + except ValueError as exc: + raise ValueError( + f"History line {line_number} in '{file_path}' is not a valid Message payload." 
+ ) from exc + messages.append(message) + return messages + + messages = await asyncio.to_thread(_read_messages) + if self.skip_excluded: + messages = [m for m in messages if not m.additional_properties.get("_excluded", False)] + return messages + + async def save_messages( + self, + session_id: str | None, + messages: Sequence[Message], + *, + state: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + """Append messages to the session's JSON Lines file.""" + del state, kwargs + if not messages: + return + + file_path = self._session_file_path(session_id) + + def _append_messages() -> None: + serialized_messages = [self._serialize_message(message) for message in messages] + with file_path.open("a", encoding="utf-8") as file_handle: + file_handle.write("".join(f"{serialized_message}\n" for serialized_message in serialized_messages)) + + await asyncio.to_thread(_append_messages) + + def _serialize_message(self, message: Message) -> str: + """Serialize a message payload to a single JSON Lines record.""" + serialized = self.dumps(message.to_dict()) + if isinstance(serialized, bytes): + serialized_text = serialized.decode("utf-8") + elif isinstance(serialized, str): + serialized_text = serialized + else: + raise TypeError("FileHistoryProvider.dumps must return str or bytes.") + + if "\n" in serialized_text or "\r" in serialized_text: + raise ValueError("FileHistoryProvider.dumps must return single-line JSON for JSON Lines storage.") + return serialized_text + + def _session_file_path(self, session_id: str | None) -> Path: + """Resolve the on-disk history file path for a session.""" + file_path = (self._storage_root / f"{self._session_file_stem(session_id)}{self.FILE_EXTENSION}").resolve() + if not file_path.is_relative_to(self._storage_root): + raise ValueError(f"Session history path escaped storage directory: {session_id!r}") + return file_path + + def _session_file_stem(self, session_id: str | None) -> str: + """Return the filename stem for a session.""" + 
raw_session_id = session_id or self.DEFAULT_SESSION_FILE_STEM + if self._is_literal_session_file_stem_safe(raw_session_id): + return raw_session_id + + encoded_session_id = urlsafe_b64encode(raw_session_id.encode("utf-8")).decode("ascii").rstrip("=") + return f"{self._ENCODED_SESSION_PREFIX}{encoded_session_id or self.DEFAULT_SESSION_FILE_STEM}" + + @classmethod + def _is_literal_session_file_stem_safe(cls, session_id: str) -> bool: + """Return whether the session ID can be used directly as a filename stem.""" + if ( + not session_id + or session_id.startswith(".") + or session_id.endswith((" ", ".")) + or session_id.upper() in cls._WINDOWS_RESERVED_FILE_STEMS + ): + return False + if any(ord(character) < 32 for character in session_id): + return False + return all(character.isalnum() or character in "._-" for character in session_id) diff --git a/python/packages/core/tests/core/test_sessions.py b/python/packages/core/tests/core/test_sessions.py index e5eacebfe5..893555f84a 100644 --- a/python/packages/core/tests/core/test_sessions.py +++ b/python/packages/core/tests/core/test_sessions.py @@ -1,7 +1,9 @@ # Copyright (c) Microsoft. All rights reserved. 
+import asyncio import json from collections.abc import Awaitable, Callable, Sequence +from pathlib import Path import pytest @@ -10,6 +12,8 @@ AgentSession, ChatContext, ContextProvider, + ExperimentalFeature, + FileHistoryProvider, HistoryProvider, InMemoryHistoryProvider, Message, @@ -505,3 +509,134 @@ async def test_source_id_attribution(self) -> None: ctx = SessionContext(session_id="s1", input_messages=[]) ctx.extend_messages("custom-source", [Message(role="user", contents=["test"])]) assert "custom-source" in ctx.context_messages + + +class TestFileHistoryProvider: + def test_is_marked_experimental(self) -> None: + assert FileHistoryProvider.__feature_stage__ == "experimental" + assert FileHistoryProvider.__feature_id__ == ExperimentalFeature.FILE_HISTORY.value + assert FileHistoryProvider.__doc__ is not None + assert ".. warning:: Experimental" in FileHistoryProvider.__doc__ + + async def test_stores_and_loads_messages(self, tmp_path: Path) -> None: + from agent_framework import AgentResponse + + provider = FileHistoryProvider(tmp_path) + session = AgentSession(session_id="s1") + + input_message = Message(role="user", contents=["hello"]) + response_message = Message(role="assistant", contents=["hi there"]) + first_context = SessionContext(session_id=session.session_id, input_messages=[input_message]) + + await provider.before_run( # type: ignore[arg-type] + agent=None, + session=session, + context=first_context, + state={}, + ) + first_context._response = AgentResponse(messages=[response_message]) + await provider.after_run( # type: ignore[arg-type] + agent=None, + session=session, + context=first_context, + state={}, + ) + + session_file = provider._session_file_path(session.session_id) + assert session_file.name == "s1.jsonl" + assert session_file.exists() + raw_lines = (await asyncio.to_thread(session_file.read_text, encoding="utf-8")).splitlines() + assert len(raw_lines) == 2 + payloads = [json.loads(line) for line in raw_lines] + assert 
all(payload["type"] == "message" for payload in payloads) + assert all("session_id" not in payload for payload in payloads) + + second_context = SessionContext( + session_id=session.session_id, input_messages=[Message(role="user", contents=["again"])] + ) + await provider.before_run( # type: ignore[arg-type] + agent=None, + session=session, + context=second_context, + state={}, + ) + loaded = second_context.context_messages.get(provider.source_id, []) + assert len(loaded) == 2 + assert loaded[0].text == "hello" + assert loaded[1].text == "hi there" + + def test_creates_storage_directory(self, tmp_path: Path) -> None: + nested_path = tmp_path / "nested" / "history" + provider = FileHistoryProvider(nested_path) + assert provider.storage_path == nested_path + assert nested_path.exists() + assert nested_path.is_dir() + + async def test_uses_encoded_filename_for_unsafe_session_id(self, tmp_path: Path) -> None: + provider = FileHistoryProvider(tmp_path) + unsafe_session_id = "../unsafe/session" + + await provider.save_messages(unsafe_session_id, [Message(role="user", contents=["hello"])]) + + session_file = provider._session_file_path(unsafe_session_id) + assert session_file.parent == provider.storage_path + assert session_file.name.startswith("~session-") + assert session_file.suffix == ".jsonl" + assert session_file.exists() + jsonl_files = await asyncio.to_thread( + lambda: sorted(path.name for path in provider.storage_path.glob("*.jsonl")) + ) + assert jsonl_files == [session_file.name] + + async def test_allows_custom_serializers_returning_bytes(self, tmp_path: Path) -> None: + calls: list[str] = [] + + def dumps(payload: object) -> bytes: + calls.append("dumps") + return json.dumps(payload).encode("utf-8") + + def loads(payload: str | bytes) -> object: + calls.append("loads") + if isinstance(payload, bytes): + payload = payload.decode("utf-8") + return json.loads(payload) + + provider = FileHistoryProvider(tmp_path, dumps=dumps, loads=loads) + + await 
provider.save_messages("custom-serializer", [Message(role="user", contents=["hello"])]) + loaded = await provider.get_messages("custom-serializer") + + assert calls == ["dumps", "loads"] + assert len(loaded) == 1 + assert loaded[0].text == "hello" + + async def test_invalid_jsonl_line_raises(self, tmp_path: Path) -> None: + provider = FileHistoryProvider(tmp_path) + await asyncio.to_thread(provider._session_file_path("broken").write_text, "{not-json}\n", encoding="utf-8") + + with pytest.raises(ValueError, match="Failed to deserialize history line 1"): + await provider.get_messages("broken") + + async def test_skip_excluded_omits_excluded_messages(self, tmp_path: Path) -> None: + provider = FileHistoryProvider(tmp_path, skip_excluded=True) + + await provider.save_messages( + "skip-excluded", + [ + Message(role="user", contents=["keep"]), + Message(role="assistant", contents=["skip"], additional_properties={"_excluded": True}), + ], + ) + + loaded = await provider.get_messages("skip-excluded") + + assert [message.text for message in loaded] == ["keep"] + + async def test_serializer_must_return_single_line_json(self, tmp_path: Path) -> None: + def dumps(payload: object) -> str: + return json.dumps(payload, indent=2) + + provider = FileHistoryProvider(tmp_path, dumps=dumps) + + with pytest.raises(ValueError, match="single-line JSON"): + await provider.save_messages("pretty-json", [Message(role="user", contents=["hello"])]) diff --git a/python/samples/02-agents/conversations/README.md b/python/samples/02-agents/conversations/README.md index bbfb078659..1780033a6e 100644 --- a/python/samples/02-agents/conversations/README.md +++ b/python/samples/02-agents/conversations/README.md @@ -8,6 +8,8 @@ These samples demonstrate different approaches to managing conversation history |------|-------------| | [`suspend_resume_session.py`](suspend_resume_session.py) | Suspend and resume conversation sessions, comparing service-managed sessions (Azure AI Foundry) with in-memory 
sessions (OpenAI). | | [`custom_history_provider.py`](custom_history_provider.py) | Implement a custom history provider by extending `HistoryProvider`, enabling conversation persistence in your preferred storage backend. | +| [`file_history_provider.py`](file_history_provider.py) | Use the experimental `FileHistoryProvider` with `FoundryChatClient` and a function tool so the local JSON Lines file shows the full tool-calling loop. | +| [`file_history_provider_orjson.py`](file_history_provider_orjson.py) | Configure `FileHistoryProvider` to use `orjson`, compare it to the standard library `json`, and preview the compact JSONL output. | | [`cosmos_history_provider.py`](cosmos_history_provider.py) | Use Azure Cosmos DB as a history provider for durable conversation storage with `CosmosHistoryProvider`. | | [`cosmos_history_provider_conversation_persistence.py`](cosmos_history_provider_conversation_persistence.py) | Persist and resume conversations across application restarts using `CosmosHistoryProvider` — serialize session state, restore it, and continue with full Cosmos DB history. | | [`cosmos_history_provider_messages.py`](cosmos_history_provider_messages.py) | Direct message history operations — retrieve stored messages as a transcript, clear session history, and verify data deletion. | @@ -25,6 +27,20 @@ These samples demonstrate different approaches to managing conversation history **For `custom_history_provider.py`:** - `OPENAI_API_KEY`: Your OpenAI API key +**For `file_history_provider.py`:** +- `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint +- `FOUNDRY_MODEL`: The Foundry model deployment name +- Azure CLI authentication (`az login`) + +**For `file_history_provider_orjson.py`:** +- Run with a PEP 723-compatible runner so `orjson` is installed automatically, for example: + `uv run samples/02-agents/conversations/file_history_provider_orjson.py` +- The serializer benchmark runs without external services. 
+- For the optional live Foundry conversation: + - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint + - `FOUNDRY_MODEL`: The Foundry model deployment name + - Azure CLI authentication (`az login`) + **For Cosmos DB samples (`cosmos_history_provider*.py`):** - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint - `FOUNDRY_MODEL`: The Foundry model deployment name diff --git a/python/samples/02-agents/conversations/file_history_provider.py b/python/samples/02-agents/conversations/file_history_provider.py new file mode 100644 index 0000000000..d3c089c0e7 --- /dev/null +++ b/python/samples/02-agents/conversations/file_history_provider.py @@ -0,0 +1,150 @@ +# Copyright (c) Microsoft. All rights reserved. + +from __future__ import annotations + +import asyncio +import os +import tempfile +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path +from typing import Annotated + +# Uncomment this filter to suppress the experimental FileHistoryProvider warning +# before running the sample. +# import warnings # isort: skip +# warnings.filterwarnings("ignore", message=r"\[FILE_HISTORY\].*", category=FutureWarning) +from agent_framework import Agent, FileHistoryProvider, tool +from agent_framework.foundry import FoundryChatClient +from azure.identity import AzureCliCredential +from dotenv import load_dotenv +from pydantic import Field + +try: + import orjson +except ImportError: + orjson = None + + +# Load environment variables from .env file. +load_dotenv() + +""" +File History Provider + +This sample demonstrates how to use the experimental `FileHistoryProvider` with +`FoundryChatClient` and a function tool so the persisted JSON Lines file shows +the tool-calling loop as well as the regular chat turns. + +Environment variables: + FOUNDRY_PROJECT_ENDPOINT: Azure AI Foundry project endpoint. + FOUNDRY_MODEL: Foundry model deployment name. 
+ +Key components: +- `FileHistoryProvider`: Stores one message JSON object per line in a local + `.jsonl` file for each session. +- `lookup_weather`: A function tool that makes the persisted file show the + assistant function call and tool result lines. +- `json.dumps(..., indent=2)`: Pretty-prints selected records in the sample + output while keeping the on-disk JSONL file compact and valid. +- `USE_TEMP_DIRECTORY`: Toggle between a temporary directory and a persistent + `sessions/` folder next to this sample file. +""" + +USE_TEMP_DIRECTORY = False +"""When True, store JSONL files in a temporary directory for this run only.""" + +LOCAL_SESSIONS_DIRECTORY_NAME = "sessions" +"""Folder name used when persisting history next to this sample file.""" + + +@tool(approval_mode="never_require") +def lookup_weather( + location: Annotated[str, Field(description="The city to look up weather for.")], +) -> str: + """Return a deterministic weather report for a city.""" + weather_reports = { + "Seattle": "Seattle is rainy with a high of 13C.", + "Amsterdam": "Amsterdam is cloudy with a high of 16C.", + } + return weather_reports.get(location, f"{location} is sunny with a high of 20C.") + + +@contextmanager +def _resolve_storage_directory() -> Iterator[Path]: + """Yield the configured storage directory for the sample run.""" + if USE_TEMP_DIRECTORY: + with tempfile.TemporaryDirectory(prefix="af-file-history-") as temp_directory: + yield Path(temp_directory) + return + + storage_directory = Path(__file__).resolve().parent / LOCAL_SESSIONS_DIRECTORY_NAME + storage_directory.mkdir(parents=True, exist_ok=True) + yield storage_directory + + +async def main() -> None: + """Run the file history provider sample.""" + + with _resolve_storage_directory() as storage_directory: + print(f"Using temporary directory: {USE_TEMP_DIRECTORY}") + print(f"Storage directory: {storage_directory}\n") + + # 2. Create the agent with a tool so the JSONL file includes tool-calling messages. 
+ agent = Agent( + client=FoundryChatClient( + project_endpoint=os.getenv("FOUNDRY_PROJECT_ENDPOINT"), + model="gpt-4.1-mini", + credential=AzureCliCredential(), + ), + name="FileHistoryAgent", + instructions=( + "You are a helpful assistant, use the lookup_weather tool for weather questions and " + "answer with the tool result in one sentence." + ), + tools=[lookup_weather], + # if orjson is available, use it for more faster JSON serialization in the FileHistoryProvider, + # otherwise fall back to the default json module. + context_providers=[ + FileHistoryProvider( + storage_directory, + dumps=orjson.dumps if orjson else None, + loads=orjson.loads if orjson else None, + ) + ], + default_options={"store": False}, + ) + + # 3. Let Agent create the default UUID session id for this conversation. + session = agent.create_session() + + # 4. Ask a question that triggers the weather tool. + print("=== Run with tool calling ===") + query = "Use the lookup_weather tool for Seattle and tell me the weather." + response = await agent.run(query, session=session) + print(f"User: {query}") + print(f"Assistant: {response.text}\n") + + # 5. Ask a follow-up question that triggers the weather tool as well + print("=== Follow-up question ===") + query = "And what about Amsterdam?" + response = await agent.run(query, session=session) + print(f"User: {query}") + print(f"Assistant: {response.text}\n") + + +if __name__ == "__main__": + asyncio.run(main()) + +""" +Sample output: +Using temporary directory: False +Storage directory: /path/to/samples/02-agents/conversations/sessions + +=== Run with tool calling === +User: Use the lookup_weather tool for Seattle and tell me the weather. +Assistant: +=== Follow-up question === +User: And what about Amsterdam? 
+Assistant: +""" From abb6eec52284ce4543e8b67f84156696c4976dcf Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 16:03:19 +0200 Subject: [PATCH 2/9] Improve file history provider writes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/agent_framework/_sessions.py | 16 ++++- .../packages/core/tests/core/test_sessions.py | 62 +++++++++++++++++++ .../samples/02-agents/conversations/README.md | 1 - .../conversations/file_history_provider.py | 2 +- 4 files changed, 76 insertions(+), 5 deletions(-) diff --git a/python/packages/core/agent_framework/_sessions.py b/python/packages/core/agent_framework/_sessions.py index d2d6f5c3fa..ddaf610546 100644 --- a/python/packages/core/agent_framework/_sessions.py +++ b/python/packages/core/agent_framework/_sessions.py @@ -16,6 +16,7 @@ import asyncio import copy import json +import threading import uuid from abc import abstractmethod from base64 import urlsafe_b64encode @@ -871,6 +872,8 @@ class FileHistoryProvider(HistoryProvider): DEFAULT_SESSION_FILE_STEM: ClassVar[str] = "default" FILE_EXTENSION: ClassVar[str] = ".jsonl" _ENCODED_SESSION_PREFIX: ClassVar[str] = "~session-" + _FILE_WRITE_LOCKS: ClassVar[dict[Path, threading.Lock]] = {} + _FILE_WRITE_LOCKS_GUARD: ClassVar[threading.Lock] = threading.Lock() _WINDOWS_RESERVED_FILE_STEMS: ClassVar[frozenset[str]] = frozenset({ "CON", "PRN", @@ -1004,11 +1007,12 @@ async def save_messages( return file_path = self._session_file_path(session_id) + file_lock = self._session_write_lock(file_path) def _append_messages() -> None: - serialized_messages = [self._serialize_message(message) for message in messages] - with file_path.open("a", encoding="utf-8") as file_handle: - file_handle.write("".join(f"{serialized_message}\n" for serialized_message in serialized_messages)) + with file_lock, file_path.open("a", encoding="utf-8") as file_handle: + for message in messages: + file_handle.write(f"{self._serialize_message(message)}\n") await 
asyncio.to_thread(_append_messages) @@ -1042,6 +1046,12 @@ def _session_file_stem(self, session_id: str | None) -> str: encoded_session_id = urlsafe_b64encode(raw_session_id.encode("utf-8")).decode("ascii").rstrip("=") return f"{self._ENCODED_SESSION_PREFIX}{encoded_session_id or self.DEFAULT_SESSION_FILE_STEM}" + @classmethod + def _session_write_lock(cls, file_path: Path) -> threading.Lock: + """Return the process-local append lock for a session history file.""" + with cls._FILE_WRITE_LOCKS_GUARD: + return cls._FILE_WRITE_LOCKS.setdefault(file_path, threading.Lock()) + @classmethod def _is_literal_session_file_stem_safe(cls, session_id: str) -> bool: """Return whether the session ID can be used directly as a filename stem.""" diff --git a/python/packages/core/tests/core/test_sessions.py b/python/packages/core/tests/core/test_sessions.py index 893555f84a..0713f98a78 100644 --- a/python/packages/core/tests/core/test_sessions.py +++ b/python/packages/core/tests/core/test_sessions.py @@ -2,8 +2,11 @@ import asyncio import json +import threading +import time from collections.abc import Awaitable, Callable, Sequence from pathlib import Path +from typing import Any import pytest @@ -640,3 +643,62 @@ def dumps(payload: object) -> str: with pytest.raises(ValueError, match="single-line JSON"): await provider.save_messages("pretty-json", [Message(role="user", contents=["hello"])]) + + async def test_concurrent_writes_for_same_session_are_locked( + self, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + provider = FileHistoryProvider(tmp_path) + session_id = "shared-session" + file_path = provider._session_file_path(session_id) + real_open = Path.open + write_started = threading.Event() + active_writes = 0 + overlap_detected = False + + class _TrackingFile: + def __init__(self, wrapped: Any) -> None: + self._wrapped = wrapped + + def __enter__(self) -> "_TrackingFile": + self._wrapped.__enter__() + return self + + def __exit__(self, exc_type: Any, exc_val: 
Any, exc_tb: Any) -> None: + self._wrapped.__exit__(exc_type, exc_val, exc_tb) + + def write(self, data: str) -> int: + nonlocal active_writes, overlap_detected + write_started.set() + active_writes += 1 + overlap_detected = overlap_detected or active_writes > 1 + try: + time.sleep(0.05) + return int(self._wrapped.write(data)) + finally: + active_writes -= 1 + + def __getattr__(self, name: str) -> Any: + return getattr(self._wrapped, name) + + def tracked_open(path: Path, *args: Any, **kwargs: Any) -> Any: + handle = real_open(path, *args, **kwargs) + if path == file_path and args and args[0] == "a": + return _TrackingFile(handle) + return handle + + monkeypatch.setattr(Path, "open", tracked_open) + + first_save = asyncio.create_task(provider.save_messages(session_id, [Message(role="user", contents=["first"])])) + started = await asyncio.to_thread(write_started.wait, 1.0) + assert started + + second_save = asyncio.create_task( + provider.save_messages(session_id, [Message(role="assistant", contents=["second"])]) + ) + await asyncio.gather(first_save, second_save) + + assert not overlap_detected + loaded = await provider.get_messages(session_id) + assert [message.text for message in loaded] == ["first", "second"] diff --git a/python/samples/02-agents/conversations/README.md b/python/samples/02-agents/conversations/README.md index 1780033a6e..b9409ee257 100644 --- a/python/samples/02-agents/conversations/README.md +++ b/python/samples/02-agents/conversations/README.md @@ -9,7 +9,6 @@ These samples demonstrate different approaches to managing conversation history | [`suspend_resume_session.py`](suspend_resume_session.py) | Suspend and resume conversation sessions, comparing service-managed sessions (Azure AI Foundry) with in-memory sessions (OpenAI). | | [`custom_history_provider.py`](custom_history_provider.py) | Implement a custom history provider by extending `HistoryProvider`, enabling conversation persistence in your preferred storage backend. 
| | [`file_history_provider.py`](file_history_provider.py) | Use the experimental `FileHistoryProvider` with `FoundryChatClient` and a function tool so the local JSON Lines file shows the full tool-calling loop. | -| [`file_history_provider_orjson.py`](file_history_provider_orjson.py) | Configure `FileHistoryProvider` to use `orjson`, compare it to the standard library `json`, and preview the compact JSONL output. | | [`cosmos_history_provider.py`](cosmos_history_provider.py) | Use Azure Cosmos DB as a history provider for durable conversation storage with `CosmosHistoryProvider`. | | [`cosmos_history_provider_conversation_persistence.py`](cosmos_history_provider_conversation_persistence.py) | Persist and resume conversations across application restarts using `CosmosHistoryProvider` — serialize session state, restore it, and continue with full Cosmos DB history. | | [`cosmos_history_provider_messages.py`](cosmos_history_provider_messages.py) | Direct message history operations — retrieve stored messages as a transcript, clear session history, and verify data deletion. 
| diff --git a/python/samples/02-agents/conversations/file_history_provider.py b/python/samples/02-agents/conversations/file_history_provider.py index d3c089c0e7..a84b5c519f 100644 --- a/python/samples/02-agents/conversations/file_history_provider.py +++ b/python/samples/02-agents/conversations/file_history_provider.py @@ -94,7 +94,7 @@ async def main() -> None: agent = Agent( client=FoundryChatClient( project_endpoint=os.getenv("FOUNDRY_PROJECT_ENDPOINT"), - model="gpt-4.1-mini", + model=os.getenv("FOUNDRY_MODEL"), credential=AzureCliCredential(), ), name="FileHistoryAgent", From 1698c308f8574d99e41c7f81c97fbcf53fd1d7e6 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 16:05:07 +0200 Subject: [PATCH 3/9] typo --- python/samples/02-agents/conversations/file_history_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/samples/02-agents/conversations/file_history_provider.py b/python/samples/02-agents/conversations/file_history_provider.py index a84b5c519f..ebe7e2ffbd 100644 --- a/python/samples/02-agents/conversations/file_history_provider.py +++ b/python/samples/02-agents/conversations/file_history_provider.py @@ -103,7 +103,7 @@ async def main() -> None: "answer with the tool result in one sentence." ), tools=[lookup_weather], - # if orjson is available, use it for more faster JSON serialization in the FileHistoryProvider, + # if orjson is available, use it for faster JSON serialization in the FileHistoryProvider, # otherwise fall back to the default json module. 
context_providers=[ FileHistoryProvider( From dfe30f363749b5bc835b14424e58d576621b4423 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 16:21:41 +0200 Subject: [PATCH 4/9] cleanup --- .worktrees/devui_datastar | 1 - 1 file changed, 1 deletion(-) delete mode 160000 .worktrees/devui_datastar diff --git a/.worktrees/devui_datastar b/.worktrees/devui_datastar deleted file mode 160000 index bf8d9672e1..0000000000 --- a/.worktrees/devui_datastar +++ /dev/null @@ -1 +0,0 @@ -Subproject commit bf8d9672e147c42696a5a17b0ed37878196b6715 From 9f5e03cdc049ca5934bba67107362a227e9aa880 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 16:23:56 +0200 Subject: [PATCH 5/9] cleanup --- .worktrees/issue-4675-duplicate-telemetry | 1 - .worktrees/issue-4676-a2a-sdk-update | 1 - 2 files changed, 2 deletions(-) delete mode 160000 .worktrees/issue-4675-duplicate-telemetry delete mode 160000 .worktrees/issue-4676-a2a-sdk-update diff --git a/.worktrees/issue-4675-duplicate-telemetry b/.worktrees/issue-4675-duplicate-telemetry deleted file mode 160000 index 55cc6e85c0..0000000000 --- a/.worktrees/issue-4675-duplicate-telemetry +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 55cc6e85c08db4d7795a48e85261655efd895409 diff --git a/.worktrees/issue-4676-a2a-sdk-update b/.worktrees/issue-4676-a2a-sdk-update deleted file mode 160000 index c551983295..0000000000 --- a/.worktrees/issue-4676-a2a-sdk-update +++ /dev/null @@ -1 +0,0 @@ -Subproject commit c5519832953763b847b7cacc515edb78cf50d28d From 7a6635f682beaa4c764fab7cd8a4cc85c36404f4 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 16:30:13 +0200 Subject: [PATCH 6/9] fix in readme --- python/samples/02-agents/conversations/README.md | 9 --------- 1 file changed, 9 deletions(-) diff --git a/python/samples/02-agents/conversations/README.md b/python/samples/02-agents/conversations/README.md index b9409ee257..4780861ce8 100644 --- a/python/samples/02-agents/conversations/README.md +++ 
b/python/samples/02-agents/conversations/README.md @@ -31,15 +31,6 @@ These samples demonstrate different approaches to managing conversation history - `FOUNDRY_MODEL`: The Foundry model deployment name - Azure CLI authentication (`az login`) -**For `file_history_provider_orjson.py`:** -- Run with a PEP 723-compatible runner so `orjson` is installed automatically, for example: - `uv run samples/02-agents/conversations/file_history_provider_orjson.py` -- The serializer benchmark runs without external services. -- For the optional live Foundry conversation: - - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint - - `FOUNDRY_MODEL`: The Foundry model deployment name - - Azure CLI authentication (`az login`) - **For Cosmos DB samples (`cosmos_history_provider*.py`):** - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint - `FOUNDRY_MODEL`: The Foundry model deployment name From 103c1ba16e6367801cd744b6455297327a4afb01 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 19:51:36 +0200 Subject: [PATCH 7/9] added security messages --- python/packages/core/agent_framework/_sessions.py | 9 +++++++++ python/samples/02-agents/conversations/README.md | 2 ++ .../02-agents/conversations/file_history_provider.py | 7 +++++++ 3 files changed, 18 insertions(+) diff --git a/python/packages/core/agent_framework/_sessions.py b/python/packages/core/agent_framework/_sessions.py index ddaf610546..4a0a7b1450 100644 --- a/python/packages/core/agent_framework/_sessions.py +++ b/python/packages/core/agent_framework/_sessions.py @@ -866,6 +866,15 @@ class FileHistoryProvider(HistoryProvider): it uses the standard library ``json`` module, but callers can inject alternative ``dumps`` and ``loads`` callables compatible with the JSON Lines format. + + Security posture: + Persisted history is stored as plaintext JSONL on the local filesystem. + Treat ``storage_path`` as trusted application storage, not as a secret + store. 
Encoded fallback filenames and resolved-path validation help + prevent path traversal via ``session_id``, but they do not encrypt file + contents or provide cross-process / cross-host locking. Use OS-level + file permissions, trusted directories, and carefully review what agent + or tool output is allowed to be persisted. """ DEFAULT_SOURCE_ID: ClassVar[str] = "file_history" diff --git a/python/samples/02-agents/conversations/README.md b/python/samples/02-agents/conversations/README.md index 4780861ce8..08faa65e4c 100644 --- a/python/samples/02-agents/conversations/README.md +++ b/python/samples/02-agents/conversations/README.md @@ -30,6 +30,8 @@ These samples demonstrate different approaches to managing conversation history - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint - `FOUNDRY_MODEL`: The Foundry model deployment name - Azure CLI authentication (`az login`) +- The sample writes plaintext JSONL conversation logs to disk; use a trusted + local directory and avoid treating the history files as secure secret storage **For Cosmos DB samples (`cosmos_history_provider*.py`):** - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint diff --git a/python/samples/02-agents/conversations/file_history_provider.py b/python/samples/02-agents/conversations/file_history_provider.py index ebe7e2ffbd..04a87f8224 100644 --- a/python/samples/02-agents/conversations/file_history_provider.py +++ b/python/samples/02-agents/conversations/file_history_provider.py @@ -49,6 +49,13 @@ output while keeping the on-disk JSONL file compact and valid. - `USE_TEMP_DIRECTORY`: Toggle between a temporary directory and a persistent `sessions/` folder next to this sample file. + +Security posture: +- The history files are plaintext JSONL on disk, so use a trusted storage + directory and treat the files as conversation logs, not as secure secret + storage. 
+- Path safety checks protect the filename derived from the session id, but they + do not redact message contents or encrypt the file. """ USE_TEMP_DIRECTORY = False From 603cfa30c0bc56c530a26a891e92ba4fa9a438d2 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 20:28:35 +0200 Subject: [PATCH 8/9] Refine file history provider locking Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/agent_framework/_sessions.py | 98 ++++++++++++------- .../packages/core/tests/core/test_sessions.py | 24 +++++ 2 files changed, 86 insertions(+), 36 deletions(-) diff --git a/python/packages/core/agent_framework/_sessions.py b/python/packages/core/agent_framework/_sessions.py index 4a0a7b1450..20125f19ff 100644 --- a/python/packages/core/agent_framework/_sessions.py +++ b/python/packages/core/agent_framework/_sessions.py @@ -18,6 +18,7 @@ import json import threading import uuid +import weakref from abc import abstractmethod from base64 import urlsafe_b64encode from collections.abc import Awaitable, Callable, Mapping, Sequence @@ -880,9 +881,11 @@ class FileHistoryProvider(HistoryProvider): DEFAULT_SOURCE_ID: ClassVar[str] = "file_history" DEFAULT_SESSION_FILE_STEM: ClassVar[str] = "default" FILE_EXTENSION: ClassVar[str] = ".jsonl" + _FILE_LOCK_STRIPE_COUNT: ClassVar[int] = 64 _ENCODED_SESSION_PREFIX: ClassVar[str] = "~session-" - _FILE_WRITE_LOCKS: ClassVar[dict[Path, threading.Lock]] = {} - _FILE_WRITE_LOCKS_GUARD: ClassVar[threading.Lock] = threading.Lock() + _FILE_WRITE_LOCKS: ClassVar[tuple[threading.Lock, ...]] = tuple( + threading.Lock() for _ in range(_FILE_LOCK_STRIPE_COUNT) + ) _WINDOWS_RESERVED_FILE_STEMS: ClassVar[frozenset[str]] = frozenset({ "CON", "PRN", @@ -955,6 +958,10 @@ def __init__( self.skip_excluded = skip_excluded self.dumps = dumps or _default_json_dumps self.loads = loads or _default_json_loads + self._async_write_locks_by_loop: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, + tuple[asyncio.Lock, ...], 
+ ] = weakref.WeakKeyDictionary() async def get_messages( self, @@ -966,38 +973,42 @@ async def get_messages( """Retrieve messages from the session's JSON Lines file.""" del state, kwargs file_path = self._session_file_path(session_id) + async_lock = self._session_async_write_lock(file_path) + thread_lock = self._session_write_lock(file_path) def _read_messages() -> list[Message]: - if not file_path.exists(): - return [] - - messages: list[Message] = [] - with file_path.open(encoding="utf-8") as file_handle: - for line_number, line in enumerate(file_handle, start=1): - serialized = line.strip() - if not serialized: - continue - try: - payload = self.loads(serialized) - except (TypeError, ValueError) as exc: - raise ValueError( - f"Failed to deserialize history line {line_number} from '{file_path}'." - ) from exc - if not isinstance(payload, Mapping): - raise ValueError( - f"History line {line_number} in '{file_path}' did not deserialize to a mapping." - ) - - try: - message = Message.from_dict(dict(cast(Mapping[str, Any], payload))) - except ValueError as exc: - raise ValueError( - f"History line {line_number} in '{file_path}' is not a valid Message payload." - ) from exc - messages.append(message) - return messages - - messages = await asyncio.to_thread(_read_messages) + with thread_lock: + if not file_path.exists(): + return [] + + messages: list[Message] = [] + with file_path.open(encoding="utf-8") as file_handle: + for line_number, line in enumerate(file_handle, start=1): + serialized = line.strip() + if not serialized: + continue + try: + payload = self.loads(serialized) + except (TypeError, ValueError) as exc: + raise ValueError( + f"Failed to deserialize history line {line_number} from '{file_path}'." + ) from exc + if not isinstance(payload, Mapping): + raise ValueError( + f"History line {line_number} in '{file_path}' did not deserialize to a mapping." 
+ ) + + try: + message = Message.from_dict(dict(cast(Mapping[str, Any], payload))) + except ValueError as exc: + raise ValueError( + f"History line {line_number} in '{file_path}' is not a valid Message payload." + ) from exc + messages.append(message) + return messages + + async with async_lock: + messages = await asyncio.to_thread(_read_messages) if self.skip_excluded: messages = [m for m in messages if not m.additional_properties.get("_excluded", False)] return messages @@ -1016,6 +1027,7 @@ async def save_messages( return file_path = self._session_file_path(session_id) + async_lock = self._session_async_write_lock(file_path) file_lock = self._session_write_lock(file_path) def _append_messages() -> None: @@ -1023,7 +1035,8 @@ def _append_messages() -> None: for message in messages: file_handle.write(f"{self._serialize_message(message)}\n") - await asyncio.to_thread(_append_messages) + async with async_lock: + await asyncio.to_thread(_append_messages) def _serialize_message(self, message: Message) -> str: """Serialize a message payload to a single JSON Lines record.""" @@ -1055,11 +1068,24 @@ def _session_file_stem(self, session_id: str | None) -> str: encoded_session_id = urlsafe_b64encode(raw_session_id.encode("utf-8")).decode("ascii").rstrip("=") return f"{self._ENCODED_SESSION_PREFIX}{encoded_session_id or self.DEFAULT_SESSION_FILE_STEM}" + def _session_async_write_lock(self, file_path: Path) -> asyncio.Lock: + """Return the event-loop-local async lock for a session history file.""" + loop = asyncio.get_running_loop() + locks = self._async_write_locks_by_loop.get(loop) + if locks is None: + locks = tuple(asyncio.Lock() for _ in range(self._FILE_LOCK_STRIPE_COUNT)) + self._async_write_locks_by_loop[loop] = locks + return locks[self._lock_index(file_path)] + @classmethod def _session_write_lock(cls, file_path: Path) -> threading.Lock: - """Return the process-local append lock for a session history file.""" - with cls._FILE_WRITE_LOCKS_GUARD: - return 
cls._FILE_WRITE_LOCKS.setdefault(file_path, threading.Lock()) + """Return the process-local thread lock for a session history file.""" + return cls._FILE_WRITE_LOCKS[cls._lock_index(file_path)] + + @classmethod + def _lock_index(cls, file_path: Path) -> int: + """Map a session history file to a bounded lock stripe.""" + return hash(file_path) % cls._FILE_LOCK_STRIPE_COUNT @classmethod def _is_literal_session_file_stem_safe(cls, session_id: str) -> bool: diff --git a/python/packages/core/tests/core/test_sessions.py b/python/packages/core/tests/core/test_sessions.py index 0713f98a78..ebb91d0b0d 100644 --- a/python/packages/core/tests/core/test_sessions.py +++ b/python/packages/core/tests/core/test_sessions.py @@ -620,6 +620,30 @@ async def test_invalid_jsonl_line_raises(self, tmp_path: Path) -> None: with pytest.raises(ValueError, match="Failed to deserialize history line 1"): await provider.get_messages("broken") + async def test_missing_session_file_returns_empty_messages(self, tmp_path: Path) -> None: + provider = FileHistoryProvider(tmp_path) + + loaded = await provider.get_messages("missing") + + assert loaded == [] + + async def test_none_session_id_uses_default_jsonl_file(self, tmp_path: Path) -> None: + provider = FileHistoryProvider(tmp_path) + + await provider.save_messages(None, [Message(role="user", contents=["hello"])]) + + session_file = provider._session_file_path(None) + assert session_file.name == "default.jsonl" + loaded = await provider.get_messages(None) + assert [message.text for message in loaded] == ["hello"] + + async def test_non_mapping_jsonl_line_raises(self, tmp_path: Path) -> None: + provider = FileHistoryProvider(tmp_path) + await asyncio.to_thread(provider._session_file_path("non-mapping").write_text, "[1, 2, 3]\n", encoding="utf-8") + + with pytest.raises(ValueError, match="did not deserialize to a mapping"): + await provider.get_messages("non-mapping") + async def test_skip_excluded_omits_excluded_messages(self, tmp_path: Path) -> 
None: provider = FileHistoryProvider(tmp_path, skip_excluded=True) From ba83f70b028b41a46449c244253653cbfa3a28d6 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Tue, 14 Apr 2026 21:04:03 +0200 Subject: [PATCH 9/9] added additional sample --- .../samples/02-agents/conversations/README.md | 8 + ...story_provider_conversation_persistence.py | 185 ++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 python/samples/02-agents/conversations/file_history_provider_conversation_persistence.py diff --git a/python/samples/02-agents/conversations/README.md b/python/samples/02-agents/conversations/README.md index 08faa65e4c..002d4e6773 100644 --- a/python/samples/02-agents/conversations/README.md +++ b/python/samples/02-agents/conversations/README.md @@ -9,6 +9,7 @@ These samples demonstrate different approaches to managing conversation history | [`suspend_resume_session.py`](suspend_resume_session.py) | Suspend and resume conversation sessions, comparing service-managed sessions (Azure AI Foundry) with in-memory sessions (OpenAI). | | [`custom_history_provider.py`](custom_history_provider.py) | Implement a custom history provider by extending `HistoryProvider`, enabling conversation persistence in your preferred storage backend. | | [`file_history_provider.py`](file_history_provider.py) | Use the experimental `FileHistoryProvider` with `FoundryChatClient` and a function tool so the local JSON Lines file shows the full tool-calling loop. | +| [`file_history_provider_conversation_persistence.py`](file_history_provider_conversation_persistence.py) | Persist a tool-driven weather conversation with `FileHistoryProvider`, inspect the stored JSONL records, and continue with another city. | | [`cosmos_history_provider.py`](cosmos_history_provider.py) | Use Azure Cosmos DB as a history provider for durable conversation storage with `CosmosHistoryProvider`. 
| | [`cosmos_history_provider_conversation_persistence.py`](cosmos_history_provider_conversation_persistence.py) | Persist and resume conversations across application restarts using `CosmosHistoryProvider` — serialize session state, restore it, and continue with full Cosmos DB history. | | [`cosmos_history_provider_messages.py`](cosmos_history_provider_messages.py) | Direct message history operations — retrieve stored messages as a transcript, clear session history, and verify data deletion. | @@ -33,6 +34,13 @@ These samples demonstrate different approaches to managing conversation history - The sample writes plaintext JSONL conversation logs to disk; use a trusted local directory and avoid treating the history files as secure secret storage +**For `file_history_provider_conversation_persistence.py`:** +- `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint +- `FOUNDRY_MODEL`: The Foundry model deployment name +- Azure CLI authentication (`az login`) +- The sample writes plaintext JSONL conversation logs to disk; use a trusted + local directory and avoid treating the history files as secure secret storage + **For Cosmos DB samples (`cosmos_history_provider*.py`):** - `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint - `FOUNDRY_MODEL`: The Foundry model deployment name diff --git a/python/samples/02-agents/conversations/file_history_provider_conversation_persistence.py b/python/samples/02-agents/conversations/file_history_provider_conversation_persistence.py new file mode 100644 index 0000000000..70c5d7e8e8 --- /dev/null +++ b/python/samples/02-agents/conversations/file_history_provider_conversation_persistence.py @@ -0,0 +1,185 @@ +# Copyright (c) Microsoft. All rights reserved. 
+# ruff: noqa: T201 + +from __future__ import annotations + +import asyncio +import json +import tempfile +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path +from typing import Annotated + +# Uncomment this filter to suppress the experimental FileHistoryProvider warning +# before running the sample. +# import warnings # isort: skip +# warnings.filterwarnings("ignore", message=r"\[FILE_HISTORY\].*", category=FutureWarning) +from agent_framework import Agent, FileHistoryProvider, tool +from agent_framework.foundry import FoundryChatClient +from azure.identity.aio import AzureCliCredential +from dotenv import load_dotenv +from pydantic import Field + +try: + import orjson +except ImportError: + orjson = None + + +load_dotenv() + +""" +File History Provider Conversation Persistence + +This sample demonstrates persisting a tool-driven conversation with the +experimental `FileHistoryProvider`, reading the stored JSONL file back from +disk, and then continuing the same conversation with another city. + +Environment variables: + FOUNDRY_PROJECT_ENDPOINT: Azure AI Foundry project endpoint. + FOUNDRY_MODEL: Foundry model deployment name. + +Key components: +- `FileHistoryProvider`: Stores one message JSON object per line in a local + `.jsonl` file for each session. +- `get_weather`: A function tool that makes the persisted file show the + assistant function call and tool result records. +- `json.dumps(..., indent=2)`: Pretty-prints a few persisted JSONL records + while keeping the on-disk file compact and valid. +- `load_dotenv()`: Loads `.env` values up front so the sample can stay focused + on history persistence instead of manual environment variable plumbing. +- Optional `orjson`: Uses `orjson.dumps` / `orjson.loads` automatically when + available, otherwise falls back to the standard library `json` module. 
+ +Security posture: +- The history file is plaintext JSONL on disk, so use a trusted storage + directory and treat it as conversation logging, not as secure secret storage. +- Path safety checks protect the filename derived from the session id, but they + do not redact message contents or encrypt the file. +""" + +USE_TEMP_DIRECTORY = False +"""When True, store JSONL files in a temporary directory for this run only.""" + +LOCAL_SESSIONS_DIRECTORY_NAME = "sessions" +"""Folder name used when persisting history next to this sample file.""" + + +@tool(approval_mode="never_require") +def get_weather( + city: Annotated[str, Field(description="The city to get the weather for.")], +) -> str: + """Return a deterministic weather report for a city.""" + weather_reports = { + "Seattle": "Seattle is rainy with a high of 13C.", + "Amsterdam": "Amsterdam is cloudy with a high of 16C.", + } + return weather_reports.get(city, f"{city} is sunny with a high of 20C.") + + +@contextmanager +def _resolve_storage_directory() -> Iterator[Path]: + """Yield the configured storage directory for the sample run.""" + if USE_TEMP_DIRECTORY: + with tempfile.TemporaryDirectory(prefix="af-file-history-resume-") as temp_directory: + yield Path(temp_directory) + return + + storage_directory = Path(__file__).resolve().parent / LOCAL_SESSIONS_DIRECTORY_NAME + storage_directory.mkdir(parents=True, exist_ok=True) + yield storage_directory + + +async def main() -> None: + """Run the file history provider conversation persistence sample.""" + + with _resolve_storage_directory() as storage_directory: + print(f"Using temporary directory: {USE_TEMP_DIRECTORY}") + print(f"Storage directory: {storage_directory}\n") + + # 1. Create the client, history provider, and tool-enabled agent. + agent = Agent( + client=FoundryChatClient( + credential=AzureCliCredential(), + ), + name="WeatherHistoryAgent", + instructions=( + "You are a helpful assistant. 
Use the get_weather tool for weather questions " + "and answer in one sentence using the tool result." + ), + tools=[get_weather], + context_providers=[ + FileHistoryProvider( + storage_directory, + dumps=orjson.dumps if orjson else None, + loads=orjson.loads if orjson else None, + ) + ], + default_options={"store": False}, + ) + + # 2. Ask about the first city so the JSONL file is created on disk. + session = agent.create_session() + history_file = storage_directory / f"{session.session_id}.jsonl" + print("=== First weather question ===\n") + first_query = "Use the get_weather tool and tell me the weather in Seattle." + first_response = await agent.run(first_query, session=session) + print(f"User: {first_query}") + print(f"Assistant: {first_response.text}\n") + + # 3. Read the stored JSONL records back from disk and pretty-print a few of them. + raw_lines = (await asyncio.to_thread(history_file.read_text, encoding="utf-8")).splitlines() + print(f"Stored message lines after first question: {len(raw_lines)}") + print(f"History file: {history_file}\n") + print("=== JSONL preview from disk ===\n") + for index, line in enumerate(raw_lines[:4], start=1): + print(f"Record {index}:") + print(json.dumps(json.loads(line), indent=2)) + print() + + # 4. Continue the same persisted conversation with another city. + print("=== Second weather question ===\n") + second_query = "Now use the get_weather tool for Amsterdam." 
+ second_response = await agent.run(second_query, session=session) + print(f"User: {second_query}") + print(f"Assistant: {second_response.text}\n") + + updated_lines = (await asyncio.to_thread(history_file.read_text, encoding="utf-8")).splitlines() + print(f"Stored message lines after second question: {len(updated_lines)}") + print(f"History file: {history_file}") + + +if __name__ == "__main__": + asyncio.run(main()) + +""" +Sample output: +Using temporary directory: False +Storage directory: /path/to/samples/02-agents/conversations/sessions + +=== First weather question === + +User: Use the get_weather tool and tell me the weather in Seattle. +Assistant: <assistant response> + +Stored message lines after first question: 4 +History file: /path/to/samples/02-agents/conversations/sessions/<session-id>.jsonl + +=== JSONL preview from disk === + +Record 1: +{ + "type": "message", + "role": "user", + ... +} + +=== Second weather question === + +User: Now use the get_weather tool for Amsterdam. +Assistant: <assistant response> + +Stored message lines after second question: 8 +History file: /path/to/samples/02-agents/conversations/sessions/<session-id>.jsonl +"""