Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 77 additions & 46 deletions src/mito2/src/memtable/bulk/part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use parquet::basic::{Compression, ZstdLevel};
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
Expand Down Expand Up @@ -813,6 +814,60 @@ pub fn convert_bulk_part(
.map(|(idx, field)| (field.name().as_str(), idx))
.collect();

// Pre-compute column indices.
// For sparse encoding, primary key columns are not in the input schema (already encoded)
let pk_col_indices = if !is_sparse {
Comment on lines +817 to +819
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a benchmark for it to see how much of an improvement this is?

region_metadata
.primary_key_columns()
.map(|col_meta| {
column_indices
.get(col_meta.column_schema.name.as_str())
.copied()
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})
})
.collect::<Result<Vec<_>>>()
} else {
Ok(Vec::new()) // Empty for sparse encoding
}?;

let field_col_indices = region_metadata
.field_columns()
.map(|col_meta| {
column_indices
.get(col_meta.column_schema.name.as_str())
.copied()
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})
})
.collect::<Result<Vec<_>>>()?;

let ts_col_idx = *column_indices
.get(
region_metadata
.time_index_column()
.column_schema
.name
.as_str(),
)
.context(ColumnNotFoundSnafu {
column: &region_metadata.time_index_column().column_schema.name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: with_context

time_index_column has one hash lookup inside it

})?;

let pk_dict_col_idx = if is_sparse {
Some(
*column_indices
.get(PRIMARY_KEY_COLUMN_NAME)
.context(ColumnNotFoundSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
})?,
)
} else {
None
};

// Determines the structure of the input batch by looking up columns by name
let mut output_columns = Vec::new();

Expand All @@ -823,32 +878,32 @@ pub fn convert_bulk_part(
None
} else {
// For dense encoding, extract and encode primary key columns by name
let pk_vectors: Result<Vec<_>> = region_metadata
.primary_key_columns()
.map(|col_meta| {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
let col = part.batch.column(*col_idx);
let pk_vectors: Result<Vec<_>> = pk_col_indices
.iter()
.map(|&col_idx| {
let col = part.batch.column(col_idx);
Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
})
.collect();
let pk_vectors = pk_vectors?;

// Pre-compute the (column_id, vector) pairs to avoid repeated zip operations
let pk_col_id_vector_pairs: Vec<_> = region_metadata
.primary_key
.iter()
.zip(pk_vectors.iter())
.collect();

let mut key_array_builder = PrimaryKeyArrayBuilder::new();
let mut encode_buf = Vec::new();

for row_idx in 0..num_rows {
encode_buf.clear();

// Collects primary key values with column IDs for this row
let pk_values_with_ids: Vec<_> = region_metadata
.primary_key
// Use SmallVec to avoid heap allocation for small primary keys (<=16 columns)
let pk_values_with_ids: SmallVec<[_; 16]> = pk_col_id_vector_pairs
.iter()
.zip(pk_vectors.iter())
.map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
.map(|(col_id, vector)| (**col_id, vector.get_ref(row_idx)))
.collect();

// Encodes the primary key
Expand All @@ -866,12 +921,10 @@ pub fn convert_bulk_part(

// Adds primary key columns if storing them (only for dense encoding)
if store_primary_key_columns && !is_sparse {
for col_meta in region_metadata.primary_key_columns() {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
for (col_idx, col_meta) in pk_col_indices
.iter()
.zip(region_metadata.primary_key_columns())
{
let col = part.batch.column(*col_idx);

// Converts to dictionary if needed for string types
Expand All @@ -889,41 +942,19 @@ pub fn convert_bulk_part(
}

// Adds field columns
for col_meta in region_metadata.field_columns() {
let col_idx = column_indices
.get(col_meta.column_schema.name.as_str())
.context(ColumnNotFoundSnafu {
column: &col_meta.column_schema.name,
})?;
output_columns.push(part.batch.column(*col_idx).clone());
for &col_idx in &field_col_indices {
output_columns.push(part.batch.column(col_idx).clone());
}

// Adds timestamp column
let new_timestamp_index = output_columns.len();
let ts_col_idx = column_indices
.get(
region_metadata
.time_index_column()
.column_schema
.name
.as_str(),
)
.context(ColumnNotFoundSnafu {
column: &region_metadata.time_index_column().column_schema.name,
})?;
output_columns.push(part.batch.column(*ts_col_idx).clone());
output_columns.push(part.batch.column(ts_col_idx).clone());

// Adds encoded primary key dictionary column
let pk_dictionary = if let Some(pk_dict_array) = pk_array {
Arc::new(pk_dict_array) as ArrayRef
} else {
let pk_col_idx =
column_indices
.get(PRIMARY_KEY_COLUMN_NAME)
.context(ColumnNotFoundSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
})?;
let col = part.batch.column(*pk_col_idx);
let col = part.batch.column(pk_dict_col_idx.unwrap());

// Casts to dictionary type if needed
let target_type = ArrowDataType::Dictionary(
Expand Down