From dbc7dcca29b8e8bf2031b1af9f381fd82950df91 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 25 Mar 2026 07:48:48 -0400 Subject: [PATCH] Update to columnar 0.12 --- Cargo.toml | 2 +- timely/examples/columnar.rs | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f309dd658..a17f83fb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ edition = "2021" rust-version = "1.86" [workspace.dependencies] -columnar = "0.11" +columnar = "0.12" [workspace.lints.clippy] type_complexity = "allow" diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index c8c3b8400..1a87ab164 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -48,7 +48,7 @@ fn main() { let mut session = output.session(&time); for data in data { for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| { - wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff }) + wordcount.text.split(|b| b.is_ascii_whitespace()).filter(|s| !s.is_empty()).map(move |text| WordCountReference { text, diff: wordcount.diff }) }) { session.give(wordcount); } @@ -59,7 +59,7 @@ fn main() { ) .container::() .unary_frontier( - ExchangeCore::,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64), + ExchangeCore::,_>::new_core(|x: &WordCountReference<&[u8],&i64>| x.text.len() as u64), "WordCount", |_capability, _info| { let mut queues = HashMap::new(); @@ -85,7 +85,7 @@ fn main() { *count } else { - counts.insert(wordcount.text.to_string(), *wordcount.diff); + counts.insert(wordcount.text.to_vec(), *wordcount.diff); *wordcount.diff }; session.give(WordCountReference { text: wordcount.text, diff: total }); @@ -104,7 +104,7 @@ fn main() { Ok((time, data)) => { println!("seen at: {:?}\t{:?} records", time, data.record_count()); for wc in data.borrow().into_index_iter() { - println!(" {}: {}", wc.text, wc.diff); + println!(" {}: {}", std::str::from_utf8(wc.text).unwrap_or(""), wc.diff); } }, Err(frontier) => println!("frontier advanced to {:?}", frontier), @@ -135,7 +135,7 @@ mod container { pub struct Column { pub stash: Stash } use columnar::{Len, Index}; - use columnar::bytes::{EncodeDecode, Indexed}; + use columnar::bytes::indexed; use columnar::common::IterOwn; impl Column { @@ -157,7 +157,7 @@ mod container { fn at_capacity(&self) -> bool { match &self.stash { Stash::Typed(t) => { - let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow()); + let length_in_bytes = 8 * indexed::length_in_words(&t.borrow()); length_in_bytes >= (1 << 20) }, Stash::Bytes(_) => true, @@ -167,14 +167,14 @@ mod container { fn ensure_capacity(&mut self, _stash: &mut Option) { } } - impl timely::container::PushInto for Column where C: columnar::Push { + impl timely::container::PushInto for Column where C: columnar::Push { #[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) } } impl timely::dataflow::channels::ContainerBytes for Column { - fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } } + fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: Stash::try_from_bytes(bytes).expect("valid columnar data") } } fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() } - fn into_bytes(&self, writer: &mut W) { self.stash.into_bytes(writer) } + fn into_bytes(&self, writer: &mut W) { self.stash.write_bytes(writer).expect("write failed") } } } @@ -183,7 +183,7 @@ use builder::ColumnBuilder; mod builder { use std::collections::VecDeque; - use columnar::bytes::{EncodeDecode, Indexed, stash::Stash}; + use columnar::bytes::{indexed, stash::Stash}; use super::Column; /// A container builder for `Column`. @@ -202,12 +202,12 @@ mod builder { fn push_into(&mut self, item: T) { self.current.push(item); // If there is less than 10% slop with 2MB backing allocations, mint a container. - let words = Indexed::length_in_words(&self.current.borrow()); + let words = indexed::length_in_words(&self.current.borrow()); let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1); if round - words < round / 10 { let mut alloc = Vec::with_capacity(round); - Indexed::encode(&mut alloc, &self.current.borrow()); - self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) }); + indexed::encode(&mut alloc, &self.current.borrow()); + self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice().into()) }); self.current.clear(); } }