
Commit 230e39b

add data to bigquery table

1 parent 7cbd50b commit 230e39b

File tree

7 files changed: +408 -15 lines changed

npm-app/src/asdf.ts

Lines changed: 11 additions & 2 deletions

@@ -10,13 +10,13 @@ const codebuffBackendProvider = createOpenAICompatible({
 })
 
 const response = streamText({
-  model: codebuffBackendProvider('anthropic/claude-sonnet-4.5'),
+  model: codebuffBackendProvider('openai/gpt-5'),
   messages: [
     {
       role: 'user',
       content:
         'This is a bunch of text just to fill out some space. Ignore this.'.repeat(
-          1000,
+          100,
        ),
    },
    {
@@ -29,6 +29,15 @@ const response = streamText({
      },
    },
  ],
+  providerOptions: {
+    codebuff: {
+      // all these get directly added to the body at the top level
+      reasoningEffort: 'low',
+      codebuff_metadata: {
+        agent_run_id: 'testing',
+      },
+    },
+  },
 })
 for await (const chunk of response.fullStream) {
   console.log('asdf', { chunk })
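
The inline comment above says that everything under providerOptions.codebuff is added to the request body at the top level. A minimal sketch of what the outgoing body would look like under that assumption (the exact serialization is not shown in this commit):

// Hypothetical request body after the codebuff provider options are
// merged in at the top level, per the inline comment in the diff.
const outgoingBody = {
  model: 'openai/gpt-5',
  messages: [{ role: 'user', content: '...' }],
  stream: true,
  reasoningEffort: 'low', // from providerOptions.codebuff
  codebuff_metadata: { agent_run_id: 'testing' }, // from providerOptions.codebuff
}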

packages/bigquery/src/client.ts

Lines changed: 52 additions & 2 deletions

@@ -1,9 +1,16 @@
 import { logger } from '@codebuff/common/util/logger'
+import { errorToObject } from '@codebuff/common/util/object'
 import { BigQuery } from '@google-cloud/bigquery'
 
-import { RELABELS_SCHEMA, TRACES_SCHEMA } from './schema'
+import { MESSAGE_SCHEMA, RELABELS_SCHEMA, TRACES_SCHEMA } from './schema'
 
-import type { BaseTrace, GetRelevantFilesTrace, Relabel, Trace } from './schema'
+import type {
+  BaseTrace,
+  GetRelevantFilesTrace,
+  MessageRow,
+  Relabel,
+  Trace,
+} from './schema'
 
 const DATASET =
   process.env.NEXT_PUBLIC_CB_ENVIRONMENT === 'prod'
@@ -12,6 +19,7 @@ const DATASET =
 
 const TRACES_TABLE = 'traces'
 const RELABELS_TABLE = 'relabels'
+const MESSAGE_TABLE = 'message'
 
 // Create a single BigQuery client instance to be used by all functions
 let client: BigQuery | null = null
@@ -26,6 +34,9 @@ function getClient(): BigQuery {
 }
 
 export async function setupBigQuery(dataset: string = DATASET) {
+  if (client) {
+    return
+  }
   try {
     client = new BigQuery()
 
@@ -55,6 +66,17 @@ export async function setupBigQuery(dataset: string = DATASET) {
        fields: ['user_id', 'agent_step_id'],
      },
    })
+    await ds.table(MESSAGE_TABLE).get({
+      autoCreate: true,
+      schema: MESSAGE_SCHEMA,
+      timePartitioning: {
+        type: 'MONTH',
+        field: 'finished_at',
+      },
+      clustering: {
+        fields: ['user_id'],
+      },
+    })
  } catch (error) {
    logger.error(
      {
@@ -71,6 +93,34 @@ export async function setupBigQuery(dataset: string = DATASET) {
   }
 }
 
+export async function insertMessage(
+  row: MessageRow,
+  dataset: string = DATASET,
+) {
+  try {
+    await getClient()
+      .dataset(dataset)
+      .table(MESSAGE_TABLE)
+      .insert({ ...row, request: JSON.stringify(row.request) })
+
+    logger.debug(
+      {
+        ...row,
+        request: undefined,
+      },
+      'Inserted message into BigQuery',
+    )
+    return true
+  } catch (error) {
+    logger.error(
+      { error: errorToObject(error), messageId: row.id },
+      'Failed to insert message into BigQuery',
+    )
+
+    return false
+  }
+}
+
 export async function insertTrace(trace: Trace, dataset: string = DATASET) {
   try {
     // Create a copy of the trace and stringify payload if needed
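
setupBigQuery() now returns early once a client exists, so callers can invoke it before every insert without re-running table setup. A usage sketch (the row values are invented for illustration):

import { insertMessage, setupBigQuery } from '@codebuff/bigquery'

// Idempotent after the new guard: subsequent calls return immediately.
await setupBigQuery()

// Resolves to true on success, false on failure; errors are logged, not thrown.
const ok = await insertMessage({
  id: 'gen-abc123', // hypothetical values below
  user_id: 'user-1',
  finished_at: new Date(),
  created_at: new Date(),
  request: { model: 'openai/gpt-5', messages: [] },
  response: 'Hello!',
  output_tokens: 2,
})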

packages/bigquery/src/schema.ts

Lines changed: 32 additions & 0 deletions

@@ -125,3 +125,35 @@ export const RELABELS_SCHEMA: TableSchema = {
     { name: 'payload', type: 'JSON', mode: 'REQUIRED' },
   ],
 }
+
+export type MessageRow = {
+  id: string
+  user_id: string
+  finished_at: Date
+  created_at: Date
+  request: unknown
+  response: string
+  output_tokens?: number | null
+  reasoning_tokens?: number | null
+  cost?: number | null
+  upstream_inference_cost?: number | null
+  input_tokens?: number | null
+  cache_read_input_tokens?: number | null
+}
+
+export const MESSAGE_SCHEMA: TableSchema = {
+  fields: [
+    { name: 'id', type: 'STRING', mode: 'REQUIRED' },
+    { name: 'user_id', type: 'STRING', mode: 'REQUIRED' },
+    { name: 'finished_at', type: 'TIMESTAMP', mode: 'REQUIRED' },
+    { name: 'created_at', type: 'TIMESTAMP', mode: 'REQUIRED' },
+    { name: 'request', type: 'JSON', mode: 'REQUIRED' },
+    { name: 'response', type: 'STRING', mode: 'REQUIRED' },
+    { name: 'output_tokens', type: 'INTEGER', mode: 'NULLABLE' },
+    { name: 'reasoning_tokens', type: 'INTEGER', mode: 'NULLABLE' },
+    { name: 'cost', type: 'FLOAT', mode: 'NULLABLE' },
+    { name: 'upstream_inference_cost', type: 'FLOAT', mode: 'NULLABLE' },
+    { name: 'input_tokens', type: 'INTEGER', mode: 'NULLABLE' },
+    { name: 'cache_read_input_tokens', type: 'INTEGER', mode: 'NULLABLE' },
+  ],
+}
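
Because the table is month-partitioned on finished_at and clustered by user_id (see the client.ts changes above), per-user queries that also filter the partition column prune well. A hypothetical rollup query, not part of this commit (the dataset name is a placeholder):

import { BigQuery } from '@google-cloud/bigquery'

// Filtering on finished_at (partition column) and user_id (clustering
// column) lets BigQuery skip unrelated partitions and blocks.
const bq = new BigQuery()
const [rows] = await bq.query({
  query: `
    SELECT SUM(cost) AS total_cost, SUM(output_tokens) AS total_output_tokens
    FROM \`my_dataset.message\`  -- placeholder dataset name
    WHERE finished_at >= TIMESTAMP('2025-01-01')
      AND user_id = @userId`,
  params: { userId: 'user-1' }, // hypothetical id
})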

web/src/app/api/v1/chat/completions/route.ts

Lines changed: 18 additions & 2 deletions

@@ -1,9 +1,10 @@
+import { getUserUsageData } from '@codebuff/billing/usage-service'
 import { NextResponse } from 'next/server'
 
 import type { NextRequest } from 'next/server'
 
 import { getUserInfoFromApiKey } from '@/db/user'
-import { handleOpenrouterStream } from '@/llm-api/openrouter'
+import { handleOpenRouterStream } from '@/llm-api/openrouter'
 import { extractApiKeyFromHeader } from '@/util/auth'
 import { errorToObject } from '@/util/error'
 import { logger } from '@/util/logger'
@@ -30,10 +31,25 @@ export async function POST(req: NextRequest) {
     )
   }
 
+  const userId = userInfo.id
+  const {
+    balance: { totalRemaining },
+    nextQuotaReset,
+  } = await getUserUsageData(userId)
+  if (totalRemaining <= 0) {
+    return NextResponse.json(
+      {
+        message: `Insufficient credits. Please add credits at ${process.env.NEXT_PUBLIC_APP_URL}/usage or wait for your next cycle to begin (${nextQuotaReset}).`,
+      },
+      { status: 402 }
+    )
+  }
+
   if (body.stream) {
     try {
-      const stream = await handleOpenrouterStream({
+      const stream = await handleOpenRouterStream({
         body,
+        userId,
       })
 
       return new NextResponse(stream, {
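
With the credit check in place, callers should handle HTTP 402 before consuming the stream. A sketch of a client, assuming the route path from the diff and a bearer-token header (the URL and auth format are assumptions, not confirmed by this commit):

// Hypothetical caller; only the /api/v1/chat/completions path comes from
// the diff. The host and Authorization header format are assumed.
const res = await fetch('https://app.example.com/api/v1/chat/completions', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: 'Bearer <api-key>',
  },
  body: JSON.stringify({ model: 'openai/gpt-5', messages: [], stream: true }),
})

if (res.status === 402) {
  // Out of credits: surface the server's message instead of reading the stream.
  const { message } = await res.json()
  throw new Error(message)
}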

web/src/llm-api/openrouter.ts

Lines changed: 130 additions & 2 deletions

@@ -1,6 +1,25 @@
+import {
+  insertMessage as insertMessageIntoBigquery,
+  setupBigQuery,
+} from '@codebuff/bigquery'
 import { env } from '@codebuff/internal/env'
 
-export async function handleOpenrouterStream({ body }: { body: any }) {
+import { OpenRouterStreamChatCompletionChunkSchema } from './type/openrouter'
+
+import type { OpenRouterStreamChatCompletionChunk } from './type/openrouter'
+
+import { errorToObject } from '@/util/error'
+import { logger } from '@/util/logger'
+
+type StreamState = { responseText: string }
+
+export async function handleOpenRouterStream({
+  body,
+  userId,
+}: {
+  body: any
+  userId: string
+}) {
   // Ensure usage tracking is enabled
   if (body.usage === undefined) {
     body.usage = {}
@@ -30,7 +49,8 @@ export async function handleOpenrouterStream({ body }: { body: any }) {
     throw new Error('Failed to get response reader')
   }
 
-  let heartbeatInterval: ReturnType<typeof setInterval>
+  let heartbeatInterval: NodeJS.Timeout
+  let state: StreamState = { responseText: '' }
 
   // Create a ReadableStream that Next.js can handle
   const stream = new ReadableStream({
@@ -67,6 +87,8 @@ export async function handleOpenrouterStream({ body }: { body: any }) {
        const line = buffer.slice(0, lineEnd + 1)
        buffer = buffer.slice(lineEnd + 1)
 
+        state = await handleLine({ userId, request: body, line, state })
+
        // Forward the line to the client
        controller.enqueue(new TextEncoder().encode(line))
 
@@ -90,3 +112,109 @@ export async function handleOpenrouterStream({ body }: { body: any }) {
 
   return stream
 }
+
+async function handleLine({
+  userId,
+  request,
+  line,
+  state,
+}: {
+  userId: string
+  request: unknown
+  line: string
+  state: StreamState
+}): Promise<StreamState> {
+  if (!line.startsWith('data: ')) {
+    return state
+  }
+
+  const raw = line.slice('data: '.length)
+  if (raw === '[DONE]\n') {
+    return state
+  }
+
+  // Parse the string into an object
+  let obj
+  try {
+    obj = JSON.parse(raw)
+  } catch (error) {
+    logger.warn(
+      `Received non-JSON OpenRouter response: ${JSON.stringify(errorToObject(error), null, 2)}`
+    )
+    return state
+  }
+
+  // Extract usage
+  const parsed = OpenRouterStreamChatCompletionChunkSchema.safeParse(obj)
+  if (!parsed.success) {
+    logger.warn(
+      `Unable to parse OpenRouter response: ${JSON.stringify(errorToObject(parsed.error), null, 2)}`
+    )
+    return state
+  }
+
+  return await handleResponse({ userId, request, data: parsed.data, state })
+}
+
+async function handleResponse({
+  userId,
+  request,
+  data,
+  state,
+}: {
+  userId: string
+  request: unknown
+  data: OpenRouterStreamChatCompletionChunk
+  state: StreamState
+}): Promise<StreamState> {
+  state = await handleStreamChunk({ data, state })
+
+  if ('error' in data || !data.usage) {
+    // Stream not finished
+    return state
+  }
+  const usage = data.usage
+
+  // do not await this
+  setupBigQuery().then(() =>
+    insertMessageIntoBigquery({
+      id: data.id,
+      user_id: userId,
+      finished_at: new Date(),
+      created_at: new Date(data.created * 1000),
+      request,
+      response: state.responseText,
+      output_tokens: usage.completion_tokens,
+      reasoning_tokens: usage.completion_tokens_details?.reasoning_tokens,
+      cost: usage.cost,
+      upstream_inference_cost: usage.cost_details?.upstream_inference_cost,
+      input_tokens: usage.prompt_tokens,
+      cache_read_input_tokens: usage.prompt_tokens_details?.cached_tokens,
+    })
+  )
+  const openRouterCost = usage.cost ?? 0
+  const upstreamCost = usage.cost_details?.upstream_inference_cost ?? 0
+  const cost = openRouterCost + upstreamCost
+  // asdf todo: charge user
+  return state
+}
+
+async function handleStreamChunk({
+  data,
+  state,
+}: {
+  data: OpenRouterStreamChatCompletionChunk
+  state: StreamState
+}): Promise<StreamState> {
+  if ('error' in data) {
+    logger.warn({ streamChunk: data }, 'Received error from OpenRouter')
+    return state
+  }
+
+  if (!data.choices.length) {
+    logger.warn({ streamChunk: data }, 'Received empty choices from OpenRouter')
+    return state
+  }
+  const choice = data.choices[0]
+  state.responseText += choice.delta?.content ?? ''
+  return state
+}
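
The accumulation logic is easiest to follow with a concrete stream. A sketch of the SSE lines the proxy forwards and what handleLine does with each; the chunk payloads are invented but shaped like OpenRouter's streaming format:

// Invented SSE lines, assumed to satisfy OpenRouterStreamChatCompletionChunkSchema.
const lines = [
  'data: {"id":"gen-1","created":1700000000,"choices":[{"delta":{"content":"Hel"}}]}\n',
  'data: {"id":"gen-1","created":1700000000,"choices":[{"delta":{"content":"lo"}}]}\n',
  // The final chunk carries usage, which triggers the fire-and-forget BigQuery insert:
  'data: {"id":"gen-1","created":1700000000,"choices":[{"delta":{}}],"usage":{"prompt_tokens":5,"completion_tokens":2,"cost":0.0001}}\n',
  'data: [DONE]\n', // ignored by handleLine
]

// Fed through handleLine in order, state.responseText accumulates to 'Hello'
// and exactly one message row is queued for insertion.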
