diff --git a/.changeset/replay-request-sse-disconnects.md b/.changeset/replay-request-sse-disconnects.md new file mode 100644 index 0000000000..8e977563f0 --- /dev/null +++ b/.changeset/replay-request-sse-disconnects.md @@ -0,0 +1,6 @@ +--- +"@modelcontextprotocol/node": patch +"@modelcontextprotocol/server": patch +--- + +Replay pending request-scoped SSE responses after a client reconnects. diff --git a/packages/middleware/node/test/streamableHttp.test.ts b/packages/middleware/node/test/streamableHttp.test.ts index c427aa2eea..4c315b307c 100644 --- a/packages/middleware/node/test/streamableHttp.test.ts +++ b/packages/middleware/node/test/streamableHttp.test.ts @@ -1954,6 +1954,99 @@ describe('Zod v4', () => { toolResolve!(); }); + it('should replay terminal response after request-scoped SSE stream is closed', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + let streamCloseCalled = false; + let toolResolve: () => void; + const toolCompletePromise = new Promise(resolve => { + toolResolve = resolve; + }); + + mcpServer.registerTool('close-and-finish-tool', { description: 'Closes and then finishes' }, async ctx => { + ctx.http?.closeSSE?.(); + streamCloseCalled = true; + await toolCompletePromise; + return { content: [{ type: 'text', text: 'finished after reconnect' }] }; + }); + + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 101, + method: 'tools/call', + params: { name: 'close-and-finish-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-11-25' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + const reader = postResponse.body?.getReader(); + const priming = await reader!.read(); + const primingText = new TextDecoder().decode(priming.value); + const idMatch = primingText.match(/id: ([^\n]+)/); + expect(idMatch).toBeTruthy(); + const lastEventId = idMatch![1]!; + + await new Promise(resolve => setTimeout(resolve, 100)); + expect(streamCloseCalled).toBe(true); + + const { done } = await reader!.read(); + expect(done).toBe(true); + + toolResolve!(); + await new Promise(resolve => setTimeout(resolve, 100)); + + const reconnectResponse = await fetch(baseUrl, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-11-25', + 'last-event-id': lastEventId + } + }); + + expect(reconnectResponse.status).toBe(200); + + const reconnectReader = reconnectResponse.body?.getReader(); + let replayedText = ''; + const timeout = setTimeout(() => reconnectReader!.cancel(), 2000); + try { + while (!replayedText.includes('finished after reconnect')) { + const { value, done } = await reconnectReader!.read(); + if (done) break; + replayedText += new TextDecoder().decode(value); + } + } finally { + clearTimeout(timeout); + } + + expect(replayedText).toContain('finished after reconnect'); + expect(replayedText).toContain('"id":101'); + }); + it('should provide closeSSEStream callback in ctx when eventStore is configured', async () => { const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index fd3563a077..dcd307e76e 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -985,14 +985,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const stream = this._streamMapping.get(streamId); - if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { - // For SSE responses, generate event ID if event store is provided - let eventId: string | undefined; + let eventId: string | undefined; + if (!this._enableJsonResponse && this._eventStore) { + eventId = await this._eventStore.storeEvent(streamId, message); + } - if (this._eventStore) { - eventId = await this._eventStore.storeEvent(streamId, message); - } - // Write the event to the response stream + if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { this.writeSSEEvent(stream.controller, stream.encoder, message, eventId); } @@ -1005,6 +1003,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { if (allResponsesReady) { if (!stream) { + if (!this._enableJsonResponse && this._eventStore) { + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._requestToStreamMapping.delete(id); + } + return; + } throw new Error(`No connection established for request ID: ${String(requestId)}`); } if (this._enableJsonResponse && stream.resolveJson) {