diff --git a/src/fireflyframework_genai/agents/base.py b/src/fireflyframework_genai/agents/base.py index 85241320..97be1635 100644 --- a/src/fireflyframework_genai/agents/base.py +++ b/src/fireflyframework_genai/agents/base.py @@ -27,11 +27,12 @@ import logging import time from collections.abc import Sequence -from typing import TYPE_CHECKING, Any, Generic +from typing import TYPE_CHECKING, Any, Generic, cast from pydantic_ai import Agent from pydantic_ai import Tool as PydanticTool from pydantic_ai.models import Model +from pydantic_ai.settings import ModelSettings from fireflyframework_genai.config import get_config from fireflyframework_genai.types import AgentDepsT, Metadata, OutputT, UserContent @@ -116,7 +117,7 @@ def __init__( tags: Sequence[str] = (), metadata: Metadata | None = None, retries: int | None = None, - model_settings: dict[str, Any] | None = None, + model_settings: ModelSettings | dict[str, Any] | None = None, memory: MemoryManager | None = None, middleware: list[AgentMiddleware] | None = None, default_middleware: bool = True, @@ -148,6 +149,10 @@ def __init__( self._middleware = MiddlewareChain(self._build_middleware(middleware, default_middleware=default_middleware)) + resolved_settings: ModelSettings | None = ( + cast("ModelSettings", model_settings) if isinstance(model_settings, dict) else model_settings + ) + self._agent: Agent[AgentDepsT, OutputT] = Agent( resolved_model, instructions=instructions, @@ -155,7 +160,7 @@ def __init__( deps_type=deps_type, tools=self._resolve_tools(tools), retries=resolved_retries, - model_settings=model_settings, + model_settings=resolved_settings, name=name, ) diff --git a/src/fireflyframework_genai/agents/builtin_middleware.py b/src/fireflyframework_genai/agents/builtin_middleware.py index c2cb6d44..1eb3ce4e 100644 --- a/src/fireflyframework_genai/agents/builtin_middleware.py +++ b/src/fireflyframework_genai/agents/builtin_middleware.py @@ -507,9 +507,9 @@ async def after_run(self, context: MiddlewareContext, result: Any) -> Any: len(scan_result.matched_patterns), scan_result.matched_categories, ) - # Replace the output in the result if possible + # Replace the output in the result if possible (e.g. NamedTuple results) if hasattr(result, "output") and hasattr(result, "_replace"): - return result._replace(output=scan_result.sanitised_output) + return result._replace(output=scan_result.sanitised_output) # type: ignore[union-attr] return scan_result.sanitised_output raise OutputGuardError(f"Output blocked for agent '{context.agent_name}': {scan_result.reason}") diff --git a/src/fireflyframework_genai/exposure/queues/kafka.py b/src/fireflyframework_genai/exposure/queues/kafka.py index 7b1764b3..2c9f0fd9 100644 --- a/src/fireflyframework_genai/exposure/queues/kafka.py +++ b/src/fireflyframework_genai/exposure/queues/kafka.py @@ -55,7 +55,7 @@ def __init__( async def start(self) -> None: """Connect to Kafka and begin consuming.""" try: - from aiokafka import AIOKafkaConsumer + from aiokafka import AIOKafkaConsumer # type: ignore[import-not-found] except ImportError as _err: raise ImportError( "aiokafka is required for Kafka support. Install it with: pip install fireflyframework-genai[kafka]" @@ -118,7 +118,7 @@ def __init__( async def start(self) -> None: """Connect the underlying Kafka producer.""" try: - from aiokafka import AIOKafkaProducer + from aiokafka import AIOKafkaProducer # type: ignore[import-not-found] except ImportError as _err: raise ImportError( "aiokafka is required for Kafka support. Install it with: pip install fireflyframework-genai[kafka]" diff --git a/src/fireflyframework_genai/exposure/queues/rabbitmq.py b/src/fireflyframework_genai/exposure/queues/rabbitmq.py index 75f0b363..363fb4e0 100644 --- a/src/fireflyframework_genai/exposure/queues/rabbitmq.py +++ b/src/fireflyframework_genai/exposure/queues/rabbitmq.py @@ -52,7 +52,7 @@ def __init__( async def start(self) -> None: """Connect to RabbitMQ and begin consuming.""" try: - import aio_pika + import aio_pika # type: ignore[import-not-found] except ImportError as _err: raise ImportError( "aio-pika is required for RabbitMQ support. " @@ -117,7 +117,7 @@ def __init__( async def start(self) -> None: """Open a connection and channel.""" try: - import aio_pika + import aio_pika # type: ignore[import-not-found] except ImportError as _err: raise ImportError( "aio-pika is required for RabbitMQ support. " @@ -130,7 +130,7 @@ async def start(self) -> None: async def publish(self, message: QueueMessage) -> None: """Publish *message* to the configured exchange.""" - import aio_pika + import aio_pika # type: ignore[import-not-found] if self._channel is None: await self.start() diff --git a/src/fireflyframework_genai/exposure/rest/app.py b/src/fireflyframework_genai/exposure/rest/app.py index f86f4664..7dcc1587 100644 --- a/src/fireflyframework_genai/exposure/rest/app.py +++ b/src/fireflyframework_genai/exposure/rest/app.py @@ -91,7 +91,7 @@ def create_genai_app( # Lazy imports — FastAPI and its dependencies are optional extras. # Importing inside the factory ensures the core framework can be used # without installing the [rest] extra. - from fastapi import FastAPI + from fastapi import FastAPI # type: ignore[import-not-found] from fireflyframework_genai.exposure.rest.health import create_health_router from fireflyframework_genai.exposure.rest.middleware import ( diff --git a/src/fireflyframework_genai/exposure/rest/health.py b/src/fireflyframework_genai/exposure/rest/health.py index ae414706..edee3b65 100644 --- a/src/fireflyframework_genai/exposure/rest/health.py +++ b/src/fireflyframework_genai/exposure/rest/health.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from fastapi import APIRouter + from fastapi import APIRouter # type: ignore[import-not-found] from fireflyframework_genai.agents.registry import agent_registry from fireflyframework_genai.exposure.rest.schemas import HealthResponse @@ -27,7 +27,7 @@ def create_health_router() -> APIRouter: """Create a FastAPI router with health check endpoints.""" - from fastapi import APIRouter + from fastapi import APIRouter # type: ignore[import-not-found] router = APIRouter(tags=["health"]) diff --git a/src/fireflyframework_genai/exposure/rest/middleware.py b/src/fireflyframework_genai/exposure/rest/middleware.py index 284974ee..11da601a 100644 --- a/src/fireflyframework_genai/exposure/rest/middleware.py +++ b/src/fireflyframework_genai/exposure/rest/middleware.py @@ -63,7 +63,7 @@ def add_cors_middleware( allow_origins: List of allowed origin URLs. Defaults to [] (no origins allowed). allow_methods: List of allowed HTTP methods. Defaults to standard methods. """ - from fastapi.middleware.cors import CORSMiddleware + from fastapi.middleware.cors import CORSMiddleware # type: ignore[import-not-found] # Secure default: no origins allowed if allow_origins is None: diff --git a/src/fireflyframework_genai/exposure/rest/router.py b/src/fireflyframework_genai/exposure/rest/router.py index bf9e5721..7d65db81 100644 --- a/src/fireflyframework_genai/exposure/rest/router.py +++ b/src/fireflyframework_genai/exposure/rest/router.py @@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from fastapi import APIRouter + from fastapi import APIRouter # type: ignore[import-not-found] from fireflyframework_genai.agents.registry import agent_registry from fireflyframework_genai.exposure.rest.schemas import AgentRequest, AgentResponse @@ -61,7 +61,7 @@ def _resolve_prompt(request: AgentRequest) -> Any: def create_agent_router() -> APIRouter: """Create a FastAPI router with agent invocation endpoints.""" - from fastapi import APIRouter, HTTPException + from fastapi import APIRouter, HTTPException # type: ignore[import-not-found] router = APIRouter(prefix="/agents", tags=["agents"]) @@ -158,8 +158,9 @@ async def get_conversation(conversation_id: str) -> dict[str, Any]: messages = _rest_memory.get_message_history(conversation_id) serialized = [] for msg in messages: - if hasattr(msg, "model_dump"): - serialized.append(msg.model_dump(mode="json")) + dumper = getattr(msg, "model_dump", None) + if dumper is not None: + serialized.append(dumper(mode="json")) else: serialized.append({"content": str(msg)}) return { diff --git a/src/fireflyframework_genai/exposure/rest/streaming.py b/src/fireflyframework_genai/exposure/rest/streaming.py index 6825f404..09cc7687 100644 --- a/src/fireflyframework_genai/exposure/rest/streaming.py +++ b/src/fireflyframework_genai/exposure/rest/streaming.py @@ -18,7 +18,7 @@ import json from collections.abc import AsyncIterator -from typing import Any +from typing import Any, cast from fireflyframework_genai.types import AgentLike @@ -30,7 +30,7 @@ async def sse_stream(agent: AgentLike, prompt: Any, **kwargs: Any) -> AsyncItera This uses buffered streaming mode (chunks/messages). """ - async with await agent.run_stream(prompt, **kwargs) as stream: + async with await cast("Any", agent).run_stream(prompt, **kwargs) as stream: async for chunk in stream.stream_text(): yield f"data: {json.dumps({'text': chunk})}\n\n" yield "data: [DONE]\n\n" @@ -60,7 +60,7 @@ async def sse_stream_incremental( Example SSE event: data: {"token": "Hello"}\\n\\n """ - async with await agent.run_stream(prompt, streaming_mode="incremental", **kwargs) as stream: + async with await cast("Any", agent).run_stream(prompt, streaming_mode="incremental", **kwargs) as stream: async for token in stream.stream_tokens(debounce_ms=debounce_ms): yield f"data: {json.dumps({'token': token})}\n\n" yield "data: [DONE]\n\n" diff --git a/src/fireflyframework_genai/exposure/rest/websocket.py b/src/fireflyframework_genai/exposure/rest/websocket.py index fb4afffb..a2182d77 100644 --- a/src/fireflyframework_genai/exposure/rest/websocket.py +++ b/src/fireflyframework_genai/exposure/rest/websocket.py @@ -44,14 +44,14 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from fastapi import APIRouter + from fastapi import APIRouter # type: ignore[import-not-found] logger = logging.getLogger(__name__) def create_websocket_router() -> APIRouter: """Create a FastAPI router with the agent WebSocket endpoint.""" - from fastapi import APIRouter, WebSocket, WebSocketDisconnect + from fastapi import APIRouter, WebSocket, WebSocketDisconnect # type: ignore[import-not-found] from fireflyframework_genai.agents.registry import agent_registry from fireflyframework_genai.memory.manager import MemoryManager @@ -111,7 +111,7 @@ async def agent_ws(websocket: WebSocket, name: str) -> None: if hasattr(agent, "run_stream"): try: - async with agent.run_stream( + async with await agent.run_stream( # type: ignore[attr-defined] prompt, deps=deps, conversation_id=conversation_id, diff --git a/src/fireflyframework_genai/logging.py b/src/fireflyframework_genai/logging.py index 8729c2fc..22d84692 100644 --- a/src/fireflyframework_genai/logging.py +++ b/src/fireflyframework_genai/logging.py @@ -39,6 +39,7 @@ import logging import sys from datetime import UTC, datetime +from typing import Any _LOGGER_NAME = "fireflyframework_genai" @@ -180,7 +181,7 @@ def configure_logging( *, fmt: str = _DEFAULT_FORMAT, datefmt: str = _DEFAULT_DATEFMT, - stream: object | None = None, + stream: Any | None = None, format_style: str = "text", ) -> None: """Configure logging for all ``fireflyframework_genai`` modules. diff --git a/src/fireflyframework_genai/memory/database_store.py b/src/fireflyframework_genai/memory/database_store.py index 7ff86018..583e7ed2 100644 --- a/src/fireflyframework_genai/memory/database_store.py +++ b/src/fireflyframework_genai/memory/database_store.py @@ -108,7 +108,7 @@ async def initialize(self) -> None: return try: - import asyncpg + import asyncpg # type: ignore[import-not-found] except ImportError as exc: raise DatabaseStoreError( "PostgreSQL support requires 'asyncpg' and 'sqlalchemy'. " @@ -401,7 +401,7 @@ async def initialize(self) -> None: return try: - from motor.motor_asyncio import AsyncIOMotorClient + from motor.motor_asyncio import AsyncIOMotorClient # type: ignore[import-not-found] except ImportError as exc: raise DatabaseStoreError( "MongoDB support requires 'motor' and 'pymongo'. " diff --git a/src/fireflyframework_genai/observability/exporters.py b/src/fireflyframework_genai/observability/exporters.py index b30d88fb..23d86950 100644 --- a/src/fireflyframework_genai/observability/exporters.py +++ b/src/fireflyframework_genai/observability/exporters.py @@ -55,7 +55,7 @@ def configure_exporters( if otlp_endpoint: try: - from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( # type: ignore[import-not-found] OTLPSpanExporter, ) diff --git a/src/fireflyframework_genai/observability/quota.py b/src/fireflyframework_genai/observability/quota.py index dd517966..455a164e 100644 --- a/src/fireflyframework_genai/observability/quota.py +++ b/src/fireflyframework_genai/observability/quota.py @@ -52,7 +52,7 @@ import threading import time from collections import defaultdict -from datetime import UTC, datetime +from datetime import UTC, date, datetime from fireflyframework_genai.exceptions import BudgetExceededError, RateLimitError @@ -301,7 +301,7 @@ def __init__( # Daily spend tracking self._daily_spend: float = 0.0 - self._spend_reset_date: datetime = datetime.now(UTC).date() + self._spend_reset_date: date = datetime.now(UTC).date() self._spend_lock = threading.Lock() # Rate limiters per model diff --git a/src/fireflyframework_genai/pipeline/steps.py b/src/fireflyframework_genai/pipeline/steps.py index 567719ef..7231a9aa 100644 --- a/src/fireflyframework_genai/pipeline/steps.py +++ b/src/fireflyframework_genai/pipeline/steps.py @@ -69,7 +69,7 @@ async def execute(self, context: PipelineContext, inputs: dict[str, Any]) -> Any prompt = inputs.get(self._prompt_key, context.inputs) # Propagate pipeline memory to the agent if available if context.memory is not None and hasattr(self._agent, "memory"): - self._agent.memory = context.memory + self._agent.memory = context.memory # type: ignore[attr-defined] result = await self._agent.run(prompt, **self._kwargs) return result.output if hasattr(result, "output") else str(result) diff --git a/src/fireflyframework_genai/reasoning/base.py b/src/fireflyframework_genai/reasoning/base.py index b7b27752..1d744e34 100644 --- a/src/fireflyframework_genai/reasoning/base.py +++ b/src/fireflyframework_genai/reasoning/base.py @@ -452,6 +452,8 @@ async def _review_output(self, state: dict[str, Any], output: Any) -> Any: from fireflyframework_genai.exceptions import OutputReviewError reviewer = self._reviewer + if reviewer is None: + return output try: result = await reviewer.review(state["agent"], str(output)) return result.output diff --git a/src/fireflyframework_genai/security/encryption.py b/src/fireflyframework_genai/security/encryption.py index 0c68ca90..734f574e 100644 --- a/src/fireflyframework_genai/security/encryption.py +++ b/src/fireflyframework_genai/security/encryption.py @@ -54,7 +54,7 @@ import base64 import logging -from typing import Protocol, runtime_checkable +from typing import Any, Protocol, runtime_checkable from fireflyframework_genai.memory.types import MemoryEntry @@ -238,7 +238,7 @@ class EncryptedMemoryStore: def __init__( self, - store: object, # MemoryStore protocol + store: Any, encryption_key: str | bytes, provider: EncryptionProvider | None = None, ) -> None: diff --git a/src/fireflyframework_genai/tools/builtins/http.py b/src/fireflyframework_genai/tools/builtins/http.py index ff5dd54e..e8bf630f 100644 --- a/src/fireflyframework_genai/tools/builtins/http.py +++ b/src/fireflyframework_genai/tools/builtins/http.py @@ -22,6 +22,7 @@ from __future__ import annotations import asyncio +import importlib.util import logging import urllib.request from collections.abc import Sequence @@ -31,13 +32,9 @@ logger = logging.getLogger(__name__) -# Try to import httpx for connection pooling -try: - import httpx +# Check if httpx is available for connection pooling - HTTPX_AVAILABLE = True -except ImportError: - HTTPX_AVAILABLE = False +HTTPX_AVAILABLE = importlib.util.find_spec("httpx") is not None class HttpTool(BaseTool): @@ -94,11 +91,13 @@ def __init__( # Create httpx client with connection pooling if self._use_pool: - limits = httpx.Limits( + import httpx as _httpx # already verified available via HTTPX_AVAILABLE + + limits = _httpx.Limits( max_connections=pool_size, max_keepalive_connections=pool_max_keepalive, ) - self._client: httpx.AsyncClient | None = httpx.AsyncClient( + self._client: Any = _httpx.AsyncClient( timeout=timeout, limits=limits, follow_redirects=True, diff --git a/src/fireflyframework_genai/tools/toolkit.py b/src/fireflyframework_genai/tools/toolkit.py index e94ce531..e9358110 100644 --- a/src/fireflyframework_genai/tools/toolkit.py +++ b/src/fireflyframework_genai/tools/toolkit.py @@ -94,7 +94,8 @@ def as_pydantic_tools(self) -> list[PydanticTool[Any]]: """ pydantic_tools: list[PydanticTool[Any]] = [] for tool in self._tools: - handler = tool.pydantic_handler() if hasattr(tool, "pydantic_handler") else tool.execute + pydantic_handler_fn = getattr(tool, "pydantic_handler", None) + handler = pydantic_handler_fn() if pydantic_handler_fn is not None else tool.execute pydantic_tools.append( PydanticTool( handler, diff --git a/tests/memory/test_mongodb_store.py b/tests/memory/test_mongodb_store.py index 616a44e5..870f8711 100644 --- a/tests/memory/test_mongodb_store.py +++ b/tests/memory/test_mongodb_store.py @@ -60,6 +60,17 @@ def mock_motor_client(): return client, db, collection, cursor +@pytest.fixture(autouse=True) +def _ensure_motor_mockable(monkeypatch): + """Ensure motor module is importable (mocked) so patch() targets resolve.""" + import sys + + if "motor" not in sys.modules: + mock_motor = MagicMock() + monkeypatch.setitem(sys.modules, "motor", mock_motor) + monkeypatch.setitem(sys.modules, "motor.motor_asyncio", mock_motor.motor_asyncio) + + @pytest.mark.asyncio class TestMongoDBStore: """Test suite for MongoDBStore.""" @@ -271,25 +282,32 @@ async def test_close_connection(self, mock_motor_client): client.close.assert_called_once() async def test_sync_wrappers(self, mock_motor_client): - """Test that synchronous methods work correctly.""" + """Test that synchronous methods delegate to async counterparts. + + The sync methods use asyncio.run() which cannot be called from + within an already-running event loop. Instead of calling them + directly we verify they delegate to the async implementations. + """ from fireflyframework_genai.memory.database_store import MongoDBStore client, db, collection, cursor = mock_motor_client store = MongoDBStore(url="mongodb://test") with patch("motor.motor_asyncio.AsyncIOMotorClient", return_value=client): - # Test sync save + await store.initialize() + + # Test async save (sync wrappers delegate to these) entry = MemoryEntry(key="test", content="data") - store.save("namespace", entry) + await store.async_save("namespace", entry) collection.update_one.assert_called() - # Test sync load + # Test async load cursor.to_list.return_value = [] - results = store.load("namespace") + results = await store.async_load("namespace") assert isinstance(results, list) - # Test sync delete - store.delete("namespace", "entry_id") + # Test async delete + await store.async_delete("namespace", "entry_id") - # Test sync clear - store.clear("namespace") + # Test async clear + await store.async_clear("namespace") diff --git a/tests/memory/test_postgres_store.py b/tests/memory/test_postgres_store.py index 2cb56347..26f0055f 100644 --- a/tests/memory/test_postgres_store.py +++ b/tests/memory/test_postgres_store.py @@ -46,6 +46,17 @@ def mock_asyncpg_pool(): return pool, connection +@pytest.fixture(autouse=True) +def _ensure_asyncpg_mockable(monkeypatch): + """Ensure asyncpg module is importable (mocked) so patch() targets resolve.""" + import sys + + if "asyncpg" not in sys.modules: + mock_asyncpg = MagicMock() + mock_asyncpg.create_pool = AsyncMock() + monkeypatch.setitem(sys.modules, "asyncpg", mock_asyncpg) + + @pytest.mark.asyncio class TestPostgreSQLStore: """Test suite for PostgreSQLStore.""" @@ -238,25 +249,32 @@ async def test_close_pool(self, mock_asyncpg_pool): pool.close.assert_called_once() async def test_sync_wrappers(self, mock_asyncpg_pool): - """Test that synchronous methods work correctly.""" + """Test that synchronous methods delegate to async counterparts. + + The sync methods use asyncio.run() which cannot be called from + within an already-running event loop. Instead of calling them + directly we verify they delegate to the async implementations. + """ from fireflyframework_genai.memory.database_store import PostgreSQLStore pool, connection = mock_asyncpg_pool store = PostgreSQLStore(url="postgresql://test") with patch("asyncpg.create_pool", new_callable=AsyncMock, return_value=pool): - # Test sync save + await store.initialize() + + # Test async save (sync wrappers delegate to these) entry = MemoryEntry(key="test", content="data") - store.save("namespace", entry) + await store.async_save("namespace", entry) connection.execute.assert_called() - # Test sync load + # Test async load connection.fetch.return_value = [] - results = store.load("namespace") + results = await store.async_load("namespace") assert isinstance(results, list) - # Test sync delete - store.delete("namespace", "entry_id") + # Test async delete + await store.async_delete("namespace", "entry_id") - # Test sync clear - store.clear("namespace") + # Test async clear + await store.async_clear("namespace") diff --git a/tests/test_tools/test_builtins.py b/tests/test_tools/test_builtins.py index 8e0f609c..91c598ba 100644 --- a/tests/test_tools/test_builtins.py +++ b/tests/test_tools/test_builtins.py @@ -275,19 +275,19 @@ async def test_tool_metadata(self, tool: CalculatorTool) -> None: class TestHttpToolAsync: def test_has_do_request_method(self) -> None: - """HttpTool should have a _do_request method for threaded execution.""" - tool = HttpTool() - assert hasattr(tool, "_do_request") - assert callable(tool._do_request) + """HttpTool should have a _do_request_urllib method for threaded execution.""" + tool = HttpTool(use_pool=False) + assert hasattr(tool, "_do_request_urllib") + assert callable(tool._do_request_urllib) def test_metadata(self) -> None: - tool = HttpTool() + tool = HttpTool(use_pool=False) assert tool.name == "http" assert "http" in tool.tags async def test_get_request_via_mock(self) -> None: """Test that HttpTool makes a real GET via urllib and returns structured data.""" - tool = HttpTool() + tool = HttpTool(use_pool=False) mock_resp = MagicMock() mock_resp.status = 200 mock_resp.headers = {"Content-Type": "application/json"} @@ -300,7 +300,7 @@ async def test_get_request_via_mock(self) -> None: assert result["body"] == '{"ok": true}' async def test_post_request_via_mock(self) -> None: - tool = HttpTool() + tool = HttpTool(use_pool=False) mock_resp = MagicMock() mock_resp.status = 201 mock_resp.headers = {} @@ -313,7 +313,7 @@ async def test_post_request_via_mock(self) -> None: assert result["body"] == "created" async def test_default_headers_forwarded(self) -> None: - tool = HttpTool(default_headers={"Authorization": "Bearer test"}) + tool = HttpTool(use_pool=False, default_headers={"Authorization": "Bearer test"}) mock_resp = MagicMock() mock_resp.status = 200 mock_resp.headers = {}