diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx index e741bc29..89819894 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx @@ -26,6 +26,8 @@ const STATUS_LABELS: Record = { COMPLETED: "Done", }; +const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]); + function formatStatus(status: string): string { return STATUS_LABELS[status] ?? status.replace(/_/g, " ").toLowerCase(); } @@ -166,11 +168,18 @@ export function GenieChatMessageList({ const showStreamingIndicator = status === "streaming" && lastMessage?.role === "assistant" && - lastMessage.id === ""; + !lastMessage.content && + !TERMINAL_STATUSES.has(lastMessage.status); return ( - -
+ div]:!block", + className, + )} + > +
{hasPreviousPage &&
} {status === "loading-older" && ( @@ -192,7 +201,10 @@ export function GenieChatMessageList({ {messages .filter( - (msg) => msg.role !== "assistant" || msg.id !== "" || msg.content, + (msg) => + msg.role !== "assistant" || + msg.content || + (msg.id !== "" && TERMINAL_STATUSES.has(msg.status)), ) .map((msg) => ( diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx index ef34b048..7f19b8d8 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx @@ -14,11 +14,12 @@ import type { GenieAttachmentResponse, GenieMessageItem } from "./types"; marked.setOptions({ breaks: true, gfm: true }); const markdownStyles = cn( - "text-sm", + "text-sm break-words", "[&_p]:my-1 [&_ul]:my-1 [&_ol]:my-1 [&_li]:my-0", "[&_pre]:bg-background/50 [&_pre]:p-2 [&_pre]:rounded [&_pre]:text-xs [&_pre]:overflow-x-auto", "[&_code]:text-xs [&_code]:bg-background/50 [&_code]:px-1 [&_code]:rounded", - "[&_table]:text-xs [&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1", + "[&_table]:text-xs [&_table]:block [&_table]:overflow-x-auto [&_table]:max-w-full", + "[&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1", "[&_table]:border-collapse [&_th]:border [&_td]:border", "[&_th]:border-border [&_td]:border-border", "[&_a]:underline", @@ -66,15 +67,10 @@ export function GenieChatMessage({ -
+
{queryResult != null && ( - + )} diff --git a/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx b/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx index 266a88e4..f0227f79 100644 --- a/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx +++ b/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx @@ -4,6 +4,7 @@ import type { GenieStatementResponse } from "shared"; import { BaseChart } from "../charts/base"; import { ChartErrorBoundary } from "../charts/chart-error-boundary"; import type { ChartType } from "../charts/types"; +import { cn } from "../lib/utils"; import { Button } from "../ui/button"; import { DropdownMenu, @@ -118,11 +119,11 @@ export function GenieQueryVisualization({ ); if (!inference || !activeChartType) { - return
{dataTable}
; + return
{dataTable}
; } return ( - +
Chart @@ -157,7 +158,7 @@ export function GenieQueryVisualization({ )}
-
+
0; - if (!hasAttachments) return [makeUserItem(msg)]; + + if (!hasAttachments && TERMINAL_STATUSES.has(msg.status)) { + return [makeUserItem(msg)]; + } + if (!hasAttachments) { + return [ + makeUserItem(msg, "-user"), + { + id: msg.messageId, + role: "assistant", + content: "", + status: msg.status, + attachments: [], + queryResults: new Map(), + }, + ]; + } return [makeUserItem(msg, "-user"), makeAssistantItem(msg)]; } @@ -202,19 +224,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { const msg = event.message; const hasAttachments = (msg.attachments?.length ?? 0) > 0; - if (hasAttachments) { - // During streaming we already appended the user message locally, - // so only handle assistant results. Messages without attachments - // are the user-message echo from the API — skip those. - const item = makeAssistantItem(msg); - setMessages((prev) => { - const last = prev[prev.length - 1]; - if (last?.role === "assistant" && last.id === "") { - return [...prev.slice(0, -1), item]; - } - return [...prev, item]; - }); - } + const item = makeAssistantItem(msg); + setMessages((prev) => { + const last = prev[prev.length - 1]; + if (!last || last.role !== "assistant") return prev; + + if (last.id === msg.messageId) { + return [...prev.slice(0, -1), item]; + } + + if (last.id === "" && hasAttachments) { + return [...prev.slice(0, -1), item]; + } + + return prev; + }); break; } @@ -362,6 +386,47 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { [alias, basePath], ); + const pollPendingMessage = useCallback( + ( + convId: string, + messageId: string, + parentAbortController: AbortController, + ) => { + setStatus("streaming"); + + const requestId = crypto.randomUUID(); + const url = + `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}` + + `/messages/${encodeURIComponent(messageId)}?requestId=${encodeURIComponent(requestId)}`; + + connectSSE({ + url, + signal: parentAbortController.signal, + onMessage: async (message) => { + try { + processStreamEvent(JSON.parse(message.data) as GenieStreamEvent); + } catch { + // Malformed SSE data + } + }, + onError: (err) => { + if (parentAbortController.signal.aborted) return; + setError( + err instanceof Error + ? err.message + : "Failed to poll pending message.", + ); + setStatus("error"); + }, + }).then(() => { + if (!parentAbortController.signal.aborted) { + setStatus((prev) => (prev === "error" ? "error" : "idle")); + } + }); + }, + [alias, basePath, processStreamEvent], + ); + const loadHistory = useCallback( (convId: string) => { paginationAbortRef.current?.abort(); @@ -376,13 +441,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { { errorMessage: "Failed to load conversation history." }, ); promise.then((items) => { - if (!abortController.signal.aborted) { - setMessages(items); + if (abortController.signal.aborted) return; + setMessages(items); + + const lastItem = items[items.length - 1]; + if ( + lastItem?.role === "assistant" && + !TERMINAL_STATUSES.has(lastItem.status) + ) { + pollPendingMessage(convId, lastItem.id, abortController); + } else { setStatus((prev) => (prev === "error" ? "error" : "idle")); } }); }, - [fetchPage], + [fetchPage, pollPendingMessage], ); const fetchPreviousPage = useCallback(() => { diff --git a/packages/appkit/src/connectors/genie/client.ts b/packages/appkit/src/connectors/genie/client.ts index 280c56e4..68a4501e 100644 --- a/packages/appkit/src/connectors/genie/client.ts +++ b/packages/appkit/src/connectors/genie/client.ts @@ -381,6 +381,71 @@ export class GenieConnector { } } + /** + * Polls a single message via `getMessage` until it reaches a terminal + * state (`COMPLETED` or `FAILED`). Yields the same event types as + * `streamSendMessage` so callers can reuse the same SSE processing logic. + */ + async *streamGetMessage( + workspaceClient: WorkspaceClient, + spaceId: string, + conversationId: string, + messageId: string, + options?: { timeout?: number; pollInterval?: number }, + ): AsyncGenerator { + const timeout = options?.timeout ?? this.config.timeout; + const pollInterval = options?.pollInterval ?? 3_000; + const deadline = + timeout > 0 ? Date.now() + timeout : Number.POSITIVE_INFINITY; + let lastStatus = ""; + + try { + while (true) { + const message = await workspaceClient.genie.getMessage({ + space_id: spaceId, + conversation_id: conversationId, + message_id: messageId, + }); + + if (message.status && message.status !== lastStatus) { + lastStatus = message.status; + yield { type: "status", status: message.status }; + } + + const isTerminal = + message.status === "COMPLETED" || message.status === "FAILED"; + if (isTerminal) { + const messageResponse = toMessageResponse(message); + yield { type: "message_result", message: messageResponse }; + yield* this.emitQueryResults( + workspaceClient, + spaceId, + conversationId, + messageId, + messageResponse, + ); + return; + } + + if (Date.now() >= deadline) { + yield { type: "error", error: "Message polling timed out" }; + return; + } + + await new Promise((r) => setTimeout(r, pollInterval)); + } + } catch (error) { + logger.error( + "Genie getMessage poll error (spaceId=%s, conversationId=%s, messageId=%s): %O", + spaceId, + conversationId, + messageId, + error, + ); + yield { type: "error", error: classifyGenieError(error) }; + } + } + async sendMessage( workspaceClient: WorkspaceClient, spaceId: string, diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts index 2ca348b4..8fe69d15 100644 --- a/packages/appkit/src/plugins/genie/genie.ts +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -65,6 +65,15 @@ export class GeniePlugin extends Plugin { await this.asUser(req)._handleGetConversation(req, res); }, }); + + this.route(router, { + name: "getMessage", + method: "get", + path: "/:alias/conversations/:conversationId/messages/:messageId", + handler: async (req: express.Request, res: express.Response) => { + await this.asUser(req)._handleGetMessage(req, res); + }, + }); } async _handleSendMessage( @@ -177,6 +186,59 @@ export class GeniePlugin extends Plugin { ); } + async _handleGetMessage( + req: express.Request, + res: express.Response, + ): Promise { + const { alias, conversationId, messageId } = req.params; + const spaceId = this.resolveSpaceId(alias); + + if (!spaceId) { + res.status(404).json({ error: `Unknown space alias: ${alias}` }); + return; + } + + const requestId = + (typeof req.query.requestId === "string" && req.query.requestId) || + randomUUID(); + + logger.debug( + "Polling message %s in conversation %s from space %s (alias=%s)", + messageId, + conversationId, + spaceId, + alias, + ); + + const timeout = this.config.timeout ?? 120_000; + const streamSettings: StreamExecutionSettings = { + ...genieStreamDefaults, + default: { + ...genieStreamDefaults.default, + timeout, + }, + stream: { + ...genieStreamDefaults.stream, + streamId: requestId, + }, + }; + + const workspaceClient = getWorkspaceClient(); + + await this.executeStream( + res, + () => + this.genieConnector.streamGetMessage( + workspaceClient, + spaceId, + conversationId, + messageId, + { timeout }, + ), + streamSettings, + ); + } + async getConversation( alias: string, conversationId: string, diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index 37bcb0e6..3cf0784d 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -175,11 +175,15 @@ describe("Genie Plugin", () => { expect.any(Function), ); - expect(router.get).toHaveBeenCalledTimes(1); + expect(router.get).toHaveBeenCalledTimes(2); expect(router.get).toHaveBeenCalledWith( "/:alias/conversations/:conversationId", expect.any(Function), ); + expect(router.get).toHaveBeenCalledWith( + "/:alias/conversations/:conversationId/messages/:messageId", + expect.any(Function), + ); }); });