Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/api/drizzle/0012_add_source_failure_tracking.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Migration 0012: add failure-tracking columns to the `sources` table for the
-- circuit breaker pattern.
--
-- NOTE(review): the statements below are not separated by
-- `--> statement-breakpoint` markers. If this file is applied through
-- drizzle-orm's migrator (rather than a tool that executes the file as a
-- multi-statement script), confirm against earlier migrations whether the
-- markers are required.

-- Consecutive-failure counter; NOT NULL with DEFAULT 0, so existing rows start at zero.
ALTER TABLE `sources` ADD `consecutive_failures` integer NOT NULL DEFAULT 0;
-- Nullable integer with no default; existing rows get NULL.
ALTER TABLE `sources` ADD `last_error_at` integer;
-- Nullable integer with no default; existing rows get NULL.
ALTER TABLE `sources` ADD `fetch_disabled_at` integer;
-- Index on the column holding the disabled marker.
CREATE INDEX `idx_sources_fetch_disabled_at` ON `sources` (`fetch_disabled_at`);
4 changes: 4 additions & 0 deletions packages/api/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ export const sources = sqliteTable(
}).default("auto"),
iconUpdatedAt: integer("icon_updated_at", { mode: "timestamp" }),
lastFetched: integer("last_fetched", { mode: "timestamp" }),
consecutiveFailures: integer("consecutive_failures").notNull().default(0),
lastErrorAt: integer("last_error_at", { mode: "timestamp" }),
fetchDisabledAt: integer("fetch_disabled_at", { mode: "timestamp" }),
createdAt: integer("created_at", { mode: "timestamp" })
.notNull()
.$defaultFn(() => new Date()),
Expand All @@ -146,6 +149,7 @@ export const sources = sqliteTable(
index("idx_sources_url").on(table.url),
index("idx_sources_icon_url").on(table.iconUrl),
index("idx_sources_last_fetched").on(table.lastFetched),
index("idx_sources_fetch_disabled_at").on(table.fetchDisabledAt),
]
);

Expand Down
73 changes: 67 additions & 6 deletions packages/api/src/services/rss-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import type { Rss, Atom, Rdf, Json } from "@/types/feed";
import type { Database } from "@/db/client";
import * as schema from "@/db/schema";
import { and, eq, inArray, or, isNull, lt } from "drizzle-orm";
import { and, eq, inArray, or, isNull, lt, sql } from "drizzle-orm";
import { extractOgImage } from "@/utils/og-image-fetcher";
import {
sanitizeHtml,
Expand Down Expand Up @@ -59,6 +59,12 @@
batchSize: 20, // Feeds per batch (optimized for D1 query limits)
} as const;

/**
 * Circuit breaker configuration for persistently-failing feeds.
 *
 * A source whose consecutive failure count reaches `failureThreshold` is
 * marked as disabled; once `cooldownHours` have passed since it was disabled,
 * it becomes eligible for fetching again.
 */
const CIRCUIT_BREAKER = {
failureThreshold: 10, // Disable feed after this many consecutive failures
cooldownHours: 24, // Re-enable disabled feeds after this many hours
} as const;

// =============================================================================
// Types
// =============================================================================
Expand Down Expand Up @@ -116,17 +122,30 @@
}

/**
 * Get stale sources that need fetching, skipping circuit-broken feeds
 * unless their cooldown period has elapsed.
 *
 * @param db - Database handle the query is executed against.
 * @param staleThreshold - Forwarded to `buildStalenessWhereClause` to build
 *   the staleness filter.
 * @param limit - Maximum number of rows to return.
 * @returns Rows from `schema.sources` matching both filters, ordered by
 *   `lastFetched`.
 */
async function getStaleSources(
  db: Database,
  staleThreshold: Date,
  limit: number
) {
  // Sources disabled before this instant have completed their cooldown.
  const cooldownCutoff = new Date(
    Date.now() - CIRCUIT_BREAKER.cooldownHours * 60 * 60 * 1000
  );

  // Include sources that are:
  //   (a) not disabled (fetchDisabledAt IS NULL), OR
  //   (b) disabled but cooldown has elapsed (fetchDisabledAt < cooldownCutoff)
  const notDisabledOrCooledDown = or(
    isNull(schema.sources.fetchDisabledAt),
    lt(schema.sources.fetchDisabledAt, cooldownCutoff)
  );

  // Both conditions go into one predicate: the query builder accepts a single
  // .where() per query, so the staleness and circuit-breaker filters are
  // combined with and() instead of being chained.
  return await db
    .select()
    .from(schema.sources)
    .where(and(buildStalenessWhereClause(staleThreshold), notDisabledOrCooledDown))
    .orderBy(schema.sources.lastFetched)
    .limit(limit);
}
Expand Down Expand Up @@ -469,6 +488,41 @@
domain: extractDomain(feedUrl) || "unknown",
});

// Circuit breaker: increment consecutive failure count and disable
// the source if it exceeds the threshold.
try {
const now = new Date();
// Increment consecutive_failures atomically and fetch new value
const updated = await db
.update(schema.sources)
.set({
consecutiveFailures: sql`${schema.sources.consecutiveFailures} + 1`,
lastErrorAt: now,
lastFetched: now, // Advance lastFetched so the source isn't immediately re-queued
})
.where(eq(schema.sources.id, sourceId))
.returning({ consecutiveFailures: schema.sources.consecutiveFailures });

const newCount = updated[0]?.consecutiveFailures ?? 0;

if (newCount >= CIRCUIT_BREAKER.failureThreshold) {
await db
.update(schema.sources)
.set({ fetchDisabledAt: now })
.where(eq(schema.sources.id, sourceId));

console.warn(
`⚡ Circuit breaker opened for source ${sourceId} (${feedUrl}) after ${newCount} consecutive failures`
);
emitCounter("rss.feed_circuit_broken", 1, {
domain: extractDomain(feedUrl) || "unknown",
});
}
} catch (dbError) {
// Don't let failure tracking errors mask the original fetch error
console.error("Failed to update failure tracking for source:", sourceId, dbError);
}

// Error already captured in specific places, re-throw
throw error;
}
Expand Down Expand Up @@ -536,7 +590,7 @@
.from(schema.sources)
.where(eq(schema.sources.id, sourceId))
.limit(1)
.then((rows) => rows[0]);

Check failure on line 593 in packages/api/src/services/rss-fetcher.ts

View workflow job for this annotation

GitHub Actions / ci / Test API

src/services/__tests__/rss-fetcher.test.ts > RSS Fetcher Service > Error Handling > should handle non-existent source

SqliteError: no such column: "consecutive_failures" - should this be a string literal in single-quotes? ❯ Database.prepare ../../node_modules/.pnpm/better-sqlite3@12.4.6/node_modules/better-sqlite3/lib/methods/wrappers.js:5:21 ❯ BetterSQLiteSession.prepareQuery ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/better-sqlite3/session.js:23:30 ❯ BetterSQLiteSession.prepareOneTimeQuery ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/session.js:141:17 ❯ SQLiteSelectBase._prepare ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/query-builders/select.js:615:88 ❯ SQLiteSelectBase.all ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/query-builders/select.js:641:17 ❯ SQLiteSelectBase.execute ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/query-builders/select.js:650:17 ❯ SQLiteSelectBase.then ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/query-promise.js:21:17 ❯ updateSourceMetadata src/services/rss-fetcher.ts:593:6 ❯ src/services/rss-fetcher.ts:447:37 ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'SQLITE_ERROR' }

Check failure on line 593 in packages/api/src/services/rss-fetcher.ts

View workflow job for this annotation

GitHub Actions / ci / Test API

src/services/__tests__/rss-fetcher.test.ts > RSS Fetcher Service > Error Handling > should handle non-existent source

SqliteError: no such column: "consecutive_failures" - should this be a string literal in single-quotes? ❯ Database.prepare ../../node_modules/.pnpm/better-sqlite3@12.4.6/node_modules/better-sqlite3/lib/methods/wrappers.js:5:21 ❯ BetterSQLiteSession.prepareQuery ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/better-sqlite3/session.js:23:30 ❯ BetterSQLiteSession.prepareOneTimeQuery ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/session.js:141:17 ❯ SQLiteSelectBase._prepare ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/query-builders/select.js:615:88 ❯ SQLiteSelectBase.all ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/query-builders/select.js:641:17 ❯ SQLiteSelectBase.execute ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/sqlite-core/query-builders/select.js:650:17 ❯ SQLiteSelectBase.then ../../node_modules/.pnpm/drizzle-orm@0.44.7_@cloudflare+workers-types@4.20251121.0_@opentelemetry+api@1.9.0_@typ_06980b4f9d6e304603209f47becaf980/node_modules/drizzle-orm/query-promise.js:21:17 ❯ updateSourceMetadata src/services/rss-fetcher.ts:593:6 ❯ src/services/rss-fetcher.ts:447:37 ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'SQLITE_ERROR' }

// Only update icon if:
// 1. iconType is 'auto' (not custom or none)
Expand All @@ -552,19 +606,26 @@
updates.iconUpdatedAt = new Date();
}

// Always reset circuit breaker on a successful fetch
const circuitBreakerReset: Partial<typeof schema.sources.$inferInsert> = {
consecutiveFailures: 0,
lastErrorAt: null,
fetchDisabledAt: null,
};

// Only update if we have something to update (beyond lastFetched)
if (Object.keys(updates).length > 1) {
await db
.update(schema.sources)
.set(updates)
.set({ ...updates, ...circuitBreakerReset })
.where(eq(schema.sources.id, sourceId));
return true;
}

// Just update lastFetched
// Just update lastFetched and reset circuit breaker
await db
.update(schema.sources)
.set({ lastFetched: new Date() })
.set({ lastFetched: new Date(), ...circuitBreakerReset })
.where(eq(schema.sources.id, sourceId));
return false;
}
Expand Down
Loading