diff --git a/.changeset/ninety-actors-hide.md b/.changeset/ninety-actors-hide.md new file mode 100644 index 00000000..09bf8116 --- /dev/null +++ b/.changeset/ninety-actors-hide.md @@ -0,0 +1,11 @@ +--- +'@tanstack/angular-store': minor +'@tanstack/preact-store': minor +'@tanstack/react-store': minor +'@tanstack/solid-store': minor +'@tanstack/vue-store': minor +'@tanstack/store': minor +'@tanstack/svelte-store': minor +--- + +Added atom.whileWatched(cb), store.whileWatched(cb), createExternalStoreAtom diff --git a/docs/config.json b/docs/config.json index 7f9a3bdb..f423dbb5 100644 --- a/docs/config.json +++ b/docs/config.json @@ -225,6 +225,10 @@ "label": "StoreActionsFactory", "to": "reference/type-aliases/StoreActionsFactory" }, + { + "label": "WatchedEffect", + "to": "reference/type-aliases/WatchedEffect" + }, { "label": "batch", "to": "reference/functions/batch" @@ -237,6 +241,10 @@ "label": "createAtom", "to": "reference/functions/createAtom" }, + { + "label": "createExternalStoreAtom", + "to": "reference/functions/createExternalStoreAtom" + }, { "label": "createStore", "to": "reference/functions/createStore" diff --git a/packages/store/src/atom.ts b/packages/store/src/atom.ts index 067b8640..aebb62aa 100644 --- a/packages/store/src/atom.ts +++ b/packages/store/src/atom.ts @@ -1,6 +1,6 @@ import { ReactiveFlags, createReactiveSystem } from './alien' -import type { ReactiveNode } from './alien' +import type { Link, ReactiveNode } from './alien' import type { Atom, AtomOptions, @@ -26,16 +26,155 @@ export function toObserver( } } -interface InternalAtom extends ReactiveNode { +export type WatchedEffect = () => (() => void) | void | undefined + +interface WatchableNode extends ReactiveNode { + /** + * Reference count: number of direct subs that are alive + * When >0, the node is "alive" and its watch effects should be running + * When =0, the node is "dead", and its watch effects should be stopped + */ + _watches?: number + _watchEffects?: Array + _watchCleanups?: Array<(() => void) | void | undefined> +} + +function isWatched(node: WatchableNode): boolean { + return !!node._watches +} + +function addWatch(node: WatchableNode) { + node._watches ??= 0 + const prev = node._watches++ + + // On first watch, node becomes alive: + if (prev === 0) { + // 1. propagate liveness to deps + // We become alive *after* everything we depend on becomes watched. + // (set up dependencies first before us, the subscriber.) + let deps = node.deps + while (deps !== undefined) { + addWatch(deps.dep) + deps = deps.nextDep + } + + // 2. start/run own watch effects. + // Assign `_watchCleanups` BEFORE invoking any effect and seed it with + // packed `undefined` slots (avoid `new Array(len)` which gives a holey + // SMI array in V8). A re-entrant `whileWatched()` during `ef()` pushes + // into this same array at `i >= len`, keeping indices aligned with + // `_watchEffects`; the outer loop writes the pre-reserved slot by index. + const watchEffects = node._watchEffects + if (watchEffects?.length) { + const len = watchEffects.length + const cleanups: Array<(() => void) | void | undefined> = [] + node._watchCleanups = cleanups + for (let i = 0; i < len; i++) cleanups.push(undefined) + for (let i = 0; i < len; i++) { + const ef = watchEffects[i] + if (ef) cleanups[i] = ef() + } + } + } +} + +function removeWatch(node: WatchableNode) { + node._watches ??= 0 + const next = --node._watches + + // On last unwatch, node becomes dead: + if (next === 0) { + // 1. Clean up effects. + // Snapshot and clear `_watchCleanups` BEFORE invoking any cleanup, so a + // re-entrant subscribe during cleanup sees a consistent (empty) state and + // can rebuild cleanups into a fresh array via addWatch. + const cleanups = node._watchCleanups + node._watchCleanups = undefined + if (cleanups) { + for (let i = cleanups.length - 1; i >= 0; i--) { + cleanups[i]?.() + } + } + + // 2. propagate unwatch to deps. + // Capture `prevDep` BEFORE recursing, so cleanups that mutate the dep + // list can't redirect our traversal. + let deps = node.depsTail + while (deps !== undefined) { + const prev = deps.prevDep + removeWatch(deps.dep) + deps = prev + } + } else if (next < 0) { + throw new Error(`Unbalanced watch/unwatch led to negative watch count on ReactiveNode: ${next}`) + } +} + +/** + * Causes `fn` to be called when `node` becomes gains its first live subscriber. + * If `fn` returns a cleanup function, it will be called when `node` loses its last live subscriber. + */ +export function whileWatched(node: WatchableNode, fn: WatchedEffect) { + const initialEffects = (node._watchEffects ??= []) + initialEffects.push(fn) + if (node._watches) { + // Node is already watched, start effect immediately + const cleanups = (node._watchCleanups ??= []) + cleanups.push(fn()) + } + + return function removeWhileWatched() { + const stoppableEffects = node._watchEffects + if (!stoppableEffects) { + return + } + + const index = stoppableEffects.indexOf(fn) + if (index === -1) { + return + } + + stoppableEffects.splice(index, 1) + + if (node._watches) { + // If node is watched when we remove, + // also clean up the effect immediately + const watchCleanups = node._watchCleanups + if (watchCleanups?.length) { + const cleanup = watchCleanups[index] + cleanup?.() + watchCleanups.splice(index, 1) + if (watchCleanups.length === 0) { + node._watchCleanups = undefined + } + } + } + } +} + +/** + * Called when the atom is watched. + * Returns a cleanup function that will be called when the atom is unwatched. + */ + +interface InternalAtom extends WatchableNode { _snapshot: T _update: (getValue?: T | ((snapshot: T) => T)) => boolean + get: () => T subscribe: (observerOrFn: Observer | ((value: T) => void)) => Subscription + /** + * `effect` will be called while the atom is watched. `effect` may return a + * cleanup function, which will be called when the atom is unwatched. + * + * Returns a `stop` function which cancels the listener. + */ + whileWatched: (effect: WatchedEffect) => () => void } const queuedEffects: Array = [] let cycle = 0 -const { link, unlink, propagate, checkDirty, shallowPropagate } = +const { link: _link, unlink: _unlink, propagate, checkDirty, shallowPropagate } = createReactiveSystem({ update(atom: InternalAtom): boolean { return atom._update() @@ -54,6 +193,31 @@ const { link, unlink, propagate, checkDirty, shallowPropagate } = }, }) +function link(dep: ReactiveNode, sub: ReactiveNode, version: number) { + const originalTail = dep.subsTail + _link(dep, sub, version) + const newTail = dep.subsTail + + if (newTail && newTail !== originalTail && isWatched(sub)) { + // Propagate watch liveness from sub -> dep + addWatch(dep) + } +} + + +function unlink( + link: Link, + // sub must ALWAYS be link.sub, this arg is here for micro-optimization + sub: ReactiveNode = link.sub +): Link | undefined { + const dep = link.dep + if (isWatched(sub)) { + // Revoke liveness from this sub on dep when unlinked + removeWatch(dep) + } + return _unlink(link, sub) +} + let notifyIndex = 0 let queuedEffectsLength = 0 let activeSub: ReactiveNode | undefined @@ -135,6 +299,48 @@ export function createAsyncAtom( return atom } +/** + * Like React.useSyncExternalStore: pulls external state into an atom. + * This can be used for interoperating with other state management libraries. + * + * ```ts + * import * as redux from "redux" + * + * const reduxStore = redux.createStore((state: number, action: number) => state + action, 0) + * const atom = createExternalStoreAtom(reduxStore.getState, reduxStore.subscribe) + * + * const timesTwo = createAtom(() => atom.get() * 2) + * timesTwo.subscribe((value) => { + * console.log('timesTwo: ', value) + * }) + * + * reduxStore.dispatch(1) + * // timesTwo: 2 + * reduxStore.dispatch(1) + * // timesTwo: 4 + * ``` + */ +export function createExternalStoreAtom( + getSnapshot: () => T, + subscribe: (onStoreChange: () => void) => () => void, + options?: AtomOptions, +): ReadonlyAtom { + const trigger = createAtom(0) + const invalidate = () => trigger.set((n) => n + 1) + const atom = createAtom(() => { + // Return latest snapshot when `trigger` changes + trigger.get() + return getSnapshot() + }, options) + // Attach whileWatched to `atom`, not `trigger`. An unobserved `atom.get()` + // runs the getter with `activeSub = atom`, creating a trigger → atom link and + // firing `watched(trigger)` — but trigger has no whileWatched callback, so + // nothing happens. `watched(atom)` only fires when a real subscriber actually + // links in via subscribe/effect, which is what we want. + atom.whileWatched(() => subscribe(invalidate)) + return atom +} + export function createAtom( getValue: (prev?: NoInfer) => T, options?: AtomOptions, @@ -153,7 +359,7 @@ export function createAtom( // Create plain object atom const atom: InternalAtom = { _snapshot: isComputed ? undefined! : valueOrFn, - + _watches: 0, subs: undefined, subsTail: undefined, deps: undefined, @@ -185,6 +391,11 @@ export function createAtom( }, } }, + + whileWatched(listener: WatchedEffect): () => void { + return whileWatched(this, listener) + }, + _update(getValue?: T | ((snapshot: T) => T)): boolean { const prevSub = activeSub const compare = options?.compare ?? Object.is @@ -265,6 +476,7 @@ export function createAtom( } interface Effect extends ReactiveNode { + _watches: number notify: () => void stop: () => void } @@ -290,6 +502,8 @@ function effect(fn: () => T): Effect { subs: undefined, subsTail: undefined, flags: ReactiveFlags.Watching | ReactiveFlags.RecursedCheck, + // Effects are the source of liveness - they are created alive + _watches: 1, notify(): void { const flags = this.flags @@ -307,6 +521,7 @@ function effect(fn: () => T): Effect { this.flags = ReactiveFlags.None this.depsTail = undefined purgeDeps(this) + removeWatch(this) }, } diff --git a/packages/store/src/signal.ts b/packages/store/src/signal.ts index c05bf26a..7c66f289 100644 --- a/packages/store/src/signal.ts +++ b/packages/store/src/signal.ts @@ -60,6 +60,9 @@ const { link, unlink, propagate, checkDirty, shallowPropagate } = queued[insertIndex] = left } }, + watched(_node) { + // whileWatched not implemented for signal.ts + }, unwatched(node) { if (!(node.flags & ReactiveFlags.Mutable)) { effectScopeOper.call(node) diff --git a/packages/store/src/store.ts b/packages/store/src/store.ts index 69a91678..00bca4f1 100644 --- a/packages/store/src/store.ts +++ b/packages/store/src/store.ts @@ -1,4 +1,5 @@ import { createAtom, toObserver } from './atom' +import type { WatchedEffect } from './atom' import type { Atom, Observer, Subscription } from './types' export type StoreAction = (...args: Array) => any @@ -54,6 +55,15 @@ export class Store { ): Subscription { return this.atom.subscribe(toObserver(observerOrFn)) } + /** + * `effect` will be called while the atom is watched. `effect` may return a + * cleanup function, which will be called when the atom is unwatched. + * + * Returns a `stop` function which cancels the listener. + */ + public whileWatched(effect: WatchedEffect): () => void { + return this.atom.whileWatched(effect) + } } export class ReadonlyStore implements Omit< @@ -81,6 +91,15 @@ export class ReadonlyStore implements Omit< ): Subscription { return this.atom.subscribe(toObserver(observerOrFn)) } + /** + * `effect` will be called while the atom is watched. `effect` may return a + * cleanup function, which will be called when the atom is unwatched. + * + * Returns a `stop` function which cancels the listener. + */ + public whileWatched(effect: WatchedEffect): () => void { + return this.atom.whileWatched(effect) + } } export function createStore( diff --git a/packages/store/src/types.ts b/packages/store/src/types.ts index 24df6663..b0da72f5 100644 --- a/packages/store/src/types.ts +++ b/packages/store/src/types.ts @@ -1,3 +1,4 @@ +import type { WatchedEffect } from './atom' import type { ReactiveNode } from './alien' export type Selection = Readable @@ -30,7 +31,36 @@ export interface Readable extends Subscribable { get: () => T } -export interface BaseAtom extends Subscribable, Readable {} +export interface BaseAtom extends Subscribable, Readable { + /** + * `effect` will be called when the first subscriber is added. `effect` + * may return a cleanup function, which will be called when the atom is + * unwatched. + * + * Returns a `cleanup` function which removes the `whileWatched` listener. + * + * This can be used to sync external resources into the atom, similar to + * `useLayoutEffect` in React. + * + * ```ts + * function createTicker(ms: number) { + * const now = createAtom(Date.now()) + * const refresh = () => now.set(Date.now()) + * now.whileWatched(() => { + * refresh() + * const interval = setInterval(refresh, ms) + * return () => clearInterval(interval) + * }) + * return createAtom(() => now.get()) + * } + * const ticker = createTicker(1000) + * ticker.subscribe(() => { + * console.log('current time: ', ticker.get()) + * }) + * ``` + */ + whileWatched: (effect: WatchedEffect) => () => void +} export interface InternalBaseAtom extends Subscribable, Readable { /** @internal */ diff --git a/packages/store/tests/external-store-atom.test.ts b/packages/store/tests/external-store-atom.test.ts new file mode 100644 index 00000000..0ace2e2c --- /dev/null +++ b/packages/store/tests/external-store-atom.test.ts @@ -0,0 +1,186 @@ +import { describe, expect, test, vi } from 'vitest' +import { createExternalStoreAtom } from '../src' + +function makeExternalStore(initial: T) { + let value = initial + const listeners = new Set<() => void>() + return { + getSnapshot: () => value, + set(next: T) { + value = next + listeners.forEach((l) => l()) + }, + subscribe(cb: () => void) { + listeners.add(cb) + return () => { + listeners.delete(cb) + } + }, + listenerCount: () => listeners.size, + } +} + +describe('createExternalStoreAtom', () => { + test('initial value is getSnapshot() at creation time', () => { + const ext = makeExternalStore(42) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + expect(atom.get()).toBe(42) + }) + + test('does not subscribe to the external store without subscribers', () => { + const ext = makeExternalStore(0) + createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + expect(ext.listenerCount()).toBe(0) + }) + + test('unobserved .get() does not activate the external subscription', () => { + const ext = makeExternalStore(7) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + expect(atom.get()).toBe(7) + expect(ext.listenerCount()).toBe(0) + + atom.get() + atom.get() + expect(ext.listenerCount()).toBe(0) + }) + + test('unobserved chain read through derived atoms does not activate', async () => { + const { createAtom } = await import('../src') + const ext = makeExternalStore(1) + const a = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + const b = createAtom(() => a.get() * 2) + const c = createAtom(() => `${b.get()} dogs`) + + expect(c.get()).toBe('2 dogs') + expect(ext.listenerCount()).toBe(0) + }) + + test('subscribes on first atom subscriber; unsubscribes on last', () => { + const ext = makeExternalStore(0) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + const sub = atom.subscribe(() => {}) + expect(ext.listenerCount()).toBe(1) + + sub.unsubscribe() + expect(ext.listenerCount()).toBe(0) + }) + + test('pulls a fresh snapshot on activation (external changes while nobody watching)', () => { + const ext = makeExternalStore(0) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + ext.set(5) + + const received: Array = [] + const sub = atom.subscribe((v) => received.push(v)) + expect(atom.get()).toBe(5) + + sub.unsubscribe() + }) + + test('notifies subscribers when the external store dispatches changes', () => { + const ext = makeExternalStore(0) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + const fn = vi.fn() + const sub = atom.subscribe(fn) + + ext.set(1) + ext.set(2) + ext.set(3) + + expect(fn).toHaveBeenNthCalledWith(1, 1) + expect(fn).toHaveBeenNthCalledWith(2, 2) + expect(fn).toHaveBeenNthCalledWith(3, 3) + expect(fn).toHaveBeenCalledTimes(3) + + sub.unsubscribe() + }) + + test('multiple subscribers share a single external subscription', () => { + const ext = makeExternalStore(0) + const subscribe = vi.fn(ext.subscribe) + const atom = createExternalStoreAtom(ext.getSnapshot, subscribe) + + const s1 = atom.subscribe(() => {}) + const s2 = atom.subscribe(() => {}) + const s3 = atom.subscribe(() => {}) + + expect(subscribe).toHaveBeenCalledTimes(1) + expect(ext.listenerCount()).toBe(1) + + s1.unsubscribe() + s2.unsubscribe() + expect(ext.listenerCount()).toBe(1) + + s3.unsubscribe() + expect(ext.listenerCount()).toBe(0) + }) + + test('re-subscribing after full teardown re-subscribes to the external store', () => { + const ext = makeExternalStore(0) + const subscribe = vi.fn(ext.subscribe) + const atom = createExternalStoreAtom(ext.getSnapshot, subscribe) + + atom.subscribe(() => {}).unsubscribe() + expect(ext.listenerCount()).toBe(0) + + atom.subscribe(() => {}).unsubscribe() + expect(subscribe).toHaveBeenCalledTimes(2) + }) + + test('deduplicates: no notification when external value is Object.is-equal', () => { + const ext = makeExternalStore({ x: 1 }) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + const fn = vi.fn() + const sub = atom.subscribe(fn) + + ext.set(ext.getSnapshot()) + ext.set(ext.getSnapshot()) + expect(fn).not.toHaveBeenCalled() + + ext.set({ x: 1 }) + expect(fn).toHaveBeenCalledTimes(1) + + sub.unsubscribe() + }) + + test('does not infinite-loop when a subscriber triggers another external set', () => { + const ext = makeExternalStore(0) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + const seen: Array = [] + const sub = atom.subscribe((v) => { + seen.push(v) + if (v === 1) ext.set(2) + }) + + ext.set(1) + + expect(seen).toEqual([1]) + expect(atom.get()).toBe(2) + + sub.unsubscribe() + }) + + test('works as a dep inside another computed atom', async () => { + const { createAtom } = await import('../src') + const ext = makeExternalStore(10) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + const doubled = createAtom(() => atom.get() * 2) + + const fn = vi.fn() + const sub = doubled.subscribe(fn) + expect(doubled.get()).toBe(20) + + ext.set(7) + expect(doubled.get()).toBe(14) + expect(fn).toHaveBeenLastCalledWith(14) + + sub.unsubscribe() + expect(ext.listenerCount()).toBe(0) + }) +}) diff --git a/packages/store/tests/while-watched.test.ts b/packages/store/tests/while-watched.test.ts new file mode 100644 index 00000000..729db348 --- /dev/null +++ b/packages/store/tests/while-watched.test.ts @@ -0,0 +1,363 @@ +import { describe, expect, test, vi } from 'vitest' +import { createAtom, createExternalStoreAtom } from '../src' + +function makeExternalStore(initial: T) { + let value = initial + const listeners = new Set<() => void>() + return { + getSnapshot: () => value, + set(next: T) { + value = next + listeners.forEach((l) => l()) + }, + subscribe(cb: () => void) { + listeners.add(cb) + return () => { + listeners.delete(cb) + } + }, + listenerCount: () => listeners.size, + } +} + +describe('whileWatched', () => { + test('does not fire before any subscriber', () => { + const atom = createAtom(0) + const effect = vi.fn() + atom.whileWatched(effect) + + expect(effect).not.toHaveBeenCalled() + }) + + test('fires on first subscribe; cleanup runs on last unsubscribe', () => { + const atom = createAtom(0) + const cleanup = vi.fn() + const effect = vi.fn(() => cleanup) + atom.whileWatched(effect) + + const sub = atom.subscribe(() => {}) + expect(effect).toHaveBeenCalledTimes(1) + expect(cleanup).not.toHaveBeenCalled() + + sub.unsubscribe() + expect(cleanup).toHaveBeenCalledTimes(1) + }) + + test('is ref-counted: multiple subscribers fire effect exactly once', () => { + const atom = createAtom(0) + const cleanup = vi.fn() + const effect = vi.fn(() => cleanup) + atom.whileWatched(effect) + + const sub1 = atom.subscribe(() => {}) + const sub2 = atom.subscribe(() => {}) + const sub3 = atom.subscribe(() => {}) + expect(effect).toHaveBeenCalledTimes(1) + + sub1.unsubscribe() + sub2.unsubscribe() + expect(cleanup).not.toHaveBeenCalled() + + sub3.unsubscribe() + expect(cleanup).toHaveBeenCalledTimes(1) + }) + + test('re-fires on each 0→1 transition and cleans up on each 1→0', () => { + const atom = createAtom(0) + const cleanup = vi.fn() + const effect = vi.fn(() => cleanup) + atom.whileWatched(effect) + + atom.subscribe(() => {}).unsubscribe() + atom.subscribe(() => {}).unsubscribe() + atom.subscribe(() => {}).unsubscribe() + + expect(effect).toHaveBeenCalledTimes(3) + expect(cleanup).toHaveBeenCalledTimes(3) + }) + + test('registering while already watched fires immediately', () => { + const atom = createAtom(0) + const sub = atom.subscribe(() => {}) + + const cleanup = vi.fn() + const effect = vi.fn(() => cleanup) + atom.whileWatched(effect) + + expect(effect).toHaveBeenCalledTimes(1) + expect(cleanup).not.toHaveBeenCalled() + + sub.unsubscribe() + expect(cleanup).toHaveBeenCalledTimes(1) + }) + + test('multiple handlers all fire and all clean up', () => { + const atom = createAtom(0) + const cleanupA = vi.fn() + const cleanupB = vi.fn() + const effectA = vi.fn(() => cleanupA) + const effectB = vi.fn(() => cleanupB) + atom.whileWatched(effectA) + atom.whileWatched(effectB) + + const sub = atom.subscribe(() => {}) + expect(effectA).toHaveBeenCalledTimes(1) + expect(effectB).toHaveBeenCalledTimes(1) + + sub.unsubscribe() + expect(cleanupA).toHaveBeenCalledTimes(1) + expect(cleanupB).toHaveBeenCalledTimes(1) + }) + + test('effect returning void does not throw on unwatch', () => { + const atom = createAtom(0) + atom.whileWatched(() => { + // no cleanup + }) + + const sub = atom.subscribe(() => {}) + expect(() => sub.unsubscribe()).not.toThrow() + }) + + test('stop() prevents future activations', () => { + const atom = createAtom(0) + const effect = vi.fn() + const stop = atom.whileWatched(effect) + + atom.subscribe(() => {}).unsubscribe() + expect(effect).toHaveBeenCalledTimes(1) + + stop() + + atom.subscribe(() => {}).unsubscribe() + expect(effect).toHaveBeenCalledTimes(1) + }) + + test('stop() is idempotent', () => { + const atom = createAtom(0) + const stop = atom.whileWatched(() => {}) + expect(() => { + stop() + stop() + }).not.toThrow() + }) + + test('works on computed atoms', () => { + const source = createAtom(1) + const derived = createAtom(() => source.get() * 2) + + const cleanup = vi.fn() + const effect = vi.fn(() => cleanup) + derived.whileWatched(effect) + + const sub = derived.subscribe(() => {}) + expect(effect).toHaveBeenCalledTimes(1) + + sub.unsubscribe() + expect(cleanup).toHaveBeenCalledTimes(1) + }) + + test('cascades through computed deps: watching a derived atom watches its sources', () => { + const source = createAtom(1) + const derived = createAtom(() => source.get() * 2) + + const sourceCleanup = vi.fn() + const sourceEffect = vi.fn(() => sourceCleanup) + source.whileWatched(sourceEffect) + + const sub = derived.subscribe(() => {}) + expect(sourceEffect).toHaveBeenCalledTimes(1) + expect(sourceCleanup).not.toHaveBeenCalled() + + sub.unsubscribe() + expect(sourceCleanup).toHaveBeenCalledTimes(1) + }) + + test('subscribing directly to the source does not double-fire when a derived is also subscribed', () => { + const source = createAtom(1) + const derived = createAtom(() => source.get() * 2) + + const effect = vi.fn() + source.whileWatched(effect) + + const subDerived = derived.subscribe(() => {}) + const subSource = source.subscribe(() => {}) + expect(effect).toHaveBeenCalledTimes(1) + + subDerived.unsubscribe() + subSource.unsubscribe() + }) + + test('long chain sub/unsub cycles do not drift _watches', () => { + const ext = makeExternalStore(1) + const a = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + const b = createAtom(() => a.get() * 2) + const c = createAtom(() => `${b.get()} dogs`) + + const effect = vi.fn() + const cleanup = vi.fn() + a.whileWatched(() => { + effect() + return cleanup + }) + + for (let i = 0; i < 10; i++) { + const sub = c.subscribe(() => {}) + expect(ext.listenerCount()).toBe(1) + sub.unsubscribe() + expect(ext.listenerCount()).toBe(0) + } + + expect(effect).toHaveBeenCalledTimes(10) + expect(cleanup).toHaveBeenCalledTimes(10) + }) + + test('conditional deps: dropped branch cleanup fires on recompute', () => { + const cond = createAtom(true) + const a = createAtom(1) + const b = createAtom(10) + const pick = createAtom(() => (cond.get() ? a.get() : b.get())) + + const aCleanup = vi.fn() + const bCleanup = vi.fn() + const aEffect = vi.fn(() => aCleanup) + const bEffect = vi.fn(() => bCleanup) + a.whileWatched(aEffect) + b.whileWatched(bEffect) + + const sub = pick.subscribe(() => {}) + expect(aEffect).toHaveBeenCalledTimes(1) + expect(bEffect).not.toHaveBeenCalled() + + cond.set(false) + expect(aCleanup).toHaveBeenCalledTimes(1) + expect(bEffect).toHaveBeenCalledTimes(1) + expect(bCleanup).not.toHaveBeenCalled() + + cond.set(true) + expect(bCleanup).toHaveBeenCalledTimes(1) + expect(aEffect).toHaveBeenCalledTimes(2) + + sub.unsubscribe() + expect(aCleanup).toHaveBeenCalledTimes(2) + expect(bCleanup).toHaveBeenCalledTimes(1) + }) + + test('diamond graph: shared source counted once per activation', () => { + const ext = makeExternalStore(1) + const source = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + const left = createAtom(() => source.get() + 1) + const right = createAtom(() => source.get() + 2) + + const subL = left.subscribe(() => {}) + expect(ext.listenerCount()).toBe(1) + const subR = right.subscribe(() => {}) + expect(ext.listenerCount()).toBe(1) + + subL.unsubscribe() + expect(ext.listenerCount()).toBe(1) + subR.unsubscribe() + expect(ext.listenerCount()).toBe(0) + + // Re-activation cycle works + const subL2 = left.subscribe(() => {}) + expect(ext.listenerCount()).toBe(1) + subL2.unsubscribe() + expect(ext.listenerCount()).toBe(0) + }) + + test('stop() while watched runs cleanup immediately and only once', () => { + const atom = createAtom(0) + const cleanup = vi.fn() + const stop = atom.whileWatched(() => cleanup) + + const sub = atom.subscribe(() => {}) + expect(cleanup).not.toHaveBeenCalled() + + stop() + expect(cleanup).toHaveBeenCalledTimes(1) + + sub.unsubscribe() + expect(cleanup).toHaveBeenCalledTimes(1) + }) + + test('re-entrant subscribe during cleanup leaves graph consistent', () => { + const ext = makeExternalStore(0) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + const activate = vi.fn() + let reenter = true + + atom.whileWatched(() => { + activate() + return () => { + if (reenter) { + reenter = false + // briefly resubscribe during cleanup — activates a second cycle + const innerSub = atom.subscribe(() => {}) + innerSub.unsubscribe() + } + } + }) + + const sub = atom.subscribe(() => {}) + expect(activate).toHaveBeenCalledTimes(1) + expect(ext.listenerCount()).toBe(1) + + sub.unsubscribe() + // The re-entry created a 2nd activation, then cleaned up. + expect(activate).toHaveBeenCalledTimes(2) + // After all unwinding, external store must have zero listeners. + expect(ext.listenerCount()).toBe(0) + + // _watches must not be stuck: a fresh subscribe re-activates. + const sub2 = atom.subscribe(() => {}) + expect(activate).toHaveBeenCalledTimes(3) + expect(ext.listenerCount()).toBe(1) + sub2.unsubscribe() + expect(ext.listenerCount()).toBe(0) + }) + + test('adding a whileWatched during another effect runs it immediately', () => { + const atom = createAtom(0) + const outerEffect = vi.fn() + const lateEffect = vi.fn() + const lateCleanup = vi.fn() + + atom.whileWatched(() => { + outerEffect() + atom.whileWatched(() => { + lateEffect() + return lateCleanup + }) + }) + + // whileWatched should not have started the effect immediately + expect(outerEffect).not.toHaveBeenCalled() + expect(lateEffect).not.toHaveBeenCalled() + + const sub = atom.subscribe(() => {}) + expect(outerEffect).toHaveBeenCalledTimes(1) + expect(lateEffect).toHaveBeenCalledTimes(1) + expect(lateCleanup).not.toHaveBeenCalled() + + sub.unsubscribe() + expect(lateCleanup).toHaveBeenCalledTimes(1) + }) + + test('many subscribers release source exactly when the last one leaves', () => { + const ext = makeExternalStore(0) + const atom = createExternalStoreAtom(ext.getSnapshot, ext.subscribe) + + const subs = Array.from({ length: 20 }, () => atom.subscribe(() => {})) + expect(ext.listenerCount()).toBe(1) + + for (let i = 0; i < subs.length - 1; i++) { + subs[i].unsubscribe() + expect(ext.listenerCount()).toBe(1) + } + + subs[subs.length - 1].unsubscribe() + expect(ext.listenerCount()).toBe(0) + }) +})