diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 74e689892..0f5f71125 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -404,6 +404,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // The client MUST include an Accept header, listing text/event-stream as a supported content type. const acceptHeader = req.headers.get('accept'); if (!acceptHeader?.includes('text/event-stream')) { + this.onerror?.(new Error('Not Acceptable: Client must accept text/event-stream')); return this.createJsonErrorResponse(406, -32_000, 'Not Acceptable: Client must accept text/event-stream'); } @@ -430,6 +431,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // Check if there's already an active standalone SSE stream for this session if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) { // Only one GET SSE stream is allowed per session + this.onerror?.(new Error('Conflict: Only one SSE stream is allowed per session')); return this.createJsonErrorResponse(409, -32_000, 'Conflict: Only one SSE stream is allowed per session'); } @@ -481,6 +483,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { */ private async replayEvents(lastEventId: string): Promise { if (!this._eventStore) { + this.onerror?.(new Error('Event store not configured')); return this.createJsonErrorResponse(400, -32_000, 'Event store not configured'); } @@ -491,11 +494,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { streamId = await this._eventStore.getStreamIdForEventId(lastEventId); if (!streamId) { + this.onerror?.(new Error('Invalid event ID format')); return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format'); } // Check conflict with the SAME streamId we'll use for mapping if (this._streamMapping.get(streamId) !== undefined) { + this.onerror?.(new Error('Conflict: Stream already has an active connection')); return this.createJsonErrorResponse(409, -32_000, 'Conflict: Stream already has an active connection'); } } @@ -586,6 +591,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { * Handles unsupported requests (`PUT`, `PATCH`, etc.) */ private handleUnsupportedRequest(): Response { + this.onerror?.(new Error('Method not allowed.')); return Response.json( { jsonrpc: '2.0', @@ -614,6 +620,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const acceptHeader = req.headers.get('accept'); // The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types. if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) { + this.onerror?.(new Error('Not Acceptable: Client must accept both application/json and text/event-stream')); return this.createJsonErrorResponse( 406, -32_000, @@ -623,6 +630,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const ct = req.headers.get('content-type'); if (!ct || !ct.includes('application/json')) { + this.onerror?.(new Error('Unsupported Media Type: Content-Type must be application/json')); return this.createJsonErrorResponse(415, -32_000, 'Unsupported Media Type: Content-Type must be application/json'); } @@ -636,6 +644,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { try { rawMessage = await req.json(); } catch { + this.onerror?.(new Error('Parse error: Invalid JSON')); return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON'); } } else { @@ -650,6 +659,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { ? rawMessage.map(msg => JSONRPCMessageSchema.parse(msg)) : [JSONRPCMessageSchema.parse(rawMessage)]; } catch { + this.onerror?.(new Error('Parse error: Invalid JSON-RPC message')); return this.createJsonErrorResponse(400, -32_700, 'Parse error: Invalid JSON-RPC message'); } @@ -660,9 +670,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // If it's a server with session management and the session ID is already set we should reject the request // to avoid re-initialization. if (this._initialized && this.sessionId !== undefined) { + this.onerror?.(new Error('Invalid Request: Server already initialized')); return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Server already initialized'); } if (messages.length > 1) { + this.onerror?.(new Error('Invalid Request: Only one initialization request is allowed')); return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Only one initialization request is allowed'); } this.sessionId = this.sessionIdGenerator?.(); @@ -842,6 +854,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } if (!this._initialized) { // If the server has not been initialized yet, reject all requests + this.onerror?.(new Error('Bad Request: Server not initialized')); return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Server not initialized'); } @@ -849,11 +862,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { if (!sessionId) { // Non-initialization requests without a session ID should return 400 Bad Request + this.onerror?.(new Error('Bad Request: Mcp-Session-Id header is required')); return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Mcp-Session-Id header is required'); } if (sessionId !== this.sessionId) { // Reject requests with invalid session ID with 404 Not Found + this.onerror?.(new Error('Session not found')); return this.createJsonErrorResponse(404, -32_001, 'Session not found'); } @@ -877,11 +892,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const protocolVersion = req.headers.get('mcp-protocol-version'); if (protocolVersion !== null && !this._supportedProtocolVersions.includes(protocolVersion)) { - return this.createJsonErrorResponse( - 400, - -32_000, - `Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${this._supportedProtocolVersions.join(', ')})` - ); + const message = `Bad Request: Unsupported protocol version: ${protocolVersion} (supported versions: ${this._supportedProtocolVersions.join(', ')})`; + this.onerror?.(new Error(message)); + return this.createJsonErrorResponse(400, -32_000, message); } return undefined; } diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index ab6f22342..b22762297 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -765,4 +765,264 @@ describe('Zod v4', () => { await expect(transport.start()).rejects.toThrow('Transport already started'); }); }); + + describe('HTTPServerTransport - onerror callback', () => { + let transport: WebStandardStreamableHTTPServerTransport; + let mcpServer: McpServer; + let sessionId: string; + let onerrorSpy: ReturnType; + + beforeEach(async () => { + mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + mcpServer.registerTool( + 'greet', + { + description: 'A simple greeting tool', + inputSchema: z.object({ name: z.string().describe('Name to greet') }) + }, + async ({ name }): Promise => { + return { content: [{ type: 'text', text: `Hello, ${name}!` }] }; + } + ); + + transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID() + }); + + onerrorSpy = vi.fn(); + transport.onerror = onerrorSpy; + + await mcpServer.connect(transport); + }); + + afterEach(async () => { + await transport.close(); + }); + + async function initializeServer(): Promise { + const request = createRequest('POST', TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + expect(response.status).toBe(200); + const newSessionId = response.headers.get('mcp-session-id'); + expect(newSessionId).toBeDefined(); + return newSessionId as string; + } + + it('should call onerror for POST with wrong Accept header', async () => { + const request = createRequest('POST', TEST_MESSAGES.initialize, { accept: 'application/json' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(406); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Not Acceptable/) + }) + ); + }); + + it('should call onerror for POST with wrong Content-Type', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'text/plain' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(415); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Unsupported Media Type/) + }) + ); + }); + + it('should call onerror for invalid JSON body', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'application/json' + }, + body: 'not valid json' + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Parse error.*Invalid JSON/) + }) + ); + }); + + it('should call onerror for invalid JSON-RPC message', async () => { + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + Accept: 'application/json, text/event-stream', + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ not: 'a valid jsonrpc message' }) + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Parse error.*Invalid JSON-RPC/) + }) + ); + }); + + it('should call onerror for duplicate initialization', async () => { + sessionId = await initializeServer(); + + // Reset spy after init (which succeeds without error) + onerrorSpy.mockClear(); + + const request = createRequest('POST', { ...TEST_MESSAGES.initialize, id: 'second-init' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Server already initialized/) + }) + ); + }); + + it('should call onerror for batch initialization', async () => { + const batchInit: JSONRPCMessage[] = [TEST_MESSAGES.initialize, { ...TEST_MESSAGES.initialize, id: 'init-2' }]; + + const request = createRequest('POST', batchInit); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Only one initialization request/) + }) + ); + }); + + it('should call onerror for missing session ID', async () => { + await initializeServer(); + onerrorSpy.mockClear(); + + const request = createRequest('POST', TEST_MESSAGES.toolsList); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Mcp-Session-Id header is required/) + }) + ); + }); + + it('should call onerror for invalid session ID', async () => { + await initializeServer(); + onerrorSpy.mockClear(); + + const request = createRequest('POST', TEST_MESSAGES.toolsList, { sessionId: 'invalid-session' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(404); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Session not found/) + }) + ); + }); + + it('should call onerror for unsupported protocol version', async () => { + sessionId = await initializeServer(); + onerrorSpy.mockClear(); + + const request = new Request('http://localhost/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': 'unsupported-version' + }, + body: JSON.stringify(TEST_MESSAGES.toolsList) + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Unsupported protocol version/) + }) + ); + }); + + it('should call onerror for unsupported HTTP method', async () => { + const request = new Request('http://localhost/mcp', { method: 'PUT' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(405); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Method not allowed/) + }) + ); + }); + + it('should call onerror for GET without Accept: text/event-stream', async () => { + sessionId = await initializeServer(); + onerrorSpy.mockClear(); + + const request = createRequest('GET', undefined, { sessionId, accept: 'application/json' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(406); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Not Acceptable/) + }) + ); + }); + + it('should call onerror for duplicate GET SSE stream', async () => { + sessionId = await initializeServer(); + + // First SSE stream + const request1 = createRequest('GET', undefined, { sessionId }); + const response1 = await transport.handleRequest(request1); + expect(response1.status).toBe(200); + + onerrorSpy.mockClear(); + + // Second SSE stream should trigger onerror + const request2 = createRequest('GET', undefined, { sessionId }); + const response2 = await transport.handleRequest(request2); + + expect(response2.status).toBe(409); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Conflict/) + }) + ); + }); + + it('should call onerror for server not initialized', async () => { + // Don't initialize - just send a request that requires session + const request = createRequest('GET', undefined, { sessionId: 'some-session' }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + expect(onerrorSpy).toHaveBeenCalledWith( + expect.objectContaining({ + message: expect.stringMatching(/Server not initialized/) + }) + ); + }); + }); });