Skip to content

Commit 2c50e2d

Browse files
committed
feat: add account rate limits handling and corresponding tests
1 parent 81c283c commit 2c50e2d

4 files changed

Lines changed: 169 additions & 26 deletions

File tree

apps/server/src/codexAppServerManager.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,25 @@ export class CodexAppServerManager extends EventEmitter<CodexAppServerManagerEve
519519
console.log("codex account/read failed", error);
520520
}
521521

522+
try {
523+
const accountRateLimitsResponse = await this.sendRequest(
524+
context,
525+
"account/rateLimits/read",
526+
{},
527+
);
528+
this.emitEvent({
529+
id: EventId.makeUnsafe(randomUUID()),
530+
kind: "notification",
531+
provider: "codex",
532+
threadId: context.session.threadId,
533+
createdAt: new Date().toISOString(),
534+
method: "account/rateLimits/updated",
535+
payload: accountRateLimitsResponse,
536+
});
537+
} catch (error) {
538+
console.log("codex account/rateLimits/read failed", error);
539+
}
540+
522541
const normalizedModel = resolveCodexModelForAccount(
523542
normalizeCodexModelSlug(input.model),
524543
context.account,

apps/server/src/provider/Layers/CodexProvider.ts

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,7 @@ import type {
77
ServerProviderAuth,
88
ServerProviderState,
99
} from "@t3tools/contracts";
10-
import {
11-
Cache,
12-
Duration,
13-
Effect,
14-
Equal,
15-
FileSystem,
16-
Layer,
17-
Option,
18-
Path,
19-
Result,
20-
Stream,
21-
} from "effect";
10+
import { Effect, Equal, FileSystem, Layer, Option, Path, Result, Stream } from "effect";
2211
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
2312

2413
import {
@@ -562,20 +551,11 @@ export const CodexProviderLive = Layer.effect(
562551
const fileSystem = yield* FileSystem.FileSystem;
563552
const path = yield* Path.Path;
564553
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner;
565-
const accountProbeCache = yield* Cache.make({
566-
capacity: 4,
567-
timeToLive: Duration.minutes(5),
568-
lookup: (key: string) => {
569-
const [binaryPath, homePath] = JSON.parse(key) as [string, string | undefined];
570-
return probeCodexCapabilities({
571-
binaryPath,
572-
...(homePath ? { homePath } : {}),
573-
});
574-
},
575-
});
576-
577554
const checkProvider = checkCodexProviderStatus((input) =>
578-
Cache.get(accountProbeCache, JSON.stringify([input.binaryPath, input.homePath])),
555+
probeCodexCapabilities({
556+
binaryPath: input.binaryPath,
557+
...(input.homePath ? { homePath: input.homePath } : {}),
558+
}),
579559
).pipe(
580560
Effect.provideService(ServerSettingsService, serverSettings),
581561
Effect.provideService(FileSystem.FileSystem, fileSystem),
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { chmodSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import { afterEach, describe, expect, it } from "vitest";
5+
6+
import { probeCodexAccountState } from "./codexAppServer";
7+
8+
const tempDirs: Array<string> = [];
9+
10+
afterEach(() => {
11+
for (const tempDir of tempDirs.splice(0)) {
12+
rmSync(tempDir, { recursive: true, force: true });
13+
}
14+
});
15+
16+
function createCodexProbeStub(options: {
17+
readonly rateLimitsBehavior: "ignore" | "respond";
18+
}): string {
19+
const tempDir = mkdtempSync(path.join(os.tmpdir(), "codex-probe-stub-"));
20+
tempDirs.push(tempDir);
21+
22+
const scriptPath = path.join(tempDir, "codex-stub.mjs");
23+
const respondsRateLimits = options.rateLimitsBehavior === "respond";
24+
const content = `#!/usr/bin/env node
25+
import readline from "node:readline";
26+
27+
const output = (message) => {
28+
process.stdout.write(JSON.stringify(message) + "\\n");
29+
};
30+
31+
const reader = readline.createInterface({ input: process.stdin });
32+
reader.on("line", (line) => {
33+
const message = JSON.parse(line);
34+
35+
if (message.method === "initialize") {
36+
output({ id: 1, result: {} });
37+
return;
38+
}
39+
40+
if (message.id === 2 && message.method === "account/read") {
41+
output({
42+
id: 2,
43+
result: {
44+
account: {
45+
type: "chatgpt",
46+
planType: "pro",
47+
},
48+
},
49+
});
50+
return;
51+
}
52+
53+
if (message.id === 3 && message.method === "account/rateLimits/read") {
54+
if (${respondsRateLimits}) {
55+
output({
56+
id: 3,
57+
result: {
58+
rateLimits: {
59+
primary: {
60+
remaining: 7,
61+
used: 3,
62+
},
63+
},
64+
},
65+
});
66+
}
67+
}
68+
});
69+
`;
70+
writeFileSync(scriptPath, content, { encoding: "utf8" });
71+
chmodSync(scriptPath, 0o755);
72+
return scriptPath;
73+
}
74+
75+
describe("probeCodexAccountState", () => {
76+
it("resolves when account/rateLimits/read is ignored", async () => {
77+
const binaryPath = createCodexProbeStub({ rateLimitsBehavior: "ignore" });
78+
79+
const state = await probeCodexAccountState({
80+
binaryPath,
81+
signal: AbortSignal.timeout(1_000),
82+
});
83+
84+
expect(state.snapshot).toEqual({
85+
type: "chatgpt",
86+
planType: "pro",
87+
sparkEnabled: true,
88+
});
89+
expect(state.account).toEqual({
90+
type: "chatgpt",
91+
planType: "pro",
92+
});
93+
expect(state.rateLimits).toBeNull();
94+
});
95+
96+
it("includes rate limits when account/rateLimits/read responds", async () => {
97+
const binaryPath = createCodexProbeStub({ rateLimitsBehavior: "respond" });
98+
99+
const state = await probeCodexAccountState({
100+
binaryPath,
101+
signal: AbortSignal.timeout(1_000),
102+
});
103+
104+
expect(state.snapshot).toEqual({
105+
type: "chatgpt",
106+
planType: "pro",
107+
sparkEnabled: true,
108+
});
109+
expect(state.rateLimits).toEqual({
110+
primary: {
111+
remaining: 7,
112+
used: 3,
113+
},
114+
});
115+
});
116+
});

apps/server/src/provider/codexAppServer.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@ export interface CodexAccountState {
1515

1616
interface JsonRpcProbeResponse {
1717
readonly id?: unknown;
18+
readonly method?: unknown;
19+
readonly params?: unknown;
1820
readonly result?: unknown;
1921
readonly error?: {
2022
readonly message?: unknown;
2123
};
2224
}
2325

26+
const RATE_LIMITS_PROBE_GRACE_MS = 1_500;
27+
2428
function readErrorMessage(response: JsonRpcProbeResponse): string | undefined {
2529
return typeof response.error?.message === "string" ? response.error.message : undefined;
2630
}
@@ -80,8 +84,13 @@ export async function probeCodexAccountState(input: {
8084
let accountSnapshot: CodexAccountSnapshot | null = null;
8185
let accountPayload: Record<string, unknown> | null = null;
8286
let rateLimitsPayload: Record<string, unknown> | null | undefined;
87+
let rateLimitsFallbackTimer: NodeJS.Timeout | undefined;
8388

8489
const cleanup = () => {
90+
if (rateLimitsFallbackTimer) {
91+
clearTimeout(rateLimitsFallbackTimer);
92+
rateLimitsFallbackTimer = undefined;
93+
}
8594
output.removeAllListeners();
8695
output.close();
8796
child.removeAllListeners();
@@ -107,7 +116,17 @@ export async function probeCodexAccountState(input: {
107116
);
108117

109118
const maybeFinish = () => {
110-
if (!accountSnapshot || rateLimitsPayload === undefined) {
119+
if (!accountSnapshot) {
120+
return;
121+
}
122+
123+
if (rateLimitsPayload === undefined) {
124+
if (!rateLimitsFallbackTimer) {
125+
rateLimitsFallbackTimer = setTimeout(() => {
126+
rateLimitsPayload = null;
127+
maybeFinish();
128+
}, RATE_LIMITS_PROBE_GRACE_MS);
129+
}
111130
return;
112131
}
113132

@@ -153,6 +172,15 @@ export async function probeCodexAccountState(input: {
153172
}
154173

155174
const response = parsed as JsonRpcProbeResponse;
175+
if (response.method === "account/rateLimits/updated") {
176+
const payload =
177+
readCodexRateLimitsPayload(response.params) ??
178+
readCodexRateLimitsPayload(response.result);
179+
rateLimitsPayload = payload ?? null;
180+
maybeFinish();
181+
return;
182+
}
183+
156184
if (response.id === 1) {
157185
const errorMessage = readErrorMessage(response);
158186
if (errorMessage) {

0 commit comments

Comments
 (0)