Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/fireflyframework_genai/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
RetryMiddleware,
ValidationMiddleware,
)
from fireflyframework_genai.agents.prompt_cache import CacheStatistics, PromptCacheMiddleware
from fireflyframework_genai.agents.cache import ResultCache
from fireflyframework_genai.agents.context import AgentContext
from fireflyframework_genai.agents.decorators import firefly_agent
Expand All @@ -48,6 +47,7 @@
MiddlewareChain,
MiddlewareContext,
)
from fireflyframework_genai.agents.prompt_cache import CacheStatistics, PromptCacheMiddleware
from fireflyframework_genai.agents.registry import AgentInfo, AgentRegistry, agent_registry
from fireflyframework_genai.agents.templates import (
create_classifier_agent,
Expand Down
6 changes: 3 additions & 3 deletions src/fireflyframework_genai/exposure/rest/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
if TYPE_CHECKING:
from fastapi import APIRouter # type: ignore[import-not-found]

logger = logging.getLogger(__name__)

from fireflyframework_genai.agents.registry import agent_registry
from fireflyframework_genai.exposure.rest.schemas import AgentRequest, AgentResponse
from fireflyframework_genai.exposure.rest.streaming import sse_stream, sse_stream_incremental
from fireflyframework_genai.memory.manager import MemoryManager

logger = logging.getLogger(__name__)

# Server-side memory manager for REST conversations
_rest_memory = MemoryManager(working_scope_id="rest")

Expand Down Expand Up @@ -83,7 +83,7 @@ async def run_agent(name: str, request: AgentRequest) -> AgentResponse:
result = await agent.run(prompt, deps=request.deps, conversation_id=conv_id)
output = result.output if hasattr(result, "output") else str(result)
return AgentResponse(agent_name=name, output=output)
except Exception as exc:
except Exception:
logger.exception("Agent '%s' run failed", name)
return AgentResponse(agent_name=name, output=None, success=False, error="Internal server error")

Expand Down
2 changes: 1 addition & 1 deletion src/fireflyframework_genai/exposure/rest/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def agent_ws(websocket: WebSocket, name: str) -> None:
# Use a per-connection memory scope to avoid cross-talk between
# concurrent WebSocket sessions sharing the same agent.
conn_id = uuid.uuid4().hex[:8]
conn_memory = _ws_memory.fork(working_scope_id=f"ws:{conn_id}")
_ws_memory.fork(working_scope_id=f"ws:{conn_id}")

try:
while True:
Expand Down
8 changes: 3 additions & 5 deletions src/fireflyframework_genai/memory/database_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@
from datetime import UTC, datetime
from typing import Any

from fireflyframework_genai.exceptions import DatabaseConnectionError, DatabaseStoreError
from fireflyframework_genai.memory.types import MemoryEntry

_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")

_sync_pool = ThreadPoolExecutor(max_workers=4)

from fireflyframework_genai.exceptions import DatabaseConnectionError, DatabaseStoreError
from fireflyframework_genai.memory.types import MemoryEntry

logger = logging.getLogger(__name__)


Expand All @@ -72,8 +72,6 @@ def _run_sync(coro: Any) -> Any:

if loop is not None:
# Already inside an event loop -- offload to a background thread.
import concurrent.futures

future = _sync_pool.submit(asyncio.run, coro)
return future.result()
return asyncio.run(coro)
Expand Down
2 changes: 1 addition & 1 deletion src/fireflyframework_genai/memory/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def from_config(cls) -> MemoryManager:

from fireflyframework_genai.config import get_config

def _run_sync(coro: object) -> object:
def _run_sync(coro: Any) -> Any:
"""Run *coro* synchronously, safe even when an event loop is already running."""
try:
loop = asyncio.get_running_loop()
Expand Down
13 changes: 6 additions & 7 deletions src/fireflyframework_genai/studio/api/assistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import re
from typing import Any

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query # type: ignore[import-not-found]
from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect # type: ignore[import-not-found]
from pydantic import BaseModel # type: ignore[import-not-found]
from pydantic_ai.usage import UsageLimits # type: ignore[import-not-found]

Expand Down Expand Up @@ -305,10 +305,7 @@ async def _handle_chat_blocking(
usage_limits=UsageLimits(request_limit=_DEFAULT_REQUEST_LIMIT),
)

if hasattr(result, "output"):
full_text = str(result.output) if result.output else ""
else:
full_text = str(result)
full_text = (str(result.output) if result.output else "") if hasattr(result, "output") else str(result)

tool_calls = _extract_tool_calls(result)

Expand Down Expand Up @@ -638,9 +635,11 @@ def _build_project_context(canvas: Any = None, project_name: str = "") -> str:

# Current project
try:
from fireflyframework_genai.studio.projects import list_projects
from fireflyframework_genai.studio.config import StudioConfig
from fireflyframework_genai.studio.projects import ProjectManager

projects = list_projects()
pm = ProjectManager(StudioConfig().projects_dir)
projects = pm.list_all()
if projects:
names = [p.name for p in projects]
parts.append(f"[CONTEXT] Active projects: {', '.join(names)}.")
Expand Down
3 changes: 1 addition & 2 deletions src/fireflyframework_genai/studio/api/custom_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
ToolParameter,
)


# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -584,7 +583,7 @@ async def test_tool(name: str) -> dict[str, Any]:
"response_time": elapsed,
"result": str(result)[:1000],
}
except asyncio.TimeoutError:
except TimeoutError:
return {"status": "timeout", "tool_name": f"custom:{name}", "error": "Request timed out after 15s"}
except Exception as exc:
return {"status": "error", "tool_name": f"custom:{name}", "error": str(exc)}
Expand Down
2 changes: 1 addition & 1 deletion src/fireflyframework_genai/studio/api/graphql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def run_pipeline(self, project: str, input: str) -> ExecutionResult:
start_time = time.monotonic()
try:
engine = compile_graph(graph_model)
result = await engine.run(input)
result = await engine.run(inputs=input)
duration_ms = round((time.monotonic() - start_time) * 1000, 2)
return ExecutionResult(
execution_id=execution_id,
Expand Down
6 changes: 3 additions & 3 deletions src/fireflyframework_genai/studio/api/project_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def run_pipeline(name: str, body: RunRequest) -> dict[str, Any]:

try:
engine = compile_graph(graph_model)
result = await engine.run(body.input)
result = await engine.run(inputs=body.input)
except Exception as exc:
logger.exception("Pipeline execution failed for project '%s'", name)
raise HTTPException(status_code=500, detail=str(exc)) from exc
Expand Down Expand Up @@ -195,7 +195,7 @@ async def _run_in_background() -> None:
start_time = time.monotonic()
try:
engine = compile_graph(graph_model)
result = await engine.run(body.input)
result = await engine.run(inputs=body.input)
duration_ms = round((time.monotonic() - start_time) * 1000, 2)
_executions[execution_id].update({
"status": "completed",
Expand Down Expand Up @@ -263,7 +263,7 @@ async def upload_file(name: str, file: UploadFile) -> dict[str, Any]:

try:
engine = compile_graph(graph_model)
result = await engine.run(inputs)
result = await engine.run(inputs=inputs)
except Exception as exc:
logger.exception("File upload pipeline execution failed for project '%s'", name)
raise HTTPException(status_code=500, detail=str(exc)) from exc
Expand Down
35 changes: 13 additions & 22 deletions src/fireflyframework_genai/studio/api/smith.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from __future__ import annotations

import asyncio
import contextlib
import json
import logging
import re
Expand Down Expand Up @@ -177,12 +178,10 @@ async def smith_ws(
logger.info("Smith WebSocket disconnected")
except Exception as exc:
logger.exception("Smith WebSocket error")
try:
with contextlib.suppress(Exception):
await websocket.send_json(
{"type": "error", "message": str(exc)}
)
except Exception:
pass

return router

Expand Down Expand Up @@ -228,10 +227,8 @@ async def _handle_generate(

# Pass user name so Smith can personalise responses
user_name = ""
try:
with contextlib.suppress(Exception):
user_name = settings.user_profile.name or ""
except Exception:
pass

# Build shared cross-agent context for generation
shared_context = ""
Expand Down Expand Up @@ -314,7 +311,6 @@ async def _handle_chat(
from fireflyframework_genai.studio.assistant.smith import (
create_smith_agent,
)

from fireflyframework_genai.studio.settings import load_settings as _load_settings

_user_name = ""
Expand Down Expand Up @@ -356,10 +352,7 @@ async def _handle_chat(
message_history=message_history,
)

if hasattr(result, "output"):
full_text = str(result.output) if result.output else ""
else:
full_text = str(result)
full_text = (str(result.output) if result.output else "") if hasattr(result, "output") else str(result)

if hasattr(result, "new_messages"):
message_history.extend(result.new_messages())
Expand All @@ -383,9 +376,9 @@ async def _handle_chat(

# Send narrative as chat tokens (or a brief note if empty)
chat_text = narrative or "Code generated — see the Code tab."
_CHUNK_SIZE = 80
for i in range(0, len(chat_text), _CHUNK_SIZE):
chunk = chat_text[i : i + _CHUNK_SIZE]
_chunk_size = 80
for i in range(0, len(chat_text), _chunk_size):
chunk = chat_text[i : i + _chunk_size]
await websocket.send_json(
{"type": "smith_token", "content": chunk}
)
Expand All @@ -403,9 +396,9 @@ async def _handle_chat(
)
else:
# No code blocks — send as regular chat
_CHUNK_SIZE = 80
for i in range(0, len(full_text), _CHUNK_SIZE):
chunk = full_text[i : i + _CHUNK_SIZE]
_chunk_size = 80
for i in range(0, len(full_text), _chunk_size):
chunk = full_text[i : i + _chunk_size]
await websocket.send_json(
{"type": "smith_token", "content": chunk}
)
Expand Down Expand Up @@ -501,7 +494,7 @@ async def _handle_execute(
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
except TimeoutError:
proc.kill()
await proc.communicate()
await websocket.send_json(
Expand Down Expand Up @@ -531,10 +524,8 @@ async def _handle_execute(
finally:
# Clean up temp file
if tmp_file is not None:
try:
with contextlib.suppress(Exception):
tmp_file.unlink(missing_ok=True)
except Exception:
pass


async def _handle_approve_command(
Expand Down Expand Up @@ -601,7 +592,7 @@ async def _handle_approve_command(
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
except TimeoutError:
proc.kill()
await proc.communicate()
await websocket.send_json(
Expand Down
3 changes: 3 additions & 0 deletions src/fireflyframework_genai/studio/api/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ def create_tunnel_router(port: int = 8470) -> APIRouter:

@router.get("/status")
async def tunnel_status() -> dict[str, Any]:
assert _tunnel is not None
status = _tunnel.get_status()
status["cloudflared_installed"] = _tunnel.is_available()
return status

@router.post("/start")
async def tunnel_start() -> dict[str, Any]:
assert _tunnel is not None
if not _tunnel.is_available():
raise HTTPException(
status_code=422,
Expand All @@ -59,6 +61,7 @@ async def tunnel_start() -> dict[str, Any]:

@router.post("/stop")
async def tunnel_stop() -> dict[str, Any]:
assert _tunnel is not None
await _tunnel.stop()
return {"status": "stopped"}

Expand Down
10 changes: 7 additions & 3 deletions src/fireflyframework_genai/studio/assistant/shared_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ def build_shared_context(

# 1. Project metadata
try:
from fireflyframework_genai.studio.projects import get_project
from fireflyframework_genai.studio.config import StudioConfig
from fireflyframework_genai.studio.projects import ProjectManager

project = get_project(project_name)
pm = ProjectManager(StudioConfig().projects_dir)
project = next((p for p in pm.list_all() if p.name == project_name), None)
if project:
desc = getattr(project, "description", "") or "No description"
desc = project.description or "No description"
parts.append(f"[PROJECT] {project.name}: {desc}")
elif project_name:
parts.append(f"[PROJECT] {project_name}")
except Exception:
if project_name:
parts.append(f"[PROJECT] {project_name}")
Expand Down
8 changes: 4 additions & 4 deletions src/fireflyframework_genai/studio/assistant/smith.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ async def validate_python(code: str) -> str:
if proc.returncode == 0:
return json.dumps({"valid": True})
return json.dumps({"valid": False, "error": stderr.decode("utf-8", errors="replace")})
except _asyncio.TimeoutError:
except TimeoutError:
proc.kill() # type: ignore[union-attr]
return json.dumps({"valid": False, "error": "Validation timed out after 10s"})
finally:
Expand All @@ -512,7 +512,7 @@ async def run_python(code: str) -> str:
)
try:
stdout, stderr = await _asyncio.wait_for(proc.communicate(), timeout=30)
except _asyncio.TimeoutError:
except TimeoutError:
proc.kill()
await proc.communicate()
return json.dumps({
Expand Down Expand Up @@ -546,7 +546,7 @@ async def run_shell(command: str) -> str:
)
try:
stdout, stderr = await _asyncio.wait_for(proc.communicate(), timeout=30)
except _asyncio.TimeoutError:
except TimeoutError:
proc.kill()
await proc.communicate()
return json.dumps({
Expand All @@ -570,7 +570,7 @@ async def get_project_info() -> str:
try:
settings = load_settings()
return json.dumps({
"user": settings.user_profile.display_name,
"user": settings.user_profile.name,
"model": settings.model_defaults.default_model,
})
except Exception:
Expand Down
Loading
Loading