diff --git a/PROJECT_STRUCTURE.md b/PROJECT_STRUCTURE.md
index 5d6d9a0..ff2475e 100644
--- a/PROJECT_STRUCTURE.md
+++ b/PROJECT_STRUCTURE.md
@@ -1,6 +1,6 @@
# Project Structure
-All source files are in the workspace root. Layering is preserved by filename prefix.
+The app now uses a packaged runtime architecture in `video_rss_aggregator/`, with a small set of root-level adapters and CLI entry points kept for compatibility.
/.gitignore
/.data/
@@ -14,8 +14,59 @@ All source files are in the workspace root. Layering is preserved by filename pr
/service_ollama.py
/service_transcribe.py
/service_summarize.py
-/service_pipeline.py
/adapter_gui.py
/adapter_api.py
/adapter_rss.py
/adapter_storage.py
+/video_rss_aggregator/
+ /__init__.py
+ /api.py
+ /bootstrap.py
+ /rss.py
+ /storage.py
+ /application/
+ /__init__.py
+ /ports.py
+ /use_cases/
+ /__init__.py
+ /ingest_feed.py
+ /process_source.py
+ /render_rss_feed.py
+ /runtime.py
+ /domain/
+ /__init__.py
+ /models.py
+ /outcomes.py
+ /publication.py
+ /infrastructure/
+ /__init__.py
+ /feed_source.py
+ /media_service.py
+ /publication_renderer.py
+ /runtime_adapters.py
+ /sqlite_repositories.py
+ /summarizer.py
+/tests/
+ /adapters/
+ /test_api_app.py
+ /test_cli_commands.py
+ /application/
+ /test_ingest_feed.py
+ /test_process_source.py
+ /test_render_rss_feed.py
+ /test_runtime_use_cases.py
+ /domain/
+ /test_outcomes.py
+ /infrastructure/
+ /test_feed_source.py
+ /test_legacy_adapter_shims.py
+ /test_media_service.py
+ /test_publication_renderer.py
+ /test_runtime_adapters.py
+ /test_sqlite_repositories.py
+ /test_summarizer.py
+ /test_api_setup.py
+ /test_config.py
+ /test_ollama.py
+ /test_project_layout.py
+ /test_summarize_helpers.py
diff --git a/README.md b/README.md
index a128eb5..319b519 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,15 @@ This project has been rebuilt around Qwen 3.5 multimodal models and a strict loc
- Storage: SQLite (`.data/vra.db`)
- API: FastAPI
+## Architecture
+
+- `video_rss_aggregator/` contains the current runtime architecture.
+- `video_rss_aggregator/bootstrap.py` composes the application runtime and use cases.
+- `video_rss_aggregator/application/` holds use-case orchestration and ports.
+- `video_rss_aggregator/domain/` defines the core models and outcome types.
+- `video_rss_aggregator/infrastructure/` contains SQLite, RSS, media, summarization, and runtime adapters.
+- Root modules such as `adapter_api.py`, `adapter_rss.py`, `adapter_storage.py`, and `cli.py` remain as compatibility and entry-point surfaces around the packaged runtime.
+
## Design Goals
- Use Qwen 3.5 vision-capable small models for summarization quality.
diff --git a/adapter_api.py b/adapter_api.py
index fa272e5..ea4ce7b 100644
--- a/adapter_api.py
+++ b/adapter_api.py
@@ -1,165 +1,23 @@
from __future__ import annotations
-import platform
-import shutil
-import sys
-from datetime import datetime, timezone
-from importlib.util import find_spec
-
-from fastapi import Depends, FastAPI, Header, HTTPException, Query
-from fastapi.responses import HTMLResponse, Response
-from pydantic import BaseModel
-
-from adapter_gui import render_setup_page
from core_config import Config
-from service_media import runtime_dependency_report
-from service_pipeline import Pipeline
-
-
-class IngestRequest(BaseModel):
- feed_url: str
- process: bool = False
- max_items: int | None = None
-
-
-class ProcessRequest(BaseModel):
- source_url: str
- title: str | None = None
-
-
-def create_app(pipeline: Pipeline, config: Config) -> FastAPI:
- app = FastAPI(title="Video RSS Aggregator", version="0.1.0")
-
- def _check_auth(
- authorization: str | None = Header(None), x_api_key: str | None = Header(None)
- ):
- if config.api_key is None:
- return
- token = None
- if authorization:
- parts = authorization.split()
- if len(parts) == 2 and parts[0].lower() == "bearer":
- token = parts[1]
- if token is None:
- token = x_api_key
- if token != config.api_key:
- raise HTTPException(status_code=401, detail="unauthorized")
-
- @app.get("/health")
- async def health():
- return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()}
-
- @app.get("/", response_class=HTMLResponse)
- async def setup_home():
- return render_setup_page(config)
-
- @app.get("/setup/config")
- async def setup_config():
- return {
- "bind_address": f"{config.bind_host}:{config.bind_port}",
- "storage_dir": config.storage_dir,
- "database_path": config.database_path,
- "ollama_base_url": config.ollama_base_url,
- "model_priority": list(config.model_priority),
- "vram_budget_mb": config.vram_budget_mb,
- "model_selection_reserve_mb": config.model_selection_reserve_mb,
- "max_frames": config.max_frames,
- "frame_scene_detection": config.frame_scene_detection,
- "frame_scene_threshold": config.frame_scene_threshold,
- "frame_scene_min_frames": config.frame_scene_min_frames,
- "api_key_required": config.api_key is not None,
- "quick_commands": {
- "bootstrap": "python -m vra bootstrap",
- "status": "python -m vra status",
- "serve": "python -m vra serve --bind 127.0.0.1:8080",
- },
- }
-
- @app.get("/setup/diagnostics")
- async def setup_diagnostics():
- media_tools = runtime_dependency_report()
- yt_dlp_cmd = shutil.which("yt-dlp")
- ytdlp = {
- "command": yt_dlp_cmd,
- "module_available": find_spec("yt_dlp") is not None,
- }
- ytdlp["available"] = bool(ytdlp["command"] or ytdlp["module_available"])
-
- ollama: dict[str, object] = {
- "base_url": config.ollama_base_url,
- "reachable": False,
- "version": None,
- "models_found": 0,
- "error": None,
- }
- try:
- runtime = await pipeline.runtime_status()
- ollama["reachable"] = True
- ollama["version"] = runtime.get("ollama_version")
- local_models = runtime.get("local_models", {})
- ollama["models_found"] = len(local_models)
- except Exception as exc:
- ollama["error"] = str(exc)
-
- ffmpeg_ok = bool(media_tools["ffmpeg"].get("available"))
- ffprobe_ok = bool(media_tools["ffprobe"].get("available"))
- ytdlp_ok = bool(ytdlp["available"])
- ollama_ok = bool(ollama["reachable"])
-
- return {
- "platform": {
- "system": platform.system(),
- "release": platform.release(),
- "python_version": sys.version.split()[0],
- "python_executable": sys.executable,
- },
- "dependencies": {
- "ffmpeg": media_tools["ffmpeg"],
- "ffprobe": media_tools["ffprobe"],
- "yt_dlp": ytdlp,
- "ollama": ollama,
- },
- "ready": ffmpeg_ok and ffprobe_ok and ytdlp_ok and ollama_ok,
- }
-
- @app.post("/setup/bootstrap")
- async def setup_bootstrap(_=Depends(_check_auth)):
- return await pipeline.bootstrap_models()
+from video_rss_aggregator.api import IngestRequest, ProcessRequest
+from video_rss_aggregator.api import create_app as create_runtime_app
+from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases, build_runtime
- @app.post("/ingest")
- async def ingest(req: IngestRequest, _=Depends(_check_auth)):
- report = await pipeline.ingest_feed(req.feed_url, req.process, req.max_items)
- return {
- "feed_title": report.feed_title,
- "item_count": report.item_count,
- "processed_count": report.processed_count,
- }
- @app.post("/process")
- async def process(req: ProcessRequest, _=Depends(_check_auth)):
- report = await pipeline.process_source(req.source_url, req.title)
- return {
- "source_url": report.source_url,
- "title": report.title,
- "transcript_chars": report.transcript_chars,
- "frame_count": report.frame_count,
- "summary": {
- "summary": report.summary.summary,
- "key_points": report.summary.key_points,
- "visual_highlights": report.summary.visual_highlights,
- "model_used": report.summary.model_used,
- "vram_mb": report.summary.vram_mb,
- "error": report.summary.error,
- },
- }
+def create_app(runtime: AppRuntime | None = None, config: Config | None = None):
+ if runtime is not None and not isinstance(runtime, AppRuntime):
+ raise TypeError("create_app expects an AppRuntime or None")
- @app.get("/rss")
- async def rss_feed(limit: int = Query(20, ge=1, le=200)):
- xml = await pipeline.rss_feed(limit)
- return Response(content=xml, media_type="application/rss+xml")
+ return create_runtime_app(runtime=runtime, config=config)
- @app.get("/runtime")
- async def runtime(_=Depends(_check_auth)):
- return await pipeline.runtime_status()
- return app
+__all__ = [
+ "AppRuntime",
+ "AppUseCases",
+ "IngestRequest",
+ "ProcessRequest",
+ "build_runtime",
+ "create_app",
+]
diff --git a/adapter_rss.py b/adapter_rss.py
index e81b51f..8e0a314 100644
--- a/adapter_rss.py
+++ b/adapter_rss.py
@@ -1,49 +1,3 @@
-from __future__ import annotations
+from video_rss_aggregator.rss import render_feed
-from datetime import datetime, timezone
-from xml.etree.ElementTree import Element, SubElement, tostring
-
-from adapter_storage import SummaryRecord
-
-
-def render_feed(
- title: str,
- link: str,
- description: str,
- records: list[SummaryRecord],
-) -> str:
- rss = Element("rss", version="2.0")
- channel = SubElement(rss, "channel")
- SubElement(channel, "title").text = title
- SubElement(channel, "link").text = link
- SubElement(channel, "description").text = description
-
- for rec in records:
- item = SubElement(channel, "item")
- SubElement(item, "title").text = rec.title or "Untitled video"
- SubElement(item, "link").text = rec.source_url
-
- desc_parts = [rec.summary]
- if rec.key_points:
- bullets = "\n".join(f"- {p}" for p in rec.key_points)
- desc_parts.append(bullets)
- if rec.visual_highlights:
- visuals = "\n".join(f"- {p}" for p in rec.visual_highlights)
- desc_parts.append(f"Visual Highlights:\n{visuals}")
- if rec.model_used:
- desc_parts.append(f"Model: {rec.model_used} (VRAM {rec.vram_mb:.2f} MB)")
-
- SubElement(item, "description").text = "\n\n".join(desc_parts)
-
- if rec.published_at:
- SubElement(item, "pubDate").text = _rfc2822(rec.published_at)
-
- return '\n' + tostring(
- rss, encoding="unicode"
- )
-
-
-def _rfc2822(dt: datetime) -> str:
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.strftime("%a, %d %b %Y %H:%M:%S %z")
+__all__ = ["render_feed"]
diff --git a/adapter_storage.py b/adapter_storage.py
index 911fba6..33b92f3 100644
--- a/adapter_storage.py
+++ b/adapter_storage.py
@@ -1,284 +1,24 @@
from __future__ import annotations
-import json
-import uuid
-from dataclasses import dataclass
-from datetime import datetime, timezone
-from pathlib import Path
+from service_summarize import SummaryResult as LegacySummaryResult
+from video_rss_aggregator.domain.models import SummaryResult
+from video_rss_aggregator.storage import Database as PackageDatabase, SummaryRecord
-import aiosqlite
-
-from service_summarize import SummaryResult
-
-
-_SCHEMA = """
-CREATE TABLE IF NOT EXISTS feeds (
- id TEXT PRIMARY KEY,
- url TEXT NOT NULL UNIQUE,
- title TEXT,
- last_checked TEXT NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS videos (
- id TEXT PRIMARY KEY,
- feed_id TEXT,
- source_url TEXT NOT NULL UNIQUE,
- guid TEXT,
- title TEXT,
- published_at TEXT,
- media_path TEXT,
- created_at TEXT NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS transcripts (
- id TEXT PRIMARY KEY,
- video_id TEXT NOT NULL,
- text TEXT NOT NULL,
- created_at TEXT NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS summaries (
- id TEXT PRIMARY KEY,
- video_id TEXT NOT NULL,
- summary TEXT NOT NULL,
- key_points TEXT NOT NULL,
- visual_highlights TEXT NOT NULL,
- model_used TEXT,
- vram_mb REAL NOT NULL,
- transcript_chars INTEGER NOT NULL,
- frame_count INTEGER NOT NULL,
- error TEXT,
- created_at TEXT NOT NULL
-);
-
-CREATE INDEX IF NOT EXISTS idx_videos_feed_id ON videos(feed_id);
-CREATE INDEX IF NOT EXISTS idx_transcripts_video_id ON transcripts(video_id);
-CREATE INDEX IF NOT EXISTS idx_summaries_video_id ON summaries(video_id);
-CREATE INDEX IF NOT EXISTS idx_summaries_created_at ON summaries(created_at DESC);
-"""
-
-_PRAGMAS = (
- "PRAGMA foreign_keys = ON",
- "PRAGMA journal_mode = WAL",
- "PRAGMA synchronous = NORMAL",
- "PRAGMA temp_store = MEMORY",
- "PRAGMA busy_timeout = 5000",
-)
-
-
-def _utc_now() -> str:
- return datetime.now(timezone.utc).isoformat()
-
-
-def _parse_dt(value: str | None) -> datetime | None:
- if not value:
- return None
- try:
- return datetime.fromisoformat(value)
- except ValueError:
- return None
-
-
-@dataclass(slots=True)
-class SummaryRecord:
- title: str | None
- source_url: str
- published_at: datetime | None
- summary: str
- key_points: list[str]
- visual_highlights: list[str]
- model_used: str | None
- vram_mb: float
- created_at: datetime
-
-
-class Database:
- def __init__(self, conn: aiosqlite.Connection, database_path: str) -> None:
- self._conn = conn
- self._database_path = database_path
-
- @classmethod
- async def connect(cls, database_path: str) -> Database:
- path = Path(database_path)
- path.parent.mkdir(parents=True, exist_ok=True)
-
- conn = await aiosqlite.connect(str(path))
- conn.row_factory = aiosqlite.Row
- for pragma in _PRAGMAS:
- await conn.execute(pragma)
- await conn.commit()
- return cls(conn, str(path))
-
- @property
- def path(self) -> str:
- return self._database_path
-
- async def close(self) -> None:
- await self._conn.close()
-
- async def migrate(self) -> None:
- await self._conn.executescript(_SCHEMA)
- await self._conn.commit()
-
- async def upsert_feed(self, url: str, title: str | None) -> str:
- now = _utc_now()
- fid = str(uuid.uuid4())
- await self._conn.execute(
- """
- INSERT INTO feeds (id, url, title, last_checked)
- VALUES (?, ?, ?, ?)
- ON CONFLICT(url)
- DO UPDATE SET
- title = COALESCE(excluded.title, feeds.title),
- last_checked = excluded.last_checked
- """,
- (fid, url, title, now),
- )
- await self._conn.commit()
-
- async with self._conn.execute(
- "SELECT id FROM feeds WHERE url = ?", (url,)
- ) as cur:
- row = await cur.fetchone()
- if row is None:
- raise RuntimeError("Failed to upsert feed")
- return str(row["id"])
-
- async def upsert_video(
- self,
- feed_id: str | None,
- guid: str | None,
- title: str | None,
- source_url: str,
- published_at: datetime | None,
- media_path: str | None = None,
- ) -> str:
- now = _utc_now()
- vid = str(uuid.uuid4())
- pub_text = published_at.isoformat() if published_at else None
-
- await self._conn.execute(
- """
- INSERT INTO videos (
- id, feed_id, source_url, guid, title, published_at, media_path, created_at
- )
- VALUES (?, ?, ?, ?, ?, ?, ?, ?)
- ON CONFLICT(source_url)
- DO UPDATE SET
- feed_id = COALESCE(excluded.feed_id, videos.feed_id),
- guid = COALESCE(excluded.guid, videos.guid),
- title = COALESCE(excluded.title, videos.title),
- published_at = COALESCE(excluded.published_at, videos.published_at),
- media_path = COALESCE(excluded.media_path, videos.media_path)
- """,
- (vid, feed_id, source_url, guid, title, pub_text, media_path, now),
- )
- await self._conn.commit()
-
- async with self._conn.execute(
- "SELECT id FROM videos WHERE source_url = ?",
- (source_url,),
- ) as cur:
- row = await cur.fetchone()
- if row is None:
- raise RuntimeError("Failed to upsert video")
- return str(row["id"])
-
- async def insert_transcript(self, video_id: str, text: str) -> str:
- tid = str(uuid.uuid4())
- await self._conn.execute(
- """
- INSERT INTO transcripts (id, video_id, text, created_at)
- VALUES (?, ?, ?, ?)
- """,
- (tid, video_id, text, _utc_now()),
- )
- await self._conn.commit()
- return tid
+class Database(PackageDatabase):
async def insert_summary(self, video_id: str, result: SummaryResult) -> str:
- sid = str(uuid.uuid4())
- await self._conn.execute(
- """
- INSERT INTO summaries (
- id,
- video_id,
- summary,
- key_points,
- visual_highlights,
- model_used,
- vram_mb,
- transcript_chars,
- frame_count,
- error,
- created_at
+ if isinstance(result, LegacySummaryResult):
+ result = SummaryResult(
+ summary=result.summary,
+ key_points=tuple(result.key_points),
+ visual_highlights=tuple(result.visual_highlights),
+ model_used=result.model_used,
+ vram_mb=result.vram_mb,
+ transcript_chars=result.transcript_chars,
+ frame_count=result.frame_count,
+ error=result.error,
)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """,
- (
- sid,
- video_id,
- result.summary,
- json.dumps(result.key_points),
- json.dumps(result.visual_highlights),
- result.model_used,
- float(result.vram_mb),
- int(result.transcript_chars),
- int(result.frame_count),
- result.error,
- _utc_now(),
- ),
- )
- await self._conn.commit()
- return sid
-
- async def latest_summaries(self, limit: int = 20) -> list[SummaryRecord]:
- async with self._conn.execute(
- """
- SELECT
- v.title,
- v.source_url,
- v.published_at,
- s.summary,
- s.key_points,
- s.visual_highlights,
- s.model_used,
- s.vram_mb,
- s.created_at
- FROM summaries s
- JOIN videos v ON v.id = s.video_id
- ORDER BY s.created_at DESC
- LIMIT ?
- """,
- (limit,),
- ) as cur:
- rows = await cur.fetchall()
+ return await super().insert_summary(video_id, result)
- out: list[SummaryRecord] = []
- for row in rows:
- key_points_raw = row["key_points"]
- visual_raw = row["visual_highlights"]
- try:
- key_points = json.loads(key_points_raw) if key_points_raw else []
- except json.JSONDecodeError:
- key_points = []
- try:
- visual = json.loads(visual_raw) if visual_raw else []
- except json.JSONDecodeError:
- visual = []
- created = _parse_dt(str(row["created_at"]))
- out.append(
- SummaryRecord(
- title=row["title"],
- source_url=row["source_url"],
- published_at=_parse_dt(row["published_at"]),
- summary=row["summary"],
- key_points=key_points if isinstance(key_points, list) else [],
- visual_highlights=visual if isinstance(visual, list) else [],
- model_used=row["model_used"],
- vram_mb=float(row["vram_mb"] or 0.0),
- created_at=created or datetime.now(timezone.utc),
- )
- )
- return out
+__all__ = ["Database", "SummaryRecord"]
diff --git a/cli.py b/cli.py
index 41df40c..ff32d55 100644
--- a/cli.py
+++ b/cli.py
@@ -15,8 +15,9 @@
from core_config import Config
from service_media import extract_frames_ffmpeg, prepare_source
-from service_pipeline import Pipeline
from service_summarize import SummarizationEngine
+from video_rss_aggregator.bootstrap import AppRuntime, build_runtime
+from video_rss_aggregator.domain.outcomes import Failure
log = logging.getLogger(__name__)
@@ -85,6 +86,12 @@ def _as_float(value: object) -> float:
return 0.0
+async def _close_runtime(runtime: AppRuntime) -> None:
+ close_result = runtime.close()
+ if close_result is not None:
+ await close_result
+
+
@click.group()
def cli() -> None:
"""Video RSS Aggregator powered by Qwen3.5 vision models."""
@@ -97,21 +104,30 @@ def bootstrap() -> None:
config = Config.from_env()
async def _run() -> None:
- summarizer = SummarizationEngine(config)
+ runtime = await build_runtime(config)
try:
- pulled = await summarizer.prepare_models()
- runtime = await summarizer.runtime_status()
+ report = await runtime.use_cases.bootstrap_runtime.execute()
+ models_prepared = report.get("models_prepared")
+ if models_prepared is None:
+ models_prepared = report["models"]
+
+ runtime_payload = report.get("runtime")
+ if isinstance(runtime_payload, dict):
+ runtime_response = runtime_payload
+ else:
+ runtime_response = await runtime.use_cases.get_runtime_status.execute()
+
print(
json.dumps(
{
- "models_prepared": pulled,
- "runtime": runtime,
+ "models_prepared": models_prepared,
+ "runtime": runtime_response,
},
indent=2,
)
)
finally:
- await summarizer.close()
+ await _close_runtime(runtime)
try:
asyncio.run(_run())
@@ -138,14 +154,10 @@ def serve(bind: str | None) -> None:
async def _run() -> None:
from adapter_api import create_app
- pipeline = await Pipeline.create(config)
- try:
- app = create_app(pipeline, config)
- uv_cfg = uvicorn.Config(app, host=host, port=port, log_level="info")
- server = uvicorn.Server(uv_cfg)
- await server.serve()
- finally:
- await pipeline.close()
+ app = create_app(config=config)
+ uv_cfg = uvicorn.Config(app, host=host, port=port, log_level="info")
+ server = uvicorn.Server(uv_cfg)
+ await server.serve()
try:
asyncio.run(_run())
@@ -159,13 +171,12 @@ def status() -> None:
config = Config.from_env()
async def _run() -> None:
- summarizer = SummarizationEngine(config)
+ runtime = await build_runtime(config)
try:
- await summarizer.prepare_models()
- runtime = await summarizer.runtime_status()
- print(json.dumps(runtime, indent=2))
+ status_payload = await runtime.use_cases.get_runtime_status.execute()
+ print(json.dumps(status_payload, indent=2))
finally:
- await summarizer.close()
+ await _close_runtime(runtime)
try:
asyncio.run(_run())
@@ -181,31 +192,34 @@ def verify(source: str, title: str | None) -> None:
config = Config.from_env()
async def _run() -> None:
- pipeline = await Pipeline.create(config)
+ runtime = await build_runtime(config)
try:
t0 = monotonic()
- report = await pipeline.process_source(source, title)
+ outcome = await runtime.use_cases.process_source.execute(source, title)
+ if isinstance(outcome, Failure):
+ raise click.ClickException(outcome.reason)
+
total_ms = int((monotonic() - t0) * 1000)
print(
json.dumps(
{
- "source_url": report.source_url,
- "title": report.title,
- "transcript_chars": report.transcript_chars,
- "frame_count": report.frame_count,
- "model_used": report.summary.model_used,
- "vram_mb": report.summary.vram_mb,
- "error": report.summary.error,
- "summary_chars": len(report.summary.summary),
- "key_points": len(report.summary.key_points),
+ "source_url": outcome.media.source_url,
+ "title": outcome.media.title,
+ "transcript_chars": outcome.summary.transcript_chars,
+ "frame_count": outcome.summary.frame_count,
+ "model_used": outcome.summary.model_used,
+ "vram_mb": outcome.summary.vram_mb,
+ "error": outcome.summary.error,
+ "summary_chars": len(outcome.summary.summary),
+ "key_points": len(outcome.summary.key_points),
"total_ms": total_ms,
},
indent=2,
)
)
finally:
- await pipeline.close()
+ await _close_runtime(runtime)
try:
asyncio.run(_run())
diff --git a/docs/plans/2026-03-09-codebase-architecture-redesign-design.md b/docs/plans/2026-03-09-codebase-architecture-redesign-design.md
new file mode 100644
index 0000000..f59429c
--- /dev/null
+++ b/docs/plans/2026-03-09-codebase-architecture-redesign-design.md
@@ -0,0 +1,214 @@
+# Codebase Architecture Redesign
+
+Date: 2026-03-09
+Status: Approved
+
+## Goal
+
+Define an ideal end-state architecture for the Video RSS Aggregator that improves
+maintainability, reliability, delivery velocity, and testability.
+
+## Context
+
+The current codebase works, but several design pressures now slow it down:
+
+- `Pipeline` mixes composition, orchestration, feed ingestion, processing,
+ persistence, runtime reporting, and RSS generation.
+- Data contracts leak across layers, especially between summarization,
+ persistence, and RSS rendering.
+- CLI and API do not consistently share one application-level workflow model.
+- Setup/runtime data is shaped in multiple places across Python, HTML, and JS.
+- Tests rely heavily on monkeypatching concrete modules instead of stable seams.
+
+## Chosen Approach
+
+Adopt a ports-and-use-cases architecture with four layers:
+
+1. Adapters
+2. Application
+3. Domain
+4. Infrastructure
+
+This was selected over a lighter workflow-slice refactor or a stabilized version
+of the current layering because the goal is an ideal long-term architecture,
+not only a lower-risk cleanup.
+
+## Architectural Principles
+
+- Dependencies point inward only.
+- Business workflows live in application use cases, not transport adapters.
+- Domain types are stable and independent of storage, HTTP, CLI, and model APIs.
+- Infrastructure implements ports instead of owning business rules.
+- Composition happens once in a single composition root.
+
+## Target Architecture
+
+### Adapters
+
+Adapters translate external interactions into application requests and map
+application responses back out.
+
+- FastAPI routes
+- CLI commands
+- GUI/setup endpoints and page models
+- RSS HTTP delivery surface
+
+Adapters should not contain orchestration logic beyond request mapping,
+validation, and response formatting.
+
+### Application
+
+Application use cases coordinate business workflows through explicit ports.
+
+Core use cases:
+
+- `BootstrapRuntime`
+- `GetRuntimeStatus`
+- `IngestFeed`
+- `ProcessSource`
+- `RenderRssFeed`
+
+The current `Pipeline` class should be replaced by these focused use cases.
+
+### Domain
+
+Domain types define the stable language of the system.
+
+Illustrative types:
+
+- `SourceItem`
+- `VideoRecord`
+- `Transcript`
+- `PreparedMedia`
+- `SummaryDraft`
+- `SummaryResult`
+- `ProcessOutcome`
+- `DiagnosticReport`
+
+These types must not depend on SQLite rows, Ollama payloads, FastAPI models,
+Click commands, or subprocess output.
+
+### Infrastructure
+
+Infrastructure adapters implement application ports.
+
+- SQLite repositories
+- Ollama client adapter
+- Feed fetching adapter
+- Media preparation adapter around yt-dlp, ffmpeg, and filesystem artifacts
+- RSS rendering adapter
+- Runtime inspection adapter
+
+Infrastructure owns transport and tool integration details, but not workflow
+decisions.
+
+## Ports
+
+The application layer should depend on explicit interfaces such as:
+
+- `FeedSource`
+- `VideoRepository`
+- `SummaryRepository`
+- `MediaPreparationService`
+- `Summarizer`
+- `RuntimeInspector`
+- `PublicationRenderer`
+- `ArtifactStore`
+
+This creates narrow seams for tests and keeps adapters replaceable.
+
+## Data Flow
+
+### Startup
+
+The composition root loads `Config`, builds infrastructure adapters, wires use
+cases, and exposes them to FastAPI and CLI entry points. Web startup and
+shutdown should be owned by FastAPI lifespan rather than by a prebuilt runtime
+object created externally.
+
+### Ingest
+
+`IngestFeed` fetches and parses a feed, normalizes entries into domain types,
+stores feed/video metadata, and optionally delegates processing to
+`ProcessSource`. It should not own processing internals.
+
+### Processing
+
+`ProcessSource` asks `MediaPreparationService` for `PreparedMedia`, passes the
+result to `Summarizer`, then persists a typed `ProcessOutcome`.
+
+This use case owns the decision about whether a result is successful, degraded,
+or failed.
+
+### Publication
+
+`RenderRssFeed` reads published summaries through repositories and passes stable
+publication models to a renderer. RSS generation should not depend on storage
+row types.
+
+### Setup and Runtime
+
+`BootstrapRuntime` and `GetRuntimeStatus` should return one application-level
+view model shared by API and GUI, replacing duplicated config/setup shaping.
+
+## Error Handling Model
+
+Replace implicit fallback-heavy success semantics with explicit outcome types:
+
+- `Success`
+- `PartialSuccess`
+- `Failure`
+
+Rules:
+
+- Adapter-specific exceptions are translated at the use-case boundary.
+- `PartialSuccess` is used when the system produced a degraded but valid result.
+- `Failure` means the business goal was not achieved and must not be presented
+ as a normal success.
+- Persistence records outcome status explicitly rather than relying on summary
+ text to reveal degradation.
+- API and CLI surfaces report status directly.
+
+Diagnostics remain separate from processing outcomes.
+
+## Testing Strategy
+
+The main regression net should move to application-boundary contract tests for:
+
+- `BootstrapRuntime`
+- `GetRuntimeStatus`
+- `IngestFeed`
+- `ProcessSource`
+- `RenderRssFeed`
+
+Supporting tests should be split into:
+
+- repository integration tests
+- Ollama adapter tests
+- media adapter tests
+- FastAPI adapter tests
+- CLI adapter tests
+- policy tests for model selection, degradation classification, normalization,
+ and retention/publication rules
+
+The desired end state is that most business behavior can be tested without
+FastAPI, Click, SQLite, subprocesses, or a live Ollama runtime.
+
+## Non-Goals
+
+- Defining the full migration sequence in this document
+- Implementing the redesign directly from adapters inward
+- Preserving the current `Pipeline` shape as a compatibility constraint
+
+## Expected Benefits
+
+- clearer ownership of business workflows
+- lower coupling between persistence, summarization, and presentation
+- consistent behavior across API and CLI
+- easier unit and contract testing
+- safer future changes to model policy, media tooling, and publishing
+
+## Next Step
+
+Create a dedicated implementation plan that stages the migration from the
+current codebase into this target architecture.
diff --git a/pyproject.toml b/pyproject.toml
index 32d25c0..3b8a078 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,13 +36,15 @@ py-modules = [
"service_ollama",
"service_transcribe",
"service_summarize",
- "service_pipeline",
"adapter_gui",
"adapter_api",
"adapter_rss",
"adapter_storage",
]
+[tool.setuptools.packages.find]
+include = ["video_rss_aggregator*"]
+
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-q"
diff --git a/service_pipeline.py b/service_pipeline.py
deleted file mode 100644
index 1640aea..0000000
--- a/service_pipeline.py
+++ /dev/null
@@ -1,208 +0,0 @@
-from __future__ import annotations
-
-from dataclasses import dataclass
-from datetime import datetime, timezone
-from time import struct_time
-from typing import Any
-
-import feedparser
-import httpx
-
-from adapter_rss import render_feed
-from adapter_storage import Database
-from core_config import Config
-from service_media import prepare_media
-from service_summarize import SummarizationEngine, SummaryResult
-
-
-@dataclass(slots=True)
-class IngestReport:
- feed_title: str | None = None
- item_count: int = 0
- processed_count: int = 0
-
-
-@dataclass(slots=True)
-class ProcessReport:
- source_url: str
- title: str | None
- transcript_chars: int
- frame_count: int
- summary: SummaryResult
-
-
-class Pipeline:
- def __init__(
- self,
- config: Config,
- db: Database,
- summarizer: SummarizationEngine,
- ) -> None:
- self._config = config
- self._db = db
- self._summarizer = summarizer
- self._client = httpx.AsyncClient(
- timeout=httpx.Timeout(connect=15.0, read=300.0, write=300.0, pool=60.0),
- follow_redirects=True,
- limits=httpx.Limits(max_keepalive_connections=20, max_connections=50),
- )
-
- @classmethod
- async def create(cls, config: Config) -> Pipeline:
- db = await Database.connect(config.database_path)
- await db.migrate()
-
- summarizer = SummarizationEngine(config)
- await summarizer.prepare_models()
-
- return cls(config, db, summarizer)
-
- async def close(self) -> None:
- await self._client.aclose()
- await self._summarizer.close()
- await self._db.close()
-
- async def runtime_status(self) -> dict[str, Any]:
- status = await self._summarizer.runtime_status()
- status["database_path"] = self._db.path
- status["storage_dir"] = self._config.storage_dir
- status["models"] = list(self._config.model_priority)
- return status
-
- async def bootstrap_models(self) -> dict[str, Any]:
- prepared = await self._summarizer.prepare_models()
- return {
- "models_prepared": prepared,
- "runtime": await self.runtime_status(),
- }
-
- async def ingest_feed(
- self,
- feed_url: str,
- process: bool = False,
- max_items: int | None = None,
- ) -> IngestReport:
- resp = await self._client.get(feed_url)
- resp.raise_for_status()
- parsed = feedparser.parse(resp.text)
-
- feed_title = parsed.feed.get("title")
- feed_id = await self._db.upsert_feed(feed_url, feed_title)
-
- entries = parsed.entries
- if max_items is not None:
- entries = entries[:max_items]
-
- report = IngestReport(feed_title=feed_title)
-
- for entry in entries:
- source_url = _pick_source_url(entry)
- if not source_url:
- continue
-
- title = entry.get("title")
- guid = entry.get("id") or None
- published = entry.get("published_parsed") or entry.get("updated_parsed")
- published_at = _struct_to_dt(published) if published else None
-
- video_id = await self._db.upsert_video(
- feed_id,
- guid,
- title,
- source_url,
- published_at,
- )
- report.item_count += 1
-
- if process:
- processed = await self._process_with_video(video_id, source_url, title)
- if processed.summary.summary:
- report.processed_count += 1
-
- return report
-
- async def process_source(
- self,
- source_url: str,
- title: str | None = None,
- ) -> ProcessReport:
- video_id = await self._db.upsert_video(
- feed_id=None,
- guid=None,
- title=title,
- source_url=source_url,
- published_at=None,
- )
- return await self._process_with_video(video_id, source_url, title)
-
- async def rss_feed(self, limit: int = 20) -> str:
- records = await self._db.latest_summaries(limit)
- return render_feed(
- self._config.rss_title,
- self._config.rss_link,
- self._config.rss_description,
- records,
- )
-
- async def _process_with_video(
- self,
- video_id: str,
- source_url: str,
- title: str | None,
- ) -> ProcessReport:
- prepared = await prepare_media(
- client=self._client,
- source=source_url,
- storage_dir=self._config.storage_dir,
- max_frames=self._config.max_frames,
- scene_detection=self._config.frame_scene_detection,
- scene_threshold=self._config.frame_scene_threshold,
- scene_min_frames=self._config.frame_scene_min_frames,
- max_transcript_chars=self._config.max_transcript_chars,
- )
-
- resolved_title = title or prepared.title
- await self._db.upsert_video(
- feed_id=None,
- guid=None,
- title=resolved_title,
- source_url=source_url,
- published_at=None,
- media_path=str(prepared.media_path),
- )
-
- if prepared.transcript:
- await self._db.insert_transcript(video_id, prepared.transcript)
-
- summary = await self._summarizer.summarize(
- source_url=source_url,
- title=resolved_title,
- transcript=prepared.transcript,
- frame_paths=prepared.frame_paths,
- )
- await self._db.insert_summary(video_id, summary)
-
- return ProcessReport(
- source_url=source_url,
- title=resolved_title,
- transcript_chars=len(prepared.transcript),
- frame_count=len(prepared.frame_paths),
- summary=summary,
- )
-
-
-def _pick_source_url(entry: Any) -> str | None:
- enclosures = entry.get("enclosures", [])
- if enclosures:
- return enclosures[0].get("href") or enclosures[0].get("url")
- links = entry.get("links", [])
- if links:
- return links[0].get("href")
- return entry.get("link")
-
-
-def _struct_to_dt(value: struct_time) -> datetime | None:
- try:
- return datetime(*value[:6], tzinfo=timezone.utc)
- except Exception:
- return None
diff --git a/tests/adapters/test_api_app.py b/tests/adapters/test_api_app.py
new file mode 100644
index 0000000..c6cbc25
--- /dev/null
+++ b/tests/adapters/test_api_app.py
@@ -0,0 +1,157 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any
+
+from fastapi.testclient import TestClient
+
+from core_config import Config
+from video_rss_aggregator.api import create_app
+from video_rss_aggregator.application.use_cases.ingest_feed import IngestReport
+from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+from video_rss_aggregator.domain.outcomes import PartialSuccess
+
+
+@dataclass
+class RecordingUseCase:
+ result: Any
+ calls: list[tuple[Any, ...]] = field(default_factory=list)
+
+ async def execute(self, *args: Any) -> Any:
+ self.calls.append(args)
+ return self.result
+
+
+def _build_runtime() -> tuple[AppRuntime, dict[str, RecordingUseCase]]:
+ runtime_status = RecordingUseCase(
+ {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen": {"size": 1}},
+ "reachable": True,
+ "database_path": ".data/runtime.db",
+ "storage_dir": ".data",
+ "models": ["qwen", "qwen:min"],
+ }
+ )
+ bootstrap_runtime = RecordingUseCase({"models": ["qwen"]})
+ ingest_feed = RecordingUseCase(
+ IngestReport(feed_title="Example Feed", item_count=2, processed_count=1)
+ )
+ process_source = RecordingUseCase(
+ PartialSuccess(
+ media=PreparedMedia(
+ source_url="https://example.com/watch?v=1",
+ title="Example Title",
+ transcript="transcript text",
+ media_path="/tmp/video.mp4",
+ frame_paths=("/tmp/frame-1.jpg", "/tmp/frame-2.jpg"),
+ ),
+ reason="model degraded",
+ summary=SummaryResult(
+ summary="Compact summary",
+ key_points=("one", "two"),
+ visual_highlights=("frame",),
+ model_used="qwen",
+ vram_mb=512.0,
+ transcript_chars=15,
+ frame_count=2,
+ error="model degraded",
+ ),
+ )
+ )
+ render_rss_feed = RecordingUseCase("feed")
+
+ use_cases = {
+ "get_runtime_status": runtime_status,
+ "bootstrap_runtime": bootstrap_runtime,
+ "ingest_feed": ingest_feed,
+ "process_source": process_source,
+ "render_rss_feed": render_rss_feed,
+ }
+
+ return (
+ AppRuntime(
+ config=Config(),
+ use_cases=AppUseCases(**use_cases),
+ close=lambda: None,
+ ),
+ use_cases,
+ )
+
+
+def test_routes_delegate_to_runtime_use_cases_and_keep_http_shapes() -> None:
+ runtime, use_cases = _build_runtime()
+ client = TestClient(create_app(runtime))
+
+ ingest = client.post(
+ "/ingest",
+ json={
+ "feed_url": "https://example.com/feed.xml",
+ "process": True,
+ "max_items": 3,
+ },
+ )
+ process = client.post(
+ "/process",
+ json={
+ "source_url": "https://example.com/watch?v=1",
+ "title": "Example Title",
+ },
+ )
+ rss = client.get("/rss?limit=5")
+ runtime_response = client.get("/runtime")
+ bootstrap = client.post("/setup/bootstrap")
+
+ assert ingest.status_code == 200
+ assert ingest.json() == {
+ "feed_title": "Example Feed",
+ "item_count": 2,
+ "processed_count": 1,
+ }
+ assert process.status_code == 200
+ assert process.json() == {
+ "source_url": "https://example.com/watch?v=1",
+ "title": "Example Title",
+ "transcript_chars": 15,
+ "frame_count": 2,
+ "summary": {
+ "summary": "Compact summary",
+ "key_points": ["one", "two"],
+ "visual_highlights": ["frame"],
+ "model_used": "qwen",
+ "vram_mb": 512.0,
+ "error": "model degraded",
+ },
+ }
+ assert rss.status_code == 200
+ assert rss.text == "feed"
+ assert runtime_response.status_code == 200
+ assert runtime_response.json() == {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen": {"size": 1}},
+ "reachable": True,
+ "database_path": ".data/runtime.db",
+ "storage_dir": ".data",
+ "models": ["qwen", "qwen:min"],
+ }
+ assert bootstrap.status_code == 200
+ assert bootstrap.json() == {
+ "models_prepared": ["qwen"],
+ "runtime": {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen": {"size": 1}},
+ "reachable": True,
+ "database_path": ".data/runtime.db",
+ "storage_dir": ".data",
+ "models": ["qwen", "qwen:min"],
+ },
+ }
+
+ assert use_cases["ingest_feed"].calls == [("https://example.com/feed.xml", True, 3)]
+ assert use_cases["process_source"].calls == [
+ ("https://example.com/watch?v=1", "Example Title")
+ ]
+ assert use_cases["render_rss_feed"].calls == [(5,)]
+ assert use_cases["get_runtime_status"].calls == [(), ()]
+ assert use_cases["bootstrap_runtime"].calls == [()]
diff --git a/tests/adapters/test_cli_commands.py b/tests/adapters/test_cli_commands.py
new file mode 100644
index 0000000..f32638f
--- /dev/null
+++ b/tests/adapters/test_cli_commands.py
@@ -0,0 +1,227 @@
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+from click.testing import CliRunner
+
+import cli as cli_module
+from core_config import Config
+from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+from video_rss_aggregator.domain.outcomes import PartialSuccess
+
+
+@dataclass
+class RecordingUseCase:
+ result: Any
+ calls: list[tuple[Any, ...]] = field(default_factory=list)
+
+ async def execute(self, *args: Any) -> Any:
+ self.calls.append(args)
+ return self.result
+
+
+def _build_runtime() -> tuple[AppRuntime, dict[str, RecordingUseCase]]:
+ runtime_status = RecordingUseCase(
+ {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen": {"size": 1}},
+ "reachable": True,
+ "database_path": ".data/runtime.db",
+ "storage_dir": ".data",
+ "models": ["qwen", "qwen:min"],
+ }
+ )
+ bootstrap_runtime = RecordingUseCase({"models": ["qwen"]})
+ process_source = RecordingUseCase(
+ PartialSuccess(
+ media=PreparedMedia(
+ source_url="https://example.com/watch?v=1",
+ title="Example Title",
+ transcript="transcript text",
+ media_path="/tmp/video.mp4",
+ frame_paths=("/tmp/frame-1.jpg", "/tmp/frame-2.jpg"),
+ ),
+ reason="model degraded",
+ summary=SummaryResult(
+ summary="Compact summary",
+ key_points=("one", "two"),
+ visual_highlights=("frame",),
+ model_used="qwen",
+ vram_mb=512.0,
+ transcript_chars=15,
+ frame_count=2,
+ error="model degraded",
+ ),
+ )
+ )
+ passthrough = RecordingUseCase(None)
+
+ use_cases = {
+ "get_runtime_status": runtime_status,
+ "bootstrap_runtime": bootstrap_runtime,
+ "ingest_feed": passthrough,
+ "process_source": process_source,
+ "render_rss_feed": passthrough,
+ }
+
+ closed = {"value": False}
+
+ async def _close() -> None:
+ closed["value"] = True
+
+ return (
+ AppRuntime(
+ config=Config(),
+ use_cases=AppUseCases(**use_cases),
+ close=_close,
+ ),
+ {**use_cases, "closed": closed},
+ )
+
+
+def test_bootstrap_uses_runtime_use_cases_and_keeps_json_shape(monkeypatch) -> None:
+ runtime, use_cases = _build_runtime()
+
+ async def fake_build_runtime(config: Config | None = None) -> AppRuntime:
+ return runtime
+
+ class LegacySummarizationEngine:
+ def __init__(self, config: Config) -> None:
+ raise AssertionError("legacy summarizer wiring used")
+
+ monkeypatch.setattr(cli_module, "build_runtime", fake_build_runtime, raising=False)
+ monkeypatch.setattr(cli_module, "SummarizationEngine", LegacySummarizationEngine)
+
+ result = CliRunner().invoke(cli_module.cli, ["bootstrap"])
+
+ assert result.exit_code == 0, result.output
+ assert json.loads(result.output) == {
+ "models_prepared": ["qwen"],
+ "runtime": {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen": {"size": 1}},
+ "reachable": True,
+ "database_path": ".data/runtime.db",
+ "storage_dir": ".data",
+ "models": ["qwen", "qwen:min"],
+ },
+ }
+ assert use_cases["bootstrap_runtime"].calls == [()]
+ assert use_cases["get_runtime_status"].calls == [()]
+ assert use_cases["closed"]["value"] is True
+
+
+def test_status_uses_runtime_use_case_and_keeps_json_shape(monkeypatch) -> None:
+ runtime, use_cases = _build_runtime()
+
+ async def fake_build_runtime(config: Config | None = None) -> AppRuntime:
+ return runtime
+
+ class LegacySummarizationEngine:
+ def __init__(self, config: Config) -> None:
+ raise AssertionError("legacy summarizer wiring used")
+
+ monkeypatch.setattr(cli_module, "build_runtime", fake_build_runtime, raising=False)
+ monkeypatch.setattr(cli_module, "SummarizationEngine", LegacySummarizationEngine)
+
+ result = CliRunner().invoke(cli_module.cli, ["status"])
+
+ assert result.exit_code == 0, result.output
+ assert json.loads(result.output) == {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen": {"size": 1}},
+ "reachable": True,
+ "database_path": ".data/runtime.db",
+ "storage_dir": ".data",
+ "models": ["qwen", "qwen:min"],
+ }
+ assert use_cases["get_runtime_status"].calls == [()]
+ assert use_cases["closed"]["value"] is True
+
+
+def test_verify_uses_runtime_process_source_and_keeps_metrics_shape(
+ monkeypatch,
+) -> None:
+ runtime, use_cases = _build_runtime()
+ monotonic_values = iter([100.0, 100.25])
+
+ async def fake_build_runtime(config: Config | None = None) -> AppRuntime:
+ return runtime
+
+ monkeypatch.setattr(cli_module, "build_runtime", fake_build_runtime, raising=False)
+ monkeypatch.setattr(cli_module, "monotonic", lambda: next(monotonic_values))
+
+ result = CliRunner().invoke(
+ cli_module.cli,
+ [
+ "verify",
+ "--source",
+ "https://example.com/watch?v=1",
+ "--title",
+ "Example Title",
+ ],
+ )
+
+ assert result.exit_code == 0, result.output
+ assert json.loads(result.output) == {
+ "source_url": "https://example.com/watch?v=1",
+ "title": "Example Title",
+ "transcript_chars": 15,
+ "frame_count": 2,
+ "model_used": "qwen",
+ "vram_mb": 512.0,
+ "error": "model degraded",
+ "summary_chars": 15,
+ "key_points": 2,
+ "total_ms": 250,
+ }
+ assert use_cases["process_source"].calls == [
+ ("https://example.com/watch?v=1", "Example Title")
+ ]
+ assert use_cases["closed"]["value"] is True
+
+
+def test_serve_builds_runtime_then_passes_it_to_app_factory(monkeypatch) -> None:
+ calls: dict[str, Any] = {}
+
+ def fake_create_app(
+ runtime: AppRuntime | None = None,
+ config: Config | None = None,
+ ) -> object:
+ calls["create_app_runtime"] = runtime
+ calls["create_app_config"] = config
+ return object()
+
+ class FakeUvicornConfig:
+ def __init__(self, app: object, host: str, port: int, log_level: str) -> None:
+ calls["uvicorn_app"] = app
+ calls["uvicorn_host"] = host
+ calls["uvicorn_port"] = port
+ calls["uvicorn_log_level"] = log_level
+
+ class FakeUvicornServer:
+ def __init__(self, config: FakeUvicornConfig) -> None:
+ calls["uvicorn_config"] = config
+
+ async def serve(self) -> None:
+ calls["served"] = True
+
+ monkeypatch.setattr("adapter_api.create_app", fake_create_app)
+ monkeypatch.setattr(cli_module.uvicorn, "Config", FakeUvicornConfig)
+ monkeypatch.setattr(cli_module.uvicorn, "Server", FakeUvicornServer)
+
+ result = CliRunner().invoke(cli_module.cli, ["serve", "--bind", "127.0.0.1:8080"])
+
+ assert result.exit_code == 0, result.output
+ assert calls["create_app_runtime"] is None
+ assert calls["create_app_config"] == Config(
+ database_path=str(Path(".data") / "vra.db")
+ )
+ assert calls["uvicorn_host"] == "127.0.0.1"
+ assert calls["uvicorn_port"] == 8080
+ assert calls["uvicorn_log_level"] == "info"
+ assert calls["served"] is True
diff --git a/tests/application/test_ingest_feed.py b/tests/application/test_ingest_feed.py
new file mode 100644
index 0000000..75e9eec
--- /dev/null
+++ b/tests/application/test_ingest_feed.py
@@ -0,0 +1,214 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry
+from video_rss_aggregator.application.use_cases.ingest_feed import IngestFeed
+from video_rss_aggregator.domain.outcomes import Failure
+
+
+class FakeFeedSource:
+ async def fetch(self, feed_url: str, max_items: int | None = None):
+ entries = (
+ FetchedFeedEntry(source_url="https://example.com/1", title="One", guid="1"),
+ FetchedFeedEntry(source_url="https://example.com/2", title="Two", guid="2"),
+ )
+ return FetchedFeed(
+ title="Example Feed",
+ site_url="https://example.com",
+ entries=entries[:max_items] if max_items is not None else entries,
+ )
+
+
+class FakeFeedRepository:
+ def __init__(self) -> None:
+ self.saved: list[tuple[str, FetchedFeed]] = []
+
+ async def save(self, feed_url: str, feed: FetchedFeed) -> None:
+ self.saved.append((feed_url, feed))
+
+
+class FakeVideoRepository:
+ def __init__(self) -> None:
+ self.saved: list[tuple[str, FetchedFeedEntry]] = []
+
+ async def save_feed_item(self, feed_url: str, entry: FetchedFeedEntry) -> None:
+ self.saved.append((feed_url, entry))
+
+
+class FakeProcessSource:
+ def __init__(self) -> None:
+ self.calls: list[tuple[str, str | None]] = []
+ self.results: dict[str, object] = {}
+
+ async def execute(self, source_url: str, title: str | None):
+ self.calls.append((source_url, title))
+ return self.results.get(source_url)
+
+
+@pytest.fixture
+def anyio_backend() -> str:
+ return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_ingest_feed_tracks_processed_items() -> None:
+ feeds = FakeFeedRepository()
+ videos = FakeVideoRepository()
+ process_source = FakeProcessSource()
+ use_case = IngestFeed(
+ feed_source=FakeFeedSource(),
+ feeds=feeds,
+ videos=videos,
+ process_source=process_source,
+ )
+
+ report = await use_case.execute(
+ "https://example.com/feed.xml", process=True, max_items=1
+ )
+
+ assert report.item_count == 1
+ assert report.processed_count == 1
+ assert report.feed_title == "Example Feed"
+ assert feeds.saved == [
+ (
+ "https://example.com/feed.xml",
+ FetchedFeed(
+ title="Example Feed",
+ site_url="https://example.com",
+ entries=(
+ FetchedFeedEntry(
+ source_url="https://example.com/1", title="One", guid="1"
+ ),
+ ),
+ ),
+ )
+ ]
+ assert videos.saved == [
+ (
+ "https://example.com/feed.xml",
+ FetchedFeedEntry(source_url="https://example.com/1", title="One", guid="1"),
+ )
+ ]
+ assert process_source.calls == [("https://example.com/1", "One")]
+
+
+class FakeFeedSourceWithInvalidEntries:
+ async def fetch(self, feed_url: str, max_items: int | None = None):
+ return FetchedFeed(
+ title=None,
+ site_url=None,
+ entries=(
+ FetchedFeedEntry(source_url="", title="Blank", guid="blank"),
+ FetchedFeedEntry(
+ source_url="https://example.com/valid", title=None, guid="ok"
+ ),
+ FetchedFeedEntry(source_url=" ", title="Whitespace", guid="space"),
+ ),
+ )
+
+
+@pytest.mark.anyio
+async def test_ingest_feed_skips_entries_without_source_url() -> None:
+ feeds = FakeFeedRepository()
+ videos = FakeVideoRepository()
+ process_source = FakeProcessSource()
+ use_case = IngestFeed(
+ feed_source=FakeFeedSourceWithInvalidEntries(),
+ feeds=feeds,
+ videos=videos,
+ process_source=process_source,
+ )
+
+ report = await use_case.execute("https://example.com/feed.xml", process=True)
+
+ assert report.item_count == 1
+ assert report.processed_count == 1
+ assert report.feed_title is None
+ assert feeds.saved == [
+ (
+ "https://example.com/feed.xml",
+ FetchedFeed(
+ title=None,
+ site_url=None,
+ entries=(
+ FetchedFeedEntry(
+ source_url="https://example.com/valid", title=None, guid="ok"
+ ),
+ ),
+ ),
+ )
+ ]
+ assert videos.saved == [
+ (
+ "https://example.com/feed.xml",
+ FetchedFeedEntry(
+ source_url="https://example.com/valid", title=None, guid="ok"
+ ),
+ )
+ ]
+ assert process_source.calls == [("https://example.com/valid", None)]
+
+
+@pytest.mark.anyio
+async def test_ingest_feed_counts_only_non_failure_results_as_processed() -> None:
+ feeds = FakeFeedRepository()
+ videos = FakeVideoRepository()
+ process_source = FakeProcessSource()
+ process_source.results = {
+ "https://example.com/2": Failure(
+ source_url="https://example.com/2", reason="download failed"
+ )
+ }
+ use_case = IngestFeed(
+ feed_source=FakeFeedSource(),
+ feeds=feeds,
+ videos=videos,
+ process_source=process_source,
+ )
+
+ report = await use_case.execute(
+ "https://example.com/feed.xml", process=True, max_items=2
+ )
+
+ assert report.item_count == 2
+ assert report.processed_count == 1
+
+
+class FakeFeedSourceWithPublishedEntries:
+ async def fetch(self, feed_url: str, max_items: int | None = None):
+ return FetchedFeed(
+ title="Published Feed",
+ site_url="https://example.com",
+ entries=(
+ FetchedFeedEntry(
+ source_url="https://example.com/published",
+ title="Published item",
+ guid="published-guid",
+ published_at=datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc),
+ ),
+ ),
+ )
+
+
+@pytest.mark.anyio
+async def test_ingest_feed_preserves_publication_timestamps() -> None:
+ feeds = FakeFeedRepository()
+ videos = FakeVideoRepository()
+ process_source = FakeProcessSource()
+ use_case = IngestFeed(
+ feed_source=FakeFeedSourceWithPublishedEntries(),
+ feeds=feeds,
+ videos=videos,
+ process_source=process_source,
+ )
+
+ await use_case.execute("https://example.com/feed.xml", process=False)
+
+ saved_feed = feeds.saved[0][1]
+ saved_entry = videos.saved[0][1]
+
+ assert saved_feed.entries[0].published_at == datetime(
+ 2024, 1, 2, 3, 4, tzinfo=timezone.utc
+ )
+ assert saved_entry.published_at == datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc)
diff --git a/tests/application/test_process_source.py b/tests/application/test_process_source.py
new file mode 100644
index 0000000..a42e05c
--- /dev/null
+++ b/tests/application/test_process_source.py
@@ -0,0 +1,139 @@
+import pytest
+
+from video_rss_aggregator.application.use_cases.process_source import ProcessSource
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+from video_rss_aggregator.domain.outcomes import Failure, PartialSuccess, Success
+
+
+class FakeMediaPreparationService:
+ async def prepare(self, source_url: str, title: str | None) -> PreparedMedia:
+ return PreparedMedia(
+ source_url=source_url,
+ title=title or "Prepared title",
+ transcript="transcript",
+ media_path="/tmp/video.mp4",
+ frame_paths=("/tmp/frame-1.jpg",),
+ )
+
+
+class FakeSummarizer:
+ def __init__(self, result: SummaryResult) -> None:
+ self.result = result
+
+ async def summarize(self, prepared_media: PreparedMedia) -> SummaryResult:
+ return self.result
+
+
+class RaisingMediaPreparationService:
+ async def prepare(self, source_url: str, title: str | None) -> PreparedMedia:
+ raise RuntimeError("download failed")
+
+
+class FakeVideoRepository:
+ def __init__(self) -> None:
+ self.saved: list[PreparedMedia] = []
+
+ async def save(self, media: PreparedMedia) -> str:
+ self.saved.append(media)
+ return "video-123"
+
+
+class FakeSummaryRepository:
+ def __init__(self) -> None:
+ self.saved: list[tuple[str, SummaryResult]] = []
+
+ async def save(self, video_id: str, summary: SummaryResult) -> None:
+ self.saved.append((video_id, summary))
+
+
+class RaisingVideoRepository:
+ async def save(self, media: PreparedMedia) -> str:
+ raise RuntimeError("database write failed")
+
+
+@pytest.fixture
+def anyio_backend() -> str:
+ return "asyncio"
+
+
+def build_summary(*, error: str | None = None) -> SummaryResult:
+ return SummaryResult(
+ summary="Summarized output",
+ key_points=("point",),
+ visual_highlights=("highlight",),
+ model_used="qwen",
+ vram_mb=1024.0,
+ transcript_chars=10,
+ frame_count=1,
+ error=error,
+ )
+
+
+@pytest.mark.anyio
+async def test_process_source_returns_success_for_valid_media() -> None:
+ videos = FakeVideoRepository()
+ summaries = FakeSummaryRepository()
+ use_case = ProcessSource(
+ media_service=FakeMediaPreparationService(),
+ summarizer=FakeSummarizer(build_summary()),
+ videos=videos,
+ summaries=summaries,
+ )
+
+ outcome = await use_case.execute("https://example.com/video", None)
+
+ assert isinstance(outcome, Success)
+ assert outcome.summary.summary == "Summarized output"
+ assert len(videos.saved) == 1
+ assert len(summaries.saved) == 1
+ assert summaries.saved[0][0] == "video-123"
+
+
+@pytest.mark.anyio
+async def test_process_source_returns_partial_success_when_summary_has_error() -> None:
+ use_case = ProcessSource(
+ media_service=FakeMediaPreparationService(),
+ summarizer=FakeSummarizer(build_summary(error="model degraded")),
+ videos=FakeVideoRepository(),
+ summaries=FakeSummaryRepository(),
+ )
+
+ outcome = await use_case.execute("https://example.com/video", "Example")
+
+ assert isinstance(outcome, PartialSuccess)
+ assert outcome.reason == "model degraded"
+ assert outcome.summary.error == "model degraded"
+
+
+@pytest.mark.anyio
+async def test_process_source_returns_failure_when_media_preparation_raises() -> None:
+ use_case = ProcessSource(
+ media_service=RaisingMediaPreparationService(),
+ summarizer=FakeSummarizer(build_summary()),
+ videos=FakeVideoRepository(),
+ summaries=FakeSummaryRepository(),
+ )
+
+ outcome = await use_case.execute("https://example.com/video", None)
+
+ assert outcome == Failure(
+ source_url="https://example.com/video",
+ reason="download failed",
+ )
+
+
+@pytest.mark.anyio
+async def test_process_source_returns_failure_when_video_persistence_raises() -> None:
+ use_case = ProcessSource(
+ media_service=FakeMediaPreparationService(),
+ summarizer=FakeSummarizer(build_summary()),
+ videos=RaisingVideoRepository(),
+ summaries=FakeSummaryRepository(),
+ )
+
+ outcome = await use_case.execute("https://example.com/video", "Example")
+
+ assert outcome == Failure(
+ source_url="https://example.com/video",
+ reason="database write failed",
+ )
diff --git a/tests/application/test_render_rss_feed.py b/tests/application/test_render_rss_feed.py
new file mode 100644
index 0000000..c329d6d
--- /dev/null
+++ b/tests/application/test_render_rss_feed.py
@@ -0,0 +1,64 @@
+from datetime import datetime, timezone
+from typing import get_type_hints
+
+import pytest
+
+from video_rss_aggregator.application.ports import PublicationRepository
+from video_rss_aggregator.application.use_cases.render_rss_feed import RenderRssFeed
+from video_rss_aggregator.domain.publication import PublicationRecord
+
+
+class FakePublicationRepository:
+ async def latest_publications(self, limit: int) -> list[PublicationRecord]:
+ assert limit == 20
+ return [
+ PublicationRecord(
+ title="Video",
+ source_url="https://example.com/watch?v=1",
+ published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ summary="A compact summary",
+ key_points=("Point one", "Point two"),
+ visual_highlights=("Blue slide",),
+ model_used="qwen",
+ vram_mb=1234.5,
+ )
+ ]
+
+
+class FakePublicationRenderer:
+ def __init__(self) -> None:
+ self.received: tuple[PublicationRecord, ...] = ()
+
+ async def render(self, publications: tuple[PublicationRecord, ...]) -> str:
+ self.received = publications
+ return f"{publications[0].summary}"
+
+
+@pytest.fixture
+def anyio_backend() -> str:
+ return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_render_rss_feed_uses_publication_records() -> None:
+ renderer = FakePublicationRenderer()
+
+ xml = await RenderRssFeed(
+ publications=FakePublicationRepository(),
+ renderer=renderer,
+ ).execute(limit=20)
+
+ assert "A compact summary" in xml
+ assert renderer.received[0].title == "Video"
+
+
+def test_publication_record_allows_nullable_source_fields_in_annotations() -> None:
+ hints = get_type_hints(PublicationRecord)
+
+ assert hints["title"] == str | None
+ assert hints["published_at"] == datetime | None
+ assert hints["model_used"] == str | None
+
+
+def test_publication_repository_exposes_latest_publications_read_model() -> None:
+ assert hasattr(PublicationRepository, "latest_publications")
diff --git a/tests/application/test_runtime_use_cases.py b/tests/application/test_runtime_use_cases.py
new file mode 100644
index 0000000..2fc68ba
--- /dev/null
+++ b/tests/application/test_runtime_use_cases.py
@@ -0,0 +1,46 @@
+import pytest
+
+from video_rss_aggregator.application.use_cases.runtime import (
+ BootstrapRuntime,
+ GetRuntimeStatus,
+)
+
+
+class FakeRuntimeInspector:
+ async def status(self) -> dict[str, object]:
+ return {"reachable": True, "local_models": {"qwen": {}}, "models": ["qwen"]}
+
+ async def bootstrap(self) -> list[str]:
+ return ["qwen"]
+
+
+@pytest.fixture
+def anyio_backend() -> str:
+ return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_get_runtime_status_returns_app_view_model() -> None:
+ use_case = GetRuntimeStatus(
+ runtime=FakeRuntimeInspector(),
+ storage_path=".data/vra.db",
+ storage_dir=".data",
+ models=("qwen", "qwen:min"),
+ )
+
+ payload = await use_case.execute()
+
+ assert payload["reachable"] is True
+ assert payload["local_models"] == {"qwen": {}}
+ assert payload["database_path"] == ".data/vra.db"
+ assert payload["storage_dir"] == ".data"
+ assert payload["models"] == ["qwen", "qwen:min"]
+
+
+@pytest.mark.anyio
+async def test_bootstrap_runtime_returns_loaded_models() -> None:
+ use_case = BootstrapRuntime(runtime=FakeRuntimeInspector())
+
+ payload = await use_case.execute()
+
+ assert payload == {"models": ["qwen"]}
diff --git a/tests/domain/test_outcomes.py b/tests/domain/test_outcomes.py
new file mode 100644
index 0000000..168ed40
--- /dev/null
+++ b/tests/domain/test_outcomes.py
@@ -0,0 +1,131 @@
+from dataclasses import FrozenInstanceError
+from typing import Any, get_args
+
+import pytest
+
+from video_rss_aggregator.domain import ProcessOutcome
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+from video_rss_aggregator.domain.outcomes import Failure, PartialSuccess, Success
+
+
+def test_success_outcome_exposes_status_and_payload() -> None:
+ summary = SummaryResult(
+ summary="ok",
+ key_points=cast_list(["a", "b", "c", "d"]),
+ visual_highlights=cast_list(["frame 1"]),
+ model_used="fake-model",
+ vram_mb=512.0,
+ transcript_chars=12,
+ frame_count=1,
+ error=None,
+ )
+ media = PreparedMedia(
+ source_url="https://example.com/video",
+ title="Example",
+ transcript="hello",
+ media_path="/tmp/video.mp4",
+ frame_paths=cast_list(["/tmp/frame-1.jpg"]),
+ )
+
+ outcome = Success(media=media, summary=summary)
+
+ assert outcome.status == "success"
+ assert outcome.summary.model_used == "fake-model"
+
+
+def test_failure_outcome_preserves_reason() -> None:
+ outcome = Failure(source_url="https://example.com/video", reason="ollama offline")
+
+ assert outcome.status == "failure"
+ assert outcome.reason == "ollama offline"
+
+
+def test_domain_models_normalize_collections_to_tuples() -> None:
+ summary = SummaryResult(
+ summary="ok",
+ key_points=cast_list(["a", "b"]),
+ visual_highlights=cast_list(["frame 1"]),
+ model_used="fake-model",
+ vram_mb=512.0,
+ transcript_chars=12,
+ frame_count=1,
+ error=None,
+ )
+ media = PreparedMedia(
+ source_url="https://example.com/video",
+ title="Example",
+ transcript="hello",
+ media_path="/tmp/video.mp4",
+ frame_paths=cast_list(["/tmp/frame-1.jpg", "/tmp/frame-2.jpg"]),
+ )
+
+ assert summary.key_points == ("a", "b")
+ assert summary.visual_highlights == ("frame 1",)
+ assert media.frame_paths == ("/tmp/frame-1.jpg", "/tmp/frame-2.jpg")
+
+
+def test_domain_models_are_frozen() -> None:
+ summary = SummaryResult(
+ summary="ok",
+ key_points=cast_list(["a"]),
+ visual_highlights=cast_list([]),
+ model_used="fake-model",
+ vram_mb=512.0,
+ transcript_chars=12,
+ frame_count=1,
+ error=None,
+ )
+
+ with pytest.raises(FrozenInstanceError):
+ cast_any(summary).summary = "changed"
+
+
+def test_partial_success_requires_summary_and_preserves_reason() -> None:
+ media = PreparedMedia(
+ source_url="https://example.com/video",
+ title="Example",
+ transcript="hello",
+ media_path="/tmp/video.mp4",
+ frame_paths=cast_list([]),
+ )
+ summary = SummaryResult(
+ summary="degraded",
+ key_points=cast_list(["a"]),
+ visual_highlights=cast_list([]),
+ model_used="fake-model",
+ vram_mb=512.0,
+ transcript_chars=12,
+ frame_count=0,
+ error="missing frames",
+ )
+
+ outcome = PartialSuccess(media=media, summary=summary, reason="missing frames")
+
+ assert outcome.status == "partial_success"
+ assert outcome.summary is summary
+ assert outcome.reason == "missing frames"
+
+
+def test_partial_success_rejects_missing_summary() -> None:
+ media = PreparedMedia(
+ source_url="https://example.com/video",
+ title="Example",
+ transcript="hello",
+ media_path="/tmp/video.mp4",
+ frame_paths=cast_list([]),
+ )
+
+ with pytest.raises(ValueError, match="summary is required"):
+ PartialSuccess(media=media, summary=cast_any(None), reason="missing frames")
+
+
+def test_process_outcome_alias_is_exported_for_all_outcome_types() -> None:
+ assert get_args(ProcessOutcome) == (Success, PartialSuccess, Failure)
+
+
+def cast_any(value: object) -> Any:
+ return value
+
+
+def cast_list(values: list[str]) -> Any:
+ return values
diff --git a/tests/infrastructure/test_feed_source.py b/tests/infrastructure/test_feed_source.py
new file mode 100644
index 0000000..f890b5a
--- /dev/null
+++ b/tests/infrastructure/test_feed_source.py
@@ -0,0 +1,147 @@
+from datetime import datetime, timezone
+from types import SimpleNamespace
+
+import pytest
+
+from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry
+from video_rss_aggregator.infrastructure import feed_source as feed_source_module
+from video_rss_aggregator.infrastructure.feed_source import HttpFeedSource
+
+
+class FakeResponse:
+ def __init__(self, text: str) -> None:
+ self.text = text
+
+ def raise_for_status(self) -> None:
+ return None
+
+
+class FakeAsyncClient:
+ def __init__(self, response: FakeResponse) -> None:
+ self.response = response
+ self.calls: list[str] = []
+
+ async def get(self, url: str) -> FakeResponse:
+ self.calls.append(url)
+ return self.response
+
+
+@pytest.fixture
+def anyio_backend() -> str:
+ return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_http_feed_source_fetches_and_maps_entries() -> None:
+ client = FakeAsyncClient(
+ FakeResponse(
+ """
+            <?xml version="1.0" encoding="UTF-8"?>
+            <rss version="2.0"><channel>
+            <title>Example feed</title>
+            <link>https://example.com</link>
+            <item>
+            <title>First</title>
+            <guid>first-guid</guid>
+            <pubDate>Tue, 02 Jan 2024 03:04:05 GMT</pubDate>
+            <enclosure url="https://cdn.example.com/video.mp4" type="video/mp4"/>
+            </item>
+            <item>
+            <title>Second</title>
+            <guid>second-guid</guid>
+            <link>https://example.com/watch?v=2</link>
+            </item>
+            </channel>
+            </rss>
+ """
+ )
+ )
+ adapter = HttpFeedSource(client)
+
+ feed = await adapter.fetch("https://example.com/feed.xml", max_items=1)
+
+ assert client.calls == ["https://example.com/feed.xml"]
+ assert feed == FetchedFeed(
+ title="Example feed",
+ site_url="https://example.com",
+ entries=(
+ FetchedFeedEntry(
+ source_url="https://cdn.example.com/video.mp4",
+ title="First",
+ guid="first-guid",
+ published_at=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
+ ),
+ ),
+ )
+
+
+@pytest.mark.anyio
+async def test_http_feed_source_falls_back_to_updated_timestamp() -> None:
+ client = FakeAsyncClient(
+ FakeResponse(
+ """
+            <feed xmlns="http://www.w3.org/2005/Atom">
+            <title>Example atom feed</title>
+            <link href="https://example.com"/>
+            <entry>
+            <title>First</title>
+            <id>first-guid</id>
+            <updated>2024-01-02T03:04:05Z</updated>
+            <link href="https://example.com/watch?v=1"/>
+            </entry>
+            </feed>
+ """
+ )
+ )
+ adapter = HttpFeedSource(client)
+
+ feed = await adapter.fetch("https://example.com/feed.xml")
+
+ assert feed == FetchedFeed(
+ title="Example atom feed",
+ site_url="https://example.com",
+ entries=(
+ FetchedFeedEntry(
+ source_url="https://example.com/watch?v=1",
+ title="First",
+ guid="first-guid",
+ published_at=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
+ ),
+ ),
+ )
+
+
+@pytest.mark.anyio
+async def test_http_feed_source_ignores_malformed_parsed_dates(monkeypatch) -> None:
+ client = FakeAsyncClient(FakeResponse("ignored"))
+ adapter = HttpFeedSource(client)
+
+ def fake_parse(_text: str) -> SimpleNamespace:
+ return SimpleNamespace(
+ feed={"title": "Example feed", "link": "https://example.com"},
+ entries=[
+ {
+ "title": "First",
+ "id": "first-guid",
+ "link": "https://example.com/watch?v=1",
+ "published_parsed": (2024, 1),
+ }
+ ],
+ )
+
+ monkeypatch.setattr(feed_source_module.feedparser, "parse", fake_parse)
+
+ feed = await adapter.fetch("https://example.com/feed.xml")
+
+ assert feed == FetchedFeed(
+ title="Example feed",
+ site_url="https://example.com",
+ entries=(
+ FetchedFeedEntry(
+ source_url="https://example.com/watch?v=1",
+ title="First",
+ guid="first-guid",
+ published_at=None,
+ ),
+ ),
+ )
diff --git a/tests/infrastructure/test_legacy_adapter_shims.py b/tests/infrastructure/test_legacy_adapter_shims.py
new file mode 100644
index 0000000..5bf3107
--- /dev/null
+++ b/tests/infrastructure/test_legacy_adapter_shims.py
@@ -0,0 +1,115 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from adapter_rss import render_feed
+from adapter_storage import Database, SummaryRecord
+from service_summarize import SummaryResult as LegacySummaryResult
+from video_rss_aggregator.domain.models import SummaryResult as DomainSummaryResult
+from video_rss_aggregator.storage import Database as PackageDatabase
+
+
+@pytest.fixture
+def anyio_backend() -> str:  # restrict anyio-marked tests to the asyncio backend
+    return "asyncio"
+
+
+def build_legacy_summary(*, error: str | None = None) -> LegacySummaryResult:  # canonical legacy-shaped summary (lists, not tuples)
+    return LegacySummaryResult(
+        summary="Legacy summary",
+        key_points=["Point one", "Point two"],
+        visual_highlights=["Blue slide"],
+        model_used="qwen",
+        vram_mb=256.0,
+        transcript_chars=42,
+        frame_count=2,
+        error=error,
+    )
+
+
+@pytest.mark.anyio
+async def test_legacy_database_insert_summary_adapts_legacy_summary_result(
+    monkeypatch,
+) -> None:
+    captured: dict[str, object] = {}  # records what the shim forwards to the package Database
+
+    async def fake_insert_summary(
+        self, video_id: str, result: DomainSummaryResult
+    ) -> str:
+        captured["video_id"] = video_id
+        captured["result"] = result
+        return "summary-id"
+
+    monkeypatch.setattr(PackageDatabase, "insert_summary", fake_insert_summary)  # intercept at the package layer
+    db = await Database.connect(":memory:")
+
+    summary_id = await db.insert_summary("video-123", build_legacy_summary())
+
+    await db.close()
+
+    assert summary_id == "summary-id"
+    assert captured["video_id"] == "video-123"
+    assert captured["result"] == DomainSummaryResult(  # legacy lists must arrive as tuples on the domain dataclass
+        summary="Legacy summary",
+        key_points=("Point one", "Point two"),
+        visual_highlights=("Blue slide",),
+        model_used="qwen",
+        vram_mb=256.0,
+        transcript_chars=42,
+        frame_count=2,
+        error=None,
+    )
+
+
+@pytest.mark.anyio
+async def test_legacy_database_persists_legacy_summary_through_shim(tmp_path) -> None:  # end-to-end: legacy API writes, legacy API reads back
+    db = await Database.connect(str(tmp_path / "rss.db"))
+    await db.migrate()
+    video_id = await db.upsert_video(
+        feed_id=None,
+        guid=None,
+        title="Legacy title",
+        source_url="https://example.com/watch?v=legacy",
+        published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+    )
+
+    await db.insert_summary(video_id, build_legacy_summary())
+    publications = await db.latest_summaries(limit=1)
+
+    await db.close()
+
+    assert publications == [
+        SummaryRecord(
+            title="Legacy title",
+            source_url="https://example.com/watch?v=legacy",
+            published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+            summary="Legacy summary",
+            key_points=("Point one", "Point two"),
+            visual_highlights=("Blue slide",),
+            model_used="qwen",
+            vram_mb=256.0,
+        )
+    ]
+
+
+def test_legacy_rss_render_feed_accepts_summary_record_shim() -> None:  # legacy render_feed must accept the shimmed SummaryRecord type
+    xml = render_feed(
+        title="Feed title",
+        link="https://example.com",
+        description="Feed description",
+        publications=[
+            SummaryRecord(
+                title="Legacy title",
+                source_url="https://example.com/watch?v=legacy",
+                published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+                summary="Legacy summary",
+                key_points=("Point one",),
+                visual_highlights=("Blue slide",),
+                model_used="qwen",
+                vram_mb=256.0,
+            )
+        ],
+    )
+
+    assert "Legacy title" in xml
+    assert "Legacy summary" in xml
diff --git a/tests/infrastructure/test_media_service.py b/tests/infrastructure/test_media_service.py
new file mode 100644
index 0000000..b3078eb
--- /dev/null
+++ b/tests/infrastructure/test_media_service.py
@@ -0,0 +1,92 @@
+from pathlib import Path
+
+import pytest
+
+import service_media
+from service_media import PreparedMedia as LegacyPreparedMedia
+from video_rss_aggregator.infrastructure.media_service import (
+    LegacyMediaPreparationService,
+)
+
+
+@pytest.fixture
+def anyio_backend() -> str:  # restrict anyio-marked tests to the asyncio backend
+    return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_media_preparation_service_maps_legacy_prepared_media(
+    monkeypatch,
+) -> None:
+    captured: dict[str, object] = {}  # records the kwargs forwarded to the legacy prepare_media
+
+    async def fake_prepare_media(**kwargs) -> LegacyPreparedMedia:
+        captured.update(kwargs)
+        return LegacyPreparedMedia(
+            media_path=Path("/tmp/downloaded.mp4"),
+            title="Legacy title",
+            transcript="captured transcript",
+            frame_paths=[Path("/tmp/frame-1.jpg"), Path("/tmp/frame-2.jpg")],
+        )
+
+    monkeypatch.setattr(service_media, "prepare_media", fake_prepare_media)  # patch the module attr the adapter resolves at call time
+
+    adapter = LegacyMediaPreparationService(
+        client=object(),
+        storage_dir=".data",
+        max_frames=4,
+        scene_detection=True,
+        scene_threshold=0.42,
+        scene_min_frames=3,
+        max_transcript_chars=5000,
+    )
+
+    prepared = await adapter.prepare("https://example.com/watch?v=1", "Feed title")
+
+    assert captured == {  # every constructor setting must be forwarded unchanged
+        "client": adapter.client,
+        "source": "https://example.com/watch?v=1",
+        "storage_dir": ".data",
+        "max_frames": 4,
+        "scene_detection": True,
+        "scene_threshold": 0.42,
+        "scene_min_frames": 3,
+        "max_transcript_chars": 5000,
+    }
+    assert prepared.source_url == "https://example.com/watch?v=1"
+    assert prepared.title == "Feed title"  # caller-supplied title wins over the legacy media title
+    assert prepared.transcript == "captured transcript"
+    assert Path(prepared.media_path) == Path("/tmp/downloaded.mp4")
+    assert tuple(Path(path) for path in prepared.frame_paths) == (
+        Path("/tmp/frame-1.jpg"),
+        Path("/tmp/frame-2.jpg"),
+    )
+
+
+@pytest.mark.anyio
+async def test_media_preparation_service_falls_back_to_legacy_title(
+    monkeypatch,
+) -> None:
+    async def fake_prepare_media(**kwargs) -> LegacyPreparedMedia:
+        return LegacyPreparedMedia(
+            media_path=Path("/tmp/downloaded.mp4"),
+            title="Legacy title",
+            transcript="",
+            frame_paths=[],
+        )
+
+    monkeypatch.setattr(service_media, "prepare_media", fake_prepare_media)
+
+    adapter = LegacyMediaPreparationService(
+        client=object(),
+        storage_dir=".data",
+        max_frames=1,
+        scene_detection=False,
+        scene_threshold=0.28,
+        scene_min_frames=1,
+        max_transcript_chars=100,
+    )
+
+    prepared = await adapter.prepare("https://example.com/watch?v=2", None)  # no caller title supplied
+
+    assert prepared.title == "Legacy title"  # falls back to the title extracted from the media itself
diff --git a/tests/infrastructure/test_publication_renderer.py b/tests/infrastructure/test_publication_renderer.py
new file mode 100644
index 0000000..e02dfe2
--- /dev/null
+++ b/tests/infrastructure/test_publication_renderer.py
@@ -0,0 +1,64 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from video_rss_aggregator.domain.publication import PublicationRecord
+from video_rss_aggregator.infrastructure.publication_renderer import (
+    RssPublicationRenderer,
+)
+from video_rss_aggregator.rss import render_feed
+
+
+@pytest.fixture
+def anyio_backend() -> str:  # restrict anyio-marked tests to the asyncio backend
+    return "asyncio"
+
+
+def build_publication() -> PublicationRecord:  # shared, fully-populated fixture record
+    return PublicationRecord(
+        title="Video title",
+        source_url="https://example.com/watch?v=1",
+        published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+        summary="A compact summary",
+        key_points=("Point one", "Point two"),
+        visual_highlights=("Blue slide",),
+        model_used="qwen",
+        vram_mb=512.0,
+    )
+
+
+@pytest.mark.anyio
+async def test_rss_publication_renderer_delegates_to_package_render_feed() -> None:  # renderer output must be byte-identical to render_feed's
+    publications = (build_publication(),)
+    renderer = RssPublicationRenderer(
+        title="Feed title",
+        link="https://example.com",
+        description="Feed description",
+    )
+
+    xml = await renderer.render(publications)
+
+    assert xml == render_feed(  # same inputs through the functional API as the oracle
+        title="Feed title",
+        link="https://example.com",
+        description="Feed description",
+        publications=publications,
+    )
+
+
+@pytest.mark.anyio
+async def test_rss_publication_renderer_renders_publication_record_fields() -> None:  # smoke-check that each record field surfaces in the XML
+    renderer = RssPublicationRenderer(
+        title="Feed title",
+        link="https://example.com",
+        description="Feed description",
+    )
+
+    xml = await renderer.render((build_publication(),))
+
+    assert "Video title" in xml
+    assert "https://example.com/watch?v=1" in xml
+    assert "A compact summary" in xml
+    assert "Point one" in xml
+    assert "Blue slide" in xml
+    assert "Model: qwen (VRAM 512.00 MB)" in xml  # pins the model/VRAM footer format
diff --git a/tests/infrastructure/test_runtime_adapters.py b/tests/infrastructure/test_runtime_adapters.py
new file mode 100644
index 0000000..8d062d6
--- /dev/null
+++ b/tests/infrastructure/test_runtime_adapters.py
@@ -0,0 +1,34 @@
+import pytest
+
+from video_rss_aggregator.infrastructure.runtime_adapters import LegacyRuntimeInspector
+
+
+class FakeSummarizationEngine:  # minimal stand-in exposing the two methods the inspector delegates to
+    def __init__(self) -> None:
+        self.calls: list[str] = []  # call order is asserted below
+
+    async def runtime_status(self) -> dict[str, object]:
+        self.calls.append("runtime_status")
+        return {"reachable": True, "local_models": ["qwen"]}
+
+    async def prepare_models(self) -> list[str]:
+        self.calls.append("prepare_models")
+        return ["qwen", "min"]
+
+
+@pytest.fixture
+def anyio_backend() -> str:  # restrict anyio-marked tests to the asyncio backend
+    return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_runtime_inspector_delegates_to_legacy_engine() -> None:  # status()/bootstrap() map 1:1 onto the legacy engine methods
+    engine = FakeSummarizationEngine()
+    adapter = LegacyRuntimeInspector(engine)
+
+    status = await adapter.status()
+    prepared = await adapter.bootstrap()
+
+    assert status == {"reachable": True, "local_models": ["qwen"]}
+    assert prepared == ["qwen", "min"]
+    assert engine.calls == ["runtime_status", "prepare_models"]  # each delegate invoked exactly once, in order
diff --git a/tests/infrastructure/test_sqlite_repositories.py b/tests/infrastructure/test_sqlite_repositories.py
new file mode 100644
index 0000000..1b09275
--- /dev/null
+++ b/tests/infrastructure/test_sqlite_repositories.py
@@ -0,0 +1,167 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from video_rss_aggregator.domain.models import SummaryResult
+from video_rss_aggregator.domain.models import PreparedMedia  # NOTE(review): could be merged into the import line above
+from video_rss_aggregator.domain.publication import PublicationRecord
+from video_rss_aggregator.infrastructure.sqlite_repositories import (
+    SQLiteFeedRepository,
+    SQLiteFeedVideoRepository,
+    SQLitePublicationRepository,
+    SQLiteSummaryRepository,
+    SQLiteVideoRepository,
+)
+from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry
+from video_rss_aggregator.storage import Database
+
+
+@pytest.fixture
+def anyio_backend() -> str:  # restrict anyio-marked tests to the asyncio backend
+    return "asyncio"
+
+
+def build_summary(*, error: str | None = None) -> SummaryResult:  # domain-shaped fixture; error is the only varying field
+    return SummaryResult(
+        summary="A compact summary",
+        key_points=("Point one", "Point two"),
+        visual_highlights=("Blue slide",),
+        model_used="qwen",
+        vram_mb=512.0,
+        transcript_chars=120,
+        frame_count=3,
+        error=error,
+    )
+
+
+@pytest.mark.anyio
+async def test_sqlite_summary_repository_saves_domain_summary_result(tmp_path) -> None:  # write via summary repo, read back via publication repo
+    db = await Database.connect(str(tmp_path / "rss.db"))
+    await db.migrate()
+    video_id = await db.upsert_video(
+        feed_id=None,
+        guid=None,
+        title="Video title",
+        source_url="https://example.com/watch?v=1",
+        published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+    )
+
+    repository = SQLiteSummaryRepository(db)
+
+    await repository.save(video_id, build_summary())
+
+    publications = await SQLitePublicationRepository(db).latest_publications(limit=5)
+
+    await db.close()
+
+    assert publications == [
+        PublicationRecord(
+            title="Video title",
+            source_url="https://example.com/watch?v=1",
+            published_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
+            summary="A compact summary",
+            key_points=("Point one", "Point two"),
+            visual_highlights=("Blue slide",),
+            model_used="qwen",
+            vram_mb=512.0,
+        )
+    ]
+
+
+@pytest.mark.anyio
+async def test_sqlite_publication_repository_returns_newest_publications_first(
+    tmp_path,
+) -> None:
+    db = await Database.connect(str(tmp_path / "rss.db"))
+    await db.migrate()
+
+    older_video_id = await db.upsert_video(
+        feed_id=None,
+        guid=None,
+        title="Older video",
+        source_url="https://example.com/watch?v=older",
+        published_at=None,
+    )
+    await SQLiteSummaryRepository(db).save(
+        older_video_id, build_summary(error="degraded")  # degraded summaries are still persisted
+    )
+
+    newer_video_id = await db.upsert_video(
+        feed_id=None,
+        guid=None,
+        title="Newer video",
+        source_url="https://example.com/watch?v=newer",
+        published_at=None,
+    )
+    await SQLiteSummaryRepository(db).save(newer_video_id, build_summary())
+
+    publications = await SQLitePublicationRepository(db).latest_publications(limit=1)  # limit=1 forces an ordering decision
+
+    await db.close()
+
+    assert [publication.title for publication in publications] == ["Newer video"]
+    assert publications[0].summary == "A compact summary"
+
+
+@pytest.mark.anyio
+async def test_sqlite_video_repository_saves_media_and_transcript(tmp_path) -> None:  # a saved video without a summary must not surface as a publication
+    db = await Database.connect(str(tmp_path / "rss.db"))
+    await db.migrate()
+
+    repository = SQLiteVideoRepository(db)
+
+    video_id = await repository.save(
+        PreparedMedia(
+            source_url="https://example.com/watch?v=prepared",
+            title="Prepared title",
+            transcript="transcript text",
+            media_path="/tmp/video.mp4",
+            frame_paths=("/tmp/frame-1.jpg",),
+        )
+    )
+
+    publications = await db.latest_publications(limit=5)
+
+    await db.close()
+
+    assert isinstance(video_id, str)
+    assert publications == []  # no summary yet, so nothing publishable
+
+
+@pytest.mark.anyio
+async def test_sqlite_feed_adapters_persist_feed_and_video_metadata(tmp_path) -> None:
+    db = await Database.connect(str(tmp_path / "rss.db"))
+    await db.migrate()
+
+    await SQLiteFeedRepository(db).save(
+        "https://example.com/feed.xml",
+        FetchedFeed(title="Feed title", site_url=None, entries=()),
+    )
+    await SQLiteFeedVideoRepository(db).save_feed_item(
+        "https://example.com/feed.xml",
+        FetchedFeedEntry(
+            source_url="https://example.com/watch?v=from-feed",
+            title="Feed item",
+            guid="guid-1",
+            published_at=datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc),
+        ),
+    )
+
+    async with db._conn.execute(  # NOTE(review): reaches into the private connection to verify raw rows - brittle if Database internals change
+        "SELECT title FROM feeds WHERE url = ?", ("https://example.com/feed.xml",)
+    ) as cur:
+        feed_row = await cur.fetchone()
+    async with db._conn.execute(
+        "SELECT title, guid, published_at FROM videos WHERE source_url = ?",
+        ("https://example.com/watch?v=from-feed",),
+    ) as cur:
+        video_row = await cur.fetchone()
+
+    await db.close()
+
+    assert dict(feed_row) == {"title": "Feed title"}
+    assert dict(video_row) == {
+        "title": "Feed item",
+        "guid": "guid-1",
+        "published_at": "2024-01-02T03:04:00+00:00",  # pins ISO-8601 storage format for timestamps
+    }
diff --git a/tests/infrastructure/test_summarizer.py b/tests/infrastructure/test_summarizer.py
new file mode 100644
index 0000000..e2946ea
--- /dev/null
+++ b/tests/infrastructure/test_summarizer.py
@@ -0,0 +1,65 @@
+from pathlib import Path
+
+import pytest
+
+from service_summarize import SummaryResult as LegacySummaryResult
+from video_rss_aggregator.domain.models import PreparedMedia
+from video_rss_aggregator.infrastructure.summarizer import LegacySummarizer
+
+
+class FakeSummarizationEngine:  # canned legacy engine; records the kwargs it was called with
+    def __init__(self, result: LegacySummaryResult) -> None:
+        self.result = result
+        self.calls: list[dict[str, object]] = []
+
+    async def summarize(self, **kwargs) -> LegacySummaryResult:
+        self.calls.append(kwargs)
+        return self.result
+
+
+@pytest.fixture
+def anyio_backend() -> str:  # restrict anyio-marked tests to the asyncio backend
+    return "asyncio"
+
+
+@pytest.mark.anyio
+async def test_legacy_summarizer_maps_domain_media_to_legacy_engine() -> None:  # domain PreparedMedia in, legacy kwargs out, domain result back
+    engine = FakeSummarizationEngine(
+        LegacySummaryResult(
+            summary="Concise summary",
+            key_points=["One", "Two"],
+            visual_highlights=["Frame one"],
+            model_used="qwen",
+            vram_mb=512.0,
+            transcript_chars=123,
+            frame_count=2,
+            error=None,
+        )
+    )
+    adapter = LegacySummarizer(engine)
+    prepared = PreparedMedia(
+        source_url="https://example.com/watch?v=1",
+        title="Example title",
+        transcript="Transcript body",
+        media_path="/tmp/video.mp4",
+        frame_paths=("/tmp/frame-1.jpg", "/tmp/frame-2.jpg"),
+    )
+
+    result = await adapter.summarize(prepared)
+
+    assert engine.calls == [
+        {
+            "source_url": "https://example.com/watch?v=1",
+            "title": "Example title",
+            "transcript": "Transcript body",
+            "frame_paths": [Path("/tmp/frame-1.jpg"), Path("/tmp/frame-2.jpg")],  # str paths converted to Path for the legacy API
+        }
+    ]
+    assert result.summary == "Concise summary"
+    assert result.key_points == ("One", "Two")  # legacy lists come back as tuples
+    assert result.visual_highlights == ("Frame one",)
+    assert result.model_used == "qwen"
+    assert result.vram_mb == 512.0
+    assert result.transcript_chars == 123
+    assert result.frame_count == 2
+    assert result.error is None
diff --git a/tests/test_api_setup.py b/tests/test_api_setup.py
index cc1d725..c0c309a 100644
--- a/tests/test_api_setup.py
+++ b/tests/test_api_setup.py
@@ -3,60 +3,47 @@
from dataclasses import dataclass
from typing import Any, cast
+import pytest
from fastapi.testclient import TestClient
from adapter_api import create_app
from core_config import Config
-from service_summarize import SummaryResult
+from video_rss_aggregator.bootstrap import AppRuntime, AppUseCases
@dataclass
-class _ProcessReport:
- source_url: str
- title: str | None
- transcript_chars: int
- frame_count: int
- summary: SummaryResult
-
-
-class _DummyPipeline:
- async def ingest_feed(self, *_args, **_kwargs):
- class _Report:
- feed_title = "dummy"
- item_count = 1
- processed_count = 0
-
- return _Report()
-
- async def process_source(self, source_url: str, title: str | None = None):
- return _ProcessReport(
- source_url=source_url,
- title=title,
- transcript_chars=42,
- frame_count=3,
- summary=SummaryResult(
- summary="ok",
- key_points=["a", "b", "c", "d"],
- visual_highlights=[],
- model_used="qwen3.5:2b-q4_K_M",
- vram_mb=2048.0,
- transcript_chars=42,
- frame_count=3,
- ),
- )
-
- async def rss_feed(self, _limit: int = 20) -> str:
- return ""
-
- async def runtime_status(self):
- return {"ok": True}
-
- async def bootstrap_models(self):
- return {"models_prepared": ["qwen3.5:2b-q4_K_M"]}
+class _AsyncValue:  # decorated by the @dataclass line retained above this hunk
+    value: Any  # canned payload handed back by execute()
+
+    async def execute(self, *_args, **_kwargs) -> Any:  # stands in for any use-case's execute()
+        return self.value
+
+
+def _build_runtime(config: Config) -> AppRuntime:  # assembles an AppRuntime whose use-cases return fixed values
+    runtime_status = {
+        "ollama_version": "0.6.0",
+        "local_models": {"qwen3.5:2b-q4_K_M": {}},
+        "reachable": True,
+        "database_path": config.database_path,
+        "storage_dir": config.storage_dir,
+        "models": list(config.model_priority),
+    }
+    return AppRuntime(
+        config=config,
+        use_cases=AppUseCases(
+            get_runtime_status=_AsyncValue(runtime_status),
+            bootstrap_runtime=_AsyncValue({"models": ["qwen3.5:2b-q4_K_M"]}),
+            ingest_feed=cast(Any, _AsyncValue(None)),  # typed fields; cast since _AsyncValue is not the real use-case type
+            process_source=cast(Any, _AsyncValue(None)),
+            render_rss_feed=_AsyncValue(""),
+        ),
+        close=lambda: None,  # nothing to release in tests
+    )
def test_gui_and_setup_routes() -> None:
- app = create_app(cast(Any, _DummyPipeline()), Config())
+ config = Config()
+ app = create_app(_build_runtime(config))
client = TestClient(app)
home = client.get("/")
@@ -83,7 +70,7 @@ def test_gui_and_setup_routes() -> None:
def test_runtime_requires_api_key_when_enabled() -> None:
- app = create_app(cast(Any, _DummyPipeline()), Config(api_key="secret"))
+ app = create_app(_build_runtime(Config(api_key="secret")))
client = TestClient(app)
unauthorized = client.get("/runtime")
@@ -91,4 +78,20 @@ def test_runtime_requires_api_key_when_enabled() -> None:
authorized = client.get("/runtime", headers={"X-API-Key": "secret"})
assert authorized.status_code == 200
- assert authorized.json() == {"ok": True}
+ assert authorized.json() == {
+ "ollama_version": "0.6.0",
+ "local_models": {"qwen3.5:2b-q4_K_M": {}},
+ "reachable": True,
+ "database_path": ".data/vra.db",
+ "storage_dir": ".data",
+ "models": [
+ "qwen3.5:4b-q4_K_M",
+ "qwen3.5:2b-q4_K_M",
+ "qwen3.5:0.8b-q8_0",
+ ],
+ }
+
+
+def test_create_app_rejects_non_runtime_objects() -> None:  # legacy adapter_api shim must fail fast when not given an AppRuntime
+    with pytest.raises(TypeError, match="AppRuntime"):
+        create_app(cast(Any, object()), Config())
diff --git a/tests/test_project_layout.py b/tests/test_project_layout.py
new file mode 100644
index 0000000..6e100c1
--- /dev/null
+++ b/tests/test_project_layout.py
@@ -0,0 +1,7 @@
+from pathlib import Path
+
+
+def test_legacy_service_pipeline_module_has_been_removed() -> None:  # guards against the deleted module being reintroduced
+    repo_root = Path(__file__).resolve().parents[1]  # tests/ -> repository root
+
+    assert not (repo_root / "service_pipeline.py").exists()
diff --git a/video_rss_aggregator/__init__.py b/video_rss_aggregator/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/video_rss_aggregator/api.py b/video_rss_aggregator/api.py
new file mode 100644
index 0000000..debf8d6
--- /dev/null
+++ b/video_rss_aggregator/api.py
@@ -0,0 +1,233 @@
+from __future__ import annotations
+
+import platform
+import shutil
+import sys
+from contextlib import asynccontextmanager
+from datetime import datetime, timezone
+from importlib.util import find_spec
+from typing import AsyncIterator
+
+from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request
+from fastapi.responses import HTMLResponse, Response
+from pydantic import BaseModel
+
+from adapter_gui import render_setup_page
+from core_config import Config
+from service_media import runtime_dependency_report
+from video_rss_aggregator.bootstrap import AppRuntime, build_runtime
+from video_rss_aggregator.domain.outcomes import Failure
+
+
+class IngestRequest(BaseModel):  # request body for POST /ingest
+    feed_url: str
+    process: bool = False  # also summarize each ingested entry when True
+    max_items: int | None = None  # forwarded to the feed source; None means no cap
+
+
+class ProcessRequest(BaseModel):  # request body for POST /process
+    source_url: str
+    title: str | None = None  # optional override for the media's own title
+
+
+def create_app(
+    runtime: AppRuntime | None = None,  # pass a prebuilt runtime (tests); None builds one on startup
+    config: Config | None = None,  # ignored when runtime is given; falls back to env-derived config
+) -> FastAPI:
+    resolved_config = (
+        runtime.config if runtime is not None else config or Config.from_env()
+    )
+
+    @asynccontextmanager
+    async def lifespan(app: FastAPI) -> AsyncIterator[None]:  # builds/tears down the runtime over the app's lifetime
+        app.state.runtime = (
+            runtime if runtime is not None else await build_runtime(resolved_config)
+        )
+        try:
+            yield
+        finally:
+            close_result = app.state.runtime.close()
+            if close_result is not None:  # close() may be sync (returns None) or async (awaitable)
+                await close_result
+
+    app = FastAPI(title="Video RSS Aggregator", version="0.1.0", lifespan=lifespan)
+    if runtime is not None:
+        app.state.runtime = runtime  # also set eagerly so routes work before/without lifespan (e.g. TestClient)
+
+    def _runtime(request: Request) -> AppRuntime:  # resolve the runtime per-request from app state
+        return request.app.state.runtime
+
+    def _check_auth(
+        authorization: str | None = Header(None), x_api_key: str | None = Header(None)
+    ) -> None:
+        if resolved_config.api_key is None:  # auth disabled unless a key is configured
+            return
+        token = None
+        if authorization:
+            parts = authorization.split()
+            if len(parts) == 2 and parts[0].lower() == "bearer":  # accept "Bearer <token>"
+                token = parts[1]
+        if token is None:
+            token = x_api_key  # X-API-Key header as fallback credential
+        if token != resolved_config.api_key:
+            raise HTTPException(status_code=401, detail="unauthorized")
+
+    @app.get("/health")
+    async def health() -> dict[str, str]:  # unauthenticated liveness probe
+        return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()}
+
+    @app.get("/", response_class=HTMLResponse)
+    async def setup_home() -> str:  # human-facing setup page
+        return render_setup_page(resolved_config)
+
+    @app.get("/setup/config")
+    async def setup_config() -> dict[str, object]:  # effective configuration, without exposing the API key itself
+        return {
+            "bind_address": f"{resolved_config.bind_host}:{resolved_config.bind_port}",
+            "storage_dir": resolved_config.storage_dir,
+            "database_path": resolved_config.database_path,
+            "ollama_base_url": resolved_config.ollama_base_url,
+            "model_priority": list(resolved_config.model_priority),
+            "vram_budget_mb": resolved_config.vram_budget_mb,
+            "model_selection_reserve_mb": resolved_config.model_selection_reserve_mb,
+            "max_frames": resolved_config.max_frames,
+            "frame_scene_detection": resolved_config.frame_scene_detection,
+            "frame_scene_threshold": resolved_config.frame_scene_threshold,
+            "frame_scene_min_frames": resolved_config.frame_scene_min_frames,
+            "api_key_required": resolved_config.api_key is not None,  # presence only, never the value
+            "quick_commands": {
+                "bootstrap": "python -m vra bootstrap",
+                "status": "python -m vra status",
+                "serve": "python -m vra serve --bind 127.0.0.1:8080",
+            },
+        }
+
+    @app.get("/setup/diagnostics")
+    async def setup_diagnostics(request: Request) -> dict[str, object]:  # checks external tool availability; "ready" aggregates all of them
+        media_tools = runtime_dependency_report()
+        yt_dlp_cmd = shutil.which("yt-dlp")
+        ytdlp = {
+            "command": yt_dlp_cmd,
+            "module_available": find_spec("yt_dlp") is not None,  # importable module counts even without the CLI
+        }
+        ytdlp["available"] = bool(ytdlp["command"] or ytdlp["module_available"])
+
+        ollama: dict[str, object] = {
+            "base_url": resolved_config.ollama_base_url,
+            "reachable": False,
+            "version": None,
+            "models_found": 0,
+            "error": None,
+        }
+        try:
+            runtime_status = await _runtime(
+                request
+            ).use_cases.get_runtime_status.execute()
+            runtime_details = runtime_status
+            if isinstance(runtime_details, dict):
+                ollama["reachable"] = True
+                ollama["version"] = runtime_details.get("ollama_version")
+                local_models = runtime_details.get("local_models", {})
+                ollama["models_found"] = (
+                    len(local_models) if isinstance(local_models, dict) else 0
+                )
+        except Exception as exc:  # diagnostics must not fail the endpoint; report the error instead
+            ollama["error"] = str(exc)
+
+        ffmpeg_ok = bool(media_tools["ffmpeg"].get("available"))
+        ffprobe_ok = bool(media_tools["ffprobe"].get("available"))
+        ytdlp_ok = bool(ytdlp["available"])
+        ollama_ok = bool(ollama["reachable"])
+
+        return {
+            "platform": {
+                "system": platform.system(),
+                "release": platform.release(),
+                "python_version": sys.version.split()[0],
+                "python_executable": sys.executable,
+            },
+            "dependencies": {
+                "ffmpeg": media_tools["ffmpeg"],
+                "ffprobe": media_tools["ffprobe"],
+                "yt_dlp": ytdlp,
+                "ollama": ollama,
+            },
+            "ready": ffmpeg_ok and ffprobe_ok and ytdlp_ok and ollama_ok,
+        }
+
+    @app.post("/setup/bootstrap")
+    async def setup_bootstrap(
+        request: Request, _=Depends(_check_auth)
+    ) -> dict[str, object]:
+        report = await _runtime(request).use_cases.bootstrap_runtime.execute()
+        models_prepared = report.get("models_prepared")
+        if models_prepared is None:
+            models_prepared = report["models"]  # older report shape uses "models"; KeyError here means an unknown shape
+
+        runtime_payload = report.get("runtime")
+        if isinstance(runtime_payload, dict):
+            runtime_response = runtime_payload  # bootstrap already included a runtime snapshot
+        else:
+            runtime_response = await _runtime(
+                request
+            ).use_cases.get_runtime_status.execute()
+
+        return {
+            "models_prepared": models_prepared,
+            "runtime": runtime_response,
+        }
+
+    @app.post("/ingest")
+    async def ingest(
+        req: IngestRequest, request: Request, _=Depends(_check_auth)
+    ) -> dict[str, object]:
+        report = await _runtime(request).use_cases.ingest_feed.execute(
+            req.feed_url,
+            req.process,
+            req.max_items,
+        )
+        return {
+            "feed_title": report.feed_title,
+            "item_count": report.item_count,
+            "processed_count": report.processed_count,
+        }
+
+    @app.post("/process")
+    async def process(
+        req: ProcessRequest, request: Request, _=Depends(_check_auth)
+    ) -> dict[str, object]:
+        outcome = await _runtime(request).use_cases.process_source.execute(
+            req.source_url,
+            req.title,
+        )
+        if isinstance(outcome, Failure):  # domain failure surfaces as 502 Bad Gateway
+            raise HTTPException(status_code=502, detail=outcome.reason)
+        return {
+            "source_url": outcome.media.source_url,
+            "title": outcome.media.title,
+            "transcript_chars": outcome.summary.transcript_chars,
+            "frame_count": outcome.summary.frame_count,
+            "summary": {
+                "summary": outcome.summary.summary,
+                "key_points": list(outcome.summary.key_points),
+                "visual_highlights": list(outcome.summary.visual_highlights),
+                "model_used": outcome.summary.model_used,
+                "vram_mb": outcome.summary.vram_mb,
+                "error": outcome.summary.error,  # non-None for PartialSuccess outcomes
+            },
+        }
+
+    @app.get("/rss")
+    async def rss_feed(
+        request: Request, limit: int = Query(20, ge=1, le=200)
+    ) -> Response:
+        xml = await _runtime(request).use_cases.render_rss_feed.execute(limit)
+        return Response(content=xml, media_type="application/rss+xml")
+
+    @app.get("/runtime")
+    async def runtime_status(
+        request: Request, _=Depends(_check_auth)
+    ) -> dict[str, object]:
+        return await _runtime(request).use_cases.get_runtime_status.execute()
+
+    return app
diff --git a/video_rss_aggregator/application/__init__.py b/video_rss_aggregator/application/__init__.py
new file mode 100644
index 0000000..da47c2f
--- /dev/null
+++ b/video_rss_aggregator/application/__init__.py
@@ -0,0 +1,3 @@
+from video_rss_aggregator.application.ports import RuntimeInspector  # re-export for package-level imports
+
+__all__ = ["RuntimeInspector"]  # public surface of the application package
diff --git a/video_rss_aggregator/application/ports.py b/video_rss_aggregator/application/ports.py
new file mode 100644
index 0000000..ae6622e
--- /dev/null
+++ b/video_rss_aggregator/application/ports.py
@@ -0,0 +1,75 @@
+from __future__ import annotations
+
+from datetime import datetime
+from dataclasses import dataclass, field  # NOTE(review): stdlib imports unsorted (datetime before dataclasses)
+from typing import Protocol, Sequence
+
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+from video_rss_aggregator.domain.outcomes import ProcessOutcome
+from video_rss_aggregator.domain.publication import PublicationRecord
+
+
+class RuntimeInspector(Protocol):  # inspects and prepares the model runtime
+    async def status(self) -> dict[str, object]: ...
+
+    async def bootstrap(self) -> list[str]: ...
+
+
+class MediaPreparationService(Protocol):  # downloads/transcribes/frames a source into PreparedMedia
+    async def prepare(self, source_url: str, title: str | None) -> PreparedMedia: ...
+
+
+@dataclass(frozen=True)
+class FetchedFeedEntry:  # one item parsed from a remote feed
+    source_url: str | None  # may be absent in malformed entries; filtered by the ingest use-case
+    title: str | None = None
+    guid: str | None = None
+    published_at: datetime | None = None
+
+
+@dataclass(frozen=True)
+class FetchedFeed:  # a parsed remote feed plus its entries
+    entries: tuple[FetchedFeedEntry, ...] = field(default_factory=tuple)
+    title: str | None = None
+    site_url: str | None = None
+
+    def __post_init__(self) -> None:
+        object.__setattr__(self, "entries", tuple(self.entries))  # coerce any iterable to tuple despite frozen=True
+
+
+class FeedSource(Protocol):  # fetches and parses a remote feed URL
+    async def fetch(
+        self, feed_url: str, max_items: int | None = None
+    ) -> FetchedFeed: ...
+
+
+class FeedRepository(Protocol):  # persists feed-level metadata
+    async def save(self, feed_url: str, feed: FetchedFeed) -> None: ...
+
+
+class FeedVideoRepository(Protocol):  # persists per-entry video metadata for a feed
+    async def save_feed_item(self, feed_url: str, entry: FetchedFeedEntry) -> None: ...
+
+
+class SourceProcessor(Protocol):  # end-to-end processing of one source URL
+    async def execute(self, source_url: str, title: str | None) -> ProcessOutcome: ...
+
+
+class Summarizer(Protocol):  # turns prepared media into a summary
+    async def summarize(self, prepared_media: PreparedMedia) -> SummaryResult: ...
+
+
+class VideoRepository(Protocol):  # persists prepared media; returns the video id
+    async def save(self, media: PreparedMedia) -> str: ...
+
+
+class SummaryRepository(Protocol):  # persists a summary for an existing video
+    async def save(self, video_id: str, summary: SummaryResult) -> None: ...
+
+
+class PublicationRepository(Protocol):  # reads back publishable summaries
+    async def latest_publications(self, limit: int) -> Sequence[PublicationRecord]: ...
+
+
+class PublicationRenderer(Protocol):  # renders publications to output (e.g. RSS XML)
+    async def render(self, publications: Sequence[PublicationRecord]) -> str: ...
diff --git a/video_rss_aggregator/application/use_cases/__init__.py b/video_rss_aggregator/application/use_cases/__init__.py
new file mode 100644
index 0000000..e2c44c5
--- /dev/null
+++ b/video_rss_aggregator/application/use_cases/__init__.py
@@ -0,0 +1,6 @@
+from video_rss_aggregator.application.use_cases.runtime import (  # re-export runtime use-cases at package level
+    BootstrapRuntime,
+    GetRuntimeStatus,
+)
+
+__all__ = ["BootstrapRuntime", "GetRuntimeStatus"]
diff --git a/video_rss_aggregator/application/use_cases/ingest_feed.py b/video_rss_aggregator/application/use_cases/ingest_feed.py
new file mode 100644
index 0000000..ef2912e
--- /dev/null
+++ b/video_rss_aggregator/application/use_cases/ingest_feed.py
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import cast
+
+from video_rss_aggregator.application.ports import (
+    FetchedFeed,
+    FetchedFeedEntry,
+    FeedRepository,
+    FeedSource,
+    FeedVideoRepository,
+    SourceProcessor,
+)
+from video_rss_aggregator.domain.outcomes import Failure
+
+
+@dataclass(frozen=True)
+class IngestReport:  # summary of one ingest run
+    feed_title: str | None
+    item_count: int  # entries kept after dropping blank/missing source URLs
+    processed_count: int  # entries that were summarized without a Failure outcome
+
+
+@dataclass(frozen=True)
+class IngestFeed:  # use-case: fetch a feed, persist it, optionally process every entry
+    feed_source: FeedSource
+    feeds: FeedRepository
+    videos: FeedVideoRepository
+    process_source: SourceProcessor
+
+    async def execute(
+        self, feed_url: str, process: bool = False, max_items: int | None = None
+    ) -> IngestReport:
+        fetched_feed = await self.feed_source.fetch(feed_url, max_items=max_items)
+        normalized_entries: list[FetchedFeedEntry] = []
+
+        for entry in fetched_feed.entries:
+            if entry.source_url is None:  # drop entries with no link at all
+                continue
+
+            source_url = entry.source_url.strip()
+            if not source_url:  # drop whitespace-only links
+                continue
+
+            normalized_entries.append(
+                FetchedFeedEntry(
+                    source_url=source_url,  # stripped URL replaces the original
+                    title=entry.title,
+                    guid=entry.guid,
+                    published_at=entry.published_at,
+                )
+            )
+
+        valid_entries = tuple(normalized_entries)
+        normalized_feed = FetchedFeed(
+            title=fetched_feed.title,
+            site_url=fetched_feed.site_url,
+            entries=valid_entries,
+        )
+
+        processed_count = 0
+
+        await self.feeds.save(feed_url, normalized_feed)  # persist feed metadata before per-entry work
+
+        for entry in valid_entries:
+            await self.videos.save_feed_item(feed_url, entry)
+
+            if process:
+                result = await self.process_source.execute(
+                    cast(str, entry.source_url), entry.title  # NOTE(review): cast is redundant - normalized entries always carry str URLs
+                )
+                if not isinstance(result, Failure):  # PartialSuccess still counts as processed
+                    processed_count += 1
+
+        return IngestReport(
+            feed_title=normalized_feed.title,
+            item_count=len(valid_entries),
+            processed_count=processed_count,
+        )
diff --git a/video_rss_aggregator/application/use_cases/process_source.py b/video_rss_aggregator/application/use_cases/process_source.py
new file mode 100644
index 0000000..bd85d81
--- /dev/null
+++ b/video_rss_aggregator/application/use_cases/process_source.py
@@ -0,0 +1,39 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from video_rss_aggregator.application.ports import (
+ MediaPreparationService,
+ SummaryRepository,
+ Summarizer,
+ VideoRepository,
+)
+from video_rss_aggregator.domain.outcomes import (
+ Failure,
+ PartialSuccess,
+ ProcessOutcome,
+ Success,
+)
+
+
@dataclass(frozen=True)
class ProcessSource:
    """Use case: prepare one source's media, summarize it, and persist both."""

    media_service: MediaPreparationService
    summarizer: Summarizer
    videos: VideoRepository
    summaries: SummaryRepository

    async def execute(self, source_url: str, title: str | None) -> ProcessOutcome:
        """Run the full pipeline for one source URL.

        Returns Failure if any step raises, PartialSuccess if the
        summarizer's result carries an error message, and Success
        otherwise. Media and summary are persisted before the outcome is
        classified.
        """
        try:
            prepared = await self.media_service.prepare(source_url, title)
            produced = await self.summarizer.summarize(prepared)
            stored_video_id = await self.videos.save(prepared)
            await self.summaries.save(stored_video_id, produced)
        except Exception as exc:  # boundary: any failure becomes a Failure outcome
            return Failure(source_url=source_url, reason=str(exc))

        if produced.error is None:
            return Success(media=prepared, summary=produced)
        return PartialSuccess(media=prepared, reason=produced.error, summary=produced)
diff --git a/video_rss_aggregator/application/use_cases/render_rss_feed.py b/video_rss_aggregator/application/use_cases/render_rss_feed.py
new file mode 100644
index 0000000..39031da
--- /dev/null
+++ b/video_rss_aggregator/application/use_cases/render_rss_feed.py
@@ -0,0 +1,18 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from video_rss_aggregator.application.ports import (
+ PublicationRenderer,
+ PublicationRepository,
+)
+
+
@dataclass(frozen=True)
class RenderRssFeed:
    """Use case: render the newest stored publications as an RSS document."""

    publications: PublicationRepository
    renderer: PublicationRenderer

    async def execute(self, limit: int) -> str:
        """Fetch up to `limit` recent publications and render them as a string."""
        latest = await self.publications.latest_publications(limit)
        frozen = tuple(latest)
        return await self.renderer.render(frozen)
diff --git a/video_rss_aggregator/application/use_cases/runtime.py b/video_rss_aggregator/application/use_cases/runtime.py
new file mode 100644
index 0000000..62ced5e
--- /dev/null
+++ b/video_rss_aggregator/application/use_cases/runtime.py
@@ -0,0 +1,29 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from video_rss_aggregator.application.ports import RuntimeInspector
+
+
@dataclass(frozen=True)
class GetRuntimeStatus:
    """Use case: report live runtime health plus static deployment facts."""

    runtime: RuntimeInspector
    storage_path: str
    storage_dir: str
    models: tuple[str, ...]

    async def execute(self) -> dict[str, object]:
        """Merge the inspector's status with configured paths and models.

        The static keys are written last, so they win over any
        identically named keys the inspector reports.
        """
        status = dict(await self.runtime.status())
        status["database_path"] = self.storage_path
        status["storage_dir"] = self.storage_dir
        status["models"] = list(self.models)
        return status
+
+
@dataclass(frozen=True)
class BootstrapRuntime:
    """Use case: trigger model preparation and report what was set up."""

    runtime: RuntimeInspector

    async def execute(self) -> dict[str, list[str]]:
        """Bootstrap the runtime and wrap the prepared model names."""
        prepared = await self.runtime.bootstrap()
        return {"models": prepared}
diff --git a/video_rss_aggregator/bootstrap.py b/video_rss_aggregator/bootstrap.py
new file mode 100644
index 0000000..a6cb757
--- /dev/null
+++ b/video_rss_aggregator/bootstrap.py
@@ -0,0 +1,109 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Awaitable, Callable
+
+import httpx
+
+from core_config import Config
+from service_summarize import SummarizationEngine
+from video_rss_aggregator.application.use_cases.ingest_feed import IngestFeed
+from video_rss_aggregator.application.use_cases.process_source import ProcessSource
+from video_rss_aggregator.application.use_cases.render_rss_feed import RenderRssFeed
+from video_rss_aggregator.application.use_cases.runtime import (
+ BootstrapRuntime,
+ GetRuntimeStatus,
+)
+from video_rss_aggregator.infrastructure.feed_source import HttpFeedSource
+from video_rss_aggregator.infrastructure.media_service import (
+ LegacyMediaPreparationService,
+)
+from video_rss_aggregator.infrastructure.publication_renderer import (
+ RssPublicationRenderer,
+)
+from video_rss_aggregator.infrastructure.runtime_adapters import LegacyRuntimeInspector
+from video_rss_aggregator.infrastructure.sqlite_repositories import (
+ SQLiteFeedRepository,
+ SQLiteFeedVideoRepository,
+ SQLitePublicationRepository,
+ SQLiteSummaryRepository,
+ SQLiteVideoRepository,
+)
+from video_rss_aggregator.infrastructure.summarizer import LegacySummarizer
+from video_rss_aggregator.storage import Database
+
+
@dataclass(frozen=True)
class AppUseCases:
    """Bundle of fully wired use-case objects handed to the adapters."""

    get_runtime_status: GetRuntimeStatus
    bootstrap_runtime: BootstrapRuntime
    ingest_feed: IngestFeed
    process_source: ProcessSource
    render_rss_feed: RenderRssFeed
+
+
@dataclass(frozen=True)
class AppRuntime:
    """Assembled application: resolved config, use cases, and a shutdown hook."""

    config: Config
    use_cases: AppUseCases
    # Shutdown callback that releases the HTTP client, summarization engine,
    # and database; typed to allow either a plain or awaitable-returning callable.
    close: Callable[[], Awaitable[None] | None]
+
+
async def build_runtime(config: Config | None = None) -> AppRuntime:
    """Wire every adapter and use case into a ready-to-serve AppRuntime.

    Connects to (and migrates) the SQLite database, builds one shared HTTP
    client and one summarization engine, then composes the use-case graph
    around them. The returned runtime's ``close`` callback must be awaited
    on shutdown to release the client, the engine, and the database.
    """
    resolved_config = config or Config.from_env()
    database = await Database.connect(resolved_config.database_path)
    await database.migrate()
    # Generous read/write timeouts: media downloads and model calls are slow.
    client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=15.0, read=300.0, write=300.0, pool=60.0),
        follow_redirects=True,
        limits=httpx.Limits(max_keepalive_connections=20, max_connections=50),
    )
    summarization_engine = SummarizationEngine(resolved_config)

    runtime_inspector = LegacyRuntimeInspector(summarization_engine)
    # Shared instance: used by IngestFeed (process=True) and exposed directly.
    process_source = ProcessSource(
        media_service=LegacyMediaPreparationService(
            client=client,
            storage_dir=resolved_config.storage_dir,
            max_frames=resolved_config.max_frames,
            scene_detection=resolved_config.frame_scene_detection,
            scene_threshold=resolved_config.frame_scene_threshold,
            scene_min_frames=resolved_config.frame_scene_min_frames,
            max_transcript_chars=resolved_config.max_transcript_chars,
        ),
        summarizer=LegacySummarizer(summarization_engine),
        videos=SQLiteVideoRepository(database),
        summaries=SQLiteSummaryRepository(database),
    )

    use_cases = AppUseCases(
        get_runtime_status=GetRuntimeStatus(
            runtime=runtime_inspector,
            storage_path=database.path,
            storage_dir=resolved_config.storage_dir,
            models=resolved_config.model_priority,
        ),
        bootstrap_runtime=BootstrapRuntime(runtime=runtime_inspector),
        ingest_feed=IngestFeed(
            feed_source=HttpFeedSource(client),
            feeds=SQLiteFeedRepository(database),
            videos=SQLiteFeedVideoRepository(database),
            process_source=process_source,
        ),
        process_source=process_source,
        render_rss_feed=RenderRssFeed(
            publications=SQLitePublicationRepository(database),
            renderer=RssPublicationRenderer(
                title=resolved_config.rss_title,
                link=resolved_config.rss_link,
                description=resolved_config.rss_description,
            ),
        ),
    )

    async def _close() -> None:
        # Release resources in reverse order of acquisition.
        await client.aclose()
        await summarization_engine.close()
        await database.close()

    return AppRuntime(config=resolved_config, use_cases=use_cases, close=_close)
diff --git a/video_rss_aggregator/domain/__init__.py b/video_rss_aggregator/domain/__init__.py
new file mode 100644
index 0000000..6f4c2da
--- /dev/null
+++ b/video_rss_aggregator/domain/__init__.py
@@ -0,0 +1,16 @@
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+from video_rss_aggregator.domain.outcomes import (
+ Failure,
+ PartialSuccess,
+ ProcessOutcome,
+ Success,
+)
+
+__all__ = [
+ "Failure",
+ "PartialSuccess",
+ "ProcessOutcome",
+ "PreparedMedia",
+ "Success",
+ "SummaryResult",
+]
diff --git a/video_rss_aggregator/domain/models.py b/video_rss_aggregator/domain/models.py
new file mode 100644
index 0000000..d3686a8
--- /dev/null
+++ b/video_rss_aggregator/domain/models.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
@dataclass(frozen=True)
class PreparedMedia:
    """Downloaded media plus the artifacts extracted from it."""

    # Original URL the media was fetched from.
    source_url: str
    # Display title for the media.
    title: str
    # Transcript text; may be empty.
    transcript: str
    # Filesystem path of the downloaded media file.
    media_path: str
    # Paths of extracted still frames, in order.
    frame_paths: tuple[str, ...]

    def __post_init__(self) -> None:
        # Coerce any sequence (e.g. a list) into a tuple so the frozen
        # instance is genuinely immutable.
        object.__setattr__(self, "frame_paths", tuple(self.frame_paths))
+
+
@dataclass(frozen=True)
class SummaryResult:
    """Output of one summarization run over prepared media."""

    # Main prose summary text.
    summary: str
    # Bullet-point takeaways.
    key_points: tuple[str, ...]
    # Notable visual observations from the sampled frames.
    visual_highlights: tuple[str, ...]
    # Identifier of the model that produced the summary.
    model_used: str
    # VRAM usage in megabytes (rendered in the RSS footer).
    vram_mb: float
    # Length of the transcript fed to the model, in characters.
    transcript_chars: int
    # Number of frames fed to the model.
    frame_count: int
    # Error message reported by the summarizer, or None on full success.
    error: str | None

    def __post_init__(self) -> None:
        # Coerce list inputs to tuples so the frozen instance is immutable.
        object.__setattr__(self, "key_points", tuple(self.key_points))
        object.__setattr__(self, "visual_highlights", tuple(self.visual_highlights))
diff --git a/video_rss_aggregator/domain/outcomes.py b/video_rss_aggregator/domain/outcomes.py
new file mode 100644
index 0000000..fe37520
--- /dev/null
+++ b/video_rss_aggregator/domain/outcomes.py
@@ -0,0 +1,35 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import TypeAlias
+
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+
+
@dataclass(frozen=True)
class Success:
    """Pipeline outcome: media was prepared and summarized without error."""

    media: PreparedMedia
    summary: SummaryResult
    # Discriminator for logging/serialization; not settable by callers.
    status: str = field(init=False, default="success")
+
+
@dataclass(frozen=True)
class PartialSuccess:
    """Pipeline outcome: finished, but the summary carries an error message."""

    media: PreparedMedia
    reason: str
    summary: SummaryResult
    # Discriminator for logging/serialization; not settable by callers.
    status: str = field(init=False, default="partial_success")

    def __post_init__(self) -> None:
        # Defensive check: a partial success must still carry its summary
        # payload even if a caller passes None despite the annotation.
        if self.summary is None:
            raise ValueError("summary is required")
+
+
@dataclass(frozen=True)
class Failure:
    """Pipeline outcome: a step raised; carries the offending URL and reason."""

    source_url: str
    reason: str
    # Discriminator for logging/serialization; not settable by callers.
    status: str = field(init=False, default="failure")
+
+
# Closed union of pipeline results; callers narrow with isinstance checks.
ProcessOutcome: TypeAlias = Success | PartialSuccess | Failure
diff --git a/video_rss_aggregator/domain/publication.py b/video_rss_aggregator/domain/publication.py
new file mode 100644
index 0000000..2195acf
--- /dev/null
+++ b/video_rss_aggregator/domain/publication.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+
+
@dataclass(frozen=True)
class PublicationRecord:
    """One renderable feed item: a stored video joined with its summary."""

    title: str | None
    source_url: str
    published_at: datetime | None
    summary: str
    key_points: tuple[str, ...]
    visual_highlights: tuple[str, ...]
    model_used: str | None
    vram_mb: float

    def __post_init__(self) -> None:
        # Coerce list inputs to tuples so the frozen record is immutable.
        object.__setattr__(self, "key_points", tuple(self.key_points))
        object.__setattr__(self, "visual_highlights", tuple(self.visual_highlights))
diff --git a/video_rss_aggregator/infrastructure/__init__.py b/video_rss_aggregator/infrastructure/__init__.py
new file mode 100644
index 0000000..0be910b
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/__init__.py
@@ -0,0 +1,19 @@
+from video_rss_aggregator.infrastructure.publication_renderer import (
+ RssPublicationRenderer,
+)
+from video_rss_aggregator.infrastructure.sqlite_repositories import (
+ SQLiteFeedRepository,
+ SQLiteFeedVideoRepository,
+ SQLitePublicationRepository,
+ SQLiteSummaryRepository,
+ SQLiteVideoRepository,
+)
+
+__all__ = [
+ "RssPublicationRenderer",
+ "SQLiteFeedRepository",
+ "SQLiteFeedVideoRepository",
+ "SQLitePublicationRepository",
+ "SQLiteSummaryRepository",
+ "SQLiteVideoRepository",
+]
diff --git a/video_rss_aggregator/infrastructure/feed_source.py b/video_rss_aggregator/infrastructure/feed_source.py
new file mode 100644
index 0000000..33ca8c9
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/feed_source.py
@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from email.utils import parsedate_to_datetime
+from typing import Any
+
+import feedparser
+import httpx
+
+from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry
+
+
def _pick_source_url(entry: Any) -> str | None:
    """Choose the best media URL for a feedparser entry.

    Preference order: first enclosure's href/url, then the first link's
    href, then the plain entry link. Unlike a strict first-match, a
    candidate that exists but has no usable URL falls through to the next
    one (fix: previously an enclosure or link without an href made this
    return None even when a later candidate was available).
    """
    enclosures = entry.get("enclosures", [])
    if enclosures:
        url = enclosures[0].get("href") or enclosures[0].get("url")
        if url:
            return url
    links = entry.get("links", [])
    if links:
        url = links[0].get("href")
        if url:
            return url
    return entry.get("link")
+
+
def _map_entry(entry: Any) -> FetchedFeedEntry:
    """Convert one feedparser entry into a FetchedFeedEntry."""
    source_url = _pick_source_url(entry)
    # feedparser may yield empty strings; normalize those to None.
    title = entry.get("title") or None
    guid = entry.get("id") or None
    published_at = _pick_published_at(entry)
    return FetchedFeedEntry(
        source_url=source_url,
        title=title,
        guid=guid,
        published_at=published_at,
    )
+
+
def _pick_published_at(entry: Any) -> datetime | None:
    """Best-effort publication timestamp for a feedparser entry.

    Raw string fields are preferred over feedparser's pre-parsed
    struct_time fields; returns None when nothing usable is present.
    """
    candidates = (
        ("published", _parse_datetime_value),
        ("updated", _parse_datetime_value),
        ("published_parsed", _parse_datetime_tuple),
        ("updated_parsed", _parse_datetime_tuple),
    )
    for key, parse in candidates:
        raw = entry.get(key)
        if raw is None:
            continue
        parsed = parse(raw)
        if parsed is not None:
            return parsed
    return None
+
+
def _parse_datetime_value(value: Any) -> datetime | None:
    """Parse an RFC 2822 or ISO-8601 date string into an aware UTC datetime.

    Non-strings and unparseable strings yield None. Naive results are
    assumed to be UTC; aware results are converted to UTC.
    """
    if not isinstance(value, str):
        return None

    result: datetime | None
    try:
        result = parsedate_to_datetime(value)
    except (TypeError, ValueError, IndexError, OverflowError):
        result = None

    if result is None:
        normalized = value.replace("Z", "+00:00")
        try:
            result = datetime.fromisoformat(normalized)
        except ValueError:
            return None

    if result.tzinfo is not None:
        return result.astimezone(timezone.utc)
    return result.replace(tzinfo=timezone.utc)
+
+
def _parse_datetime_tuple(value: Any) -> datetime | None:
    """Build a UTC datetime from a time.struct_time-like 9-tuple.

    Returns None for anything that cannot supply six valid leading fields.
    """
    try:
        year, month, day, hour, minute, second = value[:6]
    except (TypeError, ValueError, IndexError):
        return None
    try:
        return datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)
    except (TypeError, ValueError, OverflowError):
        return None
+
+
@dataclass(frozen=True)
class HttpFeedSource:
    """Fetches an RSS/Atom feed over HTTP and parses it with feedparser."""

    client: httpx.AsyncClient

    async def fetch(self, feed_url: str, max_items: int | None = None) -> FetchedFeed:
        """Download `feed_url` and map it to a FetchedFeed.

        Raises httpx.HTTPStatusError for non-2xx responses (via
        raise_for_status). When `max_items` is given, only the first
        `max_items` entries are kept.
        """
        response = await self.client.get(feed_url)
        response.raise_for_status()
        parsed = feedparser.parse(response.text)
        entries = (
            parsed.entries[:max_items] if max_items is not None else parsed.entries
        )
        return FetchedFeed(
            # feedparser may return empty strings; normalize those to None.
            title=parsed.feed.get("title") or None,
            site_url=parsed.feed.get("link") or None,
            entries=tuple(_map_entry(entry) for entry in entries),
        )
diff --git a/video_rss_aggregator/infrastructure/media_service.py b/video_rss_aggregator/infrastructure/media_service.py
new file mode 100644
index 0000000..34cd8f4
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/media_service.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+import httpx
+
+import service_media
+from video_rss_aggregator.domain.models import PreparedMedia
+
+
def _map_prepared_media(
    source_url: str,
    title: str | None,
    legacy_media: service_media.PreparedMedia,
) -> PreparedMedia:
    """Translate the legacy service's media object into the domain model.

    Title preference: the explicit title, then the legacy title, then the
    source URL itself.
    """
    best_title = title or legacy_media.title or source_url
    frame_paths = tuple(str(frame) for frame in legacy_media.frame_paths)
    return PreparedMedia(
        source_url=source_url,
        title=best_title,
        transcript=legacy_media.transcript,
        media_path=str(legacy_media.media_path),
        frame_paths=frame_paths,
    )
+
+
@dataclass(frozen=True)
class LegacyMediaPreparationService:
    """Adapter delegating media download/frames/transcript to service_media."""

    # Shared HTTP client used for downloads.
    client: httpx.AsyncClient
    # Directory where media files and frames are written.
    storage_dir: str
    # Upper bound on extracted frames.
    max_frames: int
    # Scene-detection configuration forwarded verbatim to the legacy service.
    scene_detection: bool
    scene_threshold: float
    scene_min_frames: int
    # Transcript length cap, in characters.
    max_transcript_chars: int

    async def prepare(self, source_url: str, title: str | None) -> PreparedMedia:
        """Prepare `source_url` via the legacy service and map to the domain model."""
        legacy_media = await service_media.prepare_media(
            client=self.client,
            source=source_url,
            storage_dir=self.storage_dir,
            max_frames=self.max_frames,
            scene_detection=self.scene_detection,
            scene_threshold=self.scene_threshold,
            scene_min_frames=self.scene_min_frames,
            max_transcript_chars=self.max_transcript_chars,
        )
        return _map_prepared_media(source_url, title, legacy_media)
diff --git a/video_rss_aggregator/infrastructure/publication_renderer.py b/video_rss_aggregator/infrastructure/publication_renderer.py
new file mode 100644
index 0000000..85a0b16
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/publication_renderer.py
@@ -0,0 +1,17 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Sequence
+
+from video_rss_aggregator.domain.publication import PublicationRecord
+from video_rss_aggregator.rss import render_feed
+
+
@dataclass(frozen=True)
class RssPublicationRenderer:
    """Renders publications to RSS XML using configured channel metadata."""

    # Channel-level <title>, <link>, and <description> values.
    title: str
    link: str
    description: str

    async def render(self, publications: Sequence[PublicationRecord]) -> str:
        """Render `publications` as an RSS 2.0 document string."""
        return render_feed(self.title, self.link, self.description, publications)
diff --git a/video_rss_aggregator/infrastructure/runtime_adapters.py b/video_rss_aggregator/infrastructure/runtime_adapters.py
new file mode 100644
index 0000000..621eeb9
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/runtime_adapters.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Protocol
+
+
class _RuntimeEngine(Protocol):
    """Structural view of the legacy engine's runtime-management surface."""

    async def runtime_status(self) -> dict[str, object]: ...

    async def prepare_models(self) -> list[str]: ...
+
+
@dataclass(frozen=True)
class LegacyRuntimeInspector:
    """Adapter exposing the legacy engine as a RuntimeInspector."""

    engine: _RuntimeEngine

    async def status(self) -> dict[str, object]:
        """Report the engine's current runtime status."""
        current = await self.engine.runtime_status()
        return current

    async def bootstrap(self) -> list[str]:
        """Prepare required models and return their names."""
        prepared = await self.engine.prepare_models()
        return prepared
diff --git a/video_rss_aggregator/infrastructure/sqlite_repositories.py b/video_rss_aggregator/infrastructure/sqlite_repositories.py
new file mode 100644
index 0000000..1387d6c
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/sqlite_repositories.py
@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Sequence
+
+from video_rss_aggregator.application.ports import FetchedFeed, FetchedFeedEntry
+from video_rss_aggregator.domain.models import PreparedMedia
+from video_rss_aggregator.domain.models import SummaryResult
+from video_rss_aggregator.domain.publication import PublicationRecord
+from video_rss_aggregator.storage import Database
+
+
@dataclass(frozen=True)
class SQLiteFeedRepository:
    """Persists fetched feed metadata through the shared Database."""

    database: Database

    async def save(self, feed_url: str, feed: FetchedFeed) -> None:
        """Upsert the feed row keyed by URL, refreshing its title."""
        feed_title = feed.title
        await self.database.upsert_feed(feed_url, feed_title)
+
+
@dataclass(frozen=True)
class SQLiteFeedVideoRepository:
    """Persists feed entries as video rows linked to their owning feed."""

    database: Database

    async def save_feed_item(self, feed_url: str, entry: FetchedFeedEntry) -> None:
        """Upsert the entry's video row under its feed.

        Raises ValueError when the entry carries no source URL, since the
        videos table is keyed by it.
        """
        source_url = entry.source_url
        if source_url is None:
            raise ValueError("source_url is required")
        owning_feed_id = await self.database.upsert_feed(feed_url, None)
        await self.database.upsert_video(
            feed_id=owning_feed_id,
            guid=entry.guid,
            title=entry.title,
            source_url=source_url,
            published_at=entry.published_at,
        )
+
+
@dataclass(frozen=True)
class SQLiteVideoRepository:
    """Persists processed media (and its transcript) as a standalone video."""

    database: Database

    async def save(self, media: PreparedMedia) -> str:
        """Upsert the video row, store a transcript if non-empty, return the id."""
        stored_id = await self.database.upsert_video(
            feed_id=None,
            guid=None,
            title=media.title,
            source_url=media.source_url,
            published_at=None,
            media_path=media.media_path,
        )
        if media.transcript:
            await self.database.insert_transcript(stored_id, media.transcript)
        return stored_id
+
+
@dataclass(frozen=True)
class SQLiteSummaryRepository:
    """Persists summarization results for stored videos."""

    database: Database

    async def save(self, video_id: str, summary: SummaryResult) -> None:
        """Insert a summary row attached to `video_id`."""
        await self.database.insert_summary(video_id, summary)
+
+
@dataclass(frozen=True)
class SQLitePublicationRepository:
    """Reads renderable publications (video + summary joins) from storage."""

    database: Database

    async def latest_publications(self, limit: int) -> Sequence[PublicationRecord]:
        """Return up to `limit` most recent publications."""
        records = await self.database.latest_publications(limit)
        return records
diff --git a/video_rss_aggregator/infrastructure/summarizer.py b/video_rss_aggregator/infrastructure/summarizer.py
new file mode 100644
index 0000000..42b3201
--- /dev/null
+++ b/video_rss_aggregator/infrastructure/summarizer.py
@@ -0,0 +1,46 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Protocol
+
+from service_summarize import SummaryResult as LegacySummaryResult
+from video_rss_aggregator.domain.models import PreparedMedia, SummaryResult
+
+
class _SummarizationEngine(Protocol):
    """Structural view of the legacy summarization engine's entry point."""

    async def summarize(
        self,
        *,
        source_url: str,
        title: str | None,
        transcript: str,
        frame_paths: list[Path],
    ) -> LegacySummaryResult: ...
+
+
def _map_summary_result(result: LegacySummaryResult) -> SummaryResult:
    """Copy the legacy engine's result into the immutable domain model."""
    scalars = {
        "summary": result.summary,
        "model_used": result.model_used,
        "vram_mb": result.vram_mb,
        "transcript_chars": result.transcript_chars,
        "frame_count": result.frame_count,
        "error": result.error,
    }
    return SummaryResult(
        key_points=tuple(result.key_points),
        visual_highlights=tuple(result.visual_highlights),
        **scalars,
    )
+
+
@dataclass(frozen=True)
class LegacySummarizer:
    """Adapter running the legacy engine against domain PreparedMedia."""

    engine: _SummarizationEngine

    async def summarize(self, prepared_media: PreparedMedia) -> SummaryResult:
        """Summarize the prepared media and translate the engine's result."""
        frame_paths = [Path(frame) for frame in prepared_media.frame_paths]
        legacy_result = await self.engine.summarize(
            source_url=prepared_media.source_url,
            title=prepared_media.title,
            transcript=prepared_media.transcript,
            frame_paths=frame_paths,
        )
        return _map_summary_result(legacy_result)
diff --git a/video_rss_aggregator/rss.py b/video_rss_aggregator/rss.py
new file mode 100644
index 0000000..7db92c7
--- /dev/null
+++ b/video_rss_aggregator/rss.py
@@ -0,0 +1,54 @@
from __future__ import annotations

from datetime import datetime, timezone
from email.utils import format_datetime
from typing import Sequence
from xml.etree.ElementTree import Element, SubElement, tostring

from video_rss_aggregator.domain.publication import PublicationRecord
+
+
def render_feed(
    title: str,
    link: str,
    description: str,
    publications: Sequence[PublicationRecord],
) -> str:
    """Render publications as an RSS 2.0 document string.

    Each publication becomes an <item> whose description concatenates the
    summary, key-point bullets, visual-highlight bullets, and a model
    footer, separated by blank lines. Publications without a publish date
    get no <pubDate>.

    Fix: emits a proper XML declaration instead of the previous stray
    leading newline, so consumers see the declared UTF-8 encoding.
    """
    rss = Element("rss", version="2.0")
    channel = SubElement(rss, "channel")
    SubElement(channel, "title").text = title
    SubElement(channel, "link").text = link
    SubElement(channel, "description").text = description

    for publication in publications:
        item = SubElement(channel, "item")
        SubElement(item, "title").text = publication.title or "Untitled video"
        SubElement(item, "link").text = publication.source_url
        SubElement(item, "description").text = _item_description(publication)
        if publication.published_at:
            SubElement(item, "pubDate").text = _rfc2822(publication.published_at)

    return '<?xml version="1.0" encoding="UTF-8"?>\n' + tostring(
        rss, encoding="unicode"
    )


def _item_description(publication: PublicationRecord) -> str:
    """Assemble the multi-section plain-text description for one item."""
    description_parts = [publication.summary]
    if publication.key_points:
        bullets = "\n".join(f"- {point}" for point in publication.key_points)
        description_parts.append(bullets)
    if publication.visual_highlights:
        visuals = "\n".join(
            f"- {highlight}" for highlight in publication.visual_highlights
        )
        description_parts.append(f"Visual Highlights:\n{visuals}")
    if publication.model_used:
        description_parts.append(
            f"Model: {publication.model_used} (VRAM {publication.vram_mb:.2f} MB)"
        )
    return "\n\n".join(description_parts)
+
+
def _rfc2822(dt: datetime) -> str:
    """Format `dt` as an RFC 2822 date for an RSS <pubDate> element.

    Naive datetimes are assumed to be UTC. Fix: uses
    email.utils.format_datetime so the day and month names are always
    English, whereas strftime("%a ... %b ...") is locale-dependent and
    would produce invalid pubDate values under non-English locales.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return format_datetime(dt)
diff --git a/video_rss_aggregator/storage.py b/video_rss_aggregator/storage.py
new file mode 100644
index 0000000..f2817e1
--- /dev/null
+++ b/video_rss_aggregator/storage.py
@@ -0,0 +1,275 @@
+from __future__ import annotations
+
+import json
+import uuid
+from datetime import datetime, timezone
+from pathlib import Path
+
+import aiosqlite
+
+from video_rss_aggregator.domain.models import SummaryResult
+from video_rss_aggregator.domain.publication import PublicationRecord
+
# Idempotent DDL (CREATE ... IF NOT EXISTS) applied by Database.migrate()
# on every startup.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS feeds (
    id TEXT PRIMARY KEY,
    url TEXT NOT NULL UNIQUE,
    title TEXT,
    last_checked TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS videos (
    id TEXT PRIMARY KEY,
    feed_id TEXT,
    source_url TEXT NOT NULL UNIQUE,
    guid TEXT,
    title TEXT,
    published_at TEXT,
    media_path TEXT,
    created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS transcripts (
    id TEXT PRIMARY KEY,
    video_id TEXT NOT NULL,
    text TEXT NOT NULL,
    created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS summaries (
    id TEXT PRIMARY KEY,
    video_id TEXT NOT NULL,
    summary TEXT NOT NULL,
    key_points TEXT NOT NULL,
    visual_highlights TEXT NOT NULL,
    model_used TEXT,
    vram_mb REAL NOT NULL,
    transcript_chars INTEGER NOT NULL,
    frame_count INTEGER NOT NULL,
    error TEXT,
    created_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_videos_feed_id ON videos(feed_id);
CREATE INDEX IF NOT EXISTS idx_transcripts_video_id ON transcripts(video_id);
CREATE INDEX IF NOT EXISTS idx_summaries_video_id ON summaries(video_id);
CREATE INDEX IF NOT EXISTS idx_summaries_created_at ON summaries(created_at DESC);
"""

# Per-connection tuning applied once in Database.connect().
_PRAGMAS = (
    "PRAGMA foreign_keys = ON",
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA temp_store = MEMORY",
    "PRAGMA busy_timeout = 5000",
)

# Alias: older call sites refer to publication rows as "summary records"
# (see also Database.latest_summaries).
SummaryRecord = PublicationRecord
+
+
def _utc_now() -> str:
    """Current UTC time as an ISO-8601 string (aware, +00:00 offset)."""
    now = datetime.now(timezone.utc)
    return now.isoformat()
+
+
def _parse_dt(value: str | None) -> datetime | None:
    """Parse a stored ISO-8601 string; None, empty, or invalid input yields None."""
    if value:
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            pass
    return None
+
+
class Database:
    """Async SQLite persistence for feeds, videos, transcripts, and summaries.

    Wraps a single aiosqlite connection; every write commits immediately.
    Construct via the `connect` classmethod rather than __init__.
    """

    def __init__(self, conn: aiosqlite.Connection, database_path: str) -> None:
        # The connection is owned by this object; close() must be awaited.
        self._conn = conn
        self._database_path = database_path

    @classmethod
    async def connect(cls, database_path: str) -> Database:
        """Open the database (creating parent directories) and apply pragmas."""
        path = Path(database_path)
        path.parent.mkdir(parents=True, exist_ok=True)

        conn = await aiosqlite.connect(str(path))
        # Row factory gives name-based column access in the query methods below.
        conn.row_factory = aiosqlite.Row
        for pragma in _PRAGMAS:
            await conn.execute(pragma)
        await conn.commit()
        return cls(conn, str(path))

    @property
    def path(self) -> str:
        """Filesystem path of the backing SQLite file."""
        return self._database_path

    async def close(self) -> None:
        """Close the underlying connection."""
        await self._conn.close()

    async def migrate(self) -> None:
        """Apply the idempotent schema (CREATE ... IF NOT EXISTS statements)."""
        await self._conn.executescript(_SCHEMA)
        await self._conn.commit()

    async def upsert_feed(self, url: str, title: str | None) -> str:
        """Insert or refresh the feed row keyed by URL; return its id.

        An existing row keeps its id and keeps its title unless a non-NULL
        title is supplied (COALESCE); last_checked is always refreshed.
        """
        now = _utc_now()
        feed_id = str(uuid.uuid4())
        await self._conn.execute(
            """
            INSERT INTO feeds (id, url, title, last_checked)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(url)
            DO UPDATE SET
                title = COALESCE(excluded.title, feeds.title),
                last_checked = excluded.last_checked
            """,
            (feed_id, url, title, now),
        )
        await self._conn.commit()

        # Re-select: on conflict the pre-existing row keeps its original id,
        # not the freshly generated one.
        async with self._conn.execute(
            "SELECT id FROM feeds WHERE url = ?", (url,)
        ) as cur:
            row = await cur.fetchone()
            if row is None:
                raise RuntimeError("Failed to upsert feed")
            return str(row["id"])

    async def upsert_video(
        self,
        feed_id: str | None,
        guid: str | None,
        title: str | None,
        source_url: str,
        published_at: datetime | None,
        media_path: str | None = None,
    ) -> str:
        """Insert or refresh a video row keyed by source_url; return its id.

        NULL arguments never overwrite existing data (COALESCE), so
        repeated ingests and later processing runs can each fill in only
        the fields they know about.
        """
        now = _utc_now()
        video_id = str(uuid.uuid4())
        pub_text = published_at.isoformat() if published_at else None

        await self._conn.execute(
            """
            INSERT INTO videos (
                id, feed_id, source_url, guid, title, published_at, media_path, created_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(source_url)
            DO UPDATE SET
                feed_id = COALESCE(excluded.feed_id, videos.feed_id),
                guid = COALESCE(excluded.guid, videos.guid),
                title = COALESCE(excluded.title, videos.title),
                published_at = COALESCE(excluded.published_at, videos.published_at),
                media_path = COALESCE(excluded.media_path, videos.media_path)
            """,
            (video_id, feed_id, source_url, guid, title, pub_text, media_path, now),
        )
        await self._conn.commit()

        # Re-select for the same reason as upsert_feed: a conflicting row
        # keeps its original id.
        async with self._conn.execute(
            "SELECT id FROM videos WHERE source_url = ?",
            (source_url,),
        ) as cur:
            row = await cur.fetchone()
            if row is None:
                raise RuntimeError("Failed to upsert video")
            return str(row["id"])

    async def insert_transcript(self, video_id: str, text: str) -> str:
        """Store a transcript for `video_id`; return the new row id."""
        transcript_id = str(uuid.uuid4())
        await self._conn.execute(
            """
            INSERT INTO transcripts (id, video_id, text, created_at)
            VALUES (?, ?, ?, ?)
            """,
            (transcript_id, video_id, text, _utc_now()),
        )
        await self._conn.commit()
        return transcript_id

    async def insert_summary(self, video_id: str, result: SummaryResult) -> str:
        """Store a summarization result for `video_id`; return the new row id.

        key_points and visual_highlights are JSON-encoded into TEXT columns
        (decoded again by latest_publications).
        """
        summary_id = str(uuid.uuid4())
        await self._conn.execute(
            """
            INSERT INTO summaries (
                id,
                video_id,
                summary,
                key_points,
                visual_highlights,
                model_used,
                vram_mb,
                transcript_chars,
                frame_count,
                error,
                created_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                summary_id,
                video_id,
                result.summary,
                json.dumps(result.key_points),
                json.dumps(result.visual_highlights),
                result.model_used,
                float(result.vram_mb),
                int(result.transcript_chars),
                int(result.frame_count),
                result.error,
                _utc_now(),
            ),
        )
        await self._conn.commit()
        return summary_id

    async def latest_publications(self, limit: int = 20) -> list[PublicationRecord]:
        """Return the newest summaries joined to their videos, newest first."""
        async with self._conn.execute(
            """
            SELECT
                v.title,
                v.source_url,
                v.published_at,
                s.summary,
                s.key_points,
                s.visual_highlights,
                s.model_used,
                s.vram_mb
            FROM summaries s
            JOIN videos v ON v.id = s.video_id
            ORDER BY s.created_at DESC
            LIMIT ?
            """,
            (limit,),
        ) as cur:
            rows = await cur.fetchall()

        publications: list[PublicationRecord] = []
        for row in rows:
            key_points = _decode_text_list(row["key_points"])
            visual_highlights = _decode_text_list(row["visual_highlights"])
            publications.append(
                PublicationRecord(
                    title=row["title"],
                    source_url=row["source_url"],
                    published_at=_parse_dt(row["published_at"]),
                    summary=row["summary"],
                    key_points=key_points,
                    visual_highlights=visual_highlights,
                    model_used=row["model_used"],
                    # vram_mb is NOT NULL in the schema, but guard anyway.
                    vram_mb=float(row["vram_mb"] or 0.0),
                )
            )
        return publications

    async def latest_summaries(self, limit: int = 20) -> list[PublicationRecord]:
        """Alias kept for older call sites; see latest_publications."""
        return await self.latest_publications(limit)
+
+
def _decode_text_list(value: str | None) -> list[str]:
    """Decode a JSON-array column into a list of strings.

    Empty input, invalid JSON, or a JSON value that is not a list all
    decode to []. List elements are stringified.
    """
    if not value:
        return []
    try:
        loaded = json.loads(value)
    except json.JSONDecodeError:
        return []
    return [str(element) for element in loaded] if isinstance(loaded, list) else []