diff --git a/.claude/hooks/session-start.sh b/.claude/hooks/session-start.sh
new file mode 100755
index 0000000..d96ed93
--- /dev/null
+++ b/.claude/hooks/session-start.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+set -euo pipefail
+
+# Only run in remote (Claude Code on the web) environments
+if [ "${CLAUDE_CODE_REMOTE:-}" != "true" ]; then
+ exit 0
+fi
+
+# Install system dependencies required by pygraphviz
+if ! dpkg -s libgraphviz-dev >/dev/null 2>&1; then
+ sudo apt-get update -qq
+ sudo apt-get install -y -qq graphviz libgraphviz-dev >/dev/null 2>&1
+fi
+
+# Install Python dependencies using uv
+cd "$CLAUDE_PROJECT_DIR"
+uv sync --group dev
diff --git a/.claude/settings.json b/.claude/settings.json
new file mode 100644
index 0000000..e06b033
--- /dev/null
+++ b/.claude/settings.json
@@ -0,0 +1,14 @@
+{
+ "hooks": {
+ "SessionStart": [
+ {
+ "hooks": [
+ {
+ "type": "command",
+ "command": "$CLAUDE_PROJECT_DIR/.claude/hooks/session-start.sh"
+ }
+ ]
+ }
+ ]
+ }
+}
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
new file mode 100644
index 0000000..bbb1df8
--- /dev/null
+++ b/.github/workflows/docs.yml
@@ -0,0 +1,39 @@
+name: Deploy docs
+
+on:
+ push:
+ branches: [main]
+ workflow_dispatch:
+
+permissions:
+ contents: read
+ pages: write
+ id-token: write
+
+concurrency:
+ group: "pages"
+ cancel-in-progress: false
+
+jobs:
+ deploy:
+ runs-on: ubuntu-latest
+ environment:
+ name: github-pages
+ url: ${{ steps.deployment.outputs.page_url }}
+ steps:
+ - uses: actions/checkout@v4
+
+ - uses: astral-sh/setup-uv@v4
+
+ - name: Install docs dependencies
+ run: uv sync --group docs
+
+ - name: Build documentation
+ run: uv run mkdocs build
+
+ - uses: actions/upload-pages-artifact@v3
+ with:
+ path: site/
+
+ - id: deployment
+ uses: actions/deploy-pages@v4
diff --git a/.gitignore b/.gitignore
index f49339f..1bbd3f1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -221,3 +221,6 @@ dj_*_conf.json
# pixi environments
.pixi/*
!.pixi/config.toml
+
+# Nested repo clone (pre-existing artifact)
+orcapod-python/
diff --git a/.zed/rules b/.zed/rules
index c1acd66..d3c9941 100644
--- a/.zed/rules
+++ b/.zed/rules
@@ -1,3 +1,8 @@
+## Naming convention
+
+Always write "orcapod" with a lowercase p — never "OrcaPod" or "Orcapod". This applies
+everywhere: documentation, docstrings, code comments, commit messages, and user-facing text.
+
## Running commands
Always run Python commands via `uv run`, e.g.:
@@ -7,6 +12,14 @@ Always run Python commands via `uv run`, e.g.:
Never use `python`, `pytest`, or `python3` directly.
+## Branch hygiene
+
+Periodically check the target branch (typically dev) for updates and incorporate them into
+your working branch. Before pushing, fetch and rebase onto the latest target branch to avoid
+divergence and merge conflicts. If cherry-picking is needed due to unrelated commit history,
+prefer cherry-picking your commits onto a fresh branch from the target rather than resolving
+massive rebase conflicts.
+
## Updating agent instructions
When adding or changing any instruction, update BOTH:
@@ -57,37 +70,49 @@ Examples:
## Project layout
src/orcapod/
- types.py — Schema, ColumnConfig, ContentHash
+ types.py — Schema, ColumnConfig, ContentHash, PipelineConfig,
+ NodeConfig, ExecutorType, CacheMode
system_constants.py — Column prefixes and separators
errors.py — InputValidationError, DuplicateTagError, FieldNotResolvableError
config.py — Config dataclass
+ channels.py — Async channel primitives (Channel, BroadcastChannel,
+ ReadableChannel, WritableChannel, ChannelClosed)
contexts/ — DataContext (semantic_hasher, arrow_hasher, type_converter)
protocols/
hashing_protocols.py — PipelineElementProtocol, ContentIdentifiableProtocol
+ database_protocols.py — ArrowDatabaseProtocol
+ pipeline_protocols.py — Pipeline-level protocols
+ semantic_types_protocols.py — TypeConverterProtocol
core_protocols/ — StreamProtocol, PodProtocol, SourceProtocol,
PacketFunctionProtocol, DatagramProtocol, TagProtocol,
- PacketProtocol, TrackerProtocol
+ PacketProtocol, TrackerProtocol, AsyncExecutableProtocol,
+ PacketFunctionExecutorProtocol, OperatorPodProtocol,
+ LabelableProtocol, TemporalProtocol, TraceableProtocol
core/
- base.py — ContentIdentifiableBase, PipelineElementBase, TraceableBase
- static_output_pod.py — StaticOutputPod (operator base), DynamicPodStream
- function_pod.py — FunctionPod, FunctionPodStream, FunctionNode
+ base.py — LabelableMixin, DataContextMixin, TraceableBase
+ function_pod.py — FunctionPod, FunctionPodStream, @function_pod decorator
packet_function.py — PacketFunctionBase, PythonPacketFunction, CachedPacketFunction
- operator_node.py — OperatorNode (DB-backed operator execution)
- tracker.py — Invocation tracking
+ tracker.py — BasicTrackerManager, GraphTracker
datagrams/
datagram.py — Datagram (unified dict/Arrow backing, lazy conversion)
tag_packet.py — Tag (+ system tags), Packet (+ source info)
sources/
base.py — RootSource (abstract, no upstream)
arrow_table_source.py — Core source — all other sources delegate to it
- derived_source.py — DerivedSource (backed by FunctionNode/OperatorNode DB)
+ persistent_source.py — PersistentSource (DB-backed caching wrapper)
+ derived_source.py — DerivedSource (backed by node DB records)
csv_source.py, dict_source.py, list_source.py,
data_frame_source.py, delta_table_source.py — Delegating wrappers
source_registry.py — SourceRegistry for provenance resolution
streams/
base.py — StreamBase (abstract)
arrow_table_stream.py — ArrowTableStream (concrete, immutable)
+ nodes/
+ function_node.py — FunctionNode, PersistentFunctionNode
+ operator_node.py — OperatorNode, PersistentOperatorNode
+ source_node.py — SourceNode (leaf stream in graph)
operators/
+ static_output_pod.py — StaticOutputOperatorPod, DynamicPodStream
base.py — UnaryOperator, BinaryOperator, NonZeroInputOperator
join.py — Join (N-ary inner join, commutative)
merge_join.py — MergeJoin (binary, colliding cols → sorted list[T])
@@ -96,27 +121,61 @@ src/orcapod/
column_selection.py — Select/Drop Tag/Packet columns
mappers.py — MapTags, MapPackets (rename columns)
filters.py — PolarsFilter
+ executors/
+ base.py — PacketFunctionExecutorBase (ABC)
+ local.py — LocalExecutor (default in-process)
+ ray.py — RayExecutor (dispatch to Ray cluster)
+ pipeline/
+ graph.py — Pipeline (extends GraphTracker, compiles to persistent nodes)
+ nodes.py — PersistentSourceNode (DB-backed leaf wrapper)
+ orchestrator.py — AsyncPipelineOrchestrator (channel-based concurrent execution)
hashing/
- semantic_hashing/ — BaseSemanticHasher, type handlers
- semantic_types/ — Type conversion (Python ↔ Arrow)
- databases/ — ArrowDatabaseProtocol implementations (Delta Lake, in-memory)
+ file_hashers.py — BasicFileHasher, CachedFileHasher
+ arrow_hashers.py — Arrow-specific hashing
+ arrow_serialization.py — Arrow serialization utilities
+ arrow_utils.py — Arrow manipulation for hashing
+ defaults.py — Factory functions for default hashers
+ hash_utils.py — hash_file(), get_function_components()
+ string_cachers.py — String caching strategies
+ versioned_hashers.py — Versioned hasher support
+ visitors.py — Visitor pattern for hashing
+ semantic_hashing/ — BaseSemanticHasher, type handlers, TypeHandlerRegistry
+ semantic_types/ — Type conversion (Python ↔ Arrow), UniversalTypeConverter,
+ SemanticTypeRegistry, type inference
+ databases/
+ delta_lake_databases.py — DeltaTableDatabase
+ in_memory_databases.py — InMemoryArrowDatabase
+ noop_database.py — NoOpArrowDatabase
+ file_utils.py — File utilities for database operations
+ execution_engines/
+ ray_execution_engine.py — RayEngine (execution on Ray clusters)
utils/
arrow_data_utils.py — System tag manipulation, source info, column helpers
arrow_utils.py — Arrow table utilities
schema_utils.py — Schema extraction, union, intersection, compatibility
lazy_module.py — LazyModule for deferred heavy imports
+ function_info.py — Function introspection utilities
+ git_utils.py — Git metadata extraction
+ name.py — Name utilities
+ object_spec.py — Object specification/serialization
+ polars_data_utils.py — Polars-specific utilities
tests/
test_core/
datagrams/ — Lazy conversion, dict/Arrow round-trip
- sources/ — Source construction, protocol conformance, DerivedSource
- streams/ — ArrowTableStream behavior
- function_pod/ — FunctionPod, FunctionNode, pipeline hash integration
+ sources/ — Source construction, protocol conformance, DerivedSource,
+ PersistentSource
+ streams/ — ArrowTableStream behavior, convenience methods
+ function_pod/ — FunctionPod, FunctionNode, pipeline hash integration,
+ @function_pod decorator
operators/ — All operators, OperatorNode, MergeJoin
- packet_function/ — PacketFunction, CachedPacketFunction
- test_hashing/ — Semantic hasher, hash stability
+ packet_function/ — PacketFunction, CachedPacketFunction, executor
+ test_channels/ — Async channels, async_execute for operators/nodes/pods,
+ native async operators, pipeline integration
+ test_pipeline/ — Pipeline compilation, AsyncPipelineOrchestrator
+ test_hashing/ — Semantic hasher, hash stability, file hashers, string cachers
test_databases/ — Delta Lake, in-memory, no-op databases
- test_semantic_types/ — Type converter tests
+ test_semantic_types/ — Type converter, semantic registry, struct converters
---
@@ -126,8 +185,12 @@ See orcapod-design.md at the project root for the full design specification.
### Core data flow
+Pull-based (synchronous):
RootSource → ArrowTableStream → [Operator / FunctionPod] → ArrowTableStream → ...
+Push-based (async pipeline):
+ Pipeline.compile() → AsyncPipelineOrchestrator.run() → channels → persistent nodes → DB
+
Every stream is an immutable sequence of (Tag, Packet) pairs backed by a PyArrow Table.
Tag columns are join keys and metadata; packet columns are the data payload.
@@ -143,22 +206,53 @@ Key methods: output_schema(), keys(), iter_packets(), as_table().
Source (core/sources/) — produces a stream from external data. ArrowTableSource is the core
implementation; CSV/Delta/DataFrame/Dict/List sources all delegate to it internally. Each
-source adds source-info columns and a system tag column. DerivedSource wraps a
-FunctionNode/OperatorNode's DB records as a new source.
+source adds source-info columns and a system tag column. DerivedSource wraps a node's DB
+records as a new source. PersistentSource wraps any RootSource with DB-backed caching
+(deduped by per-row content hash).
Function Pod (core/function_pod.py) — wraps a PacketFunction that transforms individual
-packets. Never inspects tags. Two execution models:
-- FunctionPod → FunctionPodStream: lazy, in-memory
-- FunctionNode: DB-backed, two-phase (yield cached results first, then compute missing)
+packets. Never inspects tags. Supports async functions via PythonPacketFunction. The
+@function_pod decorator creates FunctionPod instances directly from Python functions.
+
+Node (core/nodes/) — graph-aware wrappers that participate in the computation DAG:
+- SourceNode — leaf stream in the graph (wraps a StreamProtocol)
+- FunctionNode / PersistentFunctionNode — packet function invocations (persistent variant
+ is DB-backed with two-phase execution: yield cached, then compute missing)
+- OperatorNode / PersistentOperatorNode — operator invocations (persistent variant
+ is DB-backed with deduplication)
Operator (core/operators/) — structural pod transforming streams without synthesizing new
-packet values. All subclass StaticOutputPod:
+packet values. All subclass StaticOutputOperatorPod. Each operator also implements
+AsyncExecutableProtocol for push-based channel execution:
- UnaryOperator — 1 input (Batch, Select/Drop columns, Map, Filter)
- BinaryOperator — 2 inputs (MergeJoin, SemiJoin)
- NonZeroInputOperator — 1+ inputs (Join)
-OperatorNode (core/operator_node.py) — DB-backed operator execution, analogous to
-FunctionNode.
+Executor (core/executors/) — pluggable execution backends for packet functions:
+- LocalExecutor — default in-process execution
+- RayExecutor — dispatches to a Ray cluster
+
+Channel (channels.py) — async primitives for push-based pipeline execution:
+- Channel[T] — bounded async channel with backpressure and close signaling
+- BroadcastChannel[T] — fan-out channel for multiple consumers
+- ReadableChannel[T] / WritableChannel[T] — consumer/producer protocols
+
+Pipeline (pipeline/) — persistent, async-capable pipeline infrastructure:
+- Pipeline — extends GraphTracker; records invocations during a with block, then compile()
+ replaces every node with its persistent variant (leaf streams → PersistentSourceNode,
+ function nodes → PersistentFunctionNode, operator nodes → PersistentOperatorNode)
+- AsyncPipelineOrchestrator — executes a compiled pipeline using channels; walks the
+ persistent node graph in topological order, creates bounded channels between nodes,
+ launches all nodes concurrently via asyncio.TaskGroup
+
+### Async execution model
+
+All pipeline nodes implement AsyncExecutableProtocol:
+ async def async_execute(inputs, output) → None
+
+The orchestrator wires channels between nodes and launches tasks without knowing node types.
+PipelineConfig controls buffer sizes (channel_buffer_size) and concurrency limits
+(default_max_concurrency). Per-node overrides are set via NodeConfig.
### Strict operator / function pod boundary
@@ -199,18 +293,28 @@ Prefixes are computed from SystemConstant in system_constants.py.
- LazyModule("pyarrow") — deferred import for heavy deps. Used in
if TYPE_CHECKING: / else: blocks.
- Argument symmetry — operators return frozenset (commutative) or tuple (ordered).
-- StaticOutputPod.process() → DynamicPodStream — wraps static_process() with staleness
- detection and automatic recomputation.
+- StaticOutputOperatorPod.process() → DynamicPodStream — wraps static_process() with
+ staleness detection and automatic recomputation.
- Source delegation — CSVSource, DictSource, etc. create an internal ArrowTableSource.
+- Pipeline context manager — records non-persistent nodes during with block, then compile()
+ promotes them to persistent variants with DB backing.
+- AsyncExecutableProtocol — unified interface for all pipeline nodes. The orchestrator
+ wires channels and launches tasks without knowing node types.
+- GraphTracker — tracks operator/function pod invocations in a NetworkX DAG; Pipeline
+ extends it to add compilation and persistence.
### Important implementation details
- ArrowTableSource raises ValueError if any tag_columns are not in the table.
- ArrowTableStream requires at least one packet column; raises ValueError otherwise.
-- FunctionNode Phase 1 returns ALL records in the shared pipeline_path DB table.
+- PersistentFunctionNode Phase 1 returns ALL records in the shared pipeline_path DB table.
Phase 2 skips inputs whose hash is already in the DB.
- Empty data → ArrowTableSource raises ValueError("Table is empty").
- DerivedSource before run() → raises ValueError (no computed records).
- Join requires non-overlapping packet columns; raises InputValidationError on collision.
- MergeJoin requires colliding columns to have identical types; merges into sorted list[T].
- Operators predict output schema (including system tag names) without computation.
+- CachedFileHasher uses mtime+size cache busting to detect file changes without re-hashing.
+- PersistentSource cache is always on; returns the union of all cached data across runs.
+- AsyncPipelineOrchestrator uses BroadcastChannel for fan-out (one node feeding multiple
+ downstream consumers).
diff --git a/CLAUDE.md b/CLAUDE.md
index 79612da..1f2b78c 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -1,5 +1,10 @@
# Claude Code instructions for orcapod-python
+## Naming convention
+
+Always write "orcapod" with a **lowercase p** — never "OrcaPod" or "Orcapod". This applies
+everywhere: documentation, docstrings, code comments, commit messages, and user-facing text.
+
## Running commands
Always run Python commands via `uv run`, e.g.:
@@ -11,6 +16,14 @@ uv run python -c "..."
Never use `python`, `pytest`, or `python3` directly.
+## Branch hygiene
+
+Periodically check the target branch (typically `dev`) for updates and incorporate them into
+your working branch. Before pushing, fetch and rebase onto the latest target branch to avoid
+divergence and merge conflicts. If cherry-picking is needed due to unrelated commit history,
+prefer cherry-picking your commits onto a fresh branch from the target rather than resolving
+massive rebase conflicts.
+
## Updating agent instructions
When adding or changing any instruction, update BOTH:
@@ -64,66 +77,112 @@ Examples:
```
src/orcapod/
-├── types.py # Schema, ColumnConfig, ContentHash
+├── types.py # Schema, ColumnConfig, ContentHash, PipelineConfig,
+│ # NodeConfig, ExecutorType, CacheMode
├── system_constants.py # Column prefixes and separators
├── errors.py # InputValidationError, DuplicateTagError, FieldNotResolvableError
├── config.py # Config dataclass
+├── channels.py # Async channel primitives (Channel, BroadcastChannel,
+│ # ReadableChannel, WritableChannel, ChannelClosed)
├── contexts/ # DataContext (semantic_hasher, arrow_hasher, type_converter)
├── protocols/
│ ├── hashing_protocols.py # PipelineElementProtocol, ContentIdentifiableProtocol
+│ ├── database_protocols.py # ArrowDatabaseProtocol
+│ ├── pipeline_protocols.py # Pipeline-level protocols
+│ ├── semantic_types_protocols.py # TypeConverterProtocol
│ └── core_protocols/ # StreamProtocol, PodProtocol, SourceProtocol,
│ # PacketFunctionProtocol, DatagramProtocol, TagProtocol,
-│ # PacketProtocol, TrackerProtocol
+│ # PacketProtocol, TrackerProtocol, AsyncExecutableProtocol,
+│ # PacketFunctionExecutorProtocol, OperatorPodProtocol,
+│ # LabelableProtocol, TemporalProtocol, TraceableProtocol
├── core/
-│ ├── base.py # ContentIdentifiableBase, PipelineElementBase, TraceableBase
-│ ├── static_output_pod.py # StaticOutputPod (operator base), DynamicPodStream
-│ ├── function_pod.py # FunctionPod, FunctionPodStream, FunctionNode
+│ ├── base.py # LabelableMixin, DataContextMixin, TraceableBase
+│ ├── function_pod.py # FunctionPod, FunctionPodStream, @function_pod decorator
│ ├── packet_function.py # PacketFunctionBase, PythonPacketFunction, CachedPacketFunction
-│ ├── operator_node.py # OperatorNode (DB-backed operator execution)
-│ ├── tracker.py # Invocation tracking
+│ ├── tracker.py # BasicTrackerManager, GraphTracker
│ ├── datagrams/
│ │ ├── datagram.py # Datagram (unified dict/Arrow backing, lazy conversion)
│ │ └── tag_packet.py # Tag (+ system tags), Packet (+ source info)
│ ├── sources/
│ │ ├── base.py # RootSource (abstract, no upstream)
│ │ ├── arrow_table_source.py # Core source — all other sources delegate to it
-│ │ ├── derived_source.py # DerivedSource (backed by FunctionNode/OperatorNode DB)
+│ │ ├── persistent_source.py # PersistentSource (DB-backed caching wrapper)
+│ │ ├── derived_source.py # DerivedSource (backed by node DB records)
│ │ ├── csv_source.py, dict_source.py, list_source.py,
│ │ │ data_frame_source.py, delta_table_source.py # Delegating wrappers
│ │ └── source_registry.py # SourceRegistry for provenance resolution
│ ├── streams/
│ │ ├── base.py # StreamBase (abstract)
│ │ └── arrow_table_stream.py # ArrowTableStream (concrete, immutable)
-│ └── operators/
-│ ├── base.py # UnaryOperator, BinaryOperator, NonZeroInputOperator
-│ ├── join.py # Join (N-ary inner join, commutative)
-│ ├── merge_join.py # MergeJoin (binary, colliding cols → sorted list[T])
-│ ├── semijoin.py # SemiJoin (binary, non-commutative)
-│ ├── batch.py # Batch (group rows, types become list[T])
-│ ├── column_selection.py # Select/Drop Tag/Packet columns
-│ ├── mappers.py # MapTags, MapPackets (rename columns)
-│ └── filters.py # PolarsFilter
+│ ├── nodes/
+│ │ ├── function_node.py # FunctionNode, PersistentFunctionNode
+│ │ ├── operator_node.py # OperatorNode, PersistentOperatorNode
+│ │ └── source_node.py # SourceNode (leaf stream in graph)
+│ ├── operators/
+│ │ ├── static_output_pod.py # StaticOutputOperatorPod, DynamicPodStream
+│ │ ├── base.py # UnaryOperator, BinaryOperator, NonZeroInputOperator
+│ │ ├── join.py # Join (N-ary inner join, commutative)
+│ │ ├── merge_join.py # MergeJoin (binary, colliding cols → sorted list[T])
+│ │ ├── semijoin.py # SemiJoin (binary, non-commutative)
+│ │ ├── batch.py # Batch (group rows, types become list[T])
+│ │ ├── column_selection.py # Select/Drop Tag/Packet columns
+│ │ ├── mappers.py # MapTags, MapPackets (rename columns)
+│ │ └── filters.py # PolarsFilter
+│ └── executors/
+│ ├── base.py # PacketFunctionExecutorBase (ABC)
+│ ├── local.py # LocalExecutor (default in-process)
+│ └── ray.py # RayExecutor (dispatch to Ray cluster)
+├── pipeline/
+│ ├── graph.py # Pipeline (extends GraphTracker, compiles to persistent nodes)
+│ ├── nodes.py # PersistentSourceNode (DB-backed leaf wrapper)
+│ └── orchestrator.py # AsyncPipelineOrchestrator (channel-based concurrent execution)
├── hashing/
-│ └── semantic_hashing/ # BaseSemanticHasher, type handlers
-├── semantic_types/ # Type conversion (Python ↔ Arrow)
-├── databases/ # ArrowDatabaseProtocol implementations (Delta Lake, in-memory)
+│ ├── file_hashers.py # BasicFileHasher, CachedFileHasher
+│ ├── arrow_hashers.py # Arrow-specific hashing
+│ ├── arrow_serialization.py # Arrow serialization utilities
+│ ├── arrow_utils.py # Arrow manipulation for hashing
+│ ├── defaults.py # Factory functions for default hashers
+│ ├── hash_utils.py # hash_file(), get_function_components()
+│ ├── string_cachers.py # String caching strategies
+│ ├── versioned_hashers.py # Versioned hasher support
+│ ├── visitors.py # Visitor pattern for hashing
+│ └── semantic_hashing/ # BaseSemanticHasher, type handlers, TypeHandlerRegistry
+├── semantic_types/ # Type conversion (Python ↔ Arrow), UniversalTypeConverter,
+│ # SemanticTypeRegistry, type inference
+├── databases/ # ArrowDatabaseProtocol implementations
+│ ├── delta_lake_databases.py # DeltaTableDatabase
+│ ├── in_memory_databases.py # InMemoryArrowDatabase
+│ ├── noop_database.py # NoOpArrowDatabase
+│ └── file_utils.py # File utilities for database operations
+├── execution_engines/
+│ └── ray_execution_engine.py # RayEngine (execution on Ray clusters)
└── utils/
├── arrow_data_utils.py # System tag manipulation, source info, column helpers
├── arrow_utils.py # Arrow table utilities
├── schema_utils.py # Schema extraction, union, intersection, compatibility
- └── lazy_module.py # LazyModule for deferred heavy imports
+ ├── lazy_module.py # LazyModule for deferred heavy imports
+ ├── function_info.py # Function introspection utilities
+ ├── git_utils.py # Git metadata extraction
+ ├── name.py # Name utilities
+ ├── object_spec.py # Object specification/serialization
+ └── polars_data_utils.py # Polars-specific utilities
tests/
├── test_core/
│ ├── datagrams/ # Lazy conversion, dict/Arrow round-trip
-│ ├── sources/ # Source construction, protocol conformance, DerivedSource
-│ ├── streams/ # ArrowTableStream behavior
-│ ├── function_pod/ # FunctionPod, FunctionNode, pipeline hash integration
+│ ├── sources/ # Source construction, protocol conformance, DerivedSource,
+│ │ # PersistentSource
+│ ├── streams/ # ArrowTableStream behavior, convenience methods
+│ ├── function_pod/ # FunctionPod, FunctionNode, pipeline hash integration,
+│ │ # @function_pod decorator
│ ├── operators/ # All operators, OperatorNode, MergeJoin
-│ └── packet_function/ # PacketFunction, CachedPacketFunction
-├── test_hashing/ # Semantic hasher, hash stability
+│ └── packet_function/ # PacketFunction, CachedPacketFunction, executor
+├── test_channels/ # Async channels, async_execute for operators/nodes/pods,
+│ # native async operators, pipeline integration
+├── test_pipeline/ # Pipeline compilation, AsyncPipelineOrchestrator
+├── test_hashing/ # Semantic hasher, hash stability, file hashers, string cachers
├── test_databases/ # Delta Lake, in-memory, no-op databases
-└── test_semantic_types/ # Type converter tests
+└── test_semantic_types/ # Type converter, semantic registry, struct converters
```
---
@@ -134,10 +193,16 @@ See `orcapod-design.md` at the project root for the full design specification.
### Core data flow
+**Pull-based (synchronous):**
```
RootSource → ArrowTableStream → [Operator / FunctionPod] → ArrowTableStream → ...
```
+**Push-based (async pipeline):**
+```
+Pipeline.compile() → AsyncPipelineOrchestrator.run() → channels → persistent nodes → DB
+```
+
Every stream is an immutable sequence of (Tag, Packet) pairs backed by a PyArrow Table.
Tag columns are join keys and metadata; packet columns are the data payload.
@@ -153,22 +218,59 @@ Key methods: `output_schema()`, `keys()`, `iter_packets()`, `as_table()`.
**Source** (`core/sources/`) — produces a stream from external data. `ArrowTableSource` is the
core implementation; CSV/Delta/DataFrame/Dict/List sources all delegate to it internally. Each
-source adds source-info columns and a system tag column. `DerivedSource` wraps a
-FunctionNode/OperatorNode's DB records as a new source.
+source adds source-info columns and a system tag column. `DerivedSource` wraps a node's DB
+records as a new source. `PersistentSource` wraps any `RootSource` with DB-backed caching
+(deduped by per-row content hash).
**Function Pod** (`core/function_pod.py`) — wraps a `PacketFunction` that transforms individual
-packets. Never inspects tags. Two execution models:
-- `FunctionPod` → `FunctionPodStream`: lazy, in-memory
-- `FunctionNode`: DB-backed, two-phase (yield cached results first, then compute missing)
+packets. Never inspects tags. Supports async functions via `PythonPacketFunction`. The
+`@function_pod` decorator creates `FunctionPod` instances directly from Python functions.
+
+**Node** (`core/nodes/`) — graph-aware wrappers that participate in the computation DAG:
+- `SourceNode` — leaf stream in the graph (wraps a `StreamProtocol`)
+- `FunctionNode` / `PersistentFunctionNode` — packet function invocations (persistent variant
+ is DB-backed with two-phase execution: yield cached, then compute missing)
+- `OperatorNode` / `PersistentOperatorNode` — operator invocations (persistent variant
+ is DB-backed with deduplication)
**Operator** (`core/operators/`) — structural pod transforming streams without synthesizing new
-packet values. All subclass `StaticOutputPod`:
+packet values. All subclass `StaticOutputOperatorPod`. Each operator also implements
+`AsyncExecutableProtocol` for push-based channel execution:
- `UnaryOperator` — 1 input (Batch, Select/Drop columns, Map, Filter)
- `BinaryOperator` — 2 inputs (MergeJoin, SemiJoin)
- `NonZeroInputOperator` — 1+ inputs (Join)
-**OperatorNode** (`core/operator_node.py`) — DB-backed operator execution, analogous to
-FunctionNode.
+**Executor** (`core/executors/`) — pluggable execution backends for packet functions:
+- `LocalExecutor` — default in-process execution
+- `RayExecutor` — dispatches to a Ray cluster
+
+**Channel** (`channels.py`) — async primitives for push-based pipeline execution:
+- `Channel[T]` — bounded async channel with backpressure and close signaling
+- `BroadcastChannel[T]` — fan-out channel for multiple consumers
+- `ReadableChannel[T]` / `WritableChannel[T]` — consumer/producer protocols
+
+**Pipeline** (`pipeline/`) — persistent, async-capable pipeline infrastructure:
+- `Pipeline` — extends `GraphTracker`; records operator/function pod invocations during a
+ `with` block, then `compile()` replaces every node with its persistent variant
+ (leaf streams → `PersistentSourceNode`, function nodes → `PersistentFunctionNode`,
+ operator nodes → `PersistentOperatorNode`)
+- `AsyncPipelineOrchestrator` — executes a compiled pipeline using channels; walks the
+ persistent node graph in topological order, creates bounded channels between nodes,
+ launches all nodes concurrently via `asyncio.TaskGroup`
+
+### Async execution model
+
+All pipeline nodes implement `AsyncExecutableProtocol`:
+```python
+async def async_execute(
+ inputs: Sequence[ReadableChannel[tuple[TagProtocol, PacketProtocol]]],
+ output: WritableChannel[tuple[TagProtocol, PacketProtocol]],
+) -> None
+```
+
+The orchestrator wires channels between nodes and launches tasks without knowing node types.
+`PipelineConfig` controls buffer sizes (`channel_buffer_size`) and concurrency limits
+(`default_max_concurrency`). Per-node overrides are set via `NodeConfig`.
### Strict operator / function pod boundary
@@ -233,18 +335,24 @@ and `as_table()` methods. `all_info=True` sets everything to True.
`if TYPE_CHECKING:` / `else:` blocks at module level.
- **Argument symmetry** — each operator declares `argument_symmetry(streams)` returning
`frozenset` (commutative) or `tuple` (ordered). Determines how upstream hashes combine.
-- **`StaticOutputPod.process()` → `DynamicPodStream`** — wraps `static_process()` output
- with timestamp-based staleness detection and automatic recomputation.
+- **`StaticOutputOperatorPod.process()` → `DynamicPodStream`** — wraps `static_process()`
+ output with timestamp-based staleness detection and automatic recomputation.
- **Source delegation** — CSVSource, DictSource, etc. all create an internal
`ArrowTableSource` and delegate every method to it.
+- **`Pipeline` context manager** — records non-persistent nodes during `with` block, then
+ `compile()` promotes them to persistent variants with DB backing.
+- **`AsyncExecutableProtocol`** — unified interface for all pipeline nodes. The orchestrator
+ wires channels and launches tasks without knowing node types.
+- **`GraphTracker`** — tracks operator/function pod invocations in a NetworkX DAG; `Pipeline`
+ extends it to add compilation and persistence.
### Important implementation details
- `ArrowTableSource.__init__` raises `ValueError` if any `tag_columns` are not in the table.
- `ArrowTableStream` requires at least one packet column; raises `ValueError` otherwise.
-- `FunctionNode.iter_packets()` Phase 1 returns ALL records in the shared `pipeline_path`
- DB table (not filtered to current inputs). Phase 2 skips inputs whose hash is already
- in the DB.
+- `PersistentFunctionNode.iter_packets()` Phase 1 returns ALL records in the shared
+ `pipeline_path` DB table (not filtered to current inputs). Phase 2 skips inputs whose hash
+ is already in the DB.
- Empty data → `ArrowTableSource` raises `ValueError("Table is empty")`.
- `DerivedSource` before `run()` → raises `ValueError` (no computed records).
- Join requires non-overlapping packet columns; raises `InputValidationError` on collision.
@@ -252,3 +360,7 @@ and `as_table()` methods. `all_info=True` sets everything to True.
`list[T]` with source columns reordered to match.
- Operators predict their output schema (including system tag column names) without
performing the actual computation.
+- `CachedFileHasher` uses mtime+size cache busting to detect file changes without re-hashing.
+- `PersistentSource` cache is always on; returns the union of all cached data across runs.
+- `AsyncPipelineOrchestrator` uses `BroadcastChannel` for fan-out (one node feeding multiple
+ downstream consumers).
diff --git a/README.md b/README.md
index 9641750..948a142 100644
--- a/README.md
+++ b/README.md
@@ -1,31 +1,115 @@
-# Orcapod Python
-Orcapod's Python library for developing reproducbile scientific pipelines.
+# orcapod
-## Continuous Integration
+[![Python](https://img.shields.io/pypi/pyversions/orcapod)](https://www.python.org/downloads/)
+[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
-This project uses GitHub Actions for continuous integration:
+**Intuitive and powerful library for highly reproducible scientific data pipelines.**
-- **Run Tests**: A workflow that runs tests on Ubuntu with multiple Python versions.
+orcapod is a Python framework for building data pipelines with built-in provenance tracking, content-addressable caching, and deterministic computation. Every value produced by an orcapod pipeline is traceable back to its original source, every computation is memoizable, and every result is verifiable.
-### Running Tests Locally
+## Key Features
-To run tests locally:
+- **Full Provenance Tracking** — Every value carries metadata tracing it back to its originating source and record.
+- **Content-Addressable Caching** — Identical computations are never repeated. Results are automatically shared across compatible pipeline runs.
+- **Immutable Data Flow** — Streams are immutable (Tag, Packet) sequences backed by Apache Arrow tables.
+- **Strict Operator / Function Pod Boundary** — Operators transform structure without inspecting data. Function pods transform data without inspecting tags.
+- **Schema as a First-Class Citizen** — Every stream is self-describing with schemas predicted at construction time.
+- **Incremental Computation** — Database-backed nodes compute only what's missing.
+- **Pluggable Execution** — Synchronous, async channels, or distributed via Ray — results are identical.
+
+## Quick Example
+
+```python
+import pyarrow as pa
+from orcapod import ArrowTableSource, FunctionPod
+from orcapod.core.packet_function import PythonPacketFunction
+from orcapod.core.operators import Join
+
+# Create sources with tag (join key) and packet (data) columns
+patients = ArrowTableSource(
+ pa.table({"patient_id": ["p1", "p2", "p3"], "age": [30, 45, 60]}),
+ tag_columns=["patient_id"],
+)
+labs = ArrowTableSource(
+ pa.table({"patient_id": ["p1", "p2", "p3"], "cholesterol": [180, 220, 260]}),
+ tag_columns=["patient_id"],
+)
+
+# Join on shared tag columns
+joined = Join()(patients, labs)
+
+# Apply a computation to each packet
+def risk_score(age: int, cholesterol: int) -> float:
+ return age * 0.5 + cholesterol * 0.3
+
+risk_fn = PythonPacketFunction(risk_score, output_keys="risk")
+result = FunctionPod(packet_function=risk_fn)(joined)
+
+# Iterate results
+for tag, packet in result.iter_packets():
+ print(f"{tag.as_dict()} → {packet.as_dict()}")
+```
+
+## Installation
```bash
-# Install the package with test dependencies
-pip install -e ".[test]"
+# From source with uv (recommended)
+git clone https://github.com/walkerlab/orcapod-python.git
+cd orcapod-python
+uv sync
-# Run tests with coverage
-pytest -v --cov=src --cov-report=term-missing
+# Or with pip
+pip install -e .
+```
+
+### Optional Dependencies
+
+```bash
+pip install orcapod[ray] # Distributed execution via Ray
+pip install orcapod[redis] # Redis-backed caching
+pip install orcapod[all] # Everything
```
-### Development Setup
+## Documentation
-For development, you can install all optional dependencies:
+Full documentation is available at the [orcapod docs site](https://walkerlab.github.io/orcapod-python/).
+
+- [Getting Started](https://walkerlab.github.io/orcapod-python/getting-started/installation/) — Installation and quickstart
+- [Concepts](https://walkerlab.github.io/orcapod-python/concepts/architecture/) — Architecture and design principles
+- [User Guide](https://walkerlab.github.io/orcapod-python/user-guide/sources/) — Detailed guides for each component
+- [API Reference](https://walkerlab.github.io/orcapod-python/api/) — Auto-generated API documentation
+
+## Development
```bash
-# Install all development dependencies
-pip install -e ".[test,dev]"
-# or
-pip install -r requirements-dev.txt
+# Install dev dependencies
+uv sync --group dev
+
+# Run tests
+uv run pytest tests/
+
+# Run tests with coverage
+uv run pytest tests/ --cov=src --cov-report=term-missing
+
+# Build documentation locally
+uv sync --group docs
+uv run mkdocs serve
```
+
+## Architecture at a Glance
+
+```
+Source → Stream → [Operator / FunctionPod] → Stream → ...
+```
+
+| Abstraction | Role |
+|-------------|------|
+| **Source** | Load external data, establish provenance |
+| **Stream** | Immutable (Tag, Packet) sequence over shared schema |
+| **Operator** | Structural transformation (join, filter, select, rename) |
+| **Function Pod** | Data transformation (compute new values) |
+| **Pipeline** | Orchestrate, persist, and incrementally recompute |
+
+## License
+
+MIT License — see [LICENSE](LICENSE) for details.
diff --git a/docs/CNAME b/docs/CNAME
new file mode 100644
index 0000000..956369b
--- /dev/null
+++ b/docs/CNAME
@@ -0,0 +1 @@
+orcapod.org
diff --git a/docs/CONTRIBUTING_DOCS.md b/docs/CONTRIBUTING_DOCS.md
new file mode 100644
index 0000000..f9a735a
--- /dev/null
+++ b/docs/CONTRIBUTING_DOCS.md
@@ -0,0 +1,295 @@
+# Documentation Site Setup & Maintenance
+
+This guide covers how the orcapod documentation site is built, deployed, and maintained.
+Follow these instructions to replicate the setup from scratch, troubleshoot deployment
+issues, or make changes to the documentation infrastructure.
+
+---
+
+## Overview
+
+| Component | Technology |
+|-----------|-----------|
+| Documentation framework | [MkDocs](https://www.mkdocs.org/) with [Material for MkDocs](https://squidfunk.github.io/mkdocs-material/) |
+| API docs generation | [mkdocstrings](https://mkdocstrings.github.io/) (Python handler) |
+| Hosting | [GitHub Pages](https://pages.github.com/) |
+| Deployment | [GitHub Actions](https://github.com/features/actions) (`.github/workflows/docs.yml`) |
+| Custom domain | `orcapod.org` |
+
+---
+
+## Local Development
+
+### Install dependencies
+
+
+```bash
+uv sync --group docs
+```
+
+### Live preview
+
+
+```bash
+uv run mkdocs serve
+```
+
+Opens a local server at `http://127.0.0.1:8000` with hot-reload on file changes.
+
+### Build static site
+
+
+```bash
+uv run mkdocs build
+```
+
+Outputs static HTML to the `site/` directory (gitignored).
+
+---
+
+## File Structure
+
+```
+orcapod-python/
+├── mkdocs.yml # MkDocs configuration (nav, theme, plugins)
+├── docs/
+│ ├── CNAME # Custom domain file (deployed to GitHub Pages root)
+│ ├── CONTRIBUTING_DOCS.md # This file
+│ ├── index.md # Homepage
+│ ├── getting-started/
+│ │ ├── installation.md
+│ │ ├── quickstart.md
+│ │ └── first-pipeline.md
+│ ├── concepts/
+│ │ ├── architecture.md
+│ │ ├── datagrams.md
+│ │ ├── streams.md
+│ │ ├── identity.md
+│ │ ├── provenance.md
+│ │ └── schema.md
+│ ├── user-guide/
+│ │ ├── sources.md
+│ │ ├── function-pods.md
+│ │ ├── operators.md
+│ │ ├── pipelines.md
+│ │ ├── caching.md
+│ │ └── execution.md
+│ └── api/
+│ ├── index.md
+│ ├── types.md
+│ ├── sources.md
+│ ├── streams.md
+│ ├── datagrams.md
+│ ├── function-pods.md
+│ ├── packet-functions.md
+│ ├── operators.md
+│ ├── nodes.md
+│ ├── pipeline.md
+│ ├── databases.md
+│ ├── errors.md
+│ └── configuration.md
+├── .github/workflows/
+│ └── docs.yml # GitHub Actions deployment workflow
+└── pyproject.toml # docs dependency group defined here
+```
+
+---
+
+## GitHub Pages Setup (from scratch)
+
+### Step 1: Enable GitHub Pages
+
+1. Go to **https://github.com/walkerlab/orcapod-python/settings/pages**
+2. Under **Source**, select **GitHub Actions** (not "Deploy from a branch")
+3. Click **Save**
+
+That's all that's needed on the GitHub side. The workflow in `.github/workflows/docs.yml`
+handles building and deploying.
+
+### Step 2: Verify the workflow
+
+The workflow triggers on:
+
+- **Push to `main`** — automatic deployment on every merge
+- **`workflow_dispatch`** — manual trigger from the Actions tab
+
+To manually trigger:
+
+1. Go to **https://github.com/walkerlab/orcapod-python/actions**
+2. Select the **Deploy docs** workflow
+3. Click **Run workflow** → **Run workflow**
+
+### Step 3: Verify deployment
+
+After the first successful run:
+
+- The site is live at `https://walkerlab.github.io/orcapod-python/`
+- Check the **Environments** section on the repo homepage for the deployment URL
+
+---
+
+## Custom Domain Setup (orcapod.org)
+
+### Step 1: Configure DNS records
+
+At your domain registrar's DNS management panel (e.g., Cloudflare, Namecheap, Route 53),
+add the following records:
+
+#### A records (apex domain — `orcapod.org`)
+
+| Type | Name | Value | TTL |
+|------|------|-------|-----|
+| A | `@` | `185.199.108.153` | 3600 |
+| A | `@` | `185.199.109.153` | 3600 |
+| A | `@` | `185.199.110.153` | 3600 |
+| A | `@` | `185.199.111.153` | 3600 |
+
+These are GitHub Pages' IP addresses. All four are required for redundancy.
+
+#### CNAME record (www subdomain — optional but recommended)
+
+| Type | Name | Value | TTL |
+|------|------|-------|-----|
+| CNAME | `www` | `walkerlab.github.io` | 3600 |
+
+This redirects `www.orcapod.org` to the GitHub Pages site.
+
+### Step 2: Configure GitHub Pages custom domain
+
+1. Go to **https://github.com/walkerlab/orcapod-python/settings/pages**
+2. Under **Custom domain**, enter `orcapod.org`
+3. Click **Save**
+4. GitHub will run a DNS check — this may take a few minutes
+5. Once the DNS check passes, check **Enforce HTTPS**
+
+### Step 3: CNAME file in the repository
+
+The file `docs/CNAME` contains the custom domain (`orcapod.org`). MkDocs copies this file
+to the root of the built site, which tells GitHub Pages to serve the site at the custom
+domain.
+
+**Important:** Do not delete `docs/CNAME`. If this file is missing, GitHub Pages will revert
+to serving at `walkerlab.github.io/orcapod-python/` and the custom domain will stop working
+after the next deployment.
+
+### Step 4: Verify
+
+
+```bash
+# Check DNS propagation (may take up to 24 hours, usually minutes)
+dig orcapod.org +short
+# Should return:
+# 185.199.108.153
+# 185.199.109.153
+# 185.199.110.153
+# 185.199.111.153
+
+# Check HTTPS
+curl -I https://orcapod.org
+# Should return HTTP/2 200
+```
+
+---
+
+## Troubleshooting
+
+### Site not updating after push
+
+1. Check **Actions tab** → look for the latest "Deploy docs" run
+2. If the run failed, click into it to see the error logs
+3. Common issues:
+ - **mkdocstrings import error** — a module referenced in an API doc page doesn't exist
+ or has an import error. Check the build log for the specific module path.
+ - **Missing dependency** — add it to the `docs` group in `pyproject.toml`
+
+### Custom domain shows 404
+
+1. Verify `docs/CNAME` exists and contains `orcapod.org`
+2. Check GitHub Pages settings → Custom domain should show `orcapod.org`
+3. Re-save the custom domain in settings to re-trigger DNS verification
+4. Verify DNS records: `dig orcapod.org +short` should show GitHub's IPs
+
+### Custom domain shows GitHub Pages 404 (not your site)
+
+The CNAME file may have been removed during a deployment. Verify `docs/CNAME` exists in
+the repository and redeploy.
+
+### HTTPS not available
+
+- HTTPS is only available after DNS propagation completes and GitHub verifies ownership
+- Check **Settings > Pages** — if the DNS check shows a warning, wait and try again
+- GitHub provisions TLS certificates via Let's Encrypt, which can take up to 1 hour after
+ DNS verification
+
+### API docs page shows "Module not found"
+
+The mkdocstrings directive references a Python module path. If you see an error like:
+
+```
+ERROR - mkdocstrings: No module named 'orcapod.some.module'
+```
+
+1. Check that the module path in the `.md` file matches the actual Python module path
+2. Verify the module has no import-time errors: `uv run python -c "import orcapod.some.module"`
+3. Check that `src` is listed in `mkdocstrings` handler paths in `mkdocs.yml`
+
+### Build works locally but fails in CI
+
+1. Check Python version — the CI uses whatever `uv` resolves; ensure compatibility
+2. Check for system dependencies — some packages (e.g., `pygraphviz`) need system libraries
+ that may not be available in the CI runner
+
+---
+
+## Making Changes
+
+### Adding a new documentation page
+
+1. Create the `.md` file in the appropriate `docs/` subdirectory
+2. Add it to the `nav` section in `mkdocs.yml`
+3. Preview locally with `uv run mkdocs serve`
+
+### Adding API docs for a new module
+
+Add a mkdocstrings directive in the appropriate `docs/api/` file:
+
+```markdown
+## MyNewClass
+
+::: orcapod.module.path.MyNewClass
+ options:
+ members:
+ - method_a
+ - method_b
+```
+
+The `members` list controls which methods are documented. Omit it to show all public members
+that have docstrings.
+
+### Updating the navigation
+
+Edit the `nav` section in `mkdocs.yml`. The structure maps directly to the site's sidebar
+navigation.
+
+### Changing the theme or plugins
+
+Edit `mkdocs.yml`. See the [Material for MkDocs documentation](https://squidfunk.github.io/mkdocs-material/)
+for available options.
+
+---
+
+## Dependencies
+
+Documentation dependencies are managed in the `docs` dependency group in `pyproject.toml`:
+
+```toml
+[dependency-groups]
+docs = [
+ "mkdocs>=1.6.0",
+ "mkdocs-material>=9.5.0",
+ "mkdocstrings[python]>=0.27.0",
+ "pymdown-extensions>=10.7",
+]
+```
+
+To update: edit the versions in `pyproject.toml` and run `uv sync --group docs`.
diff --git a/docs/api/configuration.md b/docs/api/configuration.md
new file mode 100644
index 0000000..2786483
--- /dev/null
+++ b/docs/api/configuration.md
@@ -0,0 +1,7 @@
+# Configuration
+
+Global configuration for hashing and identity parameters.
+
+## Config
+
+::: orcapod.config.Config
diff --git a/docs/api/databases.md b/docs/api/databases.md
new file mode 100644
index 0000000..823f90e
--- /dev/null
+++ b/docs/api/databases.md
@@ -0,0 +1,15 @@
+# Databases
+
+Database backends for persistent storage of pipeline results.
+
+## InMemoryArrowDatabase
+
+::: orcapod.databases.in_memory_databases.InMemoryArrowDatabase
+
+## DeltaTableDatabase
+
+::: orcapod.databases.delta_lake_databases.DeltaTableDatabase
+
+## NoOpArrowDatabase
+
+::: orcapod.databases.in_memory_databases.NoOpArrowDatabase
diff --git a/docs/api/datagrams.md b/docs/api/datagrams.md
new file mode 100644
index 0000000..2c39eee
--- /dev/null
+++ b/docs/api/datagrams.md
@@ -0,0 +1,50 @@
+# Datagrams
+
+Immutable data containers with lazy dict/Arrow conversion.
+
+## Datagram
+
+::: orcapod.core.datagrams.datagram.Datagram
+ options:
+ members:
+ - keys
+ - schema
+ - as_dict
+ - as_table
+ - content_hash
+ - select
+ - drop
+ - rename
+ - update
+ - with_columns
+ - copy
+ - get_meta_value
+ - datagram_id
+
+## Tag
+
+::: orcapod.core.datagrams.tag_packet.Tag
+ options:
+ members:
+ - keys
+ - schema
+ - as_dict
+ - as_table
+ - system_tags
+ - as_datagram
+ - copy
+
+## Packet
+
+::: orcapod.core.datagrams.tag_packet.Packet
+ options:
+ members:
+ - keys
+ - schema
+ - as_dict
+ - as_table
+ - source_info
+ - with_source_info
+ - rename
+ - as_datagram
+ - copy
diff --git a/docs/api/errors.md b/docs/api/errors.md
new file mode 100644
index 0000000..0905272
--- /dev/null
+++ b/docs/api/errors.md
@@ -0,0 +1,15 @@
+# Errors
+
+Exception classes used throughout orcapod.
+
+## InputValidationError
+
+::: orcapod.errors.InputValidationError
+
+## DuplicateTagError
+
+::: orcapod.errors.DuplicateTagError
+
+## FieldNotResolvableError
+
+::: orcapod.errors.FieldNotResolvableError
diff --git a/docs/api/function-pods.md b/docs/api/function-pods.md
new file mode 100644
index 0000000..5e2b075
--- /dev/null
+++ b/docs/api/function-pods.md
@@ -0,0 +1,31 @@
+# Function Pods
+
+Pods that apply packet functions to stream data.
+
+## FunctionPod
+
+::: orcapod.core.function_pod.FunctionPod
+ options:
+ members:
+ - process
+ - process_packet
+ - validate_inputs
+ - output_schema
+ - packet_function
+
+## FunctionPodStream
+
+::: orcapod.core.function_pod.FunctionPodStream
+ options:
+ members:
+ - iter_packets
+ - as_table
+ - output_schema
+ - keys
+ - clear_cache
+ - content_hash
+ - pipeline_hash
+
+## function_pod (Decorator)
+
+::: orcapod.core.function_pod.function_pod
diff --git a/docs/api/index.md b/docs/api/index.md
new file mode 100644
index 0000000..146d4c6
--- /dev/null
+++ b/docs/api/index.md
@@ -0,0 +1,38 @@
+# API Reference
+
+This section provides auto-generated API documentation from the orcapod source code. Browse
+the reference by module:
+
+## Core Types
+
+- **[Types](types.md)** — `Schema`, `ColumnConfig`, `ContentHash`, `DataType`, `CacheMode`,
+ `ExecutorType`, `NodeConfig`, `PipelineConfig`
+- **[Errors](errors.md)** — `InputValidationError`, `DuplicateTagError`, `FieldNotResolvableError`
+- **[Configuration](configuration.md)** — `Config` dataclass
+
+## Data Containers
+
+- **[Datagrams](datagrams.md)** — `Datagram`, `Tag`, `Packet`
+- **[Streams](streams.md)** — `ArrowTableStream`
+
+## Data Sources
+
+- **[Sources](sources.md)** — `ArrowTableSource`, `DictSource`, `ListSource`,
+ `DataFrameSource`, `DeltaTableSource`, `CSVSource`, `DerivedSource`
+
+## Computation
+
+- **[Packet Functions](packet-functions.md)** — `PythonPacketFunction`,
+ `PacketFunctionBase`, `CachedPacketFunction`
+- **[Function Pods](function-pods.md)** — `FunctionPod`, `FunctionPodStream`,
+ `function_pod` decorator
+- **[Operators](operators.md)** — `Join`, `MergeJoin`, `SemiJoin`, `Batch`,
+ `SelectTagColumns`, `SelectPacketColumns`, `DropTagColumns`, `DropPacketColumns`,
+ `MapTags`, `MapPackets`, `PolarsFilter`
+
+## Execution
+
+- **[Operator & Function Nodes](nodes.md)** — `FunctionNode`, `PersistentFunctionNode`,
+ `OperatorNode`, `PersistentOperatorNode`
+- **[Pipeline](pipeline.md)** — `Pipeline`, `PersistentSourceNode`
+- **[Databases](databases.md)** — `InMemoryArrowDatabase`, `DeltaTableDatabase`
diff --git a/docs/api/nodes.md b/docs/api/nodes.md
new file mode 100644
index 0000000..05a3af9
--- /dev/null
+++ b/docs/api/nodes.md
@@ -0,0 +1,58 @@
+# Operator & Function Nodes
+
+Database-backed execution nodes for persistent computation.
+
+## FunctionNode
+
+::: orcapod.core.function_pod.FunctionNode
+ options:
+ members:
+ - iter_packets
+ - as_table
+ - output_schema
+ - keys
+ - clear_cache
+ - content_hash
+ - pipeline_hash
+ - run
+
+## PersistentFunctionNode
+
+::: orcapod.core.function_pod.PersistentFunctionNode
+ options:
+ members:
+ - iter_packets
+ - as_table
+ - output_schema
+ - keys
+ - run
+ - process_packet
+ - add_pipeline_record
+ - get_all_records
+ - pipeline_path
+ - as_source
+
+## OperatorNode
+
+::: orcapod.core.operator_node.OperatorNode
+ options:
+ members:
+ - iter_packets
+ - as_table
+ - output_schema
+ - keys
+ - run
+ - clear_cache
+ - content_hash
+ - pipeline_hash
+
+## PersistentOperatorNode
+
+::: orcapod.core.operator_node.PersistentOperatorNode
+ options:
+ members:
+ - run
+ - get_all_records
+ - as_source
+ - cache_mode
+ - pipeline_path
diff --git a/docs/api/operators.md b/docs/api/operators.md
new file mode 100644
index 0000000..c2cbbfb
--- /dev/null
+++ b/docs/api/operators.md
@@ -0,0 +1,87 @@
+# Operators
+
+Structural transformers that reshape streams without synthesizing new values.
+
+## Join
+
+::: orcapod.core.operators.join.Join
+
+## MergeJoin
+
+::: orcapod.core.operators.merge_join.MergeJoin
+
+## SemiJoin
+
+::: orcapod.core.operators.semijoin.SemiJoin
+
+## Batch
+
+::: orcapod.core.operators.batch.Batch
+
+## SelectTagColumns
+
+::: orcapod.core.operators.column_selection.SelectTagColumns
+
+## SelectPacketColumns
+
+::: orcapod.core.operators.column_selection.SelectPacketColumns
+
+## DropTagColumns
+
+::: orcapod.core.operators.column_selection.DropTagColumns
+
+## DropPacketColumns
+
+::: orcapod.core.operators.column_selection.DropPacketColumns
+
+## MapTags
+
+::: orcapod.core.operators.mappers.MapTags
+
+## MapPackets
+
+::: orcapod.core.operators.mappers.MapPackets
+
+## PolarsFilter
+
+::: orcapod.core.operators.filters.PolarsFilter
+
+## Base Classes
+
+### UnaryOperator
+
+::: orcapod.core.operators.base.UnaryOperator
+ options:
+ members:
+ - validate_unary_input
+ - unary_static_process
+ - unary_output_schema
+
+### BinaryOperator
+
+::: orcapod.core.operators.base.BinaryOperator
+ options:
+ members:
+ - validate_binary_inputs
+ - binary_static_process
+ - binary_output_schema
+ - is_commutative
+
+### NonZeroInputOperator
+
+::: orcapod.core.operators.base.NonZeroInputOperator
+ options:
+ members:
+ - validate_nonzero_inputs
+
+### StaticOutputPod
+
+::: orcapod.core.static_output_pod.StaticOutputPod
+ options:
+ members:
+ - process
+ - validate_inputs
+ - argument_symmetry
+ - output_schema
+ - static_process
+ - async_execute
diff --git a/docs/api/packet-functions.md b/docs/api/packet-functions.md
new file mode 100644
index 0000000..cabec6a
--- /dev/null
+++ b/docs/api/packet-functions.md
@@ -0,0 +1,45 @@
+# Packet Functions
+
+Stateless computations that transform individual packets.
+
+## PythonPacketFunction
+
+::: orcapod.core.packet_function.PythonPacketFunction
+ options:
+ members:
+ - direct_call
+ - direct_async_call
+ - call
+ - async_call
+ - input_packet_schema
+ - output_packet_schema
+ - canonical_function_name
+ - is_active
+ - set_active
+ - identity_structure
+ - pipeline_identity_structure
+
+## PacketFunctionBase (Abstract Base)
+
+::: orcapod.core.packet_function.PacketFunctionBase
+ options:
+ members:
+ - call
+ - async_call
+ - direct_call
+ - direct_async_call
+ - executor
+ - major_version
+ - packet_function_type_id
+ - canonical_function_name
+ - output_packet_schema_hash
+ - uri
+
+## CachedPacketFunction
+
+::: orcapod.core.packet_function.CachedPacketFunction
+ options:
+ members:
+ - get_cached_output_for_packet
+ - record_packet
+ - get_all_cached_outputs
diff --git a/docs/api/pipeline.md b/docs/api/pipeline.md
new file mode 100644
index 0000000..af5d4f2
--- /dev/null
+++ b/docs/api/pipeline.md
@@ -0,0 +1,21 @@
+# Pipeline
+
+Pipeline orchestration and compilation.
+
+## Pipeline
+
+::: orcapod.pipeline.graph.Pipeline
+ options:
+ members:
+ - compile
+ - run
+ - compiled_nodes
+
+## PersistentSourceNode
+
+::: orcapod.pipeline.nodes.PersistentSourceNode
+ options:
+ members:
+ - run
+ - get_all_records
+ - cache_path
diff --git a/docs/api/sources.md b/docs/api/sources.md
new file mode 100644
index 0000000..64b47b6
--- /dev/null
+++ b/docs/api/sources.md
@@ -0,0 +1,60 @@
+# Sources
+
+Data source implementations for loading external data into orcapod streams.
+
+## ArrowTableSource
+
+::: orcapod.core.sources.arrow_table_source.ArrowTableSource
+ options:
+ members:
+ - resolve_field
+ - output_schema
+ - keys
+ - as_table
+ - as_stream
+ - iter_packets
+ - content_hash
+ - pipeline_hash
+ - source_id
+ - pipeline_identity_structure
+
+## DictSource
+
+::: orcapod.core.sources.dict_source.DictSource
+
+## ListSource
+
+::: orcapod.core.sources.list_source.ListSource
+
+## DataFrameSource
+
+::: orcapod.core.sources.data_frame_source.DataFrameSource
+
+## DeltaTableSource
+
+::: orcapod.core.sources.delta_table_source.DeltaTableSource
+
+## CSVSource
+
+::: orcapod.core.sources.csv_source.CSVSource
+
+## DerivedSource
+
+::: orcapod.core.sources.derived_source.DerivedSource
+
+## RootSource (Abstract Base)
+
+::: orcapod.core.sources.base.RootSource
+ options:
+ members:
+ - resolve_field
+ - pipeline_identity_structure
+ - source_id
+
+## PersistentSource
+
+::: orcapod.core.sources.persistent_source.PersistentSource
+
+## SourceRegistry
+
+::: orcapod.core.sources.source_registry.SourceRegistry
diff --git a/docs/api/streams.md b/docs/api/streams.md
new file mode 100644
index 0000000..3a40338
--- /dev/null
+++ b/docs/api/streams.md
@@ -0,0 +1,32 @@
+# Streams
+
+Immutable stream implementations for carrying (Tag, Packet) pairs.
+
+## ArrowTableStream
+
+::: orcapod.core.streams.arrow_table_stream.ArrowTableStream
+ options:
+ members:
+ - output_schema
+ - keys
+ - iter_packets
+ - as_table
+ - content_hash
+ - pipeline_hash
+ - clear_cache
+
+## StreamBase (Abstract Base)
+
+::: orcapod.core.streams.base.StreamBase
+ options:
+ members:
+ - output_schema
+ - keys
+ - iter_packets
+ - as_table
+ - content_hash
+ - pipeline_hash
+ - producer
+ - upstreams
+ - last_modified
+ - is_stale
diff --git a/docs/api/types.md b/docs/api/types.md
new file mode 100644
index 0000000..aba39ba
--- /dev/null
+++ b/docs/api/types.md
@@ -0,0 +1,67 @@
+# Types
+
+Core type definitions used throughout orcapod.
+
+## Schema
+
+::: orcapod.types.Schema
+ options:
+ members:
+ - merge
+ - with_values
+ - select
+ - drop
+ - is_compatible_with
+ - empty
+ - optional_fields
+ - required_fields
+
+## ColumnConfig
+
+::: orcapod.types.ColumnConfig
+ options:
+ members:
+ - all
+ - data_only
+ - handle_config
+
+## ContentHash
+
+::: orcapod.types.ContentHash
+ options:
+ members:
+ - to_hex
+ - to_int
+ - to_uuid
+ - to_base64
+ - to_string
+ - from_string
+ - display_name
+
+## CacheMode
+
+::: orcapod.types.CacheMode
+
+## ExecutorType
+
+::: orcapod.types.ExecutorType
+
+## NodeConfig
+
+::: orcapod.types.NodeConfig
+
+## PipelineConfig
+
+::: orcapod.types.PipelineConfig
+
+## Type Aliases
+
+The following type aliases are used throughout the API:
+
+| Alias | Definition | Description |
+|-------|-----------|-------------|
+| `DataType` | `type \| UnionType` | A Python type or union of types |
+| `TagValue` | `int \| str \| None \| Collection` | Valid tag column values |
+| `DataValue` | Scalar, path, or nested collections | Valid packet column values |
+| `PacketLike` | `dict[str, DataValue]` | Dict mapping field names to values |
+| `SchemaLike` | `dict[str, DataType]` | Dict mapping field names to types |
diff --git a/docs/concepts/architecture.md b/docs/concepts/architecture.md
new file mode 100644
index 0000000..9aea240
--- /dev/null
+++ b/docs/concepts/architecture.md
@@ -0,0 +1,121 @@
+# Architecture Overview
+
+orcapod is built around a small number of composable abstractions that enforce a strict
+separation between data transformation and structural manipulation. This page provides a
+high-level map of how the pieces fit together.
+
+## Core Data Flow
+
+```mermaid
+graph LR
+ S1[Source A] --> ST1[Stream]
+ S2[Source B] --> ST2[Stream]
+    ST1 --> OP[Operator<br/>Join / Filter / ...]
+ ST2 --> OP
+ OP --> ST3[Stream]
+ ST3 --> FP[Function Pod]
+ FP --> ST4[Stream]
+ ST4 --> NEXT[...]
+```
+
+Every pipeline follows this pattern:
+
+1. **Sources** load external data and annotate it with provenance metadata.
+2. **Streams** carry data as immutable (Tag, Packet) pairs.
+3. **Operators** reshape streams (join, filter, select, rename, batch) without creating new values.
+4. **Function Pods** apply computations to individual packets, producing new values with tracked provenance.
+
+## The Five Core Abstractions
+
+### Datagram
+
+The universal immutable data container. Holds named columns with explicit types and supports
+lazy conversion between Python dict and Apache Arrow representations. Comes in two forms:
+
+- **Tag** — metadata columns for routing, filtering, and joining. Carries hidden system tags
+ for provenance.
+- **Packet** — data payload columns. Carries source-info provenance tokens per column.
+
+### Stream
+
+An immutable sequence of (Tag, Packet) pairs over a shared schema. The fundamental data-flow
+abstraction — every source emits one, every operator consumes and produces them.
+
+### Source
+
+Produces a stream from external data with no upstream dependencies. Establishes provenance
+by annotating each row with source identity and record identity.
+
+### Function Pod
+
+Wraps a stateless **packet function** that transforms individual packets. Never inspects tags.
+Used when the computation synthesizes new values.
+
+### Operator
+
+A structural transformer that reshapes streams without synthesizing new packet values. Every
+output value is traceable to a concrete input value. Used for joins, filters, projections,
+renames, and batching.
+
+## The Operator / Function Pod Boundary
+
+This is orcapod's most important architectural constraint:
+
+| | Operator | Function Pod |
+|---|---|---|
+| Inspects packet content | Never | Yes |
+| Inspects / uses tags | Yes | No |
+| Can rename columns | Yes | No |
+| Synthesizes new values | No | Yes |
+| Stream arity | Configurable | Single in, single out |
+| Cached by content hash | No | Yes |
+
+This strict separation keeps provenance clean. Operators are provenance-transparent (no new
+values, no provenance footprint). Function pods are provenance-tracked (new values always
+carry source-info pointing back to the function).
+
+## Two Parallel Identity Chains
+
+Every pipeline element maintains two hashes:
+
+1. **`content_hash()`** — data-inclusive. Changes when data changes. Used for deduplication
+ and memoization.
+2. **`pipeline_hash()`** — schema and topology only. Ignores data content. Used for database
+ path scoping so different sources with identical schemas share tables.
+
+See [Identity & Hashing](identity.md) for the full specification.
+
+## Execution Models
+
+orcapod supports multiple execution strategies that produce identical results:
+
+| Model | Mechanism | Use Case |
+|-------|-----------|----------|
+| Lazy in-memory | `FunctionPod` → `FunctionPodStream` | Exploration, one-off computations |
+| Static with recomputation | `StaticOutputPod` → `DynamicPodStream` | Operator output with staleness detection |
+| DB-backed incremental | `FunctionNode` / `OperatorNode` | Production pipelines with caching |
+| Async push-based | `async_execute()` with channels | Pipeline-level parallelism |
+
+See [Execution Models](../user-guide/execution.md) for details.
+
+## Pipeline Compilation
+
+The `Pipeline` class automatically captures computation graphs and upgrades all nodes
+to their persistent variants:
+
+```mermaid
+graph TD
+ subgraph "Recording Phase"
+ A[Source] --> B[Join]
+ B --> C[FunctionPod]
+ end
+ subgraph "After Compilation"
+ D[PersistentSourceNode] --> E[PersistentOperatorNode]
+ E --> F[PersistentFunctionNode]
+ end
+ A -.->|compile| D
+ B -.->|compile| E
+ C -.->|compile| F
+```
+
+See [Pipelines](../user-guide/pipelines.md) for the full pipeline lifecycle.
diff --git a/docs/concepts/datagrams.md b/docs/concepts/datagrams.md
new file mode 100644
index 0000000..8491b6c
--- /dev/null
+++ b/docs/concepts/datagrams.md
@@ -0,0 +1,129 @@
+# Datagrams, Tags & Packets
+
+Datagrams are orcapod's universal immutable data containers. They hold named columns with
+explicit type information and support lazy conversion between Python dict and Apache Arrow
+representations.
+
+## Datagram
+
+A `Datagram` is the base container. It can be constructed from either a Python dict or an
+Arrow table/record batch:
+
+```python
+from orcapod.core.datagrams import Datagram
+
+# From a dict
+dg = Datagram({"name": "Alice", "age": 30})
+
+# Access as dict (always available)
+print(dg.as_dict()) # {'name': 'Alice', 'age': 30}
+
+# Access as Arrow table (lazily computed and cached)
+table = dg.as_table()
+
+# Schema introspection
+print(dg.schema()) # Schema({'name': str, 'age': int})
+print(dg.keys()) # ('name', 'age')
+```
+
+### Lazy Conversion
+
+Datagrams convert between dict and Arrow representations lazily:
+
+- If created from a dict, the Arrow table is computed on first `.as_table()` call and cached.
+- If created from an Arrow table, the dict is computed on first `.as_dict()` call and cached.
+- Content hashing always uses the Arrow representation for determinism.
+- Value access always uses the Python dict for convenience.
+
+### Immutability
+
+Datagrams are immutable. Operations like `select()`, `drop()`, `rename()`, and `update()`
+return new datagrams:
+
+```python
+from orcapod.core.datagrams import Datagram
+
+dg = Datagram({"a": 1, "b": 2, "c": 3})
+
+selected = dg.select("a", "b") # Datagram({'a': 1, 'b': 2})
+dropped = dg.drop("c") # Datagram({'a': 1, 'b': 2})
+renamed = dg.rename({"a": "alpha"}) # Datagram({'alpha': 1, 'b': 2, 'c': 3})
+```
+
+## Tag
+
+A **Tag** is a datagram specialization for metadata columns. Tags are used for routing,
+filtering, joining, and annotation. They carry additional **system tags** — framework-managed
+hidden provenance columns.
+
+
+```python
+from orcapod.core.datagrams import Tag
+
+tag = Tag({"patient_id": "p1", "visit": "v1"})
+
+# Regular keys (user-visible)
+print(tag.keys()) # ('patient_id', 'visit')
+
+# System tags (hidden provenance columns)
+print(tag.system_tags()) # {...}
+```
+
+### Key Properties of Tags
+
+- **Non-authoritative** — never used for cache lookup or pod identity computation.
+- **Auto-propagated** — tags flow forward through the pipeline automatically.
+- **Join keys** — operators join streams by matching tag columns.
+
+### Tag Merging in Joins
+
+When streams are joined:
+
+- **Shared tag keys** act as the join predicate — values must match.
+- **Non-shared tag keys** propagate freely into the joined output.
+
+## Packet
+
+A **Packet** is a datagram specialization for data payload columns. Packets carry additional
+**source info** — per-column provenance tokens tracing each value back to its originating
+source and record.
+
+
+```python
+from orcapod.core.datagrams import Packet
+
+packet = Packet({"age": 30, "cholesterol": 180})
+
+# Source info (provenance pointers)
+print(packet.source_info())
+# {'age': 'source_abc::row_0::age', 'cholesterol': 'source_abc::row_0::cholesterol'}
+```
+
+### Source Info Format
+
+Each packet column carries a source info string:
+
+```
+{source_id}::{record_id}::{column_name}
+```
+
+- `source_id` — canonical identifier of the originating source
+- `record_id` — row identifier (positional like `row_0` or column-based like `user_id=abc123`)
+- `column_name` — the original column name
+
+Source info is **immutable through the pipeline** — set once when a source creates the data
+and preserved through all downstream transformations.
+
+## Column Naming Conventions
+
+orcapod uses column name prefixes to distinguish metadata from user data:
+
+| Prefix | Meaning | Example |
+|--------|---------|---------|
+| `__` | System metadata | `__packet_id`, `__pod_version` |
+| `_source_` | Source info provenance | `_source_age` |
+| `_tag::` | System tag | `_tag::source_id::abc123` |
+| `_context_key` | Data context | `_context_key` |
+
+These prefixes are controlled by `ColumnConfig` and excluded from standard output by default.
+See [Schema & Column Configuration](schema.md) for details.
diff --git a/docs/concepts/identity.md b/docs/concepts/identity.md
new file mode 100644
index 0000000..55d360e
--- /dev/null
+++ b/docs/concepts/identity.md
@@ -0,0 +1,120 @@
+# Identity & Hashing
+
+orcapod maintains two parallel identity chains implemented as recursive Merkle-like hash
+trees. These hashes are central to caching, deduplication, and database scoping.
+
+## Two Identity Chains
+
+### Content Hash (`content_hash()`)
+
+Data-inclusive identity capturing the precise semantic content of an object. Changes when
+data changes. Used for deduplication and memoization.
+
+| Component | What Gets Hashed |
+|-----------|-----------------|
+| RootSource | Class name + tag columns + table content hash |
+| PacketFunction | URI (canonical name + output schema hash + version + type ID) |
+| FunctionPodStream | Function pod + argument symmetry of inputs |
+| Operator | Operator class + identity structure |
+| ArrowTableStream | Producer + upstreams (or table content if no producer) |
+| Datagram | Arrow table content |
+| DerivedSource | Origin node's content hash |
+
+### Pipeline Hash (`pipeline_hash()`)
+
+Schema-and-topology-only identity. Excludes data content so that different sources with
+identical schemas share database tables. Used for database path scoping.
+
+| Component | What Gets Hashed |
+|-----------|-----------------|
+| RootSource | `(tag_schema, packet_schema)` — base case |
+| PacketFunction | Raw packet function object (via content hash) |
+| FunctionPodStream | Function pod + input stream pipeline hashes |
+| Operator | Operator class + argument symmetry (pipeline hashes of inputs) |
+| ArrowTableStream | Producer + upstreams pipeline hashes (or schema if no producer) |
+| DerivedSource | Inherited from RootSource: `(tag_schema, packet_schema)` |
+
+### Why Two Hashes?
+
+Consider a medical pipeline that processes patient data from different clinics. Both clinics
+produce tables with schema `{patient_id: str, age: int, cholesterol: int}` but different
+data.
+
+- **Content hash** differs because the data differs — each clinic's results are cached
+ separately.
+- **Pipeline hash** is identical because the schema and topology match — both clinics' data
+ can share the same database table, distinguished by system tags.
+
+## The ContentHash Type
+
+All hashes are represented as `ContentHash` — a frozen dataclass pairing a method identifier
+with raw digest bytes:
+
+
+```python
+from orcapod.types import ContentHash
+
+hash_val = source.content_hash()
+
+# Various representations
+print(hash_val.to_hex()) # Hexadecimal string
+print(hash_val.to_int()) # Integer
+print(hash_val.to_uuid()) # UUID
+print(hash_val.to_base64()) # Base64 string
+print(hash_val.to_string()) # "{method}:{hex_digest}"
+```
+
+The method name (e.g., `"object_v0.1"`, `"arrow_v2.1"`) enables detecting version mismatches
+across hash configurations.
+
+## Semantic Hashing
+
+Content hashes use a `BaseSemanticHasher` that:
+
+1. Recursively expands structures (dicts, lists, tuples).
+2. Dispatches to type-specific handlers for each leaf value.
+3. Terminates at `ContentHash` leaves (preventing hash-of-hash inflation).
+
+This ensures that structurally identical objects produce identical hashes regardless of how
+they were constructed.
+
+## The Resolver Pattern
+
+Pipeline hash uses a **resolver pattern** — a callback that routes objects to the correct
+hash method:
+
+- `PipelineElementProtocol` objects → `pipeline_hash()`
+- Other `ContentIdentifiable` objects → `content_hash()`
+
+This ensures the correct identity chain is used for nested objects within a single hash
+computation.
+
+## Argument Symmetry
+
+Each pod declares how upstream hashes are combined:
+
+- **Commutative** (`frozenset`) — upstream hashes sorted before combining. Used when input
+ order is semantically irrelevant (Join, MergeJoin).
+- **Non-commutative** (`tuple`) — upstream hashes combined in declared order. Used when
+ input position is significant (SemiJoin).
+- **Partial symmetry** — nesting expresses mixed constraints.
+
+
+```python
+# Commutative: Join(A, B) == Join(B, A)
+join.argument_symmetry(streams) # returns frozenset
+
+# Non-commutative: SemiJoin(A, B) != SemiJoin(B, A)
+semi_join.argument_symmetry(streams) # returns tuple
+```
+
+## Packet Function URI
+
+Every packet function has a unique signature:
+
+```
+(canonical_function_name, output_schema_hash, major_version, packet_function_type_id)
+```
+
+For Python functions, the identity structure additionally includes the function's bytecode
+hash, input parameter signature, and Git version information.
diff --git a/docs/concepts/provenance.md b/docs/concepts/provenance.md
new file mode 100644
index 0000000..9727df7
--- /dev/null
+++ b/docs/concepts/provenance.md
@@ -0,0 +1,136 @@
+# System Tags & Provenance
+
+orcapod provides two complementary provenance mechanisms: **source info** for tracking
+value-level lineage, and **system tags** for tracking row-level lineage through structural
+operations.
+
+## Source Info
+
+Every packet column carries a **source info** string — a provenance pointer to the source
+and record that produced the value:
+
+```
+{source_id}::{record_id}::{column_name}
+```
+
+For example:
+
+```
+customers_2024::row_42::age
+```
+
+Source info is:
+
+- **Set once** — when a source creates the data.
+- **Immutable** — preserved through all downstream operations, including column renames.
+- **Column-level** — each column independently tracks its origin.
+
+## System Tags
+
+System tags are **framework-managed, hidden provenance columns** automatically attached to
+every row. They maintain perfect traceability from any result back to its original source
+rows.
+
+### How System Tags Are Created
+
+Each source automatically adds a pair of system tag columns:
+
+```
+_tag::source_id::{schema_hash} → the source's canonical source_id
+_tag::record_id::{schema_hash} → the row identifier within that source
+```
+
+For example, a source with schema hash `schema1`:
+
+```
+_tag::source_id::schema1 = "customers_2024"
+_tag::record_id::schema1 = "row_42"
+```
+
+### Three Evolution Rules
+
+System tags evolve differently depending on the operation:
+
+#### 1. Name-Preserving (~90% of operations)
+
+Single-stream operations: filter, select, rename, batch, map.
+
+System tag column names and values pass through **unchanged**. The operation doesn't affect
+provenance tracking.
+
+#### 2. Name-Extending (multi-input operations)
+
+Joins and merges. Each input's system tag column name is extended with the node's pipeline
+hash and canonical position:
+
+```
+Before join:
+ Stream A: _tag::source_id::schema1
+ Stream B: _tag::source_id::schema1
+
+After join (pipeline_hash=abc123):
+ _tag::source_id::schema1::abc123:0 (from Stream A)
+ _tag::source_id::schema1::abc123:1 (from Stream B)
+```
+
+For commutative operations, inputs are sorted by `pipeline_hash` to ensure identical column
+names regardless of wiring order.
+
+#### 3. Type-Evolving (aggregation operations)
+
+Batch and grouping operations. Column names are unchanged but types evolve:
+
+```
+Before batch: _tag::source_id::schema1 (type: str)
+After batch: _tag::source_id::schema1 (type: list[str])
+```
+
+## The Provenance Graph
+
+orcapod's provenance graph is a **bipartite graph of sources and function pods**:
+
+```mermaid
+graph LR
+ S1[Source A] -->|source_info| FP1[FunctionPod 1]
+ S2[Source B] -->|source_info| FP1
+ FP1 -->|source_info| FP2[FunctionPod 2]
+```
+
+Operators do not appear in the provenance graph because they never synthesize new values.
+This means:
+
+- **Operators can be refactored** without invalidating data provenance.
+- **Provenance queries are simpler** — trace source info pointers between function pod table
+ entries.
+- **Provenance is robust** — lineage is determined by what generated the data, not by how it was routed.
+
+## Inspecting Provenance
+
+Use `ColumnConfig` to include provenance columns in output:
+
+
+```python
+from orcapod.types import ColumnConfig
+
+# Source info columns (value-level provenance)
+table = stream.as_table(columns=ColumnConfig(source=True))
+
+# System tag columns (row-level provenance)
+table = stream.as_table(columns=ColumnConfig(system_tags=True))
+
+# Everything
+table = stream.as_table(columns=ColumnConfig.all())
+```
+
+## Schema Prediction
+
+Operators predict output system tag column names at schema time — without performing the
+actual computation — by computing `pipeline_hash` values and canonical positions. This is
+exposed via:
+
+
+```python
+tag_schema, packet_schema = operator_stream.output_schema(
+ columns=ColumnConfig(system_tags=True)
+)
+```
diff --git a/docs/concepts/schema.md b/docs/concepts/schema.md
new file mode 100644
index 0000000..0b80808
--- /dev/null
+++ b/docs/concepts/schema.md
@@ -0,0 +1,116 @@
+# Schema & Column Configuration
+
+Every stream in orcapod is self-describing. Schemas are embedded explicitly at every level
+rather than resolved against a central registry.
+
+## Schema
+
+A `Schema` is an immutable mapping from field names to Python types, with support for
+optional fields:
+
+```python
+from orcapod.types import Schema
+
+schema = Schema({"name": str, "age": int, "email": str}, optional_fields={"email"})
+
+print(schema) # Schema({'name': str, 'age': int, 'email': str})
+print(schema.optional_fields) # frozenset({'email'})
+print(schema.required_fields) # frozenset({'name', 'age'})
+```
+
+### Schema Operations
+
+```python
+from orcapod.types import Schema
+
+a = Schema({"x": int, "y": str})
+b = Schema({"y": str, "z": float})
+
+# Merge (union) — raises on type conflicts
+merged = a.merge(b) # Schema({'x': int, 'y': str, 'z': float})
+
+# Select specific fields
+selected = a.select("x") # Schema({'x': int})
+
+# Drop specific fields
+dropped = a.drop("y") # Schema({'x': int})
+
+# Compatibility check
+a.is_compatible_with(b) # True if shared keys have compatible types
+```
+
+### Output Schema
+
+Every stream and pod exposes `output_schema()` returning a tuple:
+
+
+```python
+tag_schema, packet_schema = stream.output_schema()
+```
+
+- `tag_schema` — the schema of tag (metadata) columns.
+- `packet_schema` — the schema of packet (data) columns.
+
+## ColumnConfig
+
+`ColumnConfig` controls which column groups are included in schema and data output. By
+default, metadata columns are excluded for clean output.
+
+```python
+from orcapod.types import ColumnConfig
+
+# Default: only user data columns
+config = ColumnConfig()
+
+# Include specific metadata
+config = ColumnConfig(source=True) # Include source-info columns
+config = ColumnConfig(system_tags=True) # Include system tag columns
+config = ColumnConfig(meta=True) # Include system metadata (__packet_id, etc.)
+
+# Include everything
+config = ColumnConfig.all()
+
+# Explicitly data-only
+config = ColumnConfig.data_only()
+```
+
+### ColumnConfig Fields
+
+| Field | Default | Controls |
+|-------|---------|----------|
+| `meta` | `False` | System metadata columns (`__` prefix) |
+| `context` | `False` | Data context column (`_context_key`) |
+| `source` | `False` | Source-info provenance columns (`_source_` prefix) |
+| `system_tags` | `False` | System tag columns (`_tag::` prefix) |
+| `content_hash` | `False` | Per-row content hash column |
+| `sort_by_tags` | `False` | Whether to sort output by tag columns |
+
+### Using ColumnConfig
+
+Pass `ColumnConfig` to `output_schema()` and `as_table()`:
+
+
+```python
+# Schema with source info columns
+tag_schema, packet_schema = stream.output_schema(
+ columns=ColumnConfig(source=True)
+)
+
+# Materialized table with everything
+table = stream.as_table(columns=ColumnConfig.all())
+```
+
+## Column Naming Conventions
+
+orcapod uses prefixes to distinguish column types:
+
+| Prefix | Category | Example | ColumnConfig Field |
+|--------|----------|---------|--------------------|
+| `__` | System metadata | `__packet_id` | `meta` |
+| `_source_` | Source info | `_source_age` | `source` |
+| `_tag::` | System tag | `_tag::source_id::abc` | `system_tags` |
+| `_context_key` | Data context | `_context_key` | `context` |
+| *(no prefix)* | User data | `age`, `name` | Always included |
+
+These prefixes are defined in `SystemConstant` (`system_constants.py`) and computed from
+a shared `constants` singleton.
diff --git a/docs/concepts/streams.md b/docs/concepts/streams.md
new file mode 100644
index 0000000..478c916
--- /dev/null
+++ b/docs/concepts/streams.md
@@ -0,0 +1,87 @@
+# Streams
+
+Streams are orcapod's fundamental data-flow abstraction. A stream is an immutable sequence
+of (Tag, Packet) pairs over a shared schema. Every source emits a stream, every operator
+consumes and produces streams, and every function pod iterates over them.
+
+## ArrowTableStream
+
+The concrete stream implementation is `ArrowTableStream`, backed by an immutable PyArrow
+Table with explicit tag/packet column assignment.
+
+```python
+import pyarrow as pa
+from orcapod.core.streams import ArrowTableStream
+
+table = pa.table({
+ "id": pa.array(["a", "b", "c"], type=pa.large_string()),
+ "value": pa.array([1, 2, 3], type=pa.int64()),
+})
+
+stream = ArrowTableStream(table, tag_columns=["id"])
+```
+
+## Schema Introspection
+
+Every stream exposes its schema as a tuple of `(tag_schema, packet_schema)`:
+
+
+```python
+tag_schema, packet_schema = stream.output_schema()
+print(tag_schema) # Schema({'id': str})
+print(packet_schema) # Schema({'value': int})
+```
+
+## Iterating
+
+Streams provide lazy iteration over (Tag, Packet) pairs:
+
+
+```python
+for tag, packet in stream.iter_packets():
+ print(f"Tag: {tag.as_dict()}, Packet: {packet.as_dict()}")
+```
+
+## Materialization
+
+Materialize a stream as a PyArrow table:
+
+
+```python
+table = stream.as_table()
+print(table.to_pandas())
+```
+
+Use `ColumnConfig` to control which metadata columns are included:
+
+
+```python
+from orcapod.types import ColumnConfig
+
+# Data columns only (default)
+table = stream.as_table()
+
+# Include source-info provenance columns
+table = stream.as_table(columns=ColumnConfig(source=True))
+
+# Include everything
+table = stream.as_table(columns=ColumnConfig.all())
+```
+
+## Key Methods
+
+| Method | Description |
+|--------|-------------|
+| `output_schema()` | Returns `(tag_schema, packet_schema)` |
+| `keys()` | Returns the tag column names |
+| `iter_packets()` | Lazy iteration over `(Tag, Packet)` pairs |
+| `as_table()` | Materialize as a PyArrow table |
+| `content_hash()` | Data-inclusive identity hash |
+| `pipeline_hash()` | Schema-and-topology-only identity hash |
+
+## Stream Properties
+
+- **Immutable** — once created, a stream's data never changes.
+- **Lazy** — iteration and materialization are deferred until requested.
+- **Self-describing** — streams carry their schema explicitly, not by reference to a registry.
+- **At least one packet column** — a stream with only tag columns raises `ValueError`.
diff --git a/docs/conftest.py b/docs/conftest.py
new file mode 100644
index 0000000..1cfad13
--- /dev/null
+++ b/docs/conftest.py
@@ -0,0 +1,80 @@
+"""Conftest for pytest-codeblocks doc tests.
+
+Patches the pytest-codeblocks exec namespace to include ``__name__`` so that
+functions defined inside code blocks get a proper ``__module__`` attribute.
+Without this, orcapod's ``get_function_signature`` fails because
+``function.__module__`` is ``None``.
+"""
+
+from __future__ import annotations
+
+try:
+    from pytest_codeblocks import plugin as _codeblocks_plugin
+
+    _original_runtest = _codeblocks_plugin.TestBlock.runtest  # kept for reference/debugging; never restored
+
+    def _patched_runtest(self):
+        """Run a doc code block with ``__name__`` set in the exec globals (mirrors upstream runtest)."""
+        import contextlib
+        import io
+        import re as re_mod
+        import subprocess
+
+        assert self.obj is not None
+        output = None
+
+        if self.obj.importorskip is not None:  # honor the block's importorskip marker before running it
+            import pytest
+
+            try:
+                __import__(self.obj.importorskip)
+            except (ImportError, ModuleNotFoundError):
+                pytest.skip()
+
+        if self.obj.syntax == "python":
+            with contextlib.redirect_stdout(io.StringIO()) as s:  # capture stdout for expected-output comparison
+                try:
+                    exec(
+                        self.obj.code,
+                        {"__MODULE__": "__main__", "__name__": "__main__"},  # "__name__" gives defs a real __module__; "__MODULE__" looks unused — TODO confirm
+                    )
+                except Exception as e:
+                    raise RuntimeError(  # re-raise with the failing block's source for readable failures
+                        f"{self.name}, line {self.obj.lineno}:\n```\n"
+                        + self.obj.code
+                        + "```\n\n"
+                        + f"{e}"
+                    )
+                output = s.getvalue()
+        else:
+            assert self.obj.syntax in ["sh", "bash"]
+            executable = {
+                "sh": None,  # None -> system default shell
+                "bash": "/bin/bash",
+                "zsh": "/bin/zsh",  # unreachable given the assert above; presumably kept to match upstream — verify
+            }[self.obj.syntax]
+            ret = subprocess.run(
+                self.obj.code,
+                shell=True,
+                check=True,  # a non-zero exit status fails the doc test
+                stdout=subprocess.PIPE,
+                executable=executable,
+            )
+            output = ret.stdout.decode()
+
+        if output is not None and self.obj.expected_output is not None:
+            str0 = self.obj.expected_output
+            str1 = output
+            if getattr(self.obj, "expected_output_ignore_whitespace", False):  # attr may be absent on some plugin versions
+                str0 = re_mod.sub(r"\s+", "", str0)
+                str1 = re_mod.sub(r"\s+", "", str1)
+            if str0 != str1:
+                raise RuntimeError(
+                    f"{self.name}, line {self.obj.lineno}:\n```\n"
+                    + f"Expected output\n```\n{self.obj.expected_output}```\n"
+                    + f"but got\n```\n{output}```"
+                )
+
+    _codeblocks_plugin.TestBlock.runtest = _patched_runtest  # monkey-patch applied when pytest imports this conftest
+except ImportError:
+    pass  # pytest-codeblocks not installed; nothing to patch
diff --git a/docs/getting-started/first-pipeline.md b/docs/getting-started/first-pipeline.md
new file mode 100644
index 0000000..b9d70d8
--- /dev/null
+++ b/docs/getting-started/first-pipeline.md
@@ -0,0 +1,190 @@
+# Building Your First Pipeline
+
+This guide walks through building a persistent, incremental pipeline using orcapod's
+`Pipeline` class. You'll learn how to:
+
+- Define a multi-step computation graph
+- Persist results to a database
+- Re-run with incremental computation
+
+## The Pipeline Class
+
+A `Pipeline` wraps your computation graph with automatic persistence. Inside a `with pipeline:`
+block, all source, operator, and function pod invocations are tracked and automatically
+upgraded to their persistent variants when the context exits.
+
+```python
+import pyarrow as pa
+from orcapod import ArrowTableSource, FunctionPod
+from orcapod.core.packet_function import PythonPacketFunction
+from orcapod.databases import InMemoryArrowDatabase
+from orcapod.pipeline import Pipeline
+
+# Define sources
+patients = ArrowTableSource(
+ pa.table({
+ "patient_id": pa.array(["p1", "p2", "p3"], type=pa.large_string()),
+ "age": pa.array([30, 45, 60], type=pa.int64()),
+ }),
+ tag_columns=["patient_id"],
+)
+
+labs = ArrowTableSource(
+ pa.table({
+ "patient_id": pa.array(["p1", "p2", "p3"], type=pa.large_string()),
+ "cholesterol": pa.array([180, 220, 260], type=pa.int64()),
+ }),
+ tag_columns=["patient_id"],
+)
+
+# Define computation
+def risk_score(age: int, cholesterol: int) -> float:
+ return age * 0.5 + cholesterol * 0.3
+
+risk_fn = PythonPacketFunction(risk_score, output_keys="risk")
+risk_pod = FunctionPod(packet_function=risk_fn)
+
+# Build the pipeline
+db = InMemoryArrowDatabase()
+pipeline = Pipeline(name="risk_pipeline", pipeline_database=db)
+
+with pipeline:
+ joined = patients.join(labs, label="join_data")
+ risk_pod(joined, label="compute_risk")
+
+pipeline.run()
+```
+
+## Labels and Node Access
+
+Every operation inside a pipeline context can be given a `label`. After compilation and
+execution, access nodes by label as attributes:
+
+
+```python
+# Access results by label
+risk_table = pipeline.compute_risk.as_table()
+print(risk_table.to_pandas()[["patient_id", "risk"]])
+# patient_id risk
+# 0 p1 69.0
+# 1 p2 88.5
+# 2 p3 108.0
+```
+
+## Convenience Methods
+
+Streams and sources expose convenience methods for common operators, making pipeline
+construction more fluent:
+
+
+```python
+with pipeline:
+ joined = patients.join(labs, label="join_data")
+ selected = joined.select_packet_columns(["age"], label="select_age")
+ renamed = selected.map_packets({"age": "patient_age"}, label="rename")
+ risk_pod(renamed, label="compute")
+```
+
+Available convenience methods:
+
+| Method | Operator |
+|--------|----------|
+| `.join(other)` | `Join` |
+| `.semi_join(other)` | `SemiJoin` |
+| `.map_tags(mapping)` | `MapTags` |
+| `.map_packets(mapping)` | `MapPackets` |
+| `.select_tag_columns(cols)` | `SelectTagColumns` |
+| `.select_packet_columns(cols)` | `SelectPacketColumns` |
+| `.drop_tag_columns(cols)` | `DropTagColumns` |
+| `.drop_packet_columns(cols)` | `DropPacketColumns` |
+| `.batch(batch_size=N)` | `Batch` |
+| `.polars_filter(col="val")` | `PolarsFilter` |
+
+## Persistent Storage with Delta Lake
+
+For durable persistence, use `DeltaTableDatabase` instead of `InMemoryArrowDatabase`:
+
+
+```python
+from pathlib import Path
+from orcapod.databases import DeltaTableDatabase
+
+db = DeltaTableDatabase(base_path=Path("./my_pipeline_db"))
+pipeline = Pipeline(name="risk_pipeline", pipeline_database=db)
+
+with pipeline:
+ joined = patients.join(labs, label="join_data")
+ risk_pod(joined, label="compute_risk")
+
+pipeline.run()
+```
+
+Results are stored as Delta Lake tables on disk and survive across process restarts.
+
+## Incremental Computation
+
+When you re-run a pipeline with new data, only the new rows are computed:
+
+
+```python
+# First run: 3 patients
+pipeline.run() # computes all 3
+
+# Add a new patient to the source
+patients_v2 = ArrowTableSource(
+ pa.table({
+ "patient_id": pa.array(["p1", "p2", "p3", "p4"], type=pa.large_string()),
+ "age": pa.array([30, 45, 60, 25], type=pa.int64()),
+ }),
+ tag_columns=["patient_id"],
+)
+
+labs_v2 = ArrowTableSource(
+ pa.table({
+ "patient_id": pa.array(["p1", "p2", "p3", "p4"], type=pa.large_string()),
+ "cholesterol": pa.array([180, 220, 260, 200], type=pa.int64()),
+ }),
+ tag_columns=["patient_id"],
+)
+
+# Rebuild pipeline with updated sources
+pipeline2 = Pipeline(name="risk_pipeline", pipeline_database=db)
+with pipeline2:
+ joined = patients_v2.join(labs_v2, label="join_data")
+ risk_pod(joined, label="compute_risk")
+
+pipeline2.run() # only p4 is computed; p1-p3 come from cache
+```
+
+## Compiled Node Types
+
+When a pipeline compiles, each node is replaced with its persistent variant:
+
+| Original | Persistent Variant | Cache Scoping |
+|----------|-------------------|---------------|
+| Leaf stream | `PersistentSourceNode` | Content hash |
+| Operator call | `PersistentOperatorNode` | Content hash |
+| Function pod call | `PersistentFunctionNode` | Pipeline hash (schema + topology) |
+
+## Separate Function Database
+
+To isolate function pod result caches from the main pipeline database, provide a separate `function_database`:
+
+
+```python
+pipeline_db = DeltaTableDatabase(base_path=Path("./pipeline"))
+function_db = DeltaTableDatabase(base_path=Path("./functions"))
+
+pipeline = Pipeline(
+ name="risk_pipeline",
+ pipeline_database=pipeline_db,
+ function_database=function_db,
+)
+```
+
+## Next Steps
+
+- [User Guide: Pipelines](../user-guide/pipelines.md) — Advanced pipeline patterns
+ and composition
+- [User Guide: Caching & Persistence](../user-guide/caching.md) — Deep dive into
+ orcapod's three-tier caching strategy
diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md
new file mode 100644
index 0000000..b7b9e52
--- /dev/null
+++ b/docs/getting-started/installation.md
@@ -0,0 +1,83 @@
+# Installation
+
+## Requirements
+
+- Python 3.11 or later
+- [uv](https://docs.astral.sh/uv/) (recommended package manager)
+
+## Install from Source
+
+Clone the repository and install with `uv`:
+
+
+```bash
+git clone https://github.com/walkerlab/orcapod-python.git
+cd orcapod-python
+uv sync
+```
+
+Or install with pip:
+
+
+```bash
+pip install -e .
+```
+
+## Optional Dependencies
+
+orcapod has optional dependency groups for extended functionality:
+
+=== "Redis"
+
+ ```bash
+ pip install orcapod[redis]
+ ```
+
+ Enables Redis-backed caching.
+
+=== "Ray"
+
+ ```bash
+ pip install orcapod[ray]
+ ```
+
+ Enables distributed execution via Ray.
+
+=== "All"
+
+ ```bash
+ pip install orcapod[all]
+ ```
+
+ Installs all optional dependencies.
+
+## Development Setup
+
+For contributing to orcapod:
+
+
+```bash
+git clone https://github.com/walkerlab/orcapod-python.git
+cd orcapod-python
+uv sync --group dev
+```
+
+Verify your installation:
+
+
+```bash
+uv run pytest tests/ -x -q
+```
+
+## Core Dependencies
+
+orcapod builds on several key libraries:
+
+| Library | Purpose |
+|---------|---------|
+| [PyArrow](https://arrow.apache.org/docs/python/) | Columnar data representation and Arrow table backing |
+| [Polars](https://pola.rs/) | DataFrame filtering (used by `PolarsFilter` operator) |
+| [Delta Lake](https://delta.io/) | Persistent database storage via Delta tables |
+| [xxhash](https://github.com/Cyan4973/xxHash) | Fast content hashing |
+| [NetworkX](https://networkx.org/) | Pipeline graph compilation and topological sorting |
+| [Graphviz](https://graphviz.org/) | Pipeline visualization |
diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md
new file mode 100644
index 0000000..a1a3ca6
--- /dev/null
+++ b/docs/getting-started/quickstart.md
@@ -0,0 +1,168 @@
+# Quickstart
+
+This guide introduces orcapod's core concepts through a hands-on example. You'll create
+sources, join them, apply a computation, and inspect the results — all with automatic
+provenance tracking.
+
+## Creating Sources
+
+Sources are the entry points for data in orcapod. The simplest way to create one is from a
+Python dictionary:
+
+```python
+from orcapod import DictSource
+
+patients = DictSource(
+ data=[
+ {"patient_id": "p1", "age": 30},
+ {"patient_id": "p2", "age": 45},
+ {"patient_id": "p3", "age": 60},
+ ],
+ tag_columns=["patient_id"],
+)
+```
+
+The `tag_columns` parameter specifies which columns are **tags** (metadata used for joining
+and routing) versus **packets** (the data payload). Here, `patient_id` is a tag and `age`
+is a packet column.
+
+orcapod supports many source types:
+
+
+```python
+import pyarrow as pa
+from orcapod import ArrowTableSource, ListSource
+
+# From a PyArrow table
+arrow_src = ArrowTableSource(
+ pa.table({"id": ["a", "b"], "value": [1, 2]}),
+ tag_columns=["id"],
+)
+
+# From a list of objects with a tag function (see ListSource docs for details)
+# list_src = ListSource(name="images", data=[img1, img2], ...)
+```
+
+## Exploring Streams
+
+Every source produces a **stream** — an immutable sequence of (Tag, Packet) pairs:
+
+
+```python
+# Sources implement the stream protocol directly
+stream = patients
+
+# Check the schema
+tag_schema, packet_schema = stream.output_schema()
+print(f"Tags: {tag_schema}") # Schema({'patient_id': str})
+print(f"Packets: {packet_schema}") # Schema({'age': int})
+
+# Iterate over entries
+for tag, packet in stream.iter_packets():
+ print(f" {tag.as_dict()} → {packet.as_dict()}")
+```
+
+## Joining Streams
+
+Use the **Join** operator to combine streams on their shared tag columns:
+
+
+```python
+from orcapod.core.operators import Join
+
+labs = DictSource(
+ data=[
+ {"patient_id": "p1", "cholesterol": 180},
+ {"patient_id": "p2", "cholesterol": 220},
+ {"patient_id": "p3", "cholesterol": 260},
+ ],
+ tag_columns=["patient_id"],
+)
+
+joined = Join()(patients, labs)
+
+# The joined stream has both age and cholesterol as packet columns
+tag_schema, packet_schema = joined.output_schema()
+print(f"Packets: {packet_schema}") # Schema({'age': int, 'cholesterol': int})
+```
+
+## Applying Computations
+
+**Function pods** apply stateless computations to individual packets. Define a regular Python
+function and wrap it:
+
+
+```python
+from orcapod import FunctionPod
+from orcapod.core.packet_function import PythonPacketFunction
+
+def risk_score(age: int, cholesterol: int) -> float:
+ """Compute a simple risk score."""
+ return age * 0.5 + cholesterol * 0.3
+
+risk_fn = PythonPacketFunction(risk_score, output_keys="risk")
+risk_pod = FunctionPod(packet_function=risk_fn)
+
+# Apply the function pod to the joined stream
+result = risk_pod(joined)
+
+for tag, packet in result.iter_packets():
+ print(f" {tag.as_dict()} → {packet.as_dict()}")
+# {'patient_id': 'p1'} → {'risk': 69.0}
+# {'patient_id': 'p2'} → {'risk': 88.5}
+# {'patient_id': 'p3'} → {'risk': 108.0}
+```
+
+You can also use the decorator syntax:
+
+
+```python
+from orcapod import function_pod
+
+@function_pod(output_keys="risk")
+def compute_risk(age: int, cholesterol: int) -> float:
+ return age * 0.5 + cholesterol * 0.3
+
+result = compute_risk.pod(joined)
+```
+
+## Materializing Results
+
+Streams are lazy — data is only computed when you request it. Materialize a stream
+as a PyArrow table:
+
+
+```python
+table = result.as_table()
+print(table.to_pandas())
+# patient_id risk
+# 0 p1 69.0
+# 1 p2 88.5
+# 2 p3 108.0
+```
+
+## Inspecting Provenance
+
+Every value in orcapod is traceable. Use `ColumnConfig` to inspect provenance metadata:
+
+
+```python
+from orcapod.types import ColumnConfig
+
+# Include source-info columns
+table = result.as_table(columns=ColumnConfig(source=True))
+print(table.column_names)
+# [..., '_source_risk', ...]
+
+# Include system tags for full lineage
+table = result.as_table(columns=ColumnConfig(system_tags=True))
+print(table.column_names)
+# [..., '_tag::source_id::...', '_tag::record_id::...', ...]
+```
+
+## Next Steps
+
+- [Building Your First Pipeline](first-pipeline.md) — Learn how to orchestrate multi-step
+ pipelines with persistence and incremental computation.
+- [Concepts: Architecture Overview](../concepts/architecture.md) — Understand the design
+ principles behind orcapod.
diff --git a/docs/index.md b/docs/index.md
new file mode 100644
index 0000000..fa87034
--- /dev/null
+++ b/docs/index.md
@@ -0,0 +1,111 @@
+# orcapod
+
+**An intuitive and powerful library for highly reproducible scientific data pipelines.**
+
+orcapod is a Python framework for building data pipelines with built-in provenance tracking,
+content-addressable caching, and deterministic computation. Every value produced by an orcapod
+pipeline is traceable back to its original source, every computation is memoizable, and every
+result is verifiable.
+
+---
+
+## Key Features
+
+- **Full Provenance Tracking** — Every value carries metadata tracing it back to its originating source and record. Operator topology is captured in system tags, forming a complete lineage chain.
+
+- **Content-Addressable Caching** — Computations are identified by their content hash. Identical computations are never repeated, and results are automatically shared across compatible pipeline runs.
+
+- **Immutable Data Flow** — Streams are immutable sequences of (Tag, Packet) pairs backed by Apache Arrow tables. Data flows forward through the pipeline without side effects.
+
+- **Strict Operator / Function Pod Boundary** — Operators transform structure (joins, filters, renames) without inspecting packet data. Function pods transform data without inspecting tags. This separation keeps provenance clean and reasoning simple.
+
+- **Schema as a First-Class Citizen** — Every stream is self-describing. Schemas are predicted at construction time, not discovered at runtime, enabling early validation and deterministic system tag naming.
+
+- **Incremental Computation** — Database-backed nodes compute only what's missing. Add new data to a source and re-run: only the new rows are processed.
+
+- **Pluggable Execution** — Run pipelines synchronously for debugging or with async push-based channels for production. Swap in a Ray executor for distributed computation. Results are identical regardless of execution strategy.
+
+---
+
+## How It Works
+
+```
+Source → Stream → [Operator / FunctionPod] → Stream → ...
+```
+
+1. **Sources** load data from external systems (CSV, Delta Lake, DataFrames, dicts) and annotate each row with provenance metadata.
+
+2. **Streams** carry the data as immutable (Tag, Packet) pairs over a shared schema.
+
+3. **Operators** reshape streams — join, filter, batch, select, rename — without creating new values.
+
+4. **Function Pods** apply packet functions that transform individual packets, producing new computed values with tracked provenance.
+
+5. **Pipelines** orchestrate the full graph, automatically persisting results and enabling incremental re-computation.
+
+---
+
+## Quick Example
+
+```python
+import pyarrow as pa
+from orcapod import ArrowTableSource, FunctionPod
+from orcapod.core.packet_function import PythonPacketFunction
+from orcapod.core.operators import Join
+
+# Create sources
+patients = ArrowTableSource(
+ pa.table({
+ "patient_id": ["p1", "p2", "p3"],
+ "age": [30, 45, 60],
+ }),
+ tag_columns=["patient_id"],
+)
+
+labs = ArrowTableSource(
+ pa.table({
+ "patient_id": ["p1", "p2", "p3"],
+ "cholesterol": [180, 220, 260],
+ }),
+ tag_columns=["patient_id"],
+)
+
+# Join sources on shared tag columns
+joined = Join()(patients, labs)
+
+# Define and apply a packet function
+def risk_score(age: int, cholesterol: int) -> float:
+ return age * 0.5 + cholesterol * 0.3
+
+risk_fn = PythonPacketFunction(risk_score, output_keys="risk")
+risk_pod = FunctionPod(packet_function=risk_fn)
+result = risk_pod(joined)
+
+# Iterate over results
+for tag, packet in result.iter_packets():
+ print(f"{tag.as_dict()} → risk={packet.as_dict()['risk']}")
+```
+
+---
+
+## Next Steps
+
+