Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 25 additions & 13 deletions tegg/core/agent-runtime/src/AgentRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
AgentStreamMessage,
AgentStore,
} from '@eggjs/tegg-types/agent-runtime';
import { RunStatus, AgentSSEEvent, AgentObjectType, MessageStatus } from '@eggjs/tegg-types/agent-runtime';
import { RunStatus, AgentSSEEvent, AgentObjectType } from '@eggjs/tegg-types/agent-runtime';
import { AgentConflictError } from '@eggjs/tegg-types/agent-runtime';
import type { EggLogger } from 'egg-logger';

Expand Down Expand Up @@ -284,14 +284,13 @@ export class AgentRuntime {
}

// event: thread.message.completed
msgObj.status = MessageStatus.Completed;
msgObj.content = content;
writer.writeEvent(AgentSSEEvent.ThreadMessageCompleted, msgObj);
const completedMsg = MessageConverter.completeMessage(msgObj, content);
writer.writeEvent(AgentSSEEvent.ThreadMessageCompleted, completedMsg);

// Persist and emit completion — append messages before marking run as completed
// so a failure leaves the run in_progress (retryable) instead of completed-but-incomplete.
// TODO(atomicity): add aggregate store method for full transactional guarantee.
const output: MessageObject[] = content.length > 0 ? [msgObj] : [];
const output: MessageObject[] = content.length > 0 ? [completedMsg] : [];
await this.store.appendMessages(threadId, [
...MessageConverter.toInputMessageObjects(input.input.messages, threadId),
...output,
Expand All @@ -301,15 +300,28 @@ export class AgentRuntime {
// event: thread.run.completed
writer.writeEvent(AgentSSEEvent.ThreadRunCompleted, rb.snapshot());
} catch (err: unknown) {
try {
await this.store.updateRun(run.id, rb.fail(err as Error));
} catch (storeErr) {
this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr);
}
if (abortController.signal.aborted) {
// Client disconnected or cancelRun fired — mark as cancelled, not failed
rb.cancelling();
try {
await this.store.updateRun(run.id, rb.cancel());
} catch (storeErr) {
this.logger.error('[AgentRuntime] failed to write cancelled status during stream error:', storeErr);
}
if (!writer.closed) {
writer.writeEvent(AgentSSEEvent.ThreadRunCancelled, rb.snapshot());
}
} else {
try {
await this.store.updateRun(run.id, rb.fail(err as Error));
} catch (storeErr) {
this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr);
}

// event: thread.run.failed
if (!writer.closed) {
writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot());
// event: thread.run.failed
if (!writer.closed) {
writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot());
}
Comment thread
jerryliang64 marked this conversation as resolved.
}
} finally {
resolveTask();
Expand Down
55 changes: 55 additions & 0 deletions tegg/core/agent-runtime/src/HttpSSEWriter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { ServerResponse } from 'node:http';

import type { SSEWriter } from './SSEWriter.ts';
Comment thread
jerryliang64 marked this conversation as resolved.

export class HttpSSEWriter implements SSEWriter {
private res: ServerResponse;
private _closed = false;
private closeCallbacks: Array<() => void> = [];
private headersSent = false;
private readonly onResClose: () => void;

constructor(res: ServerResponse) {
this.res = res;
this.onResClose = () => {
this._closed = true;
for (const cb of this.closeCallbacks) cb();
this.closeCallbacks.length = 0;
};
res.on('close', this.onResClose);
}

/** Lazily write headers on first event — avoids sending corrupt headers if constructor throws. */
private ensureHeaders(): void {
if (this.headersSent) return;
this.headersSent = true;
this.res.writeHead(200, {
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
connection: 'keep-alive',
});
}

writeEvent(event: string, data: unknown): void {
if (this._closed) return;
this.ensureHeaders();
this.res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
Comment thread
jerryliang64 marked this conversation as resolved.
Comment thread
jerryliang64 marked this conversation as resolved.
}

get closed(): boolean {
return this._closed;
}

end(): void {
if (!this._closed) {
this._closed = true;
this.res.off('close', this.onResClose);
this.closeCallbacks.length = 0;
this.res.end();
Comment thread
jerryliang64 marked this conversation as resolved.
}
}

onClose(callback: () => void): void {
this.closeCallbacks.push(callback);
}
}
7 changes: 7 additions & 0 deletions tegg/core/agent-runtime/src/MessageConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ export class MessageConverter {
return { output, usage };
}

/**
* Produce a completed copy of a streaming MessageObject with final content.
*/
static completeMessage(msg: MessageObject, content: MessageContentBlock[]): MessageObject {
return { ...msg, status: MessageStatus.Completed, content };
}

/**
* Create an in-progress MessageObject for streaming (before content is known).
*/
Expand Down
1 change: 1 addition & 0 deletions tegg/core/agent-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ export * from './AgentStoreUtils.ts';
export * from './MessageConverter.ts';
export * from './RunBuilder.ts';
export * from './SSEWriter.ts';
export * from './HttpSSEWriter.ts';
export { AgentRuntime, AGENT_RUNTIME } from './AgentRuntime.ts';
export type { AgentExecutor, AgentRuntimeOptions } from './AgentRuntime.ts';
35 changes: 19 additions & 16 deletions tegg/core/agent-runtime/test/AgentRuntime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import {
MessageStatus,
ContentBlockType,
} from '@eggjs/tegg-types/agent-runtime';
import type {
RunRecord,
CreateRunInput,
AgentStreamMessage,
MessageContentBlock,
} from '@eggjs/tegg-types/agent-runtime';
import type { RunRecord, RunObject, CreateRunInput, AgentStreamMessage } from '@eggjs/tegg-types/agent-runtime';
import { AgentNotFoundError, AgentConflictError } from '@eggjs/tegg-types/agent-runtime';
import { describe, it, beforeEach, afterEach } from 'vitest';

Expand Down Expand Up @@ -183,9 +178,9 @@ describe('test/AgentRuntime.test.ts', () => {
assert(result.threadId.startsWith('thread_'));
assert.equal(result.output!.length, 1);
assert.equal(result.output![0].object, AgentObjectType.ThreadMessage);
assert.equal(result.output![0]['role'], MessageRole.Assistant);
assert.equal(result.output![0]['status'], MessageStatus.Completed);
const content = result.output![0]['content'] as MessageContentBlock[];
assert.equal(result.output![0].role, MessageRole.Assistant);
assert.equal(result.output![0].status, MessageStatus.Completed);
const content = result.output![0].content;
assert.equal(content[0].type, ContentBlockType.Text);
assert.equal(content[0].text.value, 'Hello 1 messages');
assert(Array.isArray(content[0].text.annotations));
Expand Down Expand Up @@ -226,8 +221,8 @@ describe('test/AgentRuntime.test.ts', () => {

const updated = await runtime.getThread(thread.id);
assert.equal(updated.messages.length, 2);
assert.equal(updated.messages[0]['role'], MessageRole.User);
assert.equal(updated.messages[1]['role'], MessageRole.Assistant);
assert.equal(updated.messages[0].role, MessageRole.User);
assert.equal(updated.messages[1].role, MessageRole.Assistant);
});

it('should auto-create thread and append messages when threadId not provided', async () => {
Expand All @@ -239,8 +234,8 @@ describe('test/AgentRuntime.test.ts', () => {

const thread = await runtime.getThread(result.threadId);
assert.equal(thread.messages.length, 2);
assert.equal(thread.messages[0]['role'], MessageRole.User);
assert.equal(thread.messages[1]['role'], MessageRole.Assistant);
assert.equal(thread.messages[0].role, MessageRole.User);
assert.equal(thread.messages[1].role, MessageRole.Assistant);
});

it('should not throw when store.updateRun fails in catch block', async () => {
Expand Down Expand Up @@ -290,7 +285,7 @@ describe('test/AgentRuntime.test.ts', () => {

const run = await store.getRun(result.id);
assert.equal(run.status, RunStatus.Completed);
const outputContent = run.output![0]['content'] as MessageContentBlock[];
const outputContent = run.output![0].content;
assert.equal(outputContent[0].text.value, 'Hello 1 messages');
});

Expand All @@ -304,8 +299,8 @@ describe('test/AgentRuntime.test.ts', () => {

const thread = await store.getThread(result.threadId);
assert.equal(thread.messages.length, 2);
assert.equal(thread.messages[0]['role'], MessageRole.User);
assert.equal(thread.messages[1]['role'], MessageRole.Assistant);
assert.equal(thread.messages[0].role, MessageRole.User);
assert.equal(thread.messages[1].role, MessageRole.Assistant);
});

it('should pass metadata through to store and return it', async () => {
Expand Down Expand Up @@ -352,6 +347,14 @@ describe('test/AgentRuntime.test.ts', () => {
assert(deltaIdx < msgCompletedIdx);
assert(msgCompletedIdx < runCompletedIdx);
assert(runCompletedIdx < doneIdx);

// Verify messages persisted to thread (consistent with syncRun/asyncRun tests)
const runCreatedEvent = writer.events.find((e) => e.event === AgentSSEEvent.ThreadRunCreated);
const threadId = (runCreatedEvent!.data as RunObject).threadId;
const thread = await runtime.getThread(threadId);
assert.equal(thread.messages.length, 2);
assert.equal(thread.messages[0]['role'], MessageRole.User);
assert.equal(thread.messages[1]['role'], MessageRole.Assistant);
});

it('should emit cancelled event on client disconnect', async () => {
Expand Down
139 changes: 139 additions & 0 deletions tegg/core/agent-runtime/test/HttpSSEWriter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import assert from 'node:assert';
import { EventEmitter } from 'node:events';

import { describe, it, beforeEach } from 'vitest';

import { HttpSSEWriter } from '../src/HttpSSEWriter.ts';

/**
* Minimal mock of Node.js ServerResponse for testing HttpSSEWriter.
* Captures writeHead/write/end calls and emits 'close' on demand.
*/
class MockServerResponse extends EventEmitter {
writtenHead: { statusCode: number; headers: Record<string, string> } | null = null;
chunks: string[] = [];
ended = false;

writeHead(statusCode: number, headers: Record<string, string>): void {
this.writtenHead = { statusCode, headers };
}

write(chunk: string): boolean {
this.chunks.push(chunk);
return true;
}

end(): void {
this.ended = true;
}
}

describe('test/HttpSSEWriter.test.ts', () => {
let res: MockServerResponse;

beforeEach(() => {
res = new MockServerResponse();
});

it('should delay headers until first writeEvent', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);

// Headers not sent yet after construction
assert.equal(res.writtenHead, null);
assert.equal(res.chunks.length, 0);

writer.writeEvent('test', { foo: 'bar' });

// Now headers should be sent
assert.ok(res.writtenHead);
assert.equal(res.writtenHead.statusCode, 200);
});

it('should use lowercase header keys', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);
writer.writeEvent('ping', {});

assert.ok(res.writtenHead);
assert.equal(res.writtenHead.headers['content-type'], 'text/event-stream');
assert.equal(res.writtenHead.headers['cache-control'], 'no-cache');
assert.equal(res.writtenHead.headers['connection'], 'keep-alive');
});

it('should format SSE events correctly', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);
writer.writeEvent('message', { text: 'hello' });

assert.equal(res.chunks.length, 1);
assert.equal(res.chunks[0], 'event: message\ndata: {"text":"hello"}\n\n');
});

it('should not write after connection closes', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);

// Simulate client disconnect
res.emit('close');

assert.equal(writer.closed, true);
writer.writeEvent('late', { data: 'ignored' });

// No headers sent, no chunks written
assert.equal(res.writtenHead, null);
assert.equal(res.chunks.length, 0);
});

it('should trigger onClose callbacks when connection closes', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);
const calls: number[] = [];

writer.onClose(() => calls.push(1));
writer.onClose(() => calls.push(2));

res.emit('close');

assert.deepStrictEqual(calls, [1, 2]);
});

it('should handle end() idempotently', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);

assert.equal(writer.closed, false);

writer.end();
assert.equal(writer.closed, true);
assert.equal(res.ended, true);

// Reset flag to verify second end() doesn't call res.end() again
res.ended = false;
writer.end();
assert.equal(res.ended, false); // Not called again
});

it('should write multiple events sequentially', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);

writer.writeEvent('event1', { n: 1 });
writer.writeEvent('event2', { n: 2 });
writer.writeEvent('event3', { n: 3 });

assert.equal(res.chunks.length, 3);
assert.equal(res.chunks[0], 'event: event1\ndata: {"n":1}\n\n');
assert.equal(res.chunks[1], 'event: event2\ndata: {"n":2}\n\n');
assert.equal(res.chunks[2], 'event: event3\ndata: {"n":3}\n\n');

// Headers sent only once
assert.ok(res.writtenHead);
});

it('should start with closed=false', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const writer = new HttpSSEWriter(res as any);
assert.equal(writer.closed, false);
});
});
Loading
Loading