Skip to content

Commit e143cba

Browse files
jackye1995claudehappy-otter
authored
feat: allow setting transaction properties in various operations (#6246)
Allow callers to set transaction properties in various operations, including index optimization, index creation, and data compaction. --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Happy <yesreply@happy.engineering>
1 parent 63d923b commit e143cba

File tree

4 files changed

+62
-13
lines changed

4 files changed

+62
-13
lines changed

rust/lance-index/src/optimize.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4+
use std::collections::HashMap;
5+
use std::sync::Arc;
6+
47
/// Options for optimizing all indices.
58
#[non_exhaustive]
69
#[derive(Debug, Clone, Default)]
@@ -33,6 +36,13 @@ pub struct OptimizeOptions {
3336
///
3437
/// NOTE: this option is only supported for v3 vector indices.
3538
pub retrain: bool,
39+
40+
/// Transaction properties to store with this commit.
41+
///
42+
/// These key-value pairs are stored in the transaction file
43+
/// and can be read later to identify the source of the commit
44+
/// (e.g., job_id for tracking completed index jobs).
45+
pub transaction_properties: Option<Arc<HashMap<String, String>>>,
3646
}
3747

3848
impl OptimizeOptions {
@@ -61,6 +71,7 @@ impl OptimizeOptions {
6171
num_indices_to_merge: None,
6272
index_names: None,
6373
retrain: true,
74+
..Default::default()
6475
}
6576
}
6677

@@ -73,4 +84,10 @@ impl OptimizeOptions {
7384
self.index_names = Some(names);
7485
self
7586
}
87+
88+
/// Set transaction properties to store in the transaction file for this commit.
89+
pub fn transaction_properties(mut self, properties: HashMap<String, String>) -> Self {
90+
self.transaction_properties = Some(Arc::new(properties));
91+
self
92+
}
7693
}

rust/lance/src/dataset/optimize.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ use std::sync::Arc;
8989
use super::fragment::FileFragment;
9090
use super::index::DatasetIndexRemapperOptions;
9191
use super::rowids::load_row_id_sequences;
92-
use super::transaction::{Operation, RewriteGroup, RewrittenIndex, Transaction};
92+
use super::transaction::{
93+
Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder,
94+
};
9395
use super::utils::make_rowid_capture_stream;
9496
use super::{WriteMode, WriteParams, write_fragments_internal};
9597
use crate::Dataset;
@@ -207,6 +209,13 @@ pub struct CompactionOptions {
207209
/// fragments at a time).
208210
/// Defaults to `None` (no limit, all eligible fragments are compacted).
209211
pub max_source_fragments: Option<usize>,
212+
/// Transaction properties to store with this commit.
213+
///
214+
/// These key-value pairs are stored in the transaction file
215+
/// and can be read later to identify the source of the commit
216+
/// (e.g., job_id for tracking completed compaction jobs).
217+
#[serde(skip)]
218+
pub transaction_properties: Option<Arc<HashMap<String, String>>>,
210219
}
211220

212221
#[allow(deprecated)]
@@ -227,6 +236,7 @@ impl Default for CompactionOptions {
227236
enable_binary_copy_force: false,
228237
binary_copy_read_batch_bytes: Some(16 * 1024 * 1024),
229238
max_source_fragments: None,
239+
transaction_properties: None,
230240
}
231241
}
232242
}
@@ -378,6 +388,12 @@ impl CompactionOptions {
378388
_ => CompactionMode::Reencode,
379389
}
380390
}
391+
392+
/// Set transaction properties to store in the transaction file for this commit.
393+
pub fn transaction_properties(mut self, properties: HashMap<String, String>) -> Self {
394+
self.transaction_properties = Some(Arc::new(properties));
395+
self
396+
}
381397
}
382398

383399
/// Determine if page-level binary copy can safely merge the provided fragments.
@@ -1569,15 +1585,16 @@ pub async fn commit_compaction(
15691585
None
15701586
};
15711587

1572-
let transaction = Transaction::new(
1588+
let transaction = TransactionBuilder::new(
15731589
dataset.manifest.version,
15741590
Operation::Rewrite {
15751591
groups: rewrite_groups,
15761592
rewritten_indices,
15771593
frag_reuse_index,
15781594
},
1579-
None,
1580-
);
1595+
)
1596+
.transaction_properties(options.transaction_properties.clone())
1597+
.build();
15811598

15821599
dataset
15831600
.apply_commit(transaction, &Default::default(), &Default::default())

rust/lance/src/index.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ use self::vector::remap_vector_index;
7878
use crate::dataset::index::LanceIndexStoreExt;
7979
use crate::dataset::optimize::RemappedIndex;
8080
use crate::dataset::optimize::remapping::RemapResult;
81-
use crate::dataset::transaction::{Operation, Transaction};
81+
use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder};
8282
use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index};
8383
use crate::index::mem_wal::open_mem_wal_index;
8484
pub use crate::index::prefilter::{FilterLoader, PreFilter};
@@ -950,14 +950,15 @@ impl DatasetIndexExt for Dataset {
950950
return Ok(());
951951
}
952952

953-
let transaction = Transaction::new(
953+
let transaction = TransactionBuilder::new(
954954
self.manifest.version,
955955
Operation::CreateIndex {
956956
new_indices,
957957
removed_indices,
958958
},
959-
None,
960-
);
959+
)
960+
.transaction_properties(options.transaction_properties.clone())
961+
.build();
961962

962963
self.apply_commit(transaction, &Default::default(), &Default::default())
963964
.await?;

rust/lance/src/index/create.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
Error, Result,
66
dataset::{
77
Dataset,
8-
transaction::{Operation, Transaction},
8+
transaction::{Operation, TransactionBuilder},
99
},
1010
index::{
1111
DatasetIndexExt, DatasetIndexInternalExt,
@@ -26,7 +26,7 @@ use lance_index::{
2626
scalar::{LANCE_SCALAR_INDEX, ScalarIndexParams, inverted::tokenizer::InvertedIndexParams},
2727
};
2828
use lance_table::format::{IndexMetadata, list_index_files_with_sizes};
29-
use std::{future::IntoFuture, sync::Arc};
29+
use std::{collections::HashMap, future::IntoFuture, sync::Arc};
3030
use tracing::instrument;
3131
use uuid::Uuid;
3232

@@ -56,6 +56,8 @@ pub struct CreateIndexBuilder<'a> {
5656
index_uuid: Option<String>,
5757
preprocessed_data: Option<Box<dyn RecordBatchReader + Send + 'static>>,
5858
progress: Arc<dyn IndexBuildProgress>,
59+
/// Transaction properties to store with this commit.
60+
transaction_properties: Option<Arc<HashMap<String, String>>>,
5961
}
6062

6163
impl<'a> CreateIndexBuilder<'a> {
@@ -77,6 +79,7 @@ impl<'a> CreateIndexBuilder<'a> {
7779
index_uuid: None,
7880
preprocessed_data: None,
7981
progress: Arc::new(NoopIndexBuildProgress),
82+
transaction_properties: None,
8083
}
8184
}
8285

@@ -118,6 +121,16 @@ impl<'a> CreateIndexBuilder<'a> {
118121
self
119122
}
120123

124+
/// Set transaction properties to store with this commit.
125+
///
126+
/// These key-value pairs are stored in the transaction file
127+
/// and can be read later to identify the source of the commit
128+
/// (e.g., job_id for tracking completed index jobs).
129+
pub fn transaction_properties(mut self, properties: HashMap<String, String>) -> Self {
130+
self.transaction_properties = Some(Arc::new(properties));
131+
self
132+
}
133+
121134
#[instrument(skip_all)]
122135
pub async fn execute_uncommitted(&mut self) -> Result<IndexMetadata> {
123136
if self.columns.len() != 1 {
@@ -456,14 +469,15 @@ impl<'a> CreateIndexBuilder<'a> {
456469
} else {
457470
vec![]
458471
};
459-
let transaction = Transaction::new(
472+
let transaction = TransactionBuilder::new(
460473
new_idx.dataset_version,
461474
Operation::CreateIndex {
462475
new_indices: vec![new_idx],
463476
removed_indices,
464477
},
465-
None,
466-
);
478+
)
479+
.transaction_properties(self.transaction_properties.clone())
480+
.build();
467481

468482
self.dataset
469483
.apply_commit(transaction, &Default::default(), &Default::default())

0 commit comments

Comments
 (0)