Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 146 additions & 158 deletions packages/opencode/test/session/session.test.ts
Original file line number Diff line number Diff line change
@@ -1,186 +1,174 @@
import { describe, expect, test } from "bun:test"
import path from "path"
import { describe, expect } from "bun:test"
import { Deferred, Effect, Exit, Layer } from "effect"
import { Session as SessionNs } from "@/session/session"
import { Bus } from "../../src/bus"
import { GlobalBus, type GlobalEvent } from "../../src/bus/global"
import * as Log from "@opencode-ai/core/util/log"
import { Instance } from "../../src/project/instance"
import { WithInstance } from "../../src/project/with-instance"
import { Flag } from "@opencode-ai/core/flag/flag"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { AppRuntime } from "../../src/effect/app-runtime"
import { tmpdir } from "../fixture/fixture"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { provideInstance, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"

const projectRoot = path.join(__dirname, "../..")
void Log.init({ print: false })

function create(input?: SessionNs.CreateInput) {
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input)))
}

function get(id: SessionID) {
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id)))
}
const it = testEffect(Layer.mergeAll(SessionNs.defaultLayer, CrossSpawnSpawner.defaultLayer))

function remove(id: SessionID) {
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.remove(id)))
}
const awaitDeferred = <T>(deferred: Deferred.Deferred<T>, message: string) =>
Effect.race(
Deferred.await(deferred),
Effect.sleep("2 seconds").pipe(Effect.flatMap(() => Effect.fail(new Error(message)))),
)

function updateMessage<T extends MessageV2.Info>(msg: T) {
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
}
const remove = (id: SessionID) => SessionNs.Service.use((svc) => svc.remove(id))

function updatePart<T extends MessageV2.Part>(part: T) {
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part)))
const subscribeGlobal = (type: string, callback: (event: NonNullable<GlobalEvent["payload"]>) => void) => {
const listener = (event: GlobalEvent) => {
if (event.payload?.type === type) callback(event.payload)
}
GlobalBus.on("event", listener)
return () => GlobalBus.off("event", listener)
}

describe("session.created event", () => {
test("should emit session.created event when session is created", async () => {
await WithInstance.provide({
directory: projectRoot,
fn: async () => {
let eventReceived = false
let receivedInfo: SessionNs.Info | undefined

const unsub = Bus.subscribe(SessionNs.Event.Created, (event) => {
eventReceived = true
receivedInfo = event.properties.info as SessionNs.Info
})
it.instance("should emit session.created event when session is created", () =>
Effect.gen(function* () {
const session = yield* SessionNs.Service
const received = yield* Deferred.make<SessionNs.Info>()

const info = await create({})
await new Promise((resolve) => setTimeout(resolve, 100))
unsub()

expect(eventReceived).toBe(true)
expect(receivedInfo).toBeDefined()
expect(receivedInfo?.id).toBe(info.id)
expect(receivedInfo?.projectID).toBe(info.projectID)
expect(receivedInfo?.directory).toBe(info.directory)
expect(receivedInfo?.path).toBe(info.path)
expect(receivedInfo?.title).toBe(info.title)

await remove(info.id)
},
})
})

test("session.created event should be emitted before session.updated", async () => {
await WithInstance.provide({
directory: projectRoot,
fn: async () => {
const events: string[] = []

const unsubCreated = Bus.subscribe(SessionNs.Event.Created, () => {
events.push("created")
})
const unsub = subscribeGlobal(SessionNs.Event.Created.type, (event) => {
Deferred.doneUnsafe(received, Effect.succeed(event.properties.info as SessionNs.Info))
})
yield* Effect.addFinalizer(() => Effect.sync(unsub))

const unsubUpdated = Bus.subscribe(SessionNs.Event.Updated, () => {
events.push("updated")
})
const info = yield* session.create({})
const receivedInfo = yield* awaitDeferred(received, "timed out waiting for session.created")

const info = await create({})
await new Promise((resolve) => setTimeout(resolve, 100))
unsubCreated()
unsubUpdated()
expect(receivedInfo.id).toBe(info.id)
expect(receivedInfo.projectID).toBe(info.projectID)
expect(receivedInfo.directory).toBe(info.directory)
expect(receivedInfo.path).toBe(info.path)
expect(receivedInfo.title).toBe(info.title)

expect(events).toContain("created")
expect(events).toContain("updated")
expect(events.indexOf("created")).toBeLessThan(events.indexOf("updated"))
yield* session.remove(info.id)
}),
)

it.instance("session.created event should be emitted before session.updated", () =>
Effect.gen(function* () {
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return

const session = yield* SessionNs.Service
const events: string[] = []
const received = yield* Deferred.make<string[]>()
const push = (event: string) => {
events.push(event)
if (events.includes("created") && events.includes("updated")) {
Deferred.doneUnsafe(received, Effect.succeed(events))
}
}

const unsubCreated = subscribeGlobal(SessionNs.Event.Created.type, () => {
push("created")
})
yield* Effect.addFinalizer(() => Effect.sync(unsubCreated))

await remove(info.id)
},
})
})
const unsubUpdated = subscribeGlobal(SessionNs.Event.Updated.type, () => {
push("updated")
})
yield* Effect.addFinalizer(() => Effect.sync(unsubUpdated))

const info = yield* session.create({})
const receivedEvents = yield* awaitDeferred(received, "timed out waiting for session created/updated events")

expect(receivedEvents).toContain("created")
expect(receivedEvents).toContain("updated")
expect(receivedEvents.indexOf("created")).toBeLessThan(receivedEvents.indexOf("updated"))

yield* session.remove(info.id)
}),
)
})

describe("step-finish token propagation via Bus event", () => {
test(
it.instance(
"non-zero tokens propagate through PartUpdated event",
async () => {
await WithInstance.provide({
directory: projectRoot,
fn: async () => {
const info = await create({})

const messageID = MessageID.ascending()
await updateMessage({
id: messageID,
sessionID: info.id,
role: "user",
time: { created: Date.now() },
agent: "user",
model: { providerID: "test", modelID: "test" },
tools: {},
mode: "",
} as unknown as MessageV2.Info)

// Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part`
// is the mutable domain type. Cast bridges the two — safe because the
// test only reads the value afterwards.
let received: MessageV2.Part | undefined
const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, (event) => {
received = event.properties.part as MessageV2.Part
})

const tokens = {
total: 1500,
input: 500,
output: 800,
reasoning: 200,
cache: { read: 100, write: 50 },
}

const partInput = {
id: PartID.ascending(),
messageID,
sessionID: info.id,
type: "step-finish" as const,
reason: "stop",
cost: 0.005,
tokens,
}

await updatePart(partInput)
await new Promise((resolve) => setTimeout(resolve, 100))

expect(received).toBeDefined()
expect(received!.type).toBe("step-finish")
const finish = received as MessageV2.StepFinishPart
expect(finish.tokens.input).toBe(500)
expect(finish.tokens.output).toBe(800)
expect(finish.tokens.reasoning).toBe(200)
expect(finish.tokens.total).toBe(1500)
expect(finish.tokens.cache.read).toBe(100)
expect(finish.tokens.cache.write).toBe(50)
expect(finish.cost).toBe(0.005)
expect(received).not.toBe(partInput)

unsub()
await remove(info.id)
},
})
},
() =>
Effect.gen(function* () {
const session = yield* SessionNs.Service
const info = yield* session.create({})

const messageID = MessageID.ascending()
yield* session.updateMessage({
id: messageID,
sessionID: info.id,
role: "user",
time: { created: Date.now() },
agent: "user",
model: { providerID: "test", modelID: "test" },
tools: {},
mode: "",
} as unknown as MessageV2.Info)

// Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part`
// is the mutable domain type. Cast bridges the two — safe because the
// test only reads the value afterwards.
const received = yield* Deferred.make<MessageV2.Part>()
const unsub = subscribeGlobal(MessageV2.Event.PartUpdated.type, (event) => {
Deferred.doneUnsafe(received, Effect.succeed(event.properties.part as MessageV2.Part))
})
yield* Effect.addFinalizer(() => Effect.sync(unsub))

const tokens = {
total: 1500,
input: 500,
output: 800,
reasoning: 200,
cache: { read: 100, write: 50 },
}

const partInput = {
id: PartID.ascending(),
messageID,
sessionID: info.id,
type: "step-finish" as const,
reason: "stop",
cost: 0.005,
tokens,
}

yield* session.updatePart(partInput)
const receivedPart = yield* awaitDeferred(received, "timed out waiting for message.part.updated")

expect(receivedPart.type).toBe("step-finish")
const finish = receivedPart as MessageV2.StepFinishPart
expect(finish.tokens.input).toBe(500)
expect(finish.tokens.output).toBe(800)
expect(finish.tokens.reasoning).toBe(200)
expect(finish.tokens.total).toBe(1500)
expect(finish.tokens.cache.read).toBe(100)
expect(finish.tokens.cache.write).toBe(50)
expect(finish.cost).toBe(0.005)
expect(receivedPart).not.toBe(partInput)

yield* session.remove(info.id)
}),
{ timeout: 30000 },
)
})

describe("Session", () => {
test("remove works without an instance", async () => {
await using tmp = await tmpdir({ git: true })

const info = await WithInstance.provide({
directory: tmp.path,
fn: () => create({ title: "remove-without-instance" }),
})

await expect(async () => {
await remove(info.id)
}).not.toThrow()

let missing = false
await get(info.id).catch(() => {
missing = true
})

expect(missing).toBe(true)
})
it.live("remove works without an instance", () =>
Effect.gen(function* () {
const session = yield* SessionNs.Service
const dir = yield* tmpdirScoped({ git: true })
const info = yield* provideInstance(dir)(session.create({ title: "remove-without-instance" }))

const removeExit = yield* remove(info.id).pipe(Effect.exit)
expect(Exit.isSuccess(removeExit)).toBe(true)

const getExit = yield* session.get(info.id).pipe(Effect.exit)
expect(Exit.isFailure(getExit)).toBe(true)
}),
)
})
Loading