diff --git a/src/fireflyframework_genai/agents/__init__.py b/src/fireflyframework_genai/agents/__init__.py index d53c2c8d..ee169c7e 100644 --- a/src/fireflyframework_genai/agents/__init__.py +++ b/src/fireflyframework_genai/agents/__init__.py @@ -29,7 +29,6 @@ RetryMiddleware, ValidationMiddleware, ) -from fireflyframework_genai.agents.prompt_cache import CacheStatistics, PromptCacheMiddleware from fireflyframework_genai.agents.cache import ResultCache from fireflyframework_genai.agents.context import AgentContext from fireflyframework_genai.agents.decorators import firefly_agent @@ -48,6 +47,7 @@ MiddlewareChain, MiddlewareContext, ) +from fireflyframework_genai.agents.prompt_cache import CacheStatistics, PromptCacheMiddleware from fireflyframework_genai.agents.registry import AgentInfo, AgentRegistry, agent_registry from fireflyframework_genai.agents.templates import ( create_classifier_agent, diff --git a/src/fireflyframework_genai/exposure/rest/router.py b/src/fireflyframework_genai/exposure/rest/router.py index dbbe8e00..a5581318 100644 --- a/src/fireflyframework_genai/exposure/rest/router.py +++ b/src/fireflyframework_genai/exposure/rest/router.py @@ -26,13 +26,13 @@ if TYPE_CHECKING: from fastapi import APIRouter # type: ignore[import-not-found] -logger = logging.getLogger(__name__) - from fireflyframework_genai.agents.registry import agent_registry from fireflyframework_genai.exposure.rest.schemas import AgentRequest, AgentResponse from fireflyframework_genai.exposure.rest.streaming import sse_stream, sse_stream_incremental from fireflyframework_genai.memory.manager import MemoryManager +logger = logging.getLogger(__name__) + # Server-side memory manager for REST conversations _rest_memory = MemoryManager(working_scope_id="rest") @@ -83,7 +83,7 @@ async def run_agent(name: str, request: AgentRequest) -> AgentResponse: result = await agent.run(prompt, deps=request.deps, conversation_id=conv_id) output = result.output if hasattr(result, "output") else str(result) return AgentResponse(agent_name=name, output=output) - except Exception as exc: + except Exception: logger.exception("Agent '%s' run failed", name) return AgentResponse(agent_name=name, output=None, success=False, error="Internal server error") diff --git a/src/fireflyframework_genai/exposure/rest/websocket.py b/src/fireflyframework_genai/exposure/rest/websocket.py index 45479b36..ae9a768b 100644 --- a/src/fireflyframework_genai/exposure/rest/websocket.py +++ b/src/fireflyframework_genai/exposure/rest/websocket.py @@ -73,7 +73,7 @@ async def agent_ws(websocket: WebSocket, name: str) -> None: # Use a per-connection memory scope to avoid cross-talk between # concurrent WebSocket sessions sharing the same agent. conn_id = uuid.uuid4().hex[:8] - conn_memory = _ws_memory.fork(working_scope_id=f"ws:{conn_id}") + _ws_memory.fork(working_scope_id=f"ws:{conn_id}") try: while True: diff --git a/src/fireflyframework_genai/memory/database_store.py b/src/fireflyframework_genai/memory/database_store.py index b68fba62..727de654 100644 --- a/src/fireflyframework_genai/memory/database_store.py +++ b/src/fireflyframework_genai/memory/database_store.py @@ -53,13 +53,13 @@ from datetime import UTC, datetime from typing import Any +from fireflyframework_genai.exceptions import DatabaseConnectionError, DatabaseStoreError +from fireflyframework_genai.memory.types import MemoryEntry + _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") _sync_pool = ThreadPoolExecutor(max_workers=4) -from fireflyframework_genai.exceptions import DatabaseConnectionError, DatabaseStoreError -from fireflyframework_genai.memory.types import MemoryEntry - logger = logging.getLogger(__name__) @@ -72,8 +72,6 @@ def _run_sync(coro: Any) -> Any: if loop is not None: # Already inside an event loop -- offload to a background thread. - import concurrent.futures - future = _sync_pool.submit(asyncio.run, coro) return future.result() return asyncio.run(coro) diff --git a/src/fireflyframework_genai/memory/manager.py b/src/fireflyframework_genai/memory/manager.py index 6b0a371e..1d1e26d2 100644 --- a/src/fireflyframework_genai/memory/manager.py +++ b/src/fireflyframework_genai/memory/manager.py @@ -91,7 +91,7 @@ def from_config(cls) -> MemoryManager: from fireflyframework_genai.config import get_config - def _run_sync(coro: object) -> object: + def _run_sync(coro: Any) -> Any: """Run *coro* synchronously, safe even when an event loop is already running.""" try: loop = asyncio.get_running_loop() diff --git a/src/fireflyframework_genai/studio/api/assistant.py b/src/fireflyframework_genai/studio/api/assistant.py index 51d80931..715abd76 100644 --- a/src/fireflyframework_genai/studio/api/assistant.py +++ b/src/fireflyframework_genai/studio/api/assistant.py @@ -30,7 +30,7 @@ import re from typing import Any -from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query # type: ignore[import-not-found] +from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect # type: ignore[import-not-found] from pydantic import BaseModel # type: ignore[import-not-found] from pydantic_ai.usage import UsageLimits # type: ignore[import-not-found] @@ -305,10 +305,7 @@ async def _handle_chat_blocking( usage_limits=UsageLimits(request_limit=_DEFAULT_REQUEST_LIMIT), ) - if hasattr(result, "output"): - full_text = str(result.output) if result.output else "" - else: - full_text = str(result) + full_text = (str(result.output) if result.output else "") if hasattr(result, "output") else str(result) tool_calls = _extract_tool_calls(result) @@ -638,9 +635,11 @@ def _build_project_context(canvas: Any = None, project_name: str = "") -> str: # Current project try: - from fireflyframework_genai.studio.projects import list_projects + from fireflyframework_genai.studio.config import StudioConfig + from fireflyframework_genai.studio.projects import ProjectManager - projects = list_projects() + pm = ProjectManager(StudioConfig().projects_dir) + projects = pm.list_all() if projects: names = [p.name for p in projects] parts.append(f"[CONTEXT] Active projects: {', '.join(names)}.") diff --git a/src/fireflyframework_genai/studio/api/custom_tools.py b/src/fireflyframework_genai/studio/api/custom_tools.py index 602ea946..063381ec 100644 --- a/src/fireflyframework_genai/studio/api/custom_tools.py +++ b/src/fireflyframework_genai/studio/api/custom_tools.py @@ -33,7 +33,6 @@ ToolParameter, ) - # --------------------------------------------------------------------------- # Request / response models # --------------------------------------------------------------------------- @@ -584,7 +583,7 @@ async def test_tool(name: str) -> dict[str, Any]: "response_time": elapsed, "result": str(result)[:1000], } - except asyncio.TimeoutError: + except TimeoutError: return {"status": "timeout", "tool_name": f"custom:{name}", "error": "Request timed out after 15s"} except Exception as exc: return {"status": "error", "tool_name": f"custom:{name}", "error": str(exc)} diff --git a/src/fireflyframework_genai/studio/api/graphql_api.py b/src/fireflyframework_genai/studio/api/graphql_api.py index e8fadb26..56e71913 100644 --- a/src/fireflyframework_genai/studio/api/graphql_api.py +++ b/src/fireflyframework_genai/studio/api/graphql_api.py @@ -201,7 +201,7 @@ async def run_pipeline(self, project: str, input: str) -> ExecutionResult: start_time = time.monotonic() try: engine = compile_graph(graph_model) - result = await engine.run(input) + result = await engine.run(inputs=input) duration_ms = round((time.monotonic() - start_time) * 1000, 2) return ExecutionResult( execution_id=execution_id, diff --git a/src/fireflyframework_genai/studio/api/project_api.py b/src/fireflyframework_genai/studio/api/project_api.py index b812d1fd..f6fc924f 100644 --- a/src/fireflyframework_genai/studio/api/project_api.py +++ b/src/fireflyframework_genai/studio/api/project_api.py @@ -148,7 +148,7 @@ async def run_pipeline(name: str, body: RunRequest) -> dict[str, Any]: try: engine = compile_graph(graph_model) - result = await engine.run(body.input) + result = await engine.run(inputs=body.input) except Exception as exc: logger.exception("Pipeline execution failed for project '%s'", name) raise HTTPException(status_code=500, detail=str(exc)) from exc @@ -195,7 +195,7 @@ async def _run_in_background() -> None: start_time = time.monotonic() try: engine = compile_graph(graph_model) - result = await engine.run(body.input) + result = await engine.run(inputs=body.input) duration_ms = round((time.monotonic() - start_time) * 1000, 2) _executions[execution_id].update({ "status": "completed", @@ -263,7 +263,7 @@ async def upload_file(name: str, file: UploadFile) -> dict[str, Any]: try: engine = compile_graph(graph_model) - result = await engine.run(inputs) + result = await engine.run(inputs=inputs) except Exception as exc: logger.exception("File upload pipeline execution failed for project '%s'", name) raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/src/fireflyframework_genai/studio/api/smith.py b/src/fireflyframework_genai/studio/api/smith.py index b1689b1c..fa7760e6 100644 --- a/src/fireflyframework_genai/studio/api/smith.py +++ b/src/fireflyframework_genai/studio/api/smith.py @@ -41,6 +41,7 @@ from __future__ import annotations import asyncio +import contextlib import json import logging import re @@ -177,12 +178,10 @@ async def smith_ws( logger.info("Smith WebSocket disconnected") except Exception as exc: logger.exception("Smith WebSocket error") - try: + with contextlib.suppress(Exception): await websocket.send_json( {"type": "error", "message": str(exc)} ) - except Exception: - pass return router @@ -228,10 +227,8 @@ async def _handle_generate( # Pass user name so Smith can personalise responses user_name = "" - try: + with contextlib.suppress(Exception): user_name = settings.user_profile.name or "" - except Exception: - pass # Build shared cross-agent context for generation shared_context = "" @@ -314,7 +311,6 @@ async def _handle_chat( from fireflyframework_genai.studio.assistant.smith import ( create_smith_agent, ) - from fireflyframework_genai.studio.settings import load_settings as _load_settings _user_name = "" @@ -356,10 +352,7 @@ async def _handle_chat( message_history=message_history, ) - if hasattr(result, "output"): - full_text = str(result.output) if result.output else "" - else: - full_text = str(result) + full_text = (str(result.output) if result.output else "") if hasattr(result, "output") else str(result) if hasattr(result, "new_messages"): message_history.extend(result.new_messages()) @@ -383,9 +376,9 @@ async def _handle_chat( # Send narrative as chat tokens (or a brief note if empty) chat_text = narrative or "Code generated — see the Code tab." - _CHUNK_SIZE = 80 - for i in range(0, len(chat_text), _CHUNK_SIZE): - chunk = chat_text[i : i + _CHUNK_SIZE] + _chunk_size = 80 + for i in range(0, len(chat_text), _chunk_size): + chunk = chat_text[i : i + _chunk_size] await websocket.send_json( {"type": "smith_token", "content": chunk} ) @@ -403,9 +396,9 @@ async def _handle_chat( ) else: # No code blocks — send as regular chat - _CHUNK_SIZE = 80 - for i in range(0, len(full_text), _CHUNK_SIZE): - chunk = full_text[i : i + _CHUNK_SIZE] + _chunk_size = 80 + for i in range(0, len(full_text), _chunk_size): + chunk = full_text[i : i + _chunk_size] await websocket.send_json( {"type": "smith_token", "content": chunk} ) @@ -501,7 +494,7 @@ async def _handle_execute( stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout ) - except asyncio.TimeoutError: + except TimeoutError: proc.kill() await proc.communicate() await websocket.send_json( @@ -531,10 +524,8 @@ async def _handle_execute( finally: # Clean up temp file if tmp_file is not None: - try: + with contextlib.suppress(Exception): tmp_file.unlink(missing_ok=True) - except Exception: - pass async def _handle_approve_command( @@ -601,7 +592,7 @@ async def _handle_approve_command( stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout ) - except asyncio.TimeoutError: + except TimeoutError: proc.kill() await proc.communicate() await websocket.send_json( diff --git a/src/fireflyframework_genai/studio/api/tunnel.py b/src/fireflyframework_genai/studio/api/tunnel.py index 9313c7ae..8f5207c3 100644 --- a/src/fireflyframework_genai/studio/api/tunnel.py +++ b/src/fireflyframework_genai/studio/api/tunnel.py @@ -39,12 +39,14 @@ def create_tunnel_router(port: int = 8470) -> APIRouter: @router.get("/status") async def tunnel_status() -> dict[str, Any]: + assert _tunnel is not None status = _tunnel.get_status() status["cloudflared_installed"] = _tunnel.is_available() return status @router.post("/start") async def tunnel_start() -> dict[str, Any]: + assert _tunnel is not None if not _tunnel.is_available(): raise HTTPException( status_code=422, @@ -59,6 +61,7 @@ async def tunnel_start() -> dict[str, Any]: @router.post("/stop") async def tunnel_stop() -> dict[str, Any]: + assert _tunnel is not None await _tunnel.stop() return {"status": "stopped"} diff --git a/src/fireflyframework_genai/studio/assistant/shared_context.py b/src/fireflyframework_genai/studio/assistant/shared_context.py index f3414d9f..f2c05d20 100644 --- a/src/fireflyframework_genai/studio/assistant/shared_context.py +++ b/src/fireflyframework_genai/studio/assistant/shared_context.py @@ -42,12 +42,16 @@ def build_shared_context( # 1. Project metadata try: - from fireflyframework_genai.studio.projects import get_project + from fireflyframework_genai.studio.config import StudioConfig + from fireflyframework_genai.studio.projects import ProjectManager - project = get_project(project_name) + pm = ProjectManager(StudioConfig().projects_dir) + project = next((p for p in pm.list_all() if p.name == project_name), None) if project: - desc = getattr(project, "description", "") or "No description" + desc = project.description or "No description" parts.append(f"[PROJECT] {project.name}: {desc}") + elif project_name: + parts.append(f"[PROJECT] {project_name}") except Exception: if project_name: parts.append(f"[PROJECT] {project_name}") diff --git a/src/fireflyframework_genai/studio/assistant/smith.py b/src/fireflyframework_genai/studio/assistant/smith.py index 11f91906..d0675331 100644 --- a/src/fireflyframework_genai/studio/assistant/smith.py +++ b/src/fireflyframework_genai/studio/assistant/smith.py @@ -488,7 +488,7 @@ async def validate_python(code: str) -> str: if proc.returncode == 0: return json.dumps({"valid": True}) return json.dumps({"valid": False, "error": stderr.decode("utf-8", errors="replace")}) - except _asyncio.TimeoutError: + except TimeoutError: proc.kill() # type: ignore[union-attr] return json.dumps({"valid": False, "error": "Validation timed out after 10s"}) finally: @@ -512,7 +512,7 @@ async def run_python(code: str) -> str: ) try: stdout, stderr = await _asyncio.wait_for(proc.communicate(), timeout=30) - except _asyncio.TimeoutError: + except TimeoutError: proc.kill() await proc.communicate() return json.dumps({ @@ -546,7 +546,7 @@ async def run_shell(command: str) -> str: ) try: stdout, stderr = await _asyncio.wait_for(proc.communicate(), timeout=30) - except _asyncio.TimeoutError: + except TimeoutError: proc.kill() await proc.communicate() return json.dumps({ @@ -570,7 +570,7 @@ async def get_project_info() -> str: try: settings = load_settings() return json.dumps({ - "user": settings.user_profile.display_name, + "user": settings.user_profile.name, "model": settings.model_defaults.default_model, }) except Exception: diff --git a/src/fireflyframework_genai/studio/runtime.py b/src/fireflyframework_genai/studio/runtime.py index 6244ecce..e33441de 100644 --- a/src/fireflyframework_genai/studio/runtime.py +++ b/src/fireflyframework_genai/studio/runtime.py @@ -99,24 +99,29 @@ async def _start_queue_consumer(self) -> None: qc = self._input_config.queue_config logger.info("Starting %s consumer for topic '%s'", qc.broker, qc.topic_or_queue) + agent_name = f"studio-{self.project_name}" + if qc.broker == "kafka": from fireflyframework_genai.exposure.queues.kafka import KafkaAgentConsumer consumer = KafkaAgentConsumer( + agent_name, topic=qc.topic_or_queue, - group_id=qc.group_id or f"studio-{self.project_name}", + group_id=qc.group_id or agent_name, bootstrap_servers=qc.connection_url or "localhost:9092", ) elif qc.broker == "rabbitmq": from fireflyframework_genai.exposure.queues.rabbitmq import RabbitMQAgentConsumer consumer = RabbitMQAgentConsumer( + agent_name, queue_name=qc.topic_or_queue, - connection_url=qc.connection_url or "amqp://localhost", + url=qc.connection_url or "amqp://localhost", ) elif qc.broker == "redis": from fireflyframework_genai.exposure.queues.redis import RedisAgentConsumer consumer = RedisAgentConsumer( + agent_name, channel=qc.topic_or_queue, - redis_url=qc.connection_url or "redis://localhost", + url=qc.connection_url or "redis://localhost", ) else: logger.warning("Unknown broker: %s", qc.broker) @@ -135,8 +140,8 @@ async def _start_scheduler(self) -> None: logger.info("Starting scheduler: %s (%s)", sc.cron_expression, sc.timezone) try: - from apscheduler import AsyncScheduler - from apscheduler.triggers.cron import CronTrigger + from apscheduler import AsyncScheduler # type: ignore[import-not-found] + from apscheduler.triggers.cron import CronTrigger # type: ignore[import-not-found] scheduler = AsyncScheduler() trigger = CronTrigger.from_crontab(sc.cron_expression, timezone=sc.timezone) diff --git a/src/fireflyframework_genai/studio/tunnel.py b/src/fireflyframework_genai/studio/tunnel.py index d2aedeaa..84a2ba68 100644 --- a/src/fireflyframework_genai/studio/tunnel.py +++ b/src/fireflyframework_genai/studio/tunnel.py @@ -118,8 +118,8 @@ def _reader() -> None: try: await asyncio.wait_for(done.wait(), timeout=timeout) - except asyncio.TimeoutError: - raise TimeoutError("Timed out waiting for tunnel URL") + except TimeoutError: + raise TimeoutError("Timed out waiting for tunnel URL") from None if result: return result