diff --git a/PROJECT_STRUCTURE.md b/PROJECT_STRUCTURE.md index 5d6d9a0..ff2475e 100644 --- a/PROJECT_STRUCTURE.md +++ b/PROJECT_STRUCTURE.md @@ -1,6 +1,6 @@ # Project Structure -All source files are in the workspace root. Layering is preserved by filename prefix. +The app now uses a packaged runtime architecture in `video_rss_aggregator/`, with a small set of root-level adapters and CLI entrypoints kept for compatibility. /.gitignore /.data/ @@ -14,8 +14,59 @@ All source files are in the workspace root. Layering is preserved by filename pr /service_ollama.py /service_transcribe.py /service_summarize.py -/service_pipeline.py /adapter_gui.py /adapter_api.py /adapter_rss.py /adapter_storage.py +/video_rss_aggregator/ + /__init__.py + /api.py + /bootstrap.py + /rss.py + /storage.py + /application/ + /__init__.py + /ports.py + /use_cases/ + /__init__.py + /ingest_feed.py + /process_source.py + /render_rss_feed.py + /runtime.py + /domain/ + /__init__.py + /models.py + /outcomes.py + /publication.py + /infrastructure/ + /__init__.py + /feed_source.py + /media_service.py + /publication_renderer.py + /runtime_adapters.py + /sqlite_repositories.py + /summarizer.py +/tests/ + /adapters/ + /test_api_app.py + /test_cli_commands.py + /application/ + /test_ingest_feed.py + /test_process_source.py + /test_render_rss_feed.py + /test_runtime_use_cases.py + /domain/ + /test_outcomes.py + /infrastructure/ + /test_feed_source.py + /test_legacy_adapter_shims.py + /test_media_service.py + /test_publication_renderer.py + /test_runtime_adapters.py + /test_sqlite_repositories.py + /test_summarizer.py + /test_api_setup.py + /test_config.py + /test_ollama.py + /test_project_layout.py + /test_summarize_helpers.py diff --git a/README.md b/README.md index a128eb5..319b519 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,15 @@ This project has been rebuilt around Qwen 3.5 multimodal models and a strict loc - Storage: SQLite (`.data/vra.db`) - API: FastAPI +## Architecture + +- 
`video_rss_aggregator/` contains the current runtime architecture. +- `video_rss_aggregator/bootstrap.py` composes the application runtime and use cases. +- `video_rss_aggregator/application/` holds use-case orchestration and ports. +- `video_rss_aggregator/domain/` defines the core models and outcome types. +- `video_rss_aggregator/infrastructure/` contains SQLite, RSS, media, summarization, and runtime adapters. +- Root modules such as `adapter_api.py`, `adapter_rss.py`, `adapter_storage.py`, and `cli.py` remain as compatibility and entry-point surfaces around the packaged runtime. + ## Design Goals - Use Qwen 3.5 vision-capable small models for summarization quality. diff --git a/adapter_api.py b/adapter_api.py index fa272e5..ea4ce7b 100644 --- a/adapter_api.py +++ b/adapter_api.py @@ -1,165 +1,23 @@ from __future__ import annotations -import platform -import shutil -import sys -from datetime import datetime, timezone -from importlib.util import find_spec - -from fastapi import Depends, FastAPI, Header, HTTPException, Query -from fastapi.responses import HTMLResponse, Response -from pydantic import BaseModel - -from adapter_gui import render_setup_page from core_config import Config -from service_media import runtime_dependency_report -from service_pipeline import Pipeline - - -class IngestRequest(BaseModel): - feed_url: str - process: bool = False - max_items: int | None = None - - -class ProcessRequest(BaseModel): - source_url: str - title: str | None = None - - -def create_app(pipeline: Pipeline, config: Config) -> FastAPI: - app = FastAPI(title="Video RSS Aggregator", version="0.1.0") - - def _check_auth( - authorization: str | None = Header(None), x_api_key: str | None = Header(None) - ): - if config.api_key is None: - return - token = None - if authorization: - parts = authorization.split() - if len(parts) == 2 and parts[0].lower() == "bearer": - token = parts[1] - if token is None: - token = x_api_key - if token != config.api_key: - raise 
HTTPException(status_code=401, detail="unauthorized") - - @app.get("/health") - async def health(): - return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()} - - @app.get("/", response_class=HTMLResponse) - async def setup_home(): - return render_setup_page(config) - - @app.get("/setup/config") - async def setup_config(): - return { - "bind_address": f"{config.bind_host}:{config.bind_port}", - "storage_dir": config.storage_dir, - "database_path": config.database_path, - "ollama_base_url": config.ollama_base_url, - "model_priority": list(config.model_priority), - "vram_budget_mb": config.vram_budget_mb, - "model_selection_reserve_mb": config.model_selection_reserve_mb, - "max_frames": config.max_frames, - "frame_scene_detection": config.frame_scene_detection, - "frame_scene_threshold": config.frame_scene_threshold, - "frame_scene_min_frames": config.frame_scene_min_frames, - "api_key_required": config.api_key is not None, - "quick_commands": { - "bootstrap": "python -m vra bootstrap", - "status": "python -m vra status", - "serve": "python -m vra serve --bind 127.0.0.1:8080", - }, - } - - @app.get("/setup/diagnostics") - async def setup_diagnostics(): - media_tools = runtime_dependency_report() - yt_dlp_cmd = shutil.which("yt-dlp") - ytdlp = { - "command": yt_dlp_cmd, - "module_available": find_spec("yt_dlp") is not None, - } - ytdlp["available"] = bool(ytdlp["command"] or ytdlp["module_available"]) - - ollama: dict[str, object] = { - "base_url": config.ollama_base_url, - "reachable": False, - "version": None, - "models_found": 0, - "error": None, - } - try: - runtime = await pipeline.runtime_status() - ollama["reachable"] = True - ollama["version"] = runtime.get("ollama_version") - local_models = runtime.get("local_models", {}) - ollama["models_found"] = len(local_models) - except Exception as exc: - ollama["error"] = str(exc) - - ffmpeg_ok = bool(media_tools["ffmpeg"].get("available")) - ffprobe_ok = 
bool(media_tools["ffprobe"].get("available")) - ytdlp_ok = bool(ytdlp["available"]) - ollama_ok = bool(ollama["reachable"]) - - return { - "platform": { - "system": platform.system(), - "release": platform.release(), - "python_version": sys.version.split()[0], - "python_executable": sys.executable, - }, - "dependencies": { - "ffmpeg": media_tools["ffmpeg"], - "ffprobe": media_tools["ffprobe"], - "yt_dlp": ytdlp, - "ollama": ollama, - }, - "ready": ffmpeg_ok and ffprobe_ok and ytdlp_ok and ollama_ok, - } - - @app.post("/setup/bootstrap") - async def setup_bootstrap(_=Depends(_check_auth)): - return await pipeline.bootstrap_models() +from video_rss_aggregator.api import IngestRequest, ProcessRequest +from video_rss_aggregator.api import create_app as create_runtime_app +from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases, build_runtime - @app.post("/ingest") - async def ingest(req: IngestRequest, _=Depends(_check_auth)): - report = await pipeline.ingest_feed(req.feed_url, req.process, req.max_items) - return { - "feed_title": report.feed_title, - "item_count": report.item_count, - "processed_count": report.processed_count, - } - @app.post("/process") - async def process(req: ProcessRequest, _=Depends(_check_auth)): - report = await pipeline.process_source(req.source_url, req.title) - return { - "source_url": report.source_url, - "title": report.title, - "transcript_chars": report.transcript_chars, - "frame_count": report.frame_count, - "summary": { - "summary": report.summary.summary, - "key_points": report.summary.key_points, - "visual_highlights": report.summary.visual_highlights, - "model_used": report.summary.model_used, - "vram_mb": report.summary.vram_mb, - "error": report.summary.error, - }, - } +def create_app(runtime: AppRuntime | None = None, config: Config | None = None): + if runtime is not None and not isinstance(runtime, AppRuntime): + raise TypeError("create_app expects an AppRuntime or None") - @app.get("/rss") - async def 
rss_feed(limit: int = Query(20, ge=1, le=200)): - xml = await pipeline.rss_feed(limit) - return Response(content=xml, media_type="application/rss+xml") + return create_runtime_app(runtime=runtime, config=config) - @app.get("/runtime") - async def runtime(_=Depends(_check_auth)): - return await pipeline.runtime_status() - return app +__all__ = [ + "AppRuntime", + "AppUseCases", + "IngestRequest", + "ProcessRequest", + "build_runtime", + "create_app", +] diff --git a/adapter_rss.py b/adapter_rss.py index e81b51f..8e0a314 100644 --- a/adapter_rss.py +++ b/adapter_rss.py @@ -1,49 +1,3 @@ -from __future__ import annotations +from video_rss_aggregator.rss import render_feed -from datetime import datetime, timezone -from xml.etree.ElementTree import Element, SubElement, tostring - -from adapter_storage import SummaryRecord - - -def render_feed( - title: str, - link: str, - description: str, - records: list[SummaryRecord], -) -> str: - rss = Element("rss", version="2.0") - channel = SubElement(rss, "channel") - SubElement(channel, "title").text = title - SubElement(channel, "link").text = link - SubElement(channel, "description").text = description - - for rec in records: - item = SubElement(channel, "item") - SubElement(item, "title").text = rec.title or "Untitled video" - SubElement(item, "link").text = rec.source_url - - desc_parts = [rec.summary] - if rec.key_points: - bullets = "\n".join(f"- {p}" for p in rec.key_points) - desc_parts.append(bullets) - if rec.visual_highlights: - visuals = "\n".join(f"- {p}" for p in rec.visual_highlights) - desc_parts.append(f"Visual Highlights:\n{visuals}") - if rec.model_used: - desc_parts.append(f"Model: {rec.model_used} (VRAM {rec.vram_mb:.2f} MB)") - - SubElement(item, "description").text = "\n\n".join(desc_parts) - - if rec.published_at: - SubElement(item, "pubDate").text = _rfc2822(rec.published_at) - - return '\n' + tostring( - rss, encoding="unicode" - ) - - -def _rfc2822(dt: datetime) -> str: - if dt.tzinfo is None: - dt = 
dt.replace(tzinfo=timezone.utc) - return dt.strftime("%a, %d %b %Y %H:%M:%S %z") +__all__ = ["render_feed"] diff --git a/adapter_storage.py b/adapter_storage.py index 911fba6..33b92f3 100644 --- a/adapter_storage.py +++ b/adapter_storage.py @@ -1,284 +1,24 @@ from __future__ import annotations -import json -import uuid -from dataclasses import dataclass -from datetime import datetime, timezone -from pathlib import Path +from service_summarize import SummaryResult as LegacySummaryResult +from video_rss_aggregator.domain.models import SummaryResult +from video_rss_aggregator.storage import Database as PackageDatabase, SummaryRecord -import aiosqlite - -from service_summarize import SummaryResult - - -_SCHEMA = """ -CREATE TABLE IF NOT EXISTS feeds ( - id TEXT PRIMARY KEY, - url TEXT NOT NULL UNIQUE, - title TEXT, - last_checked TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS videos ( - id TEXT PRIMARY KEY, - feed_id TEXT, - source_url TEXT NOT NULL UNIQUE, - guid TEXT, - title TEXT, - published_at TEXT, - media_path TEXT, - created_at TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS transcripts ( - id TEXT PRIMARY KEY, - video_id TEXT NOT NULL, - text TEXT NOT NULL, - created_at TEXT NOT NULL -); - -CREATE TABLE IF NOT EXISTS summaries ( - id TEXT PRIMARY KEY, - video_id TEXT NOT NULL, - summary TEXT NOT NULL, - key_points TEXT NOT NULL, - visual_highlights TEXT NOT NULL, - model_used TEXT, - vram_mb REAL NOT NULL, - transcript_chars INTEGER NOT NULL, - frame_count INTEGER NOT NULL, - error TEXT, - created_at TEXT NOT NULL -); - -CREATE INDEX IF NOT EXISTS idx_videos_feed_id ON videos(feed_id); -CREATE INDEX IF NOT EXISTS idx_transcripts_video_id ON transcripts(video_id); -CREATE INDEX IF NOT EXISTS idx_summaries_video_id ON summaries(video_id); -CREATE INDEX IF NOT EXISTS idx_summaries_created_at ON summaries(created_at DESC); -""" - -_PRAGMAS = ( - "PRAGMA foreign_keys = ON", - "PRAGMA journal_mode = WAL", - "PRAGMA synchronous = NORMAL", - "PRAGMA temp_store = 
MEMORY", - "PRAGMA busy_timeout = 5000", -) - - -def _utc_now() -> str: - return datetime.now(timezone.utc).isoformat() - - -def _parse_dt(value: str | None) -> datetime | None: - if not value: - return None - try: - return datetime.fromisoformat(value) - except ValueError: - return None - - -@dataclass(slots=True) -class SummaryRecord: - title: str | None - source_url: str - published_at: datetime | None - summary: str - key_points: list[str] - visual_highlights: list[str] - model_used: str | None - vram_mb: float - created_at: datetime - - -class Database: - def __init__(self, conn: aiosqlite.Connection, database_path: str) -> None: - self._conn = conn - self._database_path = database_path - - @classmethod - async def connect(cls, database_path: str) -> Database: - path = Path(database_path) - path.parent.mkdir(parents=True, exist_ok=True) - - conn = await aiosqlite.connect(str(path)) - conn.row_factory = aiosqlite.Row - for pragma in _PRAGMAS: - await conn.execute(pragma) - await conn.commit() - return cls(conn, str(path)) - - @property - def path(self) -> str: - return self._database_path - - async def close(self) -> None: - await self._conn.close() - - async def migrate(self) -> None: - await self._conn.executescript(_SCHEMA) - await self._conn.commit() - - async def upsert_feed(self, url: str, title: str | None) -> str: - now = _utc_now() - fid = str(uuid.uuid4()) - await self._conn.execute( - """ - INSERT INTO feeds (id, url, title, last_checked) - VALUES (?, ?, ?, ?) 
- ON CONFLICT(url) - DO UPDATE SET - title = COALESCE(excluded.title, feeds.title), - last_checked = excluded.last_checked - """, - (fid, url, title, now), - ) - await self._conn.commit() - - async with self._conn.execute( - "SELECT id FROM feeds WHERE url = ?", (url,) - ) as cur: - row = await cur.fetchone() - if row is None: - raise RuntimeError("Failed to upsert feed") - return str(row["id"]) - - async def upsert_video( - self, - feed_id: str | None, - guid: str | None, - title: str | None, - source_url: str, - published_at: datetime | None, - media_path: str | None = None, - ) -> str: - now = _utc_now() - vid = str(uuid.uuid4()) - pub_text = published_at.isoformat() if published_at else None - - await self._conn.execute( - """ - INSERT INTO videos ( - id, feed_id, source_url, guid, title, published_at, media_path, created_at - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(source_url) - DO UPDATE SET - feed_id = COALESCE(excluded.feed_id, videos.feed_id), - guid = COALESCE(excluded.guid, videos.guid), - title = COALESCE(excluded.title, videos.title), - published_at = COALESCE(excluded.published_at, videos.published_at), - media_path = COALESCE(excluded.media_path, videos.media_path) - """, - (vid, feed_id, source_url, guid, title, pub_text, media_path, now), - ) - await self._conn.commit() - - async with self._conn.execute( - "SELECT id FROM videos WHERE source_url = ?", - (source_url,), - ) as cur: - row = await cur.fetchone() - if row is None: - raise RuntimeError("Failed to upsert video") - return str(row["id"]) - - async def insert_transcript(self, video_id: str, text: str) -> str: - tid = str(uuid.uuid4()) - await self._conn.execute( - """ - INSERT INTO transcripts (id, video_id, text, created_at) - VALUES (?, ?, ?, ?) 
- """, - (tid, video_id, text, _utc_now()), - ) - await self._conn.commit() - return tid +class Database(PackageDatabase): async def insert_summary(self, video_id: str, result: SummaryResult) -> str: - sid = str(uuid.uuid4()) - await self._conn.execute( - """ - INSERT INTO summaries ( - id, - video_id, - summary, - key_points, - visual_highlights, - model_used, - vram_mb, - transcript_chars, - frame_count, - error, - created_at + if isinstance(result, LegacySummaryResult): + result = SummaryResult( + summary=result.summary, + key_points=tuple(result.key_points), + visual_highlights=tuple(result.visual_highlights), + model_used=result.model_used, + vram_mb=result.vram_mb, + transcript_chars=result.transcript_chars, + frame_count=result.frame_count, + error=result.error, ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - sid, - video_id, - result.summary, - json.dumps(result.key_points), - json.dumps(result.visual_highlights), - result.model_used, - float(result.vram_mb), - int(result.transcript_chars), - int(result.frame_count), - result.error, - _utc_now(), - ), - ) - await self._conn.commit() - return sid - - async def latest_summaries(self, limit: int = 20) -> list[SummaryRecord]: - async with self._conn.execute( - """ - SELECT - v.title, - v.source_url, - v.published_at, - s.summary, - s.key_points, - s.visual_highlights, - s.model_used, - s.vram_mb, - s.created_at - FROM summaries s - JOIN videos v ON v.id = s.video_id - ORDER BY s.created_at DESC - LIMIT ? 
- """, - (limit,), - ) as cur: - rows = await cur.fetchall() + return await super().insert_summary(video_id, result) - out: list[SummaryRecord] = [] - for row in rows: - key_points_raw = row["key_points"] - visual_raw = row["visual_highlights"] - try: - key_points = json.loads(key_points_raw) if key_points_raw else [] - except json.JSONDecodeError: - key_points = [] - try: - visual = json.loads(visual_raw) if visual_raw else [] - except json.JSONDecodeError: - visual = [] - created = _parse_dt(str(row["created_at"])) - out.append( - SummaryRecord( - title=row["title"], - source_url=row["source_url"], - published_at=_parse_dt(row["published_at"]), - summary=row["summary"], - key_points=key_points if isinstance(key_points, list) else [], - visual_highlights=visual if isinstance(visual, list) else [], - model_used=row["model_used"], - vram_mb=float(row["vram_mb"] or 0.0), - created_at=created or datetime.now(timezone.utc), - ) - ) - return out +__all__ = ["Database", "SummaryRecord"] diff --git a/cli.py b/cli.py index 41df40c..ff32d55 100644 --- a/cli.py +++ b/cli.py @@ -15,8 +15,9 @@ from core_config import Config from service_media import extract_frames_ffmpeg, prepare_source -from service_pipeline import Pipeline from service_summarize import SummarizationEngine +from video_rss_aggregator.bootstrap import AppRuntime, build_runtime +from video_rss_aggregator.domain.outcomes import Failure log = logging.getLogger(__name__) @@ -85,6 +86,12 @@ def _as_float(value: object) -> float: return 0.0 +async def _close_runtime(runtime: AppRuntime) -> None: + close_result = runtime.close() + if close_result is not None: + await close_result + + @click.group() def cli() -> None: """Video RSS Aggregator powered by Qwen3.5 vision models.""" @@ -97,21 +104,30 @@ def bootstrap() -> None: config = Config.from_env() async def _run() -> None: - summarizer = SummarizationEngine(config) + runtime = await build_runtime(config) try: - pulled = await summarizer.prepare_models() - runtime = 
await summarizer.runtime_status() + report = await runtime.use_cases.bootstrap_runtime.execute() + models_prepared = report.get("models_prepared") + if models_prepared is None: + models_prepared = report["models"] + + runtime_payload = report.get("runtime") + if isinstance(runtime_payload, dict): + runtime_response = runtime_payload + else: + runtime_response = await runtime.use_cases.get_runtime_status.execute() + print( json.dumps( { - "models_prepared": pulled, - "runtime": runtime, + "models_prepared": models_prepared, + "runtime": runtime_response, }, indent=2, ) ) finally: - await summarizer.close() + await _close_runtime(runtime) try: asyncio.run(_run()) @@ -138,14 +154,10 @@ def serve(bind: str | None) -> None: async def _run() -> None: from adapter_api import create_app - pipeline = await Pipeline.create(config) - try: - app = create_app(pipeline, config) - uv_cfg = uvicorn.Config(app, host=host, port=port, log_level="info") - server = uvicorn.Server(uv_cfg) - await server.serve() - finally: - await pipeline.close() + app = create_app(config=config) + uv_cfg = uvicorn.Config(app, host=host, port=port, log_level="info") + server = uvicorn.Server(uv_cfg) + await server.serve() try: asyncio.run(_run()) @@ -159,13 +171,12 @@ def status() -> None: config = Config.from_env() async def _run() -> None: - summarizer = SummarizationEngine(config) + runtime = await build_runtime(config) try: - await summarizer.prepare_models() - runtime = await summarizer.runtime_status() - print(json.dumps(runtime, indent=2)) + status_payload = await runtime.use_cases.get_runtime_status.execute() + print(json.dumps(status_payload, indent=2)) finally: - await summarizer.close() + await _close_runtime(runtime) try: asyncio.run(_run()) @@ -181,31 +192,34 @@ def verify(source: str, title: str | None) -> None: config = Config.from_env() async def _run() -> None: - pipeline = await Pipeline.create(config) + runtime = await build_runtime(config) try: t0 = monotonic() - report = await 
pipeline.process_source(source, title) + outcome = await runtime.use_cases.process_source.execute(source, title) + if isinstance(outcome, Failure): + raise click.ClickException(outcome.reason) + total_ms = int((monotonic() - t0) * 1000) print( json.dumps( { - "source_url": report.source_url, - "title": report.title, - "transcript_chars": report.transcript_chars, - "frame_count": report.frame_count, - "model_used": report.summary.model_used, - "vram_mb": report.summary.vram_mb, - "error": report.summary.error, - "summary_chars": len(report.summary.summary), - "key_points": len(report.summary.key_points), + "source_url": outcome.media.source_url, + "title": outcome.media.title, + "transcript_chars": outcome.summary.transcript_chars, + "frame_count": outcome.summary.frame_count, + "model_used": outcome.summary.model_used, + "vram_mb": outcome.summary.vram_mb, + "error": outcome.summary.error, + "summary_chars": len(outcome.summary.summary), + "key_points": len(outcome.summary.key_points), "total_ms": total_ms, }, indent=2, ) ) finally: - await pipeline.close() + await _close_runtime(runtime) try: asyncio.run(_run()) diff --git a/docs/plans/2026-03-09-codebase-architecture-redesign-design.md b/docs/plans/2026-03-09-codebase-architecture-redesign-design.md new file mode 100644 index 0000000..f59429c --- /dev/null +++ b/docs/plans/2026-03-09-codebase-architecture-redesign-design.md @@ -0,0 +1,214 @@ +# Codebase Architecture Redesign + +Date: 2026-03-09 +Status: Approved + +## Goal + +Define an ideal end-state architecture for the Video RSS Aggregator that improves +maintainability, reliability, delivery velocity, and testability. + +## Context + +The current codebase works, but several design pressures now slow it down: + +- `Pipeline` mixes composition, orchestration, feed ingestion, processing, + persistence, runtime reporting, and RSS generation. +- Data contracts leak across layers, especially between summarization, + persistence, and RSS rendering. 
+- CLI and API do not consistently share one application-level workflow model. +- Setup/runtime data is shaped in multiple places across Python, HTML, and JS. +- Tests rely heavily on monkeypatching concrete modules instead of stable seams. + +## Chosen Approach + +Adopt a ports-and-use-cases architecture with four layers: + +1. Adapters +2. Application +3. Domain +4. Infrastructure + +This was selected over a lighter workflow-slice refactor or a stabilized version +of the current layering because the goal is an ideal long-term architecture, +not only a lower-risk cleanup. + +## Architectural Principles + +- Dependencies point inward only. +- Business workflows live in application use cases, not transport adapters. +- Domain types are stable and independent of storage, HTTP, CLI, and model APIs. +- Infrastructure implements ports instead of owning business rules. +- Composition happens once in a single composition root. + +## Target Architecture + +### Adapters + +Adapters translate external interactions into application requests and map +application responses back out. + +- FastAPI routes +- CLI commands +- GUI/setup endpoints and page models +- RSS HTTP delivery surface + +Adapters should not contain orchestration logic beyond request mapping, +validation, and response formatting. + +### Application + +Application use cases coordinate business workflows through explicit ports. + +Core use cases: + +- `BootstrapRuntime` +- `GetRuntimeStatus` +- `IngestFeed` +- `ProcessSource` +- `RenderRssFeed` + +The current `Pipeline` class should be replaced by these focused use cases. + +### Domain + +Domain types define the stable language of the system. + +Illustrative types: + +- `SourceItem` +- `VideoRecord` +- `Transcript` +- `PreparedMedia` +- `SummaryDraft` +- `SummaryResult` +- `ProcessOutcome` +- `DiagnosticReport` + +These types must not depend on SQLite rows, Ollama payloads, FastAPI models, +Click commands, or subprocess output. 
+ +### Infrastructure + +Infrastructure adapters implement application ports. + +- SQLite repositories +- Ollama client adapter +- Feed fetching adapter +- Media preparation adapter around yt-dlp, ffmpeg, and filesystem artifacts +- RSS rendering adapter +- Runtime inspection adapter + +Infrastructure owns transport and tool integration details, but not workflow +decisions. + +## Ports + +The application layer should depend on explicit interfaces such as: + +- `FeedSource` +- `VideoRepository` +- `SummaryRepository` +- `MediaPreparationService` +- `Summarizer` +- `RuntimeInspector` +- `PublicationRenderer` +- `ArtifactStore` + +This creates narrow seams for tests and keeps adapters replaceable. + +## Data Flow + +### Startup + +The composition root loads `Config`, builds infrastructure adapters, wires use +cases, and exposes them to FastAPI and CLI entry points. Web startup and +shutdown should be owned by FastAPI lifespan rather than by a prebuilt runtime +object created externally. + +### Ingest + +`IngestFeed` fetches and parses a feed, normalizes entries into domain types, +stores feed/video metadata, and optionally delegates processing to +`ProcessSource`. It should not own processing internals. + +### Processing + +`ProcessSource` asks `MediaPreparationService` for `PreparedMedia`, passes the +result to `Summarizer`, then persists a typed `ProcessOutcome`. + +This use case owns the decision about whether a result is successful, degraded, +or failed. + +### Publication + +`RenderRssFeed` reads published summaries through repositories and passes stable +publication models to a renderer. RSS generation should not depend on storage +row types. + +### Setup and Runtime + +`BootstrapRuntime` and `GetRuntimeStatus` should return one application-level +view model shared by API and GUI, replacing duplicated config/setup shaping. 
+ +## Error Handling Model + +Replace implicit fallback-heavy success semantics with explicit outcome types: + +- `Success` +- `PartialSuccess` +- `Failure` + +Rules: + +- Adapter-specific exceptions are translated at the use-case boundary. +- `PartialSuccess` is used when the system produced a degraded but valid result. +- `Failure` means the business goal was not achieved and must not be presented + as a normal success. +- Persistence records outcome status explicitly rather than relying on summary + text to reveal degradation. +- API and CLI surfaces report status directly. + +Diagnostics remain separate from processing outcomes. + +## Testing Strategy + +The main regression net should move to application-boundary contract tests for: + +- `BootstrapRuntime` +- `GetRuntimeStatus` +- `IngestFeed` +- `ProcessSource` +- `RenderRssFeed` + +Supporting tests should be split into: + +- repository integration tests +- Ollama adapter tests +- media adapter tests +- FastAPI adapter tests +- CLI adapter tests +- policy tests for model selection, degradation classification, normalization, + and retention/publication rules + +The desired end state is that most business behavior can be tested without +FastAPI, Click, SQLite, subprocesses, or a live Ollama runtime. + +## Non-Goals + +- Defining the full migration sequence in this document +- Implementing the redesign directly from adapters inward +- Preserving the current `Pipeline` shape as a compatibility constraint + +## Expected Benefits + +- clearer ownership of business workflows +- lower coupling between persistence, summarization, and presentation +- consistent behavior across API and CLI +- easier unit and contract testing +- safer future changes to model policy, media tooling, and publishing + +## Next Step + +Create a dedicated implementation plan that stages the migration from the +current codebase into this target architecture. 
diff --git a/pyproject.toml b/pyproject.toml index 32d25c0..3b8a078 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,13 +36,15 @@ py-modules = [ "service_ollama", "service_transcribe", "service_summarize", - "service_pipeline", "adapter_gui", "adapter_api", "adapter_rss", "adapter_storage", ] +[tool.setuptools.packages.find] +include = ["video_rss_aggregator*"] + [tool.pytest.ini_options] testpaths = ["tests"] addopts = "-q" diff --git a/service_pipeline.py b/service_pipeline.py deleted file mode 100644 index 1640aea..0000000 --- a/service_pipeline.py +++ /dev/null @@ -1,208 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from datetime import datetime, timezone -from time import struct_time -from typing import Any - -import feedparser -import httpx - -from adapter_rss import render_feed -from adapter_storage import Database -from core_config import Config -from service_media import prepare_media -from service_summarize import SummarizationEngine, SummaryResult - - -@dataclass(slots=True) -class IngestReport: - feed_title: str | None = None - item_count: int = 0 - processed_count: int = 0 - - -@dataclass(slots=True) -class ProcessReport: - source_url: str - title: str | None - transcript_chars: int - frame_count: int - summary: SummaryResult - - -class Pipeline: - def __init__( - self, - config: Config, - db: Database, - summarizer: SummarizationEngine, - ) -> None: - self._config = config - self._db = db - self._summarizer = summarizer - self._client = httpx.AsyncClient( - timeout=httpx.Timeout(connect=15.0, read=300.0, write=300.0, pool=60.0), - follow_redirects=True, - limits=httpx.Limits(max_keepalive_connections=20, max_connections=50), - ) - - @classmethod - async def create(cls, config: Config) -> Pipeline: - db = await Database.connect(config.database_path) - await db.migrate() - - summarizer = SummarizationEngine(config) - await summarizer.prepare_models() - - return cls(config, db, summarizer) - - async def 
close(self) -> None: - await self._client.aclose() - await self._summarizer.close() - await self._db.close() - - async def runtime_status(self) -> dict[str, Any]: - status = await self._summarizer.runtime_status() - status["database_path"] = self._db.path - status["storage_dir"] = self._config.storage_dir - status["models"] = list(self._config.model_priority) - return status - - async def bootstrap_models(self) -> dict[str, Any]: - prepared = await self._summarizer.prepare_models() - return { - "models_prepared": prepared, - "runtime": await self.runtime_status(), - } - - async def ingest_feed( - self, - feed_url: str, - process: bool = False, - max_items: int | None = None, - ) -> IngestReport: - resp = await self._client.get(feed_url) - resp.raise_for_status() - parsed = feedparser.parse(resp.text) - - feed_title = parsed.feed.get("title") - feed_id = await self._db.upsert_feed(feed_url, feed_title) - - entries = parsed.entries - if max_items is not None: - entries = entries[:max_items] - - report = IngestReport(feed_title=feed_title) - - for entry in entries: - source_url = _pick_source_url(entry) - if not source_url: - continue - - title = entry.get("title") - guid = entry.get("id") or None - published = entry.get("published_parsed") or entry.get("updated_parsed") - published_at = _struct_to_dt(published) if published else None - - video_id = await self._db.upsert_video( - feed_id, - guid, - title, - source_url, - published_at, - ) - report.item_count += 1 - - if process: - processed = await self._process_with_video(video_id, source_url, title) - if processed.summary.summary: - report.processed_count += 1 - - return report - - async def process_source( - self, - source_url: str, - title: str | None = None, - ) -> ProcessReport: - video_id = await self._db.upsert_video( - feed_id=None, - guid=None, - title=title, - source_url=source_url, - published_at=None, - ) - return await self._process_with_video(video_id, source_url, title) - - async def rss_feed(self, 
limit: int = 20) -> str: - records = await self._db.latest_summaries(limit) - return render_feed( - self._config.rss_title, - self._config.rss_link, - self._config.rss_description, - records, - ) - - async def _process_with_video( - self, - video_id: str, - source_url: str, - title: str | None, - ) -> ProcessReport: - prepared = await prepare_media( - client=self._client, - source=source_url, - storage_dir=self._config.storage_dir, - max_frames=self._config.max_frames, - scene_detection=self._config.frame_scene_detection, - scene_threshold=self._config.frame_scene_threshold, - scene_min_frames=self._config.frame_scene_min_frames, - max_transcript_chars=self._config.max_transcript_chars, - ) - - resolved_title = title or prepared.title - await self._db.upsert_video( - feed_id=None, - guid=None, - title=resolved_title, - source_url=source_url, - published_at=None, - media_path=str(prepared.media_path), - ) - - if prepared.transcript: - await self._db.insert_transcript(video_id, prepared.transcript) - - summary = await self._summarizer.summarize( - source_url=source_url, - title=resolved_title, - transcript=prepared.transcript, - frame_paths=prepared.frame_paths, - ) - await self._db.insert_summary(video_id, summary) - - return ProcessReport( - source_url=source_url, - title=resolved_title, - transcript_chars=len(prepared.transcript), - frame_count=len(prepared.frame_paths), - summary=summary, - ) - - -def _pick_source_url(entry: Any) -> str | None: - enclosures = entry.get("enclosures", []) - if enclosures: - return enclosures[0].get("href") or enclosures[0].get("url") - links = entry.get("links", []) - if links: - return links[0].get("href") - return entry.get("link") - - -def _struct_to_dt(value: struct_time) -> datetime | None: - try: - return datetime(*value[:6], tzinfo=timezone.utc) - except Exception: - return None diff --git a/tests/adapters/test_api_app.py b/tests/adapters/test_api_app.py new file mode 100644 index 0000000..c6cbc25 --- /dev/null +++ 
b/tests/adapters/test_api_app.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from fastapi.testclient import TestClient + +from core_config import Config +from video_rss_aggregator.api import create_app +from video_rss_aggregator.application.use_cases.ingest_feed import IngestReport +from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases +from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult +from video_rss_aggregator.domain.outcomes import PartialSuccess + + +@dataclass +class RecordingUseCase: + result: Any + calls: list[tuple[Any, ...]] = field(default_factory=list) + + async def execute(self, *args: Any) -> Any: + self.calls.append(args) + return self.result + + +def _build_runtime() -> tuple[AppRuntime, dict[str, RecordingUseCase]]: + runtime_status = RecordingUseCase( + { + "ollama_version": "0.6.0", + "local_models": {"qwen": {"size": 1}}, + "reachable": True, + "database_path": ".data/runtime.db", + "storage_dir": ".data", + "models": ["qwen", "qwen:min"], + } + ) + bootstrap_runtime = RecordingUseCase({"models": ["qwen"]}) + ingest_feed = RecordingUseCase( + IngestReport(feed_title="Example Feed", item_count=2, processed_count=1) + ) + process_source = RecordingUseCase( + PartialSuccess( + media=PreparedMedia( + source_url="https://example.com/watch?v=1", + title="Example Title", + transcript="transcript text", + media_path="/tmp/video.mp4", + frame_paths=("/tmp/frame-1.jpg", "/tmp/frame-2.jpg"), + ), + reason="model degraded", + summary=SummaryResult( + summary="Compact summary", + key_points=("one", "two"), + visual_highlights=("frame",), + model_used="qwen", + vram_mb=512.0, + transcript_chars=15, + frame_count=2, + error="model degraded", + ), + ) + ) + render_rss_feed = RecordingUseCase("feed") + + use_cases = { + "get_runtime_status": runtime_status, + "bootstrap_runtime": bootstrap_runtime, + "ingest_feed": ingest_feed, + 
"process_source": process_source, + "render_rss_feed": render_rss_feed, + } + + return ( + AppRuntime( + config=Config(), + use_cases=AppUseCases(**use_cases), + close=lambda: None, + ), + use_cases, + ) + + +def test_routes_delegate_to_runtime_use_cases_and_keep_http_shapes() -> None: + runtime, use_cases = _build_runtime() + client = TestClient(create_app(runtime)) + + ingest = client.post( + "/ingest", + json={ + "feed_url": "https://example.com/feed.xml", + "process": True, + "max_items": 3, + }, + ) + process = client.post( + "/process", + json={ + "source_url": "https://example.com/watch?v=1", + "title": "Example Title", + }, + ) + rss = client.get("/rss?limit=5") + runtime_response = client.get("/runtime") + bootstrap = client.post("/setup/bootstrap") + + assert ingest.status_code == 200 + assert ingest.json() == { + "feed_title": "Example Feed", + "item_count": 2, + "processed_count": 1, + } + assert process.status_code == 200 + assert process.json() == { + "source_url": "https://example.com/watch?v=1", + "title": "Example Title", + "transcript_chars": 15, + "frame_count": 2, + "summary": { + "summary": "Compact summary", + "key_points": ["one", "two"], + "visual_highlights": ["frame"], + "model_used": "qwen", + "vram_mb": 512.0, + "error": "model degraded", + }, + } + assert rss.status_code == 200 + assert rss.text == "feed" + assert runtime_response.status_code == 200 + assert runtime_response.json() == { + "ollama_version": "0.6.0", + "local_models": {"qwen": {"size": 1}}, + "reachable": True, + "database_path": ".data/runtime.db", + "storage_dir": ".data", + "models": ["qwen", "qwen:min"], + } + assert bootstrap.status_code == 200 + assert bootstrap.json() == { + "models_prepared": ["qwen"], + "runtime": { + "ollama_version": "0.6.0", + "local_models": {"qwen": {"size": 1}}, + "reachable": True, + "database_path": ".data/runtime.db", + "storage_dir": ".data", + "models": ["qwen", "qwen:min"], + }, + } + + assert use_cases["ingest_feed"].calls == 
[("https://example.com/feed.xml", True, 3)] + assert use_cases["process_source"].calls == [ + ("https://example.com/watch?v=1", "Example Title") + ] + assert use_cases["render_rss_feed"].calls == [(5,)] + assert use_cases["get_runtime_status"].calls == [(), ()] + assert use_cases["bootstrap_runtime"].calls == [()] diff --git a/tests/adapters/test_cli_commands.py b/tests/adapters/test_cli_commands.py new file mode 100644 index 0000000..f32638f --- /dev/null +++ b/tests/adapters/test_cli_commands.py @@ -0,0 +1,227 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from click.testing import CliRunner + +import cli as cli_module +from core_config import Config +from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases +from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult +from video_rss_aggregator.domain.outcomes import PartialSuccess + + +@dataclass +class RecordingUseCase: + result: Any + calls: list[tuple[Any, ...]] = field(default_factory=list) + + async def execute(self, *args: Any) -> Any: + self.calls.append(args) + return self.result + + +def _build_runtime() -> tuple[AppRuntime, dict[str, RecordingUseCase]]: + runtime_status = RecordingUseCase( + { + "ollama_version": "0.6.0", + "local_models": {"qwen": {"size": 1}}, + "reachable": True, + "database_path": ".data/runtime.db", + "storage_dir": ".data", + "models": ["qwen", "qwen:min"], + } + ) + bootstrap_runtime = RecordingUseCase({"models": ["qwen"]}) + process_source = RecordingUseCase( + PartialSuccess( + media=PreparedMedia( + source_url="https://example.com/watch?v=1", + title="Example Title", + transcript="transcript text", + media_path="/tmp/video.mp4", + frame_paths=("/tmp/frame-1.jpg", "/tmp/frame-2.jpg"), + ), + reason="model degraded", + summary=SummaryResult( + summary="Compact summary", + key_points=("one", "two"), + visual_highlights=("frame",), + model_used="qwen", + 
vram_mb=512.0, + transcript_chars=15, + frame_count=2, + error="model degraded", + ), + ) + ) + passthrough = RecordingUseCase(None) + + use_cases = { + "get_runtime_status": runtime_status, + "bootstrap_runtime": bootstrap_runtime, + "ingest_feed": passthrough, + "process_source": process_source, + "render_rss_feed": passthrough, + } + + closed = {"value": False} + + async def _close() -> None: + closed["value"] = True + + return ( + AppRuntime( + config=Config(), + use_cases=AppUseCases(**use_cases), + close=_close, + ), + {**use_cases, "closed": closed}, + ) + + +def test_bootstrap_uses_runtime_use_cases_and_keeps_json_shape(monkeypatch) -> None: + runtime, use_cases = _build_runtime() + + async def fake_build_runtime(config: Config | None = None) -> AppRuntime: + return runtime + + class LegacySummarizationEngine: + def __init__(self, config: Config) -> None: + raise AssertionError("legacy summarizer wiring used") + + monkeypatch.setattr(cli_module, "build_runtime", fake_build_runtime, raising=False) + monkeypatch.setattr(cli_module, "SummarizationEngine", LegacySummarizationEngine) + + result = CliRunner().invoke(cli_module.cli, ["bootstrap"]) + + assert result.exit_code == 0, result.output + assert json.loads(result.output) == { + "models_prepared": ["qwen"], + "runtime": { + "ollama_version": "0.6.0", + "local_models": {"qwen": {"size": 1}}, + "reachable": True, + "database_path": ".data/runtime.db", + "storage_dir": ".data", + "models": ["qwen", "qwen:min"], + }, + } + assert use_cases["bootstrap_runtime"].calls == [()] + assert use_cases["get_runtime_status"].calls == [()] + assert use_cases["closed"]["value"] is True + + +def test_status_uses_runtime_use_case_and_keeps_json_shape(monkeypatch) -> None: + runtime, use_cases = _build_runtime() + + async def fake_build_runtime(config: Config | None = None) -> AppRuntime: + return runtime + + class LegacySummarizationEngine: + def __init__(self, config: Config) -> None: + raise AssertionError("legacy 
summarizer wiring used") + + monkeypatch.setattr(cli_module, "build_runtime", fake_build_runtime, raising=False) + monkeypatch.setattr(cli_module, "SummarizationEngine", LegacySummarizationEngine) + + result = CliRunner().invoke(cli_module.cli, ["status"]) + + assert result.exit_code == 0, result.output + assert json.loads(result.output) == { + "ollama_version": "0.6.0", + "local_models": {"qwen": {"size": 1}}, + "reachable": True, + "database_path": ".data/runtime.db", + "storage_dir": ".data", + "models": ["qwen", "qwen:min"], + } + assert use_cases["get_runtime_status"].calls == [()] + assert use_cases["closed"]["value"] is True + + +def test_verify_uses_runtime_process_source_and_keeps_metrics_shape( + monkeypatch, +) -> None: + runtime, use_cases = _build_runtime() + monotonic_values = iter([100.0, 100.25]) + + async def fake_build_runtime(config: Config | None = None) -> AppRuntime: + return runtime + + monkeypatch.setattr(cli_module, "build_runtime", fake_build_runtime, raising=False) + monkeypatch.setattr(cli_module, "monotonic", lambda: next(monotonic_values)) + + result = CliRunner().invoke( + cli_module.cli, + [ + "verify", + "--source", + "https://example.com/watch?v=1", + "--title", + "Example Title", + ], + ) + + assert result.exit_code == 0, result.output + assert json.loads(result.output) == { + "source_url": "https://example.com/watch?v=1", + "title": "Example Title", + "transcript_chars": 15, + "frame_count": 2, + "model_used": "qwen", + "vram_mb": 512.0, + "error": "model degraded", + "summary_chars": 15, + "key_points": 2, + "total_ms": 250, + } + assert use_cases["process_source"].calls == [ + ("https://example.com/watch?v=1", "Example Title") + ] + assert use_cases["closed"]["value"] is True + + +def test_serve_builds_runtime_then_passes_it_to_app_factory(monkeypatch) -> None: + calls: dict[str, Any] = {} + + def fake_create_app( + runtime: AppRuntime | None = None, + config: Config | None = None, + ) -> object: + calls["create_app_runtime"] = 
runtime + calls["create_app_config"] = config + return object() + + class FakeUvicornConfig: + def __init__(self, app: object, host: str, port: int, log_level: str) -> None: + calls["uvicorn_app"] = app + calls["uvicorn_host"] = host + calls["uvicorn_port"] = port + calls["uvicorn_log_level"] = log_level + + class FakeUvicornServer: + def __init__(self, config: FakeUvicornConfig) -> None: + calls["uvicorn_config"] = config + + async def serve(self) -> None: + calls["served"] = True + + monkeypatch.setattr("adapter_api.create_app", fake_create_app) + monkeypatch.setattr(cli_module.uvicorn, "Config", FakeUvicornConfig) + monkeypatch.setattr(cli_module.uvicorn, "Server", FakeUvicornServer) + + result = CliRunner().invoke(cli_module.cli, ["serve", "--bind", "127.0.0.1:8080"]) + + assert result.exit_code == 0, result.output + assert calls["create_app_runtime"] is None + assert calls["create_app_config"] == Config( + database_path=str(Path(".data") / "vra.db") + ) + assert calls["uvicorn_host"] == "127.0.0.1" + assert calls["uvicorn_port"] == 8080 + assert calls["uvicorn_log_level"] == "info" + assert calls["served"] is True diff --git a/tests/application/test_ingest_feed.py b/tests/application/test_ingest_feed.py new file mode 100644 index 0000000..75e9eec --- /dev/null +++ b/tests/application/test_ingest_feed.py @@ -0,0 +1,214 @@ +from datetime import datetime, timezone + +import pytest + +from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry +from video_rss_aggregator.application.use_cases.ingest_feed import IngestFeed +from video_rss_aggregator.domain.outcomes import Failure + + +class FakeFeedSource: + async def fetch(self, feed_url: str, max_items: int | None = None): + entries = ( + FetchedFeedEntry(source_url="https://example.com/1", title="One", guid="1"), + FetchedFeedEntry(source_url="https://example.com/2", title="Two", guid="2"), + ) + return FetchedFeed( + title="Example Feed", + site_url="https://example.com", + 
entries=entries[:max_items] if max_items is not None else entries, + ) + + +class FakeFeedRepository: + def __init__(self) -> None: + self.saved: list[tuple[str, FetchedFeed]] = [] + + async def save(self, feed_url: str, feed: FetchedFeed) -> None: + self.saved.append((feed_url, feed)) + + +class FakeVideoRepository: + def __init__(self) -> None: + self.saved: list[tuple[str, FetchedFeedEntry]] = [] + + async def save_feed_item(self, feed_url: str, entry: FetchedFeedEntry) -> None: + self.saved.append((feed_url, entry)) + + +class FakeProcessSource: + def __init__(self) -> None: + self.calls: list[tuple[str, str | None]] = [] + self.results: dict[str, object] = {} + + async def execute(self, source_url: str, title: str | None): + self.calls.append((source_url, title)) + return self.results.get(source_url) + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_ingest_feed_tracks_processed_items() -> None: + feeds = FakeFeedRepository() + videos = FakeVideoRepository() + process_source = FakeProcessSource() + use_case = IngestFeed( + feed_source=FakeFeedSource(), + feeds=feeds, + videos=videos, + process_source=process_source, + ) + + report = await use_case.execute( + "https://example.com/feed.xml", process=True, max_items=1 + ) + + assert report.item_count == 1 + assert report.processed_count == 1 + assert report.feed_title == "Example Feed" + assert feeds.saved == [ + ( + "https://example.com/feed.xml", + FetchedFeed( + title="Example Feed", + site_url="https://example.com", + entries=( + FetchedFeedEntry( + source_url="https://example.com/1", title="One", guid="1" + ), + ), + ), + ) + ] + assert videos.saved == [ + ( + "https://example.com/feed.xml", + FetchedFeedEntry(source_url="https://example.com/1", title="One", guid="1"), + ) + ] + assert process_source.calls == [("https://example.com/1", "One")] + + +class FakeFeedSourceWithInvalidEntries: + async def fetch(self, feed_url: str, max_items: int | None = 
None): + return FetchedFeed( + title=None, + site_url=None, + entries=( + FetchedFeedEntry(source_url="", title="Blank", guid="blank"), + FetchedFeedEntry( + source_url="https://example.com/valid", title=None, guid="ok" + ), + FetchedFeedEntry(source_url=" ", title="Whitespace", guid="space"), + ), + ) + + +@pytest.mark.anyio +async def test_ingest_feed_skips_entries_without_source_url() -> None: + feeds = FakeFeedRepository() + videos = FakeVideoRepository() + process_source = FakeProcessSource() + use_case = IngestFeed( + feed_source=FakeFeedSourceWithInvalidEntries(), + feeds=feeds, + videos=videos, + process_source=process_source, + ) + + report = await use_case.execute("https://example.com/feed.xml", process=True) + + assert report.item_count == 1 + assert report.processed_count == 1 + assert report.feed_title is None + assert feeds.saved == [ + ( + "https://example.com/feed.xml", + FetchedFeed( + title=None, + site_url=None, + entries=( + FetchedFeedEntry( + source_url="https://example.com/valid", title=None, guid="ok" + ), + ), + ), + ) + ] + assert videos.saved == [ + ( + "https://example.com/feed.xml", + FetchedFeedEntry( + source_url="https://example.com/valid", title=None, guid="ok" + ), + ) + ] + assert process_source.calls == [("https://example.com/valid", None)] + + +@pytest.mark.anyio +async def test_ingest_feed_counts_only_non_failure_results_as_processed() -> None: + feeds = FakeFeedRepository() + videos = FakeVideoRepository() + process_source = FakeProcessSource() + process_source.results = { + "https://example.com/2": Failure( + source_url="https://example.com/2", reason="download failed" + ) + } + use_case = IngestFeed( + feed_source=FakeFeedSource(), + feeds=feeds, + videos=videos, + process_source=process_source, + ) + + report = await use_case.execute( + "https://example.com/feed.xml", process=True, max_items=2 + ) + + assert report.item_count == 2 + assert report.processed_count == 1 + + +class FakeFeedSourceWithPublishedEntries: + async 
def fetch(self, feed_url: str, max_items: int | None = None): + return FetchedFeed( + title="Published Feed", + site_url="https://example.com", + entries=( + FetchedFeedEntry( + source_url="https://example.com/published", + title="Published item", + guid="published-guid", + published_at=datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc), + ), + ), + ) + + +@pytest.mark.anyio +async def test_ingest_feed_preserves_publication_timestamps() -> None: + feeds = FakeFeedRepository() + videos = FakeVideoRepository() + process_source = FakeProcessSource() + use_case = IngestFeed( + feed_source=FakeFeedSourceWithPublishedEntries(), + feeds=feeds, + videos=videos, + process_source=process_source, + ) + + await use_case.execute("https://example.com/feed.xml", process=False) + + saved_feed = feeds.saved[0][1] + saved_entry = videos.saved[0][1] + + assert saved_feed.entries[0].published_at == datetime( + 2024, 1, 2, 3, 4, tzinfo=timezone.utc + ) + assert saved_entry.published_at == datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc) diff --git a/tests/application/test_process_source.py b/tests/application/test_process_source.py new file mode 100644 index 0000000..a42e05c --- /dev/null +++ b/tests/application/test_process_source.py @@ -0,0 +1,139 @@ +import pytest + +from video_rss_aggregator.application.use_cases.process_source import ProcessSource +from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult +from video_rss_aggregator.domain.outcomes import Failure, PartialSuccess, Success + + +class FakeMediaPreparationService: + async def prepare(self, source_url: str, title: str | None) -> PreparedMedia: + return PreparedMedia( + source_url=source_url, + title=title or "Prepared title", + transcript="transcript", + media_path="/tmp/video.mp4", + frame_paths=("/tmp/frame-1.jpg",), + ) + + +class FakeSummarizer: + def __init__(self, result: SummaryResult) -> None: + self.result = result + + async def summarize(self, prepared_media: PreparedMedia) -> SummaryResult: + 
return self.result + + +class RaisingMediaPreparationService: + async def prepare(self, source_url: str, title: str | None) -> PreparedMedia: + raise RuntimeError("download failed") + + +class FakeVideoRepository: + def __init__(self) -> None: + self.saved: list[PreparedMedia] = [] + + async def save(self, media: PreparedMedia) -> str: + self.saved.append(media) + return "video-123" + + +class FakeSummaryRepository: + def __init__(self) -> None: + self.saved: list[tuple[str, SummaryResult]] = [] + + async def save(self, video_id: str, summary: SummaryResult) -> None: + self.saved.append((video_id, summary)) + + +class RaisingVideoRepository: + async def save(self, media: PreparedMedia) -> str: + raise RuntimeError("database write failed") + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +def build_summary(*, error: str | None = None) -> SummaryResult: + return SummaryResult( + summary="Summarized output", + key_points=("point",), + visual_highlights=("highlight",), + model_used="qwen", + vram_mb=1024.0, + transcript_chars=10, + frame_count=1, + error=error, + ) + + +@pytest.mark.anyio +async def test_process_source_returns_success_for_valid_media() -> None: + videos = FakeVideoRepository() + summaries = FakeSummaryRepository() + use_case = ProcessSource( + media_service=FakeMediaPreparationService(), + summarizer=FakeSummarizer(build_summary()), + videos=videos, + summaries=summaries, + ) + + outcome = await use_case.execute("https://example.com/video", None) + + assert isinstance(outcome, Success) + assert outcome.summary.summary == "Summarized output" + assert len(videos.saved) == 1 + assert len(summaries.saved) == 1 + assert summaries.saved[0][0] == "video-123" + + +@pytest.mark.anyio +async def test_process_source_returns_partial_success_when_summary_has_error() -> None: + use_case = ProcessSource( + media_service=FakeMediaPreparationService(), + summarizer=FakeSummarizer(build_summary(error="model degraded")), + 
videos=FakeVideoRepository(), + summaries=FakeSummaryRepository(), + ) + + outcome = await use_case.execute("https://example.com/video", "Example") + + assert isinstance(outcome, PartialSuccess) + assert outcome.reason == "model degraded" + assert outcome.summary.error == "model degraded" + + +@pytest.mark.anyio +async def test_process_source_returns_failure_when_media_preparation_raises() -> None: + use_case = ProcessSource( + media_service=RaisingMediaPreparationService(), + summarizer=FakeSummarizer(build_summary()), + videos=FakeVideoRepository(), + summaries=FakeSummaryRepository(), + ) + + outcome = await use_case.execute("https://example.com/video", None) + + assert outcome == Failure( + source_url="https://example.com/video", + reason="download failed", + ) + + +@pytest.mark.anyio +async def test_process_source_returns_failure_when_video_persistence_raises() -> None: + use_case = ProcessSource( + media_service=FakeMediaPreparationService(), + summarizer=FakeSummarizer(build_summary()), + videos=RaisingVideoRepository(), + summaries=FakeSummaryRepository(), + ) + + outcome = await use_case.execute("https://example.com/video", "Example") + + assert outcome == Failure( + source_url="https://example.com/video", + reason="database write failed", + ) diff --git a/tests/application/test_render_rss_feed.py b/tests/application/test_render_rss_feed.py new file mode 100644 index 0000000..c329d6d --- /dev/null +++ b/tests/application/test_render_rss_feed.py @@ -0,0 +1,64 @@ +from datetime import datetime, timezone +from typing import get_type_hints + +import pytest + +from video_rss_aggregator.application.ports import PublicationRepository +from video_rss_aggregator.application.use_cases.render_rss_feed import RenderRssFeed +from video_rss_aggregator.domain.publication import PublicationRecord + + +class FakePublicationRepository: + async def latest_publications(self, limit: int) -> list[PublicationRecord]: + assert limit == 20 + return [ + PublicationRecord( + 
title="Video", + source_url="https://example.com/watch?v=1", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + summary="A compact summary", + key_points=("Point one", "Point two"), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=1234.5, + ) + ] + + +class FakePublicationRenderer: + def __init__(self) -> None: + self.received: tuple[PublicationRecord, ...] = () + + async def render(self, publications: tuple[PublicationRecord, ...]) -> str: + self.received = publications + return f"{publications[0].summary}" + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_render_rss_feed_uses_publication_records() -> None: + renderer = FakePublicationRenderer() + + xml = await RenderRssFeed( + publications=FakePublicationRepository(), + renderer=renderer, + ).execute(limit=20) + + assert "A compact summary" in xml + assert renderer.received[0].title == "Video" + + +def test_publication_record_allows_nullable_source_fields_in_annotations() -> None: + hints = get_type_hints(PublicationRecord) + + assert hints["title"] == str | None + assert hints["published_at"] == datetime | None + assert hints["model_used"] == str | None + + +def test_publication_repository_exposes_latest_publications_read_model() -> None: + assert hasattr(PublicationRepository, "latest_publications") diff --git a/tests/application/test_runtime_use_cases.py b/tests/application/test_runtime_use_cases.py new file mode 100644 index 0000000..2fc68ba --- /dev/null +++ b/tests/application/test_runtime_use_cases.py @@ -0,0 +1,46 @@ +import pytest + +from video_rss_aggregator.application.use_cases.runtime import ( + BootstrapRuntime, + GetRuntimeStatus, +) + + +class FakeRuntimeInspector: + async def status(self) -> dict[str, object]: + return {"reachable": True, "local_models": {"qwen": {}}, "models": ["qwen"]} + + async def bootstrap(self) -> list[str]: + return ["qwen"] + + +@pytest.fixture +def anyio_backend() -> str: + return 
"asyncio" + + +@pytest.mark.anyio +async def test_get_runtime_status_returns_app_view_model() -> None: + use_case = GetRuntimeStatus( + runtime=FakeRuntimeInspector(), + storage_path=".data/vra.db", + storage_dir=".data", + models=("qwen", "qwen:min"), + ) + + payload = await use_case.execute() + + assert payload["reachable"] is True + assert payload["local_models"] == {"qwen": {}} + assert payload["database_path"] == ".data/vra.db" + assert payload["storage_dir"] == ".data" + assert payload["models"] == ["qwen", "qwen:min"] + + +@pytest.mark.anyio +async def test_bootstrap_runtime_returns_loaded_models() -> None: + use_case = BootstrapRuntime(runtime=FakeRuntimeInspector()) + + payload = await use_case.execute() + + assert payload == {"models": ["qwen"]} diff --git a/tests/domain/test_outcomes.py b/tests/domain/test_outcomes.py new file mode 100644 index 0000000..168ed40 --- /dev/null +++ b/tests/domain/test_outcomes.py @@ -0,0 +1,131 @@ +from dataclasses import FrozenInstanceError +from typing import Any, get_args + +import pytest + +from video_rss_aggregator.domain import ProcessOutcome +from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult +from video_rss_aggregator.domain.outcomes import Failure, PartialSuccess, Success + + +def test_success_outcome_exposes_status_and_payload() -> None: + summary = SummaryResult( + summary="ok", + key_points=cast_list(["a", "b", "c", "d"]), + visual_highlights=cast_list(["frame 1"]), + model_used="fake-model", + vram_mb=512.0, + transcript_chars=12, + frame_count=1, + error=None, + ) + media = PreparedMedia( + source_url="https://example.com/video", + title="Example", + transcript="hello", + media_path="/tmp/video.mp4", + frame_paths=cast_list(["/tmp/frame-1.jpg"]), + ) + + outcome = Success(media=media, summary=summary) + + assert outcome.status == "success" + assert outcome.summary.model_used == "fake-model" + + +def test_failure_outcome_preserves_reason() -> None: + outcome = 
Failure(source_url="https://example.com/video", reason="ollama offline") + + assert outcome.status == "failure" + assert outcome.reason == "ollama offline" + + +def test_domain_models_normalize_collections_to_tuples() -> None: + summary = SummaryResult( + summary="ok", + key_points=cast_list(["a", "b"]), + visual_highlights=cast_list(["frame 1"]), + model_used="fake-model", + vram_mb=512.0, + transcript_chars=12, + frame_count=1, + error=None, + ) + media = PreparedMedia( + source_url="https://example.com/video", + title="Example", + transcript="hello", + media_path="/tmp/video.mp4", + frame_paths=cast_list(["/tmp/frame-1.jpg", "/tmp/frame-2.jpg"]), + ) + + assert summary.key_points == ("a", "b") + assert summary.visual_highlights == ("frame 1",) + assert media.frame_paths == ("/tmp/frame-1.jpg", "/tmp/frame-2.jpg") + + +def test_domain_models_are_frozen() -> None: + summary = SummaryResult( + summary="ok", + key_points=cast_list(["a"]), + visual_highlights=cast_list([]), + model_used="fake-model", + vram_mb=512.0, + transcript_chars=12, + frame_count=1, + error=None, + ) + + with pytest.raises(FrozenInstanceError): + cast_any(summary).summary = "changed" + + +def test_partial_success_requires_summary_and_preserves_reason() -> None: + media = PreparedMedia( + source_url="https://example.com/video", + title="Example", + transcript="hello", + media_path="/tmp/video.mp4", + frame_paths=cast_list([]), + ) + summary = SummaryResult( + summary="degraded", + key_points=cast_list(["a"]), + visual_highlights=cast_list([]), + model_used="fake-model", + vram_mb=512.0, + transcript_chars=12, + frame_count=0, + error="missing frames", + ) + + outcome = PartialSuccess(media=media, summary=summary, reason="missing frames") + + assert outcome.status == "partial_success" + assert outcome.summary is summary + assert outcome.reason == "missing frames" + + +def test_partial_success_rejects_missing_summary() -> None: + media = PreparedMedia( + source_url="https://example.com/video", + 
title="Example", + transcript="hello", + media_path="/tmp/video.mp4", + frame_paths=cast_list([]), + ) + + with pytest.raises(ValueError, match="summary is required"): + PartialSuccess(media=media, summary=cast_any(None), reason="missing frames") + + +def test_process_outcome_alias_is_exported_for_all_outcome_types() -> None: + assert get_args(ProcessOutcome) == (Success, PartialSuccess, Failure) + + +def cast_any(value: object) -> Any: + return value + + +def cast_list(values: list[str]) -> Any: + return values diff --git a/tests/infrastructure/test_feed_source.py b/tests/infrastructure/test_feed_source.py new file mode 100644 index 0000000..f890b5a --- /dev/null +++ b/tests/infrastructure/test_feed_source.py @@ -0,0 +1,147 @@ +from datetime import datetime, timezone +from types import SimpleNamespace + +import pytest + +from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry +from video_rss_aggregator.infrastructure import feed_source as feed_source_module +from video_rss_aggregator.infrastructure.feed_source import HttpFeedSource + + +class FakeResponse: + def __init__(self, text: str) -> None: + self.text = text + + def raise_for_status(self) -> None: + return None + + +class FakeAsyncClient: + def __init__(self, response: FakeResponse) -> None: + self.response = response + self.calls: list[str] = [] + + async def get(self, url: str) -> FakeResponse: + self.calls.append(url) + return self.response + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_http_feed_source_fetches_and_maps_entries() -> None: + client = FakeAsyncClient( + FakeResponse( + """ + + + Example feed + https://example.com + + First + first-guid + Tue, 02 Jan 2024 03:04:05 GMT + + + + Second + second-guid + https://example.com/watch?v=2 + + + + """ + ) + ) + adapter = HttpFeedSource(client) + + feed = await adapter.fetch("https://example.com/feed.xml", max_items=1) + + assert client.calls == 
["https://example.com/feed.xml"] + assert feed == FetchedFeed( + title="Example feed", + site_url="https://example.com", + entries=( + FetchedFeedEntry( + source_url="https://cdn.example.com/video.mp4", + title="First", + guid="first-guid", + published_at=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc), + ), + ), + ) + + +@pytest.mark.anyio +async def test_http_feed_source_falls_back_to_updated_timestamp() -> None: + client = FakeAsyncClient( + FakeResponse( + """ + + Example atom feed + + + First + first-guid + 2024-01-02T03:04:05Z + + + + """ + ) + ) + adapter = HttpFeedSource(client) + + feed = await adapter.fetch("https://example.com/feed.xml") + + assert feed == FetchedFeed( + title="Example atom feed", + site_url="https://example.com", + entries=( + FetchedFeedEntry( + source_url="https://example.com/watch?v=1", + title="First", + guid="first-guid", + published_at=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc), + ), + ), + ) + + +@pytest.mark.anyio +async def test_http_feed_source_ignores_malformed_parsed_dates(monkeypatch) -> None: + client = FakeAsyncClient(FakeResponse("ignored")) + adapter = HttpFeedSource(client) + + def fake_parse(_text: str) -> SimpleNamespace: + return SimpleNamespace( + feed={"title": "Example feed", "link": "https://example.com"}, + entries=[ + { + "title": "First", + "id": "first-guid", + "link": "https://example.com/watch?v=1", + "published_parsed": (2024, 1), + } + ], + ) + + monkeypatch.setattr(feed_source_module.feedparser, "parse", fake_parse) + + feed = await adapter.fetch("https://example.com/feed.xml") + + assert feed == FetchedFeed( + title="Example feed", + site_url="https://example.com", + entries=( + FetchedFeedEntry( + source_url="https://example.com/watch?v=1", + title="First", + guid="first-guid", + published_at=None, + ), + ), + ) diff --git a/tests/infrastructure/test_legacy_adapter_shims.py b/tests/infrastructure/test_legacy_adapter_shims.py new file mode 100644 index 0000000..5bf3107 --- /dev/null +++ 
b/tests/infrastructure/test_legacy_adapter_shims.py @@ -0,0 +1,115 @@ +from datetime import datetime, timezone + +import pytest + +from adapter_rss import render_feed +from adapter_storage import Database, SummaryRecord +from service_summarize import SummaryResult as LegacySummaryResult +from video_rss_aggregator.domain.models import SummaryResult as DomainSummaryResult +from video_rss_aggregator.storage import Database as PackageDatabase + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +def build_legacy_summary(*, error: str | None = None) -> LegacySummaryResult: + return LegacySummaryResult( + summary="Legacy summary", + key_points=["Point one", "Point two"], + visual_highlights=["Blue slide"], + model_used="qwen", + vram_mb=256.0, + transcript_chars=42, + frame_count=2, + error=error, + ) + + +@pytest.mark.anyio +async def test_legacy_database_insert_summary_adapts_legacy_summary_result( + monkeypatch, +) -> None: + captured: dict[str, object] = {} + + async def fake_insert_summary( + self, video_id: str, result: DomainSummaryResult + ) -> str: + captured["video_id"] = video_id + captured["result"] = result + return "summary-id" + + monkeypatch.setattr(PackageDatabase, "insert_summary", fake_insert_summary) + db = await Database.connect(":memory:") + + summary_id = await db.insert_summary("video-123", build_legacy_summary()) + + await db.close() + + assert summary_id == "summary-id" + assert captured["video_id"] == "video-123" + assert captured["result"] == DomainSummaryResult( + summary="Legacy summary", + key_points=("Point one", "Point two"), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=256.0, + transcript_chars=42, + frame_count=2, + error=None, + ) + + +@pytest.mark.anyio +async def test_legacy_database_persists_legacy_summary_through_shim(tmp_path) -> None: + db = await Database.connect(str(tmp_path / "rss.db")) + await db.migrate() + video_id = await db.upsert_video( + feed_id=None, + guid=None, + 
title="Legacy title", + source_url="https://example.com/watch?v=legacy", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + ) + + await db.insert_summary(video_id, build_legacy_summary()) + publications = await db.latest_summaries(limit=1) + + await db.close() + + assert publications == [ + SummaryRecord( + title="Legacy title", + source_url="https://example.com/watch?v=legacy", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + summary="Legacy summary", + key_points=("Point one", "Point two"), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=256.0, + ) + ] + + +def test_legacy_rss_render_feed_accepts_summary_record_shim() -> None: + xml = render_feed( + title="Feed title", + link="https://example.com", + description="Feed description", + publications=[ + SummaryRecord( + title="Legacy title", + source_url="https://example.com/watch?v=legacy", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + summary="Legacy summary", + key_points=("Point one",), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=256.0, + ) + ], + ) + + assert "Legacy title" in xml + assert "Legacy summary" in xml diff --git a/tests/infrastructure/test_media_service.py b/tests/infrastructure/test_media_service.py new file mode 100644 index 0000000..b3078eb --- /dev/null +++ b/tests/infrastructure/test_media_service.py @@ -0,0 +1,92 @@ +from pathlib import Path + +import pytest + +import service_media +from service_media import PreparedMedia as LegacyPreparedMedia +from video_rss_aggregator.infrastructure.media_service import ( + LegacyMediaPreparationService, +) + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_media_preparation_service_maps_legacy_prepared_media( + monkeypatch, +) -> None: + captured: dict[str, object] = {} + + async def fake_prepare_media(**kwargs) -> LegacyPreparedMedia: + captured.update(kwargs) + return LegacyPreparedMedia( + 
media_path=Path("/tmp/downloaded.mp4"), + title="Legacy title", + transcript="captured transcript", + frame_paths=[Path("/tmp/frame-1.jpg"), Path("/tmp/frame-2.jpg")], + ) + + monkeypatch.setattr(service_media, "prepare_media", fake_prepare_media) + + adapter = LegacyMediaPreparationService( + client=object(), + storage_dir=".data", + max_frames=4, + scene_detection=True, + scene_threshold=0.42, + scene_min_frames=3, + max_transcript_chars=5000, + ) + + prepared = await adapter.prepare("https://example.com/watch?v=1", "Feed title") + + assert captured == { + "client": adapter.client, + "source": "https://example.com/watch?v=1", + "storage_dir": ".data", + "max_frames": 4, + "scene_detection": True, + "scene_threshold": 0.42, + "scene_min_frames": 3, + "max_transcript_chars": 5000, + } + assert prepared.source_url == "https://example.com/watch?v=1" + assert prepared.title == "Feed title" + assert prepared.transcript == "captured transcript" + assert Path(prepared.media_path) == Path("/tmp/downloaded.mp4") + assert tuple(Path(path) for path in prepared.frame_paths) == ( + Path("/tmp/frame-1.jpg"), + Path("/tmp/frame-2.jpg"), + ) + + +@pytest.mark.anyio +async def test_media_preparation_service_falls_back_to_legacy_title( + monkeypatch, +) -> None: + async def fake_prepare_media(**kwargs) -> LegacyPreparedMedia: + return LegacyPreparedMedia( + media_path=Path("/tmp/downloaded.mp4"), + title="Legacy title", + transcript="", + frame_paths=[], + ) + + monkeypatch.setattr(service_media, "prepare_media", fake_prepare_media) + + adapter = LegacyMediaPreparationService( + client=object(), + storage_dir=".data", + max_frames=1, + scene_detection=False, + scene_threshold=0.28, + scene_min_frames=1, + max_transcript_chars=100, + ) + + prepared = await adapter.prepare("https://example.com/watch?v=2", None) + + assert prepared.title == "Legacy title" diff --git a/tests/infrastructure/test_publication_renderer.py b/tests/infrastructure/test_publication_renderer.py new file mode 
100644 index 0000000..e02dfe2 --- /dev/null +++ b/tests/infrastructure/test_publication_renderer.py @@ -0,0 +1,64 @@ +from datetime import datetime, timezone + +import pytest + +from video_rss_aggregator.domain.publication import PublicationRecord +from video_rss_aggregator.infrastructure.publication_renderer import ( + RssPublicationRenderer, +) +from video_rss_aggregator.rss import render_feed + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +def build_publication() -> PublicationRecord: + return PublicationRecord( + title="Video title", + source_url="https://example.com/watch?v=1", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + summary="A compact summary", + key_points=("Point one", "Point two"), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=512.0, + ) + + +@pytest.mark.anyio +async def test_rss_publication_renderer_delegates_to_package_render_feed() -> None: + publications = (build_publication(),) + renderer = RssPublicationRenderer( + title="Feed title", + link="https://example.com", + description="Feed description", + ) + + xml = await renderer.render(publications) + + assert xml == render_feed( + title="Feed title", + link="https://example.com", + description="Feed description", + publications=publications, + ) + + +@pytest.mark.anyio +async def test_rss_publication_renderer_renders_publication_record_fields() -> None: + renderer = RssPublicationRenderer( + title="Feed title", + link="https://example.com", + description="Feed description", + ) + + xml = await renderer.render((build_publication(),)) + + assert "Video title" in xml + assert "https://example.com/watch?v=1" in xml + assert "A compact summary" in xml + assert "Point one" in xml + assert "Blue slide" in xml + assert "Model: qwen (VRAM 512.00 MB)" in xml diff --git a/tests/infrastructure/test_runtime_adapters.py b/tests/infrastructure/test_runtime_adapters.py new file mode 100644 index 0000000..8d062d6 --- /dev/null +++ 
b/tests/infrastructure/test_runtime_adapters.py @@ -0,0 +1,34 @@ +import pytest + +from video_rss_aggregator.infrastructure.runtime_adapters import LegacyRuntimeInspector + + +class FakeSummarizationEngine: + def __init__(self) -> None: + self.calls: list[str] = [] + + async def runtime_status(self) -> dict[str, object]: + self.calls.append("runtime_status") + return {"reachable": True, "local_models": ["qwen"]} + + async def prepare_models(self) -> list[str]: + self.calls.append("prepare_models") + return ["qwen", "min"] + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_runtime_inspector_delegates_to_legacy_engine() -> None: + engine = FakeSummarizationEngine() + adapter = LegacyRuntimeInspector(engine) + + status = await adapter.status() + prepared = await adapter.bootstrap() + + assert status == {"reachable": True, "local_models": ["qwen"]} + assert prepared == ["qwen", "min"] + assert engine.calls == ["runtime_status", "prepare_models"] diff --git a/tests/infrastructure/test_sqlite_repositories.py b/tests/infrastructure/test_sqlite_repositories.py new file mode 100644 index 0000000..1b09275 --- /dev/null +++ b/tests/infrastructure/test_sqlite_repositories.py @@ -0,0 +1,167 @@ +from datetime import datetime, timezone + +import pytest + +from video_rss_aggregator.domain.models import SummaryResult +from video_rss_aggregator.domain.models import PreparedMedia +from video_rss_aggregator.domain.publication import PublicationRecord +from video_rss_aggregator.infrastructure.sqlite_repositories import ( + SQLiteFeedRepository, + SQLiteFeedVideoRepository, + SQLitePublicationRepository, + SQLiteSummaryRepository, + SQLiteVideoRepository, +) +from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry +from video_rss_aggregator.storage import Database + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +def build_summary(*, error: str | None = None) -> SummaryResult: + 
return SummaryResult( + summary="A compact summary", + key_points=("Point one", "Point two"), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=512.0, + transcript_chars=120, + frame_count=3, + error=error, + ) + + +@pytest.mark.anyio +async def test_sqlite_summary_repository_saves_domain_summary_result(tmp_path) -> None: + db = await Database.connect(str(tmp_path / "rss.db")) + await db.migrate() + video_id = await db.upsert_video( + feed_id=None, + guid=None, + title="Video title", + source_url="https://example.com/watch?v=1", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + ) + + repository = SQLiteSummaryRepository(db) + + await repository.save(video_id, build_summary()) + + publications = await SQLitePublicationRepository(db).latest_publications(limit=5) + + await db.close() + + assert publications == [ + PublicationRecord( + title="Video title", + source_url="https://example.com/watch?v=1", + published_at=datetime(2024, 1, 1, tzinfo=timezone.utc), + summary="A compact summary", + key_points=("Point one", "Point two"), + visual_highlights=("Blue slide",), + model_used="qwen", + vram_mb=512.0, + ) + ] + + +@pytest.mark.anyio +async def test_sqlite_publication_repository_returns_newest_publications_first( + tmp_path, +) -> None: + db = await Database.connect(str(tmp_path / "rss.db")) + await db.migrate() + + older_video_id = await db.upsert_video( + feed_id=None, + guid=None, + title="Older video", + source_url="https://example.com/watch?v=older", + published_at=None, + ) + await SQLiteSummaryRepository(db).save( + older_video_id, build_summary(error="degraded") + ) + + newer_video_id = await db.upsert_video( + feed_id=None, + guid=None, + title="Newer video", + source_url="https://example.com/watch?v=newer", + published_at=None, + ) + await SQLiteSummaryRepository(db).save(newer_video_id, build_summary()) + + publications = await SQLitePublicationRepository(db).latest_publications(limit=1) + + await db.close() + + assert 
[publication.title for publication in publications] == ["Newer video"] + assert publications[0].summary == "A compact summary" + + +@pytest.mark.anyio +async def test_sqlite_video_repository_saves_media_and_transcript(tmp_path) -> None: + db = await Database.connect(str(tmp_path / "rss.db")) + await db.migrate() + + repository = SQLiteVideoRepository(db) + + video_id = await repository.save( + PreparedMedia( + source_url="https://example.com/watch?v=prepared", + title="Prepared title", + transcript="transcript text", + media_path="/tmp/video.mp4", + frame_paths=("/tmp/frame-1.jpg",), + ) + ) + + publications = await db.latest_publications(limit=5) + + await db.close() + + assert isinstance(video_id, str) + assert publications == [] + + +@pytest.mark.anyio +async def test_sqlite_feed_adapters_persist_feed_and_video_metadata(tmp_path) -> None: + db = await Database.connect(str(tmp_path / "rss.db")) + await db.migrate() + + await SQLiteFeedRepository(db).save( + "https://example.com/feed.xml", + FetchedFeed(title="Feed title", site_url=None, entries=()), + ) + await SQLiteFeedVideoRepository(db).save_feed_item( + "https://example.com/feed.xml", + FetchedFeedEntry( + source_url="https://example.com/watch?v=from-feed", + title="Feed item", + guid="guid-1", + published_at=datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc), + ), + ) + + async with db._conn.execute( + "SELECT title FROM feeds WHERE url = ?", ("https://example.com/feed.xml",) + ) as cur: + feed_row = await cur.fetchone() + async with db._conn.execute( + "SELECT title, guid, published_at FROM videos WHERE source_url = ?", + ("https://example.com/watch?v=from-feed",), + ) as cur: + video_row = await cur.fetchone() + + await db.close() + + assert dict(feed_row) == {"title": "Feed title"} + assert dict(video_row) == { + "title": "Feed item", + "guid": "guid-1", + "published_at": "2024-01-02T03:04:00+00:00", + } diff --git a/tests/infrastructure/test_summarizer.py b/tests/infrastructure/test_summarizer.py new file 
mode 100644 index 0000000..e2946ea --- /dev/null +++ b/tests/infrastructure/test_summarizer.py @@ -0,0 +1,65 @@ +from pathlib import Path + +import pytest + +from service_summarize import SummaryResult as LegacySummaryResult +from video_rss_aggregator.domain.models import PreparedMedia +from video_rss_aggregator.infrastructure.summarizer import LegacySummarizer + + +class FakeSummarizationEngine: + def __init__(self, result: LegacySummaryResult) -> None: + self.result = result + self.calls: list[dict[str, object]] = [] + + async def summarize(self, **kwargs) -> LegacySummaryResult: + self.calls.append(kwargs) + return self.result + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_legacy_summarizer_maps_domain_media_to_legacy_engine() -> None: + engine = FakeSummarizationEngine( + LegacySummaryResult( + summary="Concise summary", + key_points=["One", "Two"], + visual_highlights=["Frame one"], + model_used="qwen", + vram_mb=512.0, + transcript_chars=123, + frame_count=2, + error=None, + ) + ) + adapter = LegacySummarizer(engine) + prepared = PreparedMedia( + source_url="https://example.com/watch?v=1", + title="Example title", + transcript="Transcript body", + media_path="/tmp/video.mp4", + frame_paths=("/tmp/frame-1.jpg", "/tmp/frame-2.jpg"), + ) + + result = await adapter.summarize(prepared) + + assert engine.calls == [ + { + "source_url": "https://example.com/watch?v=1", + "title": "Example title", + "transcript": "Transcript body", + "frame_paths": [Path("/tmp/frame-1.jpg"), Path("/tmp/frame-2.jpg")], + } + ] + assert result.summary == "Concise summary" + assert result.key_points == ("One", "Two") + assert result.visual_highlights == ("Frame one",) + assert result.model_used == "qwen" + assert result.vram_mb == 512.0 + assert result.transcript_chars == 123 + assert result.frame_count == 2 + assert result.error is None diff --git a/tests/test_api_setup.py b/tests/test_api_setup.py index cc1d725..c0c309a 
100644 --- a/tests/test_api_setup.py +++ b/tests/test_api_setup.py @@ -3,60 +3,47 @@ from dataclasses import dataclass from typing import Any, cast +import pytest from fastapi.testclient import TestClient from adapter_api import create_app from core_config import Config -from service_summarize import SummaryResult +from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases @dataclass -class _ProcessReport: - source_url: str - title: str | None - transcript_chars: int - frame_count: int - summary: SummaryResult - - -class _DummyPipeline: - async def ingest_feed(self, *_args, **_kwargs): - class _Report: - feed_title = "dummy" - item_count = 1 - processed_count = 0 - - return _Report() - - async def process_source(self, source_url: str, title: str | None = None): - return _ProcessReport( - source_url=source_url, - title=title, - transcript_chars=42, - frame_count=3, - summary=SummaryResult( - summary="ok", - key_points=["a", "b", "c", "d"], - visual_highlights=[], - model_used="qwen3.5:2b-q4_K_M", - vram_mb=2048.0, - transcript_chars=42, - frame_count=3, - ), - ) - - async def rss_feed(self, _limit: int = 20) -> str: - return "" - - async def runtime_status(self): - return {"ok": True} - - async def bootstrap_models(self): - return {"models_prepared": ["qwen3.5:2b-q4_K_M"]} +class _AsyncValue: + value: Any + + async def execute(self, *_args, **_kwargs) -> Any: + return self.value + + +def _build_runtime(config: Config) -> AppRuntime: + runtime_status = { + "ollama_version": "0.6.0", + "local_models": {"qwen3.5:2b-q4_K_M": {}}, + "reachable": True, + "database_path": config.database_path, + "storage_dir": config.storage_dir, + "models": list(config.model_priority), + } + return AppRuntime( + config=config, + use_cases=AppUseCases( + get_runtime_status=_AsyncValue(runtime_status), + bootstrap_runtime=_AsyncValue({"models": ["qwen3.5:2b-q4_K_M"]}), + ingest_feed=cast(Any, _AsyncValue(None)), + process_source=cast(Any, _AsyncValue(None)), + 
render_rss_feed=_AsyncValue(""), + ), + close=lambda: None, + ) def test_gui_and_setup_routes() -> None: - app = create_app(cast(Any, _DummyPipeline()), Config()) + config = Config() + app = create_app(_build_runtime(config)) client = TestClient(app) home = client.get("/") @@ -83,7 +70,7 @@ def test_gui_and_setup_routes() -> None: def test_runtime_requires_api_key_when_enabled() -> None: - app = create_app(cast(Any, _DummyPipeline()), Config(api_key="secret")) + app = create_app(_build_runtime(Config(api_key="secret"))) client = TestClient(app) unauthorized = client.get("/runtime") @@ -91,4 +78,20 @@ def test_runtime_requires_api_key_when_enabled() -> None: authorized = client.get("/runtime", headers={"X-API-Key": "secret"}) assert authorized.status_code == 200 - assert authorized.json() == {"ok": True} + assert authorized.json() == { + "ollama_version": "0.6.0", + "local_models": {"qwen3.5:2b-q4_K_M": {}}, + "reachable": True, + "database_path": ".data/vra.db", + "storage_dir": ".data", + "models": [ + "qwen3.5:4b-q4_K_M", + "qwen3.5:2b-q4_K_M", + "qwen3.5:0.8b-q8_0", + ], + } + + +def test_create_app_rejects_non_runtime_objects() -> None: + with pytest.raises(TypeError, match="AppRuntime"): + create_app(cast(Any, object()), Config()) diff --git a/tests/test_project_layout.py b/tests/test_project_layout.py new file mode 100644 index 0000000..6e100c1 --- /dev/null +++ b/tests/test_project_layout.py @@ -0,0 +1,7 @@ +from pathlib import Path + + +def test_legacy_service_pipeline_module_has_been_removed() -> None: + repo_root = Path(__file__).resolve().parents[1] + + assert not (repo_root / "service_pipeline.py").exists() diff --git a/video_rss_aggregator/__init__.py b/video_rss_aggregator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/video_rss_aggregator/api.py b/video_rss_aggregator/api.py new file mode 100644 index 0000000..debf8d6 --- /dev/null +++ b/video_rss_aggregator/api.py @@ -0,0 +1,233 @@ +from __future__ import annotations + +import 
platform +import shutil +import sys +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from importlib.util import find_spec +from typing import AsyncIterator + +from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request +from fastapi.responses import HTMLResponse, Response +from pydantic import BaseModel + +from adapter_gui import render_setup_page +from core_config import Config +from service_media import runtime_dependency_report +from video_rss_aggregator.bootstrap import AppRuntime, build_runtime +from video_rss_aggregator.domain.outcomes import Failure + + +class IngestRequest(BaseModel): + feed_url: str + process: bool = False + max_items: int | None = None + + +class ProcessRequest(BaseModel): + source_url: str + title: str | None = None + + +def create_app( + runtime: AppRuntime | None = None, + config: Config | None = None, +) -> FastAPI: + resolved_config = ( + runtime.config if runtime is not None else config or Config.from_env() + ) + + @asynccontextmanager + async def lifespan(app: FastAPI) -> AsyncIterator[None]: + app.state.runtime = ( + runtime if runtime is not None else await build_runtime(resolved_config) + ) + try: + yield + finally: + close_result = app.state.runtime.close() + if close_result is not None: + await close_result + + app = FastAPI(title="Video RSS Aggregator", version="0.1.0", lifespan=lifespan) + if runtime is not None: + app.state.runtime = runtime + + def _runtime(request: Request) -> AppRuntime: + return request.app.state.runtime + + def _check_auth( + authorization: str | None = Header(None), x_api_key: str | None = Header(None) + ) -> None: + if resolved_config.api_key is None: + return + token = None + if authorization: + parts = authorization.split() + if len(parts) == 2 and parts[0].lower() == "bearer": + token = parts[1] + if token is None: + token = x_api_key + if token != resolved_config.api_key: + raise HTTPException(status_code=401, detail="unauthorized") + + 
@app.get("/health") + async def health() -> dict[str, str]: + return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()} + + @app.get("/", response_class=HTMLResponse) + async def setup_home() -> str: + return render_setup_page(resolved_config) + + @app.get("/setup/config") + async def setup_config() -> dict[str, object]: + return { + "bind_address": f"{resolved_config.bind_host}:{resolved_config.bind_port}", + "storage_dir": resolved_config.storage_dir, + "database_path": resolved_config.database_path, + "ollama_base_url": resolved_config.ollama_base_url, + "model_priority": list(resolved_config.model_priority), + "vram_budget_mb": resolved_config.vram_budget_mb, + "model_selection_reserve_mb": resolved_config.model_selection_reserve_mb, + "max_frames": resolved_config.max_frames, + "frame_scene_detection": resolved_config.frame_scene_detection, + "frame_scene_threshold": resolved_config.frame_scene_threshold, + "frame_scene_min_frames": resolved_config.frame_scene_min_frames, + "api_key_required": resolved_config.api_key is not None, + "quick_commands": { + "bootstrap": "python -m vra bootstrap", + "status": "python -m vra status", + "serve": "python -m vra serve --bind 127.0.0.1:8080", + }, + } + + @app.get("/setup/diagnostics") + async def setup_diagnostics(request: Request) -> dict[str, object]: + media_tools = runtime_dependency_report() + yt_dlp_cmd = shutil.which("yt-dlp") + ytdlp = { + "command": yt_dlp_cmd, + "module_available": find_spec("yt_dlp") is not None, + } + ytdlp["available"] = bool(ytdlp["command"] or ytdlp["module_available"]) + + ollama: dict[str, object] = { + "base_url": resolved_config.ollama_base_url, + "reachable": False, + "version": None, + "models_found": 0, + "error": None, + } + try: + runtime_status = await _runtime( + request + ).use_cases.get_runtime_status.execute() + runtime_details = runtime_status + if isinstance(runtime_details, dict): + ollama["reachable"] = True + ollama["version"] = 
runtime_details.get("ollama_version") + local_models = runtime_details.get("local_models", {}) + ollama["models_found"] = ( + len(local_models) if isinstance(local_models, dict) else 0 + ) + except Exception as exc: + ollama["error"] = str(exc) + + ffmpeg_ok = bool(media_tools["ffmpeg"].get("available")) + ffprobe_ok = bool(media_tools["ffprobe"].get("available")) + ytdlp_ok = bool(ytdlp["available"]) + ollama_ok = bool(ollama["reachable"]) + + return { + "platform": { + "system": platform.system(), + "release": platform.release(), + "python_version": sys.version.split()[0], + "python_executable": sys.executable, + }, + "dependencies": { + "ffmpeg": media_tools["ffmpeg"], + "ffprobe": media_tools["ffprobe"], + "yt_dlp": ytdlp, + "ollama": ollama, + }, + "ready": ffmpeg_ok and ffprobe_ok and ytdlp_ok and ollama_ok, + } + + @app.post("/setup/bootstrap") + async def setup_bootstrap( + request: Request, _=Depends(_check_auth) + ) -> dict[str, object]: + report = await _runtime(request).use_cases.bootstrap_runtime.execute() + models_prepared = report.get("models_prepared") + if models_prepared is None: + models_prepared = report["models"] + + runtime_payload = report.get("runtime") + if isinstance(runtime_payload, dict): + runtime_response = runtime_payload + else: + runtime_response = await _runtime( + request + ).use_cases.get_runtime_status.execute() + + return { + "models_prepared": models_prepared, + "runtime": runtime_response, + } + + @app.post("/ingest") + async def ingest( + req: IngestRequest, request: Request, _=Depends(_check_auth) + ) -> dict[str, object]: + report = await _runtime(request).use_cases.ingest_feed.execute( + req.feed_url, + req.process, + req.max_items, + ) + return { + "feed_title": report.feed_title, + "item_count": report.item_count, + "processed_count": report.processed_count, + } + + @app.post("/process") + async def process( + req: ProcessRequest, request: Request, _=Depends(_check_auth) + ) -> dict[str, object]: + outcome = await 
_runtime(request).use_cases.process_source.execute( + req.source_url, + req.title, + ) + if isinstance(outcome, Failure): + raise HTTPException(status_code=502, detail=outcome.reason) + return { + "source_url": outcome.media.source_url, + "title": outcome.media.title, + "transcript_chars": outcome.summary.transcript_chars, + "frame_count": outcome.summary.frame_count, + "summary": { + "summary": outcome.summary.summary, + "key_points": list(outcome.summary.key_points), + "visual_highlights": list(outcome.summary.visual_highlights), + "model_used": outcome.summary.model_used, + "vram_mb": outcome.summary.vram_mb, + "error": outcome.summary.error, + }, + } + + @app.get("/rss") + async def rss_feed( + request: Request, limit: int = Query(20, ge=1, le=200) + ) -> Response: + xml = await _runtime(request).use_cases.render_rss_feed.execute(limit) + return Response(content=xml, media_type="application/rss+xml") + + @app.get("/runtime") + async def runtime_status( + request: Request, _=Depends(_check_auth) + ) -> dict[str, object]: + return await _runtime(request).use_cases.get_runtime_status.execute() + + return app diff --git a/video_rss_aggregator/application/__init__.py b/video_rss_aggregator/application/__init__.py new file mode 100644 index 0000000..da47c2f --- /dev/null +++ b/video_rss_aggregator/application/__init__.py @@ -0,0 +1,3 @@ +from video_rss_aggregator.application.ports import RuntimeInspector + +__all__ = ["RuntimeInspector"] diff --git a/video_rss_aggregator/application/ports.py b/video_rss_aggregator/application/ports.py new file mode 100644 index 0000000..ae6622e --- /dev/null +++ b/video_rss_aggregator/application/ports.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from datetime import datetime +from dataclasses import dataclass, field +from typing import Protocol, Sequence + +from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult +from video_rss_aggregator.domain.outcomes import ProcessOutcome +from 
video_rss_aggregator.domain.publication import PublicationRecord + + +class RuntimeInspector(Protocol): + async def status(self) -> dict[str, object]: ... + + async def bootstrap(self) -> list[str]: ... + + +class MediaPreparationService(Protocol): + async def prepare(self, source_url: str, title: str | None) -> PreparedMedia: ... + + +@dataclass(frozen=True) +class FetchedFeedEntry: + source_url: str | None + title: str | None = None + guid: str | None = None + published_at: datetime | None = None + + +@dataclass(frozen=True) +class FetchedFeed: + entries: tuple[FetchedFeedEntry, ...] = field(default_factory=tuple) + title: str | None = None + site_url: str | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "entries", tuple(self.entries)) + + +class FeedSource(Protocol): + async def fetch( + self, feed_url: str, max_items: int | None = None + ) -> FetchedFeed: ... + + +class FeedRepository(Protocol): + async def save(self, feed_url: str, feed: FetchedFeed) -> None: ... + + +class FeedVideoRepository(Protocol): + async def save_feed_item(self, feed_url: str, entry: FetchedFeedEntry) -> None: ... + + +class SourceProcessor(Protocol): + async def execute(self, source_url: str, title: str | None) -> ProcessOutcome: ... + + +class Summarizer(Protocol): + async def summarize(self, prepared_media: PreparedMedia) -> SummaryResult: ... + + +class VideoRepository(Protocol): + async def save(self, media: PreparedMedia) -> str: ... + + +class SummaryRepository(Protocol): + async def save(self, video_id: str, summary: SummaryResult) -> None: ... + + +class PublicationRepository(Protocol): + async def latest_publications(self, limit: int) -> Sequence[PublicationRecord]: ... + + +class PublicationRenderer(Protocol): + async def render(self, publications: Sequence[PublicationRecord]) -> str: ... 
diff --git a/video_rss_aggregator/application/use_cases/__init__.py b/video_rss_aggregator/application/use_cases/__init__.py new file mode 100644 index 0000000..e2c44c5 --- /dev/null +++ b/video_rss_aggregator/application/use_cases/__init__.py @@ -0,0 +1,6 @@ +from video_rss_aggregator.application.use_cases.runtime import ( + BootstrapRuntime, + GetRuntimeStatus, +) + +__all__ = ["BootstrapRuntime", "GetRuntimeStatus"] diff --git a/video_rss_aggregator/application/use_cases/ingest_feed.py b/video_rss_aggregator/application/use_cases/ingest_feed.py new file mode 100644 index 0000000..ef2912e --- /dev/null +++ b/video_rss_aggregator/application/use_cases/ingest_feed.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import cast + +from video_rss_aggregator.application.ports import ( + FetchedFeed, + FetchedFeedEntry, + FeedRepository, + FeedSource, + FeedVideoRepository, + SourceProcessor, +) +from video_rss_aggregator.domain.outcomes import Failure + + +@dataclass(frozen=True) +class IngestReport: + feed_title: str | None + item_count: int + processed_count: int + + +@dataclass(frozen=True) +class IngestFeed: + feed_source: FeedSource + feeds: FeedRepository + videos: FeedVideoRepository + process_source: SourceProcessor + + async def execute( + self, feed_url: str, process: bool = False, max_items: int | None = None + ) -> IngestReport: + fetched_feed = await self.feed_source.fetch(feed_url, max_items=max_items) + normalized_entries: list[FetchedFeedEntry] = [] + + for entry in fetched_feed.entries: + if entry.source_url is None: + continue + + source_url = entry.source_url.strip() + if not source_url: + continue + + normalized_entries.append( + FetchedFeedEntry( + source_url=source_url, + title=entry.title, + guid=entry.guid, + published_at=entry.published_at, + ) + ) + + valid_entries = tuple(normalized_entries) + normalized_feed = FetchedFeed( + title=fetched_feed.title, + site_url=fetched_feed.site_url, + 
entries=valid_entries, + ) + + processed_count = 0 + + await self.feeds.save(feed_url, normalized_feed) + + for entry in valid_entries: + await self.videos.save_feed_item(feed_url, entry) + + if process: + result = await self.process_source.execute( + cast(str, entry.source_url), entry.title + ) + if not isinstance(result, Failure): + processed_count += 1 + + return IngestReport( + feed_title=normalized_feed.title, + item_count=len(valid_entries), + processed_count=processed_count, + ) diff --git a/video_rss_aggregator/application/use_cases/process_source.py b/video_rss_aggregator/application/use_cases/process_source.py new file mode 100644 index 0000000..bd85d81 --- /dev/null +++ b/video_rss_aggregator/application/use_cases/process_source.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from video_rss_aggregator.application.ports import ( + MediaPreparationService, + SummaryRepository, + Summarizer, + VideoRepository, +) +from video_rss_aggregator.domain.outcomes import ( + Failure, + PartialSuccess, + ProcessOutcome, + Success, +) + + +@dataclass(frozen=True) +class ProcessSource: + media_service: MediaPreparationService + summarizer: Summarizer + videos: VideoRepository + summaries: SummaryRepository + + async def execute(self, source_url: str, title: str | None) -> ProcessOutcome: + try: + media = await self.media_service.prepare(source_url, title) + summary = await self.summarizer.summarize(media) + + video_id = await self.videos.save(media) + await self.summaries.save(video_id, summary) + except Exception as exc: + return Failure(source_url=source_url, reason=str(exc)) + + if summary.error is not None: + return PartialSuccess(media=media, reason=summary.error, summary=summary) + + return Success(media=media, summary=summary) diff --git a/video_rss_aggregator/application/use_cases/render_rss_feed.py b/video_rss_aggregator/application/use_cases/render_rss_feed.py new file mode 100644 index 0000000..39031da --- 
@dataclass(frozen=True)
class RenderRssFeed:
    """Render the newest publications as an RSS document string."""

    publications: PublicationRepository
    renderer: PublicationRenderer

    async def execute(self, limit: int) -> str:
        """Fetch up to `limit` publications and hand them to the renderer."""
        latest = await self.publications.latest_publications(limit)
        return await self.renderer.render(tuple(latest))


@dataclass(frozen=True)
class GetRuntimeStatus:
    """Report runtime health together with storage and model configuration."""

    runtime: RuntimeInspector
    storage_path: str
    storage_dir: str
    models: tuple[str, ...]

    async def execute(self) -> dict[str, object]:
        """Return the inspector's status merged with static configuration keys."""
        report: dict[str, object] = dict(await self.runtime.status())
        report["database_path"] = self.storage_path
        report["storage_dir"] = self.storage_dir
        report["models"] = list(self.models)
        return report


@dataclass(frozen=True)
class BootstrapRuntime:
    """Trigger model preparation through the runtime inspector."""

    runtime: RuntimeInspector

    async def execute(self) -> dict[str, list[str]]:
        """Return the list of models the runtime prepared."""
        return {"models": await self.runtime.bootstrap()}
@dataclass(frozen=True)
class AppUseCases:
    """Aggregates all application use cases behind a single façade."""

    get_runtime_status: GetRuntimeStatus
    bootstrap_runtime: BootstrapRuntime
    ingest_feed: IngestFeed
    process_source: ProcessSource
    render_rss_feed: RenderRssFeed


@dataclass(frozen=True)
class AppRuntime:
    """Composed application runtime: configuration, use cases, and teardown hook."""

    config: Config
    use_cases: AppUseCases
    # Awaitable (or plain) callable that releases all resources owned by the
    # runtime (HTTP client, summarization engine, database connection).
    close: Callable[[], Awaitable[None] | None]


async def build_runtime(config: Config | None = None) -> AppRuntime:
    """Compose the full application runtime.

    Connects SQLite storage, creates the shared HTTP client and the
    summarization engine, and wires every use case with its adapters.
    Callers must await `AppRuntime.close()` when done.
    """
    resolved_config = config or Config.from_env()
    database = await Database.connect(resolved_config.database_path)
    await database.migrate()
    client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=15.0, read=300.0, write=300.0, pool=60.0),
        follow_redirects=True,
        limits=httpx.Limits(max_keepalive_connections=20, max_connections=50),
    )
    summarization_engine = SummarizationEngine(resolved_config)

    runtime_inspector = LegacyRuntimeInspector(summarization_engine)
    process_source = ProcessSource(
        media_service=LegacyMediaPreparationService(
            client=client,
            storage_dir=resolved_config.storage_dir,
            max_frames=resolved_config.max_frames,
            scene_detection=resolved_config.frame_scene_detection,
            scene_threshold=resolved_config.frame_scene_threshold,
            scene_min_frames=resolved_config.frame_scene_min_frames,
            max_transcript_chars=resolved_config.max_transcript_chars,
        ),
        summarizer=LegacySummarizer(summarization_engine),
        videos=SQLiteVideoRepository(database),
        summaries=SQLiteSummaryRepository(database),
    )

    use_cases = AppUseCases(
        get_runtime_status=GetRuntimeStatus(
            runtime=runtime_inspector,
            storage_path=database.path,
            storage_dir=resolved_config.storage_dir,
            models=resolved_config.model_priority,
        ),
        bootstrap_runtime=BootstrapRuntime(runtime=runtime_inspector),
        ingest_feed=IngestFeed(
            feed_source=HttpFeedSource(client),
            feeds=SQLiteFeedRepository(database),
            videos=SQLiteFeedVideoRepository(database),
            process_source=process_source,
        ),
        process_source=process_source,
        render_rss_feed=RenderRssFeed(
            publications=SQLitePublicationRepository(database),
            renderer=RssPublicationRenderer(
                title=resolved_config.rss_title,
                link=resolved_config.rss_link,
                description=resolved_config.rss_description,
            ),
        ),
    )

    async def _close() -> None:
        # FIX: chain with try/finally so a failure while closing one resource
        # cannot leak the remaining ones (previously a raising client.aclose()
        # left the engine and the SQLite connection open).
        try:
            await client.aclose()
        finally:
            try:
                await summarization_engine.close()
            finally:
                await database.close()

    return AppRuntime(config=resolved_config, use_cases=use_cases, close=_close)
+ model_used: str + vram_mb: float + transcript_chars: int + frame_count: int + error: str | None + + def __post_init__(self) -> None: + object.__setattr__(self, "key_points", tuple(self.key_points)) + object.__setattr__(self, "visual_highlights", tuple(self.visual_highlights)) diff --git a/video_rss_aggregator/domain/outcomes.py b/video_rss_aggregator/domain/outcomes.py new file mode 100644 index 0000000..fe37520 --- /dev/null +++ b/video_rss_aggregator/domain/outcomes.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TypeAlias + +from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult + + +@dataclass(frozen=True) +class Success: + media: PreparedMedia + summary: SummaryResult + status: str = field(init=False, default="success") + + +@dataclass(frozen=True) +class PartialSuccess: + media: PreparedMedia + reason: str + summary: SummaryResult + status: str = field(init=False, default="partial_success") + + def __post_init__(self) -> None: + if self.summary is None: + raise ValueError("summary is required") + + +@dataclass(frozen=True) +class Failure: + source_url: str + reason: str + status: str = field(init=False, default="failure") + + +ProcessOutcome: TypeAlias = Success | PartialSuccess | Failure diff --git a/video_rss_aggregator/domain/publication.py b/video_rss_aggregator/domain/publication.py new file mode 100644 index 0000000..2195acf --- /dev/null +++ b/video_rss_aggregator/domain/publication.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime + + +@dataclass(frozen=True) +class PublicationRecord: + title: str | None + source_url: str + published_at: datetime | None + summary: str + key_points: tuple[str, ...] + visual_highlights: tuple[str, ...] 
+ model_used: str | None + vram_mb: float + + def __post_init__(self) -> None: + object.__setattr__(self, "key_points", tuple(self.key_points)) + object.__setattr__(self, "visual_highlights", tuple(self.visual_highlights)) diff --git a/video_rss_aggregator/infrastructure/__init__.py b/video_rss_aggregator/infrastructure/__init__.py new file mode 100644 index 0000000..0be910b --- /dev/null +++ b/video_rss_aggregator/infrastructure/__init__.py @@ -0,0 +1,19 @@ +from video_rss_aggregator.infrastructure.publication_renderer import ( + RssPublicationRenderer, +) +from video_rss_aggregator.infrastructure.sqlite_repositories import ( + SQLiteFeedRepository, + SQLiteFeedVideoRepository, + SQLitePublicationRepository, + SQLiteSummaryRepository, + SQLiteVideoRepository, +) + +__all__ = [ + "RssPublicationRenderer", + "SQLiteFeedRepository", + "SQLiteFeedVideoRepository", + "SQLitePublicationRepository", + "SQLiteSummaryRepository", + "SQLiteVideoRepository", +] diff --git a/video_rss_aggregator/infrastructure/feed_source.py b/video_rss_aggregator/infrastructure/feed_source.py new file mode 100644 index 0000000..33ca8c9 --- /dev/null +++ b/video_rss_aggregator/infrastructure/feed_source.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +from typing import Any + +import feedparser +import httpx + +from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry + + +def _pick_source_url(entry: Any) -> str | None: + enclosures = entry.get("enclosures", []) + if enclosures: + return enclosures[0].get("href") or enclosures[0].get("url") + links = entry.get("links", []) + if links: + return links[0].get("href") + return entry.get("link") + + +def _map_entry(entry: Any) -> FetchedFeedEntry: + return FetchedFeedEntry( + source_url=_pick_source_url(entry), + title=entry.get("title") or None, + guid=entry.get("id") or None, + 
def _pick_published_at(entry: Any) -> datetime | None:
    """Best-effort publication timestamp for a feed entry.

    Prefers the raw string fields (RFC 2822 or ISO 8601), then falls back to
    feedparser's pre-parsed struct_time tuples. Returns None when nothing parses.
    """
    for key in ("published", "updated"):
        raw = entry.get(key)
        if not raw:
            continue
        parsed = _parse_datetime_value(raw)
        if parsed is not None:
            return parsed

    for key in ("published_parsed", "updated_parsed"):
        raw = entry.get(key)
        if raw is None:
            continue
        parsed = _parse_datetime_tuple(raw)
        if parsed is not None:
            return parsed

    return None


def _parse_datetime_value(value: Any) -> datetime | None:
    """Parse an RFC 2822 or ISO 8601 date string into an aware UTC datetime."""
    if not isinstance(value, str):
        return None

    candidate: datetime | None
    try:
        candidate = parsedate_to_datetime(value)
    except (TypeError, ValueError, IndexError, OverflowError):
        candidate = None

    if candidate is None:
        # Fall back to ISO 8601; "Z" is normalized for pre-3.11 fromisoformat.
        try:
            candidate = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None

    # Naive values are assumed UTC; aware values are converted to UTC.
    if candidate.tzinfo is None:
        return candidate.replace(tzinfo=timezone.utc)
    return candidate.astimezone(timezone.utc)


def _parse_datetime_tuple(value: Any) -> datetime | None:
    """Convert a feedparser struct_time-like tuple to an aware UTC datetime."""
    try:
        return datetime(*value[:6], tzinfo=timezone.utc)
    except (TypeError, ValueError, IndexError, OverflowError):
        return None


@dataclass(frozen=True)
class HttpFeedSource:
    """FeedSource adapter: downloads a feed over HTTP and parses it with feedparser."""

    client: httpx.AsyncClient

    async def fetch(self, feed_url: str, max_items: int | None = None) -> FetchedFeed:
        """Fetch and parse `feed_url`, truncating to `max_items` entries if given."""
        response = await self.client.get(feed_url)
        response.raise_for_status()
        parsed = feedparser.parse(response.text)
        raw_entries = parsed.entries if max_items is None else parsed.entries[:max_items]
        return FetchedFeed(
            title=parsed.feed.get("title") or None,
            site_url=parsed.feed.get("link") or None,
            entries=tuple(_map_entry(raw) for raw in raw_entries),
        )
def _map_prepared_media(
    source_url: str,
    title: str | None,
    legacy_media: service_media.PreparedMedia,
) -> PreparedMedia:
    """Translate the legacy media payload into the domain PreparedMedia model.

    Title preference order: explicit caller title, then the legacy title,
    then the source URL itself.
    """
    display_title = title or legacy_media.title or source_url
    return PreparedMedia(
        source_url=source_url,
        title=display_title,
        transcript=legacy_media.transcript,
        media_path=str(legacy_media.media_path),
        frame_paths=tuple(str(frame) for frame in legacy_media.frame_paths),
    )


@dataclass(frozen=True)
class LegacyMediaPreparationService:
    """MediaPreparationService port backed by the legacy service_media module."""

    client: httpx.AsyncClient
    storage_dir: str
    max_frames: int
    scene_detection: bool
    scene_threshold: float
    scene_min_frames: int
    max_transcript_chars: int

    async def prepare(self, source_url: str, title: str | None) -> PreparedMedia:
        """Download/transcribe/frame-extract `source_url` via the legacy pipeline."""
        legacy = await service_media.prepare_media(
            client=self.client,
            source=source_url,
            storage_dir=self.storage_dir,
            max_frames=self.max_frames,
            scene_detection=self.scene_detection,
            scene_threshold=self.scene_threshold,
            scene_min_frames=self.scene_min_frames,
            max_transcript_chars=self.max_transcript_chars,
        )
        return _map_prepared_media(source_url, title, legacy)
class _RuntimeEngine(Protocol):
    """Structural interface the legacy summarization engine must satisfy."""

    async def runtime_status(self) -> dict[str, object]: ...

    async def prepare_models(self) -> list[str]: ...


@dataclass(frozen=True)
class LegacyRuntimeInspector:
    """RuntimeInspector port that delegates to the legacy engine's methods."""

    engine: _RuntimeEngine

    async def status(self) -> dict[str, object]:
        """Return the engine's runtime status report."""
        return await self.engine.runtime_status()

    async def bootstrap(self) -> list[str]:
        """Ask the engine to download/prepare its models; returns model names."""
        return await self.engine.prepare_models()


@dataclass(frozen=True)
class SQLiteFeedRepository:
    """FeedRepository backed by the shared SQLite Database."""

    database: Database

    async def save(self, feed_url: str, feed: FetchedFeed) -> None:
        """Upsert the feed row keyed by its URL."""
        await self.database.upsert_feed(feed_url, feed.title)


@dataclass(frozen=True)
class SQLiteFeedVideoRepository:
    """FeedVideoRepository storing per-feed video entries in SQLite."""

    database: Database

    async def save_feed_item(self, feed_url: str, entry: FetchedFeedEntry) -> None:
        """Attach one feed entry to its (upserted) feed row."""
        if entry.source_url is None:
            raise ValueError("source_url is required")
        # Re-upsert the feed (title untouched via None) to obtain its id.
        owning_feed_id = await self.database.upsert_feed(feed_url, None)
        await self.database.upsert_video(
            feed_id=owning_feed_id,
            guid=entry.guid,
            title=entry.title,
            source_url=entry.source_url,
            published_at=entry.published_at,
        )


@dataclass(frozen=True)
class SQLiteVideoRepository:
    """VideoRepository persisting prepared media and its transcript."""

    database: Database

    async def save(self, media: PreparedMedia) -> str:
        """Upsert the video row for this media; returns the video id."""
        stored_id = await self.database.upsert_video(
            feed_id=None,
            guid=None,
            title=media.title,
            source_url=media.source_url,
            published_at=None,
            media_path=media.media_path,
        )
        # Empty transcripts are not stored at all.
        if media.transcript:
            await self.database.insert_transcript(stored_id, media.transcript)
        return stored_id


@dataclass(frozen=True)
class SQLiteSummaryRepository:
    """SummaryRepository writing summary rows keyed by video id."""

    database: Database

    async def save(self, video_id: str, summary: SummaryResult) -> None:
        """Insert one summary row for `video_id`."""
        await self.database.insert_summary(video_id, summary)


@dataclass(frozen=True)
class SQLitePublicationRepository:
    """PublicationRepository reading joined video+summary rows."""

    database: Database

    async def latest_publications(self, limit: int) -> Sequence[PublicationRecord]:
        """Return the newest `limit` publications, most recent first."""
        return await self.database.latest_publications(limit)


class _SummarizationEngine(Protocol):
    """Structural interface of the legacy summarization engine's summarize call."""

    async def summarize(
        self,
        *,
        source_url: str,
        title: str | None,
        transcript: str,
        frame_paths: list[Path],
    ) -> LegacySummaryResult: ...
def _map_summary_result(result: LegacySummaryResult) -> SummaryResult:
    """Translate the legacy engine's summary payload into the domain model."""
    return SummaryResult(
        summary=result.summary,
        key_points=tuple(result.key_points),
        visual_highlights=tuple(result.visual_highlights),
        model_used=result.model_used,
        vram_mb=result.vram_mb,
        transcript_chars=result.transcript_chars,
        frame_count=result.frame_count,
        error=result.error,
    )


@dataclass(frozen=True)
class LegacySummarizer:
    """Summarizer port backed by the legacy SummarizationEngine."""

    engine: _SummarizationEngine

    async def summarize(self, prepared_media: PreparedMedia) -> SummaryResult:
        """Summarize prepared media, converting frame paths back to Path objects."""
        result = await self.engine.summarize(
            source_url=prepared_media.source_url,
            title=prepared_media.title,
            transcript=prepared_media.transcript,
            frame_paths=[Path(path) for path in prepared_media.frame_paths],
        )
        return _map_summary_result(result)


def render_feed(
    title: str,
    link: str,
    description: str,
    publications: Sequence[PublicationRecord],
) -> str:
    """Serialize `publications` into an RSS 2.0 document string.

    Each publication becomes one <item>; key points and visual highlights are
    folded into the item description as bullet lists.
    """
    rss = Element("rss", version="2.0")
    channel = SubElement(rss, "channel")
    SubElement(channel, "title").text = title
    SubElement(channel, "link").text = link
    SubElement(channel, "description").text = description

    for publication in publications:
        item = SubElement(channel, "item")
        SubElement(item, "title").text = publication.title or "Untitled video"
        SubElement(item, "link").text = publication.source_url

        description_parts = [publication.summary]
        if publication.key_points:
            bullets = "\n".join(f"- {point}" for point in publication.key_points)
            description_parts.append(bullets)
        if publication.visual_highlights:
            visuals = "\n".join(
                f"- {highlight}" for highlight in publication.visual_highlights
            )
            description_parts.append(f"Visual Highlights:\n{visuals}")
        if publication.model_used:
            description_parts.append(
                f"Model: {publication.model_used} (VRAM {publication.vram_mb:.2f} MB)"
            )

        SubElement(item, "description").text = "\n\n".join(description_parts)

        if publication.published_at:
            SubElement(item, "pubDate").text = _rfc2822(publication.published_at)

    # FIX: emit a proper XML declaration so the output is a well-formed,
    # self-describing RSS document (previously only a bare newline was prepended).
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + tostring(
        rss, encoding="unicode"
    )


def _rfc2822(dt: datetime) -> str:
    """Format a datetime as an RFC 2822 date for <pubDate>.

    FIX: strftime("%a ... %b ...") is locale-dependent and can emit non-English
    day/month names; email.utils.format_datetime is always RFC-compliant.
    """
    from email.utils import format_datetime

    if dt.tzinfo is None:
        # Naive datetimes are assumed UTC — TODO confirm with storage layer.
        dt = dt.replace(tzinfo=timezone.utc)
    return format_datetime(dt)
# Connection-wide pragmas applied once per connection in Database.connect().
# WAL + synchronous=NORMAL trades a little durability for read concurrency;
# busy_timeout retries locked writes for up to 5s instead of failing fast.
_PRAGMAS = (
    "PRAGMA foreign_keys = ON",
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA temp_store = MEMORY",
    "PRAGMA busy_timeout = 5000",
)

# Backwards-compatible alias: earlier code referred to publication rows as
# "summary records".
SummaryRecord = PublicationRecord


def _utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string (stored in TEXT columns)."""
    return datetime.now(timezone.utc).isoformat()


def _parse_dt(value: str | None) -> datetime | None:
    """Parse an ISO-8601 TEXT column back into a datetime; empty/invalid -> None."""
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return None


class Database:
    """Async SQLite persistence layer (aiosqlite) for feeds, videos, transcripts
    and summaries. Each write method commits immediately."""

    def __init__(self, conn: aiosqlite.Connection, database_path: str) -> None:
        self._conn = conn
        self._database_path = database_path

    @classmethod
    async def connect(cls, database_path: str) -> Database:
        """Open (creating parent directories if needed) and configure the database."""
        path = Path(database_path)
        path.parent.mkdir(parents=True, exist_ok=True)

        conn = await aiosqlite.connect(str(path))
        # Row factory enables name-based column access (row["id"]) below.
        conn.row_factory = aiosqlite.Row
        for pragma in _PRAGMAS:
            await conn.execute(pragma)
        await conn.commit()
        return cls(conn, str(path))

    @property
    def path(self) -> str:
        """Filesystem path of the SQLite database file."""
        return self._database_path

    async def close(self) -> None:
        """Close the underlying connection."""
        await self._conn.close()

    async def migrate(self) -> None:
        """Apply the idempotent schema (CREATE TABLE/INDEX IF NOT EXISTS)."""
        await self._conn.executescript(_SCHEMA)
        await self._conn.commit()

    async def upsert_feed(self, url: str, title: str | None) -> str:
        """Insert or refresh a feed row keyed by URL; returns the row id.

        A fresh uuid is generated for the INSERT, but on conflict the existing
        row keeps its id — hence the follow-up SELECT to discover the real id.
        A None title never overwrites an existing one (COALESCE).
        """
        now = _utc_now()
        feed_id = str(uuid.uuid4())
        await self._conn.execute(
            """
            INSERT INTO feeds (id, url, title, last_checked)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(url)
            DO UPDATE SET
                title = COALESCE(excluded.title, feeds.title),
                last_checked = excluded.last_checked
            """,
            (feed_id, url, title, now),
        )
        await self._conn.commit()

        async with self._conn.execute(
            "SELECT id FROM feeds WHERE url = ?", (url,)
        ) as cur:
            row = await cur.fetchone()
            if row is None:
                raise RuntimeError("Failed to upsert feed")
            return str(row["id"])

    async def upsert_video(
        self,
        feed_id: str | None,
        guid: str | None,
        title: str | None,
        source_url: str,
        published_at: datetime | None,
        media_path: str | None = None,
    ) -> str:
        """Insert or enrich a video row keyed by source_url; returns the row id.

        On conflict, every COALESCE keeps any previously stored non-NULL value,
        so repeated ingests only fill in missing fields and never erase data.
        """
        now = _utc_now()
        video_id = str(uuid.uuid4())
        pub_text = published_at.isoformat() if published_at else None

        await self._conn.execute(
            """
            INSERT INTO videos (
                id, feed_id, source_url, guid, title, published_at, media_path, created_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(source_url)
            DO UPDATE SET
                feed_id = COALESCE(excluded.feed_id, videos.feed_id),
                guid = COALESCE(excluded.guid, videos.guid),
                title = COALESCE(excluded.title, videos.title),
                published_at = COALESCE(excluded.published_at, videos.published_at),
                media_path = COALESCE(excluded.media_path, videos.media_path)
            """,
            (video_id, feed_id, source_url, guid, title, pub_text, media_path, now),
        )
        await self._conn.commit()

        # As in upsert_feed, the surviving id may differ from the one generated
        # above, so it must be re-read.
        async with self._conn.execute(
            "SELECT id FROM videos WHERE source_url = ?",
            (source_url,),
        ) as cur:
            row = await cur.fetchone()
            if row is None:
                raise RuntimeError("Failed to upsert video")
            return str(row["id"])

    async def insert_transcript(self, video_id: str, text: str) -> str:
        """Append a transcript row for a video; returns the new transcript id."""
        transcript_id = str(uuid.uuid4())
        await self._conn.execute(
            """
            INSERT INTO transcripts (id, video_id, text, created_at)
            VALUES (?, ?, ?, ?)
            """,
            (transcript_id, video_id, text, _utc_now()),
        )
        await self._conn.commit()
        return transcript_id

    async def insert_summary(self, video_id: str, result: SummaryResult) -> str:
        """Append a summary row for a video; returns the new summary id.

        key_points / visual_highlights tuples are serialized as JSON arrays.
        """
        summary_id = str(uuid.uuid4())
        await self._conn.execute(
            """
            INSERT INTO summaries (
                id,
                video_id,
                summary,
                key_points,
                visual_highlights,
                model_used,
                vram_mb,
                transcript_chars,
                frame_count,
                error,
                created_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                summary_id,
                video_id,
                result.summary,
                json.dumps(result.key_points),
                json.dumps(result.visual_highlights),
                result.model_used,
                float(result.vram_mb),
                int(result.transcript_chars),
                int(result.frame_count),
                result.error,
                _utc_now(),
            ),
        )
        await self._conn.commit()
        return summary_id

    async def latest_publications(self, limit: int = 20) -> list[PublicationRecord]:
        """Return up to `limit` newest summaries joined with their videos."""
        async with self._conn.execute(
            """
            SELECT
                v.title,
                v.source_url,
                v.published_at,
                s.summary,
                s.key_points,
                s.visual_highlights,
                s.model_used,
                s.vram_mb
            FROM summaries s
            JOIN videos v ON v.id = s.video_id
            ORDER BY s.created_at DESC
            LIMIT ?
            """,
            (limit,),
        ) as cur:
            rows = await cur.fetchall()

        publications: list[PublicationRecord] = []
        for row in rows:
            # JSON columns are decoded defensively (malformed data -> []).
            key_points = _decode_text_list(row["key_points"])
            visual_highlights = _decode_text_list(row["visual_highlights"])
            publications.append(
                PublicationRecord(
                    title=row["title"],
                    source_url=row["source_url"],
                    published_at=_parse_dt(row["published_at"]),
                    summary=row["summary"],
                    key_points=key_points,
                    visual_highlights=visual_highlights,
                    model_used=row["model_used"],
                    # NULL vram_mb is coerced to 0.0 for the float field.
                    vram_mb=float(row["vram_mb"] or 0.0),
                )
            )
        return publications

    async def latest_summaries(self, limit: int = 20) -> list[PublicationRecord]:
        """Deprecated alias for latest_publications (kept for older callers)."""
        return await self.latest_publications(limit)


def _decode_text_list(value: str | None) -> list[str]:
    """Decode a JSON-array TEXT column into a list of strings.

    Any malformed, missing, or non-list payload yields [] rather than raising.
    """
    if not value:
        return []
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        return []
    if not isinstance(decoded, list):
        return []
    return [str(item) for item in decoded]