Skip to content
Draft
2 changes: 1 addition & 1 deletion scripts/fetch-dashboard-assets.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function retry_fetch() {
local url=$1
local filename=$2

curl --connect-timeout 10 --retry 3 -fsSL $url --output $filename || {
curl --connect-timeout 30 --max-time 60 --retry 3 -fsSL $url --output $filename || {
Copy link

Copilot AI Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change increases curl timeouts from 10s connection timeout with 3 retries to 30s connection timeout, 60s max time, with 3 retries. While this may improve reliability, this change appears unrelated to the skip_wal feature. Consider moving this to a separate PR or explaining its relevance to this feature in the PR description.

Copilot uses AI. Check for mistakes.
echo "Failed to download $url"
echo "You may try to set http_proxy and https_proxy environment variables."
if [[ -z "$GITHUB_PROXY_URL" ]]; then
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/alter_logical_tables/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl<'a> AlterLogicalTablesExecutor<'a> {
current_table_info_value,
Some(region_distribution),
new_raw_table_info,
None,
)
.await?;

Expand Down
30 changes: 30 additions & 0 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,43 @@ impl AlterTableProcedure {
};

// Safety: region distribution is set in `submit_alter_region_requests`.
// Check if skip_wal changed and update WAL options if needed
let current_skip_wal = table_info_value.table_info.meta.options.skip_wal;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to reallocate the WAL options, since a region might start writing to a different Kafka topic after reallocation, which could cause problems.

let new_skip_wal = new_info.meta.options.skip_wal;
let new_region_wal_options =
if current_skip_wal != new_skip_wal && self.data.region_distribution.is_some() {
// Get region numbers from region_distribution
let region_distribution = self.data.region_distribution.as_ref().unwrap();
let region_numbers: Vec<_> = region_distribution
.values()
.flat_map(|region_role_set| {
region_role_set
.leader_regions
.iter()
.chain(region_role_set.follower_regions.iter())
.copied()
})
.collect();
// Allocate new WAL options based on skip_wal
Some(
self.context
.table_metadata_allocator
.wal_options_allocator()
.allocate(&region_numbers, new_skip_wal)
.await?,
)
} else {
None
};

self.executor
.on_alter_metadata(
&self.context.table_metadata_manager,
table_info_value,
self.data.region_distribution.as_ref(),
new_info.into(),
&self.data.column_metadatas,
new_region_wal_options,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/ddl/alter_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ impl AlterTableExecutor {
region_distribution: Option<&RegionDistribution>,
mut raw_table_info: RawTableInfo,
column_metadatas: &[ColumnMetadata],
new_region_wal_options: Option<
std::collections::HashMap<store_api::storage::RegionNumber, String>,
>,
) -> Result<()> {
let table_ref = self.table.table_ref();
let table_id = self.table_id;
Expand Down Expand Up @@ -155,6 +158,7 @@ impl AlterTableExecutor {
current_table_info_value,
region_distribution.cloned(),
raw_table_info,
new_region_wal_options,
)
.await?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl CreateLogicalTablesProcedure {
// Update physical table's metadata and we don't need to touch per-region settings.
self.context
.table_metadata_manager
.update_table_info(&physical_table_info, None, new_table_info)
.update_table_info(&physical_table_info, None, new_table_info, None)
.await?;

// Invalid physical table cache
Expand Down
26 changes: 22 additions & 4 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,9 @@ impl TableMetadataManager {
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
region_distribution: Option<RegionDistribution>,
new_table_info: RawTableInfo,
new_region_wal_options: Option<
std::collections::HashMap<store_api::storage::RegionNumber, String>,
>,
) -> Result<()> {
let table_id = current_table_info_value.table_info.ident.table_id;
let new_table_info_value = current_table_info_value.update(new_table_info);
Expand All @@ -1133,7 +1136,12 @@ impl TableMetadataManager {
let new_region_options = new_table_info_value.table_info.to_region_options();
let update_datanode_table_options_txn = self
.datanode_table_manager
.build_update_table_options_txn(table_id, region_distribution, new_region_options)
.build_update_table_options_txn(
table_id,
region_distribution,
new_region_options,
new_region_wal_options,
)
.await?;
Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
} else {
Expand Down Expand Up @@ -1993,12 +2001,22 @@ mod tests {
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
// should be ok.
table_metadata_manager
.update_table_info(&current_table_info_value, None, new_table_info.clone())
.update_table_info(
&current_table_info_value,
None,
new_table_info.clone(),
None,
)
.await
.unwrap();
// if table info was updated, it should be ok.
table_metadata_manager
.update_table_info(&current_table_info_value, None, new_table_info.clone())
.update_table_info(
&current_table_info_value,
None,
new_table_info.clone(),
None,
)
.await
.unwrap();

Expand All @@ -2021,7 +2039,7 @@ mod tests {
// The ABA problem.
assert!(
table_metadata_manager
.update_table_info(&wrong_table_info_value, None, new_table_info)
.update_table_info(&wrong_table_info_value, None, new_table_info, None)
.await
.is_err()
)
Expand Down
24 changes: 20 additions & 4 deletions src/common/meta/src/key/datanode_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl DatanodeTableManager {
table_id: TableId,
region_distribution: RegionDistribution,
new_region_options: HashMap<String, String>,
new_region_wal_options: Option<HashMap<RegionNumber, String>>,
) -> Result<Txn> {
assert!(!region_distribution.is_empty());
// safety: region_distribution must not be empty
Expand All @@ -284,12 +285,27 @@ impl DatanodeTableManager {
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;

// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
// If the region options are the same and WAL options are not being updated, we don't need to update it.
let need_update_options = region_info.region_options != new_region_options;
let need_update_wal_options = if let Some(ref new_wal_options) = new_region_wal_options {
region_info.region_wal_options != *new_wal_options
} else {
false
};

if !need_update_options && !need_update_wal_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;

// substitute region options.
if need_update_options {
region_info.region_options = new_region_options;
}

// substitute region WAL options if provided.
if let Some(new_wal_options) = new_region_wal_options {
region_info.region_wal_options = new_wal_options;
}

let mut txns = Vec::with_capacity(region_distribution.len());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl State for UpdateTableInfo {
current_table_info_value,
Some(region_distribution),
new_table_info,
None,
)
.await?;

Expand Down
26 changes: 26 additions & 0 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_wal::options::WalOptions;
use humantime_serde::re::humantime;
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
Expand Down Expand Up @@ -229,6 +230,24 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
}
}
SetRegionOption::SkipWal(skip_wal) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can’t set the WAL options to RaftEngine; they could be Kafka. It would be better to let the MetaSrv send the detailed configuration.

info!(
"Update region skip_wal: {}, previous: {:?} new: {}",
region.region_id, current_options.wal_options, skip_wal
);
if skip_wal {
// Disable WAL by setting to Noop
current_options.wal_options = WalOptions::Noop;
} else {
// Enable WAL: restore to default (RaftEngine)
// TODO: In distributed mode, this should be allocated by metasrv,
// but for simplicity, we use RaftEngine as default here.
// The actual WAL options will be persisted in metasrv.
// We should read the correct WAL options from DatanodeTableValue
// or pass them through the AlterRegionRequest.
current_options.wal_options = WalOptions::RaftEngine;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WAL type could be Kafak

}
}
}
}
region.version_control.alter_options(current_options);
Expand Down Expand Up @@ -264,6 +283,13 @@ fn new_region_options_on_empty_memtable(

current_options.sst_format = Some(new_format);
}
SetRegionOption::SkipWal(skip_wal) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If only the WAL options are configured, the underlying WAL provider is not affected, so the region continues writing data to the WAL.

if *skip_wal {
current_options.wal_options = WalOptions::Noop;
} else {
current_options.wal_options = WalOptions::RaftEngine;
}
}
}
}
Some(current_options)
Expand Down
7 changes: 6 additions & 1 deletion src/query/src/sql/show_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use sql::statements::{self, OptionMap};
use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
use table::metadata::{TableInfoRef, TableMeta};
use table::requests::{
COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY,
COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, SKIP_WAL_KEY, TTL_KEY,
WRITE_BUFFER_SIZE_KEY,
};

use crate::error::{
Expand Down Expand Up @@ -63,6 +64,10 @@ fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptio
options.insert(TTL_KEY.to_string(), database_ttl);
};

if table_opts.skip_wal {
options.insert(SKIP_WAL_KEY.to_string(), "true".to_string());
}

for (k, v) in table_opts
.extra_options
.iter()
Expand Down
15 changes: 14 additions & 1 deletion src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ use crate::metadata::{
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
use crate::mito_engine_options::{
SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
SKIP_WAL_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
TWCS_TRIGGER_FILE_NUM,
};
use crate::path_utils::table_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
Expand Down Expand Up @@ -1297,6 +1298,8 @@ pub enum SetRegionOption {
Twsc(String, String),
// Modifying the SST format.
Format(String),
// Modifying skip_wal option.
SkipWal(bool),
}

impl TryFrom<&PbOption> for SetRegionOption {
Expand All @@ -1315,6 +1318,12 @@ impl TryFrom<&PbOption> for SetRegionOption {
Ok(Self::Twsc(key.clone(), value.clone()))
}
SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
SKIP_WAL_KEY => {
let skip_wal = value
.parse::<bool>()
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
Ok(Self::SkipWal(skip_wal))
}
_ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
}
}
Expand All @@ -1333,6 +1342,7 @@ impl From<&UnsetRegionOption> for SetRegionOption {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
UnsetRegionOption::SkipWal => SetRegionOption::SkipWal(false),
}
}
}
Expand All @@ -1346,6 +1356,7 @@ impl TryFrom<&str> for UnsetRegionOption {
TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
SKIP_WAL_KEY => Ok(Self::SkipWal),
_ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
}
}
Expand All @@ -1357,6 +1368,7 @@ pub enum UnsetRegionOption {
TwcsMaxOutputFileSize,
TwcsTimeWindow,
Ttl,
SkipWal,
}

impl UnsetRegionOption {
Expand All @@ -1366,6 +1378,7 @@ impl UnsetRegionOption {
Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
Self::SkipWal => SKIP_WAL_KEY,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ impl TableMeta {
.extra_options
.insert(SST_FORMAT_KEY.to_string(), value.clone());
}
SetRegionOption::SkipWal(skip_wal) => {
new_options.skip_wal = *skip_wal;
}
}
}
let mut builder = self.new_meta_builder();
Expand Down
Loading