From ffca204bf9300a898ded5e4923eb2747149d0275 Mon Sep 17 00:00:00 2001 From: Lalit Kapoor Date: Mon, 4 May 2026 21:36:19 -0400 Subject: [PATCH] feat(db): add keyed collection subscriptions --- .changeset/chatty-files-hear.md | 5 + packages/db/src/collection/changes.ts | 73 +++++++ packages/db/src/collection/index.ts | 27 +++ .../collection-subscribe-changes.test.ts | 183 ++++++++++++++++++ packages/db/tests/collection.test-d.ts | 24 ++- 5 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 .changeset/chatty-files-hear.md diff --git a/.changeset/chatty-files-hear.md b/.changeset/chatty-files-hear.md new file mode 100644 index 0000000000..0fba67b524 --- /dev/null +++ b/.changeset/chatty-files-hear.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Add `subscribeKeyChanges` for subscribing to future changes for a single collection key. diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index dc07cd3f18..80c0f7d1b7 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -27,6 +27,7 @@ export class CollectionChangesManager< public activeSubscribersCount = 0 public changeSubscriptions = new Set() + private keyChangeSubscriptions = new Map>() public batchedEvents: Array> = [] public shouldBatchEvents = false @@ -112,6 +113,39 @@ export class CollectionChangesManager< for (const subscription of this.changeSubscriptions) { subscription.emitEvents(enrichedEvents) } + + this.emitKeyChangeEvents(enrichedEvents) + } + + private emitKeyChangeEvents( + enrichedEvents: Array, TKey>>, + ): void { + const changesBySubscription = new Map< + CollectionSubscription, + Array, TKey>> + >() + + for (const change of enrichedEvents) { + const subscriptions = this.keyChangeSubscriptions.get(change.key) + + if (!subscriptions) { + continue + } + + for (const subscription of subscriptions) { + const existingChanges = changesBySubscription.get(subscription) + + if (existingChanges) { + existingChanges.push(change) + } else { + changesBySubscription.set(subscription, [change]) + } + } + } + + for (const [subscription, changes] of changesBySubscription) { + subscription.emitEvents(changes) + } } /** @@ -176,6 +210,45 @@ export class CollectionChangesManager< return subscription } + /** + * Subscribe to future changes for a single collection key. + */ + public subscribeKeyChanges( + key: TKey, + callback: ( + changes: Array, TKey>>, + ) => void, + ): CollectionSubscription { + const subscription = new CollectionSubscription(this.collection, callback, { + onUnsubscribe: () => { + this.removeSubscriber() + + const subscriptions = this.keyChangeSubscriptions.get(key) + if (!subscriptions) { + return + } + + subscriptions.delete(subscription) + + if (subscriptions.size === 0) { + this.keyChangeSubscriptions.delete(key) + } + }, + }) + + // Key subscriptions do not request a snapshot. Callers can read the current + // row with collection.get(key), then use this method for future row changes. + subscription.markAllStateAsSeen() + + const subscriptions = this.keyChangeSubscriptions.get(key) ?? new Set() + subscriptions.add(subscription) + this.keyChangeSubscriptions.set(key, subscriptions) + + this.addSubscriber() + + return subscription + } + /** * Increment the active subscribers count and start sync if needed */ diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index e51eb998d6..66e59c1ec5 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -941,6 +941,33 @@ export class CollectionImpl< return this._changes.subscribeChanges(callback, options) } + /** + * Subscribe to future changes for a single collection key. + * + * This does not emit the current row. Use collection.get(key) to read the + * current value, then subscribeKeyChanges(key, callback) to react to future + * inserts, updates, and deletes for that key. + * + * @example + * const currentTodo = todos.get("todo-1") + * + * const subscription = todos.subscribeKeyChanges("todo-1", (changes) => { + * for (const change of changes) { + * console.log(change.type, change.value) + * } + * }) + * + * // Later: subscription.unsubscribe() + */ + public subscribeKeyChanges( + key: TKey, + callback: ( + changes: Array, TKey>>, + ) => void, + ): CollectionSubscription { + return this._changes.subscribeKeyChanges(key, callback) + } + /** * Subscribe to a collection event */ diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 4f851f08a7..bf0070dcc2 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -20,6 +20,11 @@ import type { // Helper function to wait for changes to be processed const waitForChanges = () => new Promise((resolve) => setTimeout(resolve, 10)) +type SyncFunctions = Pick< + Parameters[`sync`]>[0], + `begin` | `write` | `commit` | `markReady` +> + const normalizeChange = >( change: ChangeMessage, ): ChangeMessage => ({ @@ -2154,6 +2159,184 @@ describe(`Collection.subscribeChanges`, () => { }) }) +describe(`Collection.subscribeKeyChanges`, () => { + type TestItem = { id: string; value: string } + + const createManualSyncCollection = (id: string) => { + let syncFns: SyncFunctions | undefined + + const collection = createCollection({ + id, + getKey: (item) => item.id, + sync: { + sync: (params) => { + syncFns = params + params.markReady() + }, + }, + }) + + collection.startSyncImmediate() + + if (!syncFns) { + throw new Error(`Sync functions were not initialized`) + } + + return { collection, syncFns } + } + + const writeSyncedChanges = ( + syncFns: SyncFunctions, + changes: Array, `key`>>, + ) => { + syncFns.begin() + + for (const change of changes) { + syncFns.write(change) + } + + syncFns.commit() + } + + it(`should emit only future changes for the subscribed key`, () => { + const { collection, syncFns } = createManualSyncCollection( + `subscribe-key-changes-filter-test`, + ) + const changes: Array< + ChangeMessage, string> + > = [] + + const subscription = collection.subscribeKeyChanges(`row-1`, (events) => { + changes.push(...events) + }) + + writeSyncedChanges(syncFns, [ + { type: `insert`, value: { id: `row-1`, value: `one` } }, + { type: `insert`, value: { id: `row-2`, value: `two` } }, + ]) + + expect(changes).toHaveLength(1) + expect(changes[0]).toMatchObject({ + type: `insert`, + key: `row-1`, + value: { id: `row-1`, value: `one` }, + }) + + changes.length = 0 + + writeSyncedChanges(syncFns, [ + { type: `update`, value: { id: `row-2`, value: `two updated` } }, + ]) + + expect(changes).toEqual([]) + + writeSyncedChanges(syncFns, [ + { type: `update`, value: { id: `row-1`, value: `one updated` } }, + ]) + + expect(changes).toHaveLength(1) + expect(changes[0]).toMatchObject({ + type: `update`, + key: `row-1`, + value: { id: `row-1`, value: `one updated` }, + }) + + subscription.unsubscribe() + }) + + it(`should emit matching sync changes written while the subscription starts sync`, () => { + const changes: Array< + ChangeMessage, string> + > = [] + + const collection = createCollection({ + id: `subscribe-key-changes-start-sync-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: `row-1`, value: `one` }, + }) + write({ + type: `insert`, + value: { id: `row-2`, value: `two` }, + }) + commit() + markReady() + }, + }, + }) + + const subscription = collection.subscribeKeyChanges(`row-1`, (events) => { + changes.push(...events) + }) + + expect(changes).toHaveLength(1) + expect(changes[0]).toMatchObject({ + type: `insert`, + key: `row-1`, + value: { id: `row-1`, value: `one` }, + }) + + subscription.unsubscribe() + }) + + it(`should not emit an initial snapshot and should still emit future deletes`, () => { + const { collection, syncFns } = createManualSyncCollection( + `subscribe-key-changes-delete-test`, + ) + const changes: Array< + ChangeMessage, string> + > = [] + + writeSyncedChanges(syncFns, [ + { type: `insert`, value: { id: `row-1`, value: `one` } }, + ]) + + const subscription = collection.subscribeKeyChanges(`row-1`, (events) => { + changes.push(...events) + }) + + expect(changes).toEqual([]) + + writeSyncedChanges(syncFns, [ + { type: `delete`, value: { id: `row-1`, value: `one` } }, + ]) + + expect(changes).toHaveLength(1) + expect(changes[0]).toMatchObject({ + type: `delete`, + key: `row-1`, + value: { id: `row-1`, value: `one` }, + }) + + subscription.unsubscribe() + }) + + it(`should stop emitting changes after unsubscribe`, () => { + const { collection, syncFns } = createManualSyncCollection( + `subscribe-key-changes-unsubscribe-test`, + ) + const changes: Array< + ChangeMessage, string> + > = [] + + const subscription = collection.subscribeKeyChanges(`row-1`, (events) => { + changes.push(...events) + }) + + subscription.unsubscribe() + + writeSyncedChanges(syncFns, [ + { type: `insert`, value: { id: `row-1`, value: `one` } }, + ]) + + expect(changes).toEqual([]) + }) +}) + describe(`Virtual properties`, () => { it(`should include virtual properties in change messages`, async () => { const changes: Array< diff --git a/packages/db/tests/collection.test-d.ts b/packages/db/tests/collection.test-d.ts index 32edff2f8d..3241d502ff 100644 --- a/packages/db/tests/collection.test-d.ts +++ b/packages/db/tests/collection.test-d.ts @@ -2,7 +2,7 @@ import { assertType, describe, expectTypeOf, it } from 'vitest' import { z } from 'zod' import { createCollection } from '../src/collection/index.js' import type { OutputWithVirtual } from './utils' -import type { OperationConfig } from '../src/types' +import type { ChangeMessage, OperationConfig } from '../src/types' import type { StandardSchemaV1 } from '@standard-schema/spec' describe(`Collection.update type tests`, () => { @@ -49,6 +49,28 @@ describe(`Collection.update type tests`, () => { }) }) +describe(`Collection.subscribeKeyChanges type tests`, () => { + type TypeTestItem = { id: string; value: number } + + const testCollection = createCollection({ + getKey: (item) => item.id, + sync: { sync: () => {} }, + }) + + it(`should type callback changes by collection key and output`, () => { + const subscription = testCollection.subscribeKeyChanges( + `id1`, + (changes) => { + expectTypeOf(changes).toEqualTypeOf< + Array, string>> + >() + }, + ) + + expectTypeOf(subscription.unsubscribe).toEqualTypeOf<() => void>() + }) +}) + describe(`Collection type resolution tests`, () => { // Define test types type ExplicitType = { id: string; explicit: boolean }