From ef0769e5d07a11c091d58b130c7ad3739629769d Mon Sep 17 00:00:00 2001 From: Daniel Pittman Date: Thu, 5 Mar 2026 15:54:09 -0700 Subject: [PATCH] fix: call onerror callback for all transport error responses Several error paths in WebStandardStreamableHTTPServerTransport returned error responses via createJsonErrorResponse() without calling the onerror callback, making these errors unobservable for debugging and logging purposes. Fixes #1395 --- packages/server/src/server/streamableHttp.ts | 23 +- .../server/test/server/streamableHttp.test.ts | 260 ++++++++++++++++++ 2 files changed, 278 insertions(+), 5 deletions(-) diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 74e689892..0f5f71125 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -404,6 +404,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // The client MUST include an Accept header, listing text/event-stream as a supported content type. const acceptHeader = req.headers.get('accept'); if (!acceptHeader?.includes('text/event-stream')) { + this.onerror?.(new Error('Not Acceptable: Client must accept text/event-stream')); return this.createJsonErrorResponse(406, -32_000, 'Not Acceptable: Client must accept text/event-stream'); } @@ -430,6 +431,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // Check if there's already an active standalone SSE stream for this session if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) { // Only one GET SSE stream is allowed per session + this.onerror?.(new Error('Conflict: Only one SSE stream is allowed per session')); return this.createJsonErrorResponse(409, -32_000, 'Conflict: Only one SSE stream is allowed per session'); } @@ -481,6 +483,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { */ private async replayEvents(lastEventId: string): Promise { if (!this._eventStore) { + this.onerror?.(new Error('Event store not configured')); return this.createJsonErrorResponse(400, -32_000, 'Event store not configured'); } @@ -491,11 +494,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { streamId = await this._eventStore.getStreamIdForEventId(lastEventId); if (!streamId) { + this.onerror?.(new Error('Invalid event ID format')); return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format'); } // Check conflict with the SAME streamId we'll use for mapping if (this._streamMapping.get(streamId) !== undefined) { + this.onerror?.(new Error('Conflict: Stream already has an active connection')); return this.createJsonErrorResponse(409, -32_000, 'Conflict: Stream already has an active connection'); } } @@ -586,6 +591,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { * Handles unsupported requests (`PUT`, `PATCH`, etc.) */ private handleUnsupportedRequest(): Response { + this.onerror?.(new Error('Method not allowed.')); return Response.json( { jsonrpc: '2.0', @@ -614,6 +620,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const acceptHeader = req.headers.get('accept'); // The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types. if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) { + this.onerror?.(new Error('Not Acceptable: Client must accept both application/json and text/event-stream')); return this.createJsonErrorResponse( 406, -32_000, @@ -623,6 +630,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const ct = req.headers.get('content-type'); if (!ct || !ct.includes('application/json')) { + this.onerror?.(new Error('Unsupported Media Type: Content-Type must be application/json')); return this.createJsonErrorResponse(415, -32_000, 'Unsupported Media Type: Content-Type must be application/json'); } @@ -636,6 +644,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { try { rawMessage = await req.json(); } catch { + this.onerror?.(new Error('Parse error: Invalid JSON')); return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON'); } } else { @@ -650,6 +659,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { ? rawMessage.map(msg => JSONRPCMessageSchema.parse(msg)) : [JSONRPCMessageSchema.parse(rawMessage)]; } catch { + this.onerror?.(new Error('Parse error: Invalid JSON-RPC message')); return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON-RPC message'); } @@ -660,9 +670,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // If it's a server with session management and the session ID is already set we should reject the request // to avoid re-initialization. if (this._initialized && this.sessionId !== undefined) { + this.onerror?.(new Error('Invalid Request: Server already initialized')); return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Server already initialized'); } if (messages.length > 1) { + this.onerror?.(new Error('Invalid Request: Only one initialization request is allowed')); return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Only one initialization request is allowed'); } this.sessionId = this.sessionIdGenerator?.(); @@ -842,6 +854,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } if (!this._initialized) { // If the server has not been initialized yet, reject all requests + this.onerror?.(new Error('Bad Request: Server not initialized')); return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Server not initialized'); } @@ -849,11 +862,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { if (!sessionId) { // Non-initialization requests without a session ID should return 400 Bad Request + this.onerror?.(new Error('Bad Request: Mcp-Session-Id header is required')); return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Mcp-Session-Id header is required'); } if (sessionId !== this.sessionId) { // Reject requests with invalid session ID with 404 Not Found + this.onerror?.(new Error('Session not found')); return this.createJsonErrorResponse(404, -32_001, 'Session not found'); } @@ -877,11 +892,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const protocolVersion = req.headers.get('mcp-protocol-version'); if (protocolVersion !== null && !this._supportedProtocolVersions.includes(protocolVersion)) { - return this.createJsonErrorResponse( - 400, - -32_000, - `Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${this._supportedProtocolVersions.join(', ')})` - ); + const message = `Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${this._supportedProtocolVersions.join(', ')})`; + this.onerror?.(new Error(message)); + return this.createJsonErrorResponse(400, -32_000, message); } return undefined; } diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index ab6f22342..b22762297 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -765,4 +765,264 @@ describe('Zod v4', () => { await expect(transport.start()).rejects.toThrow('Transport already started'); }); }); + + describe('HTTPServerTransport - onerror callback', () => { + let transport: WebStandardStreamableHTTPServerTransport; + let mcpServer: McpServer; + let sessionId: string; + let onerrorSpy: ReturnType; + + beforeEach(async () => { + mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + mcpServer.registerTool( + 'greet', + { + description: 'A simple greeting tool', + inputSchema: z.object({ name: z.string().describe('Name to greet') }) + }, + async ({ name }): Promise => { + return { content: [{ type: 'text', text: `Hello, ${name}!` }] }; + } + ); + + transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID() + }); + + onerrorSpy = vi.fn(); + transport.onerror = onerrorSpy; + + await mcpServer.connect(transport); + }); + + afterEach(async () => { + await transport.close(); + }); + + async function initializeServer(): Promise { + const request = createRequest('POST', TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + expect(response.status).toBe(200); + const newSessionId = response.headers.get('mcp-session-id'); + expect(newSessionId).toBeDefined(); + return newSessionId as string; + } + + it('should call onerror for POST with wrong Accept header', async () => { + const request = createRequest('POST', TEST_MESSAGES.initialize, { accept: 'application/json' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(406); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Not Acceptable/) + }) + ); + }); + + it('should call onerror for POST with wrong Content-Type', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'text/plain' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(415); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Unsupported Media Type/) + }) + ); + }); + + it('should call onerror for invalid JSON body', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'application/json' + }, + body: 'not valid json' + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Parse error.*Invalid JSON/) + }) + ); + }); + + it('should call onerror for invalid JSON-RPC message', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ not: 'a valid jsonrpc message' }) + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Parse error.*Invalid JSON-RPC/) + }) + ); + }); + + it('should call onerror for duplicate initialization', async () => { + sessionId = await initializeServer(); + + // Reset spy after init (which succeeds without error) + onerrorSpy.mockClear(); + + const request = createRequest('POST', { ...TEST_MESSAGES.initialize, id: 'second-init' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Server already initialized/) + }) + ); + }); + + it('should call onerror for batch initialization', async () => { + const batchInit: JSONRPCMessage[] = [TEST_MESSAGES.initialize, { ...TEST_MESSAGES.initialize, id: 'init-2' }]; + + const request = createRequest('POST', batchInit); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Only one initialization request/) + }) + ); + }); + + it('should call onerror for missing session ID', async () => { + await initializeServer(); + onerrorSpy.mockClear(); + + const request = createRequest('POST', TEST_MESSAGES.toolsList); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Mcp-Session-Id header is required/) + }) + ); + }); + + it('should call onerror for invalid session ID', async () => { + await initializeServer(); + onerrorSpy.mockClear(); + + const request = createRequest('POST', TEST_MESSAGES.toolsList, { sessionId: 'invalid-session' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(404); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Session not found/) + }) + ); + }); + + it('should call onerror for unsupported protocol version', async () => { + sessionId = await initializeServer(); + onerrorSpy.mockClear(); + + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': 'unsupported-version' + }, + body: JSON.stringify(TEST_MESSAGES.toolsList) + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Unsupported protocol version/) + }) + ); + }); + + it('should call onerror for unsupported HTTP method', async () => { + const request = new Request('http://localhost/mcp', { method: 'PUT' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(405); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Method not allowed/) + }) + ); + }); + + it('should call onerror for GET without Accept: text/event-stream', async () => { + sessionId = await initializeServer(); + onerrorSpy.mockClear(); + + const request = createRequest('GET', undefined, { sessionId, accept: 'application/json' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(406); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Not Acceptable/) + }) + ); + }); + + it('should call onerror for duplicate GET SSE stream', async () => { + sessionId = await initializeServer(); + + // First SSE stream + const request1 = createRequest('GET', undefined, { sessionId }); + const response1 = await transport.handleRequest(request1); + expect(response1.status).toBe(200); + + onerrorSpy.mockClear(); + + // Second SSE stream should trigger onerror + const request2 = createRequest('GET', undefined, { sessionId }); + const response2 = await transport.handleRequest(request2); + + expect(response2.status).toBe(409); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Conflict/) + }) + ); + }); + + it('should call onerror for server not initialized', async () => { + // Don't initialize - just send a request that requires session + const request = createRequest('GET', undefined, { sessionId: 'some-session' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Server not initialized/) + }) + ); + }); + }); });