Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/ninety-actors-hide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@tanstack/angular-store': minor
'@tanstack/preact-store': minor
'@tanstack/react-store': minor
'@tanstack/solid-store': minor
'@tanstack/vue-store': minor
'@tanstack/store': minor
'@tanstack/svelte-store': minor
---

Added atom.whileWatched(cb), store.whileWatched(cb), createExternalStoreAtom
8 changes: 8 additions & 0 deletions docs/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@
"label": "StoreActionsFactory",
"to": "reference/type-aliases/StoreActionsFactory"
},
{
"label": "WatchedEffect",
"to": "reference/type-aliases/WatchedEffect"
},
{
"label": "batch",
"to": "reference/functions/batch"
Expand All @@ -237,6 +241,10 @@
"label": "createAtom",
"to": "reference/functions/createAtom"
},
{
"label": "createExternalStoreAtom",
"to": "reference/functions/createExternalStoreAtom"
},
{
"label": "createStore",
"to": "reference/functions/createStore"
Expand Down
223 changes: 219 additions & 4 deletions packages/store/src/atom.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ReactiveFlags, createReactiveSystem } from './alien'

import type { ReactiveNode } from './alien'
import type { Link, ReactiveNode } from './alien'
import type {
Atom,
AtomOptions,
Expand All @@ -26,16 +26,155 @@ export function toObserver<T>(
}
}

interface InternalAtom<T> extends ReactiveNode {
export type WatchedEffect = () => (() => void) | void | undefined

interface WatchableNode extends ReactiveNode {
/**
* Reference count: number of direct subs that are alive
* When >0, the node is "alive" and its watch effects should be running
* When =0, the node is "dead", and its watch effects should be stopped
*/
_watches?: number
_watchEffects?: Array<WatchedEffect>
_watchCleanups?: Array<(() => void) | void | undefined>
}

function isWatched(node: WatchableNode): boolean {
return !!node._watches
}

function addWatch(node: WatchableNode) {
node._watches ??= 0
const prev = node._watches++

// On first watch, node becomes alive:
if (prev === 0) {
// 1. propagate liveness to deps
// We become alive *after* everything we depend on becomes watched.
// (set up dependencies first before us, the subscriber.)
let deps = node.deps
while (deps !== undefined) {
addWatch(deps.dep)
deps = deps.nextDep
}

// 2. start/run own watch effects.
// Assign `_watchCleanups` BEFORE invoking any effect and seed it with
// packed `undefined` slots (avoid `new Array(len)` which gives a holey
// SMI array in V8). A re-entrant `whileWatched()` during `ef()` pushes
// into this same array at `i >= len`, keeping indices aligned with
// `_watchEffects`; the outer loop writes the pre-reserved slot by index.
const watchEffects = node._watchEffects
if (watchEffects?.length) {
const len = watchEffects.length
const cleanups: Array<(() => void) | void | undefined> = []
node._watchCleanups = cleanups
for (let i = 0; i < len; i++) cleanups.push(undefined)
for (let i = 0; i < len; i++) {
const ef = watchEffects[i]
if (ef) cleanups[i] = ef()
}
}
Comment on lines +67 to +77
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Exception handling gap in watch effect initialization.

If an effect at index i throws, subsequent effects won't run and their cleanups won't be registered. While this may be acceptable for simple cases, it could leave the system in an inconsistent state where some effects ran but others didn't.

Consider wrapping each effect invocation in a try-catch to ensure all effects get a chance to run, collecting errors to rethrow after.

🛡️ Suggested hardening
     if (watchEffects?.length) {
       const len = watchEffects.length
       const cleanups: Array<(() => void) | void | undefined> = []
       node._watchCleanups = cleanups
       for (let i = 0; i < len; i++) cleanups.push(undefined)
+      let firstError: unknown
       for (let i = 0; i < len; i++) {
         const ef = watchEffects[i]
-        if (ef) cleanups[i] = ef()
+        if (ef) {
+          try {
+            cleanups[i] = ef()
+          } catch (e) {
+            firstError ??= e
+          }
+        }
       }
+      if (firstError) throw firstError
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const watchEffects = node._watchEffects
if (watchEffects?.length) {
const len = watchEffects.length
const cleanups: Array<(() => void) | void | undefined> = []
node._watchCleanups = cleanups
for (let i = 0; i < len; i++) cleanups.push(undefined)
for (let i = 0; i < len; i++) {
const ef = watchEffects[i]
if (ef) cleanups[i] = ef()
}
}
const watchEffects = node._watchEffects
if (watchEffects?.length) {
const len = watchEffects.length
const cleanups: Array<(() => void) | void | undefined> = []
node._watchCleanups = cleanups
for (let i = 0; i < len; i++) cleanups.push(undefined)
let firstError: unknown
for (let i = 0; i < len; i++) {
const ef = watchEffects[i]
if (ef) {
try {
cleanups[i] = ef()
} catch (e) {
firstError ??= e
}
}
}
if (firstError) throw firstError
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/store/src/atom.ts` around lines 68 - 78, The loop that runs
node._watchEffects should guard each invocation so one throwing effect doesn't
stop subsequent effects or leave a mismatched node._watchCleanups array; modify
the block around node._watchEffects/cleanups so each ef() call is wrapped in
try-catch, always assign a slot in node._watchCleanups (leaving undefined if ef
throws), collect any thrown errors into an array, continue invoking remaining
effects, and after the loop rethrow a combined error (or the first error) if any
were captured.

}
}

function removeWatch(node: WatchableNode) {
node._watches ??= 0
const next = --node._watches

// On last unwatch, node becomes dead:
if (next === 0) {
// 1. Clean up effects.
// Snapshot and clear `_watchCleanups` BEFORE invoking any cleanup, so a
// re-entrant subscribe during cleanup sees a consistent (empty) state and
// can rebuild cleanups into a fresh array via addWatch.
const cleanups = node._watchCleanups
node._watchCleanups = undefined
if (cleanups) {
for (let i = cleanups.length - 1; i >= 0; i--) {
cleanups[i]?.()
}
}
Comment on lines +93 to +97
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Cleanup loop could swallow errors from later teardowns.

If a cleanup function throws, the for-loop exits immediately and remaining cleanups won't run. This could leave external resources (like subscriptions) active.

🛡️ Suggested hardening
     const cleanups = node._watchCleanups
     node._watchCleanups = undefined
     if (cleanups) {
+      let firstError: unknown
       for (let i = cleanups.length - 1; i >= 0; i--) {
-        cleanups[i]?.()
+        try {
+          cleanups[i]?.()
+        } catch (e) {
+          firstError ??= e
+        }
       }
+      if (firstError) throw firstError
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (cleanups) {
for (let i = cleanups.length - 1; i >= 0; i--) {
cleanups[i]?.()
}
}
const cleanups = node._watchCleanups
node._watchCleanups = undefined
if (cleanups) {
let firstError: unknown
for (let i = cleanups.length - 1; i >= 0; i--) {
try {
cleanups[i]?.()
} catch (e) {
firstError ??= e
}
}
if (firstError) throw firstError
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/store/src/atom.ts` around lines 94 - 98, The cleanup loop over the
cleanups array can stop early if a cleanup function throws; update the loop in
the atom teardown (the block iterating cleanups[]) to wrap each invocation in a
try/catch so every cleanup is attempted even when one throws. Inside the catch,
either log the error (e.g., console.error or a logger) and continue, or collect
errors into an array and rethrow a single AggregateError after the loop so
callers are aware of failures while ensuring all cleanup functions run. Ensure
you reference the same cleanups[] invocation and preserve the optional call
pattern (cleanups[i]?.()) while adding per-iteration try/catch and error
handling.


// 2. propagate unwatch to deps.
// Capture `prevDep` BEFORE recursing, so cleanups that mutate the dep
// list can't redirect our traversal.
let deps = node.depsTail
while (deps !== undefined) {
const prev = deps.prevDep
removeWatch(deps.dep)
deps = prev
}
} else if (next < 0) {
throw new Error(`Unbalanced watch/unwatch led to negative watch count on ReactiveNode: ${next}`)
}
}

/**
* Causes `fn` to be called when `node` becomes gains its first live subscriber.
* If `fn` returns a cleanup function, it will be called when `node` loses its last live subscriber.
*/
export function whileWatched(node: WatchableNode, fn: WatchedEffect) {
const initialEffects = (node._watchEffects ??= [])
initialEffects.push(fn)
if (node._watches) {
// Node is already watched, start effect immediately
const cleanups = (node._watchCleanups ??= [])
cleanups.push(fn())
}

return function removeWhileWatched() {
const stoppableEffects = node._watchEffects
if (!stoppableEffects) {
return
}

const index = stoppableEffects.indexOf(fn)
if (index === -1) {
return
}

stoppableEffects.splice(index, 1)

if (node._watches) {
// If node is watched when we remove,
// also clean up the effect immediately
const watchCleanups = node._watchCleanups
if (watchCleanups?.length) {
const cleanup = watchCleanups[index]
cleanup?.()
watchCleanups.splice(index, 1)
if (watchCleanups.length === 0) {
node._watchCleanups = undefined
}
}
}
}
}

/**
* Called when the atom is watched.
* Returns a cleanup function that will be called when the atom is unwatched.
*/

interface InternalAtom<T> extends WatchableNode {
_snapshot: T
_update: (getValue?: T | ((snapshot: T) => T)) => boolean

get: () => T
subscribe: (observerOrFn: Observer<T> | ((value: T) => void)) => Subscription
/**
* `effect` will be called while the atom is watched. `effect` may return a
* cleanup function, which will be called when the atom is unwatched.
*
* Returns a `stop` function which cancels the listener.
*/
whileWatched: (effect: WatchedEffect) => () => void
}

const queuedEffects: Array<Effect | undefined> = []
let cycle = 0
const { link, unlink, propagate, checkDirty, shallowPropagate } =
const { link: _link, unlink: _unlink, propagate, checkDirty, shallowPropagate } =
createReactiveSystem({
update(atom: InternalAtom<any>): boolean {
return atom._update()
Expand All @@ -54,6 +193,31 @@ const { link, unlink, propagate, checkDirty, shallowPropagate } =
},
})

function link(dep: ReactiveNode, sub: ReactiveNode, version: number) {
const originalTail = dep.subsTail
_link(dep, sub, version)
const newTail = dep.subsTail

if (newTail && newTail !== originalTail && isWatched(sub)) {
// Propagate watch liveness from sub -> dep
addWatch(dep)
}
}


function unlink(
link: Link,
// sub must ALWAYS be link.sub, this arg is here for micro-optimization
sub: ReactiveNode = link.sub
): Link | undefined {
const dep = link.dep
if (isWatched(sub)) {
// Revoke liveness from this sub on dep when unlinked
removeWatch(dep)
}
return _unlink(link, sub)
}

let notifyIndex = 0
let queuedEffectsLength = 0
let activeSub: ReactiveNode | undefined
Expand Down Expand Up @@ -135,6 +299,48 @@ export function createAsyncAtom<T>(
return atom
}

/**
* Like React.useSyncExternalStore: pulls external state into an atom.
* This can be used for interoperating with other state management libraries.
*
Comment thread
coderabbitai[bot] marked this conversation as resolved.
* ```ts
* import * as redux from "redux"
*
* const reduxStore = redux.createStore((state: number, action: number) => state + action, 0)
* const atom = createExternalStoreAtom(reduxStore.getState, reduxStore.subscribe)
*
* const timesTwo = createAtom(() => atom.get() * 2)
* timesTwo.subscribe((value) => {
* console.log('timesTwo: ', value)
* })
*
* reduxStore.dispatch(1)
* // timesTwo: 2
* reduxStore.dispatch(1)
* // timesTwo: 4
* ```
*/
export function createExternalStoreAtom<T>(
getSnapshot: () => T,
subscribe: (onStoreChange: () => void) => () => void,
options?: AtomOptions<T>,
): ReadonlyAtom<T> {
const trigger = createAtom(0)
const invalidate = () => trigger.set((n) => n + 1)
const atom = createAtom(() => {
// Return latest snapshot when `trigger` changes
trigger.get()
return getSnapshot()
}, options)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
// Attach whileWatched to `atom`, not `trigger`. An unobserved `atom.get()`
// runs the getter with `activeSub = atom`, creating a trigger → atom link and
// firing `watched(trigger)` — but trigger has no whileWatched callback, so
// nothing happens. `watched(atom)` only fires when a real subscriber actually
// links in via subscribe/effect, which is what we want.
atom.whileWatched(() => subscribe(invalidate))
return atom
}

export function createAtom<T>(
getValue: (prev?: NoInfer<T>) => T,
options?: AtomOptions<T>,
Expand All @@ -153,7 +359,7 @@ export function createAtom<T>(
// Create plain object atom
const atom: InternalAtom<T> = {
_snapshot: isComputed ? undefined! : valueOrFn,

_watches: 0,
subs: undefined,
subsTail: undefined,
deps: undefined,
Expand Down Expand Up @@ -185,6 +391,11 @@ export function createAtom<T>(
},
}
},

whileWatched(listener: WatchedEffect): () => void {
return whileWatched(this, listener)
},

_update(getValue?: T | ((snapshot: T) => T)): boolean {
const prevSub = activeSub
const compare = options?.compare ?? Object.is
Expand Down Expand Up @@ -265,6 +476,7 @@ export function createAtom<T>(
}

interface Effect extends ReactiveNode {
_watches: number
notify: () => void
stop: () => void
}
Expand All @@ -290,6 +502,8 @@ function effect<T>(fn: () => T): Effect {
subs: undefined,
subsTail: undefined,
flags: ReactiveFlags.Watching | ReactiveFlags.RecursedCheck,
// Effects are the source of liveness - they are created alive
_watches: 1,

notify(): void {
const flags = this.flags
Expand All @@ -307,6 +521,7 @@ function effect<T>(fn: () => T): Effect {
this.flags = ReactiveFlags.None
this.depsTail = undefined
purgeDeps(this)
removeWatch(this)
},
}

Expand Down
3 changes: 3 additions & 0 deletions packages/store/src/signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const { link, unlink, propagate, checkDirty, shallowPropagate } =
queued[insertIndex] = left
}
},
watched(_node) {
// whileWatched not implemented for signal.ts
},
unwatched(node) {
if (!(node.flags & ReactiveFlags.Mutable)) {
effectScopeOper.call(node)
Expand Down
19 changes: 19 additions & 0 deletions packages/store/src/store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createAtom, toObserver } from './atom'
import type { WatchedEffect } from './atom'
import type { Atom, Observer, Subscription } from './types'

export type StoreAction = (...args: Array<any>) => any
Expand Down Expand Up @@ -54,6 +55,15 @@ export class Store<T, TActions extends StoreActionMap = never> {
): Subscription {
return this.atom.subscribe(toObserver(observerOrFn))
}
/**
* `effect` will be called while the atom is watched. `effect` may return a
* cleanup function, which will be called when the atom is unwatched.
*
* Returns a `stop` function which cancels the listener.
*/
public whileWatched(effect: WatchedEffect): () => void {
return this.atom.whileWatched(effect)
}
}

export class ReadonlyStore<T> implements Omit<
Expand Down Expand Up @@ -81,6 +91,15 @@ export class ReadonlyStore<T> implements Omit<
): Subscription {
return this.atom.subscribe(toObserver(observerOrFn))
}
/**
* `effect` will be called while the atom is watched. `effect` may return a
* cleanup function, which will be called when the atom is unwatched.
*
* Returns a `stop` function which cancels the listener.
*/
public whileWatched(effect: WatchedEffect): () => void {
return this.atom.whileWatched(effect)
}
}

export function createStore<T>(
Expand Down
Loading