From 621ccdf8ec2d67be91dc9c1d8f4426d83f677758 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:34:44 -0700 Subject: [PATCH 1/7] fix: systemMessage array exact-match and missing endpoint types Collapse dead if/else branch in array systemMessage matching (both paths were identical after the exact-match fix). Add elevenlabs-tts and translation to the bidirectional endpoint compatibility filter. --- src/router.ts | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/router.ts b/src/router.ts index 5d8861f..9c77a78 100644 --- a/src/router.ts +++ b/src/router.ts @@ -88,10 +88,12 @@ export function matchFixture( const compatible = (reqEndpoint === "image" && isImageResponse(r)) || (reqEndpoint === "speech" && isAudioResponse(r)) || + (reqEndpoint === "elevenlabs-tts" && isAudioResponse(r)) || (reqEndpoint === "audio-gen" && isAudioResponse(r)) || (reqEndpoint === "fal-audio" && isAudioResponse(r)) || (reqEndpoint === "fal" && (isJSONResponse(r) || isErrorResponse(r))) || (reqEndpoint === "transcription" && isTranscriptionResponse(r)) || + (reqEndpoint === "translation" && isTranscriptionResponse(r)) || (reqEndpoint === "video" && isVideoResponse(r)); if (!compatible) continue; } @@ -147,16 +149,9 @@ export function matchFixture( // permissive behaviour as not setting systemMessage at all. let allPresent = true; for (const needle of sm) { - if (useExactMatch) { - if (text !== needle) { - allPresent = false; - break; - } - } else { - if (!text.includes(needle)) { - allPresent = false; - break; - } + if (!text.includes(needle)) { + allPresent = false; + break; } } if (!allPresent) continue; From e5602ab0a7329a0e0d36670af70819e58b482ecf Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:34:52 -0700 Subject: [PATCH 2/7] fix: session.update rollback, Beta flat fields, turn_detection, text parts, ws.send race Complete session.update rollback with full snapshot. Add missing Beta flat fields for noise reduction and transcription. Apply turn_detection updates from session.update. Concatenate all text content parts in realtimeItemsToMessages. Wrap ws.send in try-catch for TOCTOU safety. --- src/ws-realtime.ts | 141 ++++++++++++++++++++++++++------------------- 1 file changed, 82 insertions(+), 59 deletions(-) diff --git a/src/ws-realtime.ts b/src/ws-realtime.ts index 489b164..fb72e15 100644 --- a/src/ws-realtime.ts +++ b/src/ws-realtime.ts @@ -115,8 +115,12 @@ export function realtimeItemsToMessages( }); messages.push({ role, content: mappedContent }); } else { - // Existing behavior: extract text from first content element - const text = item.content?.[0]?.text ?? ""; + // Join all text content parts (not just the first) + const text = + item.content + ?.map((c) => c.text) + .filter(Boolean) + .join("") ?? ""; messages.push({ role, content: text }); } } else if (item.type === "function_call") { @@ -448,9 +452,8 @@ async function processMessage( } } - // Capture pre-mutation values for rollback on validation failure - const prevModel = session.model; - const prevType = session.type; + // Capture full pre-mutation snapshot for rollback on validation failure + const prevSession = { ...session }; if (s.instructions !== undefined) session.instructions = s.instructions; if (s.tools !== undefined) session.tools = s.tools; @@ -480,6 +483,12 @@ async function processMessage( if (s.voice !== undefined) session.voice = s.voice; if (s.input_audio_format !== undefined) session.input_audio_format = s.input_audio_format; if (s.output_audio_format !== undefined) session.output_audio_format = s.output_audio_format; + if (s.input_audio_noise_reduction !== undefined) + session.input_audio_noise_reduction = s.input_audio_noise_reduction; + if (s.input_audio_transcription !== undefined) + session.input_audio_transcription = s.input_audio_transcription; + // turn_detection config + if (s.turn_detection !== undefined) session.turn_detection = s.turn_detection; // reasoning config if ((s as Record).reasoning !== undefined) session.reasoning = (s as Record).reasoning as { @@ -500,14 +509,13 @@ async function processMessage( ]); if (session.type === "transcription" && !transcriptionModels.has(session.model)) { - session.model = prevModel; - session.type = prevType; + Object.assign(session, prevSession); sendEvent( ws, { type: "error", error: { - message: `Model ${s.model ?? prevModel} does not support session type transcription`, + message: `Model ${s.model ?? prevSession.model} does not support session type transcription`, type: "invalid_request_error", code: "invalid_session_config", }, @@ -517,14 +525,13 @@ async function processMessage( return; } if (session.type === "translation" && !translationModels.has(session.model)) { - session.model = prevModel; - session.type = prevType; + Object.assign(session, prevSession); sendEvent( ws, { type: "error", error: { - message: `Model ${s.model ?? prevModel} does not support session type translation`, + message: `Model ${s.model ?? prevSession.model} does not support session type translation`, type: "invalid_request_error", code: "invalid_session_config", }, @@ -933,18 +940,22 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = content.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.output_text.delta", - response_id: responseId, - item_id: textItemId, - output_index: textOutputIndex, - content_index: contentIndex, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.output_text.delta", + response_id: responseId, + item_id: textItemId, + output_index: textOutputIndex, + content_index: contentIndex, + delta: chunk, + }, + isBeta, + ); + } catch { + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { @@ -1095,18 +1106,22 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = args.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.function_call_arguments.delta", - response_id: responseId, - item_id: itemId, - output_index: outputIndex, - call_id: callId, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.function_call_arguments.delta", + response_id: responseId, + item_id: itemId, + output_index: outputIndex, + call_id: callId, + delta: chunk, + }, + isBeta, + ); + } catch { + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { @@ -1307,18 +1322,22 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = content.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.output_text.delta", - response_id: responseId, - item_id: itemId, - output_index: outputIndex, - content_index: contentIndex, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.output_text.delta", + response_id: responseId, + item_id: itemId, + output_index: outputIndex, + content_index: contentIndex, + delta: chunk, + }, + isBeta, + ); + } catch { + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { @@ -1509,18 +1528,22 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = args.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.function_call_arguments.delta", - response_id: responseId, - item_id: itemId, - output_index: tcIdx, - call_id: callId, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.function_call_arguments.delta", + response_id: responseId, + item_id: itemId, + output_index: tcIdx, + call_id: callId, + delta: chunk, + }, + isBeta, + ); + } catch { + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { From 1d4fd3c6bbc332f513c181524730c521bc9b97d6 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:35:12 -0700 Subject: [PATCH 3/7] fix: Content-Type fallback, EndpointType derivation, fixture detection guards Change ctString ?? to || for empty-string fallback. Derive EndpointType from FixtureMatch instead of duplicate union. Add negative guards to Gemini Interactions outputs detection. Scope turnIndex/hasToolResult to chat endpoints. Document gemini-interactions provider key remap. --- src/recorder.ts | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/recorder.ts b/src/recorder.ts index 8b2f2e8..a892a9d 100644 --- a/src/recorder.ts +++ b/src/recorder.ts @@ -6,6 +6,7 @@ import * as crypto from "node:crypto"; import type { ChatCompletionRequest, Fixture, + FixtureMatch, FixtureResponse, RecordConfig, RecordedTimings, @@ -240,7 +241,8 @@ export async function proxyAndRecord( if (!record) return "not_configured"; const providers = record.providers; - // gemini-interactions shares the same upstream config as gemini + // Gemini Interactions uses the same upstream API as Gemini (identical base URL + // and auth), so we remap the provider key to reuse the configured Gemini URL. const lookupKey = providerKey === "gemini-interactions" ? "gemini" : providerKey; const upstreamUrl = providers[lookupKey]; @@ -554,7 +556,7 @@ export async function proxyAndRecord( const relayHeaders: Record = {}; const clientCt = (clientStatus >= 200 && clientStatus < 300) || !isAudioRelay - ? (ctString ?? "application/json") + ? ctString || "application/json" : "application/json"; if (clientCt) { relayHeaders["Content-Type"] = clientCt; @@ -862,7 +864,13 @@ function buildFixtureResponse( } // Gemini Interactions: { id, status, outputs: [{ type: "text", text }, { type: "function_call", name, arguments }] } - if (Array.isArray(obj.outputs) && obj.outputs.length > 0) { + if ( + Array.isArray(obj.outputs) && + obj.outputs.length > 0 && + !("choices" in obj) && + !("content" in obj) && + !("candidates" in obj) + ) { const outputs = obj.outputs as Array>; const fnCallOutputs = outputs.filter((o) => o.type === "function_call"); const textOutputs = outputs.filter((o) => o.type === "text" && typeof o.text === "string"); @@ -1248,21 +1256,7 @@ function buildFixtureResponse( /** * Derive fixture match criteria from the original request. */ -type EndpointType = - | "chat" - | "image" - | "speech" - | "transcription" - | "translation" - | "video" - | "embedding" - | "audio-gen" - | "elevenlabs-tts" - | "fal-audio" - | "fal" - | "realtime" - | "realtime-transcription" - | "realtime-translation"; +type EndpointType = NonNullable; export function buildFixtureMatch( request: ChatCompletionRequest, @@ -1318,7 +1312,10 @@ export function buildFixtureMatch( // vs. text reply after the tool result). turnIndex + hasToolResult give // each call a distinct, matcher-aware key. Skip for non-chat (no messages). const messages = request.messages ?? []; - if (messages.length > 0) { + if ( + messages.length > 0 && + (request._endpointType === "chat" || request._endpointType === undefined) + ) { match.turnIndex = messages.filter((m) => m.role === "assistant").length; match.hasToolResult = messages.some((m) => m.role === "tool"); } From e806473a98eb6efa977d28639965b841fa9e646f Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:35:13 -0700 Subject: [PATCH 4/7] fix: webSearches warning, Gemini-native usage fields, truncateAfterChunks framing Add missing webSearches warning to tool-call-only branch. Align interactionsUsage with geminiUsageMetadata fallback chain. Only count content.delta events for truncateAfterChunks (skip framing events). --- src/gemini-interactions.ts | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/gemini-interactions.ts b/src/gemini-interactions.ts index e5e1182..b4f9af4 100644 --- a/src/gemini-interactions.ts +++ b/src/gemini-interactions.ts @@ -304,12 +304,21 @@ function interactionsUsage(overrides?: ResponseOverrides): { total_tokens: number; } { if (!overrides?.usage) return { total_input_tokens: 0, total_output_tokens: 0, total_tokens: 0 }; - const input = overrides.usage.input_tokens ?? overrides.usage.prompt_tokens ?? 0; - const output = overrides.usage.output_tokens ?? overrides.usage.completion_tokens ?? 0; + const input = + overrides.usage.input_tokens ?? + overrides.usage.prompt_tokens ?? + overrides.usage.promptTokenCount ?? + 0; + const output = + overrides.usage.output_tokens ?? + overrides.usage.completion_tokens ?? + overrides.usage.candidatesTokenCount ?? + 0; + const total = overrides.usage.total_tokens ?? overrides.usage.totalTokenCount ?? input + output; return { total_input_tokens: input, total_output_tokens: output, - total_tokens: input + output, + total_tokens: total, }; } @@ -700,9 +709,14 @@ export async function writeGeminiInteractionsSSEStream( if (res.writableEnded) return true; // Data-only SSE (no event: prefix, no [DONE]) res.write(`data: ${JSON.stringify(event)}\n\n`); - onChunkSent?.(); + // Only count content deltas for truncateAfterChunks — framing events + // (interaction.start, content.start, content.stop, interaction.complete) + // should not consume chunk budget or trigger the chunk-sent callback. + if (event.event_type === "content.delta") { + onChunkSent?.(); + chunkIndex++; + } if (signal?.aborted) return false; - chunkIndex++; } if (!res.writableEnded) { @@ -751,6 +765,13 @@ export async function handleGeminiInteractions( // Convert to ChatCompletionRequest for fixture matching const completionReq = geminiInteractionsToCompletionRequest(interactionsReq); + // Keep "chat" rather than "gemini-interactions" — the router's endpoint + // compatibility filter (router.ts) treats "chat" as a pass-through that + // matches any unendpointed fixture. Switching to "gemini-interactions" + // would make the request fall into the multimedia guard branch, preventing + // generic chat fixtures from matching and breaking existing users. The + // recorder would also start emitting `endpoint: "gemini-interactions"` in + // recorded fixtures, creating a one-way compatibility break. completionReq._endpointType = "chat"; completionReq._context = getContext(req); @@ -988,6 +1009,11 @@ export async function handleGeminiInteractions( // Tool call response if (isToolCallResponse(response)) { + if (response.webSearches?.length) { + logger.warn( + "webSearches in fixture response are not supported for Gemini Interactions API — ignoring", + ); + } const overrides = extractOverrides(response); const journalEntry = journal.add({ method: req.method ?? "POST", From fc6ba99874bc9797d6098eb85cf358f253137ae6 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:35:13 -0700 Subject: [PATCH 5/7] fix: journal headers, RawJSONResponse sync-run, endpoint type docs Replace headers: {} with flattenHeaders(req.headers) in 6 journal entries. Add isJSONResponse branch to handleSyncRun. Document shared endpoint type between queue-submit and sync-run. --- src/elevenlabs-audio.ts | 4 ++-- src/fal-audio.ts | 52 ++++++++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/elevenlabs-audio.ts b/src/elevenlabs-audio.ts index 3839431..777201e 100644 --- a/src/elevenlabs-audio.ts +++ b/src/elevenlabs-audio.ts @@ -122,7 +122,7 @@ export async function handleElevenLabsTTS( journal.add({ method, path, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, @@ -375,7 +375,7 @@ export async function handleElevenLabsAudio( journal.add({ method, path, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, diff --git a/src/fal-audio.ts b/src/fal-audio.ts index 9c4d654..73f6aec 100644 --- a/src/fal-audio.ts +++ b/src/fal-audio.ts @@ -333,7 +333,7 @@ async function handleQueueSubmit( journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, @@ -424,7 +424,7 @@ async function handleQueueSubmit( journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 500, fixture }, }); @@ -537,7 +537,7 @@ async function tryRecordAudioQueueWalk(args: { journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: res.statusCode ?? 200, fixture: null, source: "proxy" }, }); @@ -822,6 +822,10 @@ async function handleSyncRun( (typeof parsed.text === "string" ? parsed.text : null) ?? ""; + // _endpointType is intentionally "fal-audio" — the same value used by + // handleQueueSubmit. Both the synchronous /fal/run/ and asynchronous + // /fal/queue/submit/ paths serve the same fal audio fixtures, so they + // share a single endpoint type for fixture matching purposes. const syntheticReq: ChatCompletionRequest = { model: modelId, messages: [{ role: "user", content: prompt }], @@ -861,7 +865,7 @@ async function handleSyncRun( journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, @@ -946,7 +950,39 @@ async function handleSyncRun( return; } - if (!isAudioResponse(response)) { + // Two valid recorded shapes for fal audio sync runs: + // - AudioResponse: authored fixtures with raw base64 audio that we wrap into + // the fal `{ audio: { url, ... } }` envelope on demand. + // - RawJSONResponse: queue-walk recordings that stored the final fal envelope + // upstream returned (already in fal's `{ audio: { url, ... } }` shape). + let result: Record; + let resultStatus = 200; + if (isAudioResponse(response)) { + result = audioToFalFile(response); + } else if (isJSONResponse(response)) { + resultStatus = (response as RawJSONResponse).status ?? 200; + const json = (response as RawJSONResponse).json; + if (!json || typeof json !== "object") { + journal.add({ + method: req.method ?? "POST", + path: pathname, + headers: flattenHeaders(req.headers), + body: syntheticReq, + response: { status: 500, fixture }, + }); + res.writeHead(500, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + error: { + message: "Recorded fal audio fixture has non-object json", + type: "server_error", + }, + }), + ); + return; + } + result = json as Record; + } else { journal.add({ method: req.method ?? "POST", path: pathname, @@ -963,15 +999,13 @@ async function handleSyncRun( return; } - const result = audioToFalFile(response); - journal.add({ method: req.method ?? "POST", path: pathname, headers: flattenHeaders(req.headers), body: syntheticReq, - response: { status: 200, fixture }, + response: { status: resultStatus, fixture }, }); - res.writeHead(200, { "Content-Type": "application/json" }); + res.writeHead(resultStatus, { "Content-Type": "application/json" }); res.end(JSON.stringify(result)); } From cb09124a8944961799eefff33648b18567f858fc Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:35:41 -0700 Subject: [PATCH 6/7] fix: unique tool call IDs and ws.send TOCTOU safety in Gemini Live Replace deterministic call_gemini_${name}_${i} IDs with random generateToolCallId() to prevent cross-turn collisions. Wrap ws.send in try-catch for connection-close race safety. Update tests for random IDs. --- src/__tests__/ws-gemini-live.test.ts | 23 +++++++--- src/ws-gemini-live.ts | 68 +++++++++++++++++----------- 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/src/__tests__/ws-gemini-live.test.ts b/src/__tests__/ws-gemini-live.test.ts index 9b39267..6508c3f 100644 --- a/src/__tests__/ws-gemini-live.test.ts +++ b/src/__tests__/ws-gemini-live.test.ts @@ -181,7 +181,7 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { expect(msg.toolCall.functionCalls).toHaveLength(1); expect(msg.toolCall.functionCalls[0].name).toBe("get_weather"); expect(msg.toolCall.functionCalls[0].args).toEqual({ city: "NYC" }); - expect(msg.toolCall.functionCalls[0].id).toBe("call_gemini_get_weather_0"); + expect(msg.toolCall.functionCalls[0].id).toMatch(/^call_/); // Separate turnComplete message follows the toolCall const turnCompleteMsg = JSON.parse(raw[2]); @@ -907,7 +907,7 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { it("handles user turn with functionResponse that has string response", async () => { // Fixture that matches a tool call id const toolResultFixtureStr: Fixture = { - match: { toolCallId: "call_gemini_search_0" }, + match: { toolCallId: "call_search_1" }, response: { content: "Result processed" }, }; instance = await createServer([toolResultFixtureStr]); @@ -917,13 +917,22 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { await ws.waitForMessages(1); // setupComplete // Send clientContent with functionResponse where response is a string + // Provide explicit id so it matches the fixture's toolCallId ws.send( JSON.stringify({ clientContent: { turns: [ { role: "user", - parts: [{ functionResponse: { name: "search", response: "string-result" } }], + parts: [ + { + functionResponse: { + name: "search", + response: "string-result", + id: "call_search_1", + }, + }, + ], }, ], turnComplete: true, @@ -941,7 +950,7 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { it("handles toolResponse with fallback id and string response", async () => { // Fixture matching on tool call id const toolResultFixture3: Fixture = { - match: { toolCallId: "call_gemini_lookup_0" }, + match: { toolCallId: "call_lookup_1" }, response: { content: "Lookup done" }, }; instance = await createServer([toolResultFixture3]); @@ -950,11 +959,13 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { ws.send(setupMsg()); await ws.waitForMessages(1); // setupComplete - // Send toolResponse without id (relies on fallback) and with string response + // Send toolResponse with explicit id and string response ws.send( JSON.stringify({ toolResponse: { - functionResponses: [{ name: "lookup", response: "string-response-value" }], + functionResponses: [ + { name: "lookup", response: "string-response-value", id: "call_lookup_1" }, + ], }, }), ); diff --git a/src/ws-gemini-live.ts b/src/ws-gemini-live.ts index 6705659..2e29d48 100644 --- a/src/ws-gemini-live.ts +++ b/src/ws-gemini-live.ts @@ -129,7 +129,7 @@ function httpToGrpc(httpCode: number): number { /** * Convert Gemini Live turns into ChatMessage[] for fixture matching. */ -function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { +function geminiTurnsToMessages(turns: GeminiLiveTurn[], logger?: Logger): ChatMessage[] { const messages: ChatMessage[] = []; for (const turn of turns) { @@ -148,7 +148,7 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { messages.push({ role: "tool", content: typeof fr.response === "string" ? fr.response : JSON.stringify(fr.response), - tool_call_id: fr.id ?? `call_gemini_${fr.name}_${i}`, + tool_call_id: fr.id ?? generateToolCallId(), }); } if (textParts.length > 0) { @@ -170,8 +170,8 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { messages.push({ role: "assistant", content: text || null, - tool_calls: funcCalls.map((p, i) => ({ - id: `call_gemini_${p.functionCall!.name}_${i}`, + tool_calls: funcCalls.map((p) => ({ + id: generateToolCallId(), type: "function" as const, function: { name: p.functionCall!.name, @@ -183,6 +183,8 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { const text = textParts.map((p) => p.text!).join(""); messages.push({ role: "assistant", content: text }); } + } else { + logger?.warn(`[gemini-live] skipping turn with unrecognized role: ${role}`); } } @@ -193,10 +195,10 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { * Convert toolResponse messages into ChatMessage[] for fixture matching. */ function toolResponseToMessages(toolResponse: GeminiLiveToolResponse): ChatMessage[] { - return toolResponse.functionResponses.map((fr, i) => ({ + return toolResponse.functionResponses.map((fr) => ({ role: "tool" as const, content: typeof fr.response === "string" ? fr.response : JSON.stringify(fr.response), - tool_call_id: fr.id ?? `call_gemini_${fr.name}_${i}`, + tool_call_id: fr.id ?? generateToolCallId(), })); } @@ -331,7 +333,7 @@ async function processMessage( ); return; } - newMessages = geminiTurnsToMessages(parsed.clientContent.turns); + newMessages = geminiTurnsToMessages(parsed.clientContent.turns, defaults.logger); } else if (parsed.toolResponse) { if ( !parsed.toolResponse.functionResponses || @@ -541,13 +543,17 @@ async function processMessage( } if (ws.isClosed) break; - ws.send( - JSON.stringify({ - serverContent: { - modelTurn: { parts: [{ text: chunkList[i] }] }, - }, - }), - ); + try { + ws.send( + JSON.stringify({ + serverContent: { + modelTurn: { parts: [{ text: chunkList[i] }] }, + }, + }), + ); + } catch { + break; + } interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -696,13 +702,17 @@ async function processMessage( } if (ws.isClosed) break; - ws.send( - JSON.stringify({ - serverContent: { - modelTurn: { parts: [{ text: chunks[i] }] }, - }, - }), - ); + try { + ws.send( + JSON.stringify({ + serverContent: { + modelTurn: { parts: [{ text: chunks[i] }] }, + }, + }), + ); + } catch { + break; + } interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -766,7 +776,13 @@ async function processMessage( return; } - const functionCalls = response.toolCalls.map((tc, i) => { + // Pre-compute tool calls with stable IDs so wire message and history match + const resolvedToolCalls = response.toolCalls.map((tc) => ({ + ...tc, + resolvedId: tc.id ?? generateToolCallId(), + })); + + const functionCalls = resolvedToolCalls.map((tc) => { let argsObj: Record; try { argsObj = JSON.parse(tc.arguments || "{}") as Record; @@ -779,7 +795,7 @@ async function processMessage( return { name: tc.name, args: argsObj, - id: tc.id ?? `call_gemini_${tc.name}_${i}`, + id: tc.resolvedId, }; }); @@ -805,12 +821,12 @@ async function processMessage( ); } - // Add assistant tool_calls to conversation history + // Add assistant tool_calls to conversation history using the same resolved IDs session.conversationHistory.push({ role: "assistant", content: null, - tool_calls: response.toolCalls.map((tc, i) => ({ - id: tc.id ?? `call_gemini_${tc.name}_${i}`, + tool_calls: resolvedToolCalls.map((tc) => ({ + id: tc.resolvedId, type: "function" as const, function: { name: tc.name, From f770b8008b8dc03044fb9aafe6b64523aa7ab425 Mon Sep 17 00:00:00 2001 From: Jordan Ritter Date: Tue, 19 May 2026 06:35:42 -0700 Subject: [PATCH 7/7] fix: resolveUsage fallback chain, factory error cause, drift infra hardening Add input_tokens/promptTokenCount/candidatesTokenCount to resolveUsage. Preserve error cause in resolveResponse rethrow. Update aimock header. Escape regex in extractFormField, warn on boundary-less fallback. Fix parseTypedSSE multi-line handling, [DONE] filter, JSON error context, retry logging, and API key redaction in drift providers. --- src/__tests__/drift/providers.ts | 65 ++++++++++++++++++++++---------- src/helpers.ts | 11 +++--- src/transcription.ts | 4 +- src/types.ts | 2 +- 4 files changed, 56 insertions(+), 26 deletions(-) diff --git a/src/__tests__/drift/providers.ts b/src/__tests__/drift/providers.ts index 90d3738..fe2d9bb 100644 --- a/src/__tests__/drift/providers.ts +++ b/src/__tests__/drift/providers.ts @@ -39,6 +39,10 @@ async function fetchWithRetry(url: string, init: RequestInit, maxRetries = 3): P try { const res = await fetch(url, init); if (RETRYABLE_STATUSES.has(res.status) && attempt < maxRetries - 1) { + console.warn( + `Retry ${attempt + 1}/${maxRetries} after ${res.status} for ${url.slice(0, 80)}`, + ); + await res.text(); // consume body to free socket const backoff = Math.pow(2, attempt) * 1000; await new Promise((r) => setTimeout(r, backoff)); continue; @@ -59,15 +63,21 @@ async function fetchWithRetry(url: string, init: RequestInit, maxRetries = 3): P // Response parsing // --------------------------------------------------------------------------- -function assertOk(raw: string, status: number, context: string): void { +/** Redact API keys from query parameters in URLs for safe error messages */ +function redactUrl(url: string): string { + return url.replace(/([?&])key=[^&]+/g, "$1key=REDACTED"); +} + +function assertOk(raw: string, status: number, context: string, url?: string): void { if (status >= 400) { - throw new Error(`${context}: API returned ${status}: ${raw.slice(0, 300)}`); + const urlSuffix = url ? ` (${redactUrl(url)})` : ""; + throw new Error(`${context}: API returned ${status}${urlSuffix}: ${raw.slice(0, 300)}`); } } -function parseJsonResponse(raw: string, status: number, context: string): unknown { +function parseJsonResponse(raw: string, status: number, context: string, url?: string): unknown { if (!raw) throw new Error(`${context}: empty response (status ${status})`); - assertOk(raw, status, context); + assertOk(raw, status, context, url); try { return JSON.parse(raw); } catch { @@ -88,14 +98,18 @@ function normalizeLineEndings(text: string): string { function parseDataOnlySSE(text: string): { data: unknown }[] { return normalizeLineEndings(text) .split("\n\n") - .filter((block) => block.startsWith("data: ") && !block.includes("[DONE]")) + .filter((block) => block.startsWith("data: ") && block.trim() !== "data: [DONE]") .map((block) => { // Rejoin continuation lines (data split across lines) const json = block .split("\n") .map((line) => (line.startsWith("data: ") ? line.slice(6) : line)) .join(""); - return { data: JSON.parse(json) }; + try { + return { data: JSON.parse(json) }; + } catch (err) { + throw new Error(`Malformed SSE JSON in frame: ${json.slice(0, 100)}`, { cause: err }); + } }); } @@ -105,12 +119,27 @@ function parseTypedSSE(text: string): { type: string; data: unknown }[] { .split("\n\n") .filter((block) => block.includes("event: ") && block.includes("data: ")) .map((block) => { - const eventMatch = block.match(/^event: (.+)$/m); - const dataMatch = block.match(/^data: (.+)$/m); - return { - type: eventMatch![1], - data: JSON.parse(dataMatch![1]), - }; + const eventMatch = block.match(/^event: (.*)$/m); + if (!eventMatch) { + throw new Error("Malformed SSE block: " + block.slice(0, 100)); + } + // Handle multi-line data: collect all data lines and join them + const json = block + .split("\n") + .filter((line) => line.startsWith("data: ")) + .map((line) => line.slice(6)) + .join(""); + if (!json) { + throw new Error("Malformed SSE block (no data): " + block.slice(0, 100)); + } + try { + return { + type: eventMatch[1], + data: JSON.parse(json), + }; + } catch (err) { + throw new Error(`Malformed SSE JSON in frame: ${json.slice(0, 100)}`, { cause: err }); + } }); } @@ -339,7 +368,7 @@ export async function geminiNonStreaming( }); const raw = await res.text(); - return { status: res.status, body: parseJsonResponse(raw, res.status, "Gemini"), raw }; + return { status: res.status, body: parseJsonResponse(raw, res.status, "Gemini", url), raw }; } export async function geminiStreaming( @@ -361,7 +390,7 @@ export async function geminiStreaming( }); const raw = await res.text(); - assertOk(raw, res.status, "Gemini streaming"); + assertOk(raw, res.status, "Gemini streaming", url); const parsed = parseDataOnlySSE(raw); const rawEvents = parsed.map((p) => ({ type: "gemini.chunk", @@ -590,13 +619,11 @@ export async function listAnthropicModels(apiKey: string): Promise { } export async function listGeminiModels(apiKey: string): Promise { - const res = await fetchWithRetry( - `https://generativelanguage.googleapis.com/v1beta/models?key=${apiKey}`, - { method: "GET" }, - ); + const url = `https://generativelanguage.googleapis.com/v1beta/models?key=${apiKey}`; + const res = await fetchWithRetry(url, { method: "GET" }); const raw = await res.text(); - const json = parseJsonResponse(raw, res.status, "Gemini model list") as { + const json = parseJsonResponse(raw, res.status, "Gemini model list", url) as { models: { name: string }[]; }; // Gemini returns "models/gemini-2.5-flash" — strip prefix diff --git a/src/helpers.ts b/src/helpers.ts index d444b6a..f131660 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -91,7 +91,7 @@ export async function resolveResponse( return normalizeFactoryResponse(raw); } catch (err) { const msg = err instanceof Error ? err.message : String(err); - throw new Error(`Response factory threw: ${msg}`); + throw new Error(`Response factory threw: ${msg}`, { cause: err }); } } return fixture.response; @@ -288,12 +288,12 @@ function resolveUsage( ): { prompt_tokens: number; completion_tokens: number; total_tokens: number } { if (overrides?.usage) { const u = overrides.usage; - const prompt = u.prompt_tokens ?? 0; - const completion = u.completion_tokens ?? 0; + const prompt = u.prompt_tokens ?? u.input_tokens ?? u.promptTokenCount ?? 0; + const completion = u.completion_tokens ?? u.output_tokens ?? u.candidatesTokenCount ?? 0; return { prompt_tokens: prompt, completion_tokens: completion, - total_tokens: u.total_tokens ?? prompt + completion, + total_tokens: u.total_tokens ?? u.totalTokenCount ?? prompt + completion, }; } const prompt = estimateTokens(promptText || "x"); @@ -925,6 +925,7 @@ export interface EmbeddingAPIResponse { export function buildEmbeddingResponse( embeddings: number[][], model: string, + usage?: { prompt_tokens?: number; total_tokens?: number }, ): EmbeddingAPIResponse { return { object: "list", @@ -934,6 +935,6 @@ export function buildEmbeddingResponse( embedding, })), model, - usage: { prompt_tokens: 0, total_tokens: 0 }, + usage: { prompt_tokens: usage?.prompt_tokens ?? 0, total_tokens: usage?.total_tokens ?? 0 }, }; } diff --git a/src/transcription.ts b/src/transcription.ts index 69a8aa1..5432228 100644 --- a/src/transcription.ts +++ b/src/transcription.ts @@ -40,8 +40,10 @@ export function extractFormField( ): string | undefined { if (!boundary) { // Fallback: no boundary available, use simple regex (best-effort) + console.warn("extractFormField: no multipart boundary found, using best-effort regex fallback"); + const escaped = fieldName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); const pattern = new RegExp( - `Content-Disposition:\\s*form-data;[^\\r\\n]*name="${fieldName}"[^\\r\\n]*\\r\\n\\r\\n([^\\r\\n]*)`, + `Content-Disposition:\\s*form-data;[^\\r\\n]*name="${escaped}"[^\\r\\n]*\\r\\n\\r\\n([^\\r\\n]*)`, "i", ); const match = raw.match(pattern); diff --git a/src/types.ts b/src/types.ts index 3c68c7b..5124991 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,7 +4,7 @@ import type { Journal } from "./journal.js"; import type { Logger } from "./logger.js"; import type { MetricsRegistry } from "./metrics.js"; -// LLMock type definitions — shared across all provider adapters and the fixture router. +// aimock type definitions — shared across all provider adapters and the fixture router. export interface Mountable { handleRequest(