Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions packages/middleware/node/test/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,99 @@ describe('Zod v4', () => {
toolResolve!();
});

it('should replay terminal response after request-scoped SSE stream is closed', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

let streamCloseCalled = false;
let toolResolve: () => void;
const toolCompletePromise = new Promise<void>(resolve => {
toolResolve = resolve;
});

mcpServer.registerTool('close-and-finish-tool', { description: 'Closes and then finishes' }, async ctx => {
ctx.http?.closeSSE?.();
streamCloseCalled = true;
await toolCompletePromise;
return { content: [{ type: 'text', text: 'finished after reconnect' }] };
});

const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 101,
method: 'tools/call',
params: { name: 'close-and-finish-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-11-25'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

const reader = postResponse.body?.getReader();
const priming = await reader!.read();
const primingText = new TextDecoder().decode(priming.value);
const idMatch = primingText.match(/id: ([^\n]+)/);
expect(idMatch).toBeTruthy();
const lastEventId = idMatch![1]!;

await new Promise(resolve => setTimeout(resolve, 100));
expect(streamCloseCalled).toBe(true);

const { done } = await reader!.read();
expect(done).toBe(true);

toolResolve!();
await new Promise(resolve => setTimeout(resolve, 100));

const reconnectResponse = await fetch(baseUrl, {
method: 'GET',
headers: {
Accept: 'text/event-stream',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-11-25',
'last-event-id': lastEventId
}
});

expect(reconnectResponse.status).toBe(200);

const reconnectReader = reconnectResponse.body?.getReader();
let replayedText = '';
const timeout = setTimeout(() => reconnectReader!.cancel(), 2000);
try {
while (!replayedText.includes('finished after reconnect')) {
const { value, done } = await reconnectReader!.read();
if (done) break;
replayedText += new TextDecoder().decode(value);
}
} finally {
clearTimeout(timeout);
}

expect(replayedText).toContain('finished after reconnect');
expect(replayedText).toContain('"id":101');
});

it('should provide closeSSEStream callback in ctx when eventStore is configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
Expand Down
19 changes: 12 additions & 7 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,14 +985,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

const stream = this._streamMapping.get(streamId);

if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
// For SSE responses, generate event ID if event store is provided
let eventId: string | undefined;
let eventId: string | undefined;
if (!this._enableJsonResponse && this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}

if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}
// Write the event to the response stream
if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}

Expand All @@ -1005,6 +1003,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

if (allResponsesReady) {
if (!stream) {
if (!this._enableJsonResponse && this._eventStore) {
for (const id of relatedIds) {
this._requestResponseMap.delete(id);
this._requestToStreamMapping.delete(id);
}
return;
}
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
if (this._enableJsonResponse && stream.resolveJson) {
Expand Down
Loading