Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/mito2/benches/memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use mito2::memtable::bulk::part_reader::BulkPartBatchIter;
use mito2::memtable::bulk::{BulkMemtable, BulkMemtableConfig};
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable, RangesOptions};
use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions};
use mito2::read::flat_merge::FlatMergeIterator;
use mito2::read::scan_region::PredicateGroup;
use mito2::region::options::MergeMode;
Expand Down Expand Up @@ -105,7 +105,11 @@ fn full_scan(c: &mut Criterion) {
}

b.iter(|| {
let iter = memtable.iter(None, None, None).unwrap();
let iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
for batch in iter {
let _batch = batch.unwrap();
}
Expand Down Expand Up @@ -145,7 +149,17 @@ fn filter_1_host(c: &mut Criterion) {
let predicate = generator.random_host_filter();

b.iter(|| {
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
let iter = memtable
.ranges(
None,
RangesOptions {
predicate: PredicateGroup::new(&metadata, predicate.exprs()).unwrap(),
..Default::default()
},
)
.unwrap()
.build(None)
.unwrap();
for batch in iter {
let _batch = batch.unwrap();
}
Expand Down
8 changes: 6 additions & 2 deletions src/mito2/benches/simple_bulk_memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use criterion::{Criterion, criterion_group, criterion_main};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions};
use mito2::memtable::{IterBuilder, KeyValues, Memtable, MemtableRanges, RangesOptions};
use mito2::read;
use mito2::read::Source;
use mito2::read::dedup::DedupReader;
Expand Down Expand Up @@ -156,7 +156,11 @@ async fn flush(mem: &SimpleBulkMemtable) {
}

async fn flush_original(mem: &SimpleBulkMemtable) {
let iter = mem.iter(None, None, None).unwrap();
let iter = mem
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
for b in iter {
black_box(b.unwrap());
}
Expand Down
20 changes: 2 additions & 18 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,8 @@ impl MemtableRanges {

impl IterBuilder for MemtableRanges {
fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
UnsupportedOperationSnafu {
err_msg: "MemtableRanges does not support build iterator",
}
.fail()
assert_eq!(self.ranges.len(), 1);
self.ranges.values().next().unwrap().build_iter()
}

fn is_record_batch(&self) -> bool {
Expand All @@ -256,20 +254,6 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Writes an encoded batch into memtable.
fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
///
/// # Note
/// This method should only be used for tests.
#[cfg(any(test, feature = "test"))]
fn iter(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<table::predicate::Predicate>,
sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator>;

/// Returns the ranges in the memtable.
///
/// The returned map contains the range id and the range after applying the predicate.
Expand Down
10 changes: 0 additions & 10 deletions src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,16 +462,6 @@ impl Memtable for BulkMemtable {
Ok(())
}

#[cfg(any(test, feature = "test"))]
fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<table::predicate::Predicate>,
_sequence: Option<SequenceRange>,
) -> Result<crate::memtable::BoxedBatchIterator> {
todo!()
}

fn ranges(
&self,
projection: Option<&[ColumnId]>,
Expand Down
85 changes: 33 additions & 52 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,6 @@ impl Memtable for PartitionTreeMemtable {
.fail()
}

#[cfg(any(test, feature = "test"))]
fn iter(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator> {
self.tree.read(projection, predicate, sequence, None)
}

fn ranges(
&self,
projection: Option<&[ColumnId]>,
Expand Down Expand Up @@ -396,8 +386,6 @@ mod tests {
use api::v1::{Mutation, OpType, Rows, SemanticType};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_time::Timestamp;
use datafusion_common::Column;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Vector;
use datatypes::scalars::ScalarVector;
Expand Down Expand Up @@ -548,7 +536,10 @@ mod tests {
let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
let ranges = memtable
.ranges(Some(&[3]), RangesOptions::default())
.unwrap();
let iter = ranges.build(None).unwrap();

let mut v0_all = vec![];
for res in iter {
Expand Down Expand Up @@ -625,41 +616,6 @@ mod tests {
assert_eq!(expect, read);
}

#[test]
fn test_memtable_filter() {
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
// Try to build a memtable via the builder.
let memtable = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig {
index_max_keys_per_shard: 40,
..Default::default()
},
None,
)
.build(1, &metadata);

for i in 0..100 {
let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
memtable.write(&kvs).unwrap();
}

for i in 0..100 {
let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("k1"))),
op: Operator::Eq,
right: Box::new((i as u32).lit()),
});
let iter = memtable
.iter(None, Some(Predicate::new(vec![expr])), None)
.unwrap();
let read = collect_iter_timestamps(iter);
assert_eq!(timestamps, read);
}
}

#[test]
fn test_deserialize_config() {
let config = PartitionTreeConfig {
Expand Down Expand Up @@ -811,7 +767,11 @@ mod tests {
))
.unwrap();

let mut reader = new_memtable.iter(None, None, None).unwrap();
let mut reader = new_memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = reader.next().unwrap().unwrap();
let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
if let Value::String(s) = &pk[2] {
Expand Down Expand Up @@ -916,7 +876,14 @@ mod tests {
.unwrap();
memtable.freeze().unwrap();
assert_eq!(
collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
collect_kvs(
memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap(),
&metadata
),
('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
);
let forked = memtable.fork(2, &metadata);
Expand All @@ -925,7 +892,14 @@ mod tests {
forked.write(&key_values(&metadata, keys.iter())).unwrap();
forked.freeze().unwrap();
assert_eq!(
collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
collect_kvs(
forked
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap(),
&metadata
),
keys.iter()
.map(|c| (c.to_string(), c.to_string()))
.collect()
Expand All @@ -936,7 +910,14 @@ mod tests {
let keys = ["g", "e", "a", "f", "b", "c", "h"];
forked2.write(&key_values(&metadata, keys.iter())).unwrap();

let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
let kvs = collect_kvs(
forked2
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap(),
&metadata,
);
let expected = keys
.iter()
.map(|c| (c.to_string(), c.to_string()))
Expand Down
68 changes: 44 additions & 24 deletions src/mito2/src/memtable/simple_bulk_memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,6 @@ impl Memtable for SimpleBulkMemtable {
Ok(())
}

#[cfg(any(test, feature = "test"))]
fn iter(
&self,
projection: Option<&[ColumnId]>,
_predicate: Option<table::predicate::Predicate>,
sequence: Option<store_api::storage::SequenceRange>,
) -> error::Result<BoxedBatchIterator> {
let iter = self.create_iter(projection, sequence)?.build(None)?;
if self.merge_mode == MergeMode::LastNonNull {
let iter = LastNonNullIter::new(iter);
Ok(Box::new(iter))
} else {
Ok(Box::new(iter))
}
}

fn ranges(
&self,
projection: Option<&[ColumnId]>,
Expand Down Expand Up @@ -526,7 +510,11 @@ mod tests {
))
.unwrap();

let mut iter = memtable.iter(None, None, None).unwrap();
let mut iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(2, batch.num_rows());
assert_eq!(2, batch.fields().len());
Expand All @@ -551,7 +539,11 @@ mod tests {
))
.unwrap();

let mut iter = memtable.iter(None, None, None).unwrap();
let mut iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(2, batch.fields().len());
Expand All @@ -565,7 +557,11 @@ mod tests {

// Only project column 2 (f1)
let projection = vec![2];
let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
let mut iter = memtable
.ranges(Some(&projection), RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();

assert_eq!(1, batch.num_rows());
Expand All @@ -592,7 +588,11 @@ mod tests {
OpType::Put,
))
.unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let mut iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();

assert_eq!(1, batch.num_rows()); // deduped to 1 row
Expand All @@ -611,7 +611,11 @@ mod tests {
let kv = kvs.iter().next().unwrap();
memtable.write_one(kv).unwrap();

let mut iter = memtable.iter(None, None, None).unwrap();
let mut iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
}
Expand Down Expand Up @@ -745,7 +749,11 @@ mod tests {
};
memtable.write_bulk(part).unwrap();

let mut iter = memtable.iter(None, None, None).unwrap();
let mut iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(2, batch.num_rows());

Expand All @@ -764,7 +772,11 @@ mod tests {
OpType::Put,
);
memtable.write(&kvs).unwrap();
let mut iter = memtable.iter(None, None, None).unwrap();
let mut iter = memtable
.ranges(None, RangesOptions::default())
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(3, batch.num_rows());
assert_eq!(
Expand Down Expand Up @@ -854,7 +866,15 @@ mod tests {

// Filter with sequence 0 should only return first write
let mut iter = memtable
.iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
.ranges(
None,
RangesOptions {
sequence: Some(SequenceRange::LtEq { max: 0 }),
..Default::default()
},
)
.unwrap()
.build(None)
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
Expand Down
Loading
Loading