From a878036f63a510fcc6057058a8d8b4d5b2169bb2 Mon Sep 17 00:00:00 2001
From: Kit Langton
Date: Mon, 11 May 2026 17:44:09 -0400
Subject: [PATCH 1/5] feat(session): add native OpenAI runtime opt-in

---
 packages/opencode/src/session/llm-native.ts |  10 +-
 packages/opencode/src/session/llm.ts        | 194 ++++++++++++--------
 packages/opencode/test/session/llm.test.ts  | 116 +++++++++++-
 3 files changed, 241 insertions(+), 79 deletions(-)

diff --git a/packages/opencode/src/session/llm-native.ts b/packages/opencode/src/session/llm-native.ts
index 74be5988ee2f..817f29832dae 100644
--- a/packages/opencode/src/session/llm-native.ts
+++ b/packages/opencode/src/session/llm-native.ts
@@ -11,6 +11,8 @@ type ToolInput = {
 
 export type RequestInput = {
   readonly model: Provider.Model
+  readonly apiKey?: string
+  readonly baseURL?: string
   readonly system?: readonly string[]
   readonly messages: readonly ModelMessage[]
   readonly tools?: Record<string, ToolInput>
@@ -154,14 +156,16 @@ const baseURL = (model: Provider.Model) => {
   throw new Error(`Native LLM request adapter requires a base URL for ${model.providerID}/${model.id}`)
 }
 
-export const model = (model: Provider.Model, headers?: Record<string, string>) => {
+export const model = (input: Provider.Model | RequestInput, headers?: Record<string, string>) => {
+  const model = "model" in input ? input.model : input
   const route = ROUTE[model.api.npm]
   if (!route) throw new Error(`Native LLM request adapter does not support provider package ${model.api.npm}`)
   return LLM.model({
     id: model.api.id,
     provider: model.providerID,
    route,
-    baseURL: baseURL(model),
+    baseURL: "model" in input && input.baseURL ? input.baseURL : baseURL(model),
+    apiKey: "model" in input ? input.apiKey : undefined,
     headers: Object.keys({ ...model.headers, ...headers }).length === 0 ? undefined : { ...model.headers, ...headers },
     limits: {
       context: model.limit.context,
@@ -173,7 +177,7 @@
 export const request = (input: RequestInput) => {
   const converted = messages(input.messages)
   return LLM.request({
-    model: model(input.model, input.headers),
+    model: model(input, input.headers),
     system: [...(input.system ?? []).map(SystemPart.make), ...converted.system],
     messages: converted.messages,
     tools: tools(input.tools),
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index 28e05aba8f97..3460a04c025b 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -4,6 +4,7 @@
 import { Context, Effect, Layer, Record } from "effect"
 import * as Stream from "effect/Stream"
 import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
 import type { LLMEvent } from "@opencode-ai/llm"
+import { LLMClient, RequestExecutor } from "@opencode-ai/llm/route"
 import { mergeDeep } from "remeda"
 import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
 import { ProviderTransform } from "@/provider/transform"
@@ -20,12 +21,12 @@
 import { Bus } from "@/bus"
 import { Wildcard } from "@/util/wildcard"
 import { SessionID } from "@/session/schema"
 import { Auth } from "@/auth"
-import { Installation } from "@/installation"
 import { InstallationVersion } from "@opencode-ai/core/installation/version"
 import { EffectBridge } from "@/effect/bridge"
 import * as Option from "effect/Option"
 import * as OtelTracer from "@effect/opentelemetry/Tracer"
 import { LLMAISDK } from "./llm-ai-sdk"
+import { LLMNative } from "./llm-native"
 
 const log = Log.create({ service: "llm" })
 export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
@@ -34,6 +35,8 @@
 const mergeOptions = (target: Record<string, unknown>, source: Record<string, unknown> | undefined): Record<string, unknown> =>
   mergeDeep(target, source ?? {}) as Record<string, unknown>
 
+const runtime = () => (process.env.OPENCODE_LLM_RUNTIME === "native" ? "native" : "ai-sdk")
+
 export type StreamInput = {
   user: MessageV2.User
   sessionID: string
@@ -333,86 +336,123 @@ const live: Layer.Layer<
 
       ? (yield* InstanceState.context).project.id
       : undefined
 
-      return streamText({
-        onError(error) {
-          l.error("stream error", {
-            error,
-          })
-        },
-        async experimental_repairToolCall(failed) {
-          const lower = failed.toolCall.toolName.toLowerCase()
-          if (lower !== failed.toolCall.toolName && sortedTools[lower]) {
-            l.info("repairing tool call", {
-              tool: failed.toolCall.toolName,
-              repaired: lower,
-            })
-            return {
-              ...failed.toolCall,
-              toolName: lower,
-            }
-          }
-          return {
-            ...failed.toolCall,
-            input: JSON.stringify({
-              tool: failed.toolCall.toolName,
-              error: failed.error.message,
-            }),
-            toolName: "invalid",
-          }
-        },
-        temperature: params.temperature,
-        topP: params.topP,
-        topK: params.topK,
-        providerOptions: ProviderTransform.providerOptions(input.model, params.options),
-        activeTools: Object.keys(sortedTools).filter((x) => x !== "invalid"),
-        tools: sortedTools,
-        toolChoice: input.toolChoice,
-        maxOutputTokens: params.maxOutputTokens,
-        abortSignal: input.abort,
-        headers: {
-          ...(input.model.providerID.startsWith("opencode")
-            ? 
{ - "x-opencode-project": opencodeProjectID, - "x-opencode-session": input.sessionID, - "x-opencode-request": input.user.id, - "x-opencode-client": Flag.OPENCODE_CLIENT, - "User-Agent": `opencode/${InstallationVersion}`, + ...input.model.headers, + ...headers, + } + + if (runtime() === "native") { + if (input.model.providerID !== "openai" || input.model.api.npm !== "@ai-sdk/openai") { + return yield* Effect.fail(new Error("Native LLM runtime currently only supports OpenAI models")) + } + if (Object.keys(sortedTools).length > 0) { + return yield* Effect.fail(new Error("Native LLM runtime does not support tools yet")) + } + const apiKey = + info?.type === "api" ? info.key : typeof item.options.apiKey === "string" ? item.options.apiKey : undefined + if (!apiKey) return yield* Effect.fail(new Error("Native LLM runtime requires API key auth for OpenAI")) + const baseURL = typeof item.options.baseURL === "string" ? item.options.baseURL : undefined + return { + type: "native" as const, + stream: LLMClient.stream( + LLMNative.request({ + model: input.model, + apiKey, + baseURL, + system: isOpenaiOauth ? system : [], + messages: ProviderTransform.message(messages, input.model, options), + toolChoice: input.toolChoice, + temperature: params.temperature, + topP: params.topP, + topK: params.topK, + maxOutputTokens: params.maxOutputTokens, + providerOptions: ProviderTransform.providerOptions(input.model, params.options), + headers: requestHeaders, + }), + ).pipe(Stream.provide(LLMClient.layer), Stream.provide(RequestExecutor.defaultLayer)), + } + } + + return { + type: "ai-sdk" as const, + result: streamText({ + onError(error) { + l.error("stream error", { + error, + }) + }, + async experimental_repairToolCall(failed) { + const lower = failed.toolCall.toolName.toLowerCase() + if (lower !== failed.toolCall.toolName && sortedTools[lower]) { + l.info("repairing tool call", { + tool: failed.toolCall.toolName, + repaired: lower, + }) + return { + ...failed.toolCall, + toolName: lower, } - : { - "x-session-affinity": input.sessionID, - ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), - "User-Agent": `opencode/${InstallationVersion}`, + } + return { + ...failed.toolCall, + input: JSON.stringify({ + tool: failed.toolCall.toolName, + error: failed.error.message, }), - ...input.model.headers, - ...headers, - }, - maxRetries: input.retries ?? 0, - messages, - model: wrapLanguageModel({ - model: language, - middleware: [ - { - specificationVersion: "v3" as const, - async transformParams(args) { - if (args.type === "stream") { - // @ts-expect-error - args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) - } - return args.params + toolName: "invalid", + } + }, + temperature: params.temperature, + topP: params.topP, + topK: params.topK, + providerOptions: ProviderTransform.providerOptions(input.model, params.options), + activeTools: Object.keys(sortedTools).filter((x) => x !== "invalid"), + tools: sortedTools, + toolChoice: input.toolChoice, + maxOutputTokens: params.maxOutputTokens, + abortSignal: input.abort, + headers: requestHeaders, + maxRetries: input.retries ?? 
0, + messages, + model: wrapLanguageModel({ + model: language, + middleware: [ + { + specificationVersion: "v3" as const, + async transformParams(args) { + if (args.type === "stream") { + // @ts-expect-error + args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) + } + return args.params + }, }, + ], + }), + experimental_telemetry: { + isEnabled: cfg.experimental?.openTelemetry, + functionId: "session.llm", + tracer: telemetryTracer, + metadata: { + userId: cfg.username ?? "unknown", + sessionId: input.sessionID, }, - ], - }), - experimental_telemetry: { - isEnabled: cfg.experimental?.openTelemetry, - functionId: "session.llm", - tracer: telemetryTracer, - metadata: { - userId: cfg.username ?? "unknown", - sessionId: input.sessionID, }, - }, - }) + }), + } }) const stream: Interface["stream"] = (input) => @@ -426,8 +466,12 @@ const live: Layer.Layer< const result = yield* run({ ...input, abort: ctrl.signal }) + if (result.type === "native") return result.stream + const state = LLMAISDK.adapterState() - return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e)))).pipe( + return Stream.fromAsyncIterable(result.result.fullStream, (e) => + e instanceof Error ? e : new Error(String(e)), + ).pipe( Stream.mapEffect((event) => LLMAISDK.toLLMEvents(state, event)), Stream.flatMap((events) => Stream.fromIterable(events)), ) diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts index f078e352acc1..16a693dfa017 100644 --- a/packages/opencode/test/session/llm.test.ts +++ b/packages/opencode/test/session/llm.test.ts @@ -5,7 +5,6 @@ import { Cause, Effect, Exit, Stream } from "effect" import z from "zod" import { makeRuntime } from "../../src/effect/run-service" import { LLM } from "../../src/session/llm" -import { Instance } from "../../src/project/instance" import { WithInstance } from "../../src/project/with-instance" import { Provider } from "@/provider/provider" import { ProviderTransform } from "@/provider/transform" @@ -688,6 +687,121 @@ describe("session.llm.stream", () => { }) }) + test("streams OpenAI through native runtime when opted in", async () => { + const server = state.server + if (!server) { + throw new Error("Server not initialized") + } + + const source = await loadFixture("openai", "gpt-5.2") + const model = source.model + const chunks = [ + { + type: "response.created", + response: { + id: "resp-native", + }, + }, + { + type: "response.output_item.added", + item: { type: "message", id: "item-native", status: "in_progress" }, + }, + { + type: "response.output_text.delta", + item_id: "item-native", + delta: "Hello native", + }, + { + type: "response.completed", + response: { + incomplete_details: null, + usage: { + input_tokens: 1, + input_tokens_details: null, + output_tokens: 1, + output_tokens_details: null, + }, + }, + }, + ] + const request = waitRequest("/responses", createEventResponse(chunks, true)) + + await using tmp = await tmpdir({ + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["openai"], + provider: { + openai: { + name: "OpenAI", + env: ["OPENAI_API_KEY"], + npm: "@ai-sdk/openai", + api: "https://api.openai.com/v1", + models: { + [model.id]: model, + }, + options: { + apiKey: "test-openai-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + }), + ) + }, + }) + + await WithInstance.provide({ + directory: tmp.path, + fn: 
async () => {
+        const previous = process.env.OPENCODE_LLM_RUNTIME
+        process.env.OPENCODE_LLM_RUNTIME = "native"
+        try {
+          const resolved = await getModel(ProviderID.openai, ModelID.make(model.id))
+          const sessionID = SessionID.make("session-test-native")
+          const agent = {
+            name: "test",
+            mode: "primary",
+            options: {},
+            permission: [{ permission: "*", pattern: "*", action: "allow" }],
+            temperature: 0.2,
+          } satisfies Agent.Info
+
+          await drain({
+            user: {
+              id: MessageID.make("msg_user-native"),
+              sessionID,
+              role: "user",
+              time: { created: Date.now() },
+              agent: agent.name,
+              model: { providerID: ProviderID.make("openai"), modelID: resolved.id, variant: "high" },
+            } satisfies MessageV2.User,
+            sessionID,
+            model: resolved,
+            agent,
+            system: ["You are a helpful assistant."],
+            messages: [{ role: "user", content: "Hello" }],
+            tools: {},
+          })
+        } finally {
+          if (previous === undefined) delete process.env.OPENCODE_LLM_RUNTIME
+          else process.env.OPENCODE_LLM_RUNTIME = previous
+        }
+
+        const capture = await request
+        expect(capture.url.pathname.endsWith("/responses")).toBe(true)
+        expect(capture.headers.get("Authorization")).toBe("Bearer test-openai-key")
+        expect(capture.body.model).toBe(model.id)
+        expect(capture.body.stream).toBe(true)
+        expect((capture.body.reasoning as { effort?: string } | undefined)?.effort).toBe("high")
+        expect(JSON.stringify(capture.body.input)).toContain("You are a helpful assistant.")
+        expect(capture.body.input).toContainEqual({ role: "user", content: [{ type: "input_text", text: "Hello" }] })
+      },
+    })
+  })
+
   test("accepts user image attachments as data URLs for OpenAI models", async () => {
     const server = state.server
     if (!server) {

From 03b60f7623946ad80f698319accc3b874930a63b Mon Sep 17 00:00:00 2001
From: Kit Langton
Date: Mon, 11 May 2026 19:42:38 -0400
Subject: [PATCH 2/5] fix(session): finish native request streams

---
 packages/opencode/src/session/processor.ts | 143 ++++++++++++---------
 1 file changed, 80 insertions(+), 63 deletions(-)

diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 2d78dc70e882..a207d4db74fa 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -293,12 +293,78 @@
   return {
     title: value.name,
     metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {},
-    output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
+    output:
+      typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
   }
 }
 
 const toolInput = (value: unknown): Record<string, unknown> => (isRecord(value) ? value : { value })
 
+  const finishStep = Effect.fn("SessionProcessor.finishStep")(function* (
+    value: Extract<StreamEvent, { type: "step-finish" | "request-finish" }>,
+  ) {
+    const completedSnapshot = yield* snapshot.track()
+    yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
+    const usage = Session.getUsage({
+      model: ctx.model,
+      usage: value.usage ?? new Usage({}),
+      metadata: value.providerMetadata,
+    })
+    if (!ctx.assistantMessage.summary) {
+      // TODO(v2): Temporary dual-write while migrating session messages to v2 events. 
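+      // Summary messages skip this dual-write; for normal steps the v2 Step.Ended event mirrors the legacy step-finish part recorded below.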
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + finish: value.reason, + cost: usage.cost, + tokens: usage.tokens, + snapshot: completedSnapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + } + ctx.assistantMessage.finish = value.reason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + yield* session.updatePart({ + id: PartID.ascending(), + reason: value.reason, + snapshot: completedSnapshot, + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + yield* session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = yield* snapshot.patch(ctx.snapshot) + if (patch.files.length) { + yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + yield* summary + .summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + .pipe(Effect.ignore, Effect.forkIn(scope)) + if ( + !ctx.assistantMessage.summary && + isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) + ) { + ctx.needsCompaction = true + } + }) + const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) { switch (value.type) { case "request-start": @@ -581,66 +647,7 @@ export const layer: Layer.Layer< return case "step-finish": { - const completedSnapshot = yield* snapshot.track() - yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning) - const usage = Session.getUsage({ - model: ctx.model, - usage: value.usage ?? new Usage({}), - metadata: value.providerMetadata, - }) - if (!ctx.assistantMessage.summary) { - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. 
-          if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
-            yield* sync.run(SessionEvent.Step.Ended.Sync, {
-              sessionID: ctx.sessionID,
-              finish: value.reason,
-              cost: usage.cost,
-              tokens: usage.tokens,
-              snapshot: completedSnapshot,
-              timestamp: DateTime.makeUnsafe(Date.now()),
-            })
-          }
-        }
-        ctx.assistantMessage.finish = value.reason
-        ctx.assistantMessage.cost += usage.cost
-        ctx.assistantMessage.tokens = usage.tokens
-        yield* session.updatePart({
-          id: PartID.ascending(),
-          reason: value.reason,
-          snapshot: completedSnapshot,
-          messageID: ctx.assistantMessage.id,
-          sessionID: ctx.assistantMessage.sessionID,
-          type: "step-finish",
-          tokens: usage.tokens,
-          cost: usage.cost,
-        })
-        yield* session.updateMessage(ctx.assistantMessage)
-        if (ctx.snapshot) {
-          const patch = yield* snapshot.patch(ctx.snapshot)
-          if (patch.files.length) {
-            yield* session.updatePart({
-              id: PartID.ascending(),
-              messageID: ctx.assistantMessage.id,
-              sessionID: ctx.sessionID,
-              type: "patch",
-              hash: patch.hash,
-              files: patch.files,
-            })
-          }
-          ctx.snapshot = undefined
-        }
-        yield* summary
-          .summarize({
-            sessionID: ctx.sessionID,
-            messageID: ctx.assistantMessage.parentID,
-          })
-          .pipe(Effect.ignore, Effect.forkIn(scope))
-        if (
-          !ctx.assistantMessage.summary &&
-          isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
-        ) {
-          ctx.needsCompaction = true
-        }
+        yield* finishStep(value)
         return
       }
 
       case "text-delta":
-        if (!ctx.currentText) return
+        if (!ctx.currentText) {
+          ctx.currentText = {
+            id: PartID.ascending(),
+            messageID: ctx.assistantMessage.id,
+            sessionID: ctx.assistantMessage.sessionID,
+            type: "text",
+            text: "",
+            time: { start: Date.now() },
+          }
+          yield* session.updatePart(ctx.currentText)
+        }
         ctx.currentText.text += value.text
         yield* session.updatePartDelta({
           sessionID: ctx.currentText.sessionID,
         return
 
       case "request-finish":
+        if (!ctx.assistantMessage.finish) yield* finishStep(value)
         return
-    }
   })

From 4c75b463e78905207d1503d6049f31846c4b2c72 Mon Sep 17 00:00:00 2001
From: Kit Langton
Date: Mon, 11 May 2026 20:11:16 -0400
Subject: [PATCH 3/5] fix(session): finalize native text before request finish

---
 packages/opencode/src/session/processor.ts | 63 ++++++++++++----------
 1 file changed, 34 insertions(+), 29 deletions(-)

diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index a207d4db74fa..1290cc545fee 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -365,6 +365,38 @@
   }
 })
 
+  const finishText = Effect.fn("SessionProcessor.finishText")(function* (
+    providerMetadata?: Extract<StreamEvent, { type: "text-end" }>["providerMetadata"],
+  ) {
+    if (!ctx.currentText) return
+    // oxlint-disable-next-line no-self-assign -- reactivity trigger
+    ctx.currentText.text = ctx.currentText.text
+    ctx.currentText.text = (yield* plugin.trigger(
+      "experimental.text.complete",
+      {
+        sessionID: ctx.sessionID,
+        messageID: ctx.assistantMessage.id,
+        partID: ctx.currentText.id,
+      },
+      { text: ctx.currentText.text },
+    )).text
+    if (!ctx.assistantMessage.summary) {
+      // TODO(v2): Temporary dual-write while migrating session messages to v2 events. 
+      if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
+        yield* sync.run(SessionEvent.Text.Ended.Sync, {
+          sessionID: ctx.sessionID,
+          text: ctx.currentText.text,
+          timestamp: DateTime.makeUnsafe(Date.now()),
+        })
+      }
+    }
+    const end = Date.now()
+    ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
+    if (providerMetadata) ctx.currentText.metadata = providerMetadata
+    yield* session.updatePart(ctx.currentText)
+    ctx.currentText = undefined
+  })
+
 const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) {
   switch (value.type) {
     case "request-start":
       case "text-end":
-        if (!ctx.currentText) return
-        // oxlint-disable-next-line no-self-assign -- reactivity trigger
-        ctx.currentText.text = ctx.currentText.text
-        ctx.currentText.text = (yield* plugin.trigger(
-          "experimental.text.complete",
-          {
-            sessionID: ctx.sessionID,
-            messageID: ctx.assistantMessage.id,
-            partID: ctx.currentText.id,
-          },
-          { text: ctx.currentText.text },
-        )).text
-        if (!ctx.assistantMessage.summary) {
-          // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
-          if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
-            yield* sync.run(SessionEvent.Text.Ended.Sync, {
-              sessionID: ctx.sessionID,
-              text: ctx.currentText.text,
-              timestamp: DateTime.makeUnsafe(Date.now()),
-            })
-          }
-        }
-        {
-          const end = Date.now()
-          ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
-        }
-        if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
-        yield* session.updatePart(ctx.currentText)
-        ctx.currentText = undefined
+        yield* finishText(value.providerMetadata)
         return
 
       case "request-finish":
+        yield* finishText()
         if (!ctx.assistantMessage.finish) yield* finishStep(value)
         return

From a6e8ec4b35d7894c85bdb9f45627bf5102f03631 Mon Sep 17 00:00:00 2001
From: Kit Langton
Date: Mon, 11 May 2026 21:33:52 -0400
Subject: [PATCH 4/5] fix(session): rely on native LLM lifecycle events

---
 packages/opencode/src/session/processor.ts | 206 +++++++++------------
 1 file changed, 92 insertions(+), 114 deletions(-)

diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 1290cc545fee..2d78dc70e882 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -293,78 +293,12 @@
   return {
     title: value.name,
     metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {},
-    output:
-      typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
+    output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
   }
 }
 
 const toolInput = (value: unknown): Record<string, unknown> => (isRecord(value) ? value : { value })
 
-  const finishStep = Effect.fn("SessionProcessor.finishStep")(function* (
-    value: Extract<StreamEvent, { type: "step-finish" | "request-finish" }>,
-  ) {
-    const completedSnapshot = yield* snapshot.track()
-    yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
-    const usage = Session.getUsage({
-      model: ctx.model,
-      usage: value.usage ?? new Usage({}),
-      metadata: value.providerMetadata,
-    })
-    if (!ctx.assistantMessage.summary) {
-      // TODO(v2): Temporary dual-write while migrating session messages to v2 events. 
-      if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
-        yield* sync.run(SessionEvent.Step.Ended.Sync, {
-          sessionID: ctx.sessionID,
-          finish: value.reason,
-          cost: usage.cost,
-          tokens: usage.tokens,
-          snapshot: completedSnapshot,
-          timestamp: DateTime.makeUnsafe(Date.now()),
-        })
-      }
-    }
-    ctx.assistantMessage.finish = value.reason
-    ctx.assistantMessage.cost += usage.cost
-    ctx.assistantMessage.tokens = usage.tokens
-    yield* session.updatePart({
-      id: PartID.ascending(),
-      reason: value.reason,
-      snapshot: completedSnapshot,
-      messageID: ctx.assistantMessage.id,
-      sessionID: ctx.assistantMessage.sessionID,
-      type: "step-finish",
-      tokens: usage.tokens,
-      cost: usage.cost,
-    })
-    yield* session.updateMessage(ctx.assistantMessage)
-    if (ctx.snapshot) {
-      const patch = yield* snapshot.patch(ctx.snapshot)
-      if (patch.files.length) {
-        yield* session.updatePart({
-          id: PartID.ascending(),
-          messageID: ctx.assistantMessage.id,
-          sessionID: ctx.sessionID,
-          type: "patch",
-          hash: patch.hash,
-          files: patch.files,
-        })
-      }
-      ctx.snapshot = undefined
-    }
-    yield* summary
-      .summarize({
-        sessionID: ctx.sessionID,
-        messageID: ctx.assistantMessage.parentID,
-      })
-      .pipe(Effect.ignore, Effect.forkIn(scope))
-    if (
-      !ctx.assistantMessage.summary &&
-      isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
-    ) {
-      ctx.needsCompaction = true
-    }
-  })
-
-  const finishText = Effect.fn("SessionProcessor.finishText")(function* (
-    providerMetadata?: Extract<StreamEvent, { type: "text-end" }>["providerMetadata"],
-  ) {
-    if (!ctx.currentText) return
-    // oxlint-disable-next-line no-self-assign -- reactivity trigger
-    ctx.currentText.text = ctx.currentText.text
-    ctx.currentText.text = (yield* plugin.trigger(
-      "experimental.text.complete",
-      {
-        sessionID: ctx.sessionID,
-        messageID: ctx.assistantMessage.id,
-        partID: ctx.currentText.id,
-      },
-      { text: ctx.currentText.text },
-    )).text
-    if (!ctx.assistantMessage.summary) {
-      // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
-      if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
-        yield* sync.run(SessionEvent.Text.Ended.Sync, {
-          sessionID: ctx.sessionID,
-          text: ctx.currentText.text,
-          timestamp: DateTime.makeUnsafe(Date.now()),
-        })
-      }
-    }
-    const end = Date.now()
-    ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
-    if (providerMetadata) ctx.currentText.metadata = providerMetadata
-    yield* session.updatePart(ctx.currentText)
-    ctx.currentText = undefined
-  })
-
 const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) {
   switch (value.type) {
     case "request-start":
 
     case "step-finish": {
-        yield* finishStep(value)
+        const completedSnapshot = yield* snapshot.track()
+        yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
+        const usage = Session.getUsage({
+          model: ctx.model,
+          usage: value.usage ?? new Usage({}),
+          metadata: value.providerMetadata,
+        })
+        if (!ctx.assistantMessage.summary) {
+          // TODO(v2): Temporary dual-write while migrating session messages to v2 events. 
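+          // Same flag-gated dual-write as before: mirror the finished step into the v2 event stream.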
+ if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + finish: value.reason, + cost: usage.cost, + tokens: usage.tokens, + snapshot: completedSnapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + } + ctx.assistantMessage.finish = value.reason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + yield* session.updatePart({ + id: PartID.ascending(), + reason: value.reason, + snapshot: completedSnapshot, + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + yield* session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = yield* snapshot.patch(ctx.snapshot) + if (patch.files.length) { + yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + yield* summary + .summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + .pipe(Effect.ignore, Effect.forkIn(scope)) + if ( + !ctx.assistantMessage.summary && + isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) + ) { + ctx.needsCompaction = true + } return } @@ -706,17 +667,7 @@ export const layer: Layer.Layer< return case "text-delta": - if (!ctx.currentText) { - ctx.currentText = { - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "text", - text: "", - time: { start: Date.now() }, - } - yield* session.updatePart(ctx.currentText) - } + if (!ctx.currentText) return ctx.currentText.text += value.text yield* session.updatePartDelta({ sessionID: ctx.currentText.sessionID, @@ -728,13 +679,40 @@ export const layer: Layer.Layer< return case "text-end": - yield* finishText(value.providerMetadata) + if (!ctx.currentText) return + // oxlint-disable-next-line no-self-assign -- reactivity trigger + ctx.currentText.text = ctx.currentText.text + ctx.currentText.text = (yield* plugin.trigger( + "experimental.text.complete", + { + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: ctx.currentText.id, + }, + { text: ctx.currentText.text }, + )).text + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Text.Ended.Sync, { + sessionID: ctx.sessionID, + text: ctx.currentText.text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + } + { + const end = Date.now() + ctx.currentText.time = { start: ctx.currentText.time?.start ?? 
end, end }
+        }
+        if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
+        yield* session.updatePart(ctx.currentText)
+        ctx.currentText = undefined
         return
 
       case "request-finish":
-        yield* finishText()
-        if (!ctx.assistantMessage.finish) yield* finishStep(value)
         return
+    }
   })

From a2b0ef98237380d9403a5430529d0054cca822bf Mon Sep 17 00:00:00 2001
From: Kit Langton
Date: Mon, 11 May 2026 21:43:24 -0400
Subject: [PATCH 5/5] feat(session): execute tools in native LLM runtime

---
 packages/llm/src/tool-runtime.ts           |   8 +-
 packages/llm/src/tool.ts                   |  11 +-
 packages/opencode/src/session/llm.ts       |  76 ++++++----
 packages/opencode/test/session/llm.test.ts | 128 +++++++++++++++++++++
 4 files changed, 195 insertions(+), 28 deletions(-)

diff --git a/packages/llm/src/tool-runtime.ts b/packages/llm/src/tool-runtime.ts
index f46452582703..a875d2e43861 100644
--- a/packages/llm/src/tool-runtime.ts
+++ b/packages/llm/src/tool-runtime.ts
@@ -200,17 +200,17 @@ const dispatch = (tools: Tools, call: ToolCallPart): Effect.Effect<ToolResultValue>
 
-  decodeAndExecute(tool, call.input).pipe(
+  decodeAndExecute(tool, call).pipe(
     Effect.catchAll((failure) =>
       Effect.succeed({ type: "error" as const, value: failure.message } satisfies ToolResultValue),
     ),
   )
 }
 
-const decodeAndExecute = (tool: AnyTool, input: unknown): Effect.Effect<ToolResultValue, ToolFailure> =>
-  tool._decode(input).pipe(
+const decodeAndExecute = (tool: AnyTool, call: ToolCallPart): Effect.Effect<ToolResultValue, ToolFailure> =>
+  tool._decode(call.input).pipe(
     Effect.mapError((error) => new ToolFailure({ message: `Invalid tool input: ${error.message}` })),
-    Effect.flatMap((decoded) => tool.execute!(decoded)),
+    Effect.flatMap((decoded) => tool.execute!(decoded, { id: call.id, name: call.name })),
     Effect.flatMap((value) =>
       tool._encode(value).pipe(
         Effect.mapError(
diff --git a/packages/llm/src/tool.ts b/packages/llm/src/tool.ts
index 311c8798b6fa..14e22688aea8 100644
--- a/packages/llm/src/tool.ts
+++ b/packages/llm/src/tool.ts
@@ -11,6 +11,7 @@ export type ToolSchema = Schema.Codec
 
 export type ToolExecute<Params extends Schema.Struct<Schema.Struct.Fields>, Success extends ToolSchema> = (
   params: Schema.Schema.Type<Params>,
+  context?: { readonly id: string; readonly name: string },
 ) => Effect.Effect<Schema.Schema.Type<Success>, ToolFailure>
 
 /**
@@ -61,7 +62,10 @@ type TypedToolConfig = {
 
 type DynamicToolConfig = {
   readonly description: string
   readonly jsonSchema: JsonSchema.JsonSchema
-  readonly execute?: (params: unknown) => Effect.Effect<unknown, ToolFailure>
+  readonly execute?: (
+    params: unknown,
+    context?: { readonly id: string; readonly name: string },
+  ) => Effect.Effect<unknown, ToolFailure>
 }
 
 /**
@@ -110,7 +114,10 @@ export function make<Params extends Schema.Struct<Schema.Struct.Fields>, Success extends ToolSch
 export function make(config: {
   readonly description: string
   readonly jsonSchema: JsonSchema.JsonSchema
-  readonly execute: (params: unknown) => Effect.Effect<unknown, ToolFailure>
+  readonly execute: (
+    params: unknown,
+    context?: { readonly id: string; readonly name: string },
+  ) => Effect.Effect<unknown, ToolFailure>
 }): AnyExecutableTool
 export function make(config: {
   readonly description: string
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index 3460a04c025b..32fb4cf88062 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -2,8 +2,8 @@
 import { Provider } from "@/provider/provider"
 import * as Log from "@opencode-ai/core/util/log"
 import { Context, Effect, Layer, Record } from "effect"
 import * as Stream from "effect/Stream"
-import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
-import type { LLMEvent } from "@opencode-ai/llm"
+import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool as aiTool, jsonSchema, asSchema } from "ai"
+import { tool as nativeTool, ToolFailure, type JsonSchema, type LLMEvent } from "@opencode-ai/llm"
 import { LLMClient, RequestExecutor } from "@opencode-ai/llm/route"
 import { mergeDeep } from "remeda"
 import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
 import { ProviderTransform } from "@/provider/transform"
@@ -18,6 +18,7 @@
 import { Permission } from "@/permission"
 import { PermissionID } from "@/permission/schema"
 import { Bus } from "@/bus"
+import { errorMessage } from "@/util/error"
 import { Wildcard } from "@/util/wildcard"
 import { SessionID } from "@/session/schema"
 import { Auth } from "@/auth"
@@ -216,7 +217,7 @@
         Object.keys(tools).length === 0 &&
         hasToolCalls(input.messages)
       ) {
-        tools["_noop"] = tool({
+        tools["_noop"] = aiTool({
           description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
           inputSchema: jsonSchema({
             type: "object",
@@ -358,31 +359,31 @@
         if (input.model.providerID !== "openai" || input.model.api.npm !== "@ai-sdk/openai") {
           return yield* Effect.fail(new Error("Native LLM runtime currently only supports OpenAI models"))
         }
-        if (Object.keys(sortedTools).length > 0) {
-          return yield* Effect.fail(new Error("Native LLM runtime does not support tools yet"))
-        }
         const apiKey =
           info?.type === "api" ? info.key : typeof item.options.apiKey === "string" ? item.options.apiKey : undefined
         if (!apiKey) return yield* Effect.fail(new Error("Native LLM runtime requires API key auth for OpenAI"))
         const baseURL = typeof item.options.baseURL === "string" ? item.options.baseURL : undefined
+        const request = LLMNative.request({
+          model: input.model,
+          apiKey,
+          baseURL,
+          system: isOpenaiOauth ? system : [],
+          messages: ProviderTransform.message(messages, input.model, options),
+          tools: sortedTools,
+          toolChoice: input.toolChoice,
+          temperature: params.temperature,
+          topP: params.topP,
+          topK: params.topK,
+          maxOutputTokens: params.maxOutputTokens,
+          providerOptions: ProviderTransform.providerOptions(input.model, params.options),
+          headers: requestHeaders,
+        })
         return {
           type: "native" as const,
-          stream: LLMClient.stream(
-            LLMNative.request({
-              model: input.model,
-              apiKey,
-              baseURL,
-              system: isOpenaiOauth ? system : [],
-              messages: ProviderTransform.message(messages, input.model, options),
-              toolChoice: input.toolChoice,
-              temperature: params.temperature,
-              topP: params.topP,
-              topK: params.topK,
-              maxOutputTokens: params.maxOutputTokens,
-              providerOptions: ProviderTransform.providerOptions(input.model, params.options),
-              headers: requestHeaders,
-            }),
-          ).pipe(Stream.provide(LLMClient.layer), Stream.provide(RequestExecutor.defaultLayer)),
+          stream: LLMClient.stream({ request, tools: nativeTools(sortedTools, input) }).pipe(
+            Stream.provide(LLMClient.layer),
+            Stream.provide(RequestExecutor.defaultLayer),
+          ),
         }
       }
@@ -502,6 +503,37 @@ function resolveTools(input: Pick
 input.user.tools?.[k] !== false && !disabled.has(k))
 }
 
+function nativeSchema(value: unknown): JsonSchema {
+  if (!value || typeof value !== "object") return { type: "object", properties: {} }
+  if ("jsonSchema" in value && value.jsonSchema && typeof value.jsonSchema === "object")
+    return value.jsonSchema as JsonSchema
+  return asSchema(value as Parameters<typeof asSchema>[0]).jsonSchema as JsonSchema
+}
+
+function nativeTools(tools: Record<string, Tool>, input: StreamRequest) {
+  return Object.fromEntries(
+    Object.entries(tools).map(([name, item]) => [
+      name,
+      nativeTool({
+        description: item.description ?? 
"", + jsonSchema: nativeSchema(item.inputSchema), + execute: (args: unknown, ctx?: { readonly id: string; readonly name: string }) => + Effect.tryPromise({ + try: () => { + if (!item.execute) throw new Error(`Tool has no execute handler: ${name}`) + return item.execute(args, { + toolCallId: ctx?.id ?? name, + messages: input.messages, + abortSignal: input.abort, + }) + }, + catch: (error) => new ToolFailure({ message: errorMessage(error) }), + }), + }), + ]), + ) +} + // Check if messages contain any tool-call content // Used to determine if a dummy tool should be added for LiteLLM proxy compatibility export function hasToolCalls(messages: ModelMessage[]): boolean { diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts index 16a693dfa017..2054d05343c2 100644 --- a/packages/opencode/test/session/llm.test.ts +++ b/packages/opencode/test/session/llm.test.ts @@ -802,6 +802,134 @@ describe("session.llm.stream", () => { }) }) + test("executes OpenAI tool calls through native runtime", async () => { + const server = state.server + if (!server) { + throw new Error("Server not initialized") + } + + const source = await loadFixture("openai", "gpt-5.2") + const model = source.model + const chunks = [ + { + type: "response.output_item.added", + item: { type: "function_call", id: "item-native-tool", call_id: "call-native-tool", name: "lookup" }, + }, + { + type: "response.function_call_arguments.delta", + item_id: "item-native-tool", + delta: '{"query":"weather"}', + }, + { + type: "response.output_item.done", + item: { + type: "function_call", + id: "item-native-tool", + call_id: "call-native-tool", + name: "lookup", + arguments: '{"query":"weather"}', + }, + }, + { + type: "response.completed", + response: { incomplete_details: null, usage: { input_tokens: 1, output_tokens: 1 } }, + }, + ] + const request = waitRequest("/responses", createEventResponse(chunks, true)) + let executed: unknown + + await using tmp = await tmpdir({ + init: async (dir) => { + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + enabled_providers: ["openai"], + provider: { + openai: { + name: "OpenAI", + env: ["OPENAI_API_KEY"], + npm: "@ai-sdk/openai", + api: "https://api.openai.com/v1", + models: { + [model.id]: model, + }, + options: { + apiKey: "test-openai-key", + baseURL: `${server.url.origin}/v1`, + }, + }, + }, + }), + ) + }, + }) + + await WithInstance.provide({ + directory: tmp.path, + fn: async () => { + const previous = process.env.OPENCODE_LLM_RUNTIME + process.env.OPENCODE_LLM_RUNTIME = "native" + try { + const resolved = await getModel(ProviderID.openai, ModelID.make(model.id)) + const sessionID = SessionID.make("session-test-native-tool") + const agent = { + name: "test", + mode: "primary", + options: {}, + permission: [{ permission: "*", pattern: "*", action: "allow" }], + } satisfies Agent.Info + + await drain({ + user: { + id: MessageID.make("msg_user-native-tool"), + sessionID, + role: "user", + time: { created: Date.now() }, + agent: agent.name, + model: { providerID: ProviderID.make("openai"), modelID: resolved.id }, + } satisfies MessageV2.User, + sessionID, + model: resolved, + agent, + system: [], + messages: [{ role: "user", content: "Use lookup" }], + tools: { + lookup: tool({ + description: "Lookup data", + inputSchema: z.object({ query: z.string() }), + execute: async (args, options) => { + executed = { args, toolCallId: options.toolCallId } + return { output: "looked up" } + }, + }), 
+ }, + }) + } finally { + if (previous === undefined) delete process.env.OPENCODE_LLM_RUNTIME + else process.env.OPENCODE_LLM_RUNTIME = previous + } + + const capture = await request + expect(capture.body.tools).toEqual([ + { + type: "function", + name: "lookup", + description: "Lookup data", + parameters: { + type: "object", + properties: { query: { type: "string" } }, + required: ["query"], + additionalProperties: false, + $schema: "http://json-schema.org/draft-07/schema#", + }, + }, + ]) + expect(executed).toEqual({ args: { query: "weather" }, toolCallId: "call-native-tool" }) + }, + }) + }) + test("accepts user image attachments as data URLs for OpenAI models", async () => { const server = state.server if (!server) {