From ff7708e3273bfcdea6dd7c556072e7994837085e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 24 Mar 2026 07:09:38 -0400 Subject: [PATCH 1/3] Further trait and method implementations --- differential-dataflow/examples/columnar.rs | 745 ++++++++++++++++----- 1 file changed, 586 insertions(+), 159 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 21ebf2397..b7d01f7c6 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -1,11 +1,11 @@ -//! Wordcount based on `columnar`. +//! Columnar containers for differential dataflow. +//! +//! Demonstrates columnar-backed arrangements, iterative scopes, and reachability. use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::InputHandle; use timely::dataflow::ProbeHandle; -use differential_dataflow::operators::arrange::arrangement::arrange_core; - use mimalloc::MiMalloc; #[global_allocator] @@ -13,84 +13,70 @@ static GLOBAL: MiMalloc = MiMalloc; fn main() { - type WordCount = (Vec, (), u64, i64); - type Builder = ValColBuilder; - - let keys: usize = std::env::args().nth(1).expect("missing argument 1").parse().unwrap(); - let size: usize = std::env::args().nth(2).expect("missing argument 2").parse().unwrap(); + let nodes: u32 = std::env::args().nth(1).unwrap_or("100".into()).parse().unwrap(); + let edges: u32 = std::env::args().nth(2).unwrap_or("300".into()).parse().unwrap(); - let timer1 = ::std::time::Instant::now(); - let timer2 = timer1.clone(); + let timer = ::std::time::Instant::now(); timely::execute_from_args(std::env::args(), move |worker| { - let mut data_input = >::new_with_builder(); - let mut keys_input = >::new_with_builder(); + type EdgeUpdate = (u32, u32, u64, i64); + type NodeUpdate = (u32, (), u64, i64); + type EdgeBuilder = ValColBuilder; + type NodeBuilder = ValColBuilder; + + let mut edge_input = >::new_with_builder(); + let mut root_input = 
>::new_with_builder(); let mut probe = ProbeHandle::new(); worker.dataflow::(|scope| { - let data = data_input.to_stream(scope); - let keys = keys_input.to_stream(scope); + use differential_dataflow::AsCollection; + use timely::dataflow::operators::Probe; - use differential_dataflow::Hashable; - let data_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - let keys_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; + let edges = edge_input.to_stream(scope).as_collection(); + let roots = root_input.to_stream(scope).as_collection(); - let data = arrange_core::<_,_,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(data, data_pact, "Data"); - let keys = arrange_core::<_,_,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(keys, keys_pact, "Keys"); - - keys.join_core(data, |_k, (), ()| { Option::<()>::None }) + reachability::reach(edges, roots) + .inner .probe_with(&mut probe); }); - use std::fmt::Write; - let mut buffer = String::default(); - let mut builder = Builder::default(); - - let mut counter = 0; - while counter < 10 * keys { - let mut i = worker.index(); - let time = *data_input.time(); - while i < size { - let val = (counter + i) % keys; - write!(buffer, "{:?}", val).unwrap(); - builder.push_into((buffer.as_bytes(), (), time, 1)); - buffer.clear(); - i += worker.peers(); - } - while let Some(container) = builder.finish() { - data_input.send_batch(container); + // Generate a small random graph. + let mut edge_builder = EdgeBuilder::default(); + let mut node_builder = NodeBuilder::default(); + + if worker.index() == 0 { + // Simple deterministic "random" edges. 
+ let mut src: u32 = 0; + for _ in 0..edges { + let dst = (src.wrapping_mul(7).wrapping_add(13)) % nodes; + edge_builder.push_into((src, dst, 0u64, 1i64)); + src = (src + 1) % nodes; } - counter += size; - data_input.advance_to(data_input.time() + 1); - keys_input.advance_to(keys_input.time() + 1); - while probe.less_than(data_input.time()) { worker.step_or_park(None); } + // Root: node 0. + node_builder.push_into((0u32, (), 0u64, 1i64)); } - println!("{:?}\tloading complete", timer1.elapsed()); - - let mut queries = 0; - while queries < 10 * keys { - let mut i = worker.index(); - let time = *data_input.time(); - while i < size { - let val = (queries + i) % keys; - write!(buffer, "{:?}", val).unwrap(); - builder.push_into((buffer.as_bytes(), (), time, 1)); - buffer.clear(); - i += worker.peers(); - } - while let Some(container) = builder.finish() { - keys_input.send_batch(container); - } - queries += size; - data_input.advance_to(data_input.time() + 1); - keys_input.advance_to(keys_input.time() + 1); - while probe.less_than(data_input.time()) { worker.step_or_park(None); } + + while let Some(container) = edge_builder.finish() { + edge_input.send_batch(container); } - println!("{:?}\tqueries complete", timer1.elapsed()); + while let Some(container) = node_builder.finish() { + root_input.send_batch(container); + } + + edge_input.advance_to(1); + root_input.advance_to(1); + edge_input.flush(); + root_input.flush(); + + while probe.less_than(edge_input.time()) { + worker.step_or_park(None); + } + + println!("{:?}\treachability complete ({} nodes, {} edges)", timer.elapsed(), nodes, edges); }).unwrap(); - println!("{:?}\tshut down", timer2.elapsed()); + println!("{:?}\tshut down", timer.elapsed()); } pub use layout::{ColumnarLayout, ColumnarUpdate}; @@ -174,6 +160,106 @@ impl timely::dataflow::channels::ContainerBytes for R fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } +// Container trait impls for RecordedUpdates, enabling iterative scopes. 
+mod container_impls { + use columnar::{Borrow, Columnar, Index, Len, Push}; + use timely::progress::{Timestamp, timestamp::Refines}; + use differential_dataflow::difference::Abelian; + use differential_dataflow::collection::containers::{Negate, Enter, Leave, ResultsIn}; + + use crate::layout::ColumnarUpdate as Update; + use crate::{RecordedUpdates, Updates}; + + impl> Negate for RecordedUpdates { + fn negate(mut self) -> Self { + let len = self.updates.diffs.values.len(); + let mut new_diffs = <::Container as Default>::default(); + let mut owned = U::Diff::default(); + for i in 0..len { + columnar::Columnar::copy_from(&mut owned, self.updates.diffs.values.borrow().get(i)); + owned.negate(); + new_diffs.push(&owned); + } + self.updates.diffs.values = new_diffs; + self + } + } + + impl Enter for RecordedUpdates<(K, V, T1, R)> + where + (K, V, T1, R): Update, + (K, V, T2, R): Update, + T1: Timestamp + Columnar + Default + Clone, + T2: Refines + Columnar + Default + Clone, + K: Columnar, V: Columnar, R: Columnar, + { + type InnerContainer = RecordedUpdates<(K, V, T2, R)>; + fn enter(self) -> Self::InnerContainer { + // Rebuild the time column; everything else moves as-is. 
+ let mut new_times = <::Container as Default>::default(); + let mut t1_owned = T1::default(); + for i in 0..self.updates.times.values.len() { + Columnar::copy_from(&mut t1_owned, self.updates.times.values.borrow().get(i)); + let t2 = T2::to_inner(t1_owned.clone()); + new_times.push(&t2); + } + RecordedUpdates { + updates: Updates { + keys: self.updates.keys, + vals: self.updates.vals, + times: crate::updates::Lists { values: new_times, bounds: self.updates.times.bounds }, + diffs: self.updates.diffs, + }, + records: self.records, + } + } + } + + impl Leave for RecordedUpdates<(K, V, T1, R)> + where + (K, V, T1, R): Update, + (K, V, T2, R): Update, + T1: Refines + Columnar + Default + Clone, + T2: Timestamp + Columnar + Default + Clone, + K: Columnar, V: Columnar, R: Columnar, + { + type OuterContainer = RecordedUpdates<(K, V, T2, R)>; + fn leave(self) -> Self::OuterContainer { + // Flatten, convert times, and reconsolidate via consolidate. + // Leave can collapse distinct T1 times to the same T2 time, + // so the trie must be rebuilt with consolidation. + let mut flat = Updates::<(K, V, T2, R)>::default(); + let mut t1_owned = T1::default(); + for (k, v, t, d) in self.updates.iter() { + Columnar::copy_from(&mut t1_owned, t); + let t2: T2 = t1_owned.clone().to_outer(); + flat.push((k, v, &t2, d)); + } + RecordedUpdates { + updates: flat.consolidate(), + records: self.records, + } + } + } + + impl ResultsIn<::Summary> for RecordedUpdates { + fn results_in(self, step: &::Summary) -> Self { + use timely::progress::PathSummary; + // Apply results_in to each time; drop updates whose time maps to None. + // This must rebuild the trie since some entries may be removed. 
+ let mut output = Updates::::default(); + let mut time_owned = U::Time::default(); + for (k, v, t, d) in self.updates.iter() { + Columnar::copy_from(&mut time_owned, t); + if let Some(new_time) = step.results_in(&time_owned) { + output.push((k, v, &new_time, d)); + } + } + RecordedUpdates { updates: output, records: self.records } + } + } +} + pub use column_builder::ValBuilder as ValColBuilder; mod column_builder { @@ -731,15 +817,32 @@ pub mod arrangement { output } - pub struct ValMirror { marker: std::marker::PhantomData } + pub struct ValMirror { + current: Updates, + } impl differential_dataflow::trace::Builder for ValMirror { type Time = U::Time; type Input = Updates; type Output = OrdValBatch>; - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } - fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } - fn done(self, _description: Description) -> Self::Output { unimplemented!() } + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + Self { current: Updates::default() } + } + fn push(&mut self, chunk: &mut Self::Input) { + use columnar::Len; + let len = chunk.keys.values.len(); + if len > 0 { + self.current.extend_from_keys(chunk, 0..len); + } + } + fn done(self, description: Description) -> Self::Output { + let mut chain = if self.current.len() > 0 { + vec![self.current] + } else { + vec![] + }; + Self::seal(&mut chain, description) + } fn seal(chain: &mut Vec, description: Description) -> Self::Output { if chain.len() == 0 { let storage = OrdValStorage { @@ -770,8 +873,16 @@ pub mod arrangement { OrdValBatch { storage, description, updates } } else { - println!("chain length: {:?}", chain.len()); - unimplemented!() + use columnar::Len; + let mut merged = chain.remove(0); + for other in chain.drain(..) 
{ + let len = other.keys.values.len(); + if len > 0 { + merged.extend_from_keys(&other, 0..len); + } + } + chain.push(merged); + Self::seal(chain, description) } } } @@ -894,63 +1005,96 @@ pub mod updates { /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. /// - /// Follows the same dedup pattern at each level: check if the current - /// entry differs from the previous, seal the previous group if so. - /// At the time level, equal `(key, val, time)` triples have diffs accumulated. - /// The `records` field tracks the input count for exchange accounting. + /// Tracks a `prev` reference to the previous element. On each new element, + /// compares against `prev` to detect key/val/time changes. Only pushes + /// accumulated diffs when they are nonzero, and only emits times/vals/keys + /// that have at least one nonzero diff beneath them. pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { let mut output = Self::default(); let mut diff_stash = U::Diff::default(); let mut diff_temp = U::Diff::default(); - if let Some((key, val, time, diff)) = sorted.next() { - output.keys.values.push(key); - output.vals.values.push(val); - output.times.values.push(time); - Columnar::copy_from(&mut diff_stash, diff); + if let Some(first) = sorted.next() { - for (key, val, time, diff) in sorted { - let mut differs = false; - // Keys: seal vals for previous key if key changed. - let keys_len = output.keys.values.len(); - differs |= ContainerOf::::reborrow_ref(key) != output.keys.values.borrow().get(keys_len - 1); - if differs { output.keys.values.push(key); } - // Vals: seal times for previous val if key or val changed. - let vals_len = output.vals.values.len(); - if differs { output.vals.bounds.push(vals_len as u64); } - differs |= ContainerOf::::reborrow_ref(val) != output.vals.values.borrow().get(vals_len - 1); - if differs { output.vals.values.push(val); } - // Times: seal diffs for previous time if key, val, or time changed. 
- let times_len = output.times.values.len(); - if differs { output.times.bounds.push(times_len as u64); } - differs |= ContainerOf::::reborrow_ref(time) != output.times.values.borrow().get(times_len - 1); - if differs { - // Flush accumulated diff for the previous time. + let mut prev = first; + Columnar::copy_from(&mut diff_stash, prev.3); + + for curr in sorted { + let key_differs = ContainerOf::::reborrow_ref(curr.0) != ContainerOf::::reborrow_ref(prev.0); + let val_differs = key_differs || ContainerOf::::reborrow_ref(curr.1) != ContainerOf::::reborrow_ref(prev.1); + let time_differs = val_differs || ContainerOf::::reborrow_ref(curr.2) != ContainerOf::::reborrow_ref(prev.2); + + if time_differs { + // Flush the accumulated diff for prev's (key, val, time). if !diff_stash.is_zero() { + // We have a real update to emit. Push time (and val/key + // if this is the first time under them). + let times_len = output.times.values.len(); + let vals_len = output.vals.values.len(); + + if val_differs { + // Seal the previous val's time list, if any times were emitted. + if times_len > 0 { + output.times.bounds.push(times_len as u64); + } + if key_differs { + // Seal the previous key's val list, if any vals were emitted. + if vals_len > 0 { + output.vals.bounds.push(vals_len as u64); + } + output.keys.values.push(prev.0); + } + output.vals.values.push(prev.1); + } + output.times.values.push(prev.2); output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); } - // TODO: Else is complicated, as we may want to pop prior values. - // It is perhaps fine to leave zeros as a thing that won't - // invalidate merging. - output.diffs.bounds.push(output.diffs.values.len() as u64); - // Start new time. - output.times.values.push(time); - Columnar::copy_from(&mut diff_stash, diff); + Columnar::copy_from(&mut diff_stash, curr.3); } else { // Same (key, val, time): accumulate diff. 
- Columnar::copy_from(&mut diff_temp, diff); + Columnar::copy_from(&mut diff_temp, curr.3); diff_stash.plus_equals(&diff_temp); } + prev = curr; } - // Flush the last accumulated diff and seal all levels. + + // Flush the final accumulated diff. if !diff_stash.is_zero() { + let keys_len = output.keys.values.len(); + let vals_len = output.vals.values.len(); + let times_len = output.times.values.len(); + let need_key = keys_len == 0 || ContainerOf::::reborrow_ref(prev.0) != output.keys.values.borrow().get(keys_len - 1); + let need_val = need_key || vals_len == 0 || ContainerOf::::reborrow_ref(prev.1) != output.vals.values.borrow().get(vals_len - 1); + + if need_val { + if times_len > 0 { + output.times.bounds.push(times_len as u64); + } + if need_key { + if vals_len > 0 { + output.vals.bounds.push(vals_len as u64); + } + output.keys.values.push(prev.0); + } + output.vals.values.push(prev.1); + } + output.times.values.push(prev.2); output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); + } + + // Seal the final groups at each level. + if !output.times.values.is_empty() { + output.times.bounds.push(output.times.values.len() as u64); + } + if !output.vals.values.is_empty() { + output.vals.bounds.push(output.vals.values.len() as u64); + } + if !output.keys.values.is_empty() { + output.keys.bounds.push(output.keys.values.len() as u64); } - output.diffs.bounds.push(output.diffs.values.len() as u64); - output.times.bounds.push(output.times.values.len() as u64); - output.vals.bounds.push(output.vals.values.len() as u64); - output.keys.bounds.push(output.keys.values.len() as u64); } output @@ -1054,8 +1198,22 @@ pub mod updates { output } - /// Push a single flat update `(key, val, time, diff)` as a stride-1 entry. 
- pub fn push<'a>(&mut self, key: columnar::Ref<'a, U::Key>, val: columnar::Ref<'a, U::Val>, time: columnar::Ref<'a, U::Time>, diff: columnar::Ref<'a, U::Diff>) { + /// The number of leaf-level diff entries (total updates). + pub fn len(&self) -> usize { self.diffs.values.len() } + } + + /// Push a single flat update as a stride-1 entry. + /// + /// Each field is independently typed — columnar refs, `&Owned`, owned values, + /// or any other type the column container accepts via its `Push` impl. + impl Push<(KP, VP, TP, DP)> for Updates + where + ContainerOf: Push, + ContainerOf: Push, + ContainerOf: Push, + ContainerOf: Push, + { + fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) { self.keys.values.push(key); self.keys.bounds.push(self.keys.values.len() as u64); self.vals.values.push(val); @@ -1065,21 +1223,16 @@ pub mod updates { self.diffs.values.push(diff); self.diffs.bounds.push(self.diffs.values.len() as u64); } + } - /// Push a single flat update from owned values. - pub fn push_owned(&mut self, key: &U::Key, val: &U::Val, time: &U::Time, diff: &U::Diff) { - self.keys.values.push(key); - self.keys.bounds.push(self.keys.values.len() as u64); - self.vals.values.push(val); - self.vals.bounds.push(self.vals.values.len() as u64); - self.times.values.push(time); - self.times.bounds.push(self.times.values.len() as u64); - self.diffs.values.push(diff); - self.diffs.bounds.push(self.diffs.values.len() as u64); + /// PushInto for the `((K, V), T, R)` shape that reduce_trace uses. + impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for Updates { + fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) { + self.push((&key, &val, &time, &diff)); } + } - /// The number of leaf-level diff entries (total updates). - pub fn len(&self) -> usize { self.diffs.values.len() } + impl Updates { /// Iterate all `(key, val, time, diff)` entries as refs. 
pub fn iter(&self) -> impl Iterator) -> Vec<(u64, u64, u64, i64)> { updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() } @@ -1138,49 +1290,39 @@ pub mod updates { #[test] fn test_push_and_consolidate_basic() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &1); - updates.push_owned(&1, &10, &100, &2); - updates.push_owned(&2, &20, &200, &5); - + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &2)); + updates.push((&2, &20, &200, &5)); assert_eq!(updates.len(), 3); - - let consolidated = updates.consolidate(); - let entries = collect(&consolidated); - assert_eq!(entries, vec![(1, 10, 100, 3), (2, 20, 200, 5)]); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]); } #[test] fn test_cancellation() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &3); - updates.push_owned(&1, &10, &100, &-3); - updates.push_owned(&2, &20, &200, &1); - - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(2, 20, 200, 1)]); + updates.push((&1, &10, &100, &3)); + updates.push((&1, &10, &100, &-3)); + updates.push((&2, &20, &200, &1)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 1)]); } #[test] fn test_multiple_vals_and_times() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &1); - updates.push_owned(&1, &10, &200, &2); - updates.push_owned(&1, &20, &100, &3); - updates.push_owned(&1, &20, &100, &4); - - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &200, &2)); + updates.push((&1, &20, &100, &3)); + updates.push((&1, &20, &100, &4)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); } #[test] fn test_val_cancellation_propagates() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, 
&5); - updates.push_owned(&1, &10, &100, &-5); - updates.push_owned(&1, &20, &100, &1); - - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(1, 20, 100, 1)]); + updates.push((&1, &10, &100, &5)); + updates.push((&1, &10, &100, &-5)); + updates.push((&1, &20, &100, &1)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 1)]); } #[test] @@ -1192,20 +1334,305 @@ pub mod updates { #[test] fn test_total_cancellation() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &1); - updates.push_owned(&1, &10, &100, &-1); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); assert_eq!(collect(&updates.consolidate()), vec![]); } #[test] fn test_unsorted_input() { let mut updates = Updates::::default(); - updates.push_owned(&3, &30, &300, &1); - updates.push_owned(&1, &10, &100, &2); - updates.push_owned(&2, &20, &200, &3); + updates.push((&3, &30, &300, &1)); + updates.push((&1, &10, &100, &2)); + updates.push((&2, &20, &200, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); + } + + #[test] + fn test_first_key_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &5)); + updates.push((&1, &10, &100, &-5)); + updates.push((&2, &20, &200, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 3)]); + } + + #[test] + fn test_middle_time_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &200, &2)); + updates.push((&1, &10, &200, &-2)); + updates.push((&1, &10, &300, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 300, 3)]); + } + + #[test] + fn test_first_val_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + updates.push((&1, &20, &100, &5)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 5)]); + 
} + + #[test] + fn test_interleaved_cancellations() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + updates.push((&2, &20, &200, &7)); + updates.push((&3, &30, &300, &4)); + updates.push((&3, &30, &300, &-4)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 7)]); + } + } +} - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); +/// A columnar flat_map: iterates RecordedUpdates, calls logic per (key, val, time, diff), +/// joins output times with input times, multiplies output diffs with input diffs. +/// +/// This subsumes map, filter, negate, and enter_at for columnar collections. +pub fn join_function( + input: differential_dataflow::Collection>, + mut logic: L, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + G::Timestamp: differential_dataflow::lattice::Lattice, + U: layout::ColumnarUpdate>, + I: IntoIterator, + L: FnMut( + columnar::Ref<'_, U::Key>, + columnar::Ref<'_, U::Val>, + columnar::Ref<'_, U::Time>, + columnar::Ref<'_, U::Diff>, + ) -> I + 'static, +{ + use timely::dataflow::operators::generic::Operator; + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::AsCollection; + use differential_dataflow::difference::Multiply; + use differential_dataflow::lattice::Lattice; + use columnar::Columnar; + + input + .inner + .unary::, _, _, _>(Pipeline, "JoinFunction", move |_, _| { + move |input, output| { + input.for_each(|time, data| { + let mut session = output.session_with_builder(&time); + for (k1, v1, t1, d1) in data.updates.iter() { + let t1o: U::Time = Columnar::into_owned(t1); + let d1o: U::Diff = Columnar::into_owned(d1); + for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { + let t3 = t2.join(&t1o); + let d3 = d2.multiply(&d1o); + session.give((&k2, &v2, &t3, &d3)); + } + } + }); + } + }) + .as_collection() +} + +/// Leave a dynamic 
iterative scope, truncating PointStamp coordinates. +/// +/// Uses OperatorBuilder (not unary) for the custom input connection summary +/// that tells timely how the PointStamp is affected (retain `level - 1` coordinates). +/// +/// Consolidates after truncation since distinct PointStamp coordinates can collapse. +pub fn leave_dynamic( + input: differential_dataflow::Collection>, + level: usize, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + K: columnar::Columnar, + V: columnar::Columnar, + R: columnar::Columnar, + (K, V, DynTime, R): layout::ColumnarUpdate, +{ + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + use timely::dataflow::operators::generic::OutputBuilder; + use timely::order::Product; + use timely::progress::Antichain; + use differential_dataflow::AsCollection; + use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; + use columnar::Columnar; + + let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope()); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); + let mut op_input = builder.new_input_connection( + input.inner, + Pipeline, + [( + 0, + Antichain::from_elem(Product { + outer: Default::default(), + inner: PointStampSummary { + retain: Some(level - 1), + actions: Vec::new(), + }, + }), + )], + ); + + builder.build(move |_capability| { + let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default(); + move |_frontier| { + let mut output = output.activate(); + op_input.for_each(|cap, data| { + // Truncate the capability's timestamp. + let mut new_time = cap.time().clone(); + let mut vec = std::mem::take(&mut new_time.inner).into_inner(); + vec.truncate(level - 1); + new_time.inner = PointStamp::new(vec); + let new_cap = cap.delayed(&new_time, 0); + // Push updates with truncated times into the builder. 
+ // The builder's form call on flush sorts and consolidates, + // handling the duplicate times that truncation can produce. + // TODO: The input trie is already sorted; a streaming form + // that accepts pre-sorted, potentially-collapsing timestamps + // could avoid the re-sort inside the builder. + for (k, v, t, d) in data.updates.iter() { + let mut time: DynTime = Columnar::into_owned(t); + let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); + inner_vec.truncate(level - 1); + time.inner = PointStamp::new(inner_vec); + col_builder.push_into((k, v, &time, d)); + } + let mut session = output.session(&new_cap); + while let Some(container) = col_builder.finish() { + session.give_container(container); + } + }); } + }); + + stream.as_collection() +} + +pub type DynTime = timely::order::Product>; + +/// Reachability on a random directed graph using columnar containers. +/// +/// This module exercises the container traits needed for iterative columnar +/// computation: Enter, Leave, Negate, ResultsIn on RecordedUpdates, and +/// Push on Updates for the reduce builder path. +mod reachability { + + use timely::order::Product; + use timely::dataflow::Scope; + use differential_dataflow::Collection; + use differential_dataflow::AsCollection; + use differential_dataflow::operators::iterate::Variable; + use differential_dataflow::operators::arrange::arrangement::arrange_core; + use differential_dataflow::operators::join::join_traces; + + use crate::{RecordedUpdates, ValColBuilder, ValBatcher, ValBuilder, ValSpine, ValPact}; + + type Node = u32; + type Time = u64; + type Diff = i64; + type IterTime = Product; + + /// Compute the set of nodes reachable from `roots` along directed `edges`. + /// + /// Returns `(node, ())` for each reachable node. 
+ pub fn reach>( + edges: Collection>, + roots: Collection>, + ) -> Collection> + { + let mut scope = edges.inner.scope(); + + scope.iterative::(|nested| { + let summary = Product::new(Time::default(), 1); + + let roots_inner = roots.enter(nested); + let (variable, reach) = Variable::new_from(roots_inner.clone(), summary); + let edges_inner = edges.enter(nested); + + // Arrange both collections into columnar spines for joining. + let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; + let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; + + let edges_arr = arrange_core::<_, _, + ValBatcher, + ValBuilder, + ValSpine, + >(edges_inner.inner, edges_pact, "Edges"); + + let reach_arr = arrange_core::<_, _, + ValBatcher, + ValBuilder, + ValSpine, + >(reach.inner, reach_pact, "Reach"); + + // join_traces with ValColBuilder: produces Stream<_, RecordedUpdates<...>>. + let proposed = + join_traces::<_, _, _, _, ValColBuilder<(Node, (), IterTime, Diff)>>( + edges_arr, + reach_arr, + |_src, dst, (), time, d1, d2, session| { + use differential_dataflow::difference::Multiply; + let dst: Node = *dst; + let diff: Diff = d1.clone().multiply(d2); + session.give::<(Node, (), IterTime, Diff)>((dst, (), time.clone(), diff)); + }, + ).as_collection(); + + // concat: both sides are now Collection<_, RecordedUpdates<...>>. + let combined = proposed.concat(roots_inner); + + // Arrange for reduce. + let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; + let combined_arr = arrange_core::<_, _, + ValBatcher, + ValBuilder, + ValSpine, + >(combined.inner, combined_pact, "Combined"); + + // reduce_abelian on the columnar arrangement. + let result = combined_arr.reduce_abelian::<_, + ValBuilder, + ValSpine, + >("Distinct", |_node, _input, output| { + output.push(((), 1)); + }); + + // Extract RecordedUpdates from the Arranged's batch stream. 
+ use differential_dataflow::trace::{BatchReader, Cursor}; + use timely::container::{ContainerBuilder, PushInto}; + let result_col = result.as_container(|batch| { + let mut builder = ValColBuilder::<(Node, (), IterTime, Diff)>::default(); + let mut cursor = batch.cursor(); + while cursor.key_valid(&batch) { + let k: Node = *cursor.key(&batch); + while cursor.val_valid(&batch) { + let v: () = cursor.val(&batch); + cursor.map_times(&batch, |time, diff| { + builder.push_into((k, v, time.clone(), diff.clone())); + }); + cursor.step_val(&batch); + } + cursor.step_key(&batch); + } + let mut out = Vec::new(); + while let Some(container) = builder.finish() { + out.push(std::mem::take(container)); + } + out + }); + + variable.set(result_col.clone()); + + // Leave the iterative scope. + result_col.leave() + }) } } From aec42985f61a68241dd7e2af306728f491be8e27 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 24 Mar 2026 08:13:36 -0400 Subject: [PATCH 2/3] Further implementations, tidying, and re-organization as an example 'module' --- differential-dataflow/examples/columnar.rs | 1475 +--------------- .../examples/columnar_support.rs | 1491 +++++++++++++++++ 2 files changed, 1502 insertions(+), 1464 deletions(-) create mode 100644 differential-dataflow/examples/columnar_support.rs diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index b7d01f7c6..e2e5cb024 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -1,11 +1,18 @@ -//! Columnar containers for differential dataflow. +//! Columnar reachability example for differential dataflow. //! -//! Demonstrates columnar-backed arrangements, iterative scopes, and reachability. +//! Demonstrates columnar-backed arrangements in an iterative scope, +//! exercising Enter, Leave, Negate, ResultsIn on RecordedUpdates, +//! and Push on Updates for the reduce builder path. 
+ +#[path = "columnar_support.rs"] +mod columnar_support; use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::InputHandle; use timely::dataflow::ProbeHandle; +use columnar_support::*; + use mimalloc::MiMalloc; #[global_allocator] @@ -79,1445 +86,6 @@ fn main() { println!("{:?}\tshut down", timer.elapsed()); } -pub use layout::{ColumnarLayout, ColumnarUpdate}; -pub mod layout { - - use std::fmt::Debug; - use columnar::Columnar; - use differential_dataflow::trace::implementations::{Layout, OffsetList}; - use differential_dataflow::difference::Semigroup; - use differential_dataflow::lattice::Lattice; - use timely::progress::Timestamp; - - /// A layout based on columnar - pub struct ColumnarLayout { - phantom: std::marker::PhantomData, - } - - impl ColumnarUpdate for (K, V, T, R) - where - K: Columnar + Debug + Ord + Clone + 'static, - V: Columnar + Debug + Ord + Clone + 'static, - T: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp, - R: Columnar + Debug + Ord + Default + Semigroup + 'static, - { - type Key = K; - type Val = V; - type Time = T; - type Diff = R; - } - - use crate::arrangement::Coltainer; - impl Layout for ColumnarLayout { - type KeyContainer = Coltainer; - type ValContainer = Coltainer; - type TimeContainer = Coltainer; - type DiffContainer = Coltainer; - type OffsetContainer = OffsetList; - } - - /// A type that names constituent update types. - /// - /// We will use their associated `Columnar::Container` - pub trait ColumnarUpdate : Debug + 'static { - type Key: Columnar + Debug + Ord + Clone + 'static; - type Val: Columnar + Debug + Ord + Clone + 'static; - type Time: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp; - type Diff: Columnar + Debug + Ord + Default + Semigroup + 'static; - } - - /// A container whose references can be ordered. 
- pub trait OrdContainer : for<'a> columnar::Container : Ord> { } - impl columnar::Container : Ord>> OrdContainer for C { } - -} - -pub use updates::Updates; - -/// A thin wrapper around `Updates` that tracks the pre-consolidation record count -/// for timely's exchange accounting. This wrapper is the stream container type; -/// the `TrieChunker` strips it, passing bare `Updates` into the merge batcher. -pub struct RecordedUpdates { - pub updates: Updates, - pub records: usize, -} - -impl Default for RecordedUpdates { - fn default() -> Self { Self { updates: Default::default(), records: 0 } } -} - -impl Clone for RecordedUpdates { - fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records } } -} - -impl timely::Accountable for RecordedUpdates { - #[inline] fn record_count(&self) -> i64 { self.records as i64 } -} - -impl timely::dataflow::channels::ContainerBytes for RecordedUpdates { - fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } - fn length_in_bytes(&self) -> usize { unimplemented!() } - fn into_bytes(&self, _writer: &mut W) { unimplemented!() } -} - -// Container trait impls for RecordedUpdates, enabling iterative scopes. 
-mod container_impls { - use columnar::{Borrow, Columnar, Index, Len, Push}; - use timely::progress::{Timestamp, timestamp::Refines}; - use differential_dataflow::difference::Abelian; - use differential_dataflow::collection::containers::{Negate, Enter, Leave, ResultsIn}; - - use crate::layout::ColumnarUpdate as Update; - use crate::{RecordedUpdates, Updates}; - - impl> Negate for RecordedUpdates { - fn negate(mut self) -> Self { - let len = self.updates.diffs.values.len(); - let mut new_diffs = <::Container as Default>::default(); - let mut owned = U::Diff::default(); - for i in 0..len { - columnar::Columnar::copy_from(&mut owned, self.updates.diffs.values.borrow().get(i)); - owned.negate(); - new_diffs.push(&owned); - } - self.updates.diffs.values = new_diffs; - self - } - } - - impl Enter for RecordedUpdates<(K, V, T1, R)> - where - (K, V, T1, R): Update, - (K, V, T2, R): Update, - T1: Timestamp + Columnar + Default + Clone, - T2: Refines + Columnar + Default + Clone, - K: Columnar, V: Columnar, R: Columnar, - { - type InnerContainer = RecordedUpdates<(K, V, T2, R)>; - fn enter(self) -> Self::InnerContainer { - // Rebuild the time column; everything else moves as-is. 
- let mut new_times = <::Container as Default>::default(); - let mut t1_owned = T1::default(); - for i in 0..self.updates.times.values.len() { - Columnar::copy_from(&mut t1_owned, self.updates.times.values.borrow().get(i)); - let t2 = T2::to_inner(t1_owned.clone()); - new_times.push(&t2); - } - RecordedUpdates { - updates: Updates { - keys: self.updates.keys, - vals: self.updates.vals, - times: crate::updates::Lists { values: new_times, bounds: self.updates.times.bounds }, - diffs: self.updates.diffs, - }, - records: self.records, - } - } - } - - impl Leave for RecordedUpdates<(K, V, T1, R)> - where - (K, V, T1, R): Update, - (K, V, T2, R): Update, - T1: Refines + Columnar + Default + Clone, - T2: Timestamp + Columnar + Default + Clone, - K: Columnar, V: Columnar, R: Columnar, - { - type OuterContainer = RecordedUpdates<(K, V, T2, R)>; - fn leave(self) -> Self::OuterContainer { - // Flatten, convert times, and reconsolidate via consolidate. - // Leave can collapse distinct T1 times to the same T2 time, - // so the trie must be rebuilt with consolidation. - let mut flat = Updates::<(K, V, T2, R)>::default(); - let mut t1_owned = T1::default(); - for (k, v, t, d) in self.updates.iter() { - Columnar::copy_from(&mut t1_owned, t); - let t2: T2 = t1_owned.clone().to_outer(); - flat.push((k, v, &t2, d)); - } - RecordedUpdates { - updates: flat.consolidate(), - records: self.records, - } - } - } - - impl ResultsIn<::Summary> for RecordedUpdates { - fn results_in(self, step: &::Summary) -> Self { - use timely::progress::PathSummary; - // Apply results_in to each time; drop updates whose time maps to None. - // This must rebuild the trie since some entries may be removed. 
- let mut output = Updates::::default(); - let mut time_owned = U::Time::default(); - for (k, v, t, d) in self.updates.iter() { - Columnar::copy_from(&mut time_owned, t); - if let Some(new_time) = step.results_in(&time_owned) { - output.push((k, v, &new_time, d)); - } - } - RecordedUpdates { updates: output, records: self.records } - } - } -} - -pub use column_builder::ValBuilder as ValColBuilder; -mod column_builder { - - use std::collections::VecDeque; - use columnar::{Columnar, Clear, Len, Push}; - - use crate::layout::ColumnarUpdate as Update; - use crate::{Updates, RecordedUpdates}; - - type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; - - /// A container builder that produces `RecordedUpdates` (sorted, consolidated trie + record count). - pub struct ValBuilder { - /// Container that we're writing to. - current: TupleContainer, - /// Empty allocation. - empty: Option>, - /// Completed containers pending to be sent. - pending: VecDeque>, - } - - use timely::container::PushInto; - impl PushInto for ValBuilder where TupleContainer : Push { - #[inline] - fn push_into(&mut self, item: T) { - self.current.push(item); - if self.current.len() > 1024 * 1024 { - use columnar::{Borrow, Index}; - let records = self.current.len(); - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let updates = Updates::form(refs.into_iter()); - self.pending.push_back(RecordedUpdates { updates, records }); - self.current.clear(); - } - } - } - - impl Default for ValBuilder { - fn default() -> Self { - ValBuilder { - current: Default::default(), - empty: None, - pending: Default::default(), - } - } - } - - use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - impl ContainerBuilder for ValBuilder { - type Container = RecordedUpdates; - - #[inline] - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(container) = self.pending.pop_front() { - self.empty = Some(container); - 
self.empty.as_mut() - } else { - None - } - } - - #[inline] - fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - use columnar::{Borrow, Index}; - let records = self.current.len(); - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let updates = Updates::form(refs.into_iter()); - self.pending.push_back(RecordedUpdates { updates, records }); - self.current.clear(); - } - self.empty = self.pending.pop_front(); - self.empty.as_mut() - } - } - - impl LengthPreservingContainerBuilder for ValBuilder { } - -} - -use distributor::ValPact; -mod distributor { - - use std::rc::Rc; - - use columnar::{Borrow, Index, Len}; - use timely::logging::TimelyLogger; - use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; - use timely::dataflow::channels::Message; - use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract}; - use timely::progress::Timestamp; - use timely::worker::AsWorker; - - use crate::layout::ColumnarUpdate as Update; - use crate::{Updates, RecordedUpdates}; - - pub struct ValDistributor { - marker: std::marker::PhantomData, - hashfunc: H, - pre_lens: Vec, - } - - impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for ValDistributor { - // TODO: For unsorted Updates (stride-1 outer keys), each key is its own outer group, - // so the per-group pre_lens snapshot and seal check costs O(keys × workers). Should - // either batch keys by destination first, or detect stride-1 outer bounds and use a - // simpler single-pass partitioning that seals once at the end. - fn partition>>>(&mut self, container: &mut RecordedUpdates, time: &T, pushers: &mut [P]) { - use crate::updates::child_range; - - let keys_b = container.updates.keys.borrow(); - let mut outputs: Vec> = (0..pushers.len()).map(|_| Updates::default()).collect(); - - // Each outer key group becomes a separate run in the destination. 
- for outer in 0..Len::len(&keys_b) { - self.pre_lens.clear(); - self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len())); - for k in child_range(keys_b.bounds, outer) { - let key = keys_b.values.get(k); - let idx = ((self.hashfunc)(key) as usize) % pushers.len(); - outputs[idx].extend_from_keys(&container.updates, k..k+1); - } - for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) { - if output.keys.values.len() > pre { - output.keys.bounds.push(output.keys.values.len() as u64); - } - } - } - - // Distribute the input's record count across non-empty outputs. - let total_records = container.records; - let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count(); - let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1)); - for (pusher, output) in pushers.iter_mut().zip(outputs) { - if !output.keys.values.is_empty() { - let recorded = RecordedUpdates { updates: output, records: first_records }; - first_records = 1; - let mut recorded = recorded; - Message::push_at(&mut recorded, time.clone(), pusher); - } - } - } - fn flush>>>(&mut self, _time: &T, _pushers: &mut [P]) { } - fn relax(&mut self) { } - } - - pub struct ValPact { pub hashfunc: H } - - impl ParallelizationContract> for ValPact - where - T: Timestamp, - U: Update, - H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static, - { - type Pusher = Exchange< - T, - LogPusher>>>>, - ValDistributor - >; - type Puller = LogPuller>>>>; - - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - let distributor = ValDistributor { - marker: std::marker::PhantomData, - hashfunc: self.hashfunc, - pre_lens: Vec::new(), - }; - (Exchange::new(senders, distributor), 
LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) - } - } -} - -pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; -pub mod arrangement { - - use std::rc::Rc; - use differential_dataflow::trace::implementations::ord_neu::OrdValBatch; - use differential_dataflow::trace::rc_blanket_impls::RcBuilder; - use differential_dataflow::trace::implementations::spine_fueled::Spine; - - use crate::layout::ColumnarLayout; - - /// A trace implementation backed by columnar storage. - pub type ValSpine = Spine>>>; - /// A batcher for columnar storage. - pub type ValBatcher = ValBatcher2<(K,V,T,R)>; - /// A builder for columnar storage. - pub type ValBuilder = RcBuilder>; - - /// A batch container implementation for Coltainer. - pub use batch_container::Coltainer; - pub mod batch_container { - - use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len}; - use differential_dataflow::trace::implementations::BatchContainer; - - /// Container, anchored by `C` to provide an owned type. - pub struct Coltainer { - pub container: C::Container, - } - - impl Default for Coltainer { - fn default() -> Self { Self { container: Default::default() } } - } - - impl BatchContainer for Coltainer where for<'a> columnar::Ref<'a, C> : Ord { - - type ReadItem<'a> = columnar::Ref<'a, C>; - type Owned = C; - - #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) } - #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) } - - #[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) } - #[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) } - - /// Clears the container. May not release resources. - fn clear(&mut self) { self.container.clear() } - - /// Creates a new container with sufficient capacity. 
- fn with_capacity(_size: usize) -> Self { Self::default() } - /// Creates a new container with sufficient capacity. - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - Self { - container: ::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()), - } - } - - /// Converts a read item into one with a narrower lifetime. - #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::::reborrow_ref(item) } - - /// Reference to the element at this position. - #[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) } - - #[inline(always)] fn len(&self) -> usize { self.container.len() } - } - } - - use crate::{Updates, RecordedUpdates}; - use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, InternalMerger}; - type ValBatcher2 = MergeBatcher, TrieChunker, InternalMerger>>; - - /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. - /// The `records` accounting is discarded here — it has served its purpose for exchange. - /// - /// IMPORTANT: This chunker assumes the input `Updates` are sorted and consolidated - /// (as produced by `ValColBuilder::form`). The downstream `InternalMerge` relies on - /// this invariant. If `RecordedUpdates` could carry unsorted data (e.g. from a `map`), - /// we would need either a sorting chunker for that case, or a type-level distinction - /// (e.g. `RecordedUpdates` vs `RecordedUpdates`) to - /// route to the right chunker. 
- pub struct TrieChunker { - ready: std::collections::VecDeque>, - empty: Option>, - } - - impl Default for TrieChunker { - fn default() -> Self { Self { ready: Default::default(), empty: None } } - } - - impl<'a, U: crate::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { - fn push_into(&mut self, container: &'a mut RecordedUpdates) { - self.ready.push_back(std::mem::take(&mut container.updates)); - } - } - - impl timely::container::ContainerBuilder for TrieChunker { - type Container = Updates; - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = Some(ready); - self.empty.as_mut() - } else { - None - } - } - fn finish(&mut self) -> Option<&mut Self::Container> { - self.empty = self.ready.pop_front(); - self.empty.as_mut() - } - } - - pub mod batcher { - - use std::ops::Range; - use columnar::{Borrow, Columnar, Container, Index, Len, Push}; - use differential_dataflow::difference::{Semigroup, IsZero}; - use timely::progress::frontier::{Antichain, AntichainRef}; - use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; - - use crate::ColumnarUpdate as Update; - use crate::Updates; - - impl timely::container::SizableContainer for Updates { - fn at_capacity(&self) -> bool { - use columnar::Len; - self.diffs.values.len() >= 64 * 1024 - } - fn ensure_capacity(&mut self, _stash: &mut Option) { } - } - - impl InternalMerge for Updates { - - type TimeOwned = U::Time; - - fn len(&self) -> usize { self.diffs.values.len() } - fn clear(&mut self) { *self = Self::default(); } - - #[inline(never)] - fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { - match others.len() { - 0 => {}, - 1 => { - // Bulk copy: take remaining keys from position onward. 
- let other = &mut others[0]; - let pos = &mut positions[0]; - if self.keys.values.len() == 0 && *pos == 0 { - std::mem::swap(self, other); - return; - } - let other_len = other.keys.values.len(); - self.extend_from_keys(other, *pos .. other_len); - *pos = other_len; - }, - 2 => { - let mut this_sum = U::Diff::default(); - let mut that_sum = U::Diff::default(); - - let (left, right) = others.split_at_mut(1); - let this = &left[0]; - let that = &right[0]; - let this_keys = this.keys.values.borrow(); - let that_keys = that.keys.values.borrow(); - let mut this_key_range = positions[0] .. this_keys.len(); - let mut that_key_range = positions[1] .. that_keys.len(); - - while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let this_key = this_keys.get(this_key_range.start); - let that_key = that_keys.get(that_key_range.start); - match this_key.cmp(&that_key) { - std::cmp::Ordering::Less => { - let lower = this_key_range.start; - gallop(this_keys, &mut this_key_range, |x| x < that_key); - self.extend_from_keys(this, lower .. this_key_range.start); - }, - std::cmp::Ordering::Equal => { - let values_len = self.vals.values.len(); - let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); - let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); - while !this_val_range.is_empty() && !that_val_range.is_empty() { - let this_val = this.vals.values.borrow().get(this_val_range.start); - let that_val = that.vals.values.borrow().get(that_val_range.start); - match this_val.cmp(&that_val) { - std::cmp::Ordering::Less => { - let lower = this_val_range.start; - gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); - self.extend_from_vals(this, lower .. this_val_range.start); - }, - std::cmp::Ordering::Equal => { - let updates_len = self.times.values.len(); - let mut this_time_range = this.times_bounds(this_val_range.start .. 
this_val_range.start+1); - let mut that_time_range = that.times_bounds(that_val_range.start .. that_val_range.start+1); - while !this_time_range.is_empty() && !that_time_range.is_empty() { - let this_time = this.times.values.borrow().get(this_time_range.start); - let this_diff = this.diffs.values.borrow().get(this_time_range.start); - let that_time = that.times.values.borrow().get(that_time_range.start); - let that_diff = that.diffs.values.borrow().get(that_time_range.start); - match this_time.cmp(&that_time) { - std::cmp::Ordering::Less => { - let lower = this_time_range.start; - gallop(this.times.values.borrow(), &mut this_time_range, |x| x < that_time); - self.times.values.extend_from_self(this.times.values.borrow(), lower .. this_time_range.start); - self.diffs.extend_from_self(this.diffs.borrow(), lower .. this_time_range.start); - }, - std::cmp::Ordering::Equal => { - this_sum.copy_from(this_diff); - that_sum.copy_from(that_diff); - this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { - self.times.values.push(this_time); - self.diffs.values.push(&this_sum); - self.diffs.bounds.push(self.diffs.values.len() as u64); - } - this_time_range.start += 1; - that_time_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_time_range.start; - gallop(that.times.values.borrow(), &mut that_time_range, |x| x < this_time); - self.times.values.extend_from_self(that.times.values.borrow(), lower .. that_time_range.start); - self.diffs.extend_from_self(that.diffs.borrow(), lower .. 
that_time_range.start); - }, - } - } - // Remaining from this side - if !this_time_range.is_empty() { - self.times.values.extend_from_self(this.times.values.borrow(), this_time_range.clone()); - self.diffs.extend_from_self(this.diffs.borrow(), this_time_range.clone()); - } - // Remaining from that side - if !that_time_range.is_empty() { - self.times.values.extend_from_self(that.times.values.borrow(), that_time_range.clone()); - self.diffs.extend_from_self(that.diffs.borrow(), that_time_range.clone()); - } - if self.times.values.len() > updates_len { - self.times.bounds.push(self.times.values.len() as u64); - self.vals.values.push(this_val); - } - this_val_range.start += 1; - that_val_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_val_range.start; - gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); - self.extend_from_vals(that, lower .. that_val_range.start); - }, - } - } - self.extend_from_vals(this, this_val_range); - self.extend_from_vals(that, that_val_range); - if self.vals.values.len() > values_len { - self.vals.bounds.push(self.vals.values.len() as u64); - self.keys.values.push(this_key); - } - this_key_range.start += 1; - that_key_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_key_range.start; - gallop(that_keys, &mut that_key_range, |x| x < this_key); - self.extend_from_keys(that, lower .. that_key_range.start); - }, - } - } - // Copy remaining from whichever side has data, up to capacity. - while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = this_key_range.start; - this_key_range.start = this_key_range.end; // take all remaining - self.extend_from_keys(this, lower .. this_key_range.start); - } - while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = that_key_range.start; - that_key_range.start = that_key_range.end; - self.extend_from_keys(that, lower .. 
that_key_range.start); - } - positions[0] = this_key_range.start; - positions[1] = that_key_range.start; - }, - n => unimplemented!("{n}-way merge not supported"), - } - } - - fn extract( - &mut self, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ) { - let mut time = U::Time::default(); - for key_idx in 0 .. self.keys.values.len() { - let key = self.keys.values.borrow().get(key_idx); - let keep_vals_len = keep.vals.values.len(); - let ship_vals_len = ship.vals.values.len(); - for val_idx in self.vals_bounds(key_idx..key_idx+1) { - let val = self.vals.values.borrow().get(val_idx); - let keep_times_len = keep.times.values.len(); - let ship_times_len = ship.times.values.len(); - for time_idx in self.times_bounds(val_idx..val_idx+1) { - let t = self.times.values.borrow().get(time_idx); - let diff = self.diffs.values.borrow().get(time_idx); - time.copy_from(t); - if upper.less_equal(&time) { - frontier.insert_ref(&time); - keep.times.values.push(t); - keep.diffs.values.push(diff); - keep.diffs.bounds.push(keep.diffs.values.len() as u64); - } - else { - ship.times.values.push(t); - ship.diffs.values.push(diff); - ship.diffs.bounds.push(ship.diffs.values.len() as u64); - } - } - if keep.times.values.len() > keep_times_len { - keep.times.bounds.push(keep.times.values.len() as u64); - keep.vals.values.push(val); - } - if ship.times.values.len() > ship_times_len { - ship.times.bounds.push(ship.times.values.len() as u64); - ship.vals.values.push(val); - } - } - if keep.vals.values.len() > keep_vals_len { - keep.vals.bounds.push(keep.vals.values.len() as u64); - keep.keys.values.push(key); - } - if ship.vals.values.len() > ship_vals_len { - ship.vals.bounds.push(ship.vals.values.len() as u64); - ship.keys.values.push(key); - } - } - } - } - - #[inline(always)] - pub(crate) fn gallop(input: TC, range: &mut Range, mut cmp: impl FnMut(::Ref) -> bool) { - // if empty input, or already >= element, return - if !Range::::is_empty(range) && 
cmp(input.get(range.start)) { - let mut step = 1; - while range.start + step < range.end && cmp(input.get(range.start + step)) { - range.start += step; - step <<= 1; - } - - step >>= 1; - while step > 0 { - if range.start + step < range.end && cmp(input.get(range.start + step)) { - range.start += step; - } - step >>= 1; - } - - range.start += 1; - } - } - } - - use builder::ValMirror; - pub mod builder { - - use differential_dataflow::trace::implementations::ord_neu::{Vals, Upds}; - use differential_dataflow::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; - use differential_dataflow::trace::Description; - - use crate::Updates; - use crate::layout::ColumnarUpdate as Update; - use crate::layout::ColumnarLayout as Layout; - use crate::arrangement::Coltainer; - - use columnar::{Borrow, IndexAs}; - use columnar::primitive::offsets::Strides; - use differential_dataflow::trace::implementations::OffsetList; - fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList { - let mut output = OffsetList::with_capacity(count); - output.push(0); - let bounds_b = bounds.borrow(); - for i in 0..count { - output.push(bounds_b.index_as(i) as usize); - } - output - } - - pub struct ValMirror { - current: Updates, - } - impl differential_dataflow::trace::Builder for ValMirror { - type Time = U::Time; - type Input = Updates; - type Output = OrdValBatch>; - - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { - Self { current: Updates::default() } - } - fn push(&mut self, chunk: &mut Self::Input) { - use columnar::Len; - let len = chunk.keys.values.len(); - if len > 0 { - self.current.extend_from_keys(chunk, 0..len); - } - } - fn done(self, description: Description) -> Self::Output { - let mut chain = if self.current.len() > 0 { - vec![self.current] - } else { - vec![] - }; - Self::seal(&mut chain, description) - } - fn seal(chain: &mut Vec, description: Description) -> Self::Output { - if chain.len() == 0 { - let storage = 
OrdValStorage { - keys: Default::default(), - vals: Default::default(), - upds: Default::default(), - }; - OrdValBatch { storage, description, updates: 0 } - } - else if chain.len() == 1 { - use columnar::Len; - let storage = chain.pop().unwrap(); - let updates = storage.diffs.values.len(); - let val_offs = strides_to_offset_list(&storage.vals.bounds, storage.keys.values.len()); - let time_offs = strides_to_offset_list(&storage.times.bounds, storage.vals.values.len()); - let storage = OrdValStorage { - keys: Coltainer { container: storage.keys.values }, - vals: Vals { - offs: val_offs, - vals: Coltainer { container: storage.vals.values }, - }, - upds: Upds { - offs: time_offs, - times: Coltainer { container: storage.times.values }, - diffs: Coltainer { container: storage.diffs.values }, - }, - }; - OrdValBatch { storage, description, updates } - } - else { - use columnar::Len; - let mut merged = chain.remove(0); - for other in chain.drain(..) { - let len = other.keys.values.len(); - if len > 0 { - merged.extend_from_keys(&other, 0..len); - } - } - chain.push(merged); - Self::seal(chain, description) - } - } - } - - } -} - -pub mod updates { - - use columnar::{Columnar, Container, ContainerOf, Vecs, Borrow, Index, IndexAs, Len, Push}; - use columnar::primitive::offsets::Strides; - use differential_dataflow::difference::{Semigroup, IsZero}; - - use crate::layout::ColumnarUpdate as Update; - - /// A `Vecs` using strided offsets. - pub type Lists = Vecs; - - /// Trie-structured update storage using columnar containers. - /// - /// Four nested layers of `Lists`: - /// - `keys`: lists of keys (outer lists are independent groups) - /// - `vals`: per-key, lists of vals - /// - `times`: per-val, lists of times - /// - `diffs`: per-time, lists of diffs (singletons when consolidated) - /// - /// A flat unsorted input has stride 1 at every level (one key per entry, - /// one val per key, one time per val, one diff per time). 
- /// A fully consolidated trie has a single outer key list, all lists sorted - /// and deduplicated, and singleton diff lists. - pub struct Updates { - pub keys: Lists>, - pub vals: Lists>, - pub times: Lists>, - pub diffs: Lists>, - } - - impl Default for Updates { - fn default() -> Self { - Self { - keys: Default::default(), - vals: Default::default(), - times: Default::default(), - diffs: Default::default(), - } - } - } - - impl std::fmt::Debug for Updates { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Updates").finish() - } - } - - impl Clone for Updates { - fn clone(&self) -> Self { - Self { - keys: self.keys.clone(), - vals: self.vals.clone(), - times: self.times.clone(), - diffs: self.diffs.clone(), - } - } - } - - pub type Tuple = (::Key, ::Val, ::Time, ::Diff); - - /// Returns the value-index range for list `i` given cumulative bounds. - #[inline] - pub fn child_range>(bounds: B, i: usize) -> std::ops::Range { - let lower = if i == 0 { 0 } else { bounds.index_as(i - 1) as usize }; - let upper = bounds.index_as(i) as usize; - lower..upper - } - - impl Updates { - - pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { - if !key_range.is_empty() { - let bounds = self.vals.bounds.borrow(); - let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize }; - let upper = bounds.index_as(key_range.end - 1) as usize; - lower..upper - } else { key_range } - } - pub fn times_bounds(&self, val_range: std::ops::Range) -> std::ops::Range { - if !val_range.is_empty() { - let bounds = self.times.bounds.borrow(); - let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize }; - let upper = bounds.index_as(val_range.end - 1) as usize; - lower..upper - } else { val_range } - } - pub fn diffs_bounds(&self, time_range: std::ops::Range) -> std::ops::Range { - if !time_range.is_empty() { - let bounds = self.diffs.bounds.borrow(); - let lower = 
if time_range.start == 0 { 0 } else { bounds.index_as(time_range.start - 1) as usize }; - let upper = bounds.index_as(time_range.end - 1) as usize; - lower..upper - } else { time_range } - } - - /// Copies `other[key_range]` into self, keys and all. - pub fn extend_from_keys(&mut self, other: &Self, key_range: std::ops::Range) { - self.keys.values.extend_from_self(other.keys.values.borrow(), key_range.clone()); - self.vals.extend_from_self(other.vals.borrow(), key_range.clone()); - let val_range = other.vals_bounds(key_range); - self.times.extend_from_self(other.times.borrow(), val_range.clone()); - let time_range = other.times_bounds(val_range); - self.diffs.extend_from_self(other.diffs.borrow(), time_range); - } - - /// Copies a range of vals (with their times and diffs) from `other` into self. - pub fn extend_from_vals(&mut self, other: &Self, val_range: std::ops::Range) { - self.vals.values.extend_from_self(other.vals.values.borrow(), val_range.clone()); - self.times.extend_from_self(other.times.borrow(), val_range.clone()); - let time_range = other.times_bounds(val_range); - self.diffs.extend_from_self(other.diffs.borrow(), time_range); - } - - /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. - /// - /// Tracks a `prev` reference to the previous element. On each new element, - /// compares against `prev` to detect key/val/time changes. Only pushes - /// accumulated diffs when they are nonzero, and only emits times/vals/keys - /// that have at least one nonzero diff beneath them. 
- pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { - - let mut output = Self::default(); - let mut diff_stash = U::Diff::default(); - let mut diff_temp = U::Diff::default(); - - if let Some(first) = sorted.next() { - - let mut prev = first; - Columnar::copy_from(&mut diff_stash, prev.3); - - for curr in sorted { - let key_differs = ContainerOf::::reborrow_ref(curr.0) != ContainerOf::::reborrow_ref(prev.0); - let val_differs = key_differs || ContainerOf::::reborrow_ref(curr.1) != ContainerOf::::reborrow_ref(prev.1); - let time_differs = val_differs || ContainerOf::::reborrow_ref(curr.2) != ContainerOf::::reborrow_ref(prev.2); - - if time_differs { - // Flush the accumulated diff for prev's (key, val, time). - if !diff_stash.is_zero() { - // We have a real update to emit. Push time (and val/key - // if this is the first time under them). - let times_len = output.times.values.len(); - let vals_len = output.vals.values.len(); - - if val_differs { - // Seal the previous val's time list, if any times were emitted. - if times_len > 0 { - output.times.bounds.push(times_len as u64); - } - if key_differs { - // Seal the previous key's val list, if any vals were emitted. - if vals_len > 0 { - output.vals.bounds.push(vals_len as u64); - } - output.keys.values.push(prev.0); - } - output.vals.values.push(prev.1); - } - output.times.values.push(prev.2); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } - Columnar::copy_from(&mut diff_stash, curr.3); - } else { - // Same (key, val, time): accumulate diff. - Columnar::copy_from(&mut diff_temp, curr.3); - diff_stash.plus_equals(&diff_temp); - } - prev = curr; - } - - // Flush the final accumulated diff. 
-            if !diff_stash.is_zero() {
-                let keys_len = output.keys.values.len();
-                let vals_len = output.vals.values.len();
-                let times_len = output.times.values.len();
-                let need_key = keys_len == 0 || ContainerOf::<U::Key>::reborrow_ref(prev.0) != output.keys.values.borrow().get(keys_len - 1);
-                let need_val = need_key || vals_len == 0 || ContainerOf::<U::Val>::reborrow_ref(prev.1) != output.vals.values.borrow().get(vals_len - 1);
-
-                if need_val {
-                    if times_len > 0 {
-                        output.times.bounds.push(times_len as u64);
-                    }
-                    if need_key {
-                        if vals_len > 0 {
-                            output.vals.bounds.push(vals_len as u64);
-                        }
-                        output.keys.values.push(prev.0);
-                    }
-                    output.vals.values.push(prev.1);
-                }
-                output.times.values.push(prev.2);
-                output.diffs.values.push(&diff_stash);
-                output.diffs.bounds.push(output.diffs.values.len() as u64);
-            }
-
-            // Seal the final groups at each level.
-            if !output.times.values.is_empty() {
-                output.times.bounds.push(output.times.values.len() as u64);
-            }
-            if !output.vals.values.is_empty() {
-                output.vals.bounds.push(output.vals.values.len() as u64);
-            }
-            if !output.keys.values.is_empty() {
-                output.keys.bounds.push(output.keys.values.len() as u64);
-            }
-        }
-
-        output
-    }
-
-    /// Consolidates into canonical trie form:
-    /// single outer key list, all lists sorted and deduplicated,
-    /// diff lists are singletons (or absent if cancelled).
-    pub fn consolidate(self) -> Self {
-
-        let Self { keys, vals, times, diffs } = self;
-
-        let keys_b = keys.borrow();
-        let vals_b = vals.borrow();
-        let times_b = times.borrow();
-        let diffs_b = diffs.borrow();
-
-        // Flatten to index tuples: [key_abs, val_abs, time_abs, diff_abs].
-        let mut tuples: Vec<[usize; 4]> = Vec::new();
-        for outer in 0..Len::len(&keys_b) {
-            for k in child_range(keys_b.bounds, outer) {
-                for v in child_range(vals_b.bounds, k) {
-                    for t in child_range(times_b.bounds, v) {
-                        for d in child_range(diffs_b.bounds, t) {
-                            tuples.push([k, v, t, d]);
-                        }
-                    }
-                }
-            }
-        }
-
-        // Sort by (key, val, time). Diff is payload.
-        tuples.sort_by(|a, b| {
-            keys_b.values.get(a[0]).cmp(&keys_b.values.get(b[0]))
-                .then_with(|| vals_b.values.get(a[1]).cmp(&vals_b.values.get(b[1])))
-                .then_with(|| times_b.values.get(a[2]).cmp(&times_b.values.get(b[2])))
-        });
-
-        // Build consolidated output, bottom-up cancellation.
-        let mut output = Self::default();
-        let mut diff_stash = U::Diff::default();
-        let mut diff_temp = U::Diff::default();
-
-        let mut idx = 0;
-        while idx < tuples.len() {
-            let key_ref = keys_b.values.get(tuples[idx][0]);
-            let key_start_vals = output.vals.values.len();
-
-            // All entries with this key.
-            while idx < tuples.len() && keys_b.values.get(tuples[idx][0]) == key_ref {
-                let val_ref = vals_b.values.get(tuples[idx][1]);
-                let val_start_times = output.times.values.len();
-
-                // All entries with this (key, val).
-                while idx < tuples.len()
-                    && keys_b.values.get(tuples[idx][0]) == key_ref
-                    && vals_b.values.get(tuples[idx][1]) == val_ref
-                {
-                    let time_ref = times_b.values.get(tuples[idx][2]);
-
-                    // Sum all diffs for this (key, val, time).
-                    Columnar::copy_from(&mut diff_stash, diffs_b.values.get(tuples[idx][3]));
-                    idx += 1;
-                    while idx < tuples.len()
-                        && keys_b.values.get(tuples[idx][0]) == key_ref
-                        && vals_b.values.get(tuples[idx][1]) == val_ref
-                        && times_b.values.get(tuples[idx][2]) == time_ref
-                    {
-                        Columnar::copy_from(&mut diff_temp, diffs_b.values.get(tuples[idx][3]));
-                        diff_stash.plus_equals(&diff_temp);
-                        idx += 1;
-                    }
-
-                    // Emit time + singleton diff if nonzero.
-                    if !diff_stash.is_zero() {
-                        output.times.values.push(time_ref);
-                        output.diffs.values.push(&diff_stash);
-                        output.diffs.bounds.push(output.diffs.values.len() as u64);
-                    }
-                }
-
-                // Seal time list for this val; emit val if any times survived.
-                if output.times.values.len() > val_start_times {
-                    output.times.bounds.push(output.times.values.len() as u64);
-                    output.vals.values.push(val_ref);
-                }
-            }
-
-            // Seal val list for this key; emit key if any vals survived.
- if output.vals.values.len() > key_start_vals { - output.vals.bounds.push(output.vals.values.len() as u64); - output.keys.values.push(key_ref); - } - } - - // Seal the single outer key list. - if !output.keys.values.is_empty() { - output.keys.bounds.push(output.keys.values.len() as u64); - } - - output - } - - /// The number of leaf-level diff entries (total updates). - pub fn len(&self) -> usize { self.diffs.values.len() } - } - - /// Push a single flat update as a stride-1 entry. - /// - /// Each field is independently typed — columnar refs, `&Owned`, owned values, - /// or any other type the column container accepts via its `Push` impl. - impl Push<(KP, VP, TP, DP)> for Updates - where - ContainerOf: Push, - ContainerOf: Push, - ContainerOf: Push, - ContainerOf: Push, - { - fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) { - self.keys.values.push(key); - self.keys.bounds.push(self.keys.values.len() as u64); - self.vals.values.push(val); - self.vals.bounds.push(self.vals.values.len() as u64); - self.times.values.push(time); - self.times.bounds.push(self.times.values.len() as u64); - self.diffs.values.push(diff); - self.diffs.bounds.push(self.diffs.values.len() as u64); - } - } - - /// PushInto for the `((K, V), T, R)` shape that reduce_trace uses. - impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for Updates { - fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) { - self.push((&key, &val, &time, &diff)); - } - } - - impl Updates { - - /// Iterate all `(key, val, time, diff)` entries as refs. 
- pub fn iter(&self) -> impl Iterator, - columnar::Ref<'_, U::Val>, - columnar::Ref<'_, U::Time>, - columnar::Ref<'_, U::Diff>, - )> { - let keys_b = self.keys.borrow(); - let vals_b = self.vals.borrow(); - let times_b = self.times.borrow(); - let diffs_b = self.diffs.borrow(); - - let mut result = Vec::new(); - for outer in 0..Len::len(&keys_b) { - for k in child_range(keys_b.bounds, outer) { - let key = keys_b.values.get(k); - for v in child_range(vals_b.bounds, k) { - let val = vals_b.values.get(v); - for t in child_range(times_b.bounds, v) { - let time = times_b.values.get(t); - for d in child_range(diffs_b.bounds, t) { - let diff = diffs_b.values.get(d); - result.push((key, val, time, diff)); - } - } - } - } - } - result.into_iter() - } - } - - impl timely::Accountable for Updates { - #[inline] fn record_count(&self) -> i64 { Len::len(&self.diffs.values) as i64 } - } - - impl timely::dataflow::channels::ContainerBytes for Updates { - fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } - fn length_in_bytes(&self) -> usize { unimplemented!() } - fn into_bytes(&self, _writer: &mut W) { unimplemented!() } - } - - #[cfg(test)] - mod tests { - use super::*; - use columnar::Push; - - type TestUpdate = (u64, u64, u64, i64); - - fn collect(updates: &Updates) -> Vec<(u64, u64, u64, i64)> { - updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() - } - - #[test] - fn test_push_and_consolidate_basic() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &1)); - updates.push((&1, &10, &100, &2)); - updates.push((&2, &20, &200, &5)); - assert_eq!(updates.len(), 3); - assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]); - } - - #[test] - fn test_cancellation() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &3)); - updates.push((&1, &10, &100, &-3)); - updates.push((&2, &20, &200, &1)); - assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 1)]); - } - - 
#[test] - fn test_multiple_vals_and_times() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &1)); - updates.push((&1, &10, &200, &2)); - updates.push((&1, &20, &100, &3)); - updates.push((&1, &20, &100, &4)); - assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); - } - - #[test] - fn test_val_cancellation_propagates() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &5)); - updates.push((&1, &10, &100, &-5)); - updates.push((&1, &20, &100, &1)); - assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 1)]); - } - - #[test] - fn test_empty() { - let updates = Updates::::default(); - assert_eq!(collect(&updates.consolidate()), vec![]); - } - - #[test] - fn test_total_cancellation() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &1)); - updates.push((&1, &10, &100, &-1)); - assert_eq!(collect(&updates.consolidate()), vec![]); - } - - #[test] - fn test_unsorted_input() { - let mut updates = Updates::::default(); - updates.push((&3, &30, &300, &1)); - updates.push((&1, &10, &100, &2)); - updates.push((&2, &20, &200, &3)); - assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); - } - - #[test] - fn test_first_key_cancels() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &5)); - updates.push((&1, &10, &100, &-5)); - updates.push((&2, &20, &200, &3)); - assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 3)]); - } - - #[test] - fn test_middle_time_cancels() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &1)); - updates.push((&1, &10, &200, &2)); - updates.push((&1, &10, &200, &-2)); - updates.push((&1, &10, &300, &3)); - assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 300, 3)]); - } - - #[test] - fn test_first_val_cancels() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, 
&1)); - updates.push((&1, &10, &100, &-1)); - updates.push((&1, &20, &100, &5)); - assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 5)]); - } - - #[test] - fn test_interleaved_cancellations() { - let mut updates = Updates::::default(); - updates.push((&1, &10, &100, &1)); - updates.push((&1, &10, &100, &-1)); - updates.push((&2, &20, &200, &7)); - updates.push((&3, &30, &300, &4)); - updates.push((&3, &30, &300, &-4)); - assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 7)]); - } - } -} - -/// A columnar flat_map: iterates RecordedUpdates, calls logic per (key, val, time, diff), -/// joins output times with input times, multiplies output diffs with input diffs. -/// -/// This subsumes map, filter, negate, and enter_at for columnar collections. -pub fn join_function( - input: differential_dataflow::Collection>, - mut logic: L, -) -> differential_dataflow::Collection> -where - G: timely::dataflow::Scope, - G::Timestamp: differential_dataflow::lattice::Lattice, - U: layout::ColumnarUpdate>, - I: IntoIterator, - L: FnMut( - columnar::Ref<'_, U::Key>, - columnar::Ref<'_, U::Val>, - columnar::Ref<'_, U::Time>, - columnar::Ref<'_, U::Diff>, - ) -> I + 'static, -{ - use timely::dataflow::operators::generic::Operator; - use timely::dataflow::channels::pact::Pipeline; - use differential_dataflow::AsCollection; - use differential_dataflow::difference::Multiply; - use differential_dataflow::lattice::Lattice; - use columnar::Columnar; - - input - .inner - .unary::, _, _, _>(Pipeline, "JoinFunction", move |_, _| { - move |input, output| { - input.for_each(|time, data| { - let mut session = output.session_with_builder(&time); - for (k1, v1, t1, d1) in data.updates.iter() { - let t1o: U::Time = Columnar::into_owned(t1); - let d1o: U::Diff = Columnar::into_owned(d1); - for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { - let t3 = t2.join(&t1o); - let d3 = d2.multiply(&d1o); - session.give((&k2, &v2, &t3, &d3)); - } - } - }); - } - }) - .as_collection() -} - 
-/// Leave a dynamic iterative scope, truncating PointStamp coordinates. -/// -/// Uses OperatorBuilder (not unary) for the custom input connection summary -/// that tells timely how the PointStamp is affected (retain `level - 1` coordinates). -/// -/// Consolidates after truncation since distinct PointStamp coordinates can collapse. -pub fn leave_dynamic( - input: differential_dataflow::Collection>, - level: usize, -) -> differential_dataflow::Collection> -where - G: timely::dataflow::Scope, - K: columnar::Columnar, - V: columnar::Columnar, - R: columnar::Columnar, - (K, V, DynTime, R): layout::ColumnarUpdate, -{ - use timely::dataflow::channels::pact::Pipeline; - use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; - use timely::dataflow::operators::generic::OutputBuilder; - use timely::order::Product; - use timely::progress::Antichain; - use differential_dataflow::AsCollection; - use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; - use columnar::Columnar; - - let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope()); - let (output, stream) = builder.new_output(); - let mut output = OutputBuilder::from(output); - let mut op_input = builder.new_input_connection( - input.inner, - Pipeline, - [( - 0, - Antichain::from_elem(Product { - outer: Default::default(), - inner: PointStampSummary { - retain: Some(level - 1), - actions: Vec::new(), - }, - }), - )], - ); - - builder.build(move |_capability| { - let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default(); - move |_frontier| { - let mut output = output.activate(); - op_input.for_each(|cap, data| { - // Truncate the capability's timestamp. - let mut new_time = cap.time().clone(); - let mut vec = std::mem::take(&mut new_time.inner).into_inner(); - vec.truncate(level - 1); - new_time.inner = PointStamp::new(vec); - let new_cap = cap.delayed(&new_time, 0); - // Push updates with truncated times into the builder. 
- // The builder's form call on flush sorts and consolidates, - // handling the duplicate times that truncation can produce. - // TODO: The input trie is already sorted; a streaming form - // that accepts pre-sorted, potentially-collapsing timestamps - // could avoid the re-sort inside the builder. - for (k, v, t, d) in data.updates.iter() { - let mut time: DynTime = Columnar::into_owned(t); - let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); - inner_vec.truncate(level - 1); - time.inner = PointStamp::new(inner_vec); - col_builder.push_into((k, v, &time, d)); - } - let mut session = output.session(&new_cap); - while let Some(container) = col_builder.finish() { - session.give_container(container); - } - }); - } - }); - - stream.as_collection() -} - -pub type DynTime = timely::order::Product>; - /// Reachability on a random directed graph using columnar containers. /// /// This module exercises the container traits needed for iterative columnar @@ -1533,7 +101,7 @@ mod reachability { use differential_dataflow::operators::arrange::arrangement::arrange_core; use differential_dataflow::operators::join::join_traces; - use crate::{RecordedUpdates, ValColBuilder, ValBatcher, ValBuilder, ValSpine, ValPact}; + use crate::columnar_support::*; type Node = u32; type Time = u64; @@ -1606,28 +174,7 @@ mod reachability { }); // Extract RecordedUpdates from the Arranged's batch stream. 
- use differential_dataflow::trace::{BatchReader, Cursor}; - use timely::container::{ContainerBuilder, PushInto}; - let result_col = result.as_container(|batch| { - let mut builder = ValColBuilder::<(Node, (), IterTime, Diff)>::default(); - let mut cursor = batch.cursor(); - while cursor.key_valid(&batch) { - let k: Node = *cursor.key(&batch); - while cursor.val_valid(&batch) { - let v: () = cursor.val(&batch); - cursor.map_times(&batch, |time, diff| { - builder.push_into((k, v, time.clone(), diff.clone())); - }); - cursor.step_val(&batch); - } - cursor.step_key(&batch); - } - let mut out = Vec::new(); - while let Some(container) = builder.finish() { - out.push(std::mem::take(container)); - } - out - }); + let result_col = as_recorded_updates::<_, (Node, (), IterTime, Diff)>(result); variable.set(result_col.clone()); diff --git a/differential-dataflow/examples/columnar_support.rs b/differential-dataflow/examples/columnar_support.rs new file mode 100644 index 000000000..ced700bec --- /dev/null +++ b/differential-dataflow/examples/columnar_support.rs @@ -0,0 +1,1491 @@ +//! Columnar container infrastructure for differential dataflow. +//! +//! Provides trie-structured update storage (`Updates`, `RecordedUpdates`), +//! columnar arrangement types (`ValSpine`, `ValBatcher`, `ValBuilder`), +//! container traits for iterative scopes (`Enter`, `Leave`, `Negate`, `ResultsIn`), +//! exchange distribution (`ValPact`), and operators (`join_function`, `leave_dynamic`). +//! +//! 
Include via `#[path = "columnar_support.rs"] mod columnar_support;` + +#![allow(dead_code, unused_imports)] + +pub use layout::{ColumnarLayout, ColumnarUpdate}; +pub mod layout { + + use std::fmt::Debug; + use columnar::Columnar; + use differential_dataflow::trace::implementations::{Layout, OffsetList}; + use differential_dataflow::difference::Semigroup; + use differential_dataflow::lattice::Lattice; + use timely::progress::Timestamp; + + /// A layout based on columnar + pub struct ColumnarLayout { + phantom: std::marker::PhantomData, + } + + impl ColumnarUpdate for (K, V, T, R) + where + K: Columnar + Debug + Ord + Clone + 'static, + V: Columnar + Debug + Ord + Clone + 'static, + T: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp, + R: Columnar + Debug + Ord + Default + Semigroup + 'static, + { + type Key = K; + type Val = V; + type Time = T; + type Diff = R; + } + + use crate::arrangement::Coltainer; + impl Layout for ColumnarLayout { + type KeyContainer = Coltainer; + type ValContainer = Coltainer; + type TimeContainer = Coltainer; + type DiffContainer = Coltainer; + type OffsetContainer = OffsetList; + } + + /// A type that names constituent update types. + /// + /// We will use their associated `Columnar::Container` + pub trait ColumnarUpdate : Debug + 'static { + type Key: Columnar + Debug + Ord + Clone + 'static; + type Val: Columnar + Debug + Ord + Clone + 'static; + type Time: Columnar + Debug + Ord + Default + Clone + Lattice + Timestamp; + type Diff: Columnar + Debug + Ord + Default + Semigroup + 'static; + } + + /// A container whose references can be ordered. + pub trait OrdContainer : for<'a> columnar::Container : Ord> { } + impl columnar::Container : Ord>> OrdContainer for C { } + +} + +pub use updates::Updates; + +/// A thin wrapper around `Updates` that tracks the pre-consolidation record count +/// for timely's exchange accounting. 
This wrapper is the stream container type; +/// the `TrieChunker` strips it, passing bare `Updates` into the merge batcher. +pub struct RecordedUpdates { + pub updates: Updates, + pub records: usize, +} + +impl Default for RecordedUpdates { + fn default() -> Self { Self { updates: Default::default(), records: 0 } } +} + +impl Clone for RecordedUpdates { + fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records } } +} + +impl timely::Accountable for RecordedUpdates { + #[inline] fn record_count(&self) -> i64 { self.records as i64 } +} + +impl timely::dataflow::channels::ContainerBytes for RecordedUpdates { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } +} + +// Container trait impls for RecordedUpdates, enabling iterative scopes. +mod container_impls { + use columnar::{Borrow, Columnar, Index, Len, Push}; + use timely::progress::{Timestamp, timestamp::Refines}; + use differential_dataflow::difference::Abelian; + use differential_dataflow::collection::containers::{Negate, Enter, Leave, ResultsIn}; + + use crate::layout::ColumnarUpdate as Update; + use crate::{RecordedUpdates, Updates}; + + impl> Negate for RecordedUpdates { + fn negate(mut self) -> Self { + let len = self.updates.diffs.values.len(); + let mut new_diffs = <::Container as Default>::default(); + let mut owned = U::Diff::default(); + for i in 0..len { + columnar::Columnar::copy_from(&mut owned, self.updates.diffs.values.borrow().get(i)); + owned.negate(); + new_diffs.push(&owned); + } + self.updates.diffs.values = new_diffs; + self + } + } + + impl Enter for RecordedUpdates<(K, V, T1, R)> + where + (K, V, T1, R): Update, + (K, V, T2, R): Update, + T1: Timestamp + Columnar + Default + Clone, + T2: Refines + Columnar + Default + Clone, + K: Columnar, V: Columnar, R: Columnar, + { + type InnerContainer = RecordedUpdates<(K, V, 
T2, R)>; + fn enter(self) -> Self::InnerContainer { + // Rebuild the time column; everything else moves as-is. + let mut new_times = <::Container as Default>::default(); + let mut t1_owned = T1::default(); + for i in 0..self.updates.times.values.len() { + Columnar::copy_from(&mut t1_owned, self.updates.times.values.borrow().get(i)); + let t2 = T2::to_inner(t1_owned.clone()); + new_times.push(&t2); + } + RecordedUpdates { + updates: Updates { + keys: self.updates.keys, + vals: self.updates.vals, + times: crate::updates::Lists { values: new_times, bounds: self.updates.times.bounds }, + diffs: self.updates.diffs, + }, + records: self.records, + } + } + } + + impl Leave for RecordedUpdates<(K, V, T1, R)> + where + (K, V, T1, R): Update, + (K, V, T2, R): Update, + T1: Refines + Columnar + Default + Clone, + T2: Timestamp + Columnar + Default + Clone, + K: Columnar, V: Columnar, R: Columnar, + { + type OuterContainer = RecordedUpdates<(K, V, T2, R)>; + fn leave(self) -> Self::OuterContainer { + // Flatten, convert times, and reconsolidate via consolidate. + // Leave can collapse distinct T1 times to the same T2 time, + // so the trie must be rebuilt with consolidation. + let mut flat = Updates::<(K, V, T2, R)>::default(); + let mut t1_owned = T1::default(); + for (k, v, t, d) in self.updates.iter() { + Columnar::copy_from(&mut t1_owned, t); + let t2: T2 = t1_owned.clone().to_outer(); + flat.push((k, v, &t2, d)); + } + RecordedUpdates { + updates: flat.consolidate(), + records: self.records, + } + } + } + + impl ResultsIn<::Summary> for RecordedUpdates { + fn results_in(self, step: &::Summary) -> Self { + use timely::progress::PathSummary; + // Apply results_in to each time; drop updates whose time maps to None. + // This must rebuild the trie since some entries may be removed. 
+ let mut output = Updates::::default(); + let mut time_owned = U::Time::default(); + for (k, v, t, d) in self.updates.iter() { + Columnar::copy_from(&mut time_owned, t); + if let Some(new_time) = step.results_in(&time_owned) { + output.push((k, v, &new_time, d)); + } + } + RecordedUpdates { updates: output, records: self.records } + } + } +} + +pub use column_builder::ValBuilder as ValColBuilder; +mod column_builder { + + use std::collections::VecDeque; + use columnar::{Columnar, Clear, Len, Push}; + + use crate::layout::ColumnarUpdate as Update; + use crate::{Updates, RecordedUpdates}; + + type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; + + /// A container builder that produces `RecordedUpdates` (sorted, consolidated trie + record count). + pub struct ValBuilder { + /// Container that we're writing to. + current: TupleContainer, + /// Empty allocation. + empty: Option>, + /// Completed containers pending to be sent. + pending: VecDeque>, + } + + use timely::container::PushInto; + impl PushInto for ValBuilder where TupleContainer : Push { + #[inline] + fn push_into(&mut self, item: T) { + self.current.push(item); + if self.current.len() > 1024 * 1024 { + use columnar::{Borrow, Index}; + let records = self.current.len(); + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let updates = Updates::form(refs.into_iter()); + self.pending.push_back(RecordedUpdates { updates, records }); + self.current.clear(); + } + } + } + + impl Default for ValBuilder { + fn default() -> Self { + ValBuilder { + current: Default::default(), + empty: None, + pending: Default::default(), + } + } + } + + use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; + impl ContainerBuilder for ValBuilder { + type Container = RecordedUpdates; + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(container) = self.pending.pop_front() { + self.empty = Some(container); + 
self.empty.as_mut() + } else { + None + } + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + use columnar::{Borrow, Index}; + let records = self.current.len(); + let mut refs = self.current.borrow().into_index_iter().collect::>(); + refs.sort(); + let updates = Updates::form(refs.into_iter()); + self.pending.push_back(RecordedUpdates { updates, records }); + self.current.clear(); + } + self.empty = self.pending.pop_front(); + self.empty.as_mut() + } + } + + impl LengthPreservingContainerBuilder for ValBuilder { } + +} + +pub use distributor::ValPact; +mod distributor { + + use std::rc::Rc; + + use columnar::{Borrow, Index, Len}; + use timely::logging::TimelyLogger; + use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; + use timely::dataflow::channels::Message; + use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract}; + use timely::progress::Timestamp; + use timely::worker::AsWorker; + + use crate::layout::ColumnarUpdate as Update; + use crate::{Updates, RecordedUpdates}; + + pub struct ValDistributor { + marker: std::marker::PhantomData, + hashfunc: H, + pre_lens: Vec, + } + + impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for ValDistributor { + // TODO: For unsorted Updates (stride-1 outer keys), each key is its own outer group, + // so the per-group pre_lens snapshot and seal check costs O(keys × workers). Should + // either batch keys by destination first, or detect stride-1 outer bounds and use a + // simpler single-pass partitioning that seals once at the end. + fn partition>>>(&mut self, container: &mut RecordedUpdates, time: &T, pushers: &mut [P]) { + use crate::updates::child_range; + + let keys_b = container.updates.keys.borrow(); + let mut outputs: Vec> = (0..pushers.len()).map(|_| Updates::default()).collect(); + + // Each outer key group becomes a separate run in the destination. 
+ for outer in 0..Len::len(&keys_b) { + self.pre_lens.clear(); + self.pre_lens.extend(outputs.iter().map(|o| o.keys.values.len())); + for k in child_range(keys_b.bounds, outer) { + let key = keys_b.values.get(k); + let idx = ((self.hashfunc)(key) as usize) % pushers.len(); + outputs[idx].extend_from_keys(&container.updates, k..k+1); + } + for (output, &pre) in outputs.iter_mut().zip(self.pre_lens.iter()) { + if output.keys.values.len() > pre { + output.keys.bounds.push(output.keys.values.len() as u64); + } + } + } + + // Distribute the input's record count across non-empty outputs. + let total_records = container.records; + let non_empty: usize = outputs.iter().filter(|o| !o.keys.values.is_empty()).count(); + let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1)); + for (pusher, output) in pushers.iter_mut().zip(outputs) { + if !output.keys.values.is_empty() { + let recorded = RecordedUpdates { updates: output, records: first_records }; + first_records = 1; + let mut recorded = recorded; + Message::push_at(&mut recorded, time.clone(), pusher); + } + } + } + fn flush>>>(&mut self, _time: &T, _pushers: &mut [P]) { } + fn relax(&mut self) { } + } + + pub struct ValPact { pub hashfunc: H } + + impl ParallelizationContract> for ValPact + where + T: Timestamp, + U: Update, + H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64 + 'static, + { + type Pusher = Exchange< + T, + LogPusher>>>>, + ValDistributor + >; + type Puller = LogPuller>>>>; + + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + let distributor = ValDistributor { + marker: std::marker::PhantomData, + hashfunc: self.hashfunc, + pre_lens: Vec::new(), + }; + (Exchange::new(senders, distributor), 
LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + } + } +} + +pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; +pub mod arrangement { + + use std::rc::Rc; + use differential_dataflow::trace::implementations::ord_neu::OrdValBatch; + use differential_dataflow::trace::rc_blanket_impls::RcBuilder; + use differential_dataflow::trace::implementations::spine_fueled::Spine; + + use crate::layout::ColumnarLayout; + + /// A trace implementation backed by columnar storage. + pub type ValSpine = Spine>>>; + /// A batcher for columnar storage. + pub type ValBatcher = ValBatcher2<(K,V,T,R)>; + /// A builder for columnar storage. + pub type ValBuilder = RcBuilder>; + + /// A batch container implementation for Coltainer. + pub use batch_container::Coltainer; + pub mod batch_container { + + use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len}; + use differential_dataflow::trace::implementations::BatchContainer; + + /// Container, anchored by `C` to provide an owned type. + pub struct Coltainer { + pub container: C::Container, + } + + impl Default for Coltainer { + fn default() -> Self { Self { container: Default::default() } } + } + + impl BatchContainer for Coltainer where for<'a> columnar::Ref<'a, C> : Ord { + + type ReadItem<'a> = columnar::Ref<'a, C>; + type Owned = C; + + #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) } + #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) } + + #[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) } + #[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) } + + /// Clears the container. May not release resources. + fn clear(&mut self) { self.container.clear() } + + /// Creates a new container with sufficient capacity. 
+ fn with_capacity(_size: usize) -> Self { Self::default() } + /// Creates a new container with sufficient capacity. + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self { + container: ::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()), + } + } + + /// Converts a read item into one with a narrower lifetime. + #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::::reborrow_ref(item) } + + /// Reference to the element at this position. + #[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) } + + #[inline(always)] fn len(&self) -> usize { self.container.len() } + } + } + + use crate::{Updates, RecordedUpdates}; + use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, InternalMerger}; + type ValBatcher2 = MergeBatcher, TrieChunker, InternalMerger>>; + + /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. + /// The `records` accounting is discarded here — it has served its purpose for exchange. + /// + /// IMPORTANT: This chunker assumes the input `Updates` are sorted and consolidated + /// (as produced by `ValColBuilder::form`). The downstream `InternalMerge` relies on + /// this invariant. If `RecordedUpdates` could carry unsorted data (e.g. from a `map`), + /// we would need either a sorting chunker for that case, or a type-level distinction + /// (e.g. `RecordedUpdates` vs `RecordedUpdates`) to + /// route to the right chunker. 
+ pub struct TrieChunker { + ready: std::collections::VecDeque>, + empty: Option>, + } + + impl Default for TrieChunker { + fn default() -> Self { Self { ready: Default::default(), empty: None } } + } + + impl<'a, U: crate::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { + fn push_into(&mut self, container: &'a mut RecordedUpdates) { + self.ready.push_back(std::mem::take(&mut container.updates)); + } + } + + impl timely::container::ContainerBuilder for TrieChunker { + type Container = Updates; + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } + fn finish(&mut self) -> Option<&mut Self::Container> { + self.empty = self.ready.pop_front(); + self.empty.as_mut() + } + } + + pub mod batcher { + + use std::ops::Range; + use columnar::{Borrow, Columnar, Container, Index, Len, Push}; + use differential_dataflow::difference::{Semigroup, IsZero}; + use timely::progress::frontier::{Antichain, AntichainRef}; + use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; + + use crate::ColumnarUpdate as Update; + use crate::Updates; + + impl timely::container::SizableContainer for Updates { + fn at_capacity(&self) -> bool { + use columnar::Len; + self.diffs.values.len() >= 64 * 1024 + } + fn ensure_capacity(&mut self, _stash: &mut Option) { } + } + + impl InternalMerge for Updates { + + type TimeOwned = U::Time; + + fn len(&self) -> usize { self.diffs.values.len() } + fn clear(&mut self) { *self = Self::default(); } + + #[inline(never)] + fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { + match others.len() { + 0 => {}, + 1 => { + // Bulk copy: take remaining keys from position onward. 
+ let other = &mut others[0]; + let pos = &mut positions[0]; + if self.keys.values.len() == 0 && *pos == 0 { + std::mem::swap(self, other); + return; + } + let other_len = other.keys.values.len(); + self.extend_from_keys(other, *pos .. other_len); + *pos = other_len; + }, + 2 => { + let mut this_sum = U::Diff::default(); + let mut that_sum = U::Diff::default(); + + let (left, right) = others.split_at_mut(1); + let this = &left[0]; + let that = &right[0]; + let this_keys = this.keys.values.borrow(); + let that_keys = that.keys.values.borrow(); + let mut this_key_range = positions[0] .. this_keys.len(); + let mut that_key_range = positions[1] .. that_keys.len(); + + while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let this_key = this_keys.get(this_key_range.start); + let that_key = that_keys.get(that_key_range.start); + match this_key.cmp(&that_key) { + std::cmp::Ordering::Less => { + let lower = this_key_range.start; + gallop(this_keys, &mut this_key_range, |x| x < that_key); + self.extend_from_keys(this, lower .. this_key_range.start); + }, + std::cmp::Ordering::Equal => { + let values_len = self.vals.values.len(); + let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); + let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); + while !this_val_range.is_empty() && !that_val_range.is_empty() { + let this_val = this.vals.values.borrow().get(this_val_range.start); + let that_val = that.vals.values.borrow().get(that_val_range.start); + match this_val.cmp(&that_val) { + std::cmp::Ordering::Less => { + let lower = this_val_range.start; + gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); + self.extend_from_vals(this, lower .. this_val_range.start); + }, + std::cmp::Ordering::Equal => { + let updates_len = self.times.values.len(); + let mut this_time_range = this.times_bounds(this_val_range.start .. 
this_val_range.start+1); + let mut that_time_range = that.times_bounds(that_val_range.start .. that_val_range.start+1); + while !this_time_range.is_empty() && !that_time_range.is_empty() { + let this_time = this.times.values.borrow().get(this_time_range.start); + let this_diff = this.diffs.values.borrow().get(this_time_range.start); + let that_time = that.times.values.borrow().get(that_time_range.start); + let that_diff = that.diffs.values.borrow().get(that_time_range.start); + match this_time.cmp(&that_time) { + std::cmp::Ordering::Less => { + let lower = this_time_range.start; + gallop(this.times.values.borrow(), &mut this_time_range, |x| x < that_time); + self.times.values.extend_from_self(this.times.values.borrow(), lower .. this_time_range.start); + self.diffs.extend_from_self(this.diffs.borrow(), lower .. this_time_range.start); + }, + std::cmp::Ordering::Equal => { + this_sum.copy_from(this_diff); + that_sum.copy_from(that_diff); + this_sum.plus_equals(&that_sum); + if !this_sum.is_zero() { + self.times.values.push(this_time); + self.diffs.values.push(&this_sum); + self.diffs.bounds.push(self.diffs.values.len() as u64); + } + this_time_range.start += 1; + that_time_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_time_range.start; + gallop(that.times.values.borrow(), &mut that_time_range, |x| x < this_time); + self.times.values.extend_from_self(that.times.values.borrow(), lower .. that_time_range.start); + self.diffs.extend_from_self(that.diffs.borrow(), lower .. 
that_time_range.start); + }, + } + } + // Remaining from this side + if !this_time_range.is_empty() { + self.times.values.extend_from_self(this.times.values.borrow(), this_time_range.clone()); + self.diffs.extend_from_self(this.diffs.borrow(), this_time_range.clone()); + } + // Remaining from that side + if !that_time_range.is_empty() { + self.times.values.extend_from_self(that.times.values.borrow(), that_time_range.clone()); + self.diffs.extend_from_self(that.diffs.borrow(), that_time_range.clone()); + } + if self.times.values.len() > updates_len { + self.times.bounds.push(self.times.values.len() as u64); + self.vals.values.push(this_val); + } + this_val_range.start += 1; + that_val_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_val_range.start; + gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); + self.extend_from_vals(that, lower .. that_val_range.start); + }, + } + } + self.extend_from_vals(this, this_val_range); + self.extend_from_vals(that, that_val_range); + if self.vals.values.len() > values_len { + self.vals.bounds.push(self.vals.values.len() as u64); + self.keys.values.push(this_key); + } + this_key_range.start += 1; + that_key_range.start += 1; + }, + std::cmp::Ordering::Greater => { + let lower = that_key_range.start; + gallop(that_keys, &mut that_key_range, |x| x < this_key); + self.extend_from_keys(that, lower .. that_key_range.start); + }, + } + } + // Copy remaining from whichever side has data, up to capacity. + while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let lower = this_key_range.start; + this_key_range.start = this_key_range.end; // take all remaining + self.extend_from_keys(this, lower .. this_key_range.start); + } + while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { + let lower = that_key_range.start; + that_key_range.start = that_key_range.end; + self.extend_from_keys(that, lower .. 
that_key_range.start); + } + positions[0] = this_key_range.start; + positions[1] = that_key_range.start; + }, + n => unimplemented!("{n}-way merge not supported"), + } + } + + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { + let mut time = U::Time::default(); + for key_idx in 0 .. self.keys.values.len() { + let key = self.keys.values.borrow().get(key_idx); + let keep_vals_len = keep.vals.values.len(); + let ship_vals_len = ship.vals.values.len(); + for val_idx in self.vals_bounds(key_idx..key_idx+1) { + let val = self.vals.values.borrow().get(val_idx); + let keep_times_len = keep.times.values.len(); + let ship_times_len = ship.times.values.len(); + for time_idx in self.times_bounds(val_idx..val_idx+1) { + let t = self.times.values.borrow().get(time_idx); + let diff = self.diffs.values.borrow().get(time_idx); + time.copy_from(t); + if upper.less_equal(&time) { + frontier.insert_ref(&time); + keep.times.values.push(t); + keep.diffs.values.push(diff); + keep.diffs.bounds.push(keep.diffs.values.len() as u64); + } + else { + ship.times.values.push(t); + ship.diffs.values.push(diff); + ship.diffs.bounds.push(ship.diffs.values.len() as u64); + } + } + if keep.times.values.len() > keep_times_len { + keep.times.bounds.push(keep.times.values.len() as u64); + keep.vals.values.push(val); + } + if ship.times.values.len() > ship_times_len { + ship.times.bounds.push(ship.times.values.len() as u64); + ship.vals.values.push(val); + } + } + if keep.vals.values.len() > keep_vals_len { + keep.vals.bounds.push(keep.vals.values.len() as u64); + keep.keys.values.push(key); + } + if ship.vals.values.len() > ship_vals_len { + ship.vals.bounds.push(ship.vals.values.len() as u64); + ship.keys.values.push(key); + } + } + } + } + + #[inline(always)] + pub(crate) fn gallop(input: TC, range: &mut Range, mut cmp: impl FnMut(::Ref) -> bool) { + // if empty input, or already >= element, return + if !Range::::is_empty(range) && 
cmp(input.get(range.start)) { + let mut step = 1; + while range.start + step < range.end && cmp(input.get(range.start + step)) { + range.start += step; + step <<= 1; + } + + step >>= 1; + while step > 0 { + if range.start + step < range.end && cmp(input.get(range.start + step)) { + range.start += step; + } + step >>= 1; + } + + range.start += 1; + } + } + } + + use builder::ValMirror; + pub mod builder { + + use differential_dataflow::trace::implementations::ord_neu::{Vals, Upds}; + use differential_dataflow::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; + use differential_dataflow::trace::Description; + + use crate::Updates; + use crate::layout::ColumnarUpdate as Update; + use crate::layout::ColumnarLayout as Layout; + use crate::arrangement::Coltainer; + + use columnar::{Borrow, IndexAs}; + use columnar::primitive::offsets::Strides; + use differential_dataflow::trace::implementations::OffsetList; + fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList { + let mut output = OffsetList::with_capacity(count); + output.push(0); + let bounds_b = bounds.borrow(); + for i in 0..count { + output.push(bounds_b.index_as(i) as usize); + } + output + } + + pub struct ValMirror { + current: Updates, + } + impl differential_dataflow::trace::Builder for ValMirror { + type Time = U::Time; + type Input = Updates; + type Output = OrdValBatch>; + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + Self { current: Updates::default() } + } + fn push(&mut self, chunk: &mut Self::Input) { + use columnar::Len; + let len = chunk.keys.values.len(); + if len > 0 { + self.current.extend_from_keys(chunk, 0..len); + } + } + fn done(self, description: Description) -> Self::Output { + let mut chain = if self.current.len() > 0 { + vec![self.current] + } else { + vec![] + }; + Self::seal(&mut chain, description) + } + fn seal(chain: &mut Vec, description: Description) -> Self::Output { + if chain.len() == 0 { + let storage = 
OrdValStorage { + keys: Default::default(), + vals: Default::default(), + upds: Default::default(), + }; + OrdValBatch { storage, description, updates: 0 } + } + else if chain.len() == 1 { + use columnar::Len; + let storage = chain.pop().unwrap(); + let updates = storage.diffs.values.len(); + let val_offs = strides_to_offset_list(&storage.vals.bounds, storage.keys.values.len()); + let time_offs = strides_to_offset_list(&storage.times.bounds, storage.vals.values.len()); + let storage = OrdValStorage { + keys: Coltainer { container: storage.keys.values }, + vals: Vals { + offs: val_offs, + vals: Coltainer { container: storage.vals.values }, + }, + upds: Upds { + offs: time_offs, + times: Coltainer { container: storage.times.values }, + diffs: Coltainer { container: storage.diffs.values }, + }, + }; + OrdValBatch { storage, description, updates } + } + else { + use columnar::Len; + let mut merged = chain.remove(0); + for other in chain.drain(..) { + let len = other.keys.values.len(); + if len > 0 { + merged.extend_from_keys(&other, 0..len); + } + } + chain.push(merged); + Self::seal(chain, description) + } + } + } + + } +} + +pub mod updates { + + use columnar::{Columnar, Container, ContainerOf, Vecs, Borrow, Index, IndexAs, Len, Push}; + use columnar::primitive::offsets::Strides; + use differential_dataflow::difference::{Semigroup, IsZero}; + + use crate::layout::ColumnarUpdate as Update; + + /// A `Vecs` using strided offsets. + pub type Lists = Vecs; + + /// Trie-structured update storage using columnar containers. + /// + /// Four nested layers of `Lists`: + /// - `keys`: lists of keys (outer lists are independent groups) + /// - `vals`: per-key, lists of vals + /// - `times`: per-val, lists of times + /// - `diffs`: per-time, lists of diffs (singletons when consolidated) + /// + /// A flat unsorted input has stride 1 at every level (one key per entry, + /// one val per key, one time per val, one diff per time). 
+ /// A fully consolidated trie has a single outer key list, all lists sorted + /// and deduplicated, and singleton diff lists. + pub struct Updates { + pub keys: Lists>, + pub vals: Lists>, + pub times: Lists>, + pub diffs: Lists>, + } + + impl Default for Updates { + fn default() -> Self { + Self { + keys: Default::default(), + vals: Default::default(), + times: Default::default(), + diffs: Default::default(), + } + } + } + + impl std::fmt::Debug for Updates { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Updates").finish() + } + } + + impl Clone for Updates { + fn clone(&self) -> Self { + Self { + keys: self.keys.clone(), + vals: self.vals.clone(), + times: self.times.clone(), + diffs: self.diffs.clone(), + } + } + } + + pub type Tuple = (::Key, ::Val, ::Time, ::Diff); + + /// Returns the value-index range for list `i` given cumulative bounds. + #[inline] + pub fn child_range>(bounds: B, i: usize) -> std::ops::Range { + let lower = if i == 0 { 0 } else { bounds.index_as(i - 1) as usize }; + let upper = bounds.index_as(i) as usize; + lower..upper + } + + impl Updates { + + pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { + if !key_range.is_empty() { + let bounds = self.vals.bounds.borrow(); + let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize }; + let upper = bounds.index_as(key_range.end - 1) as usize; + lower..upper + } else { key_range } + } + pub fn times_bounds(&self, val_range: std::ops::Range) -> std::ops::Range { + if !val_range.is_empty() { + let bounds = self.times.bounds.borrow(); + let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize }; + let upper = bounds.index_as(val_range.end - 1) as usize; + lower..upper + } else { val_range } + } + pub fn diffs_bounds(&self, time_range: std::ops::Range) -> std::ops::Range { + if !time_range.is_empty() { + let bounds = self.diffs.bounds.borrow(); + let lower = 
if time_range.start == 0 { 0 } else { bounds.index_as(time_range.start - 1) as usize }; + let upper = bounds.index_as(time_range.end - 1) as usize; + lower..upper + } else { time_range } + } + + /// Copies `other[key_range]` into self, keys and all. + pub fn extend_from_keys(&mut self, other: &Self, key_range: std::ops::Range) { + self.keys.values.extend_from_self(other.keys.values.borrow(), key_range.clone()); + self.vals.extend_from_self(other.vals.borrow(), key_range.clone()); + let val_range = other.vals_bounds(key_range); + self.times.extend_from_self(other.times.borrow(), val_range.clone()); + let time_range = other.times_bounds(val_range); + self.diffs.extend_from_self(other.diffs.borrow(), time_range); + } + + /// Copies a range of vals (with their times and diffs) from `other` into self. + pub fn extend_from_vals(&mut self, other: &Self, val_range: std::ops::Range) { + self.vals.values.extend_from_self(other.vals.values.borrow(), val_range.clone()); + self.times.extend_from_self(other.times.borrow(), val_range.clone()); + let time_range = other.times_bounds(val_range); + self.diffs.extend_from_self(other.diffs.borrow(), time_range); + } + + /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. + /// + /// Tracks a `prev` reference to the previous element. On each new element, + /// compares against `prev` to detect key/val/time changes. Only pushes + /// accumulated diffs when they are nonzero, and only emits times/vals/keys + /// that have at least one nonzero diff beneath them. 
+ pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { + + let mut output = Self::default(); + let mut diff_stash = U::Diff::default(); + let mut diff_temp = U::Diff::default(); + + if let Some(first) = sorted.next() { + + let mut prev = first; + Columnar::copy_from(&mut diff_stash, prev.3); + + for curr in sorted { + let key_differs = ContainerOf::::reborrow_ref(curr.0) != ContainerOf::::reborrow_ref(prev.0); + let val_differs = key_differs || ContainerOf::::reborrow_ref(curr.1) != ContainerOf::::reborrow_ref(prev.1); + let time_differs = val_differs || ContainerOf::::reborrow_ref(curr.2) != ContainerOf::::reborrow_ref(prev.2); + + if time_differs { + // Flush the accumulated diff for prev's (key, val, time). + if !diff_stash.is_zero() { + // We have a real update to emit. Push time (and val/key + // if this is the first time under them). + let times_len = output.times.values.len(); + let vals_len = output.vals.values.len(); + + if val_differs { + // Seal the previous val's time list, if any times were emitted. + if times_len > 0 { + output.times.bounds.push(times_len as u64); + } + if key_differs { + // Seal the previous key's val list, if any vals were emitted. + if vals_len > 0 { + output.vals.bounds.push(vals_len as u64); + } + output.keys.values.push(prev.0); + } + output.vals.values.push(prev.1); + } + output.times.values.push(prev.2); + output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); + } + Columnar::copy_from(&mut diff_stash, curr.3); + } else { + // Same (key, val, time): accumulate diff. + Columnar::copy_from(&mut diff_temp, curr.3); + diff_stash.plus_equals(&diff_temp); + } + prev = curr; + } + + // Flush the final accumulated diff. 
+ if !diff_stash.is_zero() { + let keys_len = output.keys.values.len(); + let vals_len = output.vals.values.len(); + let times_len = output.times.values.len(); + let need_key = keys_len == 0 || ContainerOf::::reborrow_ref(prev.0) != output.keys.values.borrow().get(keys_len - 1); + let need_val = need_key || vals_len == 0 || ContainerOf::::reborrow_ref(prev.1) != output.vals.values.borrow().get(vals_len - 1); + + if need_val { + if times_len > 0 { + output.times.bounds.push(times_len as u64); + } + if need_key { + if vals_len > 0 { + output.vals.bounds.push(vals_len as u64); + } + output.keys.values.push(prev.0); + } + output.vals.values.push(prev.1); + } + output.times.values.push(prev.2); + output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); + } + + // Seal the final groups at each level. + if !output.times.values.is_empty() { + output.times.bounds.push(output.times.values.len() as u64); + } + if !output.vals.values.is_empty() { + output.vals.bounds.push(output.vals.values.len() as u64); + } + if !output.keys.values.is_empty() { + output.keys.bounds.push(output.keys.values.len() as u64); + } + } + + output + } + + /// Consolidates into canonical trie form: + /// single outer key list, all lists sorted and deduplicated, + /// diff lists are singletons (or absent if cancelled). + pub fn consolidate(self) -> Self { + + let Self { keys, vals, times, diffs } = self; + + let keys_b = keys.borrow(); + let vals_b = vals.borrow(); + let times_b = times.borrow(); + let diffs_b = diffs.borrow(); + + // Flatten to index tuples: [key_abs, val_abs, time_abs, diff_abs]. + let mut tuples: Vec<[usize; 4]> = Vec::new(); + for outer in 0..Len::len(&keys_b) { + for k in child_range(keys_b.bounds, outer) { + for v in child_range(vals_b.bounds, k) { + for t in child_range(times_b.bounds, v) { + for d in child_range(diffs_b.bounds, t) { + tuples.push([k, v, t, d]); + } + } + } + } + } + + // Sort by (key, val, time). Diff is payload. 
+ tuples.sort_by(|a, b| {
+ keys_b.values.get(a[0]).cmp(&keys_b.values.get(b[0]))
+ .then_with(|| vals_b.values.get(a[1]).cmp(&vals_b.values.get(b[1])))
+ .then_with(|| times_b.values.get(a[2]).cmp(&times_b.values.get(b[2])))
+ });
+
+ // Build consolidated output, bottom-up cancellation.
+ let mut output = Self::default();
+ let mut diff_stash = U::Diff::default();
+ let mut diff_temp = U::Diff::default();
+
+ let mut idx = 0;
+ while idx < tuples.len() {
+ let key_ref = keys_b.values.get(tuples[idx][0]);
+ let key_start_vals = output.vals.values.len();
+
+ // All entries with this key.
+ while idx < tuples.len() && keys_b.values.get(tuples[idx][0]) == key_ref {
+ let val_ref = vals_b.values.get(tuples[idx][1]);
+ let val_start_times = output.times.values.len();
+
+ // All entries with this (key, val).
+ while idx < tuples.len()
+ && keys_b.values.get(tuples[idx][0]) == key_ref
+ && vals_b.values.get(tuples[idx][1]) == val_ref
+ {
+ let time_ref = times_b.values.get(tuples[idx][2]);
+
+ // Sum all diffs for this (key, val, time).
+ Columnar::copy_from(&mut diff_stash, diffs_b.values.get(tuples[idx][3]));
+ idx += 1;
+ while idx < tuples.len()
+ && keys_b.values.get(tuples[idx][0]) == key_ref
+ && vals_b.values.get(tuples[idx][1]) == val_ref
+ && times_b.values.get(tuples[idx][2]) == time_ref
+ {
+ Columnar::copy_from(&mut diff_temp, diffs_b.values.get(tuples[idx][3]));
+ diff_stash.plus_equals(&diff_temp);
+ idx += 1;
+ }
+
+ // Emit time + singleton diff if nonzero.
+ if !diff_stash.is_zero() {
+ output.times.values.push(time_ref);
+ output.diffs.values.push(&diff_stash);
+ output.diffs.bounds.push(output.diffs.values.len() as u64);
+ }
+ }
+
+ // Seal time list for this val; emit val if any times survived.
+ if output.times.values.len() > val_start_times {
+ output.times.bounds.push(output.times.values.len() as u64);
+ output.vals.values.push(val_ref);
+ }
+ }
+
+ // Seal val list for this key; emit key if any vals survived. 
+ if output.vals.values.len() > key_start_vals { + output.vals.bounds.push(output.vals.values.len() as u64); + output.keys.values.push(key_ref); + } + } + + // Seal the single outer key list. + if !output.keys.values.is_empty() { + output.keys.bounds.push(output.keys.values.len() as u64); + } + + output + } + + /// The number of leaf-level diff entries (total updates). + pub fn len(&self) -> usize { self.diffs.values.len() } + } + + /// Push a single flat update as a stride-1 entry. + /// + /// Each field is independently typed — columnar refs, `&Owned`, owned values, + /// or any other type the column container accepts via its `Push` impl. + impl Push<(KP, VP, TP, DP)> for Updates + where + ContainerOf: Push, + ContainerOf: Push, + ContainerOf: Push, + ContainerOf: Push, + { + fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) { + self.keys.values.push(key); + self.keys.bounds.push(self.keys.values.len() as u64); + self.vals.values.push(val); + self.vals.bounds.push(self.vals.values.len() as u64); + self.times.values.push(time); + self.times.bounds.push(self.times.values.len() as u64); + self.diffs.values.push(diff); + self.diffs.bounds.push(self.diffs.values.len() as u64); + } + } + + /// PushInto for the `((K, V), T, R)` shape that reduce_trace uses. + impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for Updates { + fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) { + self.push((&key, &val, &time, &diff)); + } + } + + impl Updates { + + /// Iterate all `(key, val, time, diff)` entries as refs. 
+ pub fn iter(&self) -> impl Iterator, + columnar::Ref<'_, U::Val>, + columnar::Ref<'_, U::Time>, + columnar::Ref<'_, U::Diff>, + )> { + let keys_b = self.keys.borrow(); + let vals_b = self.vals.borrow(); + let times_b = self.times.borrow(); + let diffs_b = self.diffs.borrow(); + + (0..Len::len(&keys_b)) + .flat_map(move |outer| child_range(keys_b.bounds, outer)) + .flat_map(move |k| { + let key = keys_b.values.get(k); + child_range(vals_b.bounds, k).map(move |v| (key, v)) + }) + .flat_map(move |(key, v)| { + let val = vals_b.values.get(v); + child_range(times_b.bounds, v).map(move |t| (key, val, t)) + }) + .flat_map(move |(key, val, t)| { + let time = times_b.values.get(t); + child_range(diffs_b.bounds, t).map(move |d| (key, val, time, diffs_b.values.get(d))) + }) + } + } + + impl timely::Accountable for Updates { + #[inline] fn record_count(&self) -> i64 { Len::len(&self.diffs.values) as i64 } + } + + impl timely::dataflow::channels::ContainerBytes for Updates { + fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } + fn length_in_bytes(&self) -> usize { unimplemented!() } + fn into_bytes(&self, _writer: &mut W) { unimplemented!() } + } + + #[cfg(test)] + mod tests { + use super::*; + use columnar::Push; + + type TestUpdate = (u64, u64, u64, i64); + + fn collect(updates: &Updates) -> Vec<(u64, u64, u64, i64)> { + updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() + } + + #[test] + fn test_push_and_consolidate_basic() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &2)); + updates.push((&2, &20, &200, &5)); + assert_eq!(updates.len(), 3); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]); + } + + #[test] + fn test_cancellation() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &3)); + updates.push((&1, &10, &100, &-3)); + updates.push((&2, &20, &200, &1)); + assert_eq!(collect(&updates.consolidate()), 
vec![(2, 20, 200, 1)]); + } + + #[test] + fn test_multiple_vals_and_times() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &200, &2)); + updates.push((&1, &20, &100, &3)); + updates.push((&1, &20, &100, &4)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); + } + + #[test] + fn test_val_cancellation_propagates() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &5)); + updates.push((&1, &10, &100, &-5)); + updates.push((&1, &20, &100, &1)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 1)]); + } + + #[test] + fn test_empty() { + let updates = Updates::::default(); + assert_eq!(collect(&updates.consolidate()), vec![]); + } + + #[test] + fn test_total_cancellation() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + assert_eq!(collect(&updates.consolidate()), vec![]); + } + + #[test] + fn test_unsorted_input() { + let mut updates = Updates::::default(); + updates.push((&3, &30, &300, &1)); + updates.push((&1, &10, &100, &2)); + updates.push((&2, &20, &200, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); + } + + #[test] + fn test_first_key_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &5)); + updates.push((&1, &10, &100, &-5)); + updates.push((&2, &20, &200, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 3)]); + } + + #[test] + fn test_middle_time_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &200, &2)); + updates.push((&1, &10, &200, &-2)); + updates.push((&1, &10, &300, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 300, 3)]); + } + + #[test] + fn test_first_val_cancels() { + let mut updates = Updates::::default(); + 
updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + updates.push((&1, &20, &100, &5)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 5)]); + } + + #[test] + fn test_interleaved_cancellations() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + updates.push((&2, &20, &200, &7)); + updates.push((&3, &30, &300, &4)); + updates.push((&3, &30, &300, &-4)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 7)]); + } + } +} + +/// A columnar flat_map: iterates RecordedUpdates, calls logic per (key, val, time, diff), +/// joins output times with input times, multiplies output diffs with input diffs. +/// +/// This subsumes map, filter, negate, and enter_at for columnar collections. +pub fn join_function( + input: differential_dataflow::Collection>, + mut logic: L, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + G::Timestamp: differential_dataflow::lattice::Lattice, + U: layout::ColumnarUpdate>, + I: IntoIterator, + L: FnMut( + columnar::Ref<'_, U::Key>, + columnar::Ref<'_, U::Val>, + columnar::Ref<'_, U::Time>, + columnar::Ref<'_, U::Diff>, + ) -> I + 'static, +{ + use timely::dataflow::operators::generic::Operator; + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::AsCollection; + use differential_dataflow::difference::Multiply; + use differential_dataflow::lattice::Lattice; + use columnar::Columnar; + + input + .inner + .unary::, _, _, _>(Pipeline, "JoinFunction", move |_, _| { + move |input, output| { + input.for_each(|time, data| { + let mut session = output.session_with_builder(&time); + for (k1, v1, t1, d1) in data.updates.iter() { + let t1o: U::Time = Columnar::into_owned(t1); + let d1o: U::Diff = Columnar::into_owned(d1); + for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { + let t3 = t2.join(&t1o); + let d3 = d2.multiply(&d1o); + session.give((&k2, &v2, &t3, &d3)); + } + } + }); + } + 
}) + .as_collection() +} + +type DynTime = timely::order::Product>; + +/// Leave a dynamic iterative scope, truncating PointStamp coordinates. +/// +/// Uses OperatorBuilder (not unary) for the custom input connection summary +/// that tells timely how the PointStamp is affected (retain `level - 1` coordinates). +/// +/// Consolidates after truncation since distinct PointStamp coordinates can collapse. +pub fn leave_dynamic( + input: differential_dataflow::Collection>, + level: usize, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + K: columnar::Columnar, + V: columnar::Columnar, + R: columnar::Columnar, + (K, V, DynTime, R): layout::ColumnarUpdate, +{ + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + use timely::dataflow::operators::generic::OutputBuilder; + use timely::order::Product; + use timely::progress::Antichain; + use timely::container::{ContainerBuilder, PushInto}; + use differential_dataflow::AsCollection; + use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; + use columnar::Columnar; + + let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope()); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); + let mut op_input = builder.new_input_connection( + input.inner, + Pipeline, + [( + 0, + Antichain::from_elem(Product { + outer: Default::default(), + inner: PointStampSummary { + retain: Some(level - 1), + actions: Vec::new(), + }, + }), + )], + ); + + builder.build(move |_capability| { + let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default(); + move |_frontier| { + let mut output = output.activate(); + op_input.for_each(|cap, data| { + // Truncate the capability's timestamp. 
+ let mut new_time = cap.time().clone(); + let mut vec = std::mem::take(&mut new_time.inner).into_inner(); + vec.truncate(level - 1); + new_time.inner = PointStamp::new(vec); + let new_cap = cap.delayed(&new_time, 0); + // Push updates with truncated times into the builder. + // The builder's form call on flush sorts and consolidates, + // handling the duplicate times that truncation can produce. + // TODO: The input trie is already sorted; a streaming form + // that accepts pre-sorted, potentially-collapsing timestamps + // could avoid the re-sort inside the builder. + for (k, v, t, d) in data.updates.iter() { + let mut time: DynTime = Columnar::into_owned(t); + let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); + inner_vec.truncate(level - 1); + time.inner = PointStamp::new(inner_vec); + col_builder.push_into((k, v, &time, d)); + } + let mut session = output.session(&new_cap); + while let Some(container) = col_builder.finish() { + session.give_container(container); + } + }); + } + }); + + stream.as_collection() +} + +/// Extract a `Collection<_, RecordedUpdates>` from a columnar `Arranged`. +/// +/// Cursors through each batch and pushes `(key, val, time, diff)` refs into +/// a `ValColBuilder`, which sorts and consolidates on flush. +pub fn as_recorded_updates( + arranged: differential_dataflow::operators::arrange::Arranged< + G, + differential_dataflow::operators::arrange::TraceAgent>, + >, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + U: layout::ColumnarUpdate, +{ + use timely::dataflow::operators::generic::Operator; + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::trace::{BatchReader, Cursor}; + use differential_dataflow::AsCollection; + + arranged.stream + .unary::, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| { + move |input, output| { + input.for_each(|time, batches| { + let mut session = output.session_with_builder(&time); + for batch in batches.drain(..) 
{ + let mut cursor = batch.cursor(); + while cursor.key_valid(&batch) { + while cursor.val_valid(&batch) { + let key = cursor.key(&batch); + let val = cursor.val(&batch); + cursor.map_times(&batch, |time, diff| { + session.give((key, val, time, diff)); + }); + cursor.step_val(&batch); + } + cursor.step_key(&batch); + } + } + }); + } + }) + .as_collection() +} From 333fa7437b08667d3e1112f832d556b642297beb Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 24 Mar 2026 08:32:19 -0400 Subject: [PATCH 3/3] Relocate files to avoid clippy complaints --- .../examples/{ => columnar}/columnar_support.rs | 0 differential-dataflow/examples/{columnar.rs => columnar/main.rs} | 1 - 2 files changed, 1 deletion(-) rename differential-dataflow/examples/{ => columnar}/columnar_support.rs (100%) rename differential-dataflow/examples/{columnar.rs => columnar/main.rs} (99%) diff --git a/differential-dataflow/examples/columnar_support.rs b/differential-dataflow/examples/columnar/columnar_support.rs similarity index 100% rename from differential-dataflow/examples/columnar_support.rs rename to differential-dataflow/examples/columnar/columnar_support.rs diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar/main.rs similarity index 99% rename from differential-dataflow/examples/columnar.rs rename to differential-dataflow/examples/columnar/main.rs index e2e5cb024..8404e7aa4 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -4,7 +4,6 @@ //! exercising Enter, Leave, Negate, ResultsIn on RecordedUpdates, //! and Push on Updates for the reduce builder path. -#[path = "columnar_support.rs"] mod columnar_support; use timely::container::{ContainerBuilder, PushInto};