diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fce2ec..6db251a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ ## [Unreleased] +### Fixed + +- **Router** — systemMessage array exact-match logic was unsatisfiable for 2+ needles; collapsed to substring matching. Added `elevenlabs-tts` and `translation` to endpoint compatibility filter. +- **Recorder** — Content-Type empty-string fallback (`??` → `||`), derived `EndpointType` from `FixtureMatch` instead of duplicate union, negative guards on Gemini Interactions outputs detection, scoped `turnIndex`/`hasToolResult` to chat endpoints only. +- **WS-Realtime** — session.update rollback now captures full snapshot instead of just model/type. Added Beta flat fields for noise reduction, transcription, and turn_detection. Joined all text content parts in `realtimeItemsToMessages`. Added try-catch with debug logging around `sendEvent` for WebSocket close race safety. +- **WS-Gemini-Live** — replaced deterministic `call_gemini_${name}_${i}` tool call IDs with random `generateToolCallId()` to prevent cross-turn collisions. Pre-computed `resolvedToolCalls` for wire/history ID consistency. Added unrecognized-role warning and ws.send try-catch with debug logging. +- **Gemini Interactions** — `interactionsUsage` honors Gemini-native field names (`promptTokenCount`/`candidatesTokenCount`/`totalTokenCount`). `truncateAfterChunks` only counts `content.delta` events. Added `webSearches` warning on tool-call branch. +- **fal-audio + ElevenLabs** — all journal entries now use `flattenHeaders(req.headers)` instead of `{}`. `handleSyncRun` accepts `RawJSONResponse` fixtures from queue-walk recordings. +- **Helpers** — extended `resolveUsage` with Gemini-native token fields. Preserved error cause in `resolveResponse` factory rethrow. `buildEmbeddingResponse` accepts optional usage. `extractFormField` escapes regex metacharacters. +- **Drift test infra** — retry logging with body consumption, broadened `redactUrl` to cover `api_key`/`apikey`/`token`/`access_token` patterns, URL threaded into error messages with redaction, `parseDataOnlySSE` [DONE] filter fix, `parseTypedSSE` multi-line data handling with null guards. +- **Drift collector** — invoke vitest directly via npx to avoid pnpm stdout prefix breaking JSON parse; classify raw stack traces as infrastructure errors instead of crashing. + ## [1.27.0] - 2026-05-20 ### Added @@ -12,7 +24,6 @@ - **Walk structured content arrays in `extractLastUserMessage`** — handle multimodal user content (`AGUIMessageContentPart[]`) by joining text parts and skipping non-text. Export `NO_USER_MESSAGE_SENTINEL` constant and `AGUIMessageContentPart` type. ([#231](https://github.com/CopilotKit/aimock/pull/231)) - **Harden recorder against error responses, double-settle, and broken sentinel persistence** — guard against recording fixtures from non-2xx upstream responses, add `settled` flag to prevent error+end race, skip disk write for predicate fixtures (sentinel was semantically broken on reload), include parse error reason in SSE warning log - ## [1.26.1] - 2026-05-19 ### Added diff --git a/scripts/drift-report-collector.ts b/scripts/drift-report-collector.ts index f6614ed..126fd14 100644 --- a/scripts/drift-report-collector.ts +++ b/scripts/drift-report-collector.ts @@ -300,7 +300,7 @@ function parseVitestOutput(stdout: string, context: string): VitestJsonResult | function runDriftTests(): VitestJsonResult { try { - const stdout = execSync("pnpm test:drift --reporter=json", { + const stdout = execSync("npx vitest run --config vitest.config.drift.ts --reporter=json", { encoding: "utf-8", stdio: ["pipe", "pipe", "pipe"], maxBuffer: 50 * 1024 * 1024, @@ -409,6 +409,9 @@ function collectDriftEntries(results: VitestJsonResult): DriftEntry[] { /returned empty body/i, /waitUntil timeout/i, /AssertionError/i, + /^Error:/m, + /at\s+\S+\s+\(file:/, + /STACK_TRACE_ERROR/, ]; const driftLikeIndicators = [/drift/i, /mismatch/i, /expected.*but/i, /LLMOCK DRIFT/i]; diff --git a/src/__tests__/drift/providers.ts b/src/__tests__/drift/providers.ts index 90d3738..c87bd52 100644 --- a/src/__tests__/drift/providers.ts +++ b/src/__tests__/drift/providers.ts @@ -39,6 +39,10 @@ async function fetchWithRetry(url: string, init: RequestInit, maxRetries = 3): P try { const res = await fetch(url, init); if (RETRYABLE_STATUSES.has(res.status) && attempt < maxRetries - 1) { + console.warn( + `Retry ${attempt + 1}/${maxRetries} after ${res.status} for ${url.slice(0, 80)}`, + ); + await res.text(); // consume body to free socket const backoff = Math.pow(2, attempt) * 1000; await new Promise((r) => setTimeout(r, backoff)); continue; @@ -59,15 +63,21 @@ async function fetchWithRetry(url: string, init: RequestInit, maxRetries = 3): P // Response parsing // --------------------------------------------------------------------------- -function assertOk(raw: string, status: number, context: string): void { +/** Redact API keys from query parameters in URLs for safe error messages */ +function redactUrl(url: string): string { + return url.replace(/([?&])(api[-_]?key|key|token|access_token)=[^&]+/gi, "$1$2=REDACTED"); +} + +function assertOk(raw: string, status: number, context: string, url?: string): void { if (status >= 400) { - throw new Error(`${context}: API returned ${status}: ${raw.slice(0, 300)}`); + const urlSuffix = url ? ` (${redactUrl(url)})` : ""; + throw new Error(`${context}: API returned ${status}${urlSuffix}: ${raw.slice(0, 300)}`); } } -function parseJsonResponse(raw: string, status: number, context: string): unknown { +function parseJsonResponse(raw: string, status: number, context: string, url?: string): unknown { if (!raw) throw new Error(`${context}: empty response (status ${status})`); - assertOk(raw, status, context); + assertOk(raw, status, context, url); try { return JSON.parse(raw); } catch { @@ -88,14 +98,18 @@ function normalizeLineEndings(text: string): string { function parseDataOnlySSE(text: string): { data: unknown }[] { return normalizeLineEndings(text) .split("\n\n") - .filter((block) => block.startsWith("data: ") && !block.includes("[DONE]")) + .filter((block) => block.startsWith("data: ") && block.trim() !== "data: [DONE]") .map((block) => { // Rejoin continuation lines (data split across lines) const json = block .split("\n") .map((line) => (line.startsWith("data: ") ? line.slice(6) : line)) .join(""); - return { data: JSON.parse(json) }; + try { + return { data: JSON.parse(json) }; + } catch (err) { + throw new Error(`Malformed SSE JSON in frame: ${json.slice(0, 100)}`, { cause: err }); + } }); } @@ -105,12 +119,27 @@ function parseTypedSSE(text: string): { type: string; data: unknown }[] { .split("\n\n") .filter((block) => block.includes("event: ") && block.includes("data: ")) .map((block) => { - const eventMatch = block.match(/^event: (.+)$/m); - const dataMatch = block.match(/^data: (.+)$/m); - return { - type: eventMatch![1], - data: JSON.parse(dataMatch![1]), - }; + const eventMatch = block.match(/^event: (.*)$/m); + if (!eventMatch) { + throw new Error("Malformed SSE block: " + block.slice(0, 100)); + } + // Handle multi-line data: collect all data lines and join them + const json = block + .split("\n") + .filter((line) => line.startsWith("data: ")) + .map((line) => line.slice(6)) + .join(""); + if (!json) { + throw new Error("Malformed SSE block (no data): " + block.slice(0, 100)); + } + try { + return { + type: eventMatch[1], + data: JSON.parse(json), + }; + } catch (err) { + throw new Error(`Malformed SSE JSON in frame: ${json.slice(0, 100)}`, { cause: err }); + } }); } @@ -339,7 +368,7 @@ export async function geminiNonStreaming( }); const raw = await res.text(); - return { status: res.status, body: parseJsonResponse(raw, res.status, "Gemini"), raw }; + return { status: res.status, body: parseJsonResponse(raw, res.status, "Gemini", url), raw }; } export async function geminiStreaming( @@ -361,7 +390,7 @@ export async function geminiStreaming( }); const raw = await res.text(); - assertOk(raw, res.status, "Gemini streaming"); + assertOk(raw, res.status, "Gemini streaming", url); const parsed = parseDataOnlySSE(raw); const rawEvents = parsed.map((p) => ({ type: "gemini.chunk", @@ -590,13 +619,11 @@ export async function listAnthropicModels(apiKey: string): Promise { } export async function listGeminiModels(apiKey: string): Promise { - const res = await fetchWithRetry( - `https://generativelanguage.googleapis.com/v1beta/models?key=${apiKey}`, - { method: "GET" }, - ); + const url = `https://generativelanguage.googleapis.com/v1beta/models?key=${apiKey}`; + const res = await fetchWithRetry(url, { method: "GET" }); const raw = await res.text(); - const json = parseJsonResponse(raw, res.status, "Gemini model list") as { + const json = parseJsonResponse(raw, res.status, "Gemini model list", url) as { models: { name: string }[]; }; // Gemini returns "models/gemini-2.5-flash" — strip prefix diff --git a/src/__tests__/ws-gemini-live.test.ts b/src/__tests__/ws-gemini-live.test.ts index 9b39267..3a9763f 100644 --- a/src/__tests__/ws-gemini-live.test.ts +++ b/src/__tests__/ws-gemini-live.test.ts @@ -181,7 +181,7 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { expect(msg.toolCall.functionCalls).toHaveLength(1); expect(msg.toolCall.functionCalls[0].name).toBe("get_weather"); expect(msg.toolCall.functionCalls[0].args).toEqual({ city: "NYC" }); - expect(msg.toolCall.functionCalls[0].id).toBe("call_gemini_get_weather_0"); + expect(msg.toolCall.functionCalls[0].id).toMatch(/^call_/); // Separate turnComplete message follows the toolCall const turnCompleteMsg = JSON.parse(raw[2]); @@ -907,7 +907,7 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { it("handles user turn with functionResponse that has string response", async () => { // Fixture that matches a tool call id const toolResultFixtureStr: Fixture = { - match: { toolCallId: "call_gemini_search_0" }, + match: { toolCallId: "call_search_1" }, response: { content: "Result processed" }, }; instance = await createServer([toolResultFixtureStr]); @@ -917,13 +917,22 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { await ws.waitForMessages(1); // setupComplete // Send clientContent with functionResponse where response is a string + // Provide explicit id so it matches the fixture's toolCallId ws.send( JSON.stringify({ clientContent: { turns: [ { role: "user", - parts: [{ functionResponse: { name: "search", response: "string-result" } }], + parts: [ + { + functionResponse: { + name: "search", + response: "string-result", + id: "call_search_1", + }, + }, + ], }, ], turnComplete: true, @@ -941,7 +950,7 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { it("handles toolResponse with fallback id and string response", async () => { // Fixture matching on tool call id const toolResultFixture3: Fixture = { - match: { toolCallId: "call_gemini_lookup_0" }, + match: { toolCallId: "call_lookup_1" }, response: { content: "Lookup done" }, }; instance = await createServer([toolResultFixture3]); @@ -950,11 +959,13 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { ws.send(setupMsg()); await ws.waitForMessages(1); // setupComplete - // Send toolResponse without id (relies on fallback) and with string response + // Send toolResponse with explicit id and string response ws.send( JSON.stringify({ toolResponse: { - functionResponses: [{ name: "lookup", response: "string-response-value" }], + functionResponses: [ + { name: "lookup", response: "string-response-value", id: "call_lookup_1" }, + ], }, }), ); @@ -966,6 +977,87 @@ describe("WebSocket Gemini Live BidiGenerateContent", () => { ws.close(); }); + it("generates random call_ ID when toolResponse functionResponse has no id", async () => { + instance = await createServer(allFixtures); + const ws = await connectWebSocket(instance.url, GEMINI_WS_PATH); + + ws.send(setupMsg()); + await ws.waitForMessages(1); // setupComplete + + // Send toolResponse WITHOUT an id field — exercises the generateToolCallId() fallback + ws.send(toolResponseMsg("get_weather", { temp: "72F" })); + + // No fixture will match the random ID, so we get a "No fixture matched" error + const raw = await ws.waitForMessages(2); + const msg = JSON.parse(raw[1]); + expect(msg.error).toBeDefined(); + expect(msg.error.message).toBe("No fixture matched"); + + // Small pause to ensure journal write completed + await new Promise((r) => setTimeout(r, 50)); + + // Inspect the journal to verify the generated tool_call_id starts with call_ + const entry = instance.journal.getLast(); + expect(entry).not.toBeNull(); + const messages = entry!.body!.messages; + const toolMsg = messages.find((m) => m.role === "tool"); + expect(toolMsg).toBeDefined(); + expect(toolMsg!.tool_call_id).toMatch(/^call_/); + // Verify it's a random ID (not the old deterministic format) + expect(toolMsg!.tool_call_id).not.toMatch(/^call_gemini_/); + + ws.close(); + }); + + it("generates random call_ ID when clientContent functionResponse has no id", async () => { + const afterToolFixture: Fixture = { + match: { userMessage: "continue-after-tool" }, + response: { content: "Continued" }, + }; + instance = await createServer([afterToolFixture]); + const ws = await connectWebSocket(instance.url, GEMINI_WS_PATH); + + ws.send(setupMsg()); + await ws.waitForMessages(1); // setupComplete + + // Send clientContent with a functionResponse lacking an id, followed by user text + ws.send( + JSON.stringify({ + clientContent: { + turns: [ + { + role: "user", + parts: [ + { functionResponse: { name: "search", response: { results: [] } } }, + { text: "continue-after-tool" }, + ], + }, + ], + turnComplete: true, + }, + }), + ); + + const raw = await ws.waitForMessages(3); // setupComplete + content + turnComplete + const contentMsg = JSON.parse(raw[1]); + expect(contentMsg.serverContent).toBeDefined(); + + // Small pause to ensure journal write completed + await new Promise((r) => setTimeout(r, 50)); + + // Inspect the journal to verify the generated tool_call_id starts with call_ + const entry = instance.journal.getLast(); + expect(entry).not.toBeNull(); + const messages = entry!.body!.messages; + const toolMsg = messages.find((m) => m.role === "tool"); + expect(toolMsg).toBeDefined(); + expect(toolMsg!.tool_call_id).toMatch(/^call_/); + // Verify it's a random ID (not the old deterministic format) + expect(toolMsg!.tool_call_id).not.toMatch(/^call_gemini_/); + + ws.close(); + }); + it("handles setup with tools that have empty functionDeclarations", async () => { instance = await createServer(allFixtures); const ws = await connectWebSocket(instance.url, GEMINI_WS_PATH); diff --git a/src/elevenlabs-audio.ts b/src/elevenlabs-audio.ts index 3839431..777201e 100644 --- a/src/elevenlabs-audio.ts +++ b/src/elevenlabs-audio.ts @@ -122,7 +122,7 @@ export async function handleElevenLabsTTS( journal.add({ method, path, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, @@ -375,7 +375,7 @@ export async function handleElevenLabsAudio( journal.add({ method, path, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, diff --git a/src/fal-audio.ts b/src/fal-audio.ts index 9c4d654..73f6aec 100644 --- a/src/fal-audio.ts +++ b/src/fal-audio.ts @@ -333,7 +333,7 @@ async function handleQueueSubmit( journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, @@ -424,7 +424,7 @@ async function handleQueueSubmit( journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 500, fixture }, }); @@ -537,7 +537,7 @@ async function tryRecordAudioQueueWalk(args: { journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: res.statusCode ?? 200, fixture: null, source: "proxy" }, }); @@ -822,6 +822,10 @@ async function handleSyncRun( (typeof parsed.text === "string" ? parsed.text : null) ?? ""; + // _endpointType is intentionally "fal-audio" — the same value used by + // handleQueueSubmit. Both the synchronous /fal/run/ and asynchronous + // /fal/queue/submit/ paths serve the same fal audio fixtures, so they + // share a single endpoint type for fixture matching purposes. const syntheticReq: ChatCompletionRequest = { model: modelId, messages: [{ role: "user", content: prompt }], @@ -861,7 +865,7 @@ async function handleSyncRun( journal.add({ method: req.method ?? "POST", path: pathname, - headers: {}, + headers: flattenHeaders(req.headers), body: syntheticReq, response: { status: 503, @@ -946,7 +950,39 @@ async function handleSyncRun( return; } - if (!isAudioResponse(response)) { + // Two valid recorded shapes for fal audio sync runs: + // - AudioResponse: authored fixtures with raw base64 audio that we wrap into + // the fal `{ audio: { url, ... } }` envelope on demand. + // - RawJSONResponse: queue-walk recordings that stored the final fal envelope + // upstream returned (already in fal's `{ audio: { url, ... } }` shape). + let result: Record; + let resultStatus = 200; + if (isAudioResponse(response)) { + result = audioToFalFile(response); + } else if (isJSONResponse(response)) { + resultStatus = (response as RawJSONResponse).status ?? 200; + const json = (response as RawJSONResponse).json; + if (!json || typeof json !== "object") { + journal.add({ + method: req.method ?? "POST", + path: pathname, + headers: flattenHeaders(req.headers), + body: syntheticReq, + response: { status: 500, fixture }, + }); + res.writeHead(500, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + error: { + message: "Recorded fal audio fixture has non-object json", + type: "server_error", + }, + }), + ); + return; + } + result = json as Record; + } else { journal.add({ method: req.method ?? "POST", path: pathname, @@ -963,15 +999,13 @@ async function handleSyncRun( return; } - const result = audioToFalFile(response); - journal.add({ method: req.method ?? "POST", path: pathname, headers: flattenHeaders(req.headers), body: syntheticReq, - response: { status: 200, fixture }, + response: { status: resultStatus, fixture }, }); - res.writeHead(200, { "Content-Type": "application/json" }); + res.writeHead(resultStatus, { "Content-Type": "application/json" }); res.end(JSON.stringify(result)); } diff --git a/src/gemini-interactions.ts b/src/gemini-interactions.ts index e5e1182..b4f9af4 100644 --- a/src/gemini-interactions.ts +++ b/src/gemini-interactions.ts @@ -304,12 +304,21 @@ function interactionsUsage(overrides?: ResponseOverrides): { total_tokens: number; } { if (!overrides?.usage) return { total_input_tokens: 0, total_output_tokens: 0, total_tokens: 0 }; - const input = overrides.usage.input_tokens ?? overrides.usage.prompt_tokens ?? 0; - const output = overrides.usage.output_tokens ?? overrides.usage.completion_tokens ?? 0; + const input = + overrides.usage.input_tokens ?? + overrides.usage.prompt_tokens ?? + overrides.usage.promptTokenCount ?? + 0; + const output = + overrides.usage.output_tokens ?? + overrides.usage.completion_tokens ?? + overrides.usage.candidatesTokenCount ?? + 0; + const total = overrides.usage.total_tokens ?? overrides.usage.totalTokenCount ?? input + output; return { total_input_tokens: input, total_output_tokens: output, - total_tokens: input + output, + total_tokens: total, }; } @@ -700,9 +709,14 @@ export async function writeGeminiInteractionsSSEStream( if (res.writableEnded) return true; // Data-only SSE (no event: prefix, no [DONE]) res.write(`data: ${JSON.stringify(event)}\n\n`); - onChunkSent?.(); + // Only count content deltas for truncateAfterChunks — framing events + // (interaction.start, content.start, content.stop, interaction.complete) + // should not consume chunk budget or trigger the chunk-sent callback. + if (event.event_type === "content.delta") { + onChunkSent?.(); + chunkIndex++; + } if (signal?.aborted) return false; - chunkIndex++; } if (!res.writableEnded) { @@ -751,6 +765,13 @@ export async function handleGeminiInteractions( // Convert to ChatCompletionRequest for fixture matching const completionReq = geminiInteractionsToCompletionRequest(interactionsReq); + // Keep "chat" rather than "gemini-interactions" — the router's endpoint + // compatibility filter (router.ts) treats "chat" as a pass-through that + // matches any unendpointed fixture. Switching to "gemini-interactions" + // would make the request fall into the multimedia guard branch, preventing + // generic chat fixtures from matching and breaking existing users. The + // recorder would also start emitting `endpoint: "gemini-interactions"` in + // recorded fixtures, creating a one-way compatibility break. completionReq._endpointType = "chat"; completionReq._context = getContext(req); @@ -988,6 +1009,11 @@ export async function handleGeminiInteractions( // Tool call response if (isToolCallResponse(response)) { + if (response.webSearches?.length) { + logger.warn( + "webSearches in fixture response are not supported for Gemini Interactions API — ignoring", + ); + } const overrides = extractOverrides(response); const journalEntry = journal.add({ method: req.method ?? "POST", diff --git a/src/helpers.ts b/src/helpers.ts index d444b6a..f131660 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -91,7 +91,7 @@ export async function resolveResponse( return normalizeFactoryResponse(raw); } catch (err) { const msg = err instanceof Error ? err.message : String(err); - throw new Error(`Response factory threw: ${msg}`); + throw new Error(`Response factory threw: ${msg}`, { cause: err }); } } return fixture.response; @@ -288,12 +288,12 @@ function resolveUsage( ): { prompt_tokens: number; completion_tokens: number; total_tokens: number } { if (overrides?.usage) { const u = overrides.usage; - const prompt = u.prompt_tokens ?? 0; - const completion = u.completion_tokens ?? 0; + const prompt = u.prompt_tokens ?? u.input_tokens ?? u.promptTokenCount ?? 0; + const completion = u.completion_tokens ?? u.output_tokens ?? u.candidatesTokenCount ?? 0; return { prompt_tokens: prompt, completion_tokens: completion, - total_tokens: u.total_tokens ?? prompt + completion, + total_tokens: u.total_tokens ?? u.totalTokenCount ?? prompt + completion, }; } const prompt = estimateTokens(promptText || "x"); @@ -925,6 +925,7 @@ export interface EmbeddingAPIResponse { export function buildEmbeddingResponse( embeddings: number[][], model: string, + usage?: { prompt_tokens?: number; total_tokens?: number }, ): EmbeddingAPIResponse { return { object: "list", @@ -934,6 +935,6 @@ export function buildEmbeddingResponse( embedding, })), model, - usage: { prompt_tokens: 0, total_tokens: 0 }, + usage: { prompt_tokens: usage?.prompt_tokens ?? 0, total_tokens: usage?.total_tokens ?? 0 }, }; } diff --git a/src/recorder.ts b/src/recorder.ts index 8b2f2e8..a892a9d 100644 --- a/src/recorder.ts +++ b/src/recorder.ts @@ -6,6 +6,7 @@ import * as crypto from "node:crypto"; import type { ChatCompletionRequest, Fixture, + FixtureMatch, FixtureResponse, RecordConfig, RecordedTimings, @@ -240,7 +241,8 @@ export async function proxyAndRecord( if (!record) return "not_configured"; const providers = record.providers; - // gemini-interactions shares the same upstream config as gemini + // Gemini Interactions uses the same upstream API as Gemini (identical base URL + // and auth), so we remap the provider key to reuse the configured Gemini URL. const lookupKey = providerKey === "gemini-interactions" ? "gemini" : providerKey; const upstreamUrl = providers[lookupKey]; @@ -554,7 +556,7 @@ export async function proxyAndRecord( const relayHeaders: Record = {}; const clientCt = (clientStatus >= 200 && clientStatus < 300) || !isAudioRelay - ? (ctString ?? "application/json") + ? ctString || "application/json" : "application/json"; if (clientCt) { relayHeaders["Content-Type"] = clientCt; @@ -862,7 +864,13 @@ function buildFixtureResponse( } // Gemini Interactions: { id, status, outputs: [{ type: "text", text }, { type: "function_call", name, arguments }] } - if (Array.isArray(obj.outputs) && obj.outputs.length > 0) { + if ( + Array.isArray(obj.outputs) && + obj.outputs.length > 0 && + !("choices" in obj) && + !("content" in obj) && + !("candidates" in obj) + ) { const outputs = obj.outputs as Array>; const fnCallOutputs = outputs.filter((o) => o.type === "function_call"); const textOutputs = outputs.filter((o) => o.type === "text" && typeof o.text === "string"); @@ -1248,21 +1256,7 @@ function buildFixtureResponse( /** * Derive fixture match criteria from the original request. */ -type EndpointType = - | "chat" - | "image" - | "speech" - | "transcription" - | "translation" - | "video" - | "embedding" - | "audio-gen" - | "elevenlabs-tts" - | "fal-audio" - | "fal" - | "realtime" - | "realtime-transcription" - | "realtime-translation"; +type EndpointType = NonNullable; export function buildFixtureMatch( request: ChatCompletionRequest, @@ -1318,7 +1312,10 @@ export function buildFixtureMatch( // vs. text reply after the tool result). turnIndex + hasToolResult give // each call a distinct, matcher-aware key. Skip for non-chat (no messages). const messages = request.messages ?? []; - if (messages.length > 0) { + if ( + messages.length > 0 && + (request._endpointType === "chat" || request._endpointType === undefined) + ) { match.turnIndex = messages.filter((m) => m.role === "assistant").length; match.hasToolResult = messages.some((m) => m.role === "tool"); } diff --git a/src/router.ts b/src/router.ts index 5d8861f..9c77a78 100644 --- a/src/router.ts +++ b/src/router.ts @@ -88,10 +88,12 @@ export function matchFixture( const compatible = (reqEndpoint === "image" && isImageResponse(r)) || (reqEndpoint === "speech" && isAudioResponse(r)) || + (reqEndpoint === "elevenlabs-tts" && isAudioResponse(r)) || (reqEndpoint === "audio-gen" && isAudioResponse(r)) || (reqEndpoint === "fal-audio" && isAudioResponse(r)) || (reqEndpoint === "fal" && (isJSONResponse(r) || isErrorResponse(r))) || (reqEndpoint === "transcription" && isTranscriptionResponse(r)) || + (reqEndpoint === "translation" && isTranscriptionResponse(r)) || (reqEndpoint === "video" && isVideoResponse(r)); if (!compatible) continue; } @@ -147,16 +149,9 @@ export function matchFixture( // permissive behaviour as not setting systemMessage at all. let allPresent = true; for (const needle of sm) { - if (useExactMatch) { - if (text !== needle) { - allPresent = false; - break; - } - } else { - if (!text.includes(needle)) { - allPresent = false; - break; - } + if (!text.includes(needle)) { + allPresent = false; + break; } } if (!allPresent) continue; diff --git a/src/transcription.ts b/src/transcription.ts index 69a8aa1..5432228 100644 --- a/src/transcription.ts +++ b/src/transcription.ts @@ -40,8 +40,10 @@ export function extractFormField( ): string | undefined { if (!boundary) { // Fallback: no boundary available, use simple regex (best-effort) + console.warn("extractFormField: no multipart boundary found, using best-effort regex fallback"); + const escaped = fieldName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); const pattern = new RegExp( - `Content-Disposition:\\s*form-data;[^\\r\\n]*name="${fieldName}"[^\\r\\n]*\\r\\n\\r\\n([^\\r\\n]*)`, + `Content-Disposition:\\s*form-data;[^\\r\\n]*name="${escaped}"[^\\r\\n]*\\r\\n\\r\\n([^\\r\\n]*)`, "i", ); const match = raw.match(pattern); diff --git a/src/types.ts b/src/types.ts index 3c68c7b..5124991 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,7 +4,7 @@ import type { Journal } from "./journal.js"; import type { Logger } from "./logger.js"; import type { MetricsRegistry } from "./metrics.js"; -// LLMock type definitions — shared across all provider adapters and the fixture router. +// aimock type definitions — shared across all provider adapters and the fixture router. export interface Mountable { handleRequest( diff --git a/src/ws-gemini-live.ts b/src/ws-gemini-live.ts index 6705659..e1ecbb0 100644 --- a/src/ws-gemini-live.ts +++ b/src/ws-gemini-live.ts @@ -129,7 +129,7 @@ function httpToGrpc(httpCode: number): number { /** * Convert Gemini Live turns into ChatMessage[] for fixture matching. */ -function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { +function geminiTurnsToMessages(turns: GeminiLiveTurn[], logger?: Logger): ChatMessage[] { const messages: ChatMessage[] = []; for (const turn of turns) { @@ -148,7 +148,7 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { messages.push({ role: "tool", content: typeof fr.response === "string" ? fr.response : JSON.stringify(fr.response), - tool_call_id: fr.id ?? `call_gemini_${fr.name}_${i}`, + tool_call_id: fr.id ?? generateToolCallId(), }); } if (textParts.length > 0) { @@ -170,8 +170,8 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { messages.push({ role: "assistant", content: text || null, - tool_calls: funcCalls.map((p, i) => ({ - id: `call_gemini_${p.functionCall!.name}_${i}`, + tool_calls: funcCalls.map((p) => ({ + id: generateToolCallId(), type: "function" as const, function: { name: p.functionCall!.name, @@ -183,6 +183,8 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { const text = textParts.map((p) => p.text!).join(""); messages.push({ role: "assistant", content: text }); } + } else { + logger?.warn(`[gemini-live] skipping turn with unrecognized role: ${role}`); } } @@ -193,10 +195,10 @@ function geminiTurnsToMessages(turns: GeminiLiveTurn[]): ChatMessage[] { * Convert toolResponse messages into ChatMessage[] for fixture matching. */ function toolResponseToMessages(toolResponse: GeminiLiveToolResponse): ChatMessage[] { - return toolResponse.functionResponses.map((fr, i) => ({ + return toolResponse.functionResponses.map((fr) => ({ role: "tool" as const, content: typeof fr.response === "string" ? fr.response : JSON.stringify(fr.response), - tool_call_id: fr.id ?? `call_gemini_${fr.name}_${i}`, + tool_call_id: fr.id ?? generateToolCallId(), })); } @@ -331,7 +333,7 @@ async function processMessage( ); return; } - newMessages = geminiTurnsToMessages(parsed.clientContent.turns); + newMessages = geminiTurnsToMessages(parsed.clientContent.turns, defaults.logger); } else if (parsed.toolResponse) { if ( !parsed.toolResponse.functionResponses || @@ -541,13 +543,18 @@ async function processMessage( } if (ws.isClosed) break; - ws.send( - JSON.stringify({ - serverContent: { - modelTurn: { parts: [{ text: chunkList[i] }] }, - }, - }), - ); + try { + ws.send( + JSON.stringify({ + serverContent: { + modelTurn: { parts: [{ text: chunkList[i] }] }, + }, + }), + ); + } catch (err) { + defaults.logger.debug("[gemini-live] send failed during text streaming, closing", err); + break; + } interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -696,13 +703,18 @@ async function processMessage( } if (ws.isClosed) break; - ws.send( - JSON.stringify({ - serverContent: { - modelTurn: { parts: [{ text: chunks[i] }] }, - }, - }), - ); + try { + ws.send( + JSON.stringify({ + serverContent: { + modelTurn: { parts: [{ text: chunks[i] }] }, + }, + }), + ); + } catch (err) { + defaults.logger.debug("[gemini-live] send failed during text streaming, closing", err); + break; + } interruption?.tick(); if (interruption?.signal.aborted) { interrupted = true; @@ -766,7 +778,13 @@ async function processMessage( return; } - const functionCalls = response.toolCalls.map((tc, i) => { + // Pre-compute tool calls with stable IDs so wire message and history match + const resolvedToolCalls = response.toolCalls.map((tc) => ({ + ...tc, + resolvedId: tc.id ?? generateToolCallId(), + })); + + const functionCalls = resolvedToolCalls.map((tc) => { let argsObj: Record; try { argsObj = JSON.parse(tc.arguments || "{}") as Record; @@ -779,7 +797,7 @@ async function processMessage( return { name: tc.name, args: argsObj, - id: tc.id ?? `call_gemini_${tc.name}_${i}`, + id: tc.resolvedId, }; }); @@ -805,12 +823,12 @@ async function processMessage( ); } - // Add assistant tool_calls to conversation history + // Add assistant tool_calls to conversation history using the same resolved IDs session.conversationHistory.push({ role: "assistant", content: null, - tool_calls: response.toolCalls.map((tc, i) => ({ - id: tc.id ?? `call_gemini_${tc.name}_${i}`, + tool_calls: resolvedToolCalls.map((tc) => ({ + id: tc.resolvedId, type: "function" as const, function: { name: tc.name, diff --git a/src/ws-realtime.ts b/src/ws-realtime.ts index 489b164..45e4676 100644 --- a/src/ws-realtime.ts +++ b/src/ws-realtime.ts @@ -115,8 +115,12 @@ export function realtimeItemsToMessages( }); messages.push({ role, content: mappedContent }); } else { - // Existing behavior: extract text from first content element - const text = item.content?.[0]?.text ?? ""; + // Join all text content parts (not just the first) + const text = + item.content + ?.map((c) => c.text) + .filter(Boolean) + .join("") ?? ""; messages.push({ role, content: text }); } } else if (item.type === "function_call") { @@ -448,9 +452,8 @@ async function processMessage( } } - // Capture pre-mutation values for rollback on validation failure - const prevModel = session.model; - const prevType = session.type; + // Capture full pre-mutation snapshot for rollback on validation failure + const prevSession = { ...session }; if (s.instructions !== undefined) session.instructions = s.instructions; if (s.tools !== undefined) session.tools = s.tools; @@ -480,6 +483,12 @@ async function processMessage( if (s.voice !== undefined) session.voice = s.voice; if (s.input_audio_format !== undefined) session.input_audio_format = s.input_audio_format; if (s.output_audio_format !== undefined) session.output_audio_format = s.output_audio_format; + if (s.input_audio_noise_reduction !== undefined) + session.input_audio_noise_reduction = s.input_audio_noise_reduction; + if (s.input_audio_transcription !== undefined) + session.input_audio_transcription = s.input_audio_transcription; + // turn_detection config + if (s.turn_detection !== undefined) session.turn_detection = s.turn_detection; // reasoning config if ((s as Record).reasoning !== undefined) session.reasoning = (s as Record).reasoning as { @@ -500,14 +509,13 @@ async function processMessage( ]); if (session.type === "transcription" && !transcriptionModels.has(session.model)) { - session.model = prevModel; - session.type = prevType; + Object.assign(session, prevSession); sendEvent( ws, { type: "error", error: { - message: `Model ${s.model ?? prevModel} does not support session type transcription`, + message: `Model ${s.model ?? prevSession.model} does not support session type transcription`, type: "invalid_request_error", code: "invalid_session_config", }, @@ -517,14 +525,13 @@ async function processMessage( return; } if (session.type === "translation" && !translationModels.has(session.model)) { - session.model = prevModel; - session.type = prevType; + Object.assign(session, prevSession); sendEvent( ws, { type: "error", error: { - message: `Model ${s.model ?? prevModel} does not support session type translation`, + message: `Model ${s.model ?? prevSession.model} does not support session type translation`, type: "invalid_request_error", code: "invalid_session_config", }, @@ -933,18 +940,23 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = content.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.output_text.delta", - response_id: responseId, - item_id: textItemId, - output_index: textOutputIndex, - content_index: contentIndex, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.output_text.delta", + response_id: responseId, + item_id: textItemId, + output_index: textOutputIndex, + content_index: contentIndex, + delta: chunk, + }, + isBeta, + ); + } catch (err) { + defaults.logger.debug("[ws-realtime] send failed during text streaming, closing", err); + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { @@ -1095,18 +1107,26 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = args.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.function_call_arguments.delta", - response_id: responseId, - item_id: itemId, - output_index: outputIndex, - call_id: callId, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.function_call_arguments.delta", + response_id: responseId, + item_id: itemId, + output_index: outputIndex, + call_id: callId, + delta: chunk, + }, + isBeta, + ); + } catch (err) { + defaults.logger.debug( + "[ws-realtime] send failed during tool call streaming, closing", + err, + ); + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { @@ -1307,18 +1327,23 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = content.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.output_text.delta", - response_id: responseId, - item_id: itemId, - output_index: outputIndex, - content_index: contentIndex, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.output_text.delta", + response_id: responseId, + item_id: itemId, + output_index: outputIndex, + content_index: contentIndex, + delta: chunk, + }, + isBeta, + ); + } catch (err) { + defaults.logger.debug("[ws-realtime] send failed during text streaming, closing", err); + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) { @@ -1509,18 +1534,26 @@ async function handleResponseCreate( } if (ws.isClosed) break; const chunk = args.slice(i, i + chunkSize); - sendEvent( - ws, - { - type: "response.function_call_arguments.delta", - response_id: responseId, - item_id: itemId, - output_index: tcIdx, - call_id: callId, - delta: chunk, - }, - isBeta, - ); + try { + sendEvent( + ws, + { + type: "response.function_call_arguments.delta", + response_id: responseId, + item_id: itemId, + output_index: tcIdx, + call_id: callId, + delta: chunk, + }, + isBeta, + ); + } catch (err) { + defaults.logger.debug( + "[ws-realtime] send failed during tool call streaming, closing", + err, + ); + break; + } eventIndex++; interruption?.tick(); if (interruption?.signal.aborted) {