Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions packages/opencode/src/session/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ import { Provider } from "@/provider/provider"
import * as Log from "@opencode-ai/core/util/log"
import { Context, Effect, Layer, Record } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import {
streamText,
wrapLanguageModel,
type LanguageModelUsage,
type ModelMessage,
type ProviderMetadata as AiProviderMetadata,
type Tool,
tool,
jsonSchema,
} from "ai"
import { mergeDeep } from "remeda"
import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
import { ProviderTransform } from "@/provider/transform"
Expand All @@ -27,7 +36,48 @@ import * as OtelTracer from "@effect/opentelemetry/Tracer"

const log = Log.create({ service: "llm" })
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
type Result = Awaited<ReturnType<typeof streamText>>
export type ProviderMetadata = AiProviderMetadata
export type Usage = LanguageModelUsage

export type ToolOutput = {
title: string
metadata: Record<string, any>
output: string
attachments?: MessageV2.FilePart[]
}

export type Event =
| { type: "start" }
| { type: "reasoning-start"; id: string; providerMetadata?: ProviderMetadata }
| { type: "reasoning-delta"; id: string; text: string; providerMetadata?: ProviderMetadata }
| { type: "reasoning-end"; id: string; providerMetadata?: ProviderMetadata }
| { type: "tool-input-start"; id: string; toolName: string; providerExecuted?: boolean }
| { type: "tool-input-delta"; id: string; delta?: string }
| { type: "tool-input-end"; id: string }
| { type: "tool-call"; toolCallId: string; toolName: string; input: any; providerMetadata?: ProviderMetadata }
| { type: "tool-result"; toolCallId: string; output: ToolOutput }
| { type: "tool-error"; toolCallId: string; error: unknown }
| { type: "error"; error: unknown }
| { type: "start-step" }
| {
type: "finish-step"
finishReason: string
rawFinishReason?: string
usage: Usage
response?: unknown
providerMetadata?: ProviderMetadata
}
| { type: "text-start"; id?: string; providerMetadata?: ProviderMetadata }
| { type: "text-delta"; id?: string; text: string; providerMetadata?: ProviderMetadata }
| { type: "text-end"; id?: string; providerMetadata?: ProviderMetadata }
| {
type: "finish"
finishReason?: string
rawFinishReason?: string
usage?: Usage
totalUsage?: Usage
providerMetadata?: ProviderMetadata
}

// Avoid re-instantiating remeda's deep merge types in this hot LLM path; the runtime behavior is still mergeDeep.
const mergeOptions = (target: Record<string, any>, source: Record<string, any> | undefined): Record<string, any> =>
Expand All @@ -52,8 +102,6 @@ export type StreamRequest = StreamInput & {
abort: AbortSignal
}

export type Event = Result["fullStream"] extends AsyncIterable<infer T> ? T : never

export interface Interface {
readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
}
Expand Down Expand Up @@ -427,7 +475,9 @@ const live: Layer.Layer<

const result = yield* run({ ...input, abort: ctrl.signal })

return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
return Stream.fromAsyncIterable(result.fullStream as AsyncIterable<Event>, (e) =>
e instanceof Error ? e : new Error(String(e)),
)
}),
),
)
Expand Down
4 changes: 2 additions & 2 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ export const layer: Layer.Layer<
text: value.output.output,
},
...(value.output.attachments?.map((item: MessageV2.FilePart) => ({
type: "file",
type: "file" as const,
uri: item.url,
mime: item.mime,
name: item.filename,
Expand Down Expand Up @@ -578,7 +578,7 @@ export const layer: Layer.Layer<
return

default:
slog.info("unhandled", { event: value.type, value })
slog.info("unhandled", { event: (value as { type: string }).type, value })
return
}
})
Expand Down
110 changes: 109 additions & 1 deletion packages/opencode/test/session/processor-effect.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
Expand All @@ -20,7 +21,7 @@ import { SessionSummary } from "../../src/session/summary"
import { Snapshot } from "../../src/snapshot"
import * as Log from "@opencode-ai/core/util/log"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { provideTmpdirServer } from "../fixture/fixture"
import { provideTmpdirServer, TestInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { raw, reply, TestLLMServer } from "../lib/llm-server"

Expand Down Expand Up @@ -173,6 +174,37 @@ const env = Layer.mergeAll(

const it = testEffect(env)

function llmEvents() {
const queue: Array<LLM.Event[]> = []

return {
push(...events: LLM.Event[]) {
queue.push(events)
},
layer: Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: () => Stream.make(...(queue.shift() ?? [])),
}),
),
}
}

const directLLM = llmEvents()
const directDeps = Layer.mergeAll(
Session.defaultLayer,
Snapshot.defaultLayer,
AgentSvc.defaultLayer,
Permission.defaultLayer,
Plugin.defaultLayer,
Config.defaultLayer,
directLLM.layer,
Provider.defaultLayer,
status,
).pipe(Layer.provideMerge(infra))
const directEnv = SessionProcessor.layer.pipe(Layer.provide(summary), Layer.provideMerge(directDeps))
const directIt = testEffect(directEnv)

const boot = Effect.fn("test.boot")(function* () {
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
Expand All @@ -184,6 +216,82 @@ const boot = Effect.fn("test.boot")(function* () {
// Tests
// ---------------------------------------------------------------------------

directIt.instance(
"session.processor effect tests consume explicit llm events",
Effect.gen(function* () {
const { directory: dir } = yield* TestInstance
const { processors, session, provider } = yield* boot()

directLLM.push(
{ type: "start" },
{ type: "start-step" },
{ type: "reasoning-start", id: "reason-0" },
{ type: "reasoning-delta", id: "reason-0", text: "think" },
{ type: "reasoning-end", id: "reason-0" },
{ type: "text-start", id: "text-0" },
{ type: "text-delta", id: "text-0", text: "hello" },
{ type: "text-end", id: "text-0" },
{
type: "finish-step",
finishReason: "stop",
usage: {
inputTokens: 1,
outputTokens: 1,
totalTokens: 2,
inputTokenDetails: {
noCacheTokens: undefined,
cacheReadTokens: undefined,
cacheWriteTokens: undefined,
},
outputTokenDetails: {
textTokens: undefined,
reasoningTokens: undefined,
},
},
},
{ type: "finish", finishReason: "stop" },
)

const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})

const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "hi" }],
tools: {},
})

const parts = MessageV2.parts(msg.id)
const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
const finish = parts.find((part): part is MessageV2.StepFinishPart => part.type === "step-finish")

expect(value).toBe("continue")
expect(reasoning?.text).toBe("think")
expect(text?.text).toBe("hello")
expect(finish?.reason).toBe("stop")
}),
{ git: true, config: providerCfg("http://localhost:1/v1") },
)

it.live("session.processor effect tests capture llm input cleanly", () =>
provideTmpdirServer(
({ dir, llm }) =>
Expand Down
Loading