From 86564deab5c989216ac5d2287a1553f3c291970a Mon Sep 17 00:00:00 2001 From: Bob Bai Date: Sun, 7 Jun 2026 22:54:33 -0700 Subject: [PATCH 1/4] refactor(agent-service): centralize types and isolate clients - Move all type/interface definitions under types/ (new types/metadata.ts, types/api.ts); drop duplicate LogicalPlan/PortSchema/InputPortInfo and the identical AgentSettingsApi/UpdateAgentSettingsRequest. - Make api/ pure HTTP clients: extract the execution client (api/execution-client.ts), move JWT helpers to auth/jwt.ts and endpoint config to config/endpoints.ts, rename *-api.ts -> *-client.ts (backend-api -> operator-metadata-client). - Rename misleading *Config types: AgentDelegateConfig -> AgentDelegation (+ AgentDelegationDto), TexeraAgentConfig -> TexeraAgentOptions, ExecutionConfig -> ExecutionRequestParams, getBackendConfig -> getServiceEndpoints. - De-duplicate the table/IO-shape formatters into tools-utility.ts. - Align CommentBox, WorkflowSettings.executionMode, and WorkflowFatalError with the frontend wire contract. 
Refs #5563 --- agent-service/src/agent/texera-agent.ts | 113 +++++------ .../src/agent/tools/result-formatting.test.ts | 4 +- .../src/agent/tools/result-formatting.ts | 84 +------- .../src/agent/tools/tools-utility.ts | 86 ++++++++ .../agent/tools/workflow-execution-tools.ts | 185 ++---------------- agent-service/src/agent/util/context-utils.ts | 2 +- .../agent/util/workflow-system-metadata.ts | 14 +- .../src/agent/util/workflow-utils.ts | 11 +- agent-service/src/agent/workflow-state.ts | 2 + .../api/{compile-api.ts => compile-client.ts} | 29 +-- agent-service/src/api/execution-client.ts | 106 ++++++++++ agent-service/src/api/index.ts | 9 +- ...ion-api.ts => operator-metadata-client.ts} | 26 +-- .../{workflow-api.ts => workflow-client.ts} | 27 +-- .../src/{api/auth-api.ts => auth/jwt.ts} | 0 agent-service/src/config/endpoints.ts | 39 ++++ agent-service/src/index.ts | 2 +- agent-service/src/server.ts | 96 ++++----- agent-service/src/types/agent.ts | 66 +++++-- agent-service/src/types/api.ts | 95 +++++++++ agent-service/src/types/index.ts | 2 + .../{api/backend-api.ts => types/metadata.ts} | 44 ++--- agent-service/src/types/workflow.ts | 22 ++- 23 files changed, 546 insertions(+), 518 deletions(-) rename agent-service/src/api/{compile-api.ts => compile-client.ts} (68%) create mode 100644 agent-service/src/api/execution-client.ts rename agent-service/src/api/{execution-api.ts => operator-metadata-client.ts} (62%) rename agent-service/src/api/{workflow-api.ts => workflow-client.ts} (82%) rename agent-service/src/{api/auth-api.ts => auth/jwt.ts} (100%) create mode 100644 agent-service/src/config/endpoints.ts create mode 100644 agent-service/src/types/api.ts rename agent-service/src/{api/backend-api.ts => types/metadata.ts} (63%) diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..1b53590f560 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -24,7 +24,16 @@ 
import { WorkflowState } from "./workflow-state"; import { WorkflowSystemMetadata } from "./util/workflow-system-metadata"; import { WorkflowResultState } from "./workflow-result-state"; import { formatOperatorResult } from "./tools/result-formatting"; -import type { AgentSettings, ReActStep, TokenUsage, UserInfo } from "../types/agent"; +import type { + AgentSettings, + ReActStep, + TokenUsage, + AgentDelegation, + AgentMessageResult, + TexeraAgentOptions, + ReActStepCallback, + ExecutionRequestParams, +} from "../types/agent"; import { AgentState as AgentStateEnum, DEFAULT_AGENT_SETTINGS, @@ -45,33 +54,15 @@ import { createExecuteOperatorTool, executeOperatorAndFormat, TOOL_NAME_EXECUTE_OPERATOR, - type ExecutionConfig, } from "./tools/workflow-execution-tools"; import { assembleContext } from "./util/context-utils"; -import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api"; +import { compileWorkflowAsync } from "../api/compile-client"; +import type { WorkflowCompilationResponse } from "../types/api"; import { createLogger } from "../logger"; import type { Logger } from "pino"; const PERSIST_DEBOUNCE_MS = 500; -export interface TexeraAgentConfig { - model: LanguageModel; - modelType: string; - agentId: string; - agentName?: string; - systemPrompt?: string; -} - -export interface AgentMessageResult { - response: string; - messages: ModelMessage[]; - usage: TokenUsage; - stopped: boolean; - error?: string; -} - -type ReActStepCallback = (step: ReActStep) => void; - /** * A single Texera agent instance. 
* @@ -105,13 +96,7 @@ export class TexeraAgent { private currentMessageId: string | undefined = undefined; - private delegateConfig?: { - userToken: string; - userInfo?: UserInfo; - workflowId: number; - workflowName?: string; - computingUnitId?: number; - }; + private delegation?: AgentDelegation; private stepCallback: ReActStepCallback | null = null; @@ -125,13 +110,13 @@ export class TexeraAgent { private log: Logger; - constructor(config: TexeraAgentConfig) { - this.agentId = config.agentId; - this.agentName = config.agentName || `Agent-${config.agentId}`; - this.modelType = config.modelType; + constructor(options: TexeraAgentOptions) { + this.agentId = options.agentId; + this.agentName = options.agentName || `Agent-${options.agentId}`; + this.modelType = options.modelType; this.createdAt = new Date(); - this.model = config.model; - this.systemPrompt = config.systemPrompt || ""; + this.model = options.model; + this.systemPrompt = options.systemPrompt || ""; this.log = createLogger("TexeraAgent", { agentId: this.agentId }); this.workflowState = new WorkflowState(); @@ -179,12 +164,12 @@ export class TexeraAgent { this.settings.systemPrompt = this.systemPrompt; } - private buildExecutionConfig(): ExecutionConfig | undefined { - if (!this.delegateConfig) return undefined; + private buildExecutionParams(): ExecutionRequestParams | undefined { + if (!this.delegation) return undefined; return { - userToken: this.delegateConfig.userToken, - workflowId: this.delegateConfig.workflowId, - computingUnitId: this.delegateConfig.computingUnitId, + userToken: this.delegation.userToken, + workflowId: this.delegation.workflowId, + computingUnitId: this.delegation.computingUnitId, maxOperatorResultCharLimit: this.settings.maxOperatorResultCharLimit, maxOperatorResultCellCharLimit: this.settings.maxOperatorResultCellCharLimit, executionTimeoutMs: this.settings.executionTimeoutMs, @@ -201,7 +186,7 @@ export class TexeraAgent { } } - const getExecutionConfig = this.delegateConfig ? 
() => this.buildExecutionConfig()! : undefined; + const getExecutionParams = this.delegation ? () => this.buildExecutionParams()! : undefined; const context: ToolContext = { metadataStore: this.metadataStore, @@ -218,10 +203,10 @@ export class TexeraAgent { [TOOL_NAME_MODIFY_OPERATOR]: createModifyOperatorTool(this.workflowState, context), }; - if (getExecutionConfig) { + if (getExecutionParams) { tools[TOOL_NAME_EXECUTE_OPERATOR] = createExecuteOperatorTool( this.workflowState, - getExecutionConfig, + getExecutionParams, (opId, operatorInfo) => { this.workflowResultState.set(opId, this.head, operatorInfo); } @@ -415,38 +400,30 @@ export class TexeraAgent { return; } - if (!this.delegateConfig?.workflowId || !this.delegateConfig?.userToken) { + if (!this.delegation?.workflowId || !this.delegation?.userToken) { return; } try { - const { retrieveWorkflow } = await import("../api/workflow-api"); - const workflow = await retrieveWorkflow(this.delegateConfig.userToken, this.delegateConfig.workflowId); + const { retrieveWorkflow } = await import("../api/workflow-client"); + const workflow = await retrieveWorkflow(this.delegation.userToken, this.delegation.workflowId); this.workflowState.setWorkflowContent(workflow.content); - this.log.debug({ workflowId: this.delegateConfig.workflowId }, "refreshed workflow from backend"); + this.log.debug({ workflowId: this.delegation.workflowId }, "refreshed workflow from backend"); } catch (error) { this.log.warn({ err: error }, "failed to refresh workflow from backend"); } } - setDelegateConfig(config: { - userToken: string; - userInfo?: UserInfo; - workflowId: number; - workflowName?: string; - computingUnitId?: number; - }): void { - this.delegateConfig = config; + setDelegation(delegation: AgentDelegation): void { + this.delegation = delegation; this.tools = this.createTools(); this.setupWorkflowChangeHandlers(); } - getDelegateConfig(): - | { userToken: string; userInfo?: UserInfo; workflowId: number; workflowName?: string; 
computingUnitId?: number } - | undefined { - return this.delegateConfig; + getDelegation(): AgentDelegation | undefined { + return this.delegation; } private setupWorkflowChangeHandlers(): void { @@ -457,22 +434,22 @@ export class TexeraAgent { const subscription = new Subscription(); const workflowChanged$ = this.workflowState.getWorkflowChangedStream(); - if (this.delegateConfig?.workflowId && this.delegateConfig.userToken) { + if (this.delegation?.workflowId && this.delegation.userToken) { const persistSubscription = workflowChanged$.pipe(debounceTime(PERSIST_DEBOUNCE_MS)).subscribe(async () => { - if (!this.delegateConfig?.workflowId || !this.delegateConfig.userToken) { + if (!this.delegation?.workflowId || !this.delegation.userToken) { return; } try { - const { persistWorkflow } = await import("../api/workflow-api"); + const { persistWorkflow } = await import("../api/workflow-client"); const workflowContent = this.workflowState.getWorkflowContent(); await persistWorkflow( - this.delegateConfig.userToken, - this.delegateConfig.workflowId, - this.delegateConfig.workflowName || "Agent Workflow", + this.delegation.userToken, + this.delegation.workflowId, + this.delegation.workflowName || "Agent Workflow", workflowContent ); - this.log.debug({ workflowId: this.delegateConfig.workflowId }, "auto-persisted workflow"); + this.log.debug({ workflowId: this.delegation.workflowId }, "auto-persisted workflow"); } catch (error) { this.log.error({ err: error }, "failed to auto-persist workflow"); } @@ -612,8 +589,8 @@ export class TexeraAgent { this.addStep(agentStep); this.head = agentStepId; - const execConfig = this.buildExecutionConfig(); - if (execConfig && toolCalls && toolResults) { + const execParams = this.buildExecutionParams(); + if (execParams && toolCalls && toolResults) { const EXECUTE_AFTER_TOOLS = new Set([TOOL_NAME_ADD_OPERATOR, TOOL_NAME_MODIFY_OPERATOR]); for (let i = 0; i < toolCalls.length; i++) { @@ -628,7 +605,7 @@ export class TexeraAgent { if 
(!operatorId) continue; try { - await executeOperatorAndFormat(this.workflowState, execConfig, operatorId, { + await executeOperatorAndFormat(this.workflowState, execParams, operatorId, { abortSignal: this.abortController?.signal, onResult: (opId, operatorInfo) => { this.workflowResultState.set(opId, this.head, operatorInfo); diff --git a/agent-service/src/agent/tools/result-formatting.test.ts b/agent-service/src/agent/tools/result-formatting.test.ts index e6d1afdf2e3..a81c698d3d2 100644 --- a/agent-service/src/agent/tools/result-formatting.test.ts +++ b/agent-service/src/agent/tools/result-formatting.test.ts @@ -261,7 +261,7 @@ describe("formatOperatorResult - visualization rows", () => { }); }); -describe("jsonToTableFormat - cell coercion via formatOperatorResult", () => { +describe("formatRecordsAsTable - cell coercion via formatOperatorResult", () => { function tableLines(opInfo: Partial): string[] { const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo }), EMPTY_STATE); // Skip brief summary + shape line. @@ -295,7 +295,7 @@ describe("jsonToTableFormat - cell coercion via formatOperatorResult", () => { }); }); -describe("jsonToTableFormat - row index gaps", () => { +describe("formatRecordsAsTable - row index gaps", () => { test("inserts ... 
separator when __row_index__ skips ahead", () => { const out = formatOperatorResult( "op1", diff --git a/agent-service/src/agent/tools/result-formatting.ts b/agent-service/src/agent/tools/result-formatting.ts index 5ed4aacc5d4..370c1695c4f 100644 --- a/agent-service/src/agent/tools/result-formatting.ts +++ b/agent-service/src/agent/tools/result-formatting.ts @@ -19,7 +19,12 @@ import type { OperatorInfo } from "../../types/execution"; import type { WorkflowState } from "../workflow-state"; -import { formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import { + formatExecuteOperatorResult, + getVisibleResultHeaders, + formatOperatorIoShape, + formatRecordsAsTable, +} from "./tools-utility"; export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, workflowState: WorkflowState): string { if (opInfo.error) { @@ -50,86 +55,13 @@ export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, w }) : jsonArray; - const dataString = jsonToTableFormat(serializableArray); + const dataString = formatRecordsAsTable(serializableArray); const metadataLines = [ - formatInputOutputMetadata(workflowState, operatorId, opInfo, columns), + formatOperatorIoShape(workflowState, operatorId, opInfo, columns), ...(opInfo.warnings ?? []), ].filter(Boolean); const briefSummary = formatExecuteOperatorResult(operatorId); return [briefSummary, ...metadataLines, dataString].filter(Boolean).join("\n"); } - -function formatInputOutputMetadata( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? 
opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; - - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; - - const hasRowIndex = "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); - if (headers.length === 0) return ""; - - const headerLine = "\t" + headers.join("\t"); - const formattedRows: string[] = []; - let prevIndex = -1; - - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? 
(row["__row_index__"] as number) : i; - - if (prevIndex >= 0 && rowIndex > prevIndex + 1) { - const dots = headers.map(() => "...").join("\t"); - formattedRows.push(`...\t${dots}`); - } - prevIndex = rowIndex; - - const cells = headers.map(h => { - const val = row[h]; - if (val === null) return "NaN"; - if (val === undefined) return ""; - if (typeof val === "number" || typeof val === "boolean") return String(val); - if (typeof val === "string") { - if (val === "NULL") return "NaN"; - return val.replace(/\t/g, "\\t").replace(/\n/g, "\\n"); - } - return JSON.stringify(val); - }); - formattedRows.push(`${rowIndex}\t${cells.join("\t")}`); - } - - return [headerLine, ...formattedRows].join("\n"); -} diff --git a/agent-service/src/agent/tools/tools-utility.ts b/agent-service/src/agent/tools/tools-utility.ts index 6c9ab004f6e..59f2ea69fd9 100644 --- a/agent-service/src/agent/tools/tools-utility.ts +++ b/agent-service/src/agent/tools/tools-utility.ts @@ -17,6 +17,9 @@ * under the License. */ +import type { WorkflowState } from "../workflow-state"; +import type { OperatorInfo } from "../../types/execution"; + export const INTERNAL_RESULT_KEYS: ReadonlySet = new Set(["__row_index__", "__is_visualization__"]); export function getVisibleResultHeaders(row: Record): string[] { @@ -74,3 +77,86 @@ export function formatExecuteOperatorResult(operatorId: string): string { export function formatOperatorError(operatorId: string, error: string): string { return `Error on operator ${operatorId}: ${error}`; } + +/** + * Renders an operator's input/output table shapes as a one/two-line summary, + * naming each input by its upstream operator id. + */ +export function formatOperatorIoShape( + workflowState: WorkflowState, + operatorId: string, + opInfo: OperatorInfo, + outputColumns: number +): string { + const outputRows = opInfo.totalRowCount ?? 
opInfo.outputTuples; + const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; + + const inputShapes = opInfo.inputPortShapes; + if (!inputShapes || inputShapes.length === 0) { + return outputLine; + } + + const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); + const portIndexToUpstream = new Map(); + const op = workflowState.getOperator(operatorId); + for (const link of inputLinks) { + const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; + if (portIdx >= 0) { + portIndexToUpstream.set(portIdx, link.source.operatorID); + } + } + + const inputPart = inputShapes + .sort((a, b) => a.portIndex - b.portIndex) + .map(p => { + const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; + return `${name}(${p.rows}, ${p.columns})`; + }) + .join(", "); + + return `Input operator(table shape): ${inputPart}\n${outputLine}`; +} + +/** + * Serializes result records as a tab-separated table with a leading index + * column (pandas `__repr__` style), collapsing gaps in `__row_index__` into a + * `...` separator row. + */ +export function formatRecordsAsTable(records: Record[]): string { + if (!records || records.length === 0) return ""; + + const hasRowIndex = "__row_index__" in records[0]; + const headers = getVisibleResultHeaders(records[0]); + if (headers.length === 0) return ""; + // Leading tab aligns headers with the index column (pandas __repr__ style). + const headerLine = "\t" + headers.join("\t"); + + const formattedRows: string[] = []; + let prevIndex = -1; + + for (let i = 0; i < records.length; i++) { + const row = records[i]; + const rowIndex = hasRowIndex ? 
(row["__row_index__"] as number) : i; + + if (prevIndex >= 0 && rowIndex > prevIndex + 1) { + const dots = headers.map(() => "...").join("\t"); + formattedRows.push(`...\t${dots}`); + } + prevIndex = rowIndex; + + const cells = headers.map(h => { + const val = row[h]; + if (val === null) return "NaN"; + if (val === undefined) return ""; + if (typeof val === "number" || typeof val === "boolean") return String(val); + if (typeof val === "string") { + if (val === "NULL") return "NaN"; + return val.replace(/\t/g, "\\t").replace(/\n/g, "\\n"); + } + return JSON.stringify(val); + }); + formattedRows.push(`${rowIndex}\t${cells.join("\t")}`); + } + + return [headerLine, ...formattedRows].join("\n"); +} diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts b/agent-service/src/agent/tools/workflow-execution-tools.ts index 78c6cfa3d55..799a6e02b29 100644 --- a/agent-service/src/agent/tools/workflow-execution-tools.ts +++ b/agent-service/src/agent/tools/workflow-execution-tools.ts @@ -19,29 +19,22 @@ import { z } from "zod"; import { tool } from "ai"; -import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import { + createErrorResult, + formatExecuteOperatorResult, + getVisibleResultHeaders, + formatOperatorIoShape, + formatRecordsAsTable, +} from "./tools-utility"; import type { WorkflowState } from "../workflow-state"; -import { getBackendConfig } from "../../api/backend-api"; -import { env } from "../../config/env"; -import type { LogicalPlan, LogicalLink } from "../../api/execution-api"; +import { executeWorkflowHttp } from "../../api/execution-client"; +import type { LogicalPlan, LogicalLink } from "../../types/workflow"; import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; -import { DEFAULT_AGENT_SETTINGS } from "../../types/agent"; -import { createLogger } from "../../logger"; - -const log = 
createLogger("ExecutionTools"); +import { DEFAULT_AGENT_SETTINGS, type ExecutionRequestParams } from "../../types/agent"; export const TOOL_NAME_EXECUTE_OPERATOR = "executeOperator"; -export interface ExecutionConfig { - userToken: string; - workflowId: number; - computingUnitId?: number; - maxOperatorResultCharLimit?: number; - maxOperatorResultCellCharLimit?: number; - executionTimeoutMs?: number; -} - /** * FIFO async lock used to serialize workflow executions per workflow id. * @@ -251,117 +244,6 @@ function buildLogicalPlan(workflowState: WorkflowState, opsToViewResult?: string }; } -async function executeWorkflowHttp( - config: ExecutionConfig, - logicalPlan: LogicalPlan, - options: { abortSignal?: AbortSignal } = {} -): Promise { - const backendConfig = getBackendConfig(); - - const workflowId = config.workflowId; - const computingUnitId = config.computingUnitId ?? 0; - - // In k8s each computing unit is a separate pod, so the endpoint varies per cuid. - const executionEndpoint = env.EXECUTION_ENDPOINT_TEMPLATE - ? env.EXECUTION_ENDPOINT_TEMPLATE.replace("{cuid}", String(computingUnitId)) - : backendConfig.executionEndpoint; - - const url = `${executionEndpoint}/api/execution/${workflowId}/${computingUnitId}/run`; - - const timeoutSeconds = config.executionTimeoutMs - ? Math.ceil(config.executionTimeoutMs / 1000) - : Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000); - - const request = { - executionName: "agent-execution", - logicalPlan: { - operators: logicalPlan.operators, - links: logicalPlan.links, - opsToViewResult: logicalPlan.opsToViewResult || [], - opsToReuseResult: [], - }, - targetOperatorIds: logicalPlan.opsToViewResult || [], - timeoutSeconds, - maxOperatorResultCharLimit: config.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit, - maxOperatorResultCellCharLimit: - config.maxOperatorResultCellCharLimit ?? 
DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit, - }; - - log.debug( - { - url, - maxOperatorResultCharLimit: request.maxOperatorResultCharLimit, - maxOperatorResultCellCharLimit: request.maxOperatorResultCellCharLimit, - }, - "executing workflow" - ); - - try { - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${config.userToken}`, - }, - body: JSON.stringify(request), - signal: options.abortSignal, - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`); - } - - return (await response.json()) as SyncExecutionResult; - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - throw error; - } - log.error({ err: error }, "execution failed"); - return { - success: false, - state: "Error", - operators: {}, - errors: [error instanceof Error ? error.message : "Unknown error"], - }; - } -} - -function formatInputOutput( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; - - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? 
`input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - function formatExecutionError( compilationErrors?: Record, operatorErrors?: Array<{ operatorId: string; error: string }>, @@ -393,48 +275,9 @@ function formatExecutionError( return lines.join("\n"); } -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; - - const hasRowIndex = jsonResult.length > 0 && "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); - if (headers.length === 0) return ""; - // Leading tab aligns headers with the index column (pandas __repr__ style). - const headerLine = "\t" + headers.join("\t"); - - const formattedRows: string[] = []; - let prevIndex = -1; - - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? (row["__row_index__"] as number) : i; - - if (prevIndex >= 0 && rowIndex > prevIndex + 1) { - const dots = headers.map(() => "...").join("\t"); - formattedRows.push(`...\t${dots}`); - } - prevIndex = rowIndex; - - const cells = headers.map(h => { - const val = row[h]; - if (val === null) return "NaN"; - if (val === undefined) return ""; - if (typeof val === "number" || typeof val === "boolean") return String(val); - if (typeof val === "string") { - if (val === "NULL") return "NaN"; - return val.replace(/\t/g, "\\t").replace(/\n/g, "\\n"); - } - return JSON.stringify(val); - }); - formattedRows.push(`${rowIndex}\t${cells.join("\t")}`); - } - - return [headerLine, ...formattedRows].join("\n"); -} - export async function executeOperatorAndFormat( workflowState: WorkflowState, - config: ExecutionConfig, + config: ExecutionRequestParams, operatorId: string, options: { abortSignal?: AbortSignal; @@ -532,7 +375,7 @@ export async function executeOperatorAndFormat( } } - let dataString = jsonToTableFormat(jsonArray); + let 
dataString = formatRecordsAsTable(jsonArray); // Safety-net: TSV serialization may add padding beyond backend's raw-record budget. const charLimit = config.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit; @@ -568,7 +411,7 @@ export async function executeOperatorAndFormat( dataString = [headerLine, ...keptRows].join("\n"); } - const shapeLine = formatInputOutput(workflowState, operatorId, opInfo, columns); + const shapeLine = formatOperatorIoShape(workflowState, operatorId, opInfo, columns); const warningLines = opInfo.warnings?.map(w => w) ?? []; @@ -588,7 +431,7 @@ export async function executeOperatorAndFormat( export function createExecuteOperatorTool( workflowState: WorkflowState, - getConfig: () => ExecutionConfig, + getConfig: () => ExecutionRequestParams, onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void ) { return tool({ diff --git a/agent-service/src/agent/util/context-utils.ts b/agent-service/src/agent/util/context-utils.ts index 195692cbf50..61f6ff15c38 100644 --- a/agent-service/src/agent/util/context-utils.ts +++ b/agent-service/src/agent/util/context-utils.ts @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai"; import type { WorkflowState } from "../workflow-state"; import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow"; import type { ReActStep } from "../../types/agent"; -import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api"; +import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/api"; import { extractOperatorInputPortSchemaMap } from "./workflow-utils"; import { createLogger } from "../../logger"; diff --git a/agent-service/src/agent/util/workflow-system-metadata.ts b/agent-service/src/agent/util/workflow-system-metadata.ts index 9269a0cff7c..06bbe58ce33 100644 --- a/agent-service/src/agent/util/workflow-system-metadata.ts +++ b/agent-service/src/agent/util/workflow-system-metadata.ts @@ 
-18,7 +18,8 @@ */ import Ajv from "ajv"; -import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api"; +import { fetchOperatorMetadata } from "../../api/operator-metadata-client"; +import type { OperatorMetadata, OperatorSchemaInfo, CompactOperatorSchema } from "../../types/metadata"; import type { ValidationError, Validation } from "../../types/workflow"; import { createLogger } from "../../logger"; @@ -26,17 +27,6 @@ const log = createLogger("WorkflowSystemMetadata"); export type { ValidationError, Validation } from "../../types/workflow"; -interface OperatorSchemaInfo { - properties: any; - required: any; - definitions: any; -} - -interface CompactOperatorSchema { - properties: Record; - required: string[]; -} - const FILTERED_PROPERTY_KEYS = ["dummyPropertyList"]; const FILTERED_DEFINITION_KEYS = [ diff --git a/agent-service/src/agent/util/workflow-utils.ts b/agent-service/src/agent/util/workflow-utils.ts index d723f61eff0..670f3968026 100644 --- a/agent-service/src/agent/util/workflow-utils.ts +++ b/agent-service/src/agent/util/workflow-utils.ts @@ -27,6 +27,7 @@ import type { } from "../../types/workflow"; import type { WorkflowSystemMetadata } from "./workflow-system-metadata"; import type { WorkflowState } from "../workflow-state"; +import type { InputPortInfo, OutputPortInfo } from "../../types/metadata"; // Format "{id}_{internal}" must align with the backend port-identity serializer. function serializePortIdentity(id: number, internal: boolean = false): string { @@ -97,16 +98,6 @@ export function extractOperatorInputPortSchemaMap( return hasAnySchema ? 
inputPortSchemaMap : undefined; } -interface InputPortInfo { - displayName?: string; - disallowMultiLinks?: boolean; - dependencies?: { id: number; internal: boolean }[]; -} - -interface OutputPortInfo { - displayName?: string; -} - function inputPortToPortDescription(portID: string, inputPortInfo: InputPortInfo): PortDescription { return { portID, diff --git a/agent-service/src/agent/workflow-state.ts b/agent-service/src/agent/workflow-state.ts index 04ad2b0e4e8..c725e23fea3 100644 --- a/agent-service/src/agent/workflow-state.ts +++ b/agent-service/src/agent/workflow-state.ts @@ -30,6 +30,7 @@ import type { WorkflowSettings, ValidationError, } from "../types/workflow"; +import { ExecutionMode } from "../types/workflow"; export type { ValidationError, Validation } from "../types/workflow"; @@ -40,6 +41,7 @@ interface ValidationOutput { const DEFAULT_WORKFLOW_SETTINGS: WorkflowSettings = { dataTransferBatchSize: 400, + executionMode: ExecutionMode.PIPELINED, }; /** diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-client.ts similarity index 68% rename from agent-service/src/api/compile-api.ts rename to agent-service/src/api/compile-client.ts index 8ffd27fd52c..84edaf7368a 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-client.ts @@ -17,34 +17,15 @@ * under the License. 
*/ -import { getBackendConfig } from "./backend-api"; -import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import { getServiceEndpoints } from "../config/endpoints"; +import type { LogicalPlan } from "../types/workflow"; +import type { WorkflowCompilationResponse } from "../types/api"; import { createLogger } from "../logger"; -const log = createLogger("CompileAPI"); - -export interface SchemaAttribute { - attributeName: string; - attributeType: "string" | "integer" | "double" | "boolean" | "long" | "timestamp" | "binary"; -} - -export type PortSchema = ReadonlyArray; - -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - -export interface WorkflowCompilationResponse { - physicalPlan?: any; - operatorOutputSchemas: Record; - operatorErrors: Record; -} +const log = createLogger("CompileClient"); export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { - const config = getBackendConfig(); - const url = `${config.compileEndpoint}/api/compile`; + const url = `${getServiceEndpoints().compileEndpoint}/api/compile`; const body = { operators: logicalPlan.operators, diff --git a/agent-service/src/api/execution-client.ts b/agent-service/src/api/execution-client.ts new file mode 100644 index 00000000000..08b7bd96efd --- /dev/null +++ b/agent-service/src/api/execution-client.ts @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { getServiceEndpoints } from "../config/endpoints"; +import { env } from "../config/env"; +import type { LogicalPlan } from "../types/workflow"; +import type { SyncExecutionResult } from "../types/execution"; +import { DEFAULT_AGENT_SETTINGS, type ExecutionRequestParams } from "../types/agent"; +import { createLogger } from "../logger"; + +const log = createLogger("ExecutionClient"); + +/** + * POSTs a logical plan to the Workflow Execution Service and returns the + * synchronous run result. Abort errors are rethrown; all other failures + * (network errors, non-OK responses) are logged and returned as an error-state result. + */ +export async function executeWorkflowHttp( + params: ExecutionRequestParams, + logicalPlan: LogicalPlan, + options: { abortSignal?: AbortSignal } = {} +): Promise { + const workflowId = params.workflowId; + const computingUnitId = params.computingUnitId ?? 0; + + // In k8s each computing unit is a separate pod, so the endpoint varies per cuid. + const executionEndpoint = env.EXECUTION_ENDPOINT_TEMPLATE + ? env.EXECUTION_ENDPOINT_TEMPLATE.replace("{cuid}", String(computingUnitId)) + : getServiceEndpoints().executionEndpoint; + + const url = `${executionEndpoint}/api/execution/${workflowId}/${computingUnitId}/run`; + + const timeoutSeconds = params.executionTimeoutMs + ? 
Math.ceil(params.executionTimeoutMs / 1000) + : Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000); + + const request = { + executionName: "agent-execution", + logicalPlan: { + operators: logicalPlan.operators, + links: logicalPlan.links, + opsToViewResult: logicalPlan.opsToViewResult || [], + opsToReuseResult: [], + }, + targetOperatorIds: logicalPlan.opsToViewResult || [], + timeoutSeconds, + maxOperatorResultCharLimit: params.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit, + maxOperatorResultCellCharLimit: + params.maxOperatorResultCellCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit, + }; + + log.debug( + { + url, + maxOperatorResultCharLimit: request.maxOperatorResultCharLimit, + maxOperatorResultCellCharLimit: request.maxOperatorResultCellCharLimit, + }, + "executing workflow" + ); + + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${params.userToken}`, + }, + body: JSON.stringify(request), + signal: options.abortSignal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`); + } + + return (await response.json()) as SyncExecutionResult; + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + throw error; + } + log.error({ err: error }, "execution failed"); + return { + success: false, + state: "Error", + operators: {}, + errors: [error instanceof Error ? error.message : "Unknown error"], + }; + } +} diff --git a/agent-service/src/api/index.ts b/agent-service/src/api/index.ts index eca292d7ffe..2346bb053f5 100644 --- a/agent-service/src/api/index.ts +++ b/agent-service/src/api/index.ts @@ -17,8 +17,7 @@ * under the License. 
*/ -export * from "./backend-api"; -export * from "./execution-api"; -export * from "./workflow-api"; -export * from "./auth-api"; -export * from "./compile-api"; +export * from "./operator-metadata-client"; +export * from "./compile-client"; +export * from "./workflow-client"; +export * from "./execution-client"; diff --git a/agent-service/src/api/execution-api.ts b/agent-service/src/api/operator-metadata-client.ts similarity index 62% rename from agent-service/src/api/execution-api.ts rename to agent-service/src/api/operator-metadata-client.ts index 4692a61d754..f5ebb48cd48 100644 --- a/agent-service/src/api/execution-api.ts +++ b/agent-service/src/api/operator-metadata-client.ts @@ -17,22 +17,16 @@ * under the License. */ -export interface LogicalLink { - fromOpId: string; - fromPortId: { id: number; internal: boolean }; - toOpId: string; - toPortId: { id: number; internal: boolean }; -} +import { getServiceEndpoints } from "../config/endpoints"; +import type { OperatorMetadata } from "../types/metadata"; -interface LogicalOperator { - operatorID: string; - operatorType: string; - [key: string]: any; -} +export async function fetchOperatorMetadata(): Promise { + const url = `${getServiceEndpoints().apiEndpoint}/api/resources/operator-metadata`; + const response = await fetch(url); + + if (!response.ok) { + throw new Error(`Failed to fetch operator metadata: ${response.status} ${response.statusText}`); + } -export interface LogicalPlan { - operators: LogicalOperator[]; - links: LogicalLink[]; - opsToViewResult?: string[]; - opsToReuseResult?: string[]; + return (await response.json()) as OperatorMetadata; } diff --git a/agent-service/src/api/workflow-api.ts b/agent-service/src/api/workflow-client.ts similarity index 82% rename from agent-service/src/api/workflow-api.ts rename to agent-service/src/api/workflow-client.ts index 7a96f979a1c..1e4bf932265 100644 --- a/agent-service/src/api/workflow-api.ts +++ b/agent-service/src/api/workflow-client.ts @@ -17,27 +17,10 
@@ * under the License. */ -import { getBackendConfig } from "./backend-api"; -import { createAuthHeaders } from "./auth-api"; +import { getServiceEndpoints } from "../config/endpoints"; +import { createAuthHeaders } from "../auth/jwt"; import type { WorkflowContent } from "../types/workflow"; - -export interface Workflow { - wid: number; - name: string; - description?: string; - content: WorkflowContent; - creationTime?: number; - lastModifiedTime?: number; - isPublished?: boolean; -} - -interface WorkflowPersistRequest { - wid?: number; - name: string; - description?: string; - content: string; - isPublic?: boolean; -} +import type { Workflow, WorkflowPersistRequest } from "../types/api"; const WORKFLOW_BASE_URL = "workflow"; @@ -48,7 +31,7 @@ export async function persistWorkflow( content: WorkflowContent, description?: string ): Promise { - const config = getBackendConfig(); + const config = getServiceEndpoints(); const url = `${config.apiEndpoint}/api/${WORKFLOW_BASE_URL}/persist`; const response = await fetch(url, { @@ -76,7 +59,7 @@ export async function persistWorkflow( } export async function retrieveWorkflow(token: string, wid: number): Promise { - const config = getBackendConfig(); + const config = getServiceEndpoints(); const url = `${config.apiEndpoint}/api/${WORKFLOW_BASE_URL}/${wid}`; const response = await fetch(url, { diff --git a/agent-service/src/api/auth-api.ts b/agent-service/src/auth/jwt.ts similarity index 100% rename from agent-service/src/api/auth-api.ts rename to agent-service/src/auth/jwt.ts diff --git a/agent-service/src/config/endpoints.ts b/agent-service/src/config/endpoints.ts new file mode 100644 index 00000000000..cab8c7ff20c --- /dev/null +++ b/agent-service/src/config/endpoints.ts @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { env } from "./env"; + +/** Base URLs of the backend services this agent service talks to. */ +export interface ServiceEndpoints { + apiEndpoint: string; + modelsEndpoint: string; + compileEndpoint: string; + executionEndpoint: string; +} + +const endpoints: ServiceEndpoints = { + apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, + modelsEndpoint: env.LLM_ENDPOINT, + compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, + executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, +}; + +export function getServiceEndpoints(): ServiceEndpoints { + return { ...endpoints }; +} diff --git a/agent-service/src/index.ts b/agent-service/src/index.ts index 152a2d21703..691d5286c37 100644 --- a/agent-service/src/index.ts +++ b/agent-service/src/index.ts @@ -22,5 +22,5 @@ export { WorkflowState } from "./agent/workflow-state"; export { WorkflowResultState } from "./agent/workflow-result-state"; export { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; export * from "./agent/tools"; -export { TexeraAgent, type TexeraAgentConfig, type AgentMessageResult } from "./agent/texera-agent"; +export { TexeraAgent } from "./agent/texera-agent"; export { buildSystemPrompt } from "./agent/prompts"; diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index d5eeae82c9b..05ed22d8735 100644 --- a/agent-service/src/server.ts +++ 
b/agent-service/src/server.ts @@ -22,9 +22,9 @@ import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; -import { getBackendConfig } from "./api/backend-api"; -import { extractUserFromToken, validateToken } from "./api/auth-api"; -import { retrieveWorkflow } from "./api/workflow-api"; +import { getServiceEndpoints } from "./config/endpoints"; +import { extractUserFromToken, validateToken } from "./auth/jwt"; +import { retrieveWorkflow } from "./api/workflow-client"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; import { createLogger } from "./logger"; @@ -33,13 +33,14 @@ const log = createLogger("Server"); const wsLog = createLogger("WS"); import type { AgentInfo, - AgentDelegateConfig, + AgentDelegationDto, CreateAgentRequest, UpdateAgentSettingsRequest, - AgentSettingsApi, + AgentSettingsDto, ReActStep, } from "./types/agent"; import { OperatorResultSerializationMode } from "./types/agent"; +import type { WsMessage, WsOutgoingMessage, OperatorResultSummaryWs } from "./types/api"; const agentStore = new Map(); let agentCounter = 0; @@ -47,10 +48,10 @@ let agentCounter = 0; async function createAgentInstance( modelType: string, customName?: string, - delegateConfig?: AgentDelegateConfig + delegation?: AgentDelegationDto ): Promise<{ agentId: string; agent: TexeraAgent }> { const agentId = `agent-${++agentCounter}`; - const config = getBackendConfig(); + const config = getServiceEndpoints(); const openai = createOpenAI({ baseURL: `${config.modelsEndpoint}/api`, @@ -68,37 +69,37 @@ async function createAgentInstance( await agent.initialize(); - if (delegateConfig?.workflowId && delegateConfig.userToken) { + if (delegation?.workflowId && delegation.userToken) { try { - const workflow = await retrieveWorkflow(delegateConfig.userToken, 
delegateConfig.workflowId); - delegateConfig.workflowName = workflow.name; + const workflow = await retrieveWorkflow(delegation.userToken, delegation.workflowId); + delegation.workflowName = workflow.name; const workflowState = agent.getWorkflowState(); workflowState.setWorkflowContent(workflow.content); - agent.setDelegateConfig({ - userToken: delegateConfig.userToken, - userInfo: delegateConfig.userInfo, - workflowId: delegateConfig.workflowId, - workflowName: delegateConfig.workflowName, - computingUnitId: delegateConfig.computingUnitId, + agent.setDelegation({ + userToken: delegation.userToken, + userInfo: delegation.userInfo, + workflowId: delegation.workflowId, + workflowName: delegation.workflowName, + computingUnitId: delegation.computingUnitId, }); - log.info({ agentId, workflowId: delegateConfig.workflowId }, "loaded workflow for agent"); + log.info({ agentId, workflowId: delegation.workflowId }, "loaded workflow for agent"); } catch (error) { - log.warn({ agentId, workflowId: delegateConfig.workflowId, err: error }, "failed to load workflow"); + log.warn({ agentId, workflowId: delegation.workflowId, err: error }, "failed to load workflow"); } } agentStore.set(agentId, agent); - log.info({ agentId, delegate: !!delegateConfig }, "created agent"); + log.info({ agentId, delegate: !!delegation }, "created agent"); return { agentId, agent }; } function getAgentInfo(agentId: string, agent: TexeraAgent): AgentInfo { const agentSettings = agent.getSettings(); - const settingsApi: AgentSettingsApi = { + const settingsApi: AgentSettingsDto = { maxOperatorResultCharLimit: agentSettings.maxOperatorResultCharLimit, maxOperatorResultCellCharLimit: agentSettings.maxOperatorResultCellCharLimit, operatorResultSerializationMode: agentSettings.operatorResultSerializationMode, @@ -109,7 +110,7 @@ function getAgentInfo(agentId: string, agent: TexeraAgent): AgentInfo { allowedOperatorTypes: agentSettings.allowedOperatorTypes, }; - const delegateConfig = 
agent.getDelegateConfig(); + const delegation = agent.getDelegation(); return { id: agentId, @@ -117,13 +118,13 @@ function getAgentInfo(agentId: string, agent: TexeraAgent): AgentInfo { modelType: agent.modelType, state: agent.getState(), createdAt: agent.createdAt, - delegate: delegateConfig + delegate: delegation ? { userToken: "***", - userInfo: delegateConfig.userInfo, - workflowId: delegateConfig.workflowId, - workflowName: delegateConfig.workflowName, - computingUnitId: delegateConfig.computingUnitId, + userInfo: delegation.userInfo, + workflowId: delegation.workflowId, + workflowName: delegation.workflowName, + computingUnitId: delegation.computingUnitId, } : undefined, settings: settingsApi, @@ -174,14 +175,14 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) throw new Error("modelType is required"); } - let delegateConfig: AgentDelegateConfig | undefined; + let delegation: AgentDelegationDto | undefined; if (userToken) { if (!validateToken(userToken)) { throw new Error("Invalid or expired token"); } const userInfo = extractUserFromToken(userToken); - delegateConfig = { + delegation = { userToken, userInfo, workflowId, @@ -189,7 +190,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) }; } - const { agentId, agent } = await createAgentInstance(modelType, name, delegateConfig); + const { agentId, agent } = await createAgentInstance(modelType, name, delegation); if (settings) { log.info( @@ -403,37 +404,6 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) } ); -interface WsMessage { - type: "message" | "stop"; - content?: string; - messageSource?: "chat" | "feedback"; -} - -interface OperatorResultSummaryWs { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; - outputColumns?: number; - error?: string; - warnings?: string[]; - consoleLogCount?: number; - totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; -} - 
-interface WsOutgoingMessage { - type: "step" | "state" | "error" | "complete" | "init" | "headChange"; - step?: ReActStep; - state?: string; - error?: string; - steps?: ReActStep[]; - headId?: string; - operatorResults?: Record; - workflowContent?: any; -} - function getOperatorResultSummaries(agent: TexeraAgent): Record { const resultState = agent.getWorkflowResultState(); const visible = resultState.getAllVisible(); @@ -631,9 +601,9 @@ function printStartupMessage(app: ReturnType) { console.log(""); console.log("Environment:"); console.log(` LLM_API_KEY: ${env.LLM_API_KEY === "dummy" ? "dummy (default)" : "set"}`); - console.log(` LLM_ENDPOINT: ${getBackendConfig().modelsEndpoint}`); - console.log(` WORKFLOW_COMPILING_SERVICE_ENDPOINT: ${getBackendConfig().compileEndpoint}`); - console.log(` TEXERA_DASHBOARD_SERVICE_ENDPOINT: ${getBackendConfig().apiEndpoint}`); + console.log(` LLM_ENDPOINT: ${getServiceEndpoints().modelsEndpoint}`); + console.log(` WORKFLOW_COMPILING_SERVICE_ENDPOINT: ${getServiceEndpoints().compileEndpoint}`); + console.log(` TEXERA_DASHBOARD_SERVICE_ENDPOINT: ${getServiceEndpoints().apiEndpoint}`); console.log(""); console.log("Features:"); console.log(" - Auto-persistence with debounce (500ms)"); diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 765f5a7cb46..0b18d7494b7 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -17,6 +17,7 @@ * under the License. */ +import type { LanguageModel, ModelMessage } from "ai"; import type { WorkflowContent } from "./workflow"; export enum AgentState { @@ -115,7 +116,25 @@ export interface UserInfo { role: string; } -export interface AgentDelegateConfig { +/** + * Runtime delegation binding for an agent: the user it acts on behalf of and + * the workflow / computing unit it is bound to. `workflowId` is required + * because delegation is only established once a workflow has been resolved. 
+ * This is in-memory state, not "configuration". + */ +export interface AgentDelegation { + userToken: string; + userInfo?: UserInfo; + workflowId: number; + workflowName?: string; + computingUnitId?: number; +} + +/** + * Wire shape of a delegation at the HTTP boundary: `workflowId` is optional + * (resolved server-side) and `userToken` is masked in outbound responses. + */ +export interface AgentDelegationDto { userToken: string; userInfo?: UserInfo; workflowId?: number; @@ -123,7 +142,8 @@ export interface AgentDelegateConfig { computingUnitId?: number; } -export interface AgentSettingsApi { +/** Tunable agent settings in their wire form (units in seconds/minutes). */ +export interface AgentSettingsDto { maxOperatorResultCharLimit?: number; maxOperatorResultCellCharLimit?: number; operatorResultSerializationMode?: "tsv"; @@ -134,14 +154,17 @@ export interface AgentSettingsApi { allowedOperatorTypes?: string[]; } +/** Body of `PATCH /agents/:id/settings`; identical to the settings DTO. */ +export type UpdateAgentSettingsRequest = AgentSettingsDto; + export interface AgentInfo { id: string; name: string; modelType: string; state: AgentState; createdAt: Date; - delegate?: AgentDelegateConfig; - settings?: AgentSettingsApi; + delegate?: AgentDelegationDto; + settings?: AgentSettingsDto; } export interface CreateAgentRequest { @@ -150,16 +173,35 @@ export interface CreateAgentRequest { userToken?: string; workflowId?: number; computingUnitId?: number; - settings?: AgentSettingsApi; + settings?: AgentSettingsDto; } -export interface UpdateAgentSettingsRequest { +/** Options for constructing a `TexeraAgent`. */ +export interface TexeraAgentOptions { + model: LanguageModel; + modelType: string; + agentId: string; + agentName?: string; + systemPrompt?: string; +} + +/** Outcome of a single `sendMessage` generation run. 
*/ +export interface AgentMessageResult { + response: string; + messages: ModelMessage[]; + usage: TokenUsage; + stopped: boolean; + error?: string; +} + +export type ReActStepCallback = (step: ReActStep) => void; + +/** Parameters for a single workflow-execution request. */ +export interface ExecutionRequestParams { + userToken: string; + workflowId: number; + computingUnitId?: number; maxOperatorResultCharLimit?: number; maxOperatorResultCellCharLimit?: number; - operatorResultSerializationMode?: "tsv"; - toolTimeoutSeconds?: number; - executionTimeoutMinutes?: number; - disabledTools?: string[]; - maxSteps?: number; - allowedOperatorTypes?: string[]; + executionTimeoutMs?: number; } diff --git a/agent-service/src/types/api.ts b/agent-service/src/types/api.ts new file mode 100644 index 00000000000..ff10eb1ca87 --- /dev/null +++ b/agent-service/src/types/api.ts @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Wire DTOs: request/response bodies exchanged with backend services and the +// WebSocket frames this service sends to its own clients. Distinct from domain +// types (workflow.ts, execution.ts, agent.ts) which model in-memory state. 
+ +import type { WorkflowContent, OperatorPortSchemaMap } from "./workflow"; +import type { ReActStep } from "./agent"; + +// --- Dashboard Service: workflow persistence --- + +export interface Workflow { + wid: number; + name: string; + description?: string; + content: WorkflowContent; + creationTime?: number; + lastModifiedTime?: number; + isPublished?: boolean; +} + +export interface WorkflowPersistRequest { + wid?: number; + name: string; + description?: string; + content: string; + isPublic?: boolean; +} + +// --- Workflow Compiling Service --- + +export interface WorkflowFatalError { + message: string; + details: string; + operatorId: string; + workerId: string; + type: { name: string }; + timestamp: { nanos: number; seconds: number }; +} + +export interface WorkflowCompilationResponse { + physicalPlan?: any; + operatorOutputSchemas: Record; + operatorErrors: Record; +} + +// --- This service's WebSocket protocol (/agents/:id/react) --- + +export interface WsMessage { + type: "message" | "stop"; + content?: string; + messageSource?: "chat" | "feedback"; +} + +export interface OperatorResultSummaryWs { + state: string; + inputTuples: number; + outputTuples: number; + inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; + outputColumns?: number; + error?: string; + warnings?: string[]; + consoleLogCount?: number; + totalRowCount?: number; + sampleRecords?: Record[]; + resultStatistics?: Record; +} + +export interface WsOutgoingMessage { + type: "step" | "state" | "error" | "complete" | "init" | "headChange"; + step?: ReActStep; + state?: string; + error?: string; + steps?: ReActStep[]; + headId?: string; + operatorResults?: Record; + workflowContent?: any; +} diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts index c6d7291e51d..c4ed59255b2 100644 --- a/agent-service/src/types/index.ts +++ b/agent-service/src/types/index.ts @@ -19,4 +19,6 @@ export * from "./workflow"; export * from "./execution"; +export * from 
"./metadata"; export * from "./agent"; +export * from "./api"; diff --git a/agent-service/src/api/backend-api.ts b/agent-service/src/types/metadata.ts similarity index 63% rename from agent-service/src/api/backend-api.ts rename to agent-service/src/types/metadata.ts index ffd2c59433f..18ae452a51c 100644 --- a/agent-service/src/api/backend-api.ts +++ b/agent-service/src/types/metadata.ts @@ -17,25 +17,9 @@ * under the License. */ -import { env } from "../config/env"; - -interface BackendConfig { - apiEndpoint: string; - modelsEndpoint: string; - compileEndpoint: string; - executionEndpoint: string; -} - -const currentConfig: BackendConfig = { - apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, - modelsEndpoint: env.LLM_ENDPOINT, - compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, - executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, -}; - -export function getBackendConfig(): BackendConfig { - return { ...currentConfig }; -} +// Operator metadata shapes served by the Dashboard Service +// (`/api/resources/operator-metadata`) and the compact variants the agent +// derives from them for prompts and validation. 
export interface InputPortInfo { displayName?: string; @@ -47,7 +31,7 @@ export interface OutputPortInfo { displayName?: string; } -interface OperatorAdditionalMetadata { +export interface OperatorAdditionalMetadata { userFriendlyName: string; operatorGroupName: string; operatorDescription?: string; @@ -66,7 +50,7 @@ export interface OperatorSchema { operatorVersion: string; } -interface GroupInfo { +export interface GroupInfo { groupName: string; children?: GroupInfo[] | null; } @@ -76,13 +60,15 @@ export interface OperatorMetadata { groups: GroupInfo[]; } -export async function fetchOperatorMetadata(): Promise { - const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`; - const response = await fetch(url); - - if (!response.ok) { - throw new Error(`Failed to fetch operator metadata: ${response.status} ${response.statusText}`); - } +/** Full per-operator schema slice surfaced to debugging/inspection callers. */ +export interface OperatorSchemaInfo { + properties: any; + required: any; + definitions: any; +} - return (await response.json()) as OperatorMetadata; +/** Reduced operator schema (refs inlined, noise stripped) used in prompts and errors. 
*/ +export interface CompactOperatorSchema { + properties: Record; + required: string[]; } diff --git a/agent-service/src/types/workflow.ts b/agent-service/src/types/workflow.ts index 52c6493cf5f..c8acf055785 100644 --- a/agent-service/src/types/workflow.ts +++ b/agent-service/src/types/workflow.ts @@ -95,17 +95,27 @@ export interface Point { readonly y: number; } +export interface Comment { + readonly content: string; + readonly creationTime: string; + readonly creatorName: string; + readonly creatorID: number; +} + export interface CommentBox { - readonly commentBoxID: string; - readonly comments: string; - readonly x: number; - readonly y: number; - readonly width: number; - readonly height: number; + commentBoxID: string; + comments: Comment[]; + commentBoxPosition: Point; +} + +export enum ExecutionMode { + PIPELINED = "PIPELINED", + MATERIALIZED = "MATERIALIZED", } export interface WorkflowSettings { readonly dataTransferBatchSize: number; + readonly executionMode: ExecutionMode; } export interface WorkflowContent { From 1cb99c49f947c58fc7711d5cc49f4e75a07cbf4b Mon Sep 17 00:00:00 2001 From: Bob Bai Date: Sun, 7 Jun 2026 23:44:29 -0700 Subject: [PATCH 2/4] test(agent-service): cover api clients, jwt, and agent lifecycle Add unit tests (mocking fetch) for the client layer flagged by Codecov on #5564, plus the auth and agent-lifecycle paths: - execution-client, operator-metadata-client, workflow-client and compile-client (now ~100% line coverage) - auth/jwt token decoding and validation - TexeraAgent delegation, settings, and history accessors Refs #5563 --- agent-service/src/agent/texera-agent.test.ts | 96 ++++++++++++ agent-service/src/api/compile-client.test.ts | 80 ++++++++++ .../src/api/execution-client.test.ts | 143 ++++++++++++++++++ .../src/api/operator-metadata-client.test.ts | 54 +++++++ agent-service/src/api/workflow-client.test.ts | 101 +++++++++++++ agent-service/src/auth/jwt.test.ts | 68 +++++++++ 6 files changed, 542 insertions(+) create mode 
100644 agent-service/src/agent/texera-agent.test.ts create mode 100644 agent-service/src/api/compile-client.test.ts create mode 100644 agent-service/src/api/execution-client.test.ts create mode 100644 agent-service/src/api/operator-metadata-client.test.ts create mode 100644 agent-service/src/api/workflow-client.test.ts create mode 100644 agent-service/src/auth/jwt.test.ts diff --git a/agent-service/src/agent/texera-agent.test.ts b/agent-service/src/agent/texera-agent.test.ts new file mode 100644 index 00000000000..341f53418a0 --- /dev/null +++ b/agent-service/src/agent/texera-agent.test.ts @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { TexeraAgent } from "./texera-agent"; +import { AgentState, INITIAL_STEP_ID } from "../types/agent"; + +// A LanguageModel is only used inside sendMessage(); the lifecycle/accessor +// methods exercised here never touch it, so a stub is sufficient. 
+function newAgent(agentName?: string) { + return new TexeraAgent({ model: {} as any, modelType: "test-model", agentId: "a1", agentName }); +} + +describe("TexeraAgent lifecycle", () => { + test("starts AVAILABLE with the initial step as HEAD and no history", () => { + const agent = newAgent("Tester"); + expect(agent.agentId).toBe("a1"); + expect(agent.agentName).toBe("Tester"); + expect(agent.modelType).toBe("test-model"); + expect(agent.getState()).toBe(AgentState.AVAILABLE); + expect(agent.getHead()).toBe(INITIAL_STEP_ID); + expect(agent.getReActSteps()).toEqual([]); + expect(agent.getAllSteps()).toEqual([]); + }); + + test("derives a default name from the id when none is given", () => { + expect(new TexeraAgent({ model: {} as any, modelType: "m", agentId: "x9" }).agentName).toBe("Agent-x9"); + }); +}); + +describe("TexeraAgent delegation", () => { + test("stores and returns its delegation", () => { + const agent = newAgent(); + expect(agent.getDelegation()).toBeUndefined(); + agent.setDelegation({ userToken: "tok", workflowId: 12, computingUnitId: 3 }); + expect(agent.getDelegation()).toEqual({ userToken: "tok", workflowId: 12, computingUnitId: 3 }); + }); + + test("exposes the executeOperator tool only after a delegation is set", () => { + const agent = newAgent(); + expect(agent.getSystemInfo().tools.map(t => t.name)).not.toContain("executeOperator"); + agent.setDelegation({ userToken: "tok", workflowId: 1 }); + expect(agent.getSystemInfo().tools.map(t => t.name)).toContain("executeOperator"); + }); +}); + +describe("TexeraAgent settings & history", () => { + test("updates settings, including allowed operator types", () => { + const agent = newAgent(); + agent.updateSettings({ maxSteps: 7, allowedOperatorTypes: ["Filter"] }); + const settings = agent.getSettings(); + expect(settings.maxSteps).toBe(7); + expect(settings.allowedOperatorTypes).toEqual(["Filter"]); + }); + + test("getReActStepsByOperatorIds returns all steps when no ids are given", () => { + 
const agent = newAgent(); + expect(agent.getReActStepsByOperatorIds([])).toEqual([]); + expect(agent.getReActStepsByOperatorIds(["op1"])).toEqual([]); + }); + + test("clearHistory resets HEAD to the initial step", () => { + const agent = newAgent(); + agent.clearHistory(); + expect(agent.getHead()).toBe(INITIAL_STEP_ID); + expect(agent.getAllSteps()).toEqual([]); + }); + + test("checkout accepts the initial step and rejects an unknown one", () => { + const agent = newAgent(); + expect(agent.checkout(INITIAL_STEP_ID)).toBe(true); + expect(agent.checkout("nonexistent")).toBe(false); + }); + + test("destroy leaves the agent with no history", () => { + const agent = newAgent(); + agent.destroy(); + expect(agent.getReActSteps()).toEqual([]); + }); +}); diff --git a/agent-service/src/api/compile-client.test.ts b/agent-service/src/api/compile-client.test.ts new file mode 100644 index 00000000000..4f32867892a --- /dev/null +++ b/agent-service/src/api/compile-client.test.ts @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { compileWorkflowAsync } from "./compile-client"; +import type { LogicalPlan } from "../types/workflow"; + +const realFetch = globalThis.fetch; + +interface CapturedRequest { + url: string; + init: any; +} + +let captured: CapturedRequest | undefined; + +function mockFetch(responder: (req: CapturedRequest) => Response | Promise<Response>): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(captured); + }) as unknown as typeof fetch; +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const PLAN: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "CSVFileScan" }], + links: [], +}; + +describe("compileWorkflowAsync", () => { + test("POSTs the plan to the compiling service and returns the parsed response", async () => { + const payload = { operatorOutputSchemas: {}, operatorErrors: {} }; + mockFetch(() => new Response(JSON.stringify(payload), { status: 200 })); + + const result = await compileWorkflowAsync(PLAN); + + expect(result).toEqual(payload); + expect(captured?.url).toBe("http://localhost:9090/api/compile"); + expect(captured?.init.method).toBe("POST"); + const body = JSON.parse(captured!.init.body); + expect(body.operators).toEqual(PLAN.operators); + expect(body.links).toEqual([]); + expect(body.opsToReuseResult).toEqual([]); + expect(body.opsToViewResult).toEqual([]); + }); + + test("returns null when the compiling service responds non-OK", async () => { + mockFetch(() => new Response("compile error", { status: 400, statusText: "Bad Request" })); + + expect(await compileWorkflowAsync(PLAN)).toBeNull(); + }); + + test("returns null on a network failure", async () => { + mockFetch(() => { + throw new Error("connection refused"); + }); + + expect(await compileWorkflowAsync(PLAN)).toBeNull(); + }); +}); diff --git a/agent-service/src/api/execution-client.test.ts
b/agent-service/src/api/execution-client.test.ts new file mode 100644 index 00000000000..0992d3ebe54 --- /dev/null +++ b/agent-service/src/api/execution-client.test.ts @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { executeWorkflowHttp } from "./execution-client"; +import { DEFAULT_AGENT_SETTINGS, type ExecutionRequestParams } from "../types/agent"; +import type { LogicalPlan } from "../types/workflow"; +import { env } from "../config/env"; + +const realFetch = globalThis.fetch; + +interface CapturedRequest { + url: string; + init: any; +} + +let captured: CapturedRequest | undefined; + +function mockFetch(responder: (req: CapturedRequest) => Response | Promise<Response>): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(captured); + }) as unknown as typeof fetch; +} + +function okResponse(body: unknown): Response { + return new Response(JSON.stringify(body), { status: 200, headers: { "Content-Type": "application/json" } }); +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const PLAN: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "CSVFileScan" }], + links: [], + opsToViewResult: ["op1"], + opsToReuseResult: [], +}; + +const PARAMS: ExecutionRequestParams = { + userToken: "test-token", + workflowId: 42, + computingUnitId: 7, + executionTimeoutMs: 60_000, + maxOperatorResultCharLimit: 1000, + maxOperatorResultCellCharLimit: 500, +}; + +describe("executeWorkflowHttp", () => { + test("posts to the per-CU execution endpoint with auth and a complete request body", async () => { + mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} })); + + const result = await executeWorkflowHttp(PARAMS, PLAN); + + expect(result).toEqual({ success: true, state: "Completed", operators: {} }); + expect(captured?.url).toBe("http://localhost:8085/api/execution/42/7/run"); + expect(captured?.init.method).toBe("POST"); + expect(captured?.init.headers.Authorization).toBe("Bearer test-token"); + + const body = JSON.parse(captured!.init.body); +
expect(body.executionName).toBe("agent-execution"); + expect(body.logicalPlan.operators).toEqual(PLAN.operators); + expect(body.logicalPlan.opsToReuseResult).toEqual([]); + expect(body.targetOperatorIds).toEqual(["op1"]); + expect(body.timeoutSeconds).toBe(60); + expect(body.maxOperatorResultCharLimit).toBe(1000); + expect(body.maxOperatorResultCellCharLimit).toBe(500); + }); + + test("defaults computingUnitId to 0 and falls back to default limits/timeout", async () => { + mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} })); + + await executeWorkflowHttp({ userToken: "t", workflowId: 42 }, PLAN); + + expect(captured?.url).toBe("http://localhost:8085/api/execution/42/0/run"); + const body = JSON.parse(captured!.init.body); + expect(body.timeoutSeconds).toBe(Math.ceil(DEFAULT_AGENT_SETTINGS.executionTimeoutMs / 1000)); + expect(body.maxOperatorResultCharLimit).toBe(DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit); + expect(body.maxOperatorResultCellCharLimit).toBe(DEFAULT_AGENT_SETTINGS.maxOperatorResultCellCharLimit); + }); + + test("returns an error-state result when the backend responds non-OK", async () => { + mockFetch(() => new Response("boom", { status: 500, statusText: "Server Error" })); + + const result = await executeWorkflowHttp(PARAMS, PLAN); + + expect(result.success).toBe(false); + expect(result.state).toBe("Error"); + expect(result.operators).toEqual({}); + expect(result.errors?.[0]).toContain("500"); + expect(result.errors?.[0]).toContain("boom"); + }); + + test("returns an error-state result on a network failure", async () => { + mockFetch(() => { + throw new Error("network down"); + }); + + const result = await executeWorkflowHttp(PARAMS, PLAN); + + expect(result).toEqual({ success: false, state: "Error", operators: {}, errors: ["network down"] }); + }); + + test("re-throws AbortError so callers can detect cancellation", async () => { + mockFetch(() => { + const err = new Error("aborted"); + err.name = "AbortError"; 
+ throw err; + }); + + await expect(executeWorkflowHttp(PARAMS, PLAN)).rejects.toThrow("aborted"); + }); + + test("uses EXECUTION_ENDPOINT_TEMPLATE with {cuid} substituted when configured", async () => { + const previous = env.EXECUTION_ENDPOINT_TEMPLATE; + (env as any).EXECUTION_ENDPOINT_TEMPLATE = "http://cu-{cuid}.svc:8085"; + try { + mockFetch(() => okResponse({ success: true, state: "Completed", operators: {} })); + await executeWorkflowHttp(PARAMS, PLAN); + expect(captured?.url).toBe("http://cu-7.svc:8085/api/execution/42/7/run"); + } finally { + (env as any).EXECUTION_ENDPOINT_TEMPLATE = previous; + } + }); +}); diff --git a/agent-service/src/api/operator-metadata-client.test.ts b/agent-service/src/api/operator-metadata-client.test.ts new file mode 100644 index 00000000000..79c77b93856 --- /dev/null +++ b/agent-service/src/api/operator-metadata-client.test.ts @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { fetchOperatorMetadata } from "./operator-metadata-client"; + +const realFetch = globalThis.fetch; +let lastUrl: string | undefined; + +function mockFetch(responder: () => Response): void { + globalThis.fetch = (async (input: any) => { + lastUrl = String(input); + return responder(); + }) as unknown as typeof fetch; +} + +afterEach(() => { + globalThis.fetch = realFetch; + lastUrl = undefined; +}); + +describe("fetchOperatorMetadata", () => { + test("fetches operator metadata from the Dashboard Service endpoint", async () => { + const payload = { operators: [], groups: [] }; + mockFetch(() => new Response(JSON.stringify(payload), { status: 200 })); + + const result = await fetchOperatorMetadata(); + + expect(result).toEqual(payload); + expect(lastUrl).toBe("http://localhost:8080/api/resources/operator-metadata"); + }); + + test("throws with status text when the response is not OK", async () => { + mockFetch(() => new Response("nope", { status: 503, statusText: "Service Unavailable" })); + + await expect(fetchOperatorMetadata()).rejects.toThrow("Failed to fetch operator metadata: 503"); + }); +}); diff --git a/agent-service/src/api/workflow-client.test.ts b/agent-service/src/api/workflow-client.test.ts new file mode 100644 index 00000000000..c71f8da1027 --- /dev/null +++ b/agent-service/src/api/workflow-client.test.ts @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test, afterEach } from "bun:test"; +import { persistWorkflow, retrieveWorkflow } from "./workflow-client"; +import { ExecutionMode, type WorkflowContent } from "../types/workflow"; + +const realFetch = globalThis.fetch; + +interface CapturedRequest { + url: string; + init: any; +} + +let captured: CapturedRequest | undefined; + +function mockFetch(responder: (req: CapturedRequest) => Response): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(captured); + }) as unknown as typeof fetch; +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const CONTENT: WorkflowContent = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.PIPELINED }, +}; + +describe("retrieveWorkflow", () => { + test("GETs the workflow with a bearer token and parses the stringified content", async () => { + mockFetch( + () => new Response(JSON.stringify({ wid: 5, name: "W", content: JSON.stringify(CONTENT) }), { status: 200 }) + ); + + const workflow = await retrieveWorkflow("tok", 5); + + expect(captured?.url).toBe("http://localhost:8080/api/workflow/5"); + expect(captured?.init.method).toBe("GET"); + expect(captured?.init.headers.Authorization).toBe("Bearer tok"); + expect(workflow.content).toEqual(CONTENT); + }); + + test("throws when the response is not OK", async () => { + mockFetch(() => new Response("missing", { status: 404, statusText: 
"Not Found" })); + + await expect(retrieveWorkflow("tok", 5)).rejects.toThrow("Failed to retrieve workflow: 404"); + }); +}); + +describe("persistWorkflow", () => { + test("POSTs the workflow with a stringified content payload", async () => { + mockFetch( + () => new Response(JSON.stringify({ wid: 5, name: "W", content: JSON.stringify(CONTENT) }), { status: 200 }) + ); + + const workflow = await persistWorkflow("tok", 5, "W", CONTENT, "desc"); + + expect(captured?.url).toBe("http://localhost:8080/api/workflow/persist"); + expect(captured?.init.method).toBe("POST"); + expect(captured?.init.headers.Authorization).toBe("Bearer tok"); + + const body = JSON.parse(captured!.init.body); + expect(body.wid).toBe(5); + expect(body.name).toBe("W"); + expect(body.description).toBe("desc"); + expect(body.isPublic).toBe(false); + expect(typeof body.content).toBe("string"); + expect(JSON.parse(body.content)).toEqual(CONTENT); + expect(workflow.content).toEqual(CONTENT); + }); + + test("throws when persistence fails", async () => { + mockFetch(() => new Response("bad", { status: 400, statusText: "Bad Request" })); + + await expect(persistWorkflow("tok", 5, "W", CONTENT)).rejects.toThrow("Failed to persist workflow: 400"); + }); +}); diff --git a/agent-service/src/auth/jwt.test.ts b/agent-service/src/auth/jwt.test.ts new file mode 100644 index 00000000000..c5da9b0ef01 --- /dev/null +++ b/agent-service/src/auth/jwt.test.ts @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { extractUserFromToken, validateToken, createAuthHeaders } from "./jwt"; + +function makeToken(payload: Record<string, unknown>): string { + const encode = (o: Record<string, unknown>) => Buffer.from(JSON.stringify(o)).toString("base64"); + return `${encode({ alg: "none", typ: "JWT" })}.${encode(payload)}.signature`; +} + +const nowSeconds = () => Math.floor(Date.now() / 1000); + +describe("extractUserFromToken", () => { + test("maps JWT claims onto a UserInfo", () => { + const token = makeToken({ userId: 7, sub: "alice", email: "alice@example.com", role: "ADMIN" }); + expect(extractUserFromToken(token)).toEqual({ uid: 7, name: "alice", email: "alice@example.com", role: "ADMIN" }); + }); + + test("defaults missing email and role", () => { + const token = makeToken({ userId: 1, sub: "bob" }); + expect(extractUserFromToken(token)).toEqual({ uid: 1, name: "bob", email: "", role: "REGULAR" }); + }); + + test("throws on a malformed token", () => { + expect(() => extractUserFromToken("not-a-jwt")).toThrow("Failed to decode JWT"); + }); +}); + +describe("validateToken", () => { + test("accepts a token expiring in the future", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() + 3600 }))).toBe(true); + }); + + test("rejects an expired token", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() - 3600 }))).toBe(false); + }); + + test("treats a token without exp as valid", () => { + expect(validateToken(makeToken({ sub: "a" }))).toBe(true); + }); + + test("rejects a malformed token", ()
=> { + expect(validateToken("garbage")).toBe(false); + }); +}); + +describe("createAuthHeaders", () => { + test("builds bearer auth headers", () => { + expect(createAuthHeaders("tok")).toEqual({ Authorization: "Bearer tok", "Content-Type": "application/json" }); + }); +}); From 08f95bfd801ab0b7f75bf62b6b41dfc5d9c6c340 Mon Sep 17 00:00:00 2001 From: Bob Bai Date: Mon, 8 Jun 2026 00:40:03 -0700 Subject: [PATCH 3/4] test(agent-service): cover delegate, execution, and tool paths Extend coverage for the integration lines Codecov flagged on #5564: - executeOperatorAndFormat: success, validation-error (no backend call), and backend-failure paths - TexeraAgent.sendMessage: workflow refresh, post-step auto-execution via an addOperator tool call, and debounced auto-persist, driven by a stub LanguageModelV2 with mocked fetch - server: delegated-agent creation (token masking) and graceful handling of a failed workflow load Refs #5563 --- agent-service/src/agent/texera-agent.test.ts | 190 +++++++++++++++++- .../tools/workflow-execution-tools.test.ts | 142 +++++++++++++ agent-service/src/server.test.ts | 75 +++++++ 3 files changed, 406 insertions(+), 1 deletion(-) create mode 100644 agent-service/src/agent/tools/workflow-execution-tools.test.ts diff --git a/agent-service/src/agent/texera-agent.test.ts b/agent-service/src/agent/texera-agent.test.ts index 341f53418a0..a40d9364562 100644 --- a/agent-service/src/agent/texera-agent.test.ts +++ b/agent-service/src/agent/texera-agent.test.ts @@ -17,9 +17,26 @@ * under the License. 
*/ -import { describe, expect, test } from "bun:test"; +import { describe, expect, test, afterEach, beforeAll } from "bun:test"; import { TexeraAgent } from "./texera-agent"; +import { WorkflowSystemMetadata } from "./util/workflow-system-metadata"; import { AgentState, INITIAL_STEP_ID } from "../types/agent"; +import { ExecutionMode, type OperatorPredicate, type WorkflowContent } from "../types/workflow"; + +beforeAll(() => { + // Seed the metadata singleton so the addOperator tool can build CSVFileScan. + WorkflowSystemMetadata.getInstance().loadFromMetadata({ + operators: [ + { + operatorType: "CSVFileScan", + jsonSchema: { type: "object", properties: { fileName: { type: "string" } }, required: [] }, + additionalMetadata: { userFriendlyName: "CSV", operatorGroupName: "source", inputPorts: [], outputPorts: [{}] }, + operatorVersion: "1", + }, + ], + groups: [], + }); +}); // A LanguageModel is only used inside sendMessage(); the lifecycle/accessor // methods exercised here never touch it, so a stub is sufficient. @@ -94,3 +111,174 @@ describe("TexeraAgent settings & history", () => { expect(agent.getReActSteps()).toEqual([]); }); }); + +// --- Integration: drive a generation turn with a stub language model --- + +const realFetch = globalThis.fetch; + +const EMPTY_CONTENT: WorkflowContent = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400, executionMode: ExecutionMode.PIPELINED }, +}; + +// A LanguageModelV2 stub that returns a single text response and then stops. 
+function textModel(text: string = "done") { + return { + specificationVersion: "v2", + provider: "mock", + modelId: "mock", + supportedUrls: {}, + async doGenerate() { + return { + content: [{ type: "text", text }], + finishReason: "stop", + usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 }, + warnings: [], + }; + }, + async doStream() { + throw new Error("stream unused"); + }, + } as any; +} + +function routeFetch(handler: (url: string, init: any) => Response): void { + globalThis.fetch = (async (input: any, init: any) => handler(String(input), init)) as unknown as typeof fetch; +} + +function jsonResponse(body: unknown): Response { + return new Response(JSON.stringify(body), { status: 200 }); +} + +afterEach(() => { + globalThis.fetch = realFetch; +}); + +describe("TexeraAgent.sendMessage", () => { + test("refreshes the delegated workflow and records a completed turn", async () => { + routeFetch(url => { + if (url.includes("/api/workflow/")) { + return jsonResponse({ wid: 9, name: "WF", content: JSON.stringify(EMPTY_CONTENT) }); + } + return jsonResponse({}); + }); + + const agent = new TexeraAgent({ model: textModel("all set"), modelType: "m", agentId: "gen1" }); + agent.setDelegation({ userToken: "tok", workflowId: 9 }); + + const result = await agent.sendMessage("hello"); + + expect(result.stopped).toBe(false); + expect(result.response).toBe("all set"); + expect(agent.getState()).toBe(AgentState.AVAILABLE); + const steps = agent.getReActSteps(); + expect(steps.some(s => s.role === "user" && s.content === "hello")).toBe(true); + expect(steps.some(s => s.role === "agent" && s.isEnd)).toBe(true); + }); +}); + +function toolThenText(toolInput: Record<string, unknown>) { + let calls = 0; + return { + specificationVersion: "v2", + provider: "mock", + modelId: "mock", + supportedUrls: {}, + async doGenerate() { + calls += 1; + if (calls === 1) { + return { + content: [{ type: "tool-call", toolCallId: "c1", toolName: "addOperator", input: JSON.stringify(toolInput) }], +
finishReason: "tool-calls", + usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 }, + warnings: [], + }; + } + return { + content: [{ type: "text", text: "done" }], + finishReason: "stop", + usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 }, + warnings: [], + }; + }, + async doStream() { + throw new Error("stream unused"); + }, + } as any; +} + +describe("TexeraAgent.sendMessage with tool execution", () => { + test("applies an addOperator tool call and runs post-step execution", async () => { + routeFetch(url => { + if (url.includes("/api/workflow/persist")) + return jsonResponse({ wid: 9, name: "WF", content: JSON.stringify(EMPTY_CONTENT) }); + if (url.includes("/api/compile")) return jsonResponse({ operatorOutputSchemas: {}, operatorErrors: {} }); + if (url.includes("/api/execution/")) { + return jsonResponse({ + success: true, + state: "Completed", + operators: { + op1: { + state: "Completed", + inputTuples: 0, + outputTuples: 1, + resultMode: "table", + result: [{ a: 1 }], + totalRowCount: 1, + }, + }, + }); + } + if (url.includes("/api/workflow/")) + return jsonResponse({ wid: 9, name: "WF", content: JSON.stringify(EMPTY_CONTENT) }); + return jsonResponse({}); + }); + + const model = toolThenText({ operatorId: "op1", operatorType: "CSVFileScan", properties: {}, summary: "load csv" }); + const agent = new TexeraAgent({ model, modelType: "m", agentId: "tool1" }); + agent.setDelegation({ userToken: "tok", workflowId: 9 }); + + const result = await agent.sendMessage("add a csv source"); + + expect(result.response).toBe("done"); + expect(agent.getWorkflowState().getOperator("op1")).toBeDefined(); + // The post-step auto-execution stored a result snapshot for op1. 
+ expect(agent.getWorkflowResultState().getAllVisible().has("op1")).toBe(true); + agent.destroy(); + }); +}); + +describe("TexeraAgent auto-persist", () => { + test("persists the workflow to the backend after a debounced edit", async () => { + let persisted = false; + routeFetch(url => { + if (url.includes("/api/workflow/persist")) { + persisted = true; + return jsonResponse({ wid: 9, name: "WF", content: JSON.stringify(EMPTY_CONTENT) }); + } + return jsonResponse({ wid: 9, name: "WF", content: JSON.stringify(EMPTY_CONTENT) }); + }); + + const agent = new TexeraAgent({ model: textModel(), modelType: "m", agentId: "persist1" }); + agent.setDelegation({ userToken: "tok", workflowId: 9 }); + + const operator: OperatorPredicate = { + operatorID: "op1", + operatorType: "CSVFileScan", + operatorVersion: "1", + operatorProperties: {}, + inputPorts: [], + outputPorts: [{ portID: "output-0", displayName: "", disallowMultiInputs: false, isDynamicPort: false }], + showAdvanced: false, + } as OperatorPredicate; + agent.getWorkflowState().addOperator(operator); + + await new Promise(resolve => setTimeout(resolve, 750)); + + expect(persisted).toBe(true); + agent.destroy(); + }); +}); diff --git a/agent-service/src/agent/tools/workflow-execution-tools.test.ts b/agent-service/src/agent/tools/workflow-execution-tools.test.ts new file mode 100644 index 00000000000..2e6650751e9 --- /dev/null +++ b/agent-service/src/agent/tools/workflow-execution-tools.test.ts @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test, beforeAll, afterEach } from "bun:test"; +import { executeOperatorAndFormat } from "./workflow-execution-tools"; +import { WorkflowState } from "../workflow-state"; +import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; +import type { ExecutionRequestParams } from "../../types/agent"; +import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; +import type { OperatorPredicate } from "../../types/workflow"; + +const realFetch = globalThis.fetch; + +let captured: { url: string; init: any } | undefined; + +function mockFetch(responder: () => Response): void { + globalThis.fetch = (async (input: any, init: any) => { + captured = { url: String(input), init }; + return responder(); + }) as unknown as typeof fetch; +} + +function jsonResponse(body: unknown): Response { + return new Response(JSON.stringify(body), { status: 200 }); +} + +afterEach(() => { + globalThis.fetch = realFetch; + captured = undefined; +}); + +const PARAMS: ExecutionRequestParams = { userToken: "tok", workflowId: 9, computingUnitId: 0 }; + +function sourceOperator(): OperatorPredicate { + return { + operatorID: "op1", + operatorType: "CSVFileScan", + operatorVersion: "1", + operatorProperties: {}, + inputPorts: [], + outputPorts: [{ portID: "output-0", displayName: "", disallowMultiInputs: false, isDynamicPort: false }], + showAdvanced: false, + } as OperatorPredicate; +} + +beforeAll(() => { + // Seed the singleton so schema/connection validation passes for CSVFileScan. 
+ WorkflowSystemMetadata.getInstance().loadFromMetadata({ + operators: [ + { + operatorType: "CSVFileScan", + jsonSchema: { type: "object", properties: { fileName: { type: "string" } }, required: [] }, + additionalMetadata: { userFriendlyName: "CSV", operatorGroupName: "source", inputPorts: [], outputPorts: [{}] }, + operatorVersion: "1", + }, + ], + groups: [], + }); +}); + +describe("executeOperatorAndFormat", () => { + test("formats a successful execution result with table shape and rows", async () => { + const ws = new WorkflowState(); + ws.addOperator(sourceOperator()); + + const opInfo: OperatorInfo = { + state: "Completed", + inputTuples: 0, + outputTuples: 2, + resultMode: "table", + result: [{ a: 1 }, { a: 2 }], + totalRowCount: 2, + }; + mockFetch(() => + jsonResponse({ success: true, state: "Completed", operators: { op1: opInfo } } as SyncExecutionResult) + ); + + const seen: string[] = []; + const output = await executeOperatorAndFormat(ws, PARAMS, "op1", { onResult: opId => seen.push(opId) }); + + expect(captured?.url).toBe("http://localhost:8085/api/execution/9/0/run"); + expect(output).toContain("Executed operator op1"); + expect(output).toContain("Output table shape: (2, 1)"); + expect(output).toContain("a"); // header + expect(seen).toContain("op1"); + }); + + test("returns a validation error without calling the backend for an unconnected operator", async () => { + const ws = new WorkflowState(); + // An operator type that requires an input port but has none connected. 
+ ws.addOperator({ + ...sourceOperator(), + operatorType: "CSVFileScan", + inputPorts: [{ portID: "input-0", displayName: "in", disallowMultiInputs: true, isDynamicPort: false }], + } as OperatorPredicate); + + let fetched = false; + mockFetch(() => { + fetched = true; + return jsonResponse({ success: true, state: "Completed", operators: {} }); + }); + + const output = await executeOperatorAndFormat(ws, PARAMS, "op1", {}); + + expect(output).toContain("[ERROR]"); + expect(fetched).toBe(false); + }); + + test("surfaces a backend execution failure as an error result", async () => { + const ws = new WorkflowState(); + ws.addOperator(sourceOperator()); + + mockFetch(() => + jsonResponse({ + success: false, + state: "Failed", + operators: { op1: { state: "Failed", inputTuples: 0, outputTuples: 0, resultMode: "table", error: "boom" } }, + } as SyncExecutionResult) + ); + + const output = await executeOperatorAndFormat(ws, PARAMS, "op1", {}); + + expect(output).toContain("[ERROR]"); + expect(output).toContain("boom"); + }); +}); diff --git a/agent-service/src/server.test.ts b/agent-service/src/server.test.ts index 0f618e599c2..dec1f85db0e 100644 --- a/agent-service/src/server.test.ts +++ b/agent-service/src/server.test.ts @@ -120,6 +120,81 @@ describe(`POST ${API}/agents`, () => { }); }); +describe(`POST ${API}/agents (delegated)`, () => { + function signToken(payload: Record<string, unknown>): string { + const encode = (o: Record<string, unknown>) => Buffer.from(JSON.stringify(o)).toString("base64"); + return `${encode({ alg: "none" })}.${encode(payload)}.sig`; + } + + const CONTENT = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400, executionMode: "PIPELINED" }, + }; + + test("loads the workflow, binds the delegation, and masks the token", async () => { + const realFetch = globalThis.fetch; + globalThis.fetch = (async (input: any) => { + const u = String(input); + if (u.includes("operator-metadata")) + return new
Response(JSON.stringify({ operators: [], groups: [] }), { status: 200 }); + if (u.includes("/api/workflow/")) { + return new Response(JSON.stringify({ wid: 9, name: "My WF", content: JSON.stringify(CONTENT) }), { + status: 200, + }); + } + return new Response("{}", { status: 200 }); + }) as unknown as typeof fetch; + + try { + const token = signToken({ + userId: 3, + sub: "alice", + email: "a@x.com", + role: "REGULAR", + exp: Math.floor(Date.now() / 1000) + 3600, + }); + const res = await postJson(`${API}/agents`, { modelType: "m", userToken: token, workflowId: 9 }); + expect(res.status).toBe(200); + + const agent = await readJson<{ + delegate: { userToken: string; workflowId: number; workflowName: string; userInfo: { name: string } }; + }>(res); + expect(agent.delegate.userToken).toBe("***"); + expect(agent.delegate.workflowId).toBe(9); + expect(agent.delegate.workflowName).toBe("My WF"); + expect(agent.delegate.userInfo.name).toBe("alice"); + } finally { + globalThis.fetch = realFetch; + } + }); + + test("still creates the agent (without binding) when the workflow load fails", async () => { + const realFetch = globalThis.fetch; + globalThis.fetch = (async (input: any) => { + const u = String(input); + if (u.includes("operator-metadata")) + return new Response(JSON.stringify({ operators: [], groups: [] }), { status: 200 }); + if (u.includes("/api/workflow/")) return new Response("nope", { status: 500, statusText: "Server Error" }); + return new Response("{}", { status: 200 }); + }) as unknown as typeof fetch; + + try { + const token = signToken({ userId: 3, sub: "alice", exp: Math.floor(Date.now() / 1000) + 3600 }); + const res = await postJson(`${API}/agents`, { modelType: "m", userToken: token, workflowId: 9 }); + expect(res.status).toBe(200); + + // Delegation is only bound on a successful load, so it stays undefined here. 
+ const agent = await readJson<{ delegate: unknown }>(res); + expect(agent.delegate).toBeUndefined(); + } finally { + globalThis.fetch = realFetch; + } + }); +}); + describe(`GET ${API}/agents`, () => { test("empty store returns no agents", async () => { const res = await getJson(`${API}/agents`); From f821f70102b493b23b92fda56476d863b0be29b8 Mon Sep 17 00:00:00 2001 From: Bob Bai Date: Mon, 8 Jun 2026 01:05:28 -0700 Subject: [PATCH 4/4] test(agent-service): cover startup banner logging Export and exercise printStartupMessage, covering the three getServiceEndpoints() log lines that were the last patch-coverage gap on #5564. Refs #5563 --- agent-service/src/server.test.ts | 23 ++++++++++++++++++++++- agent-service/src/server.ts | 3 ++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/agent-service/src/server.test.ts b/agent-service/src/server.test.ts index dec1f85db0e..b4d46379774 100644 --- a/agent-service/src/server.test.ts +++ b/agent-service/src/server.test.ts @@ -18,7 +18,7 @@ */ import { beforeEach, describe, expect, test } from "bun:test"; -import { buildApp, _resetAgentStoreForTests } from "./server"; +import { buildApp, _resetAgentStoreForTests, printStartupMessage } from "./server"; import { env } from "./config/env"; const API = env.API_PREFIX; @@ -296,3 +296,24 @@ describe(`PATCH ${API}/agents/:id/settings`, () => { expect(reread.toolTimeoutSeconds).toBe(30); }); }); + +describe("printStartupMessage", () => { + test("prints the banner including the configured service endpoints", () => { + const original = console.log; + const lines: string[] = []; + console.log = (...args: unknown[]) => { + lines.push(args.join(" ")); + }; + try { + printStartupMessage(app); + } finally { + console.log = original; + } + + const out = lines.join("\n"); + expect(out).toContain("Texera Agent Service"); + expect(out).toContain("LLM_ENDPOINT:"); + expect(out).toContain("WORKFLOW_COMPILING_SERVICE_ENDPOINT:"); + 
expect(out).toContain("TEXERA_DASHBOARD_SERVICE_ENDPOINT:"); + }); +}); diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 05ed22d8735..a9980a84613 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -568,7 +568,8 @@ export function _resetAgentStoreForTests(): void { agentCounter = 0; } -function printStartupMessage(app: ReturnType) { +// Exported for tests; called at boot from start(). +export function printStartupMessage(app: ReturnType) { const LINE = "=".repeat(60); console.log(LINE); console.log("Texera Agent Service (Elysia.js + RxJS)");