Copyright 2026 Firefly Software Foundation. Licensed under the Apache License 2.0.
The Memory module provides conversation history, working memory, and pluggable storage backends for agents, multi-agent delegation, reasoning patterns, and pipelines. It ensures that LLM interactions are stateful and context-aware across turns, sessions, and pipeline steps.
The memory subsystem has four layers:
graph TD
MM[MemoryManager] --> CM[ConversationMemory]
MM --> WM[WorkingMemory]
WM --> STORE[MemoryStore]
CM --> TOKENS[TokenEstimator]
STORE --> IM[InMemoryStore]
STORE --> FS[FileStore]
STORE --> SQ[SQLiteStore]
STORE --> PG[PostgreSQLStore]
STORE --> MG[MongoDBStore]
STORE --> CUSTOM[Custom Backend]
- ConversationMemory -- Token-aware, per-conversation chat history that wraps pydantic-ai's message_history mechanism.
- WorkingMemory -- Scoped key-value scratchpad for session facts, entities, and intermediate state.
- MemoryStore -- Pluggable persistence backends. Three are stdlib-only (InMemoryStore, FileStore, SQLiteStore); PostgreSQLStore and MongoDBStore live behind optional dependency groups. Implement the MemoryStore protocol for any custom backend.
- MemoryManager -- Facade that composes conversation and working memory behind a single API.
from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.memory import MemoryManager
# Create a memory manager
memory = MemoryManager(max_conversation_tokens=32_000)
# Attach to an agent
agent = FireflyAgent(
    name="assistant",
    model="openai:gpt-4o",
    memory=memory,
)
# Start a conversation
conv_id = memory.new_conversation()
# Each run automatically loads/stores conversation history
result1 = await agent.run("What is Python?", conversation_id=conv_id)
result2 = await agent.run("What about its type system?", conversation_id=conv_id)
# result2 has full context from result1

ConversationMemory manages list[ModelMessage] per conversation ID. It
automatically enforces a token budget by dropping oldest turns (FIFO).
from fireflyframework_agentic.memory import ConversationMemory
conv_mem = ConversationMemory(max_tokens=16_000)
cid = conv_mem.new_conversation()
# After an agent run, store the turn
conv_mem.add_turn(
    cid,
    user_prompt="Hello",
    assistant_response="Hi there!",
    raw_messages=result.new_messages(),  # pydantic-ai ModelMessage list
)
# Before the next run, get the trimmed history
history = conv_mem.get_message_history(cid)
# Pass to agent: agent.run("Next question", message_history=history)

When FireflyAgent has a memory attached, all of this happens automatically.
When the total token count exceeds max_tokens, older turns are evicted from
the front. The TokenEstimator (from the content module) is used for counting.
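The eviction rule above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the Turn dataclass, estimate_tokens, and trim_to_budget are hypothetical names, and the four-characters-per-token estimate stands in for whatever TokenEstimator actually does.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Turn:
    """Hypothetical stand-in for a stored conversation turn."""
    user_prompt: str
    assistant_response: str
    token_estimate: int


def estimate_tokens(text: str) -> int:
    # Crude assumption: roughly one token per four characters.
    return max(1, len(text) // 4)


def trim_to_budget(turns: deque, max_tokens: int) -> list:
    """Evict turns from the front until the total fits; return the evicted turns."""
    evicted = []
    total = sum(t.token_estimate for t in turns)
    while turns and total > max_tokens:
        oldest = turns.popleft()  # FIFO: the oldest turn goes first
        total -= oldest.token_estimate
        evicted.append(oldest)
    return evicted


history = deque()
for i in range(5):
    prompt, reply = f"question {i}", "x" * 40
    history.append(Turn(prompt, reply, estimate_tokens(prompt + reply)))

# Each turn is estimated at 12 tokens, so 5 turns need 60 tokens in total.
evicted = trim_to_budget(history, max_tokens=30)
```

With a budget of 30 tokens, only the two most recent turns survive; the three oldest are returned as evicted.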
conv_mem = ConversationMemory(max_tokens=4_000)
# After many turns, only the most recent ones fitting in 4000 tokens are returned

When many turns are evicted, important context can be lost. The optional summarization feature compresses evicted turns into a summary that is retained for context enrichment.
summarize_threshold is a token count (default 3072). When the sum of
the per-turn token_estimate values exceeds it, the oldest turns are evicted
and (if a summarizer is configured) compressed into a summary. Pass a
summarizer callable that takes a list of ConversationTurn objects and
returns a summary string:
from fireflyframework_agentic.memory import ConversationMemory
def my_summarizer(turns):
    # Each turn exposes user_prompt and assistant_response
    texts = [f"{t.user_prompt} -> {t.assistant_response}" for t in turns]
    return "Summary of earlier conversation: " + "; ".join(texts)

conv_mem = ConversationMemory(
    max_tokens=4_000,
    summarize_threshold=3_000,
    summarizer=my_summarizer,
)

Summarization only runs when a summarizer is supplied; with no summarizer,
turns are still evicted by the token budget but no summary is produced.
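A minimal sketch of that optional-summarizer branch follows. It assumes turns are plain dicts with a token_estimate field, and evict_over_threshold is a hypothetical helper, not part of the framework.

```python
from typing import Callable, Optional


def evict_over_threshold(
    turns: list,
    threshold: int,
    summarizer: Optional[Callable[[list], str]] = None,
) -> tuple:
    """Return (kept_turns, summary); summary stays None without a summarizer."""
    kept = list(turns)
    evicted = []
    while kept and sum(t["token_estimate"] for t in kept) > threshold:
        evicted.append(kept.pop(0))  # oldest turn first
    summary = summarizer(evicted) if (summarizer and evicted) else None
    return kept, summary


def join_summarizer(evicted: list) -> str:
    pairs = [f"{t['user_prompt']} -> {t['assistant_response']}" for t in evicted]
    return "Earlier: " + "; ".join(pairs)


turns = [
    {"user_prompt": "q1", "assistant_response": "a1", "token_estimate": 40},
    {"user_prompt": "q2", "assistant_response": "a2", "token_estimate": 40},
    {"user_prompt": "q3", "assistant_response": "a3", "token_estimate": 40},
]

kept_a, summary_a = evict_over_threshold(turns, 90, join_summarizer)
kept_b, summary_b = evict_over_threshold(turns, 90)  # no summarizer configured
```

Both calls keep the same two turns; only the first produces a summary string.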
Retrieve the summary for a conversation:
summary = conv_mem.get_summary(conversation_id)
if summary:
    print(f"Evicted turns summarised as: {summary}")

In production, the summarizer can be an LLM call that condenses older turns into a concise paragraph, preserving key facts while staying within budget.
When the manager is built via MemoryManager.from_config(), the threshold
comes from memory_summarize_threshold (env
FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD) and is applied as a token
count. The config default is 10, which is intentionally small; set a
realistic token budget for production:
export FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD=3000

WorkingMemory is a scoped key-value store for facts and intermediate state.
It is useful for passing context between pipeline steps, storing extraction
results for validation, or maintaining entities across reasoning iterations.
from fireflyframework_agentic.memory import WorkingMemory
wm = WorkingMemory(scope_id="idp-session-42")
wm.set("doc_type", "invoice")
wm.set("vendor", "Acme Corp", importance=0.9) # importance is 0.0-1.0, default 0.5
print(wm.get("doc_type")) # "invoice"
print(wm.has("vendor")) # True
print(wm.keys()) # ["doc_type", "vendor"]
print(wm.items()) # [("doc_type", "invoice"), ("vendor", "Acme Corp")]
print(wm.to_dict()) # {"doc_type": "invoice", "vendor": "Acme Corp"}
wm.delete("vendor") # remove a single key
# Render as a text block for prompt injection
print(wm.to_context_string())
# Working Memory:
# - doc_type: invoice

The full key-value API is set(key, value, *, importance=0.5), get(key, default=None), has(key), delete(key), keys(), items(), to_dict(),
to_context_string(), and clear().
Multiple WorkingMemory instances can share the same store backend while
maintaining independent namespaces:
from fireflyframework_agentic.memory import InMemoryStore, WorkingMemory
store = InMemoryStore()
agent_a_mem = WorkingMemory(store=store, scope_id="agent_a")
agent_b_mem = WorkingMemory(store=store, scope_id="agent_b")
agent_a_mem.set("key", "from A")
agent_b_mem.set("key", "from B")
assert agent_a_mem.get("key") == "from A"
assert agent_b_mem.get("key") == "from B"

The unit of storage is MemoryEntry, and every store honours its fields:
from fireflyframework_agentic.memory import MemoryEntry, MemoryScope

- scope -- a MemoryScope (CONVERSATION, WORKING, or LONG_TERM) classifying the entry's lifetime.
- key -- optional key for key-value lookups (used by WorkingMemory).
- content -- the stored value (string, dict, list, etc.).
- importance -- a 0.0-1.0 priority weight (default 0.5).
- expires_at -- optional TTL timestamp. Entries past their expiry report entry.is_expired and are filtered out of load() / load_by_key() by every backend; the database backends additionally provide cleanup_expired() to delete them.

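The expiry behaviour described for expires_at can be illustrated with a simplified model. The Entry dataclass, load_live, and this list-based cleanup_expired are hypothetical stand-ins written for this sketch; they are not the framework's MemoryEntry or store implementations.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class Entry:
    """Hypothetical, simplified stand-in for MemoryEntry."""
    content: Any
    key: Optional[str] = None
    importance: float = 0.5
    expires_at: Optional[datetime] = None

    @property
    def is_expired(self) -> bool:
        # Entries without a TTL never expire.
        return self.expires_at is not None and datetime.now(timezone.utc) >= self.expires_at


def load_live(entries: list) -> list:
    """Filter expired entries on read, leaving storage untouched."""
    return [e for e in entries if not e.is_expired]


def cleanup_expired(entries: list) -> int:
    """Physically drop expired entries and return how many were removed."""
    before = len(entries)
    entries[:] = load_live(entries)
    return before - len(entries)


now = datetime.now(timezone.utc)
entries = [
    Entry("stale", key="a", expires_at=now - timedelta(hours=1)),
    Entry("fresh", key="b", expires_at=now + timedelta(hours=1)),
    Entry("permanent", key="c"),
]
live = load_live(entries)
removed = cleanup_expired(entries)
```

Filtering on read keeps expired data invisible to callers, while the separate cleanup step reclaims storage.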
Dict-backed, fast, non-persistent. Suitable for testing and short-lived sessions.
from fireflyframework_agentic.memory import InMemoryStore
store = InMemoryStore()

JSON file persistence. Each namespace is a separate file.
from fireflyframework_agentic.memory import FileStore
store = FileStore(base_dir=".firefly_memory")

FileStore also provides async wrappers (async_save, async_load,
async_load_by_key, async_delete, async_clear) that delegate blocking I/O
to asyncio.to_thread(), keeping the event loop non-blocking in async
applications:
await store.async_save("conversations", entry)
entries = await store.async_load("conversations")

Stdlib-only SQLite persistence (sqlite3), with the same constructor-and-go
ergonomics as FileStore. Compared to FileStore, every operation is atomic
(a crash mid-save cannot corrupt the file) and namespace queries use a SQL
index instead of loading and parsing every entry. The file and its parent
directories are created automatically.
from fireflyframework_agentic.memory import SQLiteStore
store = SQLiteStore("data/firefly_memory.sqlite3")

SQLiteStore exposes the same async wrappers as FileStore (async_save,
async_load, async_load_by_key, async_delete, async_clear). Choose
FileStore for human-readable JSON; choose SQLiteStore for crash safety and
faster operations on larger namespaces.
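To make the two ideas concrete (transactional writes with an indexed namespace column, and async wrappers that push blocking I/O to asyncio.to_thread()), here is a rough stdlib-only sketch. TinySQLiteStore, its table layout, and its method signatures are assumptions made for illustration and do not mirror the real SQLiteStore.

```python
import asyncio
import json
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path


class TinySQLiteStore:
    """Illustrative only; not the framework's SQLiteStore."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        self._path.parent.mkdir(parents=True, exist_ok=True)  # create parent dirs
        with closing(sqlite3.connect(self._path)) as conn, conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS entries ("
                "entry_id TEXT PRIMARY KEY, namespace TEXT NOT NULL, content TEXT NOT NULL)"
            )
            conn.execute("CREATE INDEX IF NOT EXISTS idx_ns ON entries(namespace)")

    def save(self, namespace: str, entry_id: str, content: dict) -> None:
        # One connection per call, so worker threads never share a connection.
        # "with conn" commits on success and rolls back on error.
        with closing(sqlite3.connect(self._path)) as conn, conn:
            conn.execute(
                "INSERT OR REPLACE INTO entries VALUES (?, ?, ?)",
                (entry_id, namespace, json.dumps(content)),
            )

    def load(self, namespace: str) -> list:
        with closing(sqlite3.connect(self._path)) as conn:
            rows = conn.execute(
                "SELECT content FROM entries WHERE namespace = ? ORDER BY rowid",
                (namespace,),
            ).fetchall()
        return [json.loads(row[0]) for row in rows]

    async def async_save(self, namespace: str, entry_id: str, content: dict) -> None:
        await asyncio.to_thread(self.save, namespace, entry_id, content)

    async def async_load(self, namespace: str) -> list:
        return await asyncio.to_thread(self.load, namespace)


async def demo(path: str) -> list:
    store = TinySQLiteStore(path)
    await store.async_save("conversations", "e1", {"text": "hello"})
    await store.async_save("working:s1", "e2", {"doc_type": "invoice"})
    return await store.async_load("conversations")


with tempfile.TemporaryDirectory() as tmp:
    loaded = asyncio.run(demo(str(Path(tmp) / "nested" / "memory.sqlite3")))
```

Opening a short-lived connection per operation is the simplest way to stay thread-safe here; a real backend might pool or reuse connections instead.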
Production-grade PostgreSQL persistence with connection pooling. Requires
asyncpg (install via pip install fireflyframework-agentic[postgres]).
The constructor is PostgreSQLStore(url, *, pool_size=10, pool_min_size=2, timeout=30.0, schema_name="firefly_memory"). You must await store.initialize() before use — it creates the connection pool and migrates
the schema (it is idempotent and safe to call more than once).
from fireflyframework_agentic.memory.database_store import PostgreSQLStore
store = PostgreSQLStore(
    url="postgresql://user:pass@localhost/firefly",
    pool_size=10,
    pool_min_size=2,
    schema_name="firefly_memory",
)
await store.initialize() # required before any operation
# Use with MemoryManager
memory = MemoryManager(store=store)
# During application shutdown
await store.close()

The sync save/load/etc. methods run their async counterparts in a worker
thread and will lazily call initialize() if you skipped it, but the
async-native path (async_save, async_load, ...) is recommended. The store
also exposes await store.cleanup_expired() -> int, which deletes all
expired entries (by expires_at) and returns the count removed.
Environment Configuration:
export FIREFLY_AGENTIC_MEMORY_BACKEND=postgres
export FIREFLY_AGENTIC_MEMORY_POSTGRES_URL=postgresql://user:pass@localhost/firefly
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_SIZE=10
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_MIN_SIZE=2
export FIREFLY_AGENTIC_MEMORY_POSTGRES_SCHEMA=firefly_memory

MemoryManager.from_config() reads these and calls initialize() for you.
Schema (created automatically on initialize()):
CREATE TABLE IF NOT EXISTS firefly_memory.memory_entries (
entry_id TEXT PRIMARY KEY,
namespace TEXT NOT NULL,
scope TEXT NOT NULL,
key TEXT,
content JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ,
importance FLOAT NOT NULL DEFAULT 0.5
CHECK (importance >= 0.0 AND importance <= 1.0)
);
CREATE INDEX idx_namespace ON firefly_memory.memory_entries(namespace);
CREATE INDEX idx_namespace_key ON firefly_memory.memory_entries(namespace, key)
WHERE key IS NOT NULL;
CREATE INDEX idx_expires_at ON firefly_memory.memory_entries(expires_at)
WHERE expires_at IS NOT NULL;

The schema name (default firefly_memory) is configurable via schema_name.
Scalable MongoDB persistence with connection pooling. Requires motor and
pymongo (install via pip install fireflyframework-agentic[mongodb]).
The constructor is MongoDBStore(url, *, database="firefly_memory", collection="entries", pool_size=10). As with PostgreSQLStore, call
await store.initialize() before use (it connects and creates indexes) and
await store.close() on shutdown.
from fireflyframework_agentic.memory.database_store import MongoDBStore
store = MongoDBStore(
    url="mongodb://localhost:27017/",
    database="firefly_memory",
    collection="entries",
    pool_size=10,
)
await store.initialize() # required before any operation
memory = MemoryManager(store=store)
# During application shutdown
await store.close()

MongoDBStore also exposes await store.cleanup_expired() -> int to purge
expired entries.
Environment Configuration:
export FIREFLY_AGENTIC_MEMORY_BACKEND=mongodb
export FIREFLY_AGENTIC_MEMORY_MONGODB_URL=mongodb://localhost:27017/
export FIREFLY_AGENTIC_MEMORY_MONGODB_DATABASE=firefly_memory
export FIREFLY_AGENTIC_MEMORY_MONGODB_COLLECTION=entries
export FIREFLY_AGENTIC_MEMORY_MONGODB_POOL_SIZE=10

The store automatically creates the entries collection (in the
firefly_memory database by default) and these indexes:
// Each document is the serialized MemoryEntry plus namespace/scope fields:
{
entry_id: "abc123",
namespace: "working:session-42",
scope: "working",
key: "doc_type",
content: { ... },
metadata: { ... },
created_at: ISODate("..."),
expires_at: null,
importance: 0.5
}
// Indexes
db.entries.createIndex({ namespace: 1 })
db.entries.createIndex({ namespace: 1, key: 1 },
{ partialFilterExpression: { key: { $exists: true } } })
db.entries.createIndex({ expires_at: 1 },
{ partialFilterExpression: { expires_at: { $exists: true } } })
db.entries.createIndex({ entry_id: 1 }, { unique: true })

Implement the MemoryStore protocol for Redis, SQL, or any other backend:
from fireflyframework_agentic.memory import MemoryStore, MemoryEntry
class RedisStore:
    def save(self, namespace: str, entry: MemoryEntry) -> None: ...
    def load(self, namespace: str) -> list[MemoryEntry]: ...
    def load_by_key(self, namespace: str, key: str) -> MemoryEntry | None: ...
    def delete(self, namespace: str, entry_id: str) -> None: ...
    def clear(self, namespace: str) -> None: ...

ConversationMemory supports exporting and importing conversations as
JSON-serialisable dictionaries. This is useful for backup, migration,
debugging, and cross-service conversation transfer.
# Export a conversation
data = conv_mem.export_conversation(conversation_id)
# data is a dict with: conversation_id, turns, summary, total_tokens
# Import into another instance (or after restart)
new_id = conv_mem.import_conversation(data)
# Or override the conversation ID:
new_id = conv_mem.import_conversation(data, conversation_id="custom-id")

Exported turns include turn_id, user_prompt, assistant_response,
token_estimate, and metadata. Note that raw_messages (pydantic-ai
ModelMessage objects) are not exported as they are not portable.
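The round trip can be pictured with plain dicts and the json module. The field names follow the export shape listed above; the helper functions, the in-memory conversations dict, and the choice to reuse the exported ID unless one is passed are assumptions made for this sketch.

```python
import json

EXPORTED_TURN_FIELDS = ("turn_id", "user_prompt", "assistant_response", "token_estimate", "metadata")


def export_conversation(conversation_id: str, turns: list, summary=None) -> dict:
    """Build a JSON-serialisable dict; raw_messages is deliberately left out."""
    return {
        "conversation_id": conversation_id,
        "turns": [{name: turn[name] for name in EXPORTED_TURN_FIELDS} for turn in turns],
        "summary": summary,
        "total_tokens": sum(turn["token_estimate"] for turn in turns),
    }


def import_conversation(conversations: dict, data: dict, conversation_id=None) -> str:
    """Store the turns under the exported ID, or under an explicit override."""
    cid = conversation_id or data["conversation_id"]
    conversations[cid] = {"turns": list(data["turns"]), "summary": data["summary"]}
    return cid


live_turns = [
    {
        "turn_id": "t1",
        "user_prompt": "Hello",
        "assistant_response": "Hi there!",
        "token_estimate": 6,
        "metadata": {},
        "raw_messages": [object()],  # not portable, so never exported
    }
]

payload = json.dumps(export_conversation("conv-1", live_turns, summary=None))

restored = {}
new_id = import_conversation(restored, json.loads(payload), conversation_id="custom-id")
```

Because the payload is plain JSON, it can be written to disk or sent to another service before being imported.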
create_llm_summarizer() is a factory function that returns a callable
suitable for ConversationMemory's summarizer parameter. It uses an
ephemeral Pydantic AI agent to compress evicted turns into a concise summary.
from fireflyframework_agentic.memory.summarization import create_llm_summarizer
summarizer = create_llm_summarizer(model="openai:gpt-4o-mini")
conv_mem = ConversationMemory(
    max_tokens=4_000,
    summarize_threshold=3_000,
    summarizer=summarizer,
)

Parameters:

- model -- Pydantic AI model string or Model instance. When None, uses the framework default model.
- prompt_template -- Custom prompt template. Must contain a {turns_text} placeholder.
- max_summary_tokens -- Soft limit on summary length (guidance, not enforced).

If the LLM call fails, the factory falls back to a non-LLM truncation strategy that extracts key sentences from the most recent turns.
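The template-plus-fallback pattern might look roughly like the sketch below. Everything here is hypothetical: make_summarizer, the call_llm callable, the (user, assistant) tuple shape for turns, and the tail-truncation fallback, which is a simpler strategy than the key-sentence extraction the factory is described as using.

```python
from typing import Callable

DEFAULT_TEMPLATE = "Summarize these conversation turns concisely:\n\n{turns_text}"


def make_summarizer(
    call_llm: Callable[[str], str],
    prompt_template: str = DEFAULT_TEMPLATE,
    max_fallback_chars: int = 200,
) -> Callable[[list], str]:
    """Return a summarizer that falls back to truncation when call_llm raises."""
    if "{turns_text}" not in prompt_template:
        raise ValueError("prompt_template must contain a {turns_text} placeholder")

    def summarize(turns: list) -> str:
        turns_text = "\n".join(f"User: {user}\nAssistant: {reply}" for user, reply in turns)
        try:
            return call_llm(prompt_template.format(turns_text=turns_text))
        except Exception:
            # Non-LLM fallback: keep only the tail, i.e. the most recent text.
            return turns_text[-max_fallback_chars:]

    return summarize


def fake_llm(prompt: str) -> str:
    return f"SUMMARY({len(prompt)} chars of prompt)"


def broken_llm(prompt: str) -> str:
    raise RuntimeError("model unavailable")


turns = [("What is Python?", "A programming language."), ("Typed?", "Optionally, via hints.")]
ok_summary = make_summarizer(fake_llm)(turns)
fallback_summary = make_summarizer(broken_llm, max_fallback_chars=40)(turns)
```

Validating the placeholder up front surfaces a bad template at construction time rather than during eviction.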
MemoryManager is the single entry point for conversation and working memory.
It is the object you attach to agents, delegation routers, and pipelines. The
constructor takes keyword-only arguments:
from fireflyframework_agentic.memory import MemoryManager
mgr = MemoryManager(
    store=None,  # defaults to InMemoryStore
    max_conversation_tokens=32_000,
    summarize_threshold=10,
    working_scope_id="main-session",
)
# Conversation
cid = mgr.new_conversation()
mgr.add_turn(cid, "hello", "hi", raw_messages)
history = mgr.get_message_history(cid)
# Working memory
mgr.set_fact("doc_type", "invoice")
mgr.get_fact("doc_type") # "invoice"
print(mgr.get_working_context())  # text block for prompt injection

MemoryManager.from_config() is the canonical env-driven wiring path. It reads
memory_backend, memory_max_conversation_tokens, and
memory_summarize_threshold from the framework config, selects the matching
backend (in_memory / file / postgres / mongodb), raises if a required
URL is missing, and calls initialize() on database backends for you:
mgr = MemoryManager.from_config()

The composed subsystems are reachable directly, and lifecycle helpers clear state:
mgr.conversation # the ConversationMemory instance
mgr.working # the WorkingMemory instance
mgr.store # the underlying MemoryStore
mgr.clear_conversation(cid) # clear one conversation
mgr.clear_working() # clear working memory
mgr.clear_all()  # clear both

When delegating to a sub-agent or branching a pipeline, fork() creates a
child manager that shares conversation memory but has independent working memory:
child = mgr.fork(working_scope_id="sub-agent-classify")
child.set_fact("classification", "invoice")
# parent's working memory is unaffected

agent = FireflyAgent(name="bot", model="openai:gpt-4o", memory=mgr)
result = await agent.run("Hi", conversation_id=cid)
# message_history is auto-injected, new messages auto-stored

from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy
router = DelegationRouter([agent_a, agent_b], RoundRobinStrategy(), memory=mgr)
result = await router.route("Translate this text.")
# delegated agent receives a forked memory scope

from fireflyframework_agentic.pipeline.context import PipelineContext
ctx = PipelineContext(inputs=data, memory=mgr)
result = await engine.run(context=ctx)
# AgentStep and ReasoningStep automatically propagate memory

Memory is injected into the reasoning state dict as state["memory"]:
result = await pattern.execute(agent, "Analyze this", memory=mgr)
# Inside pattern hooks: state["memory"].working.set("key", "value")

The framework is a pure in-process library: it serves no HTTP port. When a host
service exposes agents over its own transport, it owns the wiring of
conversation_id from the inbound request to agent.run(..., conversation_id=...).
The memory backend simply provides the stateful continuity behind that call.
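As a transport-agnostic illustration of that wiring, a host handler could carry conversation_id from the request into the agent call and echo it back to the client. StubAgent and handle_request are hypothetical; the only thing taken from the text above is that run() accepts a conversation_id keyword.

```python
import asyncio
import uuid


class StubAgent:
    """Hypothetical stand-in for an agent whose run() accepts conversation_id."""

    def __init__(self) -> None:
        self.history = {}

    async def run(self, prompt: str, conversation_id: str) -> str:
        turns = self.history.setdefault(conversation_id, [])
        turns.append(prompt)
        return f"turn {len(turns)} of {conversation_id}"


async def handle_request(agent: StubAgent, request: dict) -> dict:
    """Host-side glue: reuse the caller's conversation_id or mint a new one."""
    conversation_id = request.get("conversation_id") or uuid.uuid4().hex
    output = await agent.run(request["prompt"], conversation_id=conversation_id)
    # Return the ID so the client can send it with its next request.
    return {"conversation_id": conversation_id, "output": output}


async def demo() -> tuple:
    agent = StubAgent()
    first = await handle_request(agent, {"prompt": "Hi"})
    second = await handle_request(
        agent, {"prompt": "And then?", "conversation_id": first["conversation_id"]}
    )
    return first, second


first, second = asyncio.run(demo())
```

The same handler shape applies whether the host speaks HTTP, gRPC, or a message queue, since only the request dict changes.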
Memory settings are configured via environment variables (prefix
FIREFLY_AGENTIC_) and consumed by MemoryManager.from_config():
# Core
export FIREFLY_AGENTIC_MEMORY_BACKEND=in_memory # in_memory | file | postgres | mongodb
export FIREFLY_AGENTIC_MEMORY_MAX_CONVERSATION_TOKENS=128000
export FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD=10 # token count (small default; raise for prod)
export FIREFLY_AGENTIC_MEMORY_FILE_DIR=.firefly_memory # used by the "file" backend
# PostgreSQL backend
export FIREFLY_AGENTIC_MEMORY_POSTGRES_URL=postgresql://user:pass@localhost/firefly
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_SIZE=10
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_MIN_SIZE=2
export FIREFLY_AGENTIC_MEMORY_POSTGRES_SCHEMA=firefly_memory
# MongoDB backend
export FIREFLY_AGENTIC_MEMORY_MONGODB_URL=mongodb://localhost:27017/
export FIREFLY_AGENTIC_MEMORY_MONGODB_DATABASE=firefly_memory
export FIREFLY_AGENTIC_MEMORY_MONGODB_COLLECTION=entries
export FIREFLY_AGENTIC_MEMORY_MONGODB_POOL_SIZE=10

Note: the from_config() env path supports in_memory, file, postgres,
and mongodb. SQLiteStore is constructed explicitly (SQLiteStore(path))
and passed via MemoryManager(store=...).
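For readers who want to see the env-driven selection spelled out, the following sketch parses the same variable names into a small settings object and fails fast when a database URL is missing. MemorySettings and load_settings are hypothetical, and the fallback values used when a variable is unset (in_memory, 128000, 10) are assumptions taken from the examples above rather than confirmed framework defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

PREFIX = "FIREFLY_AGENTIC_"
SUPPORTED_BACKENDS = {"in_memory", "file", "postgres", "mongodb"}
REQUIRED_URL_VAR = {"postgres": "MEMORY_POSTGRES_URL", "mongodb": "MEMORY_MONGODB_URL"}


@dataclass(frozen=True)
class MemorySettings:
    """Hypothetical settings holder for this sketch."""
    backend: str
    max_conversation_tokens: int
    summarize_threshold: int
    url: Optional[str]


def load_settings(env: Mapping[str, str] = os.environ) -> MemorySettings:
    def get(name: str, default: Optional[str] = None) -> Optional[str]:
        return env.get(PREFIX + name, default)

    backend = get("MEMORY_BACKEND", "in_memory")
    if backend not in SUPPORTED_BACKENDS:
        raise ValueError(f"unsupported memory backend: {backend!r}")

    url_var = REQUIRED_URL_VAR.get(backend)
    url = get(url_var) if url_var else None
    if url_var and not url:
        # Database backends cannot start without a connection URL.
        raise ValueError(f"{PREFIX}{url_var} is required for the {backend} backend")

    return MemorySettings(
        backend=backend,
        max_conversation_tokens=int(get("MEMORY_MAX_CONVERSATION_TOKENS", "128000")),
        summarize_threshold=int(get("MEMORY_SUMMARIZE_THRESHOLD", "10")),
        url=url,
    )


settings = load_settings(
    {
        "FIREFLY_AGENTIC_MEMORY_BACKEND": "postgres",
        "FIREFLY_AGENTIC_MEMORY_POSTGRES_URL": "postgresql://user:pass@localhost/firefly",
        "FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD": "3000",
    }
)
```

Passing the environment as a mapping keeps the function easy to test without mutating os.environ.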