diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 4128764b1318..5d2c93623c76 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -12,6 +12,7 @@ import { ToolSchema, type Tool as MCPToolDef, ToolListChangedNotificationSchema, + ResourceUpdatedNotificationSchema, } from "@modelcontextprotocol/sdk/types.js" import { Config } from "@/config/config" import { ConfigMCPV1 } from "@opencode-ai/core/v1/config/mcp" @@ -56,6 +57,14 @@ export const ToolsChanged = EventV2.define({ }, }) +export const ResourceUpdated = EventV2.define({ + type: "mcp.resource.updated", + schema: { + server: Schema.String, + uri: Schema.String, + }, +}) + export const BrowserOpenFailed = EventV2.define({ type: "mcp.browser.open.failed", schema: { @@ -508,18 +517,29 @@ export const layer = Layer.effect( ) function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) { - if (!client.getServerCapabilities()?.tools) return - client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { - log.info("tools list changed notification received", { server: name }) - if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - - const listed = await bridge.promise(defs(name, client, timeout)) - if (!listed) return - if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - - s.defs[name] = listed - await bridge.promise(events.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) - }) + const capabilities = client.getServerCapabilities() + if (capabilities?.tools) { + client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { + log.info("tools list changed notification received", { server: name }) + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + + const listed = await bridge.promise(defs(name, client, timeout)) + if (!listed) return + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + + s.defs[name] = listed + await bridge.promise(events.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) + }) + } + if (capabilities?.resources?.subscribe) { + client.setNotificationHandler(ResourceUpdatedNotificationSchema, async (notification) => { + log.info("resource updated notification received", { server: name, uri: notification.params.uri }) + if (s.clients[name] !== client || s.status[name]?.status !== "connected") return + await bridge.promise( + events.publish(ResourceUpdated, { server: name, uri: notification.params.uri }).pipe(Effect.ignore), + ) + }) + } } const state = yield* InstanceState.make( @@ -768,9 +788,15 @@ export const layer = Layer.effect( }) const readResource = Effect.fn("MCP.readResource")(function* (clientName: string, resourceUri: string) { - return yield* withClient(clientName, (client) => client.readResource({ uri: resourceUri }), "readResource", { - resourceUri, - }) + return yield* withClient( + clientName, + async (client) => { + if (client.getServerCapabilities()?.resources?.subscribe) await client.subscribeResource({ uri: resourceUri }) + return client.readResource({ uri: resourceUri }) + }, + "readResource", + { resourceUri }, + ) }) const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) { diff --git a/packages/opencode/test/mcp/lifecycle.test.ts b/packages/opencode/test/mcp/lifecycle.test.ts index 7bd5e4f00b4d..00192fcc74fe 100644 --- a/packages/opencode/test/mcp/lifecycle.test.ts +++ b/packages/opencode/test/mcp/lifecycle.test.ts @@ -1,7 +1,8 @@ import { expect, mock, beforeEach } from "bun:test" -import { Cause, Effect, Exit } from "effect" +import { Cause, Deferred, Effect, Exit } from "effect" import type { MCP as MCPNS } from "../../src/mcp/index" import { testEffect } from "../lib/effect" +import { GlobalBus, type GlobalEvent } from "../../src/bus/global" // --- Mock infrastructure --- @@ -12,6 +13,8 @@ interface MockClientState { listToolsCalls: number listPromptsCalls: number listResourcesCalls: number + readResourceCalls: number + subscribedResources: string[] requestCalls: number listToolsShouldFail: boolean listToolsError: string @@ -43,6 +46,8 @@ function getOrCreateClientState(name?: string): MockClientState { listToolsCalls: 0, listPromptsCalls: 0, listResourcesCalls: 0, + readResourceCalls: 0, + subscribedResources: [], requestCalls: 0, listToolsShouldFail: false, listToolsError: "listTools failed", @@ -173,6 +178,16 @@ void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({ return { resources: this._state?.resources ?? [] } } + async subscribeResource(input: { uri: string }) { + this._state?.subscribedResources.push(input.uri) + return {} + } + + async readResource(input: { uri: string }) { + if (this._state) this._state.readResourceCalls++ + return { contents: [{ uri: input.uri, text: "resource contents" }] } + } + async close() { if (this._state) this._state.closed = true } @@ -192,6 +207,7 @@ beforeEach(() => { // Import after mocks const { MCP } = await import("../../src/mcp/index") const { McpOAuthCallback } = await import("../../src/mcp/oauth-callback") +const { ResourceUpdatedNotificationSchema } = await import("@modelcontextprotocol/sdk/types.js") const it = testEffect(MCP.defaultLayer) @@ -610,6 +626,94 @@ it.instance( }, ) +it.instance( + "readResource() subscribes when the server supports resource updates", + () => + MCP.Service.use((mcp: MCPNS.Interface) => + Effect.gen(function* () { + lastCreatedClientName = "resource-server" + const serverState = getOrCreateClientState("resource-server") + serverState.capabilities = { resources: { subscribe: true } } + + yield* mcp.add("resource-server", { + type: "local", + command: ["echo", "test"], + }) + + const resource = yield* mcp.readResource("resource-server", "docs://readme") + expect(resource?.contents).toEqual([{ uri: "docs://readme", text: "resource contents" }]) + expect(serverState.subscribedResources).toEqual(["docs://readme"]) + expect(serverState.readResourceCalls).toBe(1) + }), + ), + { config: { mcp: {} } }, +) + +it.instance( + "resource updates require advertised subscription support", + () => + MCP.Service.use((mcp: MCPNS.Interface) => + Effect.gen(function* () { + lastCreatedClientName = "resource-server" + const serverState = getOrCreateClientState("resource-server") + serverState.capabilities = { resources: {} } + + yield* mcp.add("resource-server", { + type: "local", + command: ["echo", "test"], + }) + yield* mcp.readResource("resource-server", "docs://readme") + + expect(serverState.notificationHandlers.has(ResourceUpdatedNotificationSchema)).toBe(false) + expect(serverState.subscribedResources).toEqual([]) + expect(serverState.readResourceCalls).toBe(1) + }), + ), + { config: { mcp: {} } }, +) + +it.instance( + "resource update notifications publish server and URI for the current client only", + () => + MCP.Service.use((mcp: MCPNS.Interface) => + Effect.gen(function* () { + const event = yield* Deferred.make() + const listener = (value: GlobalEvent) => { + if (value.payload.type === MCP.ResourceUpdated.type) Deferred.doneUnsafe(event, Effect.succeed(value)) + } + GlobalBus.on("event", listener) + yield* Effect.addFinalizer(() => Effect.sync(() => GlobalBus.off("event", listener))) + + lastCreatedClientName = "resource-server-old" + const oldState = getOrCreateClientState("resource-server-old") + oldState.capabilities = { resources: { subscribe: true } } + yield* mcp.add("resource-server", { type: "local", command: ["echo", "test"] }) + const oldHandler = oldState.notificationHandlers.get(ResourceUpdatedNotificationSchema) + + lastCreatedClientName = "resource-server-new" + const newState = getOrCreateClientState("resource-server-new") + newState.capabilities = { resources: { subscribe: true } } + yield* mcp.add("resource-server", { type: "local", command: ["echo", "test"] }) + const newHandler = newState.notificationHandlers.get(ResourceUpdatedNotificationSchema) + + expect(oldHandler).toBeDefined() + expect(newHandler).toBeDefined() + yield* Effect.promise(() => + oldHandler!({ method: "notifications/resources/updated", params: { uri: "docs://stale" } }), + ) + yield* Effect.promise(() => + newHandler!({ method: "notifications/resources/updated", params: { uri: "docs://current" } }), + ) + + expect((yield* Deferred.await(event)).payload).toMatchObject({ + type: "mcp.resource.updated", + properties: { server: "resource-server", uri: "docs://current" }, + }) + }), + ), + { config: { mcp: {} } }, +) + it.instance( "resource-only servers connect without listing tools", () => diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 9c57ccd15d29..912978f91245 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -74,6 +74,7 @@ export type Event = | EventTuiToastShow2 | EventTuiSessionSelect2 | EventMcpToolsChanged + | EventMcpResourceUpdated | EventMcpBrowserOpenFailed | EventCommandExecuted | EventProjectDirectoriesUpdated @@ -1463,6 +1464,14 @@ export type GlobalEvent = { server: string } } + | { + id: string + type: "mcp.resource.updated" + properties: { + server: string + uri: string + } + } | { id: string type: "mcp.browser.open.failed" @@ -5062,6 +5071,15 @@ export type EventMcpToolsChanged = { } } +export type EventMcpResourceUpdated = { + id: string + type: "mcp.resource.updated" + properties: { + server: string + uri: string + } +} + export type EventMcpBrowserOpenFailed = { id: string type: "mcp.browser.open.failed"