Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::cache::write_cache::WriteCacheRef;
use crate::memtable::record_batch_estimated_size;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::reader::MetadataCacheMetrics;

Expand All @@ -64,6 +65,8 @@ const FILE_TYPE: &str = "file";
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";

/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
Expand Down Expand Up @@ -223,6 +226,32 @@ impl CacheStrategy {
}
}

/// Calls [CacheManager::get_range_result()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn get_range_result(
    &self,
    key: &RangeScanCacheKey,
) -> Option<Arc<RangeScanCacheValue>> {
    // Only the full cache strategy serves cached range scan results;
    // every other strategy always reports a miss.
    if let CacheStrategy::EnableAll(cache_manager) = self {
        cache_manager.get_range_result(key)
    } else {
        None
    }
}

/// Calls [CacheManager::put_range_result()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn put_range_result(
    &self,
    key: RangeScanCacheKey,
    result: Arc<RangeScanCacheValue>,
) {
    // Only the full cache strategy stores range scan results; the
    // compaction and disabled strategies silently discard the value.
    match self {
        CacheStrategy::EnableAll(cache_manager) => {
            cache_manager.put_range_result(key, result)
        }
        CacheStrategy::Compaction(_) | CacheStrategy::Disabled => {}
    }
}

/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
Expand Down Expand Up @@ -324,6 +353,9 @@ pub struct CacheManager {
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
/// Cache for range scan outputs in flat format.
#[cfg_attr(not(test), allow(dead_code))]
range_result_cache: Option<RangeResultCache>,
/// Cache for index result.
index_result_cache: Option<IndexResultCache>,
}
Expand Down Expand Up @@ -512,6 +544,32 @@ impl CacheManager {
}
}

/// Gets cached result for range scan.
///
/// Returns `None` when the range result cache is not configured or the key
/// is absent; a lookup on a configured cache is recorded as a hit or miss
/// under the range result metrics label.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn get_range_result(
    &self,
    key: &RangeScanCacheKey,
) -> Option<Arc<RangeScanCacheValue>> {
    let cache = self.range_result_cache.as_ref()?;
    update_hit_miss(cache.get(key), RANGE_RESULT_TYPE)
}

/// Puts range scan result into the cache.
///
/// No-op when the range result cache is not configured.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn put_range_result(
    &self,
    key: RangeScanCacheKey,
    result: Arc<RangeScanCacheValue>,
) {
    let Some(cache) = self.range_result_cache.as_ref() else {
        return;
    };
    // Account the entry's weight up front; the cache's eviction listener
    // subtracts the same weight when the entry is removed.
    let weight = range_result_cache_weight(&key, &result);
    CACHE_BYTES
        .with_label_values(&[RANGE_RESULT_TYPE])
        .add(weight.into());
    cache.insert(key, result);
}

/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
Expand Down Expand Up @@ -562,6 +620,7 @@ pub struct CacheManagerBuilder {
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
range_result_cache_size: u64,
}

impl CacheManagerBuilder {
Expand Down Expand Up @@ -625,6 +684,12 @@ impl CacheManagerBuilder {
self
}

/// Sets range result cache size.
pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
self.range_result_cache_size = bytes;
self
}

/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
fn to_str(cause: RemovalCause) -> &'static str {
Expand Down Expand Up @@ -712,6 +777,21 @@ impl CacheManagerBuilder {
})
.build()
});
let range_result_cache = (self.range_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.range_result_cache_size)
.weigher(range_result_cache_weight)
.eviction_listener(|k, v, cause| {
let size = range_result_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[RANGE_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
.inc();
})
.build()
});
CacheManager {
sst_meta_cache,
vector_cache,
Expand All @@ -723,6 +803,7 @@ impl CacheManagerBuilder {
vector_index_cache,
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
range_result_cache,
index_result_cache,
}
}
Expand All @@ -746,6 +827,10 @@ fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultVal
(mem::size_of_val(k) + v.estimated_size()) as u32
}

/// Computes the cache weight of a range result entry as the estimated size
/// of the key plus the estimated size of the cached value.
///
/// The sum is saturated at `u32::MAX` rather than truncated with an `as`
/// cast: a plain `as u32` would wrap an oversized (> 4 GiB) entry around to
/// a near-zero weight and silently defeat the cache's capacity accounting.
fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
    let total = k.estimated_size() + v.estimated_size();
    u32::try_from(total).unwrap_or(u32::MAX)
}

/// Updates cache hit/miss metrics.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
if value.is_some() {
Expand Down Expand Up @@ -902,6 +987,8 @@ type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;

#[cfg(test)]
mod tests {
Expand All @@ -916,6 +1003,9 @@ mod tests {
use crate::cache::index::bloom_filter_index::Tag;
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::parquet_meta;
use crate::read::range_cache::{
RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
};
use crate::sst::parquet::row_selection::RowGroupSelection;

#[tokio::test]
Expand Down Expand Up @@ -1028,6 +1118,51 @@ mod tests {
assert!(cache.get_selector_result(&key).is_some());
}

#[test]
fn test_range_result_cache() {
    // Build a manager with a non-zero range result cache capacity so the
    // cache is actually enabled.
    let manager = Arc::new(
        CacheManager::builder()
            .range_result_cache_size(1024 * 1024)
            .build(),
    );

    let fingerprint = ScanRequestFingerprintBuilder {
        read_column_ids: vec![],
        read_column_types: vec![],
        filters: vec!["tag_0 = 1".to_string()],
        time_filters: vec![],
        series_row_selector: None,
        distribution: None,
        append_mode: false,
        filter_deleted: true,
        merge_mode: crate::region::options::MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();
    let key = RangeScanCacheKey {
        region_id: RegionId::new(1, 1),
        row_groups: vec![(FileId::random(), 0)],
        scan: fingerprint,
    };
    let cached = Arc::new(RangeScanCacheValue::new(Vec::new()));

    // Miss on a fresh cache, hit after the value is inserted.
    assert!(manager.get_range_result(&key).is_none());
    manager.put_range_result(key.clone(), cached.clone());
    assert!(manager.get_range_result(&key).is_some());

    // EnableAll delegates reads to the manager.
    let enable_all = CacheStrategy::EnableAll(manager.clone());
    assert!(enable_all.get_range_result(&key).is_some());

    // Compaction never reads, and its puts are no-ops (the earlier entry
    // remains untouched in the underlying cache).
    let compaction = CacheStrategy::Compaction(manager.clone());
    assert!(compaction.get_range_result(&key).is_none());
    compaction.put_range_result(key.clone(), cached.clone());
    assert!(manager.get_range_result(&key).is_some());

    // Disabled behaves the same way.
    let disabled = CacheStrategy::Disabled;
    assert!(disabled.get_range_result(&key).is_none());
    disabled.put_range_result(key.clone(), cached);
    assert!(manager.get_range_result(&key).is_some());
}

#[tokio::test]
async fn test_evict_puffin_cache_clears_all_entries() {
use std::collections::{BTreeMap, HashMap};
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod projection;
pub(crate) mod prune;
pub(crate) mod pruner;
pub mod range;
pub(crate) mod range_cache;
pub mod scan_region;
pub mod scan_util;
pub(crate) mod seq_scan;
Expand Down
Loading
Loading