Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion surfsense_backend/app/agents/new_chat/chat_deepagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
from app.agents.new_chat.tools.registry import build_tools_async
from app.db import ChatVisibility
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger

_perf_log = logging.getLogger("surfsense.perf")
_perf_log = get_perf_logger()

# =============================================================================
# Connector Type Mapping
Expand Down
30 changes: 28 additions & 2 deletions surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import asyncio
import json
import time
from datetime import datetime
from typing import Any

Expand All @@ -19,6 +20,7 @@

from app.db import async_session_maker
from app.services.connector_service import ConnectorService
from app.utils.perf import get_perf_logger

# =============================================================================
# Connector Constants and Normalization
Expand Down Expand Up @@ -412,6 +414,9 @@ async def search_knowledge_base_async(
Returns:
Formatted string with search results
"""
perf = get_perf_logger()
t0 = time.perf_counter()

all_documents: list[dict[str, Any]] = []

# Resolve date range (default last 2 years)
Expand All @@ -423,6 +428,10 @@ async def search_knowledge_base_async(
)

connectors = _normalize_connectors(connectors_to_search, available_connectors)
perf.info(
"[kb_search] searching %d connectors: %s (space=%d, top_k=%d)",
len(connectors), connectors[:5], search_space_id, top_k,
)

connector_specs: dict[str, tuple[str, bool, bool, dict[str, Any]]] = {
"YOUTUBE_VIDEO": ("search_youtube", True, True, {}),
Expand Down Expand Up @@ -492,20 +501,32 @@ async def _search_one_connector(connector: str) -> list[dict[str, Any]]:
try:
# Use isolated session per connector. Shared AsyncSession cannot safely
# run concurrent DB operations.
t_conn = time.perf_counter()
async with semaphore, async_session_maker() as isolated_session:
isolated_connector_service = ConnectorService(
isolated_session, search_space_id
)
connector_method = getattr(isolated_connector_service, method_name)
_, chunks = await connector_method(**kwargs)
perf.info(
"[kb_search] connector=%s results=%d in %.3fs",
connector, len(chunks), time.perf_counter() - t_conn,
)
return chunks
except Exception as e:
print(f"Error searching connector {connector}: {e}")
perf.warning(
"[kb_search] connector=%s FAILED in %.3fs: %s",
connector, time.perf_counter() - t_conn, e,
)
return []

t_gather = time.perf_counter()
connector_results = await asyncio.gather(
*[_search_one_connector(connector) for connector in connectors]
)
perf.info(
"[kb_search] all connectors gathered in %.3fs", time.perf_counter() - t_gather,
)
for chunks in connector_results:
all_documents.extend(chunks)

Expand Down Expand Up @@ -552,7 +573,12 @@ def _content_fingerprint(document: dict[str, Any]) -> int | None:
deduplicated.append(doc)

output_budget = _compute_tool_output_budget(max_input_tokens)
return format_documents_for_context(deduplicated, max_chars=output_budget)
result = format_documents_for_context(deduplicated, max_chars=output_budget)
perf.info(
"[kb_search] TOTAL in %.3fs total_docs=%d deduped=%d output_chars=%d space=%d",
time.perf_counter() - t0, len(all_documents), len(deduplicated), len(result), search_space_id,
)
return result


def _build_connector_docstring(available_connectors: list[str] | None) -> str:
Expand Down
61 changes: 61 additions & 0 deletions surfsense_backend/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request as StarletteRequest
from starlette.responses import Response as StarletteResponse
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware

from app.agents.new_chat.checkpointer import (
Expand All @@ -28,6 +31,7 @@
from app.schemas import UserCreate, UserRead, UserUpdate
from app.tasks.surfsense_docs_indexer import seed_surfsense_docs
from app.users import SECRET, auth_backend, current_active_user, fastapi_users
from app.utils.perf import get_perf_logger, log_system_snapshot

rate_limit_logger = logging.getLogger("surfsense.rate_limit")

Expand Down Expand Up @@ -244,6 +248,63 @@ def registration_allowed():
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# ---------------------------------------------------------------------------
# Request-level performance middleware
# ---------------------------------------------------------------------------
# Logs wall-clock time, method, path, and status for every request so we can
# spot slow endpoints in production logs.

_PERF_SLOW_REQUEST_THRESHOLD = float(
__import__("os").environ.get("PERF_SLOW_REQUEST_MS", "2000")
)


class RequestPerfMiddleware(BaseHTTPMiddleware):
    """Middleware that logs per-request wall-clock time.

    - ALL requests are logged at DEBUG level.
    - Requests exceeding PERF_SLOW_REQUEST_MS (default 2000ms) are logged at
      WARNING level with a system snapshot so we can correlate slow responses
      with CPU/memory usage at that moment.
    - Requests whose handler raises are logged at WARNING level before the
      exception is re-raised, so failing-slow endpoints are never silently
      missing from the perf log.
    """

    async def dispatch(
        self, request: StarletteRequest, call_next: RequestResponseEndpoint
    ) -> StarletteResponse:
        perf = get_perf_logger()
        t0 = time.perf_counter()

        try:
            response = await call_next(request)
        except Exception:
            # The route handler (or an inner middleware) raised: record how
            # long it ran before failing, then let the exception propagate to
            # the application's normal error handling.
            elapsed_ms = (time.perf_counter() - t0) * 1000
            perf.warning(
                "[request] %s %s raised after %.1fms",
                request.method,
                request.url.path,
                elapsed_ms,
            )
            raise

        elapsed_ms = (time.perf_counter() - t0) * 1000
        path = request.url.path
        method = request.method
        status = response.status_code

        # Lazy %-style logging args keep formatting cost off the hot path
        # when DEBUG logging is disabled.
        perf.debug(
            "[request] %s %s -> %d in %.1fms",
            method,
            path,
            status,
            elapsed_ms,
        )

        if elapsed_ms > _PERF_SLOW_REQUEST_THRESHOLD:
            perf.warning(
                "[SLOW_REQUEST] %s %s -> %d in %.1fms (threshold=%.0fms)",
                method,
                path,
                status,
                elapsed_ms,
                _PERF_SLOW_REQUEST_THRESHOLD,
            )
            # Capture CPU/memory at the moment of the slow response so the
            # log line can be correlated with system load.
            log_system_snapshot("slow_request")

        return response


# Install the request-timing middleware defined above.
app.add_middleware(RequestPerfMiddleware)

# Add SlowAPI middleware for automatic rate limiting
# Uses Starlette BaseHTTPMiddleware (not the raw ASGI variant) to avoid
# corrupting StreamingResponse — SlowAPIASGIMiddleware re-sends
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import time
from datetime import UTC, datetime

from sqlalchemy import delete, select
Expand Down Expand Up @@ -44,6 +45,7 @@
log_retryable_llm_error,
log_unexpected_error,
)
from app.utils.perf import get_perf_logger


class IndexingPipelineService:
Expand All @@ -58,6 +60,9 @@ async def prepare_for_indexing(
"""
Persist new documents and detect changes, returning only those that need indexing.
"""
perf = get_perf_logger()
t0 = time.perf_counter()

documents = []
seen_hashes: set[str] = set()
batch_ctx = PipelineLogContext(
Expand Down Expand Up @@ -140,11 +145,12 @@ async def prepare_for_indexing(

try:
await self.session.commit()
perf.info(
"[indexing] prepare_for_indexing in %.3fs input=%d output=%d",
time.perf_counter() - t0, len(connector_docs), len(documents),
)
return documents
except IntegrityError:
# A concurrent worker committed a document with the same content_hash
# or unique_identifier_hash between our check and our INSERT.
# The document already exists — roll back and let the next sync run handle it.
log_race_condition(batch_ctx)
await self.session.rollback()
return []
Expand All @@ -165,40 +171,61 @@ async def index(
unique_id=connector_doc.unique_id,
doc_id=document.id,
)
perf = get_perf_logger()
t_index = time.perf_counter()
try:
log_index_started(ctx)
document.status = DocumentStatus.processing()
await self.session.commit()

t_step = time.perf_counter()
if connector_doc.should_summarize and llm is not None:
content = await summarize_document(
connector_doc.source_markdown, llm, connector_doc.metadata
)
perf.info(
"[indexing] summarize_document doc=%d in %.3fs",
document.id, time.perf_counter() - t_step,
)
elif connector_doc.should_summarize and connector_doc.fallback_summary:
content = connector_doc.fallback_summary
else:
content = connector_doc.source_markdown

t_step = time.perf_counter()
embedding = embed_text(content)
perf.debug(
"[indexing] embed_text (summary) doc=%d in %.3fs",
document.id, time.perf_counter() - t_step,
)

await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)

t_step = time.perf_counter()
chunks = [
Chunk(content=text, embedding=embed_text(text))
for text in chunk_text(
connector_doc.source_markdown,
use_code_chunker=connector_doc.should_use_code_chunker,
)
]
perf.info(
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
document.id, len(chunks), time.perf_counter() - t_step,
)

document.content = content
document.embedding = embedding
attach_chunks_to_document(document, chunks)
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.ready()
await self.session.commit()
perf.info(
"[indexing] index TOTAL doc=%d chunks=%d in %.3fs",
document.id, len(chunks), time.perf_counter() - t_index,
)
log_index_success(ctx, chunk_count=len(chunks))

except RETRYABLE_LLM_ERRORS as e:
Expand Down
42 changes: 41 additions & 1 deletion surfsense_backend/app/retriever/chunks_hybrid_search.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import time
from datetime import datetime

from app.utils.perf import get_perf_logger


class ChucksHybridSearchRetriever:
def __init__(self, db_session):
Expand Down Expand Up @@ -38,9 +41,17 @@ async def vector_search(
from app.config import config
from app.db import Chunk, Document

perf = get_perf_logger()
t0 = time.perf_counter()

# Get embedding for the query
embedding_model = config.embedding_model_instance
t_embed = time.perf_counter()
query_embedding = embedding_model.embed(query_text)
perf.debug(
"[chunk_search] vector_search embedding in %.3fs",
time.perf_counter() - t_embed,
)

# Build the query filtered by search space
query = (
Expand All @@ -60,8 +71,13 @@ async def vector_search(
query = query.order_by(Chunk.embedding.op("<=>")(query_embedding)).limit(top_k)

# Execute the query
t_db = time.perf_counter()
result = await self.db_session.execute(query)
chunks = result.scalars().all()
perf.info(
"[chunk_search] vector_search DB query in %.3fs results=%d (total %.3fs) space=%d",
time.perf_counter() - t_db, len(chunks), time.perf_counter() - t0, search_space_id,
)

return chunks

Expand Down Expand Up @@ -91,6 +107,9 @@ async def full_text_search(

from app.db import Chunk, Document

perf = get_perf_logger()
t0 = time.perf_counter()

# Create tsvector and tsquery for PostgreSQL full-text search
tsvector = func.to_tsvector("english", Chunk.content)
tsquery = func.plainto_tsquery("english", query_text)
Expand Down Expand Up @@ -118,6 +137,10 @@ async def full_text_search(
# Execute the query
result = await self.db_session.execute(query)
chunks = result.scalars().all()
perf.info(
"[chunk_search] full_text_search in %.3fs results=%d space=%d",
time.perf_counter() - t0, len(chunks), search_space_id,
)

return chunks

Expand Down Expand Up @@ -157,9 +180,17 @@ async def hybrid_search(
from app.config import config
from app.db import Chunk, Document, DocumentType

perf = get_perf_logger()
t0 = time.perf_counter()

# Get embedding for the query
embedding_model = config.embedding_model_instance
t_embed = time.perf_counter()
query_embedding = embedding_model.embed(query_text)
perf.debug(
"[chunk_search] hybrid_search embedding in %.3fs",
time.perf_counter() - t_embed,
)

# RRF constants
k = 60
Expand Down Expand Up @@ -254,9 +285,14 @@ async def hybrid_search(
.limit(top_k)
)

# Execute the query
# Execute the RRF query
t_rrf = time.perf_counter()
result = await self.db_session.execute(final_query)
chunks_with_scores = result.all()
perf.info(
"[chunk_search] hybrid_search RRF query in %.3fs results=%d space=%d type=%s",
time.perf_counter() - t_rrf, len(chunks_with_scores), search_space_id, document_type,
)

# If no results were found, return an empty list
if not chunks_with_scores:
Expand Down Expand Up @@ -354,4 +390,8 @@ async def hybrid_search(
)
final_docs.append(entry)

perf.info(
"[chunk_search] hybrid_search TOTAL in %.3fs docs=%d space=%d type=%s",
time.perf_counter() - t0, len(final_docs), search_space_id, document_type,
)
return final_docs
Loading
Loading