diff --git a/packages/appkit/src/connectors/index.ts b/packages/appkit/src/connectors/index.ts index 41e7748c..54a24fa4 100644 --- a/packages/appkit/src/connectors/index.ts +++ b/packages/appkit/src/connectors/index.ts @@ -3,3 +3,4 @@ export * from "./genie"; export * from "./lakebase"; export * from "./lakebase-v1"; export * from "./sql-warehouse"; +export * from "./vector-search"; diff --git a/packages/appkit/src/connectors/vector-search/client.ts b/packages/appkit/src/connectors/vector-search/client.ts new file mode 100644 index 00000000..264a2dc6 --- /dev/null +++ b/packages/appkit/src/connectors/vector-search/client.ts @@ -0,0 +1,178 @@ +import { Context, type WorkspaceClient } from "@databricks/sdk-experimental"; +import type { TelemetryOptions } from "shared"; +import { createLogger } from "../../logging/logger"; +import { + type Span, + SpanKind, + SpanStatusCode, + TelemetryManager, +} from "../../telemetry"; +import type { TelemetryProvider } from "../../telemetry"; +import type { + VectorSearchConnectorConfig, + VsNextPageParams, + VsQueryParams, + VsRawResponse, +} from "./types"; + +const logger = createLogger("connectors:vector-search"); + +export class VectorSearchConnector { + private readonly config: Required; + private readonly telemetry: TelemetryProvider; + + constructor(config: VectorSearchConnectorConfig = {}) { + this.config = { + timeout: config.timeout ?? 30_000, + }; + this.telemetry = TelemetryManager.getProvider( + "vector-search", + config.telemetry, + ); + } + + async query( + workspaceClient: WorkspaceClient, + params: VsQueryParams, + signal?: AbortSignal, + ): Promise { + if (signal?.aborted) { + throw new Error("Query cancelled before execution"); + } + + const body: Record = { + columns: params.columns, + num_results: params.numResults, + query_type: params.queryType.toUpperCase(), + debug_level: 1, + }; + + if (params.queryText) body.query_text = params.queryText; + if (params.queryVector) body.query_vector = params.queryVector; + if (params.filters && Object.keys(params.filters).length > 0) { + body.filters = params.filters; + } + if (params.reranker) { + body.reranker = { + model: "databricks_reranker", + parameters: { columns_to_rerank: params.reranker.columnsToRerank }, + }; + } + + logger.debug( + "Querying VS index %s (type=%s, num_results=%d)", + params.indexName, + params.queryType, + params.numResults, + ); + + return this.telemetry.startActiveSpan( + "vector-search.query", + { + kind: SpanKind.CLIENT, + attributes: { + "db.system": "databricks", + "vs.index_name": params.indexName, + "vs.query_type": params.queryType, + "vs.num_results": params.numResults, + "vs.has_filters": !!( + params.filters && Object.keys(params.filters).length > 0 + ), + "vs.has_reranker": !!params.reranker, + }, + }, + async (span: Span) => { + const startTime = Date.now(); + try { + const response = (await workspaceClient.apiClient.request({ + method: "POST", + path: `/api/2.0/vector-search/indexes/${params.indexName}/query`, + body, + headers: new Headers({ "Content-Type": "application/json" }), + raw: false, + query: {}, + })) as VsRawResponse; + + const duration = Date.now() - startTime; + span.setAttribute("vs.result_count", response.result.row_count); + span.setAttribute("vs.query_time_ms", response.debug_info?.response_time ?? 0); + span.setAttribute("vs.duration_ms", duration); + span.setStatus({ code: SpanStatusCode.OK }); + + logger.event()?.setContext("vector-search", { + index_name: params.indexName, + query_type: params.queryType, + result_count: response.result.row_count, + query_time_ms: response.debug_info?.response_time ?? 0, + duration_ms: duration, + }); + + return response; + } catch (error) { + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + throw error; + } + }, + { name: "vector-search", includePrefix: true }, + ); + } + + async queryNextPage( + workspaceClient: WorkspaceClient, + params: VsNextPageParams, + signal?: AbortSignal, + ): Promise { + if (signal?.aborted) { + throw new Error("Query cancelled before execution"); + } + + logger.debug( + "Fetching next page for index %s (endpoint=%s)", + params.indexName, + params.endpointName, + ); + + return this.telemetry.startActiveSpan( + "vector-search.queryNextPage", + { + kind: SpanKind.CLIENT, + attributes: { + "db.system": "databricks", + "vs.index_name": params.indexName, + "vs.endpoint_name": params.endpointName, + }, + }, + async (span: Span) => { + try { + const response = (await workspaceClient.apiClient.request({ + method: "POST", + path: `/api/2.0/vector-search/indexes/${params.indexName}/query-next-page`, + body: { + endpoint_name: params.endpointName, + page_token: params.pageToken, + }, + headers: new Headers({ "Content-Type": "application/json" }), + raw: false, + query: {}, + })) as VsRawResponse; + + span.setAttribute("vs.result_count", response.result.row_count); + span.setStatus({ code: SpanStatusCode.OK }); + return response; + } catch (error) { + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + throw error; + } + }, + { name: "vector-search", includePrefix: true }, + ); + } +} diff --git a/packages/appkit/src/connectors/vector-search/index.ts b/packages/appkit/src/connectors/vector-search/index.ts new file mode 100644 index 00000000..d2ec2302 --- /dev/null +++ b/packages/appkit/src/connectors/vector-search/index.ts @@ -0,0 +1,2 @@ +export * from "./client"; +export * from "./types"; diff --git a/packages/appkit/src/connectors/vector-search/types.ts b/packages/appkit/src/connectors/vector-search/types.ts new file mode 100644 index 00000000..df042e8c --- /dev/null +++ b/packages/appkit/src/connectors/vector-search/types.ts @@ -0,0 +1,42 @@ +import type { TelemetryOptions } from "shared"; + +export interface VectorSearchConnectorConfig { + timeout?: number; + telemetry?: TelemetryOptions; +} + +export interface VsQueryParams { + indexName: string; + queryText?: string; + queryVector?: number[]; + columns: string[]; + numResults: number; + queryType: "ann" | "hybrid" | "full_text"; + filters?: Record; + reranker?: { columnsToRerank: string[] }; +} + +export interface VsNextPageParams { + indexName: string; + endpointName: string; + pageToken: string; +} + +export interface VsRawResponse { + manifest: { + column_count: number; + columns: Array<{ name: string; type?: string }>; + }; + result: { + row_count: number; + data_array: unknown[][]; + }; + next_page_token?: string | null; + debug_info?: { + response_time?: number; + ann_time?: number; + embedding_gen_time?: number; + latency_ms?: number; + [key: string]: unknown; + }; +} diff --git a/packages/appkit/src/plugins/index.ts b/packages/appkit/src/plugins/index.ts index 7caa040f..9a1819b3 100644 --- a/packages/appkit/src/plugins/index.ts +++ b/packages/appkit/src/plugins/index.ts @@ -3,3 +3,4 @@ export * from "./files"; export * from "./genie"; export * from "./lakebase"; export * from "./server"; +export * from "./vector-search"; diff --git a/packages/appkit/src/plugins/vector-search/defaults.ts b/packages/appkit/src/plugins/vector-search/defaults.ts new file mode 100644 index 00000000..c02b6e80 --- /dev/null +++ b/packages/appkit/src/plugins/vector-search/defaults.ts @@ -0,0 +1,7 @@ +import type { PluginExecuteConfig } from "shared"; + +export const vectorSearchDefaults: PluginExecuteConfig = { + cache: { enabled: false }, + retry: { enabled: true, initialDelay: 1000, attempts: 3 }, + timeout: 30_000, +}; diff --git a/packages/appkit/src/plugins/vector-search/index.ts b/packages/appkit/src/plugins/vector-search/index.ts new file mode 100644 index 00000000..9052cb03 --- /dev/null +++ b/packages/appkit/src/plugins/vector-search/index.ts @@ -0,0 +1,2 @@ +export * from "./vector-search"; +export * from "./types"; diff --git a/packages/appkit/src/plugins/vector-search/manifest.json b/packages/appkit/src/plugins/vector-search/manifest.json new file mode 100644 index 00000000..861876ff --- /dev/null +++ b/packages/appkit/src/plugins/vector-search/manifest.json @@ -0,0 +1,22 @@ +{ + "$schema": "https://databricks.github.io/appkit/schemas/plugin-manifest.schema.json", + "name": "vector-search", + "displayName": "Vector Search Plugin", + "description": "Query Databricks Vector Search indexes with built-in hybrid search, reranking, and pagination", + "resources": { + "required": [], + "optional": [] + }, + "config": { + "schema": { + "type": "object", + "properties": { + "timeout": { + "type": "number", + "default": 30000, + "description": "Query execution timeout in milliseconds" + } + } + } + } +} diff --git a/packages/appkit/src/plugins/vector-search/tests/vector-search.test.ts b/packages/appkit/src/plugins/vector-search/tests/vector-search.test.ts new file mode 100644 index 00000000..c320ccc2 --- /dev/null +++ b/packages/appkit/src/plugins/vector-search/tests/vector-search.test.ts @@ -0,0 +1,288 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("../../../context", () => ({ + getWorkspaceClient: vi.fn(() => mockWorkspaceClient), + getCurrentUserId: vi.fn(() => "test-user"), +})); + +vi.mock("../../../logging/logger", () => ({ + createLogger: () => ({ + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + event: () => ({ + setComponent: vi.fn().mockReturnThis(), + setContext: vi.fn().mockReturnThis(), + }), + }), +})); + +vi.mock("../../../telemetry", () => ({ + TelemetryManager: { + getProvider: () => ({ + getTracer: () => ({}), + getMeter: () => ({ + createCounter: () => ({ add: vi.fn() }), + createHistogram: () => ({ record: vi.fn() }), + }), + }), + }, + normalizeTelemetryOptions: () => ({ traces: false, metrics: false }), +})); + +vi.mock("../../../cache", () => ({ + CacheManager: { + getInstanceSync: () => ({ get: vi.fn(), set: vi.fn() }), + }, +})); + +vi.mock("../../../app", () => ({ + AppManager: vi.fn().mockImplementation(() => ({})), +})); + +vi.mock("../../../plugin/dev-reader", () => ({ + DevFileReader: { + getInstance: () => ({}), + }, +})); + +vi.mock("../../../stream", () => ({ + StreamManager: vi.fn().mockImplementation(() => ({ + abortAll: vi.fn(), + stream: vi.fn(), + })), +})); + +const validVsResponse = { + manifest: { + column_count: 3, + columns: [{ name: "id" }, { name: "title" }, { name: "score" }], + }, + result: { + row_count: 2, + data_array: [ + [1, "ML Guide", 0.95], + [2, "AI Primer", 0.87], + ], + }, + next_page_token: null, + debug_info: { response_time: 35 }, +}; + +const mockRequest = vi.fn().mockResolvedValue(validVsResponse); +const mockWorkspaceClient = { + apiClient: { request: mockRequest }, +}; + +import { VectorSearchPlugin } from "../vector-search"; + +describe("VectorSearchPlugin", () => { + beforeEach(() => { + mockRequest.mockClear(); + mockRequest.mockResolvedValue(validVsResponse); + }); + + describe("setup()", () => { + it("throws if any index is missing indexName", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { indexName: "", columns: ["id"] }, + }, + }); + await expect(plugin.setup()).rejects.toThrow("indexName"); + }); + + it("throws if any index is missing columns", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { indexName: "cat.sch.idx", columns: [] }, + }, + }); + await expect(plugin.setup()).rejects.toThrow("columns"); + }); + + it("throws if pagination enabled but no endpointName", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { + indexName: "cat.sch.idx", + columns: ["id"], + pagination: true, + }, + }, + }); + await expect(plugin.setup()).rejects.toThrow("endpointName"); + }); + + it("succeeds with valid config", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + products: { + indexName: "cat.sch.products_idx", + columns: ["id", "name", "description"], + queryType: "hybrid", + numResults: 20, + }, + }, + }); + await expect(plugin.setup()).resolves.not.toThrow(); + }); + }); + + describe("manifest", () => { + it("has correct name", () => { + expect(VectorSearchPlugin.manifest.name).toBe("vector-search"); + }); + }); + + describe("exports()", () => { + it("returns object with query function", () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { indexName: "cat.sch.idx", columns: ["id"] }, + }, + }); + const exports = plugin.exports(); + expect(exports).toHaveProperty("query"); + expect(typeof exports.query).toBe("function"); + }); + }); + + describe("query()", () => { + it("calls VS API via connector and parses response", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + products: { + indexName: "cat.sch.products", + columns: ["id", "title"], + queryType: "hybrid", + }, + }, + }); + await plugin.setup(); + + const result = await plugin.query("products", { + queryText: "machine learning", + }); + + expect(result.results).toHaveLength(2); + expect(result.results[0].score).toBe(0.95); + expect(result.results[0].data).toEqual({ id: 1, title: "ML Guide" }); + expect(result.results[1].score).toBe(0.87); + expect(result.totalCount).toBe(2); + expect(result.queryTimeMs).toBe(35); + }); + + it("constructs correct API request", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { + indexName: "cat.sch.idx", + columns: ["id", "title"], + queryType: "hybrid", + numResults: 10, + }, + }, + }); + await plugin.setup(); + await plugin.query("test", { queryText: "test query" }); + + expect(mockRequest).toHaveBeenCalledWith( + expect.objectContaining({ + method: "POST", + path: "/api/2.0/vector-search/indexes/cat.sch.idx/query", + }), + ); + + const callBody = mockRequest.mock.calls[0][0].body; + expect(callBody.query_text).toBe("test query"); + expect(callBody.query_type).toBe("HYBRID"); + expect(callBody.num_results).toBe(10); + expect(callBody.columns).toEqual(["id", "title"]); + }); + + it("throws INDEX_NOT_FOUND for unknown alias", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { indexName: "cat.sch.idx", columns: ["id"] }, + }, + }); + await plugin.setup(); + + await expect(plugin.query("unknown", { queryText: "test" })).rejects + .toMatchObject({ code: "INDEX_NOT_FOUND" }); + }); + + it("includes filters when provided", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { + indexName: "cat.sch.idx", + columns: ["id", "title"], + }, + }, + }); + await plugin.setup(); + await plugin.query("test", { + queryText: "test", + filters: { category: ["books"] }, + }); + + const callBody = mockRequest.mock.calls[0][0].body; + expect(callBody.filters).toEqual({ category: ["books"] }); + }); + + it("includes reranker config when enabled on index", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { + indexName: "cat.sch.idx", + columns: ["id", "title", "desc"], + reranker: true, + }, + }, + }); + await plugin.setup(); + await plugin.query("test", { queryText: "test" }); + + const callBody = mockRequest.mock.calls[0][0].body; + expect(callBody.reranker.model).toBe("databricks_reranker"); + expect(callBody.reranker.parameters.columns_to_rerank).toEqual([ + "title", + "desc", + ]); + }); + + it("calls embeddingFn for self-managed indexes", async () => { + const mockEmbeddingFn = vi.fn().mockResolvedValue([0.1, 0.2, 0.3]); + const plugin = new VectorSearchPlugin({ + indexes: { + test: { + indexName: "cat.sch.idx", + columns: ["id", "title"], + embeddingFn: mockEmbeddingFn, + }, + }, + }); + await plugin.setup(); + await plugin.query("test", { queryText: "test" }); + + expect(mockEmbeddingFn).toHaveBeenCalledWith("test"); + const callBody = mockRequest.mock.calls[0][0].body; + expect(callBody.query_vector).toEqual([0.1, 0.2, 0.3]); + expect(callBody.query_text).toBeUndefined(); + }); + }); + + describe("shutdown()", () => { + it("does not throw", async () => { + const plugin = new VectorSearchPlugin({ + indexes: { + test: { indexName: "cat.sch.idx", columns: ["id"] }, + }, + }); + await expect(plugin.shutdown()).resolves.not.toThrow(); + }); + }); +}); diff --git a/packages/appkit/src/plugins/vector-search/types.ts b/packages/appkit/src/plugins/vector-search/types.ts new file mode 100644 index 00000000..77e1287d --- /dev/null +++ b/packages/appkit/src/plugins/vector-search/types.ts @@ -0,0 +1,79 @@ +import type { BasePluginConfig } from "shared"; + +export interface IVectorSearchConfig extends BasePluginConfig { + timeout?: number; + indexes: Record; +} + +export interface IndexConfig { + /** Three-level UC name: catalog.schema.index_name */ + indexName: string; + /** Columns to return in results */ + columns: string[]; + /** Default search mode */ + queryType?: "ann" | "hybrid" | "full_text"; + /** Max results per query */ + numResults?: number; + /** Enable built-in reranker. Pass true to rerank all non-id columns, or an object for fine control. */ + reranker?: boolean | RerankerConfig; + /** Auth mode — "service-principal" uses the app's SP, "on-behalf-of-user" proxies the logged-in user's token */ + auth?: "service-principal" | "on-behalf-of-user"; + /** Enable cursor pagination */ + pagination?: boolean; + /** VS endpoint name (required when pagination is true) */ + endpointName?: string; + /** + * For self-managed embedding indexes: converts query text to an embedding vector. + * When provided, the plugin calls this function and sends query_vector to VS. + * When omitted, query_text is sent and VS computes embeddings server-side (managed mode). + */ + embeddingFn?: (text: string) => Promise; +} + +export interface RerankerConfig { + columnsToRerank: string[]; +} + +export type SearchFilters = Record< + string, + string | number | boolean | (string | number)[] +>; + +export interface SearchRequest { + queryText?: string; + queryVector?: number[]; + columns?: string[]; + numResults?: number; + queryType?: "ann" | "hybrid" | "full_text"; + filters?: SearchFilters; + reranker?: boolean; +} + +export interface SearchResponse< + T extends Record = Record, +> { + results: SearchResult[]; + totalCount: number; + queryTimeMs: number; + queryType: "ann" | "hybrid" | "full_text"; + fromCache: boolean; + nextPageToken: string | null; +} + +export interface SearchResult< + T extends Record = Record, +> { + score: number; + data: T; +} + +export interface SearchError { + code: + | "UNAUTHORIZED" + | "INDEX_NOT_FOUND" + | "INVALID_QUERY" + | "RATE_LIMITED" + | "INTERNAL"; + message: string; + statusCode: number; +} diff --git a/packages/appkit/src/plugins/vector-search/vector-search.ts b/packages/appkit/src/plugins/vector-search/vector-search.ts new file mode 100644 index 00000000..117f3e46 --- /dev/null +++ b/packages/appkit/src/plugins/vector-search/vector-search.ts @@ -0,0 +1,354 @@ +import type express from "express"; +import type { IAppRouter } from "shared"; +import { VectorSearchConnector } from "../../connectors"; +import { getWorkspaceClient } from "../../context"; +import { createLogger } from "../../logging/logger"; +import { Plugin, toPlugin } from "../../plugin"; +import type { PluginManifest } from "../../registry"; +import type { VsRawResponse } from "../../connectors/vector-search/types"; +import manifest from "./manifest.json"; +import type { + IVectorSearchConfig, + IndexConfig, + SearchRequest, + SearchResponse, +} from "./types"; + +const logger = createLogger("vector-search"); + +export class VectorSearchPlugin extends Plugin { + static manifest = manifest as PluginManifest<"vector-search">; + + protected static description = + "Query Databricks Vector Search indexes with hybrid search, reranking, and pagination"; + protected declare config: IVectorSearchConfig; + + private connector: VectorSearchConnector; + + constructor(config: IVectorSearchConfig) { + super(config); + this.config = config; + this.connector = new VectorSearchConnector({ + timeout: config.timeout, + telemetry: config.telemetry, + }); + } + + async setup(): Promise { + for (const [alias, idx] of Object.entries(this.config.indexes)) { + if (!idx.indexName) { + throw new Error( + `Index "${alias}" is missing required field "indexName"`, + ); + } + if (!idx.columns || idx.columns.length === 0) { + throw new Error( + `Index "${alias}" is missing required field "columns"`, + ); + } + if (idx.pagination && !idx.endpointName) { + throw new Error( + `Index "${alias}" has pagination enabled but is missing "endpointName"`, + ); + } + } + logger.debug( + "Vector Search plugin configured with %d index(es)", + Object.keys(this.config.indexes).length, + ); + } + + injectRoutes(router: IAppRouter) { + this.route(router, { + name: "query", + method: "post", + path: "/:alias/query", + handler: async (req: express.Request, res: express.Response) => { + const indexConfig = this._resolveIndex(req.params.alias); + if (!indexConfig) { + res.status(404).json({ + code: "INDEX_NOT_FOUND", + message: `No index configured with alias "${req.params.alias}"`, + statusCode: 404, + }); + return; + } + + if (indexConfig.auth === "on-behalf-of-user") { + await this.asUser(req)._handleQuery(req, res, indexConfig); + } else { + await this._handleQuery(req, res, indexConfig); + } + }, + }); + + this.route(router, { + name: "queryNextPage", + method: "post", + path: "/:alias/next-page", + handler: async (req: express.Request, res: express.Response) => { + const indexConfig = this._resolveIndex(req.params.alias); + if (!indexConfig) { + res.status(404).json({ + code: "INDEX_NOT_FOUND", + message: `No index configured with alias "${req.params.alias}"`, + statusCode: 404, + }); + return; + } + + if (indexConfig.auth === "on-behalf-of-user") { + await this.asUser(req)._handleNextPage(req, res, indexConfig); + } else { + await this._handleNextPage(req, res, indexConfig); + } + }, + }); + + this.route(router, { + name: "getConfig", + method: "get", + path: "/:alias/config", + handler: (req: express.Request, res: express.Response) => { + const { alias } = req.params; + const indexConfig = this._resolveIndex(alias); + if (!indexConfig) { + res.status(404).json({ + code: "INDEX_NOT_FOUND", + message: `No index configured with alias "${alias}"`, + statusCode: 404, + }); + return; + } + res.json({ + alias, + columns: indexConfig.columns, + queryType: indexConfig.queryType ?? "hybrid", + numResults: indexConfig.numResults ?? 20, + reranker: !!indexConfig.reranker, + pagination: !!indexConfig.pagination, + }); + }, + }); + } + + async _handleQuery( + req: express.Request, + res: express.Response, + indexConfig: IndexConfig, + ): Promise { + const body: SearchRequest = req.body; + + if (!body.queryText && !body.queryVector) { + res.status(400).json({ + code: "INVALID_QUERY", + message: "queryText or queryVector is required", + statusCode: 400, + }); + return; + } + + const event = logger.event(req); + event + ?.setComponent("vector-search", "query") + .setContext("vector-search", { + index: indexConfig.indexName, + query_type: body.queryType ?? indexConfig.queryType ?? "hybrid", + plugin: this.name, + }); + + const queryType = body.queryType ?? indexConfig.queryType ?? "hybrid"; + let queryText = body.queryText; + let queryVector = body.queryVector; + + if (indexConfig.embeddingFn && queryText && !queryVector) { + queryVector = await indexConfig.embeddingFn(queryText); + queryText = undefined; + } + + const rerankerConfig = this._resolveReranker( + body.reranker, + indexConfig, + body.columns ?? indexConfig.columns, + ); + + try { + const workspaceClient = getWorkspaceClient(); + const raw = await this.connector.query( + workspaceClient, + { + indexName: indexConfig.indexName, + queryText, + queryVector, + columns: body.columns ?? indexConfig.columns, + numResults: body.numResults ?? indexConfig.numResults ?? 20, + queryType, + filters: body.filters, + reranker: rerankerConfig, + }, + ); + res.json(this._parseResponse(raw, queryType)); + } catch (error) { + logger.error("Vector search query failed: %O", error); + const statusCode = + (error as { statusCode?: number }).statusCode ?? 500; + res.status(statusCode).json({ + code: (error as { code?: string }).code ?? "INTERNAL", + message: + error instanceof Error ? error.message : "Query execution failed", + statusCode, + }); + } + } + + async _handleNextPage( + req: express.Request, + res: express.Response, + indexConfig: IndexConfig, + ): Promise { + if (!indexConfig.pagination) { + res.status(400).json({ + code: "INVALID_QUERY", + message: `Pagination is not enabled for index "${req.params.alias}"`, + statusCode: 400, + }); + return; + } + + const { pageToken } = req.body; + if (!pageToken) { + res.status(400).json({ + code: "INVALID_QUERY", + message: "pageToken is required", + statusCode: 400, + }); + return; + } + + try { + const workspaceClient = getWorkspaceClient(); + const raw = await this.connector.queryNextPage( + workspaceClient, + { + indexName: indexConfig.indexName, + endpointName: indexConfig.endpointName!, + pageToken, + }, + ); + res.json(this._parseResponse(raw, "hybrid")); + } catch (error) { + logger.error("Vector search next-page query failed: %O", error); + const statusCode = + (error as { statusCode?: number }).statusCode ?? 500; + res.status(statusCode).json({ + code: (error as { code?: string }).code ?? "INTERNAL", + message: + error instanceof Error ? error.message : "Next-page query failed", + statusCode, + }); + } + } + + /** + * Programmatic query API — available as `appkit.vectorSearch.query()`. + * When called through `asUser(req)`, executes with the user's credentials. + */ + async query(alias: string, request: SearchRequest): Promise { + const indexConfig = this._resolveIndex(alias); + if (!indexConfig) { + throw { + code: "INDEX_NOT_FOUND" as const, + message: `No index configured with alias "${alias}"`, + statusCode: 404, + }; + } + + const queryType = request.queryType ?? indexConfig.queryType ?? "hybrid"; + let queryText = request.queryText; + let queryVector = request.queryVector; + + if (indexConfig.embeddingFn && queryText && !queryVector) { + queryVector = await indexConfig.embeddingFn(queryText); + queryText = undefined; + } + + const rerankerConfig = this._resolveReranker( + request.reranker, + indexConfig, + request.columns ?? indexConfig.columns, + ); + + const workspaceClient = getWorkspaceClient(); + const raw = await this.connector.query(workspaceClient, { + indexName: indexConfig.indexName, + queryText, + queryVector, + columns: request.columns ?? indexConfig.columns, + numResults: request.numResults ?? indexConfig.numResults ?? 20, + queryType, + filters: request.filters, + reranker: rerankerConfig, + }); + + return this._parseResponse(raw, queryType); + } + + async shutdown(): Promise { + this.streamManager.abortAll(); + } + + exports() { + return { + query: this.query.bind(this), + }; + } + + private _resolveIndex(alias: string): IndexConfig | undefined { + return this.config.indexes[alias]; + } + + private _resolveReranker( + requestReranker: boolean | undefined, + indexConfig: IndexConfig, + columns: string[], + ): { columnsToRerank: string[] } | undefined { + const shouldRerank = requestReranker ?? indexConfig.reranker; + if (!shouldRerank) return undefined; + + if (typeof indexConfig.reranker === "object") { + return indexConfig.reranker; + } + return { columnsToRerank: columns.filter((c) => c !== "id") }; + } + + private _parseResponse( + raw: VsRawResponse, + queryType: "ann" | "hybrid" | "full_text", + ): SearchResponse { + const columnNames = raw.manifest.columns.map((c) => c.name); + const scoreIndex = columnNames.indexOf("score"); + + const results = raw.result.data_array.map((row) => { + const data: Record = {}; + for (let i = 0; i < columnNames.length; i++) { + if (columnNames[i] !== "score") data[columnNames[i]] = row[i]; + } + return { + score: scoreIndex >= 0 ? (row[scoreIndex] as number) : 0, + data, + }; + }); + + return { + results, + totalCount: raw.result.row_count, + queryTimeMs: + raw.debug_info?.response_time ?? raw.debug_info?.latency_ms ?? 0, + queryType, + fromCache: false, + nextPageToken: raw.next_page_token ?? null, + }; + } +} + +export const vectorSearch = toPlugin(VectorSearchPlugin);