Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 68 additions & 32 deletions packages/ai-bot/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ import {
import type { MatrixEvent as DiscreteMatrixEvent } from 'https://cardstack.com/base/matrix-event';
import * as Sentry from '@sentry/node';

import { saveUsageCost } from '@cardstack/billing/ai-billing';
import {
spendUsageCost,
fetchGenerationCostWithBackoff,
} from '@cardstack/billing/ai-billing';
import { PgAdapter } from '@cardstack/postgres';
import type { ChatCompletionMessageParam } from 'openai/resources';
import { APIUserAbortError } from 'openai/error';
import type { OpenAIError } from 'openai/error';
import type { ChatCompletionStream } from 'openai/lib/ChatCompletionStream';
import { acquireRoomLock, releaseRoomLock } from './lib/queries';
Expand Down Expand Up @@ -86,22 +90,45 @@ class Assistant {
this.aiBotInstanceId = aiBotInstanceId;
}

async trackAiUsageCost(matrixUserId: string, generationId: string) {
async trackAiUsageCost(
matrixUserId: string,
opts: { costInUsd?: number; generationId?: string },
) {
if (trackAiUsageCostPromises.has(matrixUserId)) {
return;
}
// intentionally do not await saveUsageCost promise - it has a backoff mechanism to retry if the cost is not immediately available so we don't want to block the main thread
trackAiUsageCostPromises.set(
matrixUserId,
saveUsageCost(
this.pgAdapter,
matrixUserId,
generationId,
process.env.OPENROUTER_API_KEY!,
).finally(() => {
trackAiUsageCostPromises.delete(matrixUserId);
}),
);
const promise = (async () => {
let { costInUsd, generationId } = opts;
if (
typeof costInUsd === 'number' &&
Number.isFinite(costInUsd) &&
costInUsd > 0
) {
await spendUsageCost(this.pgAdapter, matrixUserId, costInUsd);
} else if (generationId) {
log.info(
`No inline cost for user ${matrixUserId}, falling back to generation cost API (generationId: ${generationId})`,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case there is no inline cost?

);
const fetchedCost = await fetchGenerationCostWithBackoff(
generationId,
process.env.OPENROUTER_API_KEY!,
);
if (fetchedCost !== null) {
await spendUsageCost(this.pgAdapter, matrixUserId, fetchedCost);
} else {
const message = `Failed to fetch generation cost for user ${matrixUserId} (generationId: ${generationId}), credit deduction skipped`;
log.error(message);
Sentry.captureMessage(message, 'error');
}
} else {
log.warn(
`No usage cost and no generation ID for user ${matrixUserId}, skipping credit deduction`,
);
}
})().finally(() => {
trackAiUsageCostPromises.delete(matrixUserId);
});
trackAiUsageCostPromises.set(matrixUserId, promise);
}

getResponse(prompt: PromptParts, senderMatrixUserId?: string) {
Expand Down Expand Up @@ -284,16 +311,9 @@ Common issues are:
event.getType() === 'm.room.message')
) {
activeGeneration.runner.abort();
await activeGeneration.responder.finalize({
isCanceled: true,
});
if (activeGeneration.lastGeneratedChunkId) {
await assistant.trackAiUsageCost(
senderMatrixUserId,
activeGeneration.lastGeneratedChunkId,
);
}
activeGenerations.delete(room.roomId);
// Finalization, credit tracking, and cleanup are all
// handled by the streaming code path's catch/finally
// blocks after the APIUserAbortError is thrown.
}

if (isShuttingDown()) {
Expand Down Expand Up @@ -448,6 +468,7 @@ Common issues are:

let chunkHandlingError: string | undefined;
let generationId: string | undefined;
let costInUsd: number | undefined;
log.info(
`[${eventId}] Starting generation with model %s`,
promptParts.model,
Expand All @@ -471,6 +492,9 @@ Common issues are:
});
}
generationId = chunk.id;
if (chunk.usage && (chunk.usage as any).cost != null) {
costInUsd = (chunk.usage as any).cost;
}
let activeGeneration = activeGenerations.get(room.roomId);
if (activeGeneration) {
activeGeneration.lastGeneratedChunkId = generationId;
Expand Down Expand Up @@ -517,17 +541,29 @@ Common issues are:
);
log.info(`[${eventId}] Response finalized`);
} catch (error) {
log.error(`[${eventId}] Error during generation or finalization`);
log.error(error);
if (chunkHandlingError) {
await responder.onError(chunkHandlingError); // E.g. MatrixError: [413] event too large
// When the cancel handler aborts the runner,
// finalChatCompletion() throws APIUserAbortError.
// Finalize the responder with the canceled flag and let
// the finally block handle credit tracking.
if (error instanceof APIUserAbortError) {
log.info(`[${eventId}] Generation was canceled by user`);
await responder.finalize({ isCanceled: true });
} else {
await responder.onError(error as OpenAIError);
log.error(`[${eventId}] Error during generation or finalization`);
log.error(error);
if (chunkHandlingError) {
await responder.onError(chunkHandlingError); // E.g. MatrixError: [413] event too large
} else {
await responder.onError(error as OpenAIError);
}
}
} finally {
if (generationId) {
assistant.trackAiUsageCost(senderMatrixUserId, generationId);
}
// Always track cost here — this path has the best data
// (both costInUsd from inline chunks and generationId).
assistant.trackAiUsageCost(senderMatrixUserId, {
costInUsd,
generationId,
});
activeGenerations.delete(room.roomId);
}

Expand Down
67 changes: 1 addition & 66 deletions packages/billing/ai-billing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,50 +109,7 @@ export async function spendUsageCost(
}
}

export async function saveUsageCost(
dbAdapter: DBAdapter,
matrixUserId: string,
generationId: string,
openRouterApiKey: string,
) {
try {
// Generation data is sometimes not immediately available, so we retry a couple of times until we are able to get the cost
let costInUsd = await fetchGenerationCostWithBackoff(
generationId,
openRouterApiKey,
);

if (costInUsd === null) {
Sentry.captureException(
new Error(
`Failed to fetch generation cost after retries (generationId: ${generationId})`,
),
);
return;
}

let creditsConsumed = Math.round(costInUsd * CREDITS_PER_USD);

let user = await getUserByMatrixUserId(dbAdapter, matrixUserId);

if (!user) {
throw new Error(
`should not happen: user with matrix id ${matrixUserId} not found in the users table`,
);
}

await spendCredits(dbAdapter, user.id, creditsConsumed);
} catch (err) {
log.error(
`Failed to track AI usage (matrixUserId: ${matrixUserId}, generationId: ${generationId}):`,
err,
);
Sentry.captureException(err);
// Don't throw, because we don't want to crash the application over this
}
}

async function fetchGenerationCostWithBackoff(
export async function fetchGenerationCostWithBackoff(
generationId: string,
openRouterApiKey: string,
): Promise<number | null> {
Expand Down Expand Up @@ -202,7 +159,6 @@ async function fetchGenerationCost(
},
);

// 404 means generation data probably isn't available yet - return null to trigger retry
if (response.status === 404) {
return null;
}
Expand All @@ -224,24 +180,3 @@ async function fetchGenerationCost(

return data.data.total_cost;
}

export function extractGenerationIdFromResponse(
response: any,
): string | undefined {
// OpenRouter responses typically include a generation_id in the response
// This might be in different places depending on the endpoint
if (response.id) {
return response.id;
}

if (response.choices && response.choices[0] && response.choices[0].id) {
return response.choices[0].id;
}

// For chat completions, the generation ID might be in usage
if (response.usage && response.usage.generation_id) {
return response.usage.generation_id;
}

return undefined;
}
75 changes: 33 additions & 42 deletions packages/realm-server/handlers/handle-request-forward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,30 @@ async function handleStreamingRequest(
if (!reader) throw new Error('No readable stream available');

let generationId: string | undefined;
let costInUsd: number | undefined;
let lastPing = Date.now();

await proxySSE(
reader,
async (data) => {
// Handle end of stream
if (data === '[DONE]') {
if (generationId) {
// Save cost in the background so we don't block the stream on OpenRouter's generation cost API.
// Chain per-user promises so costs are recorded sequentially.
// Only deduct credits when we observed billable metadata during
// the stream (an inline cost or a generation ID for the fallback).
if (
generationId != null ||
(typeof costInUsd === 'number' &&
Number.isFinite(costInUsd) &&
costInUsd > 0)
) {
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
const costPromise = previousPromise
.then(() =>
endpointConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
{ id: generationId },
{ id: generationId, usage: { cost: costInUsd } },
),
)
.finally(() => {
Expand All @@ -83,7 +89,12 @@ async function handleStreamingRequest(
}
});
pendingCostPromises.set(matrixUserId, costPromise);
} else {
log.warn(
`Streaming response for user ${matrixUserId} contained no generation ID or usage cost, skipping credit deduction`,
);
}

ctxt.res.write(`data: [DONE]\n\n`);
return 'stop';
}
Expand All @@ -95,6 +106,10 @@ async function handleStreamingRequest(
if (!generationId && dataObj.id) {
generationId = dataObj.id;
}

if (dataObj.usage?.cost != null) {
costInUsd = dataObj.usage.cost;
}
} catch {
log.warn('Invalid JSON in streaming response:', data);
}
Expand Down Expand Up @@ -499,46 +514,22 @@ export default function handleRequestForward({

const responseData = await externalResponse.json();

// 6. Deduct credits in the background using the cost from the response,
// or fall back to saveUsageCost when the cost is not provided.
const costInUsd = responseData?.usage?.cost;
// 6. Deduct credits in the background using the cost from the response.
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
let costPromise: Promise<void>;

if (
typeof costInUsd === 'number' &&
Number.isFinite(costInUsd) &&
costInUsd > 0
) {
costPromise = previousPromise
.then(() =>
destinationConfig.creditStrategy.spendUsageCost(
dbAdapter,
matrixUserId,
costInUsd,
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
} else {
costPromise = previousPromise
.then(() =>
destinationConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
responseData,
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
}
const costPromise = previousPromise
.then(() =>
destinationConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
responseData,
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
pendingCostPromises.set(matrixUserId, costPromise);

// 7. Return response
Expand Down
Loading
Loading