diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar/columnar_support.rs similarity index 68% rename from differential-dataflow/examples/columnar.rs rename to differential-dataflow/examples/columnar/columnar_support.rs index 21ebf2397..ced700bec 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar/columnar_support.rs @@ -1,97 +1,13 @@ -//! Wordcount based on `columnar`. +//! Columnar container infrastructure for differential dataflow. +//! +//! Provides trie-structured update storage (`Updates`, `RecordedUpdates`), +//! columnar arrangement types (`ValSpine`, `ValBatcher`, `ValBuilder`), +//! container traits for iterative scopes (`Enter`, `Leave`, `Negate`, `ResultsIn`), +//! exchange distribution (`ValPact`), and operators (`join_function`, `leave_dynamic`). +//! +//! Include via `#[path = "columnar_support.rs"] mod columnar_support;` -use timely::container::{ContainerBuilder, PushInto}; -use timely::dataflow::InputHandle; -use timely::dataflow::ProbeHandle; - -use differential_dataflow::operators::arrange::arrangement::arrange_core; - -use mimalloc::MiMalloc; - -#[global_allocator] -static GLOBAL: MiMalloc = MiMalloc; - -fn main() { - - type WordCount = (Vec, (), u64, i64); - type Builder = ValColBuilder; - - let keys: usize = std::env::args().nth(1).expect("missing argument 1").parse().unwrap(); - let size: usize = std::env::args().nth(2).expect("missing argument 2").parse().unwrap(); - - let timer1 = ::std::time::Instant::now(); - let timer2 = timer1.clone(); - - timely::execute_from_args(std::env::args(), move |worker| { - - let mut data_input = >::new_with_builder(); - let mut keys_input = >::new_with_builder(); - let mut probe = ProbeHandle::new(); - - worker.dataflow::(|scope| { - let data = data_input.to_stream(scope); - let keys = keys_input.to_stream(scope); - - use differential_dataflow::Hashable; - let data_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - let keys_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Vec>| k.hashed() }; - - let data = arrange_core::<_,_,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(data, data_pact, "Data"); - let keys = arrange_core::<_,_,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(keys, keys_pact, "Keys"); - - keys.join_core(data, |_k, (), ()| { Option::<()>::None }) - .probe_with(&mut probe); - }); - - use std::fmt::Write; - let mut buffer = String::default(); - let mut builder = Builder::default(); - - let mut counter = 0; - while counter < 10 * keys { - let mut i = worker.index(); - let time = *data_input.time(); - while i < size { - let val = (counter + i) % keys; - write!(buffer, "{:?}", val).unwrap(); - builder.push_into((buffer.as_bytes(), (), time, 1)); - buffer.clear(); - i += worker.peers(); - } - while let Some(container) = builder.finish() { - data_input.send_batch(container); - } - counter += size; - data_input.advance_to(data_input.time() + 1); - keys_input.advance_to(keys_input.time() + 1); - while probe.less_than(data_input.time()) { worker.step_or_park(None); } - } - println!("{:?}\tloading complete", timer1.elapsed()); - - let mut queries = 0; - while queries < 10 * keys { - let mut i = worker.index(); - let time = *data_input.time(); - while i < size { - let val = (queries + i) % keys; - write!(buffer, "{:?}", val).unwrap(); - builder.push_into((buffer.as_bytes(), (), time, 1)); - buffer.clear(); - i += worker.peers(); - } - while let Some(container) = builder.finish() { - keys_input.send_batch(container); - } - queries += size; - data_input.advance_to(data_input.time() + 1); - keys_input.advance_to(keys_input.time() + 1); - while probe.less_than(data_input.time()) { worker.step_or_park(None); } - } - println!("{:?}\tqueries complete", timer1.elapsed()); - - }).unwrap(); - println!("{:?}\tshut down", timer2.elapsed()); -} +#![allow(dead_code, unused_imports)] pub use layout::{ColumnarLayout, ColumnarUpdate}; pub mod layout { @@ -174,6 +90,106 @@ impl timely::dataflow::channels::ContainerBytes for R fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } +// Container trait impls for RecordedUpdates, enabling iterative scopes. +mod container_impls { + use columnar::{Borrow, Columnar, Index, Len, Push}; + use timely::progress::{Timestamp, timestamp::Refines}; + use differential_dataflow::difference::Abelian; + use differential_dataflow::collection::containers::{Negate, Enter, Leave, ResultsIn}; + + use crate::layout::ColumnarUpdate as Update; + use crate::{RecordedUpdates, Updates}; + + impl> Negate for RecordedUpdates { + fn negate(mut self) -> Self { + let len = self.updates.diffs.values.len(); + let mut new_diffs = <::Container as Default>::default(); + let mut owned = U::Diff::default(); + for i in 0..len { + columnar::Columnar::copy_from(&mut owned, self.updates.diffs.values.borrow().get(i)); + owned.negate(); + new_diffs.push(&owned); + } + self.updates.diffs.values = new_diffs; + self + } + } + + impl Enter for RecordedUpdates<(K, V, T1, R)> + where + (K, V, T1, R): Update, + (K, V, T2, R): Update, + T1: Timestamp + Columnar + Default + Clone, + T2: Refines + Columnar + Default + Clone, + K: Columnar, V: Columnar, R: Columnar, + { + type InnerContainer = RecordedUpdates<(K, V, T2, R)>; + fn enter(self) -> Self::InnerContainer { + // Rebuild the time column; everything else moves as-is. + let mut new_times = <::Container as Default>::default(); + let mut t1_owned = T1::default(); + for i in 0..self.updates.times.values.len() { + Columnar::copy_from(&mut t1_owned, self.updates.times.values.borrow().get(i)); + let t2 = T2::to_inner(t1_owned.clone()); + new_times.push(&t2); + } + RecordedUpdates { + updates: Updates { + keys: self.updates.keys, + vals: self.updates.vals, + times: crate::updates::Lists { values: new_times, bounds: self.updates.times.bounds }, + diffs: self.updates.diffs, + }, + records: self.records, + } + } + } + + impl Leave for RecordedUpdates<(K, V, T1, R)> + where + (K, V, T1, R): Update, + (K, V, T2, R): Update, + T1: Refines + Columnar + Default + Clone, + T2: Timestamp + Columnar + Default + Clone, + K: Columnar, V: Columnar, R: Columnar, + { + type OuterContainer = RecordedUpdates<(K, V, T2, R)>; + fn leave(self) -> Self::OuterContainer { + // Flatten, convert times, and reconsolidate via consolidate. + // Leave can collapse distinct T1 times to the same T2 time, + // so the trie must be rebuilt with consolidation. + let mut flat = Updates::<(K, V, T2, R)>::default(); + let mut t1_owned = T1::default(); + for (k, v, t, d) in self.updates.iter() { + Columnar::copy_from(&mut t1_owned, t); + let t2: T2 = t1_owned.clone().to_outer(); + flat.push((k, v, &t2, d)); + } + RecordedUpdates { + updates: flat.consolidate(), + records: self.records, + } + } + } + + impl ResultsIn<::Summary> for RecordedUpdates { + fn results_in(self, step: &::Summary) -> Self { + use timely::progress::PathSummary; + // Apply results_in to each time; drop updates whose time maps to None. + // This must rebuild the trie since some entries may be removed. + let mut output = Updates::::default(); + let mut time_owned = U::Time::default(); + for (k, v, t, d) in self.updates.iter() { + Columnar::copy_from(&mut time_owned, t); + if let Some(new_time) = step.results_in(&time_owned) { + output.push((k, v, &new_time, d)); + } + } + RecordedUpdates { updates: output, records: self.records } + } + } +} + pub use column_builder::ValBuilder as ValColBuilder; mod column_builder { @@ -256,7 +272,7 @@ mod column_builder { } -use distributor::ValPact; +pub use distributor::ValPact; mod distributor { use std::rc::Rc; @@ -731,15 +747,32 @@ pub mod arrangement { output } - pub struct ValMirror { marker: std::marker::PhantomData } + pub struct ValMirror { + current: Updates, + } impl differential_dataflow::trace::Builder for ValMirror { type Time = U::Time; type Input = Updates; type Output = OrdValBatch>; - fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { Self { marker: std::marker::PhantomData } } - fn push(&mut self, _chunk: &mut Self::Input) { unimplemented!() } - fn done(self, _description: Description) -> Self::Output { unimplemented!() } + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + Self { current: Updates::default() } + } + fn push(&mut self, chunk: &mut Self::Input) { + use columnar::Len; + let len = chunk.keys.values.len(); + if len > 0 { + self.current.extend_from_keys(chunk, 0..len); + } + } + fn done(self, description: Description) -> Self::Output { + let mut chain = if self.current.len() > 0 { + vec![self.current] + } else { + vec![] + }; + Self::seal(&mut chain, description) + } fn seal(chain: &mut Vec, description: Description) -> Self::Output { if chain.len() == 0 { let storage = OrdValStorage { @@ -770,8 +803,16 @@ pub mod arrangement { OrdValBatch { storage, description, updates } } else { - println!("chain length: {:?}", chain.len()); - unimplemented!() + use columnar::Len; + let mut merged = chain.remove(0); + for other in chain.drain(..) { + let len = other.keys.values.len(); + if len > 0 { + merged.extend_from_keys(&other, 0..len); + } + } + chain.push(merged); + Self::seal(chain, description) } } } @@ -894,63 +935,96 @@ pub mod updates { /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. /// - /// Follows the same dedup pattern at each level: check if the current - /// entry differs from the previous, seal the previous group if so. - /// At the time level, equal `(key, val, time)` triples have diffs accumulated. - /// The `records` field tracks the input count for exchange accounting. + /// Tracks a `prev` reference to the previous element. On each new element, + /// compares against `prev` to detect key/val/time changes. Only pushes + /// accumulated diffs when they are nonzero, and only emits times/vals/keys + /// that have at least one nonzero diff beneath them. pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { let mut output = Self::default(); let mut diff_stash = U::Diff::default(); let mut diff_temp = U::Diff::default(); - if let Some((key, val, time, diff)) = sorted.next() { - output.keys.values.push(key); - output.vals.values.push(val); - output.times.values.push(time); - Columnar::copy_from(&mut diff_stash, diff); + if let Some(first) = sorted.next() { - for (key, val, time, diff) in sorted { - let mut differs = false; - // Keys: seal vals for previous key if key changed. - let keys_len = output.keys.values.len(); - differs |= ContainerOf::::reborrow_ref(key) != output.keys.values.borrow().get(keys_len - 1); - if differs { output.keys.values.push(key); } - // Vals: seal times for previous val if key or val changed. - let vals_len = output.vals.values.len(); - if differs { output.vals.bounds.push(vals_len as u64); } - differs |= ContainerOf::::reborrow_ref(val) != output.vals.values.borrow().get(vals_len - 1); - if differs { output.vals.values.push(val); } - // Times: seal diffs for previous time if key, val, or time changed. - let times_len = output.times.values.len(); - if differs { output.times.bounds.push(times_len as u64); } - differs |= ContainerOf::::reborrow_ref(time) != output.times.values.borrow().get(times_len - 1); - if differs { - // Flush accumulated diff for the previous time. + let mut prev = first; + Columnar::copy_from(&mut diff_stash, prev.3); + + for curr in sorted { + let key_differs = ContainerOf::::reborrow_ref(curr.0) != ContainerOf::::reborrow_ref(prev.0); + let val_differs = key_differs || ContainerOf::::reborrow_ref(curr.1) != ContainerOf::::reborrow_ref(prev.1); + let time_differs = val_differs || ContainerOf::::reborrow_ref(curr.2) != ContainerOf::::reborrow_ref(prev.2); + + if time_differs { + // Flush the accumulated diff for prev's (key, val, time). if !diff_stash.is_zero() { + // We have a real update to emit. Push time (and val/key + // if this is the first time under them). + let times_len = output.times.values.len(); + let vals_len = output.vals.values.len(); + + if val_differs { + // Seal the previous val's time list, if any times were emitted. + if times_len > 0 { + output.times.bounds.push(times_len as u64); + } + if key_differs { + // Seal the previous key's val list, if any vals were emitted. + if vals_len > 0 { + output.vals.bounds.push(vals_len as u64); + } + output.keys.values.push(prev.0); + } + output.vals.values.push(prev.1); + } + output.times.values.push(prev.2); output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); } - // TODO: Else is complicated, as we may want to pop prior values. - // It is perhaps fine to leave zeros as a thing that won't - // invalidate merging. - output.diffs.bounds.push(output.diffs.values.len() as u64); - // Start new time. - output.times.values.push(time); - Columnar::copy_from(&mut diff_stash, diff); + Columnar::copy_from(&mut diff_stash, curr.3); } else { // Same (key, val, time): accumulate diff. - Columnar::copy_from(&mut diff_temp, diff); + Columnar::copy_from(&mut diff_temp, curr.3); diff_stash.plus_equals(&diff_temp); } + prev = curr; } - // Flush the last accumulated diff and seal all levels. + + // Flush the final accumulated diff. if !diff_stash.is_zero() { + let keys_len = output.keys.values.len(); + let vals_len = output.vals.values.len(); + let times_len = output.times.values.len(); + let need_key = keys_len == 0 || ContainerOf::::reborrow_ref(prev.0) != output.keys.values.borrow().get(keys_len - 1); + let need_val = need_key || vals_len == 0 || ContainerOf::::reborrow_ref(prev.1) != output.vals.values.borrow().get(vals_len - 1); + + if need_val { + if times_len > 0 { + output.times.bounds.push(times_len as u64); + } + if need_key { + if vals_len > 0 { + output.vals.bounds.push(vals_len as u64); + } + output.keys.values.push(prev.0); + } + output.vals.values.push(prev.1); + } + output.times.values.push(prev.2); output.diffs.values.push(&diff_stash); + output.diffs.bounds.push(output.diffs.values.len() as u64); + } + + // Seal the final groups at each level. + if !output.times.values.is_empty() { + output.times.bounds.push(output.times.values.len() as u64); + } + if !output.vals.values.is_empty() { + output.vals.bounds.push(output.vals.values.len() as u64); + } + if !output.keys.values.is_empty() { + output.keys.bounds.push(output.keys.values.len() as u64); } - output.diffs.bounds.push(output.diffs.values.len() as u64); - output.times.bounds.push(output.times.values.len() as u64); - output.vals.bounds.push(output.vals.values.len() as u64); - output.keys.bounds.push(output.keys.values.len() as u64); } output @@ -1054,8 +1128,22 @@ pub mod updates { output } - /// Push a single flat update `(key, val, time, diff)` as a stride-1 entry. - pub fn push<'a>(&mut self, key: columnar::Ref<'a, U::Key>, val: columnar::Ref<'a, U::Val>, time: columnar::Ref<'a, U::Time>, diff: columnar::Ref<'a, U::Diff>) { + /// The number of leaf-level diff entries (total updates). + pub fn len(&self) -> usize { self.diffs.values.len() } + } + + /// Push a single flat update as a stride-1 entry. + /// + /// Each field is independently typed — columnar refs, `&Owned`, owned values, + /// or any other type the column container accepts via its `Push` impl. + impl Push<(KP, VP, TP, DP)> for Updates + where + ContainerOf: Push, + ContainerOf: Push, + ContainerOf: Push, + ContainerOf: Push, + { + fn push(&mut self, (key, val, time, diff): (KP, VP, TP, DP)) { self.keys.values.push(key); self.keys.bounds.push(self.keys.values.len() as u64); self.vals.values.push(val); @@ -1065,21 +1153,16 @@ pub mod updates { self.diffs.values.push(diff); self.diffs.bounds.push(self.diffs.values.len() as u64); } + } - /// Push a single flat update from owned values. - pub fn push_owned(&mut self, key: &U::Key, val: &U::Val, time: &U::Time, diff: &U::Diff) { - self.keys.values.push(key); - self.keys.bounds.push(self.keys.values.len() as u64); - self.vals.values.push(val); - self.vals.bounds.push(self.vals.values.len() as u64); - self.times.values.push(time); - self.times.bounds.push(self.times.values.len() as u64); - self.diffs.values.push(diff); - self.diffs.bounds.push(self.diffs.values.len() as u64); + /// PushInto for the `((K, V), T, R)` shape that reduce_trace uses. + impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for Updates { + fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) { + self.push((&key, &val, &time, &diff)); } + } - /// The number of leaf-level diff entries (total updates). - pub fn len(&self) -> usize { self.diffs.values.len() } + impl Updates { /// Iterate all `(key, val, time, diff)` entries as refs. pub fn iter(&self) -> impl Iterator) -> Vec<(u64, u64, u64, i64)> { updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() } @@ -1138,49 +1217,39 @@ pub mod updates { #[test] fn test_push_and_consolidate_basic() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &1); - updates.push_owned(&1, &10, &100, &2); - updates.push_owned(&2, &20, &200, &5); - + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &2)); + updates.push((&2, &20, &200, &5)); assert_eq!(updates.len(), 3); - - let consolidated = updates.consolidate(); - let entries = collect(&consolidated); - assert_eq!(entries, vec![(1, 10, 100, 3), (2, 20, 200, 5)]); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 3), (2, 20, 200, 5)]); } #[test] fn test_cancellation() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &3); - updates.push_owned(&1, &10, &100, &-3); - updates.push_owned(&2, &20, &200, &1); - - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(2, 20, 200, 1)]); + updates.push((&1, &10, &100, &3)); + updates.push((&1, &10, &100, &-3)); + updates.push((&2, &20, &200, &1)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 1)]); } #[test] fn test_multiple_vals_and_times() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &1); - updates.push_owned(&1, &10, &200, &2); - updates.push_owned(&1, &20, &100, &3); - updates.push_owned(&1, &20, &100, &4); - - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &200, &2)); + updates.push((&1, &20, &100, &3)); + updates.push((&1, &20, &100, &4)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 200, 2), (1, 20, 100, 7)]); } #[test] fn test_val_cancellation_propagates() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &5); - updates.push_owned(&1, &10, &100, &-5); - updates.push_owned(&1, &20, &100, &1); - - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(1, 20, 100, 1)]); + updates.push((&1, &10, &100, &5)); + updates.push((&1, &10, &100, &-5)); + updates.push((&1, &20, &100, &1)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 1)]); } #[test] @@ -1192,20 +1261,231 @@ pub mod updates { #[test] fn test_total_cancellation() { let mut updates = Updates::::default(); - updates.push_owned(&1, &10, &100, &1); - updates.push_owned(&1, &10, &100, &-1); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); assert_eq!(collect(&updates.consolidate()), vec![]); } #[test] fn test_unsorted_input() { let mut updates = Updates::::default(); - updates.push_owned(&3, &30, &300, &1); - updates.push_owned(&1, &10, &100, &2); - updates.push_owned(&2, &20, &200, &3); + updates.push((&3, &30, &300, &1)); + updates.push((&1, &10, &100, &2)); + updates.push((&2, &20, &200, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); + } + + #[test] + fn test_first_key_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &5)); + updates.push((&1, &10, &100, &-5)); + updates.push((&2, &20, &200, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 3)]); + } - let entries = collect(&updates.consolidate()); - assert_eq!(entries, vec![(1, 10, 100, 2), (2, 20, 200, 3), (3, 30, 300, 1)]); + #[test] + fn test_middle_time_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &200, &2)); + updates.push((&1, &10, &200, &-2)); + updates.push((&1, &10, &300, &3)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 10, 100, 1), (1, 10, 300, 3)]); + } + + #[test] + fn test_first_val_cancels() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + updates.push((&1, &20, &100, &5)); + assert_eq!(collect(&updates.consolidate()), vec![(1, 20, 100, 5)]); + } + + #[test] + fn test_interleaved_cancellations() { + let mut updates = Updates::::default(); + updates.push((&1, &10, &100, &1)); + updates.push((&1, &10, &100, &-1)); + updates.push((&2, &20, &200, &7)); + updates.push((&3, &30, &300, &4)); + updates.push((&3, &30, &300, &-4)); + assert_eq!(collect(&updates.consolidate()), vec![(2, 20, 200, 7)]); } } } + +/// A columnar flat_map: iterates RecordedUpdates, calls logic per (key, val, time, diff), +/// joins output times with input times, multiplies output diffs with input diffs. +/// +/// This subsumes map, filter, negate, and enter_at for columnar collections. +pub fn join_function( + input: differential_dataflow::Collection>, + mut logic: L, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + G::Timestamp: differential_dataflow::lattice::Lattice, + U: layout::ColumnarUpdate>, + I: IntoIterator, + L: FnMut( + columnar::Ref<'_, U::Key>, + columnar::Ref<'_, U::Val>, + columnar::Ref<'_, U::Time>, + columnar::Ref<'_, U::Diff>, + ) -> I + 'static, +{ + use timely::dataflow::operators::generic::Operator; + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::AsCollection; + use differential_dataflow::difference::Multiply; + use differential_dataflow::lattice::Lattice; + use columnar::Columnar; + + input + .inner + .unary::, _, _, _>(Pipeline, "JoinFunction", move |_, _| { + move |input, output| { + input.for_each(|time, data| { + let mut session = output.session_with_builder(&time); + for (k1, v1, t1, d1) in data.updates.iter() { + let t1o: U::Time = Columnar::into_owned(t1); + let d1o: U::Diff = Columnar::into_owned(d1); + for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { + let t3 = t2.join(&t1o); + let d3 = d2.multiply(&d1o); + session.give((&k2, &v2, &t3, &d3)); + } + } + }); + } + }) + .as_collection() +} + +type DynTime = timely::order::Product>; + +/// Leave a dynamic iterative scope, truncating PointStamp coordinates. +/// +/// Uses OperatorBuilder (not unary) for the custom input connection summary +/// that tells timely how the PointStamp is affected (retain `level - 1` coordinates). +/// +/// Consolidates after truncation since distinct PointStamp coordinates can collapse. +pub fn leave_dynamic( + input: differential_dataflow::Collection>, + level: usize, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + K: columnar::Columnar, + V: columnar::Columnar, + R: columnar::Columnar, + (K, V, DynTime, R): layout::ColumnarUpdate, +{ + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + use timely::dataflow::operators::generic::OutputBuilder; + use timely::order::Product; + use timely::progress::Antichain; + use timely::container::{ContainerBuilder, PushInto}; + use differential_dataflow::AsCollection; + use differential_dataflow::dynamic::pointstamp::{PointStamp, PointStampSummary}; + use columnar::Columnar; + + let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), input.inner.scope()); + let (output, stream) = builder.new_output(); + let mut output = OutputBuilder::from(output); + let mut op_input = builder.new_input_connection( + input.inner, + Pipeline, + [( + 0, + Antichain::from_elem(Product { + outer: Default::default(), + inner: PointStampSummary { + retain: Some(level - 1), + actions: Vec::new(), + }, + }), + )], + ); + + builder.build(move |_capability| { + let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default(); + move |_frontier| { + let mut output = output.activate(); + op_input.for_each(|cap, data| { + // Truncate the capability's timestamp. + let mut new_time = cap.time().clone(); + let mut vec = std::mem::take(&mut new_time.inner).into_inner(); + vec.truncate(level - 1); + new_time.inner = PointStamp::new(vec); + let new_cap = cap.delayed(&new_time, 0); + // Push updates with truncated times into the builder. + // The builder's form call on flush sorts and consolidates, + // handling the duplicate times that truncation can produce. + // TODO: The input trie is already sorted; a streaming form + // that accepts pre-sorted, potentially-collapsing timestamps + // could avoid the re-sort inside the builder. + for (k, v, t, d) in data.updates.iter() { + let mut time: DynTime = Columnar::into_owned(t); + let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); + inner_vec.truncate(level - 1); + time.inner = PointStamp::new(inner_vec); + col_builder.push_into((k, v, &time, d)); + } + let mut session = output.session(&new_cap); + while let Some(container) = col_builder.finish() { + session.give_container(container); + } + }); + } + }); + + stream.as_collection() +} + +/// Extract a `Collection<_, RecordedUpdates>` from a columnar `Arranged`. +/// +/// Cursors through each batch and pushes `(key, val, time, diff)` refs into +/// a `ValColBuilder`, which sorts and consolidates on flush. +pub fn as_recorded_updates( + arranged: differential_dataflow::operators::arrange::Arranged< + G, + differential_dataflow::operators::arrange::TraceAgent>, + >, +) -> differential_dataflow::Collection> +where + G: timely::dataflow::Scope, + U: layout::ColumnarUpdate, +{ + use timely::dataflow::operators::generic::Operator; + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::trace::{BatchReader, Cursor}; + use differential_dataflow::AsCollection; + + arranged.stream + .unary::, _, _, _>(Pipeline, "AsRecordedUpdates", |_, _| { + move |input, output| { + input.for_each(|time, batches| { + let mut session = output.session_with_builder(&time); + for batch in batches.drain(..) { + let mut cursor = batch.cursor(); + while cursor.key_valid(&batch) { + while cursor.val_valid(&batch) { + let key = cursor.key(&batch); + let val = cursor.val(&batch); + cursor.map_times(&batch, |time, diff| { + session.give((key, val, time, diff)); + }); + cursor.step_val(&batch); + } + cursor.step_key(&batch); + } + } + }); + } + }) + .as_collection() +} diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs new file mode 100644 index 000000000..8404e7aa4 --- /dev/null +++ b/differential-dataflow/examples/columnar/main.rs @@ -0,0 +1,184 @@ +//! Columnar reachability example for differential dataflow. +//! +//! Demonstrates columnar-backed arrangements in an iterative scope, +//! exercising Enter, Leave, Negate, ResultsIn on RecordedUpdates, +//! and Push on Updates for the reduce builder path. + +mod columnar_support; + +use timely::container::{ContainerBuilder, PushInto}; +use timely::dataflow::InputHandle; +use timely::dataflow::ProbeHandle; + +use columnar_support::*; + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +fn main() { + + let nodes: u32 = std::env::args().nth(1).unwrap_or("100".into()).parse().unwrap(); + let edges: u32 = std::env::args().nth(2).unwrap_or("300".into()).parse().unwrap(); + + let timer = ::std::time::Instant::now(); + + timely::execute_from_args(std::env::args(), move |worker| { + + type EdgeUpdate = (u32, u32, u64, i64); + type NodeUpdate = (u32, (), u64, i64); + type EdgeBuilder = ValColBuilder; + type NodeBuilder = ValColBuilder; + + let mut edge_input = >::new_with_builder(); + let mut root_input = >::new_with_builder(); + let mut probe = ProbeHandle::new(); + + worker.dataflow::(|scope| { + use differential_dataflow::AsCollection; + use timely::dataflow::operators::Probe; + + let edges = edge_input.to_stream(scope).as_collection(); + let roots = root_input.to_stream(scope).as_collection(); + + reachability::reach(edges, roots) + .inner + .probe_with(&mut probe); + }); + + // Generate a small random graph. + let mut edge_builder = EdgeBuilder::default(); + let mut node_builder = NodeBuilder::default(); + + if worker.index() == 0 { + // Simple deterministic "random" edges. + let mut src: u32 = 0; + for _ in 0..edges { + let dst = (src.wrapping_mul(7).wrapping_add(13)) % nodes; + edge_builder.push_into((src, dst, 0u64, 1i64)); + src = (src + 1) % nodes; + } + // Root: node 0. + node_builder.push_into((0u32, (), 0u64, 1i64)); + } + + while let Some(container) = edge_builder.finish() { + edge_input.send_batch(container); + } + while let Some(container) = node_builder.finish() { + root_input.send_batch(container); + } + + edge_input.advance_to(1); + root_input.advance_to(1); + edge_input.flush(); + root_input.flush(); + + while probe.less_than(edge_input.time()) { + worker.step_or_park(None); + } + + println!("{:?}\treachability complete ({} nodes, {} edges)", timer.elapsed(), nodes, edges); + + }).unwrap(); + println!("{:?}\tshut down", timer.elapsed()); +} + +/// Reachability on a random directed graph using columnar containers. +/// +/// This module exercises the container traits needed for iterative columnar +/// computation: Enter, Leave, Negate, ResultsIn on RecordedUpdates, and +/// Push on Updates for the reduce builder path. +mod reachability { + + use timely::order::Product; + use timely::dataflow::Scope; + use differential_dataflow::Collection; + use differential_dataflow::AsCollection; + use differential_dataflow::operators::iterate::Variable; + use differential_dataflow::operators::arrange::arrangement::arrange_core; + use differential_dataflow::operators::join::join_traces; + + use crate::columnar_support::*; + + type Node = u32; + type Time = u64; + type Diff = i64; + type IterTime = Product; + + /// Compute the set of nodes reachable from `roots` along directed `edges`. + /// + /// Returns `(node, ())` for each reachable node. + pub fn reach>( + edges: Collection>, + roots: Collection>, + ) -> Collection> + { + let mut scope = edges.inner.scope(); + + scope.iterative::(|nested| { + let summary = Product::new(Time::default(), 1); + + let roots_inner = roots.enter(nested); + let (variable, reach) = Variable::new_from(roots_inner.clone(), summary); + let edges_inner = edges.enter(nested); + + // Arrange both collections into columnar spines for joining. + let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; + let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; + + let edges_arr = arrange_core::<_, _, + ValBatcher, + ValBuilder, + ValSpine, + >(edges_inner.inner, edges_pact, "Edges"); + + let reach_arr = arrange_core::<_, _, + ValBatcher, + ValBuilder, + ValSpine, + >(reach.inner, reach_pact, "Reach"); + + // join_traces with ValColBuilder: produces Stream<_, RecordedUpdates<...>>. + let proposed = + join_traces::<_, _, _, _, ValColBuilder<(Node, (), IterTime, Diff)>>( + edges_arr, + reach_arr, + |_src, dst, (), time, d1, d2, session| { + use differential_dataflow::difference::Multiply; + let dst: Node = *dst; + let diff: Diff = d1.clone().multiply(d2); + session.give::<(Node, (), IterTime, Diff)>((dst, (), time.clone(), diff)); + }, + ).as_collection(); + + // concat: both sides are now Collection<_, RecordedUpdates<...>>. + let combined = proposed.concat(roots_inner); + + // Arrange for reduce. + let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 }; + let combined_arr = arrange_core::<_, _, + ValBatcher, + ValBuilder, + ValSpine, + >(combined.inner, combined_pact, "Combined"); + + // reduce_abelian on the columnar arrangement. + let result = combined_arr.reduce_abelian::<_, + ValBuilder, + ValSpine, + >("Distinct", |_node, _input, output| { + output.push(((), 1)); + }); + + // Extract RecordedUpdates from the Arranged's batch stream. + let result_col = as_recorded_updates::<_, (Node, (), IterTime, Diff)>(result); + + variable.set(result_col.clone()); + + // Leave the iterative scope. + result_col.leave() + }) + } +}