Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { OrchestrationEventStoreLive } from "../src/persistence/Layers/Orchestra
import { ProjectionCheckpointRepositoryLive } from "../src/persistence/Layers/ProjectionCheckpoints.ts";
import { ProjectionPendingApprovalRepositoryLive } from "../src/persistence/Layers/ProjectionPendingApprovals.ts";
import { ProviderSessionRuntimeRepositoryLive } from "../src/persistence/Layers/ProviderSessionRuntime.ts";
import { ProviderUsageLimitsRepositoryLive } from "../src/persistence/Layers/ProviderUsageLimits.ts";
import { makeSqlitePersistenceLive } from "../src/persistence/Layers/Sqlite.ts";
import { ProjectionCheckpointRepository } from "../src/persistence/Services/ProjectionCheckpoints.ts";
import { ProjectionPendingApprovalRepository } from "../src/persistence/Services/ProjectionPendingApprovals.ts";
Expand Down Expand Up @@ -287,6 +288,9 @@ export const makeOrchestrationIntegrationHarness = (
Layer.provide(fakeRegistry!),
Layer.provide(AnalyticsService.layerTest),
);
const usageLimitsRepositoryLayer = ProviderUsageLimitsRepositoryLive.pipe(
Layer.provide(persistenceLayer),
);

const checkpointStoreLayer = CheckpointStoreLive.pipe(Layer.provide(GitCoreLive));
const projectionSnapshotQueryLayer = OrchestrationProjectionSnapshotQueryLive;
Expand All @@ -296,7 +300,7 @@ export const makeOrchestrationIntegrationHarness = (
ProjectionCheckpointRepositoryLive,
ProjectionPendingApprovalRepositoryLive,
checkpointStoreLayer,
providerLayer,
providerLayer.pipe(Layer.provide(usageLimitsRepositoryLayer)),
RuntimeReceiptBusTest,
);
const serverSettingsLayer = ServerSettingsService.layerTest();
Expand Down
5 changes: 5 additions & 0 deletions apps/server/integration/providerService.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { ServerSettingsService } from "../src/serverSettings.ts";
import { AnalyticsService } from "../src/telemetry/Services/AnalyticsService.ts";
import { SqlitePersistenceMemory } from "../src/persistence/Layers/Sqlite.ts";
import { ProviderSessionRuntimeRepositoryLive } from "../src/persistence/Layers/ProviderSessionRuntime.ts";
import { ProviderUsageLimitsRepositoryLive } from "../src/persistence/Layers/ProviderUsageLimits.ts";

import {
makeTestProviderAdapterHarness,
Expand Down Expand Up @@ -58,9 +59,13 @@ const makeIntegrationFixture = Effect.gen(function* () {
const directoryLayer = ProviderSessionDirectoryLive.pipe(
Layer.provide(ProviderSessionRuntimeRepositoryLive),
);
const usageLimitsRepositoryLayer = ProviderUsageLimitsRepositoryLive.pipe(
Layer.provide(SqlitePersistenceMemory),
);

const shared = Layer.mergeAll(
directoryLayer,
usageLimitsRepositoryLayer,
Layer.succeed(ProviderAdapterRegistry, registry),
ServerSettingsService.layerTest(DEFAULT_SERVER_SETTINGS),
AnalyticsService.layerTest,
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/persistence/Errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ export type OrchestrationCommandReceiptRepositoryError =
| PersistenceDecodeError;

export type ProviderSessionRuntimeRepositoryError = PersistenceSqlError | PersistenceDecodeError;
export type ProviderUsageLimitsRepositoryError = PersistenceSqlError | PersistenceDecodeError;

export type ProjectionRepositoryError = PersistenceSqlError | PersistenceDecodeError;
60 changes: 60 additions & 0 deletions apps/server/src/persistence/Layers/ProviderUsageLimits.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { assert, it } from "@effect/vitest";
import { Effect, Layer, Option } from "effect";

import type { ServerProviderUsageLimits } from "@t3tools/contracts";
import { ProviderUsageLimitsRepository } from "../Services/ProviderUsageLimits.ts";
import { ProviderUsageLimitsRepositoryLive } from "./ProviderUsageLimits.ts";
import { SqlitePersistenceMemory } from "./Sqlite.ts";

const layer = it.layer(
ProviderUsageLimitsRepositoryLive.pipe(Layer.provideMerge(SqlitePersistenceMemory)),
);

layer("ProviderUsageLimitsRepository", (it) => {
it.effect("keeps the newest usage limits snapshot when a stale update arrives", () =>
Effect.gen(function* () {
const repository = yield* ProviderUsageLimitsRepository;
const newerUsageLimits = {
updatedAt: "2026-04-04T01:00:00.000Z",
windows: [
{
kind: "weekly" as const,
label: "Weekly limit",
usedPercentage: 31,
resetsAt: "2026-04-08T00:00:00.000Z",
windowDurationMins: 10_080,
},
],
} satisfies ServerProviderUsageLimits;
const staleUsageLimits = {
updatedAt: "2026-04-04T00:00:00.000Z",
windows: [
{
kind: "weekly" as const,
label: "Weekly limit",
usedPercentage: 12,
resetsAt: "2026-04-09T00:00:00.000Z",
windowDurationMins: 10_080,
},
],
} satisfies ServerProviderUsageLimits;

yield* repository.upsert({
provider: "codex",
usageLimits: newerUsageLimits,
});

yield* repository.upsert({
provider: "codex",
usageLimits: staleUsageLimits,
});

const stored = yield* repository.getByProvider({ provider: "codex" });
assert.equal(Option.isSome(stored), true);
if (Option.isSome(stored)) {
assert.deepStrictEqual(stored.value.usageLimits, newerUsageLimits);
assert.strictEqual(stored.value.updatedAt, newerUsageLimits.updatedAt);
}
}),
);
});
139 changes: 139 additions & 0 deletions apps/server/src/persistence/Layers/ProviderUsageLimits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { IsoDateTime, ProviderKind, ServerProviderUsageLimits } from "@t3tools/contracts";
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as SqlSchema from "effect/unstable/sql/SqlSchema";
import { Effect, Layer, Option, PubSub, Schema, Stream } from "effect";

import {
toPersistenceDecodeError,
toPersistenceSqlError,
type ProviderUsageLimitsRepositoryError,
} from "../Errors.ts";
import {
ProviderUsageLimitsRepository,
type ProviderUsageLimitsRepositoryShape,
StoredProviderUsageLimits,
} from "../Services/ProviderUsageLimits.ts";

const ProviderUsageLimitsDbRowSchema = Schema.Struct({
provider: ProviderKind,
updatedAt: IsoDateTime,
usageLimits: Schema.fromJsonString(ServerProviderUsageLimits),
});

function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) {
return (cause: unknown): ProviderUsageLimitsRepositoryError =>
Schema.isSchemaError(cause)
? toPersistenceDecodeError(decodeOperation)(cause)
: toPersistenceSqlError(sqlOperation)(cause);
}

const makeProviderUsageLimitsRepository = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;
const changesPubSub = yield* Effect.acquireRelease(
PubSub.unbounded<StoredProviderUsageLimits>(),
PubSub.shutdown,
);

const upsertUsageLimitsRow = SqlSchema.findOneOption({
Request: ProviderUsageLimitsDbRowSchema,
Result: ProviderUsageLimitsDbRowSchema,
execute: (row) =>
sql`
INSERT INTO provider_usage_limits (
provider_name,
updated_at,
payload_json
)
VALUES (
${row.provider},
${row.updatedAt},
${row.usageLimits}
)
ON CONFLICT (provider_name)
DO UPDATE SET
updated_at = excluded.updated_at,
payload_json = excluded.payload_json
WHERE excluded.updated_at > provider_usage_limits.updated_at
RETURNING
provider_name AS "provider",
updated_at AS "updatedAt",
payload_json AS "usageLimits"
`,
});

const getUsageLimitsByProvider = SqlSchema.findOneOption({
Request: Schema.Struct({
provider: ProviderKind,
}),
Result: ProviderUsageLimitsDbRowSchema,
execute: ({ provider }) =>
sql`
SELECT
provider_name AS "provider",
updated_at AS "updatedAt",
payload_json AS "usageLimits"
FROM provider_usage_limits
WHERE provider_name = ${provider}
`,
});

const getByProvider: ProviderUsageLimitsRepositoryShape["getByProvider"] = (input) =>
getUsageLimitsByProvider(input).pipe(
Effect.mapError(
toPersistenceSqlOrDecodeError(
"ProviderUsageLimitsRepository.getByProvider:query",
"ProviderUsageLimitsRepository.getByProvider:decodeRow",
),
),
Effect.flatMap((rowOption) =>
Option.match(rowOption, {
onNone: () => Effect.succeed(Option.none()),
onSome: (row) =>
Schema.decodeUnknownEffect(StoredProviderUsageLimits)(row).pipe(
Effect.mapError(
toPersistenceDecodeError(
"ProviderUsageLimitsRepository.getByProvider:rowToUsageLimits",
),
),
Effect.map(Option.some),
),
}),
),
);

const upsert: ProviderUsageLimitsRepositoryShape["upsert"] = (input) => {
const row: StoredProviderUsageLimits = {
provider: input.provider,
updatedAt: input.usageLimits.updatedAt,
usageLimits: input.usageLimits,
};

return upsertUsageLimitsRow(row).pipe(
Effect.mapError(
toPersistenceSqlOrDecodeError(
"ProviderUsageLimitsRepository.upsert:query",
"ProviderUsageLimitsRepository.upsert:requestOrRow",
),
),
Effect.flatMap((rowOption) =>
Option.match(rowOption, {
onNone: () => Effect.void,
onSome: (updatedRow) => PubSub.publish(changesPubSub, updatedRow).pipe(Effect.asVoid),
}),
),
);
};

return {
getByProvider,
upsert,
get streamChanges() {
return Stream.fromPubSub(changesPubSub);
},
} satisfies ProviderUsageLimitsRepositoryShape;
});

export const ProviderUsageLimitsRepositoryLive = Layer.effect(
ProviderUsageLimitsRepository,
makeProviderUsageLimitsRepository,
);
2 changes: 2 additions & 0 deletions apps/server/src/persistence/Migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import Migration0016 from "./Migrations/016_CanonicalizeModelSelections.ts";
import Migration0017 from "./Migrations/017_ProjectionThreadsArchivedAt.ts";
import Migration0018 from "./Migrations/018_ProjectionThreadsArchivedAtIndex.ts";
import Migration0019 from "./Migrations/019_ProjectionSnapshotLookupIndexes.ts";
import Migration0020 from "./Migrations/020_ProviderUsageLimits.ts";

/**
* Migration loader with all migrations defined inline.
Expand Down Expand Up @@ -63,6 +64,7 @@ export const migrationEntries = [
[17, "ProjectionThreadsArchivedAt", Migration0017],
[18, "ProjectionThreadsArchivedAtIndex", Migration0018],
[19, "ProjectionSnapshotLookupIndexes", Migration0019],
[20, "ProviderUsageLimits", Migration0020],
] as const;

export const makeMigrationLoader = (throughId?: number) =>
Expand Down
14 changes: 14 additions & 0 deletions apps/server/src/persistence/Migrations/020_ProviderUsageLimits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as Effect from "effect/Effect";

export default Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;

yield* sql`
CREATE TABLE IF NOT EXISTS provider_usage_limits (
provider_name TEXT PRIMARY KEY,
updated_at TEXT NOT NULL,
payload_json TEXT NOT NULL
)
`;
});
38 changes: 38 additions & 0 deletions apps/server/src/persistence/Services/ProviderUsageLimits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {
IsoDateTime,
type ProviderKind,
ProviderKind as ProviderKindSchema,
} from "@t3tools/contracts";
import { ServerProviderUsageLimits } from "@t3tools/contracts";
import { Option, Schema, ServiceMap, type Stream } from "effect";
import type { Effect } from "effect";

import type { ProviderUsageLimitsRepositoryError } from "../Errors.ts";

export const StoredProviderUsageLimits = Schema.Struct({
provider: ProviderKindSchema,
updatedAt: IsoDateTime,
usageLimits: ServerProviderUsageLimits,
});
export type StoredProviderUsageLimits = typeof StoredProviderUsageLimits.Type;

export const GetProviderUsageLimitsInput = Schema.Struct({
provider: ProviderKindSchema,
});
export type GetProviderUsageLimitsInput = typeof GetProviderUsageLimitsInput.Type;

export interface ProviderUsageLimitsRepositoryShape {
readonly getByProvider: (
input: GetProviderUsageLimitsInput,
) => Effect.Effect<Option.Option<StoredProviderUsageLimits>, ProviderUsageLimitsRepositoryError>;
readonly upsert: (input: {
readonly provider: ProviderKind;
readonly usageLimits: ServerProviderUsageLimits;
}) => Effect.Effect<void, ProviderUsageLimitsRepositoryError>;
readonly streamChanges: Stream.Stream<StoredProviderUsageLimits>;
}

export class ProviderUsageLimitsRepository extends ServiceMap.Service<
ProviderUsageLimitsRepository,
ProviderUsageLimitsRepositoryShape
>()("t3/persistence/Services/ProviderUsageLimits/ProviderUsageLimitsRepository") {}
Loading
Loading