diff --git a/src/data/nav/aitransport.ts b/src/data/nav/aitransport.ts index bf68251537..3c8b19cdb9 100644 --- a/src/data/nav/aitransport.ts +++ b/src/data/nav/aitransport.ts @@ -13,6 +13,27 @@ export default { link: '/docs/ai-transport', index: true, }, + { + name: 'Getting started', + pages: [ + { + name: 'Anthropic', + link: '/docs/ai-transport/getting-started/anthropic', + }, + { + name: 'OpenAI', + link: '/docs/ai-transport/getting-started/openai', + }, + { + name: 'Vercel AI SDK', + link: '/docs/ai-transport/getting-started/vercel-ai-sdk', + }, + { + name: 'LangGraph', + link: '/docs/ai-transport/getting-started/langgraph', + }, + ], + }, { name: 'Token streaming', pages: [ diff --git a/src/pages/docs/ai-transport/getting-started/anthropic.mdx b/src/pages/docs/ai-transport/getting-started/anthropic.mdx new file mode 100644 index 0000000000..f54b9583f7 --- /dev/null +++ b/src/pages/docs/ai-transport/getting-started/anthropic.mdx @@ -0,0 +1,624 @@ +--- +title: "Getting started with Anthropic" +meta_description: "Build a realtime AI agent with Anthropic Claude that streams tokens over Ably, handles tool calls with human-in-the-loop approval, and authenticates users with verified identities." +meta_keywords: "AI Transport Anthropic, Ably AI agent, token streaming Claude, realtime AI, LLM streaming, AI agent tutorial, human in the loop, HITL, tool calls, Anthropic Messages API" +redirect_from: + - /docs/ai-transport/getting-started + - /docs/ai-transport/getting-started/javascript +--- + +This guide will get you started with Ably AI Transport using Anthropic's Messages API. + +You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses Anthropic's Claude model with a `send_email` tool that requires user approval before execution. + +## Prerequisites + +1. [Sign up](https://ably.com/signup) for an Ably account. + +2. 
Create a [new app](https://ably.com/accounts/any/apps/new), and create your first API key in the **API Keys** tab of the dashboard. + +3. Your API key will need the `publish`, `subscribe`, and `message-update-own` capabilities. + +4. Enable message appends for the channel: + 1. Go to the **Settings** tab of your app in the dashboard. + 2. Under **Rules**, click **Add new rule**. + 3. Enter `ai` as the channel namespace. + 4. Check **Message annotations, updates, deletes, and appends**. + 5. Click **Create channel rule** to save. + +5. Install any current LTS version of [Node.js](https://nodejs.org/en). + +6. Get an [Anthropic API key](https://console.anthropic.com/settings/keys). + +## Step 1: Project setup + +Create a new directory for your project and initialize it: + + +```shell +mkdir ai-agent-demo && cd ai-agent-demo +npm init -y && npm pkg set type=module +``` + + +Install the required dependencies: + + +```shell +npm install ably @anthropic-ai/sdk jsonwebtoken express +npm install -D typescript @types/node @types/express @types/jsonwebtoken +``` + + +Create a TypeScript configuration file: + + +```shell +npx tsc --init +``` + + +Create a `.env` file in your project root and add your API keys: + + +```shell +echo "ABLY_API_KEY={{API_KEY}}" > .env +echo "ANTHROPIC_API_KEY=your_anthropic_api_key" >> .env +``` + + +## Step 2: Authenticate users + +Users authenticate with Ably using [token authentication](/docs/auth/token). Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens. 
+ +Create a file called `auth-server.ts` with an endpoint that generates signed JWTs: + + +```typescript +import express from 'express'; +import jwt from 'jsonwebtoken'; + +const app = express(); + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const [keyName, keySecret] = apiKey.split(':'); +if (!keyName || !keySecret) { + throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"'); +} + +app.get('/api/auth/token', (req, res) => { + // In production, authenticate the user and get their ID from your session + const userId = 'user-123'; + + const token = jwt.sign({ + 'x-ably-clientId': userId, + 'ably.channel.*': 'user' + }, keySecret, { + algorithm: 'HS256', + keyid: keyName, + expiresIn: '1h' + }); + + res.type('application/jwt').send(token); +}); + +app.listen(3001, () => { + console.log('Auth server running on http://localhost:3001'); +}); +``` + + + + +The JWT includes two claims: +- `x-ably-clientId`: Establishes a verified identity that appears on all messages the user publishes. +- `ably.channel.*`: Assigns a role that agents can use to distinguish users from other agents on the channel. + + + +## Step 3: Create the agent + +The agent runs in a trusted server environment and uses [API key authentication](/docs/auth#basic-authentication). It subscribes to a channel to receive user prompts, processes them with Anthropic's Claude model, and streams responses back using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern. When Claude requests a tool call, the agent pauses to request human approval before executing. 
+ +Create a file called `agent.ts` with the setup, tool definition, and human-in-the-loop helpers: + + +```typescript +import * as Ably from 'ably'; +import Anthropic from '@anthropic-ai/sdk'; + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const anthropic = new Anthropic(); + +const realtime = new Ably.Realtime({ + key: apiKey, + clientId: 'ai-agent', + echoMessages: false, +}); + +const channel = realtime.channels.get('ai:conversation'); + +// Define a tool that requires human approval +const tools: Anthropic.Tool[] = [ + { + name: 'send_email', + description: 'Send an email to a recipient. Always requires human approval.', + input_schema: { + type: 'object' as const, + properties: { + to: { type: 'string', description: 'Recipient email address' }, + subject: { type: 'string', description: 'Email subject line' }, + body: { type: 'string', description: 'Email body content' }, + }, + required: ['to', 'subject', 'body'], + }, + }, +]; + +// Track pending approval requests +const pendingApprovals = new Map<string, (decision: string) => void>(); + +// Listen for approval responses from users +await channel.subscribe('approval-response', (message: Ably.Message) => { + const toolCallId = message.extras?.headers?.toolCallId; + const resolve = pendingApprovals.get(toolCallId); + if (resolve) { + pendingApprovals.delete(toolCallId); + resolve(message.data.decision); + } +}); + +// Request human approval for a tool call via the channel +function requestApproval( + toolCallId: string, + toolName: string, + toolInput: Record<string, unknown>, +): Promise<string> { + return new Promise((resolve) => { + pendingApprovals.set(toolCallId, resolve); + channel.publish({ + name: 'approval-request', + data: { name: toolName, arguments: toolInput }, + extras: { headers: { toolCallId } }, + }); + console.log(`Awaiting approval for ${toolName} (${toolCallId})`); + }); +} + +// Execute a tool after approval +function executeTool(name: string, input: Record<string, unknown>)
{ + if (name === 'send_email') { + console.log(`Sending email to ${input.to}: ${input.subject}`); + return { success: true, message: `Email sent to ${input.to}` }; + } + return { error: `Unknown tool: ${name}` }; +} +``` + + +The agent publishes `approval-request` messages to the channel when a tool call is detected, then waits for a matching `approval-response` correlated by `toolCallId`. The `executeTool` function simulates the email action. In production, replace this with actual email delivery logic. + + + +Add the streaming function to `agent.ts`. This streams Anthropic response tokens to Ably using `channel.appendMessage()`, while tracking any tool call the model requests: + + +```typescript +// Stream Anthropic response tokens to Ably, returning tool call info if any +async function streamToAbly( + messages: Anthropic.MessageParam[], + serial: string, + includeTools: boolean, +) { + const stream = await anthropic.messages.create({ + model: 'claude-sonnet-4-5', + max_tokens: 1024, + messages, + ...(includeTools ? 
{ tools } : {}), + stream: true, + }); + + let textBlockIndex: number | null = null; + let currentToolUse: { id: string; name: string; index: number } | null = null; + let toolInput = ''; + let stopReason: string | null = null; + const assistantContent: Anthropic.ContentBlockParam[] = []; + let accumulatedText = ''; + + for await (const event of stream) { + switch (event.type) { + case 'content_block_start': + if (event.content_block.type === 'text') { + textBlockIndex = event.index; + accumulatedText = ''; + } else if (event.content_block.type === 'tool_use') { + currentToolUse = { + id: event.content_block.id, + name: event.content_block.name, + index: event.index, + }; + toolInput = ''; + } + break; + + case 'content_block_delta': + if (event.delta.type === 'text_delta' && event.index === textBlockIndex) { + channel.appendMessage({ serial, data: event.delta.text }); + accumulatedText += event.delta.text; + } else if (event.delta.type === 'input_json_delta') { + toolInput += event.delta.partial_json; + } + break; + + case 'content_block_stop': + if (event.index === textBlockIndex && accumulatedText) { + assistantContent.push({ type: 'text', text: accumulatedText }); + textBlockIndex = null; + } + if (currentToolUse && event.index === currentToolUse.index) { + assistantContent.push({ + type: 'tool_use', + id: currentToolUse.id, + name: currentToolUse.name, + input: JSON.parse(toolInput), + }); + currentToolUse = null; + } + break; + + case 'message_delta': + stopReason = event.delta.stop_reason ?? null; + break; + } + } + + return { stopReason, assistantContent }; +} +``` + + +The function filters for `text_delta` events and appends each token to the Ably message. It also tracks `tool_use` content blocks and accumulates their JSON input. The `stopReason` indicates whether the model finished normally (`end_turn`) or wants to call a tool (`tool_use`). + +Add the prompt handler to the end of `agent.ts`. 
This ties everything together, streaming the initial response and handling tool calls with HITL approval: + + +```typescript +// Handle incoming user prompts +await channel.subscribe('user-input', async (message: Ably.Message) => { + const { promptId, text } = message.data as { promptId: string; text: string }; + const userId = message.clientId; + const role = message.extras?.userClaim; + + console.log(`Received prompt from ${userId} (role: ${role}): ${text}`); + + if (role !== 'user') { + console.log('Ignoring message from non-user'); + return; + } + + // Create the initial Ably message for streaming + const response = await channel.publish({ + name: 'agent-response', + data: '', + extras: { headers: { promptId } }, + }); + + const serial = response.serials[0]; + if (!serial) { + console.error('No serial returned from publish'); + return; + } + + // Stream the response from Anthropic + const messages: Anthropic.MessageParam[] = [{ role: 'user', content: text }]; + const { stopReason, assistantContent } = await streamToAbly(messages, serial, true); + + // Handle tool call with human-in-the-loop approval + if (stopReason === 'tool_use') { + const toolCall = assistantContent.find( + (c): c is Anthropic.ToolUseBlockParam => c.type === 'tool_use', + ); + + if (toolCall) { + const decision = await requestApproval( + toolCall.id, + toolCall.name, + toolCall.input as Record<string, unknown>, + ); + + let toolResult: Record<string, unknown>; + if (decision === 'approved') { + toolResult = executeTool(toolCall.name, toolCall.input as Record<string, unknown>); + } else { + toolResult = { error: 'The user rejected this action' }; + } + + // Continue the conversation with the tool result + const followUpMessages: Anthropic.MessageParam[] = [ + ...messages, + { role: 'assistant', content: assistantContent }, + { + role: 'user', + content: [ + { + type: 'tool_result', + tool_use_id: toolCall.id, + content: JSON.stringify(toolResult), + }, + ], + }, + ]; + + // Stream the follow-up response, appending to the same message +
channel.appendMessage({ serial, data: '\n\n' }); + await streamToAbly(followUpMessages, serial, false); + } + } + + // Signal completion + await channel.publish({ + name: 'agent-response-complete', + extras: { headers: { promptId } }, + }); + + console.log(`Completed response for prompt ${promptId}`); +}); + +console.log('Agent is listening for prompts...'); +``` + + +The prompt handler: +1. Verifies the sender has the `user` role. +2. Creates an initial Ably message and captures its `serial` for appending. +3. Streams the Anthropic response, appending text tokens in realtime. +4. If the model requests a tool call, publishes an `approval-request` and waits for the user's decision. +5. After approval, executes the tool and streams a follow-up response appended to the same message. + +## Step 4: Create the client + +The client uses an [`authCallback`](/docs/auth/token#auth-callback) to obtain a signed JWT from your auth server. The `clientId` from the token is automatically attached to all messages the client publishes. + +Create a file called `client.ts` with the connection setup and token streaming subscription: + + +```typescript +import * as Ably from 'ably'; +import crypto from 'crypto'; +import * as readline from 'readline'; + +const realtime = new Ably.Realtime({ + authCallback: async ( + _tokenParams: Ably.TokenParams, + callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void + ) => { + try { + const response = await fetch('http://localhost:3001/api/auth/token'); + const token = await response.text(); + callback(null, token); + } catch (error) { + callback(error instanceof Error ? 
error.message : String(error), null); + } + } +}); + +realtime.connection.on('connected', () => { + console.log('Connected to Ably as', realtime.auth.clientId); +}); + +const channel = realtime.channels.get('ai:conversation'); +const pendingPrompts = new Map<string, () => void>(); + +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +// Subscribe to streamed agent responses +await channel.subscribe('agent-response', (message: Ably.Message) => { + const promptId = message.extras?.headers?.promptId; + if (!promptId) return; + + switch (message.action) { + case 'message.create': + break; + case 'message.append': + // Write each new token as it arrives + process.stdout.write(message.data || ''); + break; + case 'message.update': + // Full response after reconnection + console.log(message.data || ''); + break; + } +}); +``` + + +The client subscribes to `agent-response` messages and handles different [message actions](/docs/ai-transport/token-streaming/message-per-response): +- `message.create`: A new response has started. +- `message.append`: A token has been appended. Each token is written directly to the terminal as it arrives. +- `message.update`: The full response content, received after reconnection. + +Add the human-in-the-loop approval handler to `client.ts`. When the agent requests approval for a tool call, the client displays the details and prompts the user: + + +```typescript +// Subscribe to approval requests for human-in-the-loop +await channel.subscribe('approval-request', async (message: Ably.Message) => { + const { name, arguments: args } = message.data; + const toolCallId = message.extras?.headers?.toolCallId; + + console.log(`\n\nAgent wants to execute: ${name}`); + console.log(`Arguments: ${JSON.stringify(args, null, 2)}`); + + const answer = await new Promise<string>((resolve) => { + rl.question('Approve? (yes/no): ', resolve); + }); + + const decision = answer.toLowerCase() === 'yes' ?
'approved' : 'rejected'; + + await channel.publish({ + name: 'approval-response', + data: { decision }, + extras: { headers: { toolCallId } }, + }); + + console.log(`Decision sent: ${decision}\n`); +}); +``` + + +## Step 5: Send user prompts + +Each prompt includes a unique `promptId` to correlate responses. The user's `clientId` is automatically attached to the message by Ably. + +Add the following to the end of `client.ts`: + + +```typescript +// Subscribe to completion signals +await channel.subscribe('agent-response-complete', (message: Ably.Message) => { + const promptId = message.extras?.headers?.promptId; + if (!promptId) return; + + console.log('\n'); + const resolve = pendingPrompts.get(promptId); + if (resolve) { + pendingPrompts.delete(promptId); + resolve(); + } +}); + +async function sendPrompt(text: string): Promise { + const promptId = crypto.randomUUID(); + + const completionPromise = new Promise((resolve) => { + pendingPrompts.set(promptId, resolve); + }); + + await channel.publish('user-input', { + promptId, + text, + }); + + await completionPromise; +} + +function askQuestion() { + rl.question('Enter a prompt (or "quit" to exit): ', async (text) => { + if (text.toLowerCase() === 'quit') { + rl.close(); + realtime.close(); + return; + } + + await sendPrompt(text); + askQuestion(); + }); +} + +askQuestion(); +``` + + +## Step 6: Run the example + +Open three terminal windows to run the auth server, agent, and client. + +Terminal 1: Start the auth server + + +```shell +npx tsx --env-file=.env auth-server.ts +``` + + +You should see: + + +```text +Auth server running on http://localhost:3001 +``` + + +Terminal 2: Start the agent + + +```shell +npx tsx --env-file=.env agent.ts +``` + + +You should see: + + +```text +Agent is listening for prompts... +``` + + +Terminal 3: Run the client + + +```shell +npx tsx --env-file=.env client.ts +``` + + +Try entering different prompts. 
For a regular response without tool calls: + + +```text +Enter a prompt (or "quit" to exit): What is the capital of France? + +The capital of France is Paris. + +Enter a prompt (or "quit" to exit): +``` + + +For a response that triggers a tool call with human-in-the-loop approval: + + +```text +Enter a prompt (or "quit" to exit): Send an email to alice@example.com saying hello + +Agent wants to execute: send_email +Arguments: { + "to": "alice@example.com", + "subject": "Hello", + "body": "Hello Alice!" +} +Approve? (yes/no): yes +Decision sent: approved + +I've sent the email to alice@example.com with the subject "Hello". + +Enter a prompt (or "quit" to exit): +``` + + +## Next steps + +Continue exploring AI Transport features: + +* Learn about [token streaming patterns](/docs/ai-transport/token-streaming) including [message-per-response](/docs/ai-transport/token-streaming/message-per-response) and [message-per-token](/docs/ai-transport/token-streaming/message-per-token). +* Understand [user input](/docs/ai-transport/messaging/accepting-user-input) patterns for handling prompts and correlating responses. +* Explore [identifying users and agents](/docs/ai-transport/sessions-identity/identifying-users-and-agents) for more advanced authentication scenarios. +* Implement more advanced [human-in-the-loop](/docs/ai-transport/messaging/human-in-the-loop) workflows with role-based authorization. +* Stream [tool call](/docs/ai-transport/messaging/tool-calls) information to build generative UI experiences. 
diff --git a/src/pages/docs/ai-transport/getting-started/langgraph.mdx b/src/pages/docs/ai-transport/getting-started/langgraph.mdx new file mode 100644 index 0000000000..b15d278955 --- /dev/null +++ b/src/pages/docs/ai-transport/getting-started/langgraph.mdx @@ -0,0 +1,603 @@ +--- +title: "Getting started with LangGraph" +meta_description: "Build a realtime AI agent with LangGraph that streams tokens over Ably, handles tool calls with human-in-the-loop approval, and authenticates users with verified identities." +meta_keywords: "AI Transport LangGraph, Ably AI agent, token streaming LangChain, realtime AI, LLM streaming, AI agent tutorial, human in the loop, HITL, tool calls, LangGraph streaming" +--- + +This guide will get you started with Ably AI Transport using LangGraph. + +You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses LangGraph with a `send_email` tool that requires user approval before execution. + +## Prerequisites + +1. [Sign up](https://ably.com/signup) for an Ably account. + +2. Create a [new app](https://ably.com/accounts/any/apps/new), and create your first API key in the **API Keys** tab of the dashboard. + +3. Your API key will need the `publish`, `subscribe`, and `message-update-own` capabilities. + +4. Enable message appends for the channel: + 1. Go to the **Settings** tab of your app in the dashboard. + 2. Under **Rules**, click **Add new rule**. + 3. Enter `ai` as the channel namespace. + 4. Check **Message annotations, updates, deletes, and appends**. + 5. Click **Create channel rule** to save. + +5. Install any current LTS version of [Node.js](https://nodejs.org/en). + +6. Get an [Anthropic API key](https://console.anthropic.com/settings/keys). 
+ + + +## Step 1: Project setup + +Create a new directory for your project and initialize it: + + +```shell +mkdir ai-agent-demo && cd ai-agent-demo +npm init -y && npm pkg set type=module +``` + + +Install the required dependencies: + + +```shell +npm install ably @langchain/langgraph@^0.2 @langchain/anthropic@^0.3 @langchain/core@^0.3 zod jsonwebtoken express +npm install -D typescript @types/node @types/express @types/jsonwebtoken +``` + + +Create a TypeScript configuration file: + + +```shell +npx tsc --init +``` + + +Create a `.env` file in your project root and add your API keys: + + +```shell +echo "ABLY_API_KEY={{API_KEY}}" > .env +echo "ANTHROPIC_API_KEY=your_anthropic_api_key" >> .env +``` + + +## Step 2: Authenticate users + +Users authenticate with Ably using [token authentication](/docs/auth/token). Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens. + +Create a file called `auth-server.ts` with an endpoint that generates signed JWTs: + + +```typescript +import express from 'express'; +import jwt from 'jsonwebtoken'; + +const app = express(); + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const [keyName, keySecret] = apiKey.split(':'); +if (!keyName || !keySecret) { + throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"'); +} + +app.get('/api/auth/token', (req, res) => { + // In production, authenticate the user and get their ID from your session + const userId = 'user-123'; + + const token = jwt.sign({ + 'x-ably-clientId': userId, + 'ably.channel.*': 'user' + }, keySecret, { + algorithm: 'HS256', + keyid: keyName, + expiresIn: '1h' + }); + + res.type('application/jwt').send(token); +}); + +app.listen(3001, () => { + console.log('Auth server running on http://localhost:3001'); +}); +``` + + + + +The JWT includes two claims: +- 
`x-ably-clientId`: Establishes a verified identity that appears on all messages the user publishes. +- `ably.channel.*`: Assigns a role that agents can use to distinguish users from other agents on the channel. + + + +## Step 3: Create the agent + +The agent runs in a trusted server environment and uses [API key authentication](/docs/auth#basic-authentication). It subscribes to a channel to receive user prompts, processes them with a LangGraph graph that uses Claude, and streams responses back using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern. When the model requests a tool call, the agent pauses to request human approval before executing. + +Create a file called `agent.ts` with the setup, tool definition, and human-in-the-loop helpers: + + +```typescript +import * as Ably from 'ably'; +import { ChatAnthropic } from '@langchain/anthropic'; +import { tool } from '@langchain/core/tools'; +import { StateGraph, Annotation, START, END } from '@langchain/langgraph'; +import { AIMessage } from '@langchain/core/messages'; +import { z } from 'zod'; + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const realtime = new Ably.Realtime({ + key: apiKey, + clientId: 'ai-agent', + echoMessages: false, +}); + +const channel = realtime.channels.get('ai:conversation'); + +// Define a tool that requires human approval +const sendEmail = tool( + async ({ to, subject, body }) => { + console.log(`Sending email to ${to}: ${subject}`); + return JSON.stringify({ success: true, message: `Email sent to ${to}` }); + }, + { + name: 'send_email', + description: 'Send an email to a recipient. 
Always requires human approval.', + schema: z.object({ + to: z.string().describe('Recipient email address'), + subject: z.string().describe('Email subject line'), + body: z.string().describe('Email body content'), + }), + }, +); + +// Initialize the model with tools +const model = new ChatAnthropic({ model: 'claude-sonnet-4-5' }); +const modelWithTools = model.bindTools([sendEmail]); + +// Track pending approval requests +const pendingApprovals = new Map<string, (decision: string) => void>(); + +// Listen for approval responses from users +await channel.subscribe('approval-response', (message: Ably.Message) => { + const toolCallId = message.extras?.headers?.toolCallId; + const resolve = pendingApprovals.get(toolCallId); + if (resolve) { + pendingApprovals.delete(toolCallId); + resolve(message.data.decision); + } +}); + +// Request human approval for a tool call via the channel +function requestApproval( + toolCallId: string, + toolName: string, + toolInput: Record<string, unknown>, +): Promise<string> { + return new Promise((resolve) => { + pendingApprovals.set(toolCallId, resolve); + channel.publish({ + name: 'approval-request', + data: { name: toolName, arguments: toolInput }, + extras: { headers: { toolCallId } }, + }); + console.log(`Awaiting approval for ${toolName} (${toolCallId})`); + }); +} +``` + + +The agent publishes `approval-request` messages to the channel when a tool call is detected, then waits for a matching `approval-response` correlated by `toolCallId`. The `sendEmail` tool simulates the email action. In production, replace this with actual email delivery logic. + + + +Add the streaming function to `agent.ts`. 
This streams LangGraph response tokens to Ably using `channel.appendMessage()`, while tracking any tool call the model requests: + + +```typescript +// Stream LangGraph response tokens to Ably +async function streamToAbly( + messages: any[], + serial: string, + useTools: boolean, +) { + const StateAnnotation = Annotation.Root({ + messages: Annotation({ + reducer: (x, y) => x.concat(y), + default: () => [], + }), + }); + + const graph = new StateGraph(StateAnnotation) + .addNode('agent', async (state) => { + const response = await (useTools ? modelWithTools : model).invoke(state.messages); + return { messages: [response] }; + }) + .addEdge(START, 'agent') + .addEdge('agent', END); + + const app = graph.compile(); + + const stream = await app.stream( + { messages }, + { streamMode: 'messages' }, + ); + + let toolCallId: string | undefined; + let toolCallName: string | undefined; + let toolCallArgsStr = ''; + + for await (const [messageChunk, metadata] of stream) { + const content = messageChunk?.content; + + // Stream text tokens to Ably + // Content may be a string or an array of content blocks (Anthropic format) + if (typeof content === 'string' && content) { + channel.appendMessage({ serial, data: content }); + } else if (Array.isArray(content)) { + for (const block of content) { + if (block.type === 'text' && block.text) { + channel.appendMessage({ serial, data: block.text }); + } + } + } + + // Accumulate tool call info from streaming chunks + for (const chunk of messageChunk?.tool_call_chunks ?? []) { + if (chunk.id) toolCallId = chunk.id; + if (chunk.name) toolCallName = chunk.name; + if (chunk.args) toolCallArgsStr += chunk.args; + } + } + + const toolCallDetected = toolCallId && toolCallName + ? { id: toolCallId, name: toolCallName, args: toolCallArgsStr ? 
JSON.parse(toolCallArgsStr) : {} } + : null; + + return { hasToolCall: !!toolCallDetected, toolCallInfo: toolCallDetected }; +} +``` + + +The function creates a LangGraph state graph and streams message chunks. It appends text content to the Ably message, handling both string and array content blocks (Anthropic streams content as arrays of content blocks). Tool call arguments arrive incrementally via `tool_call_chunks` and are accumulated as a string, then parsed as JSON after the stream completes. + +Add the prompt handler to the end of `agent.ts`. This ties everything together, streaming the initial response and handling tool calls with HITL approval: + + +```typescript +// Handle incoming user prompts +await channel.subscribe('user-input', async (message: Ably.Message) => { + const { promptId, text } = message.data as { promptId: string; text: string }; + const userId = message.clientId; + const role = message.extras?.userClaim; + + console.log(`Received prompt from ${userId} (role: ${role}): ${text}`); + + if (role !== 'user') { + console.log('Ignoring message from non-user'); + return; + } + + // Create the initial Ably message for streaming + const response = await channel.publish({ + name: 'agent-response', + data: '', + extras: { headers: { promptId } }, + }); + + const serial = response.serials[0]; + if (!serial) { + console.error('No serial returned from publish'); + return; + } + + // Stream the response from LangGraph + const messages = [{ role: 'user', content: text }]; + const { hasToolCall, toolCallInfo } = await streamToAbly(messages, serial, true); + + // Handle tool call with human-in-the-loop approval + if (hasToolCall && toolCallInfo) { + const decision = await requestApproval( + toolCallInfo.id, + toolCallInfo.name, + toolCallInfo.args, + ); + + let toolResult: string; + if (decision === 'approved') { + const result = await sendEmail.invoke(toolCallInfo.args); + toolResult = result; + } else { + toolResult = JSON.stringify({ error: 'The user 
rejected this action' }); + } + + // Stream follow-up response with the tool result + channel.appendMessage({ serial, data: '\n\n' }); + + const followUpMessages = [ + { role: 'user', content: text }, + new AIMessage({ + content: '', + tool_calls: [{ + id: toolCallInfo.id, + name: toolCallInfo.name, + args: toolCallInfo.args, + }], + }), + { role: 'tool', content: toolResult, tool_call_id: toolCallInfo.id }, + ]; + + await streamToAbly(followUpMessages, serial, false); + } + + // Signal completion + await channel.publish({ + name: 'agent-response-complete', + extras: { headers: { promptId } }, + }); + + console.log(`Completed response for prompt ${promptId}`); +}); + +console.log('Agent is listening for prompts...'); +``` + + +The prompt handler: +1. Verifies the sender has the `user` role. +2. Creates an initial Ably message and captures its `serial` for appending. +3. Streams the LangGraph response, appending text tokens in realtime. +4. If the model requests a tool call, publishes an `approval-request` and waits for the user's decision. +5. After approval, executes the tool and streams a follow-up response appended to the same message. + +## Step 4: Create the client + +The client uses an [`authCallback`](/docs/auth/token#auth-callback) to obtain a signed JWT from your auth server. The `clientId` from the token is automatically attached to all messages the client publishes. 
+ +Create a file called `client.ts` with the connection setup and token streaming subscription: + + +```typescript +import * as Ably from 'ably'; +import crypto from 'crypto'; +import * as readline from 'readline'; + +const realtime = new Ably.Realtime({ + authCallback: async ( + _tokenParams: Ably.TokenParams, + callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void + ) => { + try { + const response = await fetch('http://localhost:3001/api/auth/token'); + const token = await response.text(); + callback(null, token); + } catch (error) { + callback(error instanceof Error ? error.message : String(error), null); + } + } +}); + +realtime.connection.on('connected', () => { + console.log('Connected to Ably as', realtime.auth.clientId); +}); + +const channel = realtime.channels.get('ai:conversation'); +const pendingPrompts = new Map void>(); + +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +// Subscribe to streamed agent responses +await channel.subscribe('agent-response', (message: Ably.Message) => { + switch (message.action) { + case 'message.create': + break; + case 'message.append': + // Write each new token as it arrives + process.stdout.write(message.data || ''); + break; + case 'message.update': + // Full response after reconnection + console.log(message.data || ''); + break; + } +}); +``` + + +The client subscribes to `agent-response` messages and handles different [message actions](/docs/ai-transport/token-streaming/message-per-response): +- `message.create`: A new response has started. +- `message.append`: A token has been appended. Each token is written directly to the terminal as it arrives. +- `message.update`: The full response content, received after reconnection. + +Add the human-in-the-loop approval handler to `client.ts`. 
When the agent requests approval for a tool call, the client displays the details and prompts the user:
+
+
+```typescript
+// Subscribe to approval requests for human-in-the-loop
+await channel.subscribe('approval-request', async (message: Ably.Message) => {
+  const { name, arguments: args } = message.data;
+  const toolCallId = message.extras?.headers?.toolCallId;
+
+  console.log(`\n\nAgent wants to execute: ${name}`);
+  console.log(`Arguments: ${JSON.stringify(args, null, 2)}`);
+
+  const answer = await new Promise<string>((resolve) => {
+    rl.question('Approve? (yes/no): ', resolve);
+  });
+
+  const decision = answer.toLowerCase() === 'yes' ? 'approved' : 'rejected';
+
+  await channel.publish({
+    name: 'approval-response',
+    data: { decision },
+    extras: { headers: { toolCallId } },
+  });
+
+  console.log(`Decision sent: ${decision}\n`);
+});
+```
+
+
+## Step 5: Send user prompts
+
+Each prompt includes a unique `promptId` to correlate responses. The user's `clientId` is automatically attached to the message by Ably.
+ +Add the following to the end of `client.ts`: + + +```typescript +// Subscribe to completion signals +await channel.subscribe('agent-response-complete', (message: Ably.Message) => { + const promptId = message.extras?.headers?.promptId; + if (!promptId) return; + + console.log('\n'); + const resolve = pendingPrompts.get(promptId); + if (resolve) { + pendingPrompts.delete(promptId); + resolve(); + } +}); + +async function sendPrompt(text: string): Promise { + const promptId = crypto.randomUUID(); + + const completionPromise = new Promise((resolve) => { + pendingPrompts.set(promptId, resolve); + }); + + await channel.publish('user-input', { + promptId, + text, + }); + + await completionPromise; +} + +function askQuestion() { + rl.question('Enter a prompt (or "quit" to exit): ', async (text) => { + if (text.toLowerCase() === 'quit') { + rl.close(); + realtime.close(); + return; + } + + await sendPrompt(text); + askQuestion(); + }); +} + +askQuestion(); +``` + + +## Step 6: Run the example + +Open three terminal windows to run the auth server, agent, and client. + +Terminal 1: Start the auth server + + +```shell +npx tsx --env-file=.env auth-server.ts +``` + + +You should see: + + +```text +Auth server running on http://localhost:3001 +``` + + +Terminal 2: Start the agent + + +```shell +npx tsx --env-file=.env agent.ts +``` + + +You should see: + + +```text +Agent is listening for prompts... +``` + + +Terminal 3: Run the client + + +```shell +npx tsx --env-file=.env client.ts +``` + + +Try entering different prompts. For a regular response without tool calls: + + +```text +Enter a prompt (or "quit" to exit): What is the capital of France? + +The capital of France is Paris. 
+ +Enter a prompt (or "quit" to exit): +``` + + +For a response that triggers a tool call with human-in-the-loop approval: + + +```text +Enter a prompt (or "quit" to exit): Send an email to alice@example.com saying hello + +Agent wants to execute: send_email +Arguments: { + "to": "alice@example.com", + "subject": "Hello", + "body": "Hello Alice!" +} +Approve? (yes/no): yes +Decision sent: approved + +I've sent the email to alice@example.com with the subject "Hello". + +Enter a prompt (or "quit" to exit): +``` + + +## Next steps + +Continue exploring AI Transport features: + +* Learn about [token streaming patterns](/docs/ai-transport/token-streaming) including [message-per-response](/docs/ai-transport/token-streaming/message-per-response) and [message-per-token](/docs/ai-transport/token-streaming/message-per-token). +* Understand [user input](/docs/ai-transport/messaging/accepting-user-input) patterns for handling prompts and correlating responses. +* Explore [identifying users and agents](/docs/ai-transport/sessions-identity/identifying-users-and-agents) for more advanced authentication scenarios. +* Implement more advanced [human-in-the-loop](/docs/ai-transport/messaging/human-in-the-loop) workflows with role-based authorization. +* Stream [tool call](/docs/ai-transport/messaging/tool-calls) information to build generative UI experiences. diff --git a/src/pages/docs/ai-transport/getting-started/openai.mdx b/src/pages/docs/ai-transport/getting-started/openai.mdx new file mode 100644 index 0000000000..b216a10797 --- /dev/null +++ b/src/pages/docs/ai-transport/getting-started/openai.mdx @@ -0,0 +1,606 @@ +--- +title: "Getting started with OpenAI" +meta_description: "Build a realtime AI agent with OpenAI that streams tokens over Ably, handles tool calls with human-in-the-loop approval, and authenticates users with verified identities." 
+meta_keywords: "AI Transport OpenAI, Ably AI agent, token streaming GPT, realtime AI, LLM streaming, AI agent tutorial, human in the loop, HITL, tool calls, OpenAI Responses API" +--- + +This guide will get you started with Ably AI Transport using OpenAI's Responses API. + +You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses OpenAI's GPT model with a `send_email` tool that requires user approval before execution. + +## Prerequisites + +1. [Sign up](https://ably.com/signup) for an Ably account. + +2. Create a [new app](https://ably.com/accounts/any/apps/new), and create your first API key in the **API Keys** tab of the dashboard. + +3. Your API key will need the `publish`, `subscribe`, and `message-update-own` capabilities. + +4. Enable message appends for the channel: + 1. Go to the **Settings** tab of your app in the dashboard. + 2. Under **Rules**, click **Add new rule**. + 3. Enter `ai` as the channel namespace. + 4. Check **Message annotations, updates, deletes, and appends**. + 5. Click **Create channel rule** to save. + +5. Install any current LTS version of [Node.js](https://nodejs.org/en). + +6. Get an [OpenAI API key](https://platform.openai.com/api-keys). 
+ +## Step 1: Project setup + +Create a new directory for your project and initialize it: + + +```shell +mkdir ai-agent-demo && cd ai-agent-demo +npm init -y && npm pkg set type=module +``` + + +Install the required dependencies: + + +```shell +npm install ably openai jsonwebtoken express +npm install -D typescript @types/node @types/express @types/jsonwebtoken +``` + + +Create a TypeScript configuration file: + + +```shell +npx tsc --init +``` + + +Create a `.env` file in your project root and add your API keys: + + +```shell +echo "ABLY_API_KEY={{API_KEY}}" > .env +echo "OPENAI_API_KEY=your_openai_api_key" >> .env +``` + + +## Step 2: Authenticate users + +Users authenticate with Ably using [token authentication](/docs/auth/token). Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens. + +Create a file called `auth-server.ts` with an endpoint that generates signed JWTs: + + +```typescript +import express from 'express'; +import jwt from 'jsonwebtoken'; + +const app = express(); + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const [keyName, keySecret] = apiKey.split(':'); +if (!keyName || !keySecret) { + throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"'); +} + +app.get('/api/auth/token', (req, res) => { + // In production, authenticate the user and get their ID from your session + const userId = 'user-123'; + + const token = jwt.sign({ + 'x-ably-clientId': userId, + 'ably.channel.*': 'user' + }, keySecret, { + algorithm: 'HS256', + keyid: keyName, + expiresIn: '1h' + }); + + res.type('application/jwt').send(token); +}); + +app.listen(3001, () => { + console.log('Auth server running on http://localhost:3001'); +}); +``` + + + + +The JWT includes two claims: +- `x-ably-clientId`: Establishes a verified identity that appears on all messages the user 
publishes. +- `ably.channel.*`: Assigns a role that agents can use to distinguish users from other agents on the channel. + + + +## Step 3: Create the agent + +The agent runs in a trusted server environment and uses [API key authentication](/docs/auth#basic-authentication). It subscribes to a channel to receive user prompts, processes them with OpenAI's Responses API, and streams responses back using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern. When the model requests a tool call, the agent pauses to request human approval before executing. + +Create a file called `agent.ts` with the setup, tool definition, and human-in-the-loop helpers: + + +```typescript +import * as Ably from 'ably'; +import OpenAI from 'openai'; + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const openai = new OpenAI(); + +const realtime = new Ably.Realtime({ + key: apiKey, + clientId: 'ai-agent', + echoMessages: false, +}); + +const channel = realtime.channels.get('ai:conversation'); + +// Define a tool that requires human approval +const tools: OpenAI.Responses.Tool[] = [ + { + type: 'function', + name: 'send_email', + description: 'Send an email to a recipient. 
Always requires human approval.',
+    parameters: {
+      type: 'object',
+      properties: {
+        to: { type: 'string', description: 'Recipient email address' },
+        subject: { type: 'string', description: 'Email subject line' },
+        body: { type: 'string', description: 'Email body content' },
+      },
+      required: ['to', 'subject', 'body'],
+    },
+  },
+];
+
+// Track pending approval requests
+const pendingApprovals = new Map<string, (decision: string) => void>();
+
+// Listen for approval responses from users
+await channel.subscribe('approval-response', (message: Ably.Message) => {
+  const toolCallId = message.extras?.headers?.toolCallId;
+  const resolve = pendingApprovals.get(toolCallId);
+  if (resolve) {
+    pendingApprovals.delete(toolCallId);
+    resolve(message.data.decision);
+  }
+});
+
+// Request human approval for a tool call via the channel
+function requestApproval(
+  toolCallId: string,
+  toolName: string,
+  toolInput: Record<string, unknown>,
+): Promise<string> {
+  return new Promise((resolve) => {
+    pendingApprovals.set(toolCallId, resolve);
+    channel.publish({
+      name: 'approval-request',
+      data: { name: toolName, arguments: toolInput },
+      extras: { headers: { toolCallId } },
+    });
+    console.log(`Awaiting approval for ${toolName} (${toolCallId})`);
+  });
+}
+
+// Execute a tool after approval
+function executeTool(name: string, input: Record<string, unknown>) {
+  if (name === 'send_email') {
+    console.log(`Sending email to ${input.to}: ${input.subject}`);
+    return { success: true, message: `Email sent to ${input.to}` };
+  }
+  return { error: `Unknown tool: ${name}` };
+}
+```
+
+
+The agent publishes `approval-request` messages to the channel when a tool call is detected, then waits for a matching `approval-response` correlated by `toolCallId`. The `executeTool` function simulates the email action. In production, replace this with actual email delivery logic.
+
+
+
+Add the streaming function to `agent.ts`.
This streams OpenAI response tokens to Ably using `channel.appendMessage()`, while tracking any tool call the model requests: + + +```typescript +// Stream OpenAI response tokens to Ably, returning tool call info if any +async function streamToAbly( + input: OpenAI.Responses.ResponseInput, + serial: string, +) { + const stream = await openai.responses.create({ + model: 'gpt-4o', + input, + tools, + stream: true, + }); + + let messageItemId: string | null = null; + let functionCallItem: { id: string; callId: string; name: string } | null = null; + let functionArgs = ''; + let hasToolCall = false; + + for await (const event of stream) { + switch (event.type) { + case 'response.output_item.added': + if (event.item.type === 'message') { + messageItemId = event.item.id; + } else if (event.item.type === 'function_call') { + functionCallItem = { + id: event.item.id, + callId: event.item.call_id, + name: event.item.name, + }; + functionArgs = ''; + hasToolCall = true; + } + break; + + case 'response.output_text.delta': + if (event.item_id === messageItemId) { + channel.appendMessage({ serial, data: event.delta }); + } + break; + + case 'response.function_call_arguments.delta': + functionArgs += event.delta; + break; + + case 'response.completed': + break; + } + } + + return { + hasToolCall, + functionCallItem, + functionArgs, + }; +} +``` + + +The function filters for `response.output_text.delta` events and appends each token to the Ably message. It also tracks `function_call` output items and accumulates their JSON arguments. The `hasToolCall` flag indicates whether the model wants to call a tool. + +Add the prompt handler to the end of `agent.ts`. 
This ties everything together, streaming the initial response and handling tool calls with HITL approval: + + +```typescript +// Handle incoming user prompts +await channel.subscribe('user-input', async (message: Ably.Message) => { + const { promptId, text } = message.data as { promptId: string; text: string }; + const userId = message.clientId; + const role = message.extras?.userClaim; + + console.log(`Received prompt from ${userId} (role: ${role}): ${text}`); + + if (role !== 'user') { + console.log('Ignoring message from non-user'); + return; + } + + // Create the initial Ably message for streaming + const response = await channel.publish({ + name: 'agent-response', + data: '', + extras: { headers: { promptId } }, + }); + + const serial = response.serials[0]; + if (!serial) { + console.error('No serial returned from publish'); + return; + } + + // Stream the response from OpenAI + const input: OpenAI.Responses.ResponseInput = [ + { role: 'user', content: text }, + ]; + + const { hasToolCall, functionCallItem, functionArgs } = await streamToAbly(input, serial); + + // Handle tool call with human-in-the-loop approval + if (hasToolCall && functionCallItem) { + const parsedArgs = JSON.parse(functionArgs); + + const decision = await requestApproval( + functionCallItem.callId, + functionCallItem.name, + parsedArgs, + ); + + let toolResult: Record; + if (decision === 'approved') { + toolResult = executeTool(functionCallItem.name, parsedArgs); + } else { + toolResult = { error: 'The user rejected this action' }; + } + + // Continue the conversation with the tool result + const followUpInput: OpenAI.Responses.ResponseInput = [ + { role: 'user', content: text }, + { + type: 'function_call', + id: functionCallItem.id, + call_id: functionCallItem.callId, + name: functionCallItem.name, + arguments: functionArgs, + }, + { + type: 'function_call_output', + call_id: functionCallItem.callId, + output: JSON.stringify(toolResult), + }, + ]; + + // Stream the follow-up response, 
appending to the same message + channel.appendMessage({ serial, data: '\n\n' }); + await streamToAbly(followUpInput, serial); + } + + // Signal completion + await channel.publish({ + name: 'agent-response-complete', + extras: { headers: { promptId } }, + }); + + console.log(`Completed response for prompt ${promptId}`); +}); + +console.log('Agent is listening for prompts...'); +``` + + +The prompt handler: +1. Verifies the sender has the `user` role. +2. Creates an initial Ably message and captures its `serial` for appending. +3. Streams the OpenAI response, appending text tokens in realtime. +4. If the model requests a tool call, publishes an `approval-request` and waits for the user's decision. +5. After approval, executes the tool and streams a follow-up response appended to the same message. + +## Step 4: Create the client + +The client uses an [`authCallback`](/docs/auth/token#auth-callback) to obtain a signed JWT from your auth server. The `clientId` from the token is automatically attached to all messages the client publishes. + +Create a file called `client.ts` with the connection setup and token streaming subscription: + + +```typescript +import * as Ably from 'ably'; +import crypto from 'crypto'; +import * as readline from 'readline'; + +const realtime = new Ably.Realtime({ + authCallback: async ( + _tokenParams: Ably.TokenParams, + callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void + ) => { + try { + const response = await fetch('http://localhost:3001/api/auth/token'); + const token = await response.text(); + callback(null, token); + } catch (error) { + callback(error instanceof Error ? 
error.message : String(error), null); + } + } +}); + +realtime.connection.on('connected', () => { + console.log('Connected to Ably as', realtime.auth.clientId); +}); + +const channel = realtime.channels.get('ai:conversation'); +const pendingPrompts = new Map void>(); + +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +// Subscribe to streamed agent responses +await channel.subscribe('agent-response', (message: Ably.Message) => { + const promptId = message.extras?.headers?.promptId; + if (!promptId) return; + + switch (message.action) { + case 'message.create': + break; + case 'message.append': + // Write each new token as it arrives + process.stdout.write(message.data || ''); + break; + case 'message.update': + // Full response after reconnection + console.log(message.data || ''); + break; + } +}); +``` + + +The client subscribes to `agent-response` messages and handles different [message actions](/docs/ai-transport/token-streaming/message-per-response): +- `message.create`: A new response has started. +- `message.append`: A token has been appended. Each token is written directly to the terminal as it arrives. +- `message.update`: The full response content, received after reconnection. + +Add the human-in-the-loop approval handler to `client.ts`. When the agent requests approval for a tool call, the client displays the details and prompts the user: + + +```typescript +// Subscribe to approval requests for human-in-the-loop +await channel.subscribe('approval-request', async (message: Ably.Message) => { + const { name, arguments: args } = message.data; + const toolCallId = message.extras?.headers?.toolCallId; + + console.log(`\n\nAgent wants to execute: ${name}`); + console.log(`Arguments: ${JSON.stringify(args, null, 2)}`); + + const answer = await new Promise((resolve) => { + rl.question('Approve? (yes/no): ', resolve); + }); + + const decision = answer.toLowerCase() === 'yes' ? 
'approved' : 'rejected'; + + await channel.publish({ + name: 'approval-response', + data: { decision }, + extras: { headers: { toolCallId } }, + }); + + console.log(`Decision sent: ${decision}\n`); +}); +``` + + +## Step 5: Send user prompts + +Each prompt includes a unique `promptId` to correlate responses. The user's `clientId` is automatically attached to the message by Ably. + +Add the following to the end of `client.ts`: + + +```typescript +// Subscribe to completion signals +await channel.subscribe('agent-response-complete', (message: Ably.Message) => { + const promptId = message.extras?.headers?.promptId; + if (!promptId) return; + + console.log('\n'); + const resolve = pendingPrompts.get(promptId); + if (resolve) { + pendingPrompts.delete(promptId); + resolve(); + } +}); + +async function sendPrompt(text: string): Promise { + const promptId = crypto.randomUUID(); + + const completionPromise = new Promise((resolve) => { + pendingPrompts.set(promptId, resolve); + }); + + await channel.publish('user-input', { + promptId, + text, + }); + + await completionPromise; +} + +function askQuestion() { + rl.question('Enter a prompt (or "quit" to exit): ', async (text) => { + if (text.toLowerCase() === 'quit') { + rl.close(); + realtime.close(); + return; + } + + await sendPrompt(text); + askQuestion(); + }); +} + +askQuestion(); +``` + + +## Step 6: Run the example + +Open three terminal windows to run the auth server, agent, and client. + +Terminal 1: Start the auth server + + +```shell +npx tsx --env-file=.env auth-server.ts +``` + + +You should see: + + +```text +Auth server running on http://localhost:3001 +``` + + +Terminal 2: Start the agent + + +```shell +npx tsx --env-file=.env agent.ts +``` + + +You should see: + + +```text +Agent is listening for prompts... +``` + + +Terminal 3: Run the client + + +```shell +npx tsx --env-file=.env client.ts +``` + + +Try entering different prompts. 
For a regular response without tool calls: + + +```text +Enter a prompt (or "quit" to exit): What is the capital of France? + +The capital of France is Paris. + +Enter a prompt (or "quit" to exit): +``` + + +For a response that triggers a tool call with human-in-the-loop approval: + + +```text +Enter a prompt (or "quit" to exit): Send an email to alice@example.com saying hello + +Agent wants to execute: send_email +Arguments: { + "to": "alice@example.com", + "subject": "Hello", + "body": "Hello Alice!" +} +Approve? (yes/no): yes +Decision sent: approved + +I've sent the email to alice@example.com with the subject "Hello". + +Enter a prompt (or "quit" to exit): +``` + + +## Next steps + +Continue exploring AI Transport features: + +* Learn about [token streaming patterns](/docs/ai-transport/token-streaming) including [message-per-response](/docs/ai-transport/token-streaming/message-per-response) and [message-per-token](/docs/ai-transport/token-streaming/message-per-token). +* Understand [user input](/docs/ai-transport/messaging/accepting-user-input) patterns for handling prompts and correlating responses. +* Explore [identifying users and agents](/docs/ai-transport/sessions-identity/identifying-users-and-agents) for more advanced authentication scenarios. +* Implement more advanced [human-in-the-loop](/docs/ai-transport/messaging/human-in-the-loop) workflows with role-based authorization. +* Stream [tool call](/docs/ai-transport/messaging/tool-calls) information to build generative UI experiences. 
diff --git a/src/pages/docs/ai-transport/getting-started/vercel-ai-sdk.mdx b/src/pages/docs/ai-transport/getting-started/vercel-ai-sdk.mdx new file mode 100644 index 0000000000..7e41d920ce --- /dev/null +++ b/src/pages/docs/ai-transport/getting-started/vercel-ai-sdk.mdx @@ -0,0 +1,583 @@ +--- +title: "Getting started with Vercel AI SDK" +meta_description: "Build a realtime AI agent with the Vercel AI SDK that streams tokens over Ably, handles tool calls with human-in-the-loop approval, and authenticates users with verified identities." +meta_keywords: "AI Transport Vercel AI SDK, Ably AI agent, token streaming, realtime AI, LLM streaming, AI agent tutorial, human in the loop, HITL, tool calls, streamText" +--- + +This guide will get you started with Ably AI Transport using the Vercel AI SDK. + +You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses the Vercel AI SDK with a `send_email` tool that requires user approval before execution. + +## Prerequisites + +1. [Sign up](https://ably.com/signup) for an Ably account. + +2. Create a [new app](https://ably.com/accounts/any/apps/new), and create your first API key in the **API Keys** tab of the dashboard. + +3. Your API key will need the `publish`, `subscribe`, and `message-update-own` capabilities. + +4. Enable message appends for the channel: + 1. Go to the **Settings** tab of your app in the dashboard. + 2. Under **Rules**, click **Add new rule**. + 3. Enter `ai` as the channel namespace. + 4. Check **Message annotations, updates, deletes, and appends**. + 5. Click **Create channel rule** to save. + +5. Install any current LTS version of [Node.js](https://nodejs.org/en). + +6. Get an API key for your chosen model provider. 
This guide uses OpenAI via the [Vercel AI Gateway](https://vercel.com/docs/ai-gateway), but you can use any [supported provider](https://ai-sdk.dev/providers/ai-sdk-providers). + +## Step 1: Project setup + +Create a new directory for your project and initialize it: + + +```shell +mkdir ai-agent-demo && cd ai-agent-demo +npm init -y && npm pkg set type=module +``` + + +Install the required dependencies: + + +```shell +npm install ably ai@^6 zod jsonwebtoken express +npm install -D typescript @types/node @types/express @types/jsonwebtoken +``` + + +Create a TypeScript configuration file: + + +```shell +npx tsc --init +``` + + +Create a `.env` file in your project root and add your API keys: + + +```shell +echo "ABLY_API_KEY={{API_KEY}}" > .env +echo "AI_GATEWAY_API_KEY=your_ai_gateway_api_key" >> .env +``` + + + + +## Step 2: Authenticate users + +Users authenticate with Ably using [token authentication](/docs/auth/token). Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens. 
+ +Create a file called `auth-server.ts` with an endpoint that generates signed JWTs: + + +```typescript +import express from 'express'; +import jwt from 'jsonwebtoken'; + +const app = express(); + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const [keyName, keySecret] = apiKey.split(':'); +if (!keyName || !keySecret) { + throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"'); +} + +app.get('/api/auth/token', (req, res) => { + // In production, authenticate the user and get their ID from your session + const userId = 'user-123'; + + const token = jwt.sign({ + 'x-ably-clientId': userId, + 'ably.channel.*': 'user' + }, keySecret, { + algorithm: 'HS256', + keyid: keyName, + expiresIn: '1h' + }); + + res.type('application/jwt').send(token); +}); + +app.listen(3001, () => { + console.log('Auth server running on http://localhost:3001'); +}); +``` + + + + +The JWT includes two claims: +- `x-ably-clientId`: Establishes a verified identity that appears on all messages the user publishes. +- `ably.channel.*`: Assigns a role that agents can use to distinguish users from other agents on the channel. + + + +## Step 3: Create the agent + +The agent runs in a trusted server environment and uses [API key authentication](/docs/auth#basic-authentication). It subscribes to a channel to receive user prompts, processes them with the Vercel AI SDK's `streamText`, and streams responses back using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern. When the model requests a tool call, the agent pauses to request human approval before executing. 
+ +Create a file called `agent.ts` with the setup, tool definition, and human-in-the-loop helpers: + + +```typescript +import * as Ably from 'ably'; +import { streamText, tool } from 'ai'; +import { z } from 'zod'; + +const apiKey = process.env.ABLY_API_KEY; +if (!apiKey) { + throw new Error('ABLY_API_KEY environment variable is required'); +} + +const realtime = new Ably.Realtime({ + key: apiKey, + clientId: 'ai-agent', + echoMessages: false, +}); + +const channel = realtime.channels.get('ai:conversation'); + +// Define a tool that requires human approval +const sendEmailTool = tool({ + description: 'Send an email to a recipient. Always requires human approval.', + inputSchema: z.object({ + to: z.string().describe('Recipient email address'), + subject: z.string().describe('Email subject line'), + body: z.string().describe('Email body content'), + }), +}); + +// Track pending approval requests +const pendingApprovals = new Map void>(); + +// Listen for approval responses from users +await channel.subscribe('approval-response', (message: Ably.Message) => { + const toolCallId = message.extras?.headers?.toolCallId; + const resolve = pendingApprovals.get(toolCallId); + if (resolve) { + pendingApprovals.delete(toolCallId); + resolve(message.data.decision); + } +}); + +// Request human approval for a tool call via the channel +function requestApproval( + toolCallId: string, + toolName: string, + toolInput: Record, +): Promise { + return new Promise((resolve) => { + pendingApprovals.set(toolCallId, resolve); + channel.publish({ + name: 'approval-request', + data: { name: toolName, arguments: toolInput }, + extras: { headers: { toolCallId } }, + }); + console.log(`Awaiting approval for ${toolName} (${toolCallId})`); + }); +} + +// Execute a tool after approval +function executeTool(name: string, input: Record) { + if (name === 'send_email') { + console.log(`Sending email to ${input.to}: ${input.subject}`); + return { success: true, message: `Email sent to ${input.to}` }; + 
}
+  return { error: `Unknown tool: ${name}` };
+}
+```
+
+
+The agent publishes `approval-request` messages to the channel when a tool call is detected, then waits for a matching `approval-response` correlated by `toolCallId`. The `executeTool` function simulates the email action. In production, replace this with actual email delivery logic.
+
+
+
+Add the streaming function to `agent.ts`. This streams response tokens to Ably using `channel.appendMessage()`, while tracking any tool call the model requests:
+
+
+```typescript
+// Stream AI response tokens to Ably, returning tool call info if any
+async function streamToAbly(
+  options: { prompt: string } | { messages: any[] },
+  serial: string,
+) {
+  const result = streamText({
+    model: 'openai/gpt-4o',
+    tools: { send_email: sendEmailTool },
+    ...options,
+  });
+
+  let toolCallDetected: { toolCallId: string; toolName: string; args: Record<string, unknown> } | null = null;
+  let lastAppend: Promise<unknown> | undefined;
+
+  for await (const event of result.fullStream) {
+    switch (event.type) {
+      case 'text-delta':
+        lastAppend = channel.appendMessage({ serial, data: event.text });
+        break;
+
+      case 'tool-call':
+        toolCallDetected = {
+          toolCallId: event.toolCallId,
+          toolName: event.toolName,
+          args: event.input as Record<string, unknown>,
+        };
+        break;
+    }
+  }
+
+  // Ensure the last appended token is delivered before signaling completion
+  await lastAppend;
+
+  return { toolCallDetected };
+}
+```
+
+
+The function iterates over `fullStream` events from `streamText`. It appends each `text-delta` token to the Ably message using `appendMessage` and captures `tool-call` events. The `toolCallDetected` object is returned so the prompt handler can process tool calls with HITL approval.
+
+Add the prompt handler to the end of `agent.ts`.
This ties everything together, streaming the initial response and handling tool calls with HITL approval: + + +```typescript +// Handle incoming user prompts +await channel.subscribe('user-input', async (message: Ably.Message) => { + const { promptId, text } = message.data as { promptId: string; text: string }; + const userId = message.clientId; + const role = message.extras?.userClaim; + + console.log(`Received prompt from ${userId} (role: ${role}): ${text}`); + + if (role !== 'user') { + console.log('Ignoring message from non-user'); + return; + } + + // Create the initial Ably message for streaming + const response = await channel.publish({ + name: 'agent-response', + data: '', + extras: { headers: { promptId } }, + }); + + const serial = response.serials[0]; + if (!serial) { + console.error('No serial returned from publish'); + return; + } + + // Stream the response + const { toolCallDetected } = await streamToAbly({ prompt: text }, serial); + + // Handle tool call with human-in-the-loop approval + if (toolCallDetected) { + const decision = await requestApproval( + toolCallDetected.toolCallId, + toolCallDetected.toolName, + toolCallDetected.args, + ); + + let toolResult: { type: string; value?: unknown; reason?: string }; + if (decision === 'approved') { + toolResult = { type: 'json', value: executeTool(toolCallDetected.toolName, toolCallDetected.args) }; + } else { + toolResult = { type: 'execution-denied', reason: 'The user rejected this action' }; + } + + // Stream follow-up response with the tool result + channel.appendMessage({ serial, data: '\n\n' }); + + await streamToAbly({ + messages: [ + { role: 'user', content: text }, + { + role: 'assistant', + content: [ + { + type: 'tool-call', + toolCallId: toolCallDetected.toolCallId, + toolName: toolCallDetected.toolName, + input: toolCallDetected.args, + }, + ], + }, + { + role: 'tool', + content: [ + { + type: 'tool-result', + toolCallId: toolCallDetected.toolCallId, + toolName: toolCallDetected.toolName, + 
output: toolResult, + }, + ], + }, + ], + }, serial); + } + + // Signal completion + await channel.publish({ + name: 'agent-response-complete', + extras: { headers: { promptId } }, + }); + + console.log(`Completed response for prompt ${promptId}`); +}); + +console.log('Agent is listening for prompts...'); +``` + + +The prompt handler: +1. Verifies the sender has the `user` role. +2. Creates an initial Ably message and captures its `serial` for appending. +3. Streams the response, appending text tokens in realtime. +4. If the model requests a tool call, publishes an `approval-request` and waits for the user's decision. +5. After approval, executes the tool and streams a follow-up response appended to the same message. + +## Step 4: Create the client + +The client uses an [`authCallback`](/docs/auth/token#auth-callback) to obtain a signed JWT from your auth server. The `clientId` from the token is automatically attached to all messages the client publishes. + +Create a file called `client.ts` with the connection setup and token streaming subscription: + + +```typescript +import * as Ably from 'ably'; +import crypto from 'crypto'; +import * as readline from 'readline'; + +const realtime = new Ably.Realtime({ + authCallback: async ( + _tokenParams: Ably.TokenParams, + callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void + ) => { + try { + const response = await fetch('http://localhost:3001/api/auth/token'); + const token = await response.text(); + callback(null, token); + } catch (error) { + callback(error instanceof Error ? 
error.message : String(error), null);
    }
  }
});

realtime.connection.on('connected', () => {
  console.log('Connected to Ably as', realtime.auth.clientId);
});

const channel = realtime.channels.get('ai:conversation');
const pendingPrompts = new Map<string, () => void>();

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

// Subscribe to streamed agent responses
await channel.subscribe('agent-response', (message: Ably.Message) => {
  switch (message.action) {
    case 'message.create':
      break;
    case 'message.append':
      // Write each new token as it arrives
      process.stdout.write(message.data || '');
      break;
    case 'message.update':
      // Full response after reconnection
      console.log(message.data || '');
      break;
  }
});
```


The client subscribes to `agent-response` messages and handles different [message actions](/docs/ai-transport/token-streaming/message-per-response):
- `message.create`: A new response has started.
- `message.append`: A token has been appended. Each token is written directly to the terminal as it arrives.
- `message.update`: The full response content, received after reconnection.

Add the human-in-the-loop approval handler to `client.ts`. When the agent requests approval for a tool call, the client displays the details and prompts the user:


```typescript
// Subscribe to approval requests for human-in-the-loop
await channel.subscribe('approval-request', async (message: Ably.Message) => {
  const { name, arguments: args } = message.data;
  const toolCallId = message.extras?.headers?.toolCallId;

  console.log(`\n\nAgent wants to execute: ${name}`);
  console.log(`Arguments: ${JSON.stringify(args, null, 2)}`);

  const answer = await new Promise<string>((resolve) => {
    rl.question('Approve? (yes/no): ', resolve);
  });

  const decision = answer.toLowerCase() === 'yes' ? 
'approved' : 'rejected';

  await channel.publish({
    name: 'approval-response',
    data: { decision },
    extras: { headers: { toolCallId } },
  });

  console.log(`Decision sent: ${decision}\n`);
});
```


## Step 5: Send user prompts

Each prompt includes a unique `promptId` to correlate responses. The user's `clientId` is automatically attached to the message by Ably.

Add the following to the end of `client.ts`:


```typescript
// Subscribe to completion signals
await channel.subscribe('agent-response-complete', (message: Ably.Message) => {
  const promptId = message.extras?.headers?.promptId;
  if (!promptId) return;

  console.log('\n');
  const resolve = pendingPrompts.get(promptId);
  if (resolve) {
    pendingPrompts.delete(promptId);
    resolve();
  }
});

async function sendPrompt(text: string): Promise<void> {
  const promptId = crypto.randomUUID();

  const completionPromise = new Promise<void>((resolve) => {
    pendingPrompts.set(promptId, resolve);
  });

  await channel.publish('user-input', {
    promptId,
    text,
  });

  await completionPromise;
}

function askQuestion() {
  rl.question('Enter a prompt (or "quit" to exit): ', async (text) => {
    if (text.toLowerCase() === 'quit') {
      rl.close();
      realtime.close();
      return;
    }

    await sendPrompt(text);
    askQuestion();
  });
}

askQuestion();
```


## Step 6: Run the example

Open three terminal windows to run the auth server, agent, and client.

Terminal 1: Start the auth server


```shell
npx tsx --env-file=.env auth-server.ts
```


You should see:


```text
Auth server running on http://localhost:3001
```


Terminal 2: Start the agent


```shell
npx tsx --env-file=.env agent.ts
```


You should see:


```text
Agent is listening for prompts...
```


Terminal 3: Run the client


```shell
npx tsx --env-file=.env client.ts
```


Try entering different prompts. 
For a regular response without tool calls: + + +```text +Enter a prompt (or "quit" to exit): What is the capital of France? + +The capital of France is Paris. + +Enter a prompt (or "quit" to exit): +``` + + +For a response that triggers a tool call with human-in-the-loop approval: + + +```text +Enter a prompt (or "quit" to exit): Send an email to alice@example.com saying hello + +Agent wants to execute: send_email +Arguments: { + "to": "alice@example.com", + "subject": "Hello", + "body": "Hello Alice!" +} +Approve? (yes/no): yes +Decision sent: approved + +I've sent the email to alice@example.com with the subject "Hello". + +Enter a prompt (or "quit" to exit): +``` + + +## Next steps + +Continue exploring AI Transport features: + +* Learn about [token streaming patterns](/docs/ai-transport/token-streaming) including [message-per-response](/docs/ai-transport/token-streaming/message-per-response) and [message-per-token](/docs/ai-transport/token-streaming/message-per-token). +* Understand [user input](/docs/ai-transport/messaging/accepting-user-input) patterns for handling prompts and correlating responses. +* Explore [identifying users and agents](/docs/ai-transport/sessions-identity/identifying-users-and-agents) for more advanced authentication scenarios. +* Implement more advanced [human-in-the-loop](/docs/ai-transport/messaging/human-in-the-loop) workflows with role-based authorization. +* Stream [tool call](/docs/ai-transport/messaging/tool-calls) information to build generative UI experiences.