Memory Guide

Copyright 2026 Firefly Software Foundation. Licensed under the Apache License 2.0.

The Memory module provides conversation history, working memory, and pluggable storage backends for agents, multi-agent delegation, reasoning patterns, and pipelines. It ensures that LLM interactions are stateful and context-aware across turns, sessions, and pipeline steps.


Architecture

The memory subsystem has four layers:

graph TD
    MM[MemoryManager] --> CM[ConversationMemory]
    MM --> WM[WorkingMemory]
    WM --> STORE[MemoryStore]
    CM --> TOKENS[TokenEstimator]
    STORE --> IM[InMemoryStore]
    STORE --> FS[FileStore]
    STORE --> SQ[SQLiteStore]
    STORE --> PG[PostgreSQLStore]
    STORE --> MG[MongoDBStore]
    STORE --> CUSTOM[Custom Backend]
  • ConversationMemory -- Token-aware, per-conversation chat history that wraps pydantic-ai's message_history mechanism.
  • WorkingMemory -- Scoped key-value scratchpad for session facts, entities, and intermediate state.
  • MemoryStore -- Pluggable persistence backends. Three are stdlib-only (InMemoryStore, FileStore, SQLiteStore); PostgreSQLStore and MongoDBStore live behind optional dependency groups. Implement the MemoryStore protocol for any custom backend.
  • MemoryManager -- Facade that composes conversation and working memory behind a single API.

Quick Start

from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.memory import MemoryManager

# Create a memory manager
memory = MemoryManager(max_conversation_tokens=32_000)

# Attach to an agent
agent = FireflyAgent(
    name="assistant",
    model="openai:gpt-4o",
    memory=memory,
)

# Start a conversation
conv_id = memory.new_conversation()

# Each run automatically loads/stores conversation history
result1 = await agent.run("What is Python?", conversation_id=conv_id)
result2 = await agent.run("What about its type system?", conversation_id=conv_id)
# result2 has full context from result1

Conversation Memory

ConversationMemory manages a list[ModelMessage] per conversation ID. It automatically enforces a token budget by dropping the oldest turns first (FIFO).

from fireflyframework_agentic.memory import ConversationMemory

conv_mem = ConversationMemory(max_tokens=16_000)
cid = conv_mem.new_conversation()

# After an agent run, store the turn
conv_mem.add_turn(
    cid,
    user_prompt="Hello",
    assistant_response="Hi there!",
    raw_messages=result.new_messages(), # pydantic-ai ModelMessage list
)

# Before the next run, get the trimmed history
history = conv_mem.get_message_history(cid)
# Pass to agent: agent.run("Next question", message_history=history)

When FireflyAgent has a memory attached, all of this happens automatically.

Token Budget

When the total token count exceeds max_tokens, older turns are evicted from the front. The TokenEstimator (from the content module) is used for counting.

conv_mem = ConversationMemory(max_tokens=4_000)
# After many turns, only the most recent ones fitting in 4000 tokens are returned
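
The eviction rule can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the library's implementation: `estimate_tokens` is a hypothetical stand-in for TokenEstimator (a crude whitespace word count), and `trim_to_budget` is an illustrative helper name.

```python
from collections import deque


def estimate_tokens(text: str) -> int:
    """Hypothetical stand-in for TokenEstimator: one token per whitespace-separated word."""
    return len(text.split())


def trim_to_budget(turns: list[str], max_tokens: int) -> list[str]:
    """Drop the oldest turns (FIFO) until the remaining turns fit within max_tokens."""
    kept = deque(turns)
    total = sum(estimate_tokens(t) for t in kept)
    while kept and total > max_tokens:
        # Evict from the front: the oldest turn goes first.
        total -= estimate_tokens(kept.popleft())
    return list(kept)


turns = ["one two three", "four five", "six seven eight nine"]  # 3 + 2 + 4 = 9 tokens
print(trim_to_budget(turns, max_tokens=6))  # ['four five', 'six seven eight nine']
```

A real estimator would count model tokens rather than words, but the front-eviction loop has the same shape.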

Conversation Summarization

When many turns are evicted, important context can be lost. The optional summarization feature compresses evicted turns into a summary that is retained for context enrichment.

summarize_threshold is a token count (default 3072). When the sum of the per-turn token_estimate values exceeds it, the oldest turns are evicted and (if a summarizer is configured) compressed into a summary. Pass a summarizer callable that takes a list of ConversationTurn objects and returns a summary string:

from fireflyframework_agentic.memory import ConversationMemory

def my_summarizer(turns):
    texts = [f"{t.user_prompt} -> {t.assistant_response}" for t in turns]
    return "Summary of earlier conversation: " + "; ".join(texts)

conv_mem = ConversationMemory(
    max_tokens=4_000,
    summarize_threshold=3_000,
    summarizer=my_summarizer,
)

Summarization only runs when a summarizer is supplied; with no summarizer, turns are still evicted by the token budget but no summary is produced.

Retrieve the summary for a conversation:

summary = conv_mem.get_summary(conversation_id)
if summary:
    print(f"Evicted turns summarised as: {summary}")

In production, the summarizer can be an LLM call that condenses older turns into a concise paragraph, preserving key facts while staying within budget.
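
How eviction and summarization could fit together is sketched below. The `Turn` dataclass is a reduced, hypothetical stand-in for ConversationTurn, and `evict_and_summarize` is an illustrative helper, not a function the library is known to expose.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Turn:
    """Hypothetical, reduced stand-in for ConversationTurn."""
    user_prompt: str
    assistant_response: str
    token_estimate: int


def evict_and_summarize(
    turns: list[Turn],
    threshold: int,
    summarizer: Optional[Callable[[list[Turn]], str]] = None,
) -> tuple[list[Turn], Optional[str]]:
    """Evict oldest turns while the token sum exceeds threshold; summarize what was evicted."""
    kept = list(turns)
    evicted: list[Turn] = []
    total = sum(t.token_estimate for t in kept)
    while kept and total > threshold:
        oldest = kept.pop(0)
        total -= oldest.token_estimate
        evicted.append(oldest)
    # Without a summarizer, turns are still evicted but no summary is produced.
    summary = summarizer(evicted) if (summarizer and evicted) else None
    return kept, summary


history = [Turn("a", "b", 5), Turn("c", "d", 5), Turn("e", "f", 5)]
kept, summary = evict_and_summarize(
    history, threshold=10, summarizer=lambda ts: "; ".join(t.user_prompt for t in ts)
)
print([t.user_prompt for t in kept], summary)
```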

When the manager is built via MemoryManager.from_config(), the threshold comes from memory_summarize_threshold (env FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD) and is applied as a token count. The config default is 10, far below the 3072 default noted above, and is intentionally small; set a realistic token budget for production:

export FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD=3000

Working Memory

WorkingMemory is a scoped key-value store for facts and intermediate state. It is useful for passing context between pipeline steps, storing extraction results for validation, or maintaining entities across reasoning iterations.

from fireflyframework_agentic.memory import WorkingMemory

wm = WorkingMemory(scope_id="idp-session-42")
wm.set("doc_type", "invoice")
wm.set("vendor", "Acme Corp", importance=0.9)  # importance is 0.0-1.0, default 0.5

print(wm.get("doc_type")) # "invoice"
print(wm.has("vendor")) # True
print(wm.keys()) # ["doc_type", "vendor"]
print(wm.items()) # [("doc_type", "invoice"), ("vendor", "Acme Corp")]
print(wm.to_dict()) # {"doc_type": "invoice", "vendor": "Acme Corp"}

wm.delete("vendor")  # remove a single key

# Render as a text block for prompt injection
print(wm.to_context_string())
# Working Memory:
# - doc_type: invoice

The full key-value API is set(key, value, *, importance=0.5), get(key, default=None), has(key), delete(key), keys(), items(), to_dict(), to_context_string(), and clear().

Scoped Isolation

Multiple WorkingMemory instances can share the same store backend while maintaining independent namespaces:

from fireflyframework_agentic.memory import InMemoryStore, WorkingMemory

store = InMemoryStore()
agent_a_mem = WorkingMemory(store=store, scope_id="agent_a")
agent_b_mem = WorkingMemory(store=store, scope_id="agent_b")

agent_a_mem.set("key", "from A")
agent_b_mem.set("key", "from B")
assert agent_a_mem.get("key") == "from A"
assert agent_b_mem.get("key") == "from B"
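
One way such isolation could work is to derive a namespace from the scope ID, so that two scopes never touch the same bucket in the shared store. The sketch below is an assumption-laden illustration: `SharedStore` and `ScopedMemory` are hypothetical classes, and the `working:<scope_id>` namespace format is inferred from the MongoDB document example later in this guide.

```python
from typing import Any


class SharedStore:
    """Hypothetical minimal backend: namespace -> {key: value}."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    def put(self, namespace: str, key: str, value: Any) -> None:
        self._data.setdefault(namespace, {})[key] = value

    def get(self, namespace: str, key: str, default: Any = None) -> Any:
        return self._data.get(namespace, {}).get(key, default)


class ScopedMemory:
    """Hypothetical scoped view: every read and write goes through one namespace."""

    def __init__(self, store: SharedStore, scope_id: str) -> None:
        self._store = store
        self._namespace = f"working:{scope_id}"  # assumed namespace format

    def set(self, key: str, value: Any) -> None:
        self._store.put(self._namespace, key, value)

    def get(self, key: str, default: Any = None) -> Any:
        return self._store.get(self._namespace, key, default)


shared = SharedStore()
mem_a = ScopedMemory(shared, "agent_a")
mem_b = ScopedMemory(shared, "agent_b")
mem_a.set("key", "from A")
mem_b.set("key", "from B")
print(mem_a.get("key"), "|", mem_b.get("key"))  # from A | from B
```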

Memory Entries

The unit of storage is MemoryEntry, and every store honours its fields:

from fireflyframework_agentic.memory import MemoryEntry, MemoryScope
  • scope — a MemoryScope (CONVERSATION, WORKING, or LONG_TERM) classifying the entry's lifetime.
  • key — optional key for key-value lookups (used by WorkingMemory).
  • content — the stored value (string, dict, list, etc.).
  • importance — a 0.0–1.0 priority weight (default 0.5).
  • expires_at — optional TTL timestamp. Entries past their expiry report entry.is_expired and are filtered out of load() / load_by_key() by every backend; the database backends additionally provide cleanup_expired() to delete them.
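
The expiry behaviour described in the last bullet can be sketched with a small dataclass. `Entry` below is a reduced, hypothetical stand-in for MemoryEntry (only a few fields), and `load_live` is an illustrative name for the filtering that load() is described as doing.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class Entry:
    """Hypothetical, reduced stand-in for MemoryEntry."""
    key: str
    content: Any
    importance: float = 0.5
    expires_at: Optional[datetime] = None

    @property
    def is_expired(self) -> bool:
        # Entries without an expiry never expire.
        return self.expires_at is not None and datetime.now(timezone.utc) >= self.expires_at


def load_live(entries: list[Entry]) -> list[Entry]:
    """Return only the entries that have not passed their expiry."""
    return [e for e in entries if not e.is_expired]


now = datetime.now(timezone.utc)
entries = [
    Entry("doc_type", "invoice"),
    Entry("otp", "123456", expires_at=now - timedelta(seconds=1)),  # already expired
    Entry("vendor", "Acme Corp", importance=0.9, expires_at=now + timedelta(hours=1)),
]
print([e.key for e in load_live(entries)])  # ['doc_type', 'vendor']
```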

Storage Backends

InMemoryStore

Dict-backed, fast, non-persistent. Suitable for testing and short-lived sessions.

from fireflyframework_agentic.memory import InMemoryStore
store = InMemoryStore()

FileStore

JSON file persistence. Each namespace is a separate file.

from fireflyframework_agentic.memory import FileStore
store = FileStore(base_dir=".firefly_memory")

FileStore also provides async wrappers (async_save, async_load, async_load_by_key, async_delete, async_clear) that delegate blocking I/O to asyncio.to_thread(), so the event loop is not blocked in async applications:

await store.async_save("conversations", entry)
entries = await store.async_load("conversations")
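
The wrapper pattern itself is small. The sketch below shows it on a hypothetical `TinyFileStore` (one JSON file per namespace); it is not the library's FileStore, only an illustration of delegating blocking file I/O to asyncio.to_thread().

```python
import asyncio
import json
import tempfile
from pathlib import Path


class TinyFileStore:
    """Hypothetical one-file-per-namespace JSON store (not the library's FileStore)."""

    def __init__(self, base_dir: str) -> None:
        self._base = Path(base_dir)
        self._base.mkdir(parents=True, exist_ok=True)

    def _path(self, namespace: str) -> Path:
        return self._base / f"{namespace}.json"

    def load(self, namespace: str) -> list[dict]:
        path = self._path(namespace)
        return json.loads(path.read_text()) if path.exists() else []

    def save(self, namespace: str, entry: dict) -> None:
        entries = self.load(namespace)
        entries.append(entry)
        self._path(namespace).write_text(json.dumps(entries))

    # Async wrappers: run the blocking methods in a worker thread.
    async def async_save(self, namespace: str, entry: dict) -> None:
        await asyncio.to_thread(self.save, namespace, entry)

    async def async_load(self, namespace: str) -> list[dict]:
        return await asyncio.to_thread(self.load, namespace)


async def demo() -> list[dict]:
    with tempfile.TemporaryDirectory() as tmp:
        store = TinyFileStore(tmp)
        await store.async_save("conversations", {"key": "greeting", "content": "hello"})
        return await store.async_load("conversations")


print(asyncio.run(demo()))
```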

SQLiteStore

Stdlib-only SQLite persistence (sqlite3), with the same constructor-and-go ergonomics as FileStore. Compared to FileStore, every operation is atomic (a crash mid-save cannot corrupt the file) and namespace queries use a SQL index instead of loading and parsing every entry. The file and its parent directories are created automatically.

from fireflyframework_agentic.memory import SQLiteStore
store = SQLiteStore("data/firefly_memory.sqlite3")

SQLiteStore exposes the same async wrappers as FileStore (async_save, async_load, async_load_by_key, async_delete, async_clear). Choose FileStore for human-readable JSON; choose SQLiteStore for crash safety and faster operations on larger namespaces.
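
The two properties claimed above, atomic writes and indexed namespace lookups, can be illustrated with the stdlib sqlite3 module. The table layout and helper names below are assumptions made for this sketch, not SQLiteStore's actual schema.

```python
import json
import sqlite3
from typing import Any, Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open a database and create the (hypothetical) table and index if missing."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS entries ("
        "entry_id TEXT PRIMARY KEY, namespace TEXT NOT NULL, key TEXT, content TEXT NOT NULL)"
    )
    conn.execute("CREATE INDEX IF NOT EXISTS idx_namespace_key ON entries(namespace, key)")
    return conn


def save(conn: sqlite3.Connection, entry_id: str, namespace: str, key: str, content: Any) -> None:
    # 'with conn' wraps the statement in a transaction: commit on success, rollback on error.
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO entries (entry_id, namespace, key, content) VALUES (?, ?, ?, ?)",
            (entry_id, namespace, key, json.dumps(content)),
        )


def load_by_key(conn: sqlite3.Connection, namespace: str, key: str) -> Optional[Any]:
    # The (namespace, key) index lets SQLite answer this without scanning every row.
    row = conn.execute(
        "SELECT content FROM entries WHERE namespace = ? AND key = ?", (namespace, key)
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = open_store()
save(conn, "e1", "working:session-42", "doc_type", "invoice")
print(load_by_key(conn, "working:session-42", "doc_type"))  # invoice
```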

PostgreSQLStore

Production-grade PostgreSQL persistence with connection pooling. Requires asyncpg (install via pip install fireflyframework-agentic[postgres]).

The constructor is PostgreSQLStore(url, *, pool_size=10, pool_min_size=2, timeout=30.0, schema_name="firefly_memory"). You must await store.initialize() before use — it creates the connection pool and migrates the schema (it is idempotent and safe to call more than once).

from fireflyframework_agentic.memory.database_store import PostgreSQLStore

store = PostgreSQLStore(
    url="postgresql://user:pass@localhost/firefly",
    pool_size=10,
    pool_min_size=2,
    schema_name="firefly_memory",
)
await store.initialize()  # required before any operation

# Use with MemoryManager
memory = MemoryManager(store=store)

# During application shutdown
await store.close()

The sync save/load/etc. methods run their async counterparts in a worker thread and will lazily call initialize() if you skipped it, but the async-native path (async_save, async_load, ...) is recommended. The store also exposes await store.cleanup_expired() -> int, which deletes all expired entries (by expires_at) and returns the count removed.

Environment Configuration:

export FIREFLY_AGENTIC_MEMORY_BACKEND=postgres
export FIREFLY_AGENTIC_MEMORY_POSTGRES_URL=postgresql://user:pass@localhost/firefly
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_SIZE=10
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_MIN_SIZE=2
export FIREFLY_AGENTIC_MEMORY_POSTGRES_SCHEMA=firefly_memory

MemoryManager.from_config() reads these and calls initialize() for you.

Schema (created automatically on initialize()):

CREATE TABLE IF NOT EXISTS firefly_memory.memory_entries (
    entry_id    TEXT PRIMARY KEY,
    namespace   TEXT NOT NULL,
    scope       TEXT NOT NULL,
    key         TEXT,
    content     JSONB NOT NULL,
    metadata    JSONB NOT NULL DEFAULT '{}',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at  TIMESTAMPTZ,
    importance  FLOAT NOT NULL DEFAULT 0.5
        CHECK (importance >= 0.0 AND importance <= 1.0)
);

CREATE INDEX idx_namespace      ON firefly_memory.memory_entries(namespace);
CREATE INDEX idx_namespace_key  ON firefly_memory.memory_entries(namespace, key)
    WHERE key IS NOT NULL;
CREATE INDEX idx_expires_at     ON firefly_memory.memory_entries(expires_at)
    WHERE expires_at IS NOT NULL;

The schema name (default firefly_memory) is configurable via schema_name.

MongoDBStore

Scalable MongoDB persistence with connection pooling. Requires motor and pymongo (install via pip install fireflyframework-agentic[mongodb]).

The constructor is MongoDBStore(url, *, database="firefly_memory", collection="entries", pool_size=10). As with PostgreSQLStore, call await store.initialize() before use (it connects and creates indexes) and await store.close() on shutdown.

from fireflyframework_agentic.memory.database_store import MongoDBStore

store = MongoDBStore(
    url="mongodb://localhost:27017/",
    database="firefly_memory",
    collection="entries",
    pool_size=10,
)
await store.initialize()  # required before any operation

memory = MemoryManager(store=store)

# During application shutdown
await store.close()

MongoDBStore also exposes await store.cleanup_expired() -> int to purge expired entries.

Environment Configuration:

export FIREFLY_AGENTIC_MEMORY_BACKEND=mongodb
export FIREFLY_AGENTIC_MEMORY_MONGODB_URL=mongodb://localhost:27017/
export FIREFLY_AGENTIC_MEMORY_MONGODB_DATABASE=firefly_memory
export FIREFLY_AGENTIC_MEMORY_MONGODB_COLLECTION=entries
export FIREFLY_AGENTIC_MEMORY_MONGODB_POOL_SIZE=10

The store automatically creates the entries collection (in the firefly_memory database by default). Documents have the shape shown below, followed by the indexes the store creates:

// Each document is the serialized MemoryEntry plus namespace/scope fields:
{
  entry_id: "abc123",
  namespace: "working:session-42",
  scope: "working",
  key: "doc_type",
  content: { ... },
  metadata: { ... },
  created_at: ISODate("..."),
  expires_at: null,
  importance: 0.5
}

// Indexes
db.entries.createIndex({ namespace: 1 })
db.entries.createIndex({ namespace: 1, key: 1 },
                       { partialFilterExpression: { key: { $exists: true } } })
db.entries.createIndex({ expires_at: 1 },
                       { partialFilterExpression: { expires_at: { $exists: true } } })
db.entries.createIndex({ entry_id: 1 }, { unique: true })

Custom Backends

Implement the MemoryStore protocol for Redis, SQL, or any other backend:

from fireflyframework_agentic.memory import MemoryStore, MemoryEntry

class RedisStore:
    def save(self, namespace: str, entry: MemoryEntry) -> None: ...
    def load(self, namespace: str) -> list[MemoryEntry]: ...
    def load_by_key(self, namespace: str, key: str) -> MemoryEntry | None: ...
    def delete(self, namespace: str, entry_id: str) -> None: ...
    def clear(self, namespace: str) -> None: ...
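
For a runnable picture of the five-method contract, here is a dict-backed backend. To stay self-contained it defines its own hypothetical `Entry` and `Store` types that mirror the method signatures above; a real backend would use the library's MemoryEntry and MemoryStore instead.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional, Protocol, runtime_checkable


@dataclass
class Entry:
    """Hypothetical, reduced stand-in for MemoryEntry."""
    content: Any
    key: Optional[str] = None
    entry_id: str = field(default_factory=lambda: uuid.uuid4().hex)


@runtime_checkable
class Store(Protocol):
    """Local mirror of the five methods listed above."""
    def save(self, namespace: str, entry: Entry) -> None: ...
    def load(self, namespace: str) -> list[Entry]: ...
    def load_by_key(self, namespace: str, key: str) -> Optional[Entry]: ...
    def delete(self, namespace: str, entry_id: str) -> None: ...
    def clear(self, namespace: str) -> None: ...


class DictStore:
    """Structural implementation: no inheritance, just matching methods."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Entry]] = {}

    def save(self, namespace: str, entry: Entry) -> None:
        self._data.setdefault(namespace, {})[entry.entry_id] = entry

    def load(self, namespace: str) -> list[Entry]:
        return list(self._data.get(namespace, {}).values())

    def load_by_key(self, namespace: str, key: str) -> Optional[Entry]:
        return next((e for e in self.load(namespace) if e.key == key), None)

    def delete(self, namespace: str, entry_id: str) -> None:
        self._data.get(namespace, {}).pop(entry_id, None)

    def clear(self, namespace: str) -> None:
        self._data.pop(namespace, None)


store = DictStore()
print(isinstance(store, Store))  # True: the protocol check is structural
```

Swapping the inner dict for Redis or SQL calls keeps the same surface, which is the point of a protocol-based contract.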

Conversation Export & Import

ConversationMemory supports exporting and importing conversations as JSON-serialisable dictionaries. This is useful for backup, migration, debugging, and cross-service conversation transfer.

# Export a conversation
data = conv_mem.export_conversation(conversation_id)
# data is a dict with: conversation_id, turns, summary, total_tokens

# Import into another instance (or after restart)
new_id = conv_mem.import_conversation(data)
# Or override the conversation ID:
new_id = conv_mem.import_conversation(data, conversation_id="custom-id")

Exported turns include turn_id, user_prompt, assistant_response, token_estimate, and metadata. Note that raw_messages (pydantic-ai ModelMessage objects) are not exported as they are not portable.
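
Because the export is JSON-serialisable, a backup or transfer can be a plain json round-trip. The dictionary below only imitates the documented field names; all values are made up for illustration, and `to_json` / `from_json` are hypothetical helper names.

```python
import json

# Illustrative payload using the documented field names; values are invented.
exported = {
    "conversation_id": "conv-1",
    "turns": [
        {
            "turn_id": "t1",
            "user_prompt": "Hello",
            "assistant_response": "Hi there!",
            "token_estimate": 4,
            "metadata": {},
        }
    ],
    "summary": None,
    "total_tokens": 4,
}


def to_json(data: dict) -> str:
    """Serialise an exported conversation for storage or transfer."""
    return json.dumps(data, indent=2, sort_keys=True)


def from_json(text: str) -> dict:
    """Parse a stored export back into a dict suitable for import."""
    return json.loads(text)


restored = from_json(to_json(exported))
print(restored == exported)  # True
```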


LLM-Based Summarization

create_llm_summarizer() is a factory function that returns a callable suitable for ConversationMemory's summarizer parameter. It uses an ephemeral Pydantic AI agent to compress evicted turns into a concise summary.

from fireflyframework_agentic.memory.summarization import create_llm_summarizer

summarizer = create_llm_summarizer(model="openai:gpt-4o-mini")
conv_mem = ConversationMemory(
    max_tokens=4_000,
    summarize_threshold=3_000,
    summarizer=summarizer,
)

Parameters:

  • model — Pydantic AI model string or Model instance. When None, uses the framework default model.
  • prompt_template — Custom prompt template. Must contain a {turns_text} placeholder.
  • max_summary_tokens — Soft limit on summary length (guidance, not enforced).

If the LLM call fails, the returned summarizer falls back to a non-LLM truncation strategy that extracts key sentences from the most recent turns.
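
The fallback idea can be sketched as a wrapper around any summarizer. Everything here is illustrative: `with_fallback`, `first_sentence`, and the `keep_recent` parameter are hypothetical names, and taking the first sentence of each recent turn is one simple guess at what a truncation strategy might look like.

```python
from typing import Callable, Sequence


def first_sentence(text: str) -> str:
    """Return the text up to the first sentence break, ending in a period."""
    return text.split(". ")[0].rstrip(".") + "."


def with_fallback(
    llm_summarize: Callable[[Sequence[str]], str],
    keep_recent: int = 2,
) -> Callable[[Sequence[str]], str]:
    """Wrap a summarizer so that a failure degrades to simple truncation."""

    def summarize(turn_texts: Sequence[str]) -> str:
        try:
            return llm_summarize(turn_texts)
        except Exception:
            # Non-LLM fallback: first sentence of each of the most recent turns.
            recent = list(turn_texts)[-keep_recent:]
            return " ".join(first_sentence(t) for t in recent)

    return summarize


def unavailable_model(turn_texts: Sequence[str]) -> str:
    """Simulates an LLM call that fails."""
    raise RuntimeError("model unavailable")


safe = with_fallback(unavailable_model)
print(safe(["A one. A two.", "B one. B two.", "C one."]))  # B one. C one.
```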


MemoryManager

MemoryManager is the single entry point for conversation and working memory. It is the object you attach to agents, delegation routers, and pipelines. The constructor takes keyword-only arguments:

from fireflyframework_agentic.memory import MemoryManager

mgr = MemoryManager(
    store=None,                       # defaults to InMemoryStore
    max_conversation_tokens=32_000,
    summarize_threshold=10,           # token count (intentionally small; raise for production)
    working_scope_id="main-session",
)

# Conversation
cid = mgr.new_conversation()
mgr.add_turn(cid, "hello", "hi", raw_messages)
history = mgr.get_message_history(cid)

# Working memory
mgr.set_fact("doc_type", "invoice")
mgr.get_fact("doc_type") # "invoice"
print(mgr.get_working_context())  # text block for prompt injection

From Configuration

MemoryManager.from_config() is the canonical env-driven wiring path. It reads memory_backend, memory_max_conversation_tokens, and memory_summarize_threshold from the framework config, selects the matching backend (in_memory / file / postgres / mongodb), raises if a required URL is missing, and calls initialize() on database backends for you:

mgr = MemoryManager.from_config()

Accessors and Lifecycle

The composed subsystems are reachable directly, and lifecycle helpers clear state:

mgr.conversation  # the ConversationMemory instance
mgr.working       # the WorkingMemory instance
mgr.store         # the underlying MemoryStore

mgr.clear_conversation(cid)  # clear one conversation
mgr.clear_working()          # clear working memory
mgr.clear_all()              # clear both

Forking

When delegating to a sub-agent or branching a pipeline, fork() creates a child manager that shares conversation memory but has independent working memory:

child = mgr.fork(working_scope_id="sub-agent-classify")
child.set_fact("classification", "invoice")
# parent's working memory is unaffected
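
The sharing rule behind fork() can be pictured with a miniature manager. `MiniManager` is a hypothetical class written for this sketch; in particular, starting the child with empty working memory is an assumption, since the guide only states that the child's working memory is independent.

```python
from typing import Any, Optional


class MiniManager:
    """Hypothetical miniature: shared conversations, per-scope facts."""

    def __init__(self, conversations: Optional[dict] = None, scope_id: str = "main") -> None:
        # The conversations dict is shared by reference between parent and children.
        self.conversations: dict[str, list[tuple[str, str]]] = (
            conversations if conversations is not None else {}
        )
        self.scope_id = scope_id
        self._facts: dict[str, Any] = {}  # independent per manager

    def add_turn(self, cid: str, user: str, assistant: str) -> None:
        self.conversations.setdefault(cid, []).append((user, assistant))

    def set_fact(self, key: str, value: Any) -> None:
        self._facts[key] = value

    def get_fact(self, key: str, default: Any = None) -> Any:
        return self._facts.get(key, default)

    def fork(self, working_scope_id: str) -> "MiniManager":
        return MiniManager(conversations=self.conversations, scope_id=working_scope_id)


parent = MiniManager()
child = parent.fork("sub-agent-classify")
child.set_fact("classification", "invoice")
child.add_turn("c1", "Classify this", "invoice")
print(parent.get_fact("classification"))  # None: working memory is not shared
print(parent.conversations["c1"])         # [('Classify this', 'invoice')]: history is shared
```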

Integration Points

Agents

agent = FireflyAgent(name="bot", model="openai:gpt-4o", memory=mgr)
result = await agent.run("Hi", conversation_id=cid)
# message_history is auto-injected, new messages auto-stored

Multi-Agent Delegation

from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy

router = DelegationRouter([agent_a, agent_b], RoundRobinStrategy(), memory=mgr)
result = await router.route("Translate this text.")
# delegated agent receives a forked memory scope

Pipelines

from fireflyframework_agentic.pipeline.context import PipelineContext

ctx = PipelineContext(inputs=data, memory=mgr)
result = await engine.run(context=ctx)
# AgentStep and ReasoningStep automatically propagate memory

Reasoning Patterns

Memory is injected into the reasoning state dict as state["memory"]:

result = await pattern.execute(agent, "Analyze this", memory=mgr)
# Inside pattern hooks: state["memory"].working.set("key", "value")

Host Services

The framework is a pure in-process library: it serves no HTTP port. When a host service exposes agents over its own transport, it owns the wiring of conversation_id from the inbound request to agent.run(..., conversation_id=...). The memory backend simply provides the stateful continuity behind that call.


Configuration

Memory settings are configured via environment variables (prefix FIREFLY_AGENTIC_) and consumed by MemoryManager.from_config():

# Core
export FIREFLY_AGENTIC_MEMORY_BACKEND=in_memory  # in_memory | file | postgres | mongodb
export FIREFLY_AGENTIC_MEMORY_MAX_CONVERSATION_TOKENS=128000
export FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD=10  # token count (small default; raise for prod)
export FIREFLY_AGENTIC_MEMORY_FILE_DIR=.firefly_memory  # used by the "file" backend

# PostgreSQL backend
export FIREFLY_AGENTIC_MEMORY_POSTGRES_URL=postgresql://user:pass@localhost/firefly
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_SIZE=10
export FIREFLY_AGENTIC_MEMORY_POSTGRES_POOL_MIN_SIZE=2
export FIREFLY_AGENTIC_MEMORY_POSTGRES_SCHEMA=firefly_memory

# MongoDB backend
export FIREFLY_AGENTIC_MEMORY_MONGODB_URL=mongodb://localhost:27017/
export FIREFLY_AGENTIC_MEMORY_MONGODB_DATABASE=firefly_memory
export FIREFLY_AGENTIC_MEMORY_MONGODB_COLLECTION=entries
export FIREFLY_AGENTIC_MEMORY_MONGODB_POOL_SIZE=10

Note: the from_config() env path supports in_memory, file, postgres, and mongodb. SQLiteStore is constructed explicitly (SQLiteStore(path)) and passed via MemoryManager(store=...).
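
To make the env-driven path concrete, the sketch below reads the core variables and rejects a database backend whose URL is missing. `MemorySettings` and `load_settings` are hypothetical names for this illustration, and the fallback values are simply the ones shown in the example above, not confirmed framework defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

PREFIX = "FIREFLY_AGENTIC_"
SUPPORTED_BACKENDS = {"in_memory", "file", "postgres", "mongodb"}
# Backends that cannot start without a connection URL.
URL_VARIABLE = {"postgres": "MEMORY_POSTGRES_URL", "mongodb": "MEMORY_MONGODB_URL"}


@dataclass(frozen=True)
class MemorySettings:
    backend: str
    max_conversation_tokens: int
    summarize_threshold: int
    url: Optional[str]


def load_settings(env: Optional[Mapping[str, str]] = None) -> MemorySettings:
    """Read memory settings from env (defaults to os.environ) and validate them."""
    env = os.environ if env is None else env
    backend = env.get(PREFIX + "MEMORY_BACKEND", "in_memory")
    if backend not in SUPPORTED_BACKENDS:
        raise ValueError(f"unsupported memory backend: {backend!r}")
    url_name = URL_VARIABLE.get(backend)
    url = env.get(PREFIX + url_name) if url_name else None
    if url_name and not url:
        raise ValueError(f"{PREFIX}{url_name} is required for the {backend} backend")
    return MemorySettings(
        backend=backend,
        # Assumed fallbacks, copied from the example values above.
        max_conversation_tokens=int(env.get(PREFIX + "MEMORY_MAX_CONVERSATION_TOKENS", "128000")),
        summarize_threshold=int(env.get(PREFIX + "MEMORY_SUMMARIZE_THRESHOLD", "10")),
        url=url,
    )


print(load_settings({PREFIX + "MEMORY_BACKEND": "file"}))
```

Passing an explicit mapping instead of reading os.environ directly keeps the function easy to test.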