Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ edition = "2021"
rust-version = "1.86"

[workspace.dependencies]
columnar = "0.11"
columnar = "0.12"

[workspace.lints.clippy]
type_complexity = "allow"
Expand Down
26 changes: 13 additions & 13 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn main() {
let mut session = output.session(&time);
for data in data {
for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
wordcount.text.split(|b| b.is_ascii_whitespace()).filter(|s| !s.is_empty()).map(move |text| WordCountReference { text, diff: wordcount.diff })
}) {
session.give(wordcount);
}
Expand All @@ -59,7 +59,7 @@ fn main() {
)
.container::<Container>()
.unary_frontier(
ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
ExchangeCore::<ColumnBuilder<InnerContainer>,_>::new_core(|x: &WordCountReference<&[u8],&i64>| x.text.len() as u64),
"WordCount",
|_capability, _info| {
let mut queues = HashMap::new();
Expand All @@ -85,7 +85,7 @@ fn main() {
*count
}
else {
counts.insert(wordcount.text.to_string(), *wordcount.diff);
counts.insert(wordcount.text.to_vec(), *wordcount.diff);
*wordcount.diff
};
session.give(WordCountReference { text: wordcount.text, diff: total });
Expand All @@ -104,7 +104,7 @@ fn main() {
Ok((time, data)) => {
println!("seen at: {:?}\t{:?} records", time, data.record_count());
for wc in data.borrow().into_index_iter() {
println!(" {}: {}", wc.text, wc.diff);
println!(" {}: {}", std::str::from_utf8(wc.text).unwrap_or("<invalid utf8>"), wc.diff);
}
},
Err(frontier) => println!("frontier advanced to {:?}", frontier),
Expand Down Expand Up @@ -135,7 +135,7 @@ mod container {
pub struct Column<C> { pub stash: Stash<C, timely_bytes::arc::Bytes> }

use columnar::{Len, Index};
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::bytes::indexed;
use columnar::common::IterOwn;

impl<C: columnar::ContainerBytes> Column<C> {
Expand All @@ -157,7 +157,7 @@ mod container {
fn at_capacity(&self) -> bool {
match &self.stash {
Stash::Typed(t) => {
let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
let length_in_bytes = 8 * indexed::length_in_words(&t.borrow());
length_in_bytes >= (1 << 20)
},
Stash::Bytes(_) => true,
Expand All @@ -167,14 +167,14 @@ mod container {
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}

impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
impl<C: columnar::Container + columnar::ContainerBytes, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
#[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) }
}

impl<C: columnar::ContainerBytes> timely::dataflow::channels::ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } }
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: Stash::try_from_bytes(bytes).expect("valid columnar data") } }
fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() }
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { self.stash.into_bytes(writer) }
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { self.stash.write_bytes(writer).expect("write failed") }
}
}

Expand All @@ -183,7 +183,7 @@ use builder::ColumnBuilder;
mod builder {

use std::collections::VecDeque;
use columnar::bytes::{EncodeDecode, Indexed, stash::Stash};
use columnar::bytes::{indexed, stash::Stash};
use super::Column;

/// A container builder for `Column<C>`.
Expand All @@ -202,12 +202,12 @@ mod builder {
fn push_into(&mut self, item: T) {
self.current.push(item);
// If there is less than 10% slop with 2MB backing allocations, mint a container.
let words = Indexed::length_in_words(&self.current.borrow());
let words = indexed::length_in_words(&self.current.borrow());
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
if round - words < round / 10 {
let mut alloc = Vec::with_capacity(round);
Indexed::encode(&mut alloc, &self.current.borrow());
self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) });
indexed::encode(&mut alloc, &self.current.borrow());
self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice().into()) });
self.current.clear();
}
}
Expand Down
Loading