Skip to content

Commit 5f36973

Browse files
authored
Merge pull request #1884 from cliffhall/everything-server-multiple-connection-support
Everything server multiple connection support
2 parents e515378 + 3adf594 commit 5f36973

File tree

2 files changed

+54
-40
lines changed

2 files changed

+54
-40
lines changed

src/everything/sse.ts

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,48 @@ console.error('Starting SSE server...');
66

77
const app = express();
88

9-
const { server, cleanup } = createServer();
10-
11-
let transport: SSEServerTransport;
9+
const transports: Map<string, SSEServerTransport> = new Map<string, SSEServerTransport>();
1210

1311
app.get("/sse", async (req, res) => {
14-
console.error("Received connection");
15-
transport = new SSEServerTransport("/message", res);
16-
await server.connect(transport);
17-
18-
server.onclose = async () => {
19-
await cleanup();
20-
await server.close();
21-
};
12+
let transport: SSEServerTransport;
13+
const { server, cleanup } = createServer();
14+
15+
if (req?.query?.sessionId) {
16+
const sessionId = (req?.query?.sessionId as string);
17+
transport = transports.get(sessionId) as SSEServerTransport;
18+
console.error("Client Reconnecting? This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId);
19+
} else {
20+
// Create and store transport for new session
21+
transport = new SSEServerTransport("/message", res);
22+
transports.set(transport.sessionId, transport);
23+
24+
// Connect server to transport
25+
await server.connect(transport);
26+
console.error("Client Connected: ", transport.sessionId);
27+
28+
// Handle close of connection
29+
server.onclose = async () => {
30+
console.error("Client Disconnected: ", transport.sessionId);
31+
transports.delete(transport.sessionId);
32+
await cleanup();
33+
};
34+
35+
}
2236

2337
});
2438

2539
app.post("/message", async (req, res) => {
26-
console.error("Received message");
27-
28-
await transport.handlePostMessage(req, res);
29-
});
30-
31-
process.on("SIGINT", async () => {
32-
await cleanup();
33-
await server.close();
34-
process.exit(0);
40+
const sessionId = (req?.query?.sessionId as string);
41+
const transport = transports.get(sessionId);
42+
if (transport) {
43+
console.error("Client Message from", sessionId);
44+
await transport.handlePostMessage(req, res);
45+
} else {
46+
console.error(`No transport found for sessionId ${sessionId}`)
47+
}
3548
});
3649

3750
const PORT = process.env.PORT || 3001;
3851
app.listen(PORT, () => {
3952
console.error(`Server is running on port ${PORT}`);
4053
});
41-

src/everything/streamableHttp.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ console.error('Starting Streamable HTTP server...');
88

99
const app = express();
1010

11-
const { server, cleanup } = createServer();
12-
13-
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
11+
const transports: Map<string, StreamableHTTPServerTransport> = new Map<string, StreamableHTTPServerTransport>();
1412

1513
app.post('/mcp', async (req: Request, res: Response) => {
1614
console.error('Received MCP POST request');
@@ -19,10 +17,13 @@ app.post('/mcp', async (req: Request, res: Response) => {
1917
const sessionId = req.headers['mcp-session-id'] as string | undefined;
2018
let transport: StreamableHTTPServerTransport;
2119

22-
if (sessionId && transports[sessionId]) {
20+
if (sessionId && transports.has(sessionId)) {
2321
// Reuse existing transport
24-
transport = transports[sessionId];
22+
transport = transports.get(sessionId)!;
2523
} else if (!sessionId) {
24+
25+
const { server, cleanup } = createServer();
26+
2627
// New initialization request
2728
const eventStore = new InMemoryEventStore();
2829
transport = new StreamableHTTPServerTransport({
@@ -32,16 +33,18 @@ app.post('/mcp', async (req: Request, res: Response) => {
3233
// Store the transport by session ID when session is initialized
3334
// This avoids race conditions where requests might come in before the session is stored
3435
console.error(`Session initialized with ID: ${sessionId}`);
35-
transports[sessionId] = transport;
36+
transports.set(sessionId, transport);
3637
}
3738
});
3839

40+
3941
// Set up onclose handler to clean up transport when closed
40-
transport.onclose = () => {
42+
server.onclose = async () => {
4143
const sid = transport.sessionId;
42-
if (sid && transports[sid]) {
44+
if (sid && transports.has(sid)) {
4345
console.error(`Transport closed for session ${sid}, removing from transports map`);
44-
delete transports[sid];
46+
transports.delete(sid);
47+
await cleanup();
4548
}
4649
};
4750

@@ -87,7 +90,7 @@ app.post('/mcp', async (req: Request, res: Response) => {
8790
app.get('/mcp', async (req: Request, res: Response) => {
8891
console.error('Received MCP GET request');
8992
const sessionId = req.headers['mcp-session-id'] as string | undefined;
90-
if (!sessionId || !transports[sessionId]) {
93+
if (!sessionId || !transports.has(sessionId)) {
9194
res.status(400).json({
9295
jsonrpc: '2.0',
9396
error: {
@@ -107,14 +110,14 @@ app.get('/mcp', async (req: Request, res: Response) => {
107110
console.error(`Establishing new SSE stream for session ${sessionId}`);
108111
}
109112

110-
const transport = transports[sessionId];
111-
await transport.handleRequest(req, res);
113+
const transport = transports.get(sessionId);
114+
await transport!.handleRequest(req, res);
112115
});
113116

114117
// Handle DELETE requests for session termination (according to MCP spec)
115118
app.delete('/mcp', async (req: Request, res: Response) => {
116119
const sessionId = req.headers['mcp-session-id'] as string | undefined;
117-
if (!sessionId || !transports[sessionId]) {
120+
if (!sessionId || !transports.has(sessionId)) {
118121
res.status(400).json({
119122
jsonrpc: '2.0',
120123
error: {
@@ -129,8 +132,8 @@ app.delete('/mcp', async (req: Request, res: Response) => {
129132
console.error(`Received session termination request for session ${sessionId}`);
130133

131134
try {
132-
const transport = transports[sessionId];
133-
await transport.handleRequest(req, res);
135+
const transport = transports.get(sessionId);
136+
await transport!.handleRequest(req, res);
134137
} catch (error) {
135138
console.error('Error handling session termination:', error);
136139
if (!res.headersSent) {
@@ -161,14 +164,13 @@ process.on('SIGINT', async () => {
161164
for (const sessionId in transports) {
162165
try {
163166
console.error(`Closing transport for session ${sessionId}`);
164-
await transports[sessionId].close();
165-
delete transports[sessionId];
167+
await transports.get(sessionId)!.close();
168+
transports.delete(sessionId);
166169
} catch (error) {
167170
console.error(`Error closing transport for session ${sessionId}:`, error);
168171
}
169172
}
170-
await cleanup();
171-
await server.close();
173+
172174
console.error('Server shutdown complete');
173175
process.exit(0);
174176
});

0 commit comments

Comments
 (0)