Skip to content

Commit 7eb4f5d

Browse files
Cover cleanup set+delete dual failure path during stream subscribe
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent aab79a0 commit 7eb4f5d

File tree

1 file changed

+78
-0
lines changed

1 file changed

+78
-0
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,6 +1413,59 @@ describe("TriggerChatTransport", function () {
14131413
expect(errors[0]?.error.message).toBe("stream subscribe root cause");
14141414
});
14151415

1416+
it("attempts both cleanup steps when set and delete both throw", async function () {
1417+
const errors: TriggerChatTransportError[] = [];
1418+
const runStore = new FailingCleanupSetAndDeleteRunStore();
1419+
1420+
const server = await startServer(function (req, res) {
1421+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1422+
res.writeHead(200, {
1423+
"content-type": "application/json",
1424+
"x-trigger-jwt": "pk_stream_subscribe_cleanup_both_failure",
1425+
});
1426+
res.end(JSON.stringify({ id: "run_stream_subscribe_cleanup_both_failure" }));
1427+
return;
1428+
}
1429+
1430+
res.writeHead(404);
1431+
res.end();
1432+
});
1433+
1434+
const transport = new TriggerChatTransport({
1435+
task: "chat-task",
1436+
stream: "chat-stream",
1437+
accessToken: "pk_trigger",
1438+
baseURL: server.url,
1439+
runStore,
1440+
onError: function onError(error) {
1441+
errors.push(error);
1442+
},
1443+
});
1444+
1445+
(transport as any).fetchRunStream = async function fetchRunStream() {
1446+
throw new Error("stream subscribe root cause");
1447+
};
1448+
1449+
await expect(
1450+
transport.sendMessages({
1451+
trigger: "submit-message",
1452+
chatId: "chat-stream-subscribe-cleanup-both-failure",
1453+
messageId: undefined,
1454+
messages: [],
1455+
abortSignal: undefined,
1456+
})
1457+
).rejects.toThrowError("stream subscribe root cause");
1458+
1459+
expect(errors).toHaveLength(1);
1460+
expect(errors[0]).toMatchObject({
1461+
phase: "streamSubscribe",
1462+
chatId: "chat-stream-subscribe-cleanup-both-failure",
1463+
runId: "run_stream_subscribe_cleanup_both_failure",
1464+
});
1465+
expect(runStore.setCalls).toContain("chat-stream-subscribe-cleanup-both-failure");
1466+
expect(runStore.deleteCalls).toContain("chat-stream-subscribe-cleanup-both-failure");
1467+
});
1468+
14161469
it(
14171470
"preserves stream subscribe root failures when cleanup and onError callbacks both fail",
14181471
async function () {
@@ -2988,6 +3041,31 @@ class FailingCleanupDeleteRunStore extends InMemoryTriggerChatRunStore {
29883041
}
29893042
}
29903043

3044+
class FailingCleanupSetAndDeleteRunStore extends InMemoryTriggerChatRunStore {
3045+
private setCallCount = 0;
3046+
public readonly setCalls: string[] = [];
3047+
public readonly deleteCalls: string[] = [];
3048+
3049+
constructor(private readonly failOnSetCall: number = 2) {
3050+
super();
3051+
}
3052+
3053+
public set(state: TriggerChatRunState): void {
3054+
this.setCallCount += 1;
3055+
this.setCalls.push(state.chatId);
3056+
if (this.setCallCount === this.failOnSetCall) {
3057+
throw new Error("cleanup set failed");
3058+
}
3059+
3060+
super.set(state);
3061+
}
3062+
3063+
public delete(chatId: string): void {
3064+
this.deleteCalls.push(chatId);
3065+
throw new Error("cleanup delete failed");
3066+
}
3067+
}
3068+
29913069
class AsyncTrackedRunStore implements TriggerChatRunStore {
29923070
private readonly runs = new Map<string, TriggerChatRunState>();
29933071
public readonly getCalls: string[] = [];

0 commit comments

Comments
 (0)