Merged
32 commits
5d243a3: c (VaguelySerious, Mar 20, 2026)
2241748: feat: support negative startIndex for streaming (VaguelySerious, Mar 20, 2026)
40ba940: Merge branch 'main' into peter/negative-start-index (VaguelySerious, Mar 20, 2026)
749e64b: Merge branch 'main' into peter/negative-start (VaguelySerious, Mar 20, 2026)
45d474d: undo override (VaguelySerious, Mar 20, 2026)
4a22114: Consolidate outputStream e2e tests into data-driven loop (VaguelySerious, Mar 20, 2026)
34ef29e: fix (VaguelySerious, Mar 20, 2026)
a17b11b: Merge branch 'main' into peter/negative-start (VaguelySerious, Mar 20, 2026)
8b994fc: test (VaguelySerious, Mar 20, 2026)
23ce02d: Consolidate outputStream e2e tests into data-driven loop (VaguelySerious, Mar 20, 2026)
2784338: getchunks (VaguelySerious, Mar 20, 2026)
89939b3: Address PR feedback: docs clarifications and explicit type guards (VaguelySerious, Mar 20, 2026)
717fe2a: Merge branch 'peter/negative-start' into peter/stream-metadata-and-pages (VaguelySerious, Mar 20, 2026)
d7042d5: changes (VaguelySerious, Mar 20, 2026)
1a54db8: c (VaguelySerious, Mar 20, 2026)
7a2ee25: Update docs/content/docs/ai/resumable-streams.mdx (VaguelySerious, Mar 20, 2026)
e3c2169: Merge branch 'peter/negative-start' into peter/stream-metadata-and-pages (VaguelySerious, Mar 20, 2026)
63a3010: Merge branch 'main' into peter/stream-metadata-and-pages (VaguelySerious, Mar 20, 2026)
d64d41c: Fix edge case (VaguelySerious, Mar 20, 2026)
ae9f7e1: rename (VaguelySerious, Mar 20, 2026)
3bd0463: clarify comment (VaguelySerious, Mar 20, 2026)
374b554: fix postgres build (VaguelySerious, Mar 20, 2026)
4647450: tests (VaguelySerious, Mar 20, 2026)
a9c92a8: Add jsdoc to createStreamer (bust turbo cache) (VaguelySerious, Mar 21, 2026)
996df1a: Fix e2e test: use renamed getStreamChunks method (VaguelySerious, Mar 21, 2026)
4480092: Merge branch 'main' into peter/stream-metadata-and-pages (VaguelySerious, Mar 22, 2026)
24dbf1e: Merge branch 'main' into peter/stream-metadata-and-pages (VaguelySerious, Mar 23, 2026)
324dd79: Merge main (VaguelySerious, Mar 24, 2026)
91bcda3: Merge branch 'main' into peter/stream-metadata-and-pages (VaguelySerious, Mar 24, 2026)
89e418a: Merge branch 'main' into peter/stream-metadata-and-pages (VaguelySerious, Mar 24, 2026)
e3b0c34: Fix stream info edge cases and optimize world implementations (VaguelySerious, Mar 24, 2026)
a82fbe7: Docs, changeset, and observability gaps (VaguelySerious, Mar 24, 2026)
10 changes: 10 additions & 0 deletions .changeset/stream-info-endpoint.md
@@ -0,0 +1,10 @@
---
"@workflow/world": patch
"@workflow/world-local": patch
"@workflow/world-vercel": patch
"@workflow/world-postgres": patch
"@workflow/core": patch
"@workflow/ai": patch
---

Add `getStreamChunks()` and `getStreamInfo()` to the Streamer interface, and `getTailIndex()` to the readable stream returned by `run.getReadable()`. `WorkflowChatTransport` now reads the `x-workflow-stream-tail-index` response header to resolve negative `initialStartIndex` values into absolute positions, fixing reconnection retries after a disconnect.
39 changes: 36 additions & 3 deletions docs/content/docs/ai/resumable-streams.mdx
@@ -30,7 +30,8 @@ Let's add stream resumption to our Flight Booking Agent that we built in the [Bu

Modify your chat endpoint to include the workflow run ID in a response header. The run ID uniquely identifies the run's stream, allowing the client to know which stream to reconnect to.

{/* @skip-typecheck: incomplete code sample */}
{/*@skip-typecheck: incomplete code sample*/}

```typescript title="app/api/chat/route.ts" lineNumbers
// ... imports ...

@@ -76,14 +77,24 @@ export async function GET(

// Instead of starting a new run, we fetch an existing run.
const run = getRun(id); // [!code highlight]
const stream = run.getReadable({ startIndex }); // [!code highlight]
const readable = run.getReadable({ startIndex }); // [!code highlight]

// Provide the stream's tail index so the transport can resolve
// negative startIndex values into absolute positions for retries.
const tailIndex = await readable.getTailIndex(); // [!code highlight]

return createUIMessageStreamResponse({ stream }); // [!code highlight]
return createUIMessageStreamResponse({
stream: readable, // [!code highlight]
headers: { // [!code highlight]
"x-workflow-stream-tail-index": String(tailIndex), // [!code highlight]
}, // [!code highlight]
});
}
```

The `startIndex` parameter lets the client choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport uses `startIndex` to resume exactly from the last token it received. Negative values are also supported (e.g. `-5` starts 5 chunks before the end), which is useful for custom stream consumers (such as a dashboard) that only need the most recent output without replaying the full stream.

When using a negative `startIndex`, your stream endpoint must return an `x-workflow-stream-tail-index` header for relative resumption to work. If the header is missing, the transport falls back to replaying the entire stream.
</Step>

<Step>
@@ -161,6 +172,28 @@ Now try the flight booking example again. Open it up in a separate tab, or spam

This approach also handles page refreshes, as the client will automatically reconnect to the stream from the last known position when the UI loads with a stored run ID, following the behavior of [AI SDK's stream resumption](https://ai-sdk.dev/docs/ai-sdk-ui/chatbot-resume-streams#chatbot-resume-streams).

### Resuming from the end of the stream

By default, reconnecting replays the entire stream from the beginning (`startIndex: 0`). If you only need to show recent output — for example, when resuming a long conversation after a page refresh — you can set `initialStartIndex` to a negative value to read from the end of the stream instead:

{/*@skip-typecheck: incomplete code sample*/}

```typescript
const { messages, sendMessage } = useChat({
resume: !!activeWorkflowRunId,
transport: new WorkflowChatTransport({
initialStartIndex: -20, // Only fetch the last 20 chunks // [!code highlight]
// ... callbacks as above
}),
});
```

This avoids replaying potentially thousands of chunks and lets the UI render faster. The negative value is resolved server-side, so `-20` on a 500-chunk stream starts at chunk 480.
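
The resolution arithmetic can be sketched as a small helper. This is a hypothetical illustration (the function name `resolveStartIndex` is ours, not part of the library), mirroring the `max(0, tailIndex + 1 + startIndex)` behavior exercised by this PR's tests:

```typescript
// Hypothetical helper: resolve a (possibly negative) startIndex against
// the stream's tail index (index of the last chunk, 0-based; -1 when empty).
function resolveStartIndex(startIndex: number, tailIndex: number): number {
  if (startIndex >= 0) return startIndex; // non-negative values are already absolute
  // tailIndex + 1 is the chunk count; clamp so we never go below 0
  return Math.max(0, tailIndex + 1 + startIndex);
}

// -20 on a 500-chunk stream (tail index 499) resolves to 480
resolveStartIndex(-20, 499);
```

Note the clamp: a negative value larger than the stream (say `-1000` on a 5-chunk stream) resolves to `0` rather than a negative position.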

<Callout>
When using a negative `initialStartIndex`, the reconnection endpoint **must** return the `x-workflow-stream-tail-index` header (as shown in [Step 2](#add-a-stream-reconnection-endpoint) above). The transport uses this header to compute absolute chunk positions so that retries after a disconnect resume from the correct position. If the header is missing, the transport falls back to `startIndex: 0` (replaying the entire stream) and logs a warning.
</Callout>

## Related Documentation

- [`WorkflowChatTransport` API Reference](/docs/api-reference/workflow-ai/workflow-chat-transport) - Full configuration options
@@ -71,6 +71,8 @@ export default WorkflowChatTransportOptions;`}
- The `onChatSendMessage` callback receives the full response object, allowing you to extract and store the workflow run ID for session resumption
- Stream interruptions are automatically detected when a "finish" chunk is not received in the initial response
- The `maxConsecutiveErrors` option controls how many reconnection attempts are made before giving up (default: 3)
- `initialStartIndex` (constructor option) sets the default chunk position for the **first** reconnection attempt (e.g. after a page refresh). Subsequent retries within the same reconnection loop always resume from the last received chunk. Negative values (e.g. `-20`) read from the end of the stream, which is useful for showing only recent output without replaying the full conversation. `startIndex` (per-call option on `reconnectToStream`) overrides `initialStartIndex` for a single reconnection
- When using a negative `initialStartIndex`, the reconnection endpoint must return the `x-workflow-stream-tail-index` response header (via `readable.getTailIndex()`). The transport reads this header to compute absolute chunk positions for retries. Without it, `startIndex` is assumed to be 0 and the entire stream is replayed
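
The fallback behavior described in the last two bullets can be sketched as follows. This is a hypothetical helper (`resolveRetryIndex` is an illustrative name, not the transport's actual internals), combining header parsing with the negative-index arithmetic:

```typescript
// Hypothetical: compute the startIndex for a retry from the configured
// initialStartIndex and the raw x-workflow-stream-tail-index header value.
function resolveRetryIndex(
  initialStartIndex: number,
  headerValue: string | null
): number {
  // Non-negative indexes are already absolute.
  if (initialStartIndex >= 0) return initialStartIndex;
  const tailIndex = headerValue === null ? NaN : parseInt(headerValue, 10);
  // Missing or malformed header: assume 0 and replay the entire stream.
  if (Number.isNaN(tailIndex)) return 0;
  // tailIndex + 1 is the chunk count; clamp at 0.
  return Math.max(0, tailIndex + 1 + initialStartIndex);
}
```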

## Examples

12 changes: 12 additions & 0 deletions docs/content/docs/api-reference/workflow-api/get-run.mdx
@@ -39,6 +39,18 @@ export default Run;`}
showSections={["returns"]}
/>

#### WorkflowReadableStream

`run.getReadable()` returns a `WorkflowReadableStream` — a standard `ReadableStream` extended with a `getTailIndex()` helper:

<TSDoc
definition={`
import type { WorkflowReadableStream } from "workflow/api";
export default WorkflowReadableStream;`}
/>

`getTailIndex()` returns the index of the last known chunk (0-based), or `-1` when no chunks have been written. This is useful when building [reconnection endpoints](/docs/ai/resumable-streams) that need to inform clients where the stream starts.

#### WorkflowReadableStreamOptions

<TSDoc
6 changes: 1 addition & 5 deletions docs/content/docs/api-reference/workflow/meta.json
@@ -1,7 +1,3 @@
{
"pages": [
"...",
"fatal-error",
"retryable-error"
]
"pages": ["...", "fatal-error", "retryable-error"]
}
20 changes: 20 additions & 0 deletions docs/content/docs/deploying/building-a-world.mdx
@@ -180,11 +180,31 @@ interface Streamer {
): Promise<ReadableStream<Uint8Array>>;

listStreamsByRunId(runId: string): Promise<string[]>;

/** Paginated snapshot of stream chunks. */
getStreamChunks(
name: string,
runId: string,
options?: { limit?: number; cursor?: string }
): Promise<{
data: { index: number; data: Uint8Array }[];
cursor: string | null;
hasMore: boolean;
done: boolean;
}>;

/** Lightweight metadata: tail index and completion flag. */
getStreamInfo(
name: string,
runId: string
): Promise<{ tailIndex: number; done: boolean }>;
}
```

Streams are identified by a combination of `runId` and `name`. Each workflow run can have multiple named streams.

`getStreamChunks` returns a paginated snapshot of currently available chunks (unlike `readFromStream` which returns a live `ReadableStream` that waits for new chunks). `getStreamInfo` returns the tail index (last chunk index, 0-based, or `-1` when empty) and whether the stream is complete — useful for resolving negative `startIndex` values into absolute positions.
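
For example, a consumer of a world implementation could drain the currently available chunks with a cursor loop. This is a sketch against the interface above; `readAllChunks` and the minimal `ChunkSource` type are illustrative, not part of the package:

```typescript
type ChunkPage = {
  data: { index: number; data: Uint8Array }[];
  cursor: string | null;
  hasMore: boolean;
  done: boolean;
};

// Minimal shape of the one Streamer method used here.
interface ChunkSource {
  getStreamChunks(
    name: string,
    runId: string,
    options?: { limit?: number; cursor?: string }
  ): Promise<ChunkPage>;
}

// Drain every currently available chunk, following cursors until hasMore is false.
async function readAllChunks(
  source: ChunkSource,
  name: string,
  runId: string
): Promise<Uint8Array[]> {
  const chunks: Uint8Array[] = [];
  let cursor: string | undefined;
  for (;;) {
    const page = await source.getStreamChunks(name, runId, { limit: 100, cursor });
    chunks.push(...page.data.map((c) => c.data));
    if (!page.hasMore || page.cursor === null) return chunks;
    cursor = page.cursor;
  }
}
```

Unlike `readFromStream`, this loop terminates once the snapshot is exhausted, even if the stream is still being written to (check the `done` flag to distinguish the two cases).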

## Reference Implementations

Study these implementations for guidance:
2 changes: 1 addition & 1 deletion packages/ai/package.json
@@ -93,4 +93,4 @@
"@ai-sdk/openai": "^3.0.0",
"@ai-sdk/xai": "^3.0.0"
}
}
}
149 changes: 149 additions & 0 deletions packages/ai/src/workflow-chat-transport.test.ts
@@ -320,6 +320,155 @@ describe('WorkflowChatTransport', () => {
});
});

describe('negative initialStartIndex', () => {
function makeSSEStream(...events: string[]) {
return new ReadableStream({
start(controller) {
for (const event of events) {
controller.enqueue(new TextEncoder().encode(`data: ${event}\n\n`));
}
controller.close();
},
});
}

it('should resolve absolute chunkIndex from x-workflow-stream-tail-index header', async () => {
const transport = new WorkflowChatTransport({
fetch: mockFetch,
initialStartIndex: -20,
});

// First call: stream with tail-index header, closes immediately (simulating
// a timeout with no data) → triggers retry at the resolved absolute position
// Second call: retry completes with finish
mockFetch
.mockResolvedValueOnce({
ok: true,
headers: new Headers({
'x-workflow-stream-tail-index': '499',
}),
body: makeSSEStream(), // empty — simulates immediate disconnect
})
.mockResolvedValueOnce({
ok: true,
headers: new Headers(),
body: makeSSEStream('{"type":"finish"}'),
});

const stream = await transport.reconnectToStream({
chatId: 'test-chat',
});

const reader = stream!.getReader();
while (!(await reader.read()).done) {}

// First call: negative startIndex
expect(mockFetch).toHaveBeenNthCalledWith(
1,
'/api/chat/test-chat/stream?startIndex=-20',
expect.any(Object)
);
// Second call: resolved absolute position = max(0, 499 + 1 + (-20)) = 480
expect(mockFetch).toHaveBeenNthCalledWith(
2,
'/api/chat/test-chat/stream?startIndex=480',
expect.any(Object)
);
});

it('should fall back to startIndex=0 when header is missing', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

const transport = new WorkflowChatTransport({
fetch: mockFetch,
initialStartIndex: -10,
});

// First call: no tail-index header, closes immediately → triggers retry
// Second call: retry should use startIndex=0
mockFetch
.mockResolvedValueOnce({
ok: true,
headers: new Headers(),
body: makeSSEStream(), // empty — simulates immediate disconnect
})
.mockResolvedValueOnce({
ok: true,
headers: new Headers(),
body: makeSSEStream('{"type":"finish"}'),
});

const stream = await transport.reconnectToStream({
chatId: 'test-chat',
});

const reader = stream!.getReader();
while (!(await reader.read()).done) {}

expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('Negative initialStartIndex is configured')
);

// First call: negative startIndex
expect(mockFetch).toHaveBeenNthCalledWith(
1,
'/api/chat/test-chat/stream?startIndex=-10',
expect.any(Object)
);
// Second call: falls back to 0
expect(mockFetch).toHaveBeenNthCalledWith(
2,
'/api/chat/test-chat/stream?startIndex=0',
expect.any(Object)
);

warnSpy.mockRestore();
});

it('should fall back to startIndex=0 when header value is not a number', async () => {
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});

const transport = new WorkflowChatTransport({
fetch: mockFetch,
initialStartIndex: -5,
});

mockFetch
.mockResolvedValueOnce({
ok: true,
headers: new Headers({
'x-workflow-stream-tail-index': 'not-a-number',
}),
body: makeSSEStream(),
})
.mockResolvedValueOnce({
ok: true,
headers: new Headers(),
body: makeSSEStream('{"type":"finish"}'),
});

const stream = await transport.reconnectToStream({
chatId: 'test-chat',
});

const reader = stream!.getReader();
while (!(await reader.read()).done) {}

expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('valid "x-workflow-stream-tail-index"')
);

// Retry falls back to 0
expect(mockFetch).toHaveBeenNthCalledWith(
2,
'/api/chat/test-chat/stream?startIndex=0',
expect.any(Object)
);

warnSpy.mockRestore();
});
});
Review comment from a collaborator:
Testing gap: malformed header values. The implementation has a `!Number.isNaN(tailIndex)` guard (transport line 362) but no test covers it. Consider adding a case where the header is `"not-a-number"`: `parseInt` returns `NaN`, the guard should skip resolution, and the code should fall through without setting `chunkIndex`.

Also missing: a test for `initialStartIndex: -1000` on a 5-chunk stream (should clamp to 0 via `Math.max`), and a test where a positive `startIndex` is passed via `reconnectToStream` options to override `initialStartIndex`.


describe('callbacks', () => {
it('should call onChatSendMessage callback', async () => {
const onChatSendMessage = vi.fn();