diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 1d709eb7f30..f29372d077e 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -274,7 +274,12 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_seg_split_fg, {"type", "seg_split_fg"}), \ F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \ F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \ - F(type_place_index_update, {"type", "place_index_update"})) \ + F(type_place_index_update, {"type", "place_index_update"}), \ + F(type_prepare_merge_delta, {"type", "prepare_merge_delta"}), \ + F(type_prepare_merge, {"type", "prepare_merge"}), \ + F(type_prepare_split_physical, {"type", "prepare_split_physical"}), \ + F(type_remote_upload, {"type", "remote_upload"}), \ + F(type_ingest, {"type", "ingest"})) \ M(tiflash_storage_subtask_duration_seconds, \ "Bucketed histogram of storage's sub task duration", \ Histogram, /* increase the bucket from 10ms to 87 minutes */ \ @@ -288,7 +293,12 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.010, 2, 20}), \ F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.010, 2, 20}), \ F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.010, 2, 20}), \ - F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20})) \ + F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20}), \ + F(type_prepare_merge_delta, {{"type", "prepare_merge_delta"}}, ExpBuckets{0.010, 2, 20}), \ + F(type_prepare_merge, {{"type", "prepare_merge"}}, ExpBuckets{0.010, 2, 20}), \ + F(type_prepare_split_physical, {{"type", "prepare_split_physical"}}, ExpBuckets{0.010, 2, 20}), \ + F(type_remote_upload, {{"type", "remote_upload"}}, ExpBuckets{0.010, 2, 20}), \ + F(type_ingest, {{"type", "ingest"}}, ExpBuckets{0.010, 2, 20})) \ M(tiflash_storage_subtask_throughput_bytes, \ "Calculate the throughput of (maybe foreground) tasks of storage in bytes", \ Counter, /**/ \ @@ -940,6 +950,17 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva Gauge, \ F(type_bg_downloading_count, {{"type", "bg_downloading_count"}}), \ F(type_bg_download_queue_count, {{"type", "bg_download_queue_count"}})) \ + M(tiflash_storage_write_filecache_staging, \ + "Write-side FileCache local staging for background DMFile reads", \ + Counter, \ + F(type_attempt, {"type", "attempt"}), \ + F(type_object, {"type", "object"}), \ + F(type_download_ok, {"type", "download_ok"}), \ + F(type_download_failed, {"type", "download_failed"})) \ + M(tiflash_storage_write_filecache_staging_bytes, \ + "Bytes staged by write FileCache local staging", \ + Counter, \ + F(type_staged, {"type", "staged"})) \ M(tiflash_system_seconds, \ "system calls duration in seconds", \ Histogram, \ diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 4388a78d5d5..a144c8dcf31 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -252,6 +252,7 @@ struct Settings M(SettingDouble, dt_filecache_max_downloading_count_scale, 10.0, "Max queue size of download task count of FileCache = number of logical cpu cores * dt_filecache_max_downloading_count_scale.") \ M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \ M(SettingUInt64, dt_filecache_wait_on_downloading_ms, 0, "When a remote cache lookup sees the same key is already being downloaded, wait up to this many milliseconds for that download to finish. 0 disables the bounded wait.") \ + M(SettingBool, dt_enable_write_filecache, false, "Enable local FileCache staging for disaggregated tiflash write nodes when FileCache is available.") \ M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \ M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \ M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3d4769d7f34..2ef9a53fb81 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -752,8 +752,9 @@ try const auto is_disagg_compute_mode = global_context->getSharedContextDisagg()->isDisaggregatedComputeMode(); const auto is_disagg_storage_mode = global_context->getSharedContextDisagg()->isDisaggregatedStorageMode(); const auto not_disagg_mode = global_context->getSharedContextDisagg()->notDisaggregatedMode(); + const bool enable_remote_cache = is_disagg_compute_mode || is_disagg_storage_mode; const auto [remote_cache_paths, remote_cache_capacity_quota] - = storage_config.remote_cache_config.getCacheDirInfos(is_disagg_compute_mode); + = storage_config.remote_cache_config.getCacheDirInfos(enable_remote_cache); global_context->initializePathCapacityMetric( // global_capacity_quota, // storage_config.main_data_paths, @@ -769,7 +770,7 @@ try storage_config.kvstore_data_path, // global_context->getPathCapacity(), global_context->getFileProvider()); - if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && is_disagg_compute_mode) + if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && enable_remote_cache) { config.initCacheDir(); FileCache::initialize( @@ -781,6 +782,11 @@ try // here so startup-time values like dt_filecache_wait_on_downloading_ms take effect immediately // instead of waiting for a later config reload. FileCache::instance()->updateConfig(global_context->getSettingsRef()); + LOG_INFO( + log, + "Initialized FileCache for disaggregated remote cache (compute_mode={} storage_mode={})", + is_disagg_compute_mode, + is_disagg_storage_mode); } /// Determining PageStorage run mode based on current files on disk and storage config. diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index 58d39f57179..73cf9a4d522 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -732,13 +732,14 @@ UInt64 StorageRemoteCacheConfig::getReservedCapacity() const return capacity * reserved_rate; } -std::pair> StorageRemoteCacheConfig::getCacheDirInfos(bool is_compute_mode) const +std::pair> StorageRemoteCacheConfig::getCacheDirInfos(bool is_disagg_mode) const { - if (is_compute_mode && isCacheEnabled()) + if (is_disagg_mode && isCacheEnabled()) { return { Strings{getDTFileCacheDir(), getPageCacheDir()}, - std::vector{getDTFileCapacity(), getPageCapacity()}}; + std::vector{getDTFileCapacity(), getPageCapacity()}, + }; } else { diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h index d9999f98678..9bb8a178f89 100644 --- a/dbms/src/Server/StorageConfigParser.h +++ b/dbms/src/Server/StorageConfigParser.h @@ -78,7 +78,8 @@ struct StorageRemoteCacheConfig UInt64 getReservedCapacity() const; void parse(const String & content, const LoggerPtr & log); - std::pair> getCacheDirInfos(bool is_compute_mode) const; + // `is_disagg_mode` is true when the process is a disaggregated compute or storage node that may use remote cache. + std::pair> getCacheDirInfos(bool is_disagg_mode) const; }; struct TiFlashStorageConfig diff --git a/dbms/src/Server/tests/gtest_storage_config.cpp b/dbms/src/Server/tests/gtest_storage_config.cpp index 6459fac75e6..c3e58a5eb40 100644 --- a/dbms/src/Server/tests/gtest_storage_config.cpp +++ b/dbms/src/Server/tests/gtest_storage_config.cpp @@ -1000,6 +1000,47 @@ delta_rate = 1.1 } CATCH +TEST_F(StorageConfigTest, RemoteCacheDirInfosForDisaggMode) +try +{ + auto log = Logger::get("StorageConfigTest.RemoteCacheDirInfosForDisaggMode"); + const String config_str = R"( +[storage] +[storage.main] +dir = ["123"] +[storage.remote.cache] +dir = "/tmp/StorageConfigTest/RemoteCacheDirInfosForDisaggMode" +capacity = 10000000 +dtfile_level = 11 +delta_rate = 0.33 + )"; + + auto config = loadConfigFromString(config_str); + size_t global_capacity_quota = 0; + TiFlashStorageConfig storage; + std::tie(global_capacity_quota, storage) = TiFlashStorageConfig::parseSettings(*config, log); + + const auto & cache_config = storage.remote_cache_config; + ASSERT_TRUE(cache_config.isCacheEnabled()); + + { + const auto [paths, capacities] = cache_config.getCacheDirInfos(false); + ASSERT_TRUE(paths.empty()); + ASSERT_TRUE(capacities.empty()); + } + + { + const auto [paths, capacities] = cache_config.getCacheDirInfos(true); + ASSERT_EQ(paths.size(), 2); + ASSERT_EQ(capacities.size(), 2); + ASSERT_EQ(paths[0], cache_config.getDTFileCacheDir()); + ASSERT_EQ(paths[1], cache_config.getPageCacheDir()); + ASSERT_EQ(capacities[0], cache_config.getDTFileCapacity()); + ASSERT_EQ(capacities[1], cache_config.getPageCapacity()); + } +} +CATCH + TEST_F(StorageConfigTest, TempPath) try { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 1282fe97e71..32eeca1d0c0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -170,11 +170,10 @@ class DeltaMergeStore #endif using TaskQueue = std::queue>; + std::mutex mutex; TaskQueue light_tasks; TaskQueue heavy_tasks; - std::mutex mutex; - public: size_t length() { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index ae2a0a19a51..a2220269567 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -228,6 +229,25 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile( return updated_segments; } +namespace +{ +constexpr double slow_split_ingest_log_threshold_seconds = 10.0; + +struct SplitIngestLogContext +{ + const Stopwatch & watch; + + bool isSlow() const { return watch.elapsedSeconds() > slow_split_ingest_log_threshold_seconds; } + + double elapsedSeconds() const { return watch.elapsedSeconds(); } + + Poco::Message::Priority debugOrInfoLevel() const + { + return isSlow() ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG; + } +}; +} // namespace + /** * Accept a target ingest range and a vector of DTFiles, ingest these DTFiles (clipped by the target ingest range) * using logical split. @@ -245,6 +265,9 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit( const DMFiles & files, bool clear_data_in_range) { + Stopwatch watch; + SplitIngestLogContext split_ingest_log{watch}; + { RUNTIME_CHECK(files.size() == external_files.size(), files.size(), external_files.size()); for (size_t i = 0; i < files.size(); ++i) @@ -331,9 +354,11 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit( * ↑ The segment we ingest DMFile into */ - LOG_DEBUG( + LOG_IMPL( log, - "Table ingest using split - split ingest phase - begin, ingest_range={}, files_n={}", + split_ingest_log.debugOrInfoLevel(), + "Table ingest using split - split ingest phase - begin, elapsed_seconds={:.3f} ingest_range={} files_n={}", + split_ingest_log.elapsedSeconds(), ingest_range.toDebugString(), files.size()); @@ -389,8 +414,10 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit( LOG_INFO( log, - "Table ingest using split - split ingest phase - Try to ingest file into segment, file_idx={} " + "Table ingest using split - split ingest phase - Try to ingest file into segment, " + "elapsed_seconds={:.3f} file_idx={} " "file_id=dmf_{} file_ingest_range={} segment={} segment_ingest_range={}", + split_ingest_log.elapsedSeconds(), file_idx, files[file_idx]->fileId(), file_ingest_range.toDebugString(), @@ -412,15 +439,17 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit( } else { - // this segment is abandoned, or may be split into multiples. + // this segment is abandoned, or may be split into multiples, or running segmentMerge/segmentMergeDelta, etc. // retry with current range and file and find segment again. } } } - LOG_DEBUG( + LOG_IMPL( log, - "Table ingest using split - split ingest phase - finished, updated_segments_n={}", + split_ingest_log.debugOrInfoLevel(), + "Table ingest using split - split ingest phase - finished, elapsed_seconds={:.3f} updated_segments_n={}", + split_ingest_log.elapsedSeconds(), updated_segments.size()); return std::vector(updated_segments.begin(), updated_segments.end()); @@ -592,6 +621,11 @@ UInt64 DeltaMergeStore::ingestFiles( throw Exception(msg); } + GET_METRIC(tiflash_storage_subtask_count, type_ingest).Increment(); + Stopwatch watch_ingest; + SCOPE_EXIT( + { GET_METRIC(tiflash_storage_subtask_duration_seconds, type_ingest).Observe(watch_ingest.elapsedSeconds()); }); + { // `ingestDTFilesUsingSplit` requires external_files to be not overlapped. Otherwise the results will be incorrect. // Here we verify the external_files are ordered and not overlapped. @@ -651,8 +685,6 @@ UInt64 DeltaMergeStore::ingestFiles( } } - EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS); - auto delegate = dm_context->path_pool->getStableDiskDelegator(); auto file_provider = dm_context->global_context.getFileProvider(); @@ -787,10 +819,21 @@ UInt64 DeltaMergeStore::ingestFiles( if (has_segments || !external_files.empty()) { if (use_split_replace) - updated_segments - = ingestDTFilesUsingSplit(dm_context, range, external_files, files, clear_data_in_range); + { + // For large files, we use split+replace to ingest the files into stable layer directly. + // Check the `ingestDTFilesUsingSplit` for the details steps. + updated_segments = ingestDTFilesUsingSplit( // + dm_context, + range, + external_files, + files, + clear_data_in_range); + } else + { + // For small files, we ingest them into the delta layer directly, which is more efficient than split+replace. updated_segments = ingestDTFilesUsingColumnFile(dm_context, range, files, clear_data_in_range); + } } } @@ -837,7 +880,8 @@ UInt64 DeltaMergeStore::ingestFiles( LOG_INFO( log, - "Table ingest files - finished ingested files into segments, {} clear={}", + "Table ingest files - finished ingested files into segments, elapsed_seconds={:.3f} {} clear={}", + watch_ingest.elapsedSeconds(), get_ingest_info(), clear_data_in_range); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 0dabaa6910a..d069d35c6d6 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -279,8 +279,13 @@ SegmentPair DeltaMergeStore::segmentSplit( LOG_INFO( log, - "Split - {} - Finish, segment is split into two, old_segment={} new_left={} new_right={}", + "Split - {} - Finish, segment is split into two, reason={} " + "prepare_seconds={:.3f} remote_upload_seconds={:.3f} " + "old_segment={} new_left={} new_right={}", split_info.is_logical ? "SplitLogical" : "SplitPhysical", + magic_enum::enum_name(reason), + split_info.prepare_seconds, + split_info.remote_upload_seconds, segment->info(), new_left->info(), new_right->info()); @@ -331,6 +336,7 @@ SegmentPtr DeltaMergeStore::segmentMerge( dm_context.min_version, Segment::simpleInfo(ordered_segments)); + // keep "for_update=true" snapshot for all related segments std::vector ordered_snapshots; ordered_snapshots.reserve(ordered_segments.size()); ColumnDefinesPtr schema_snap; @@ -395,7 +401,9 @@ SegmentPtr DeltaMergeStore::segmentMerge( }); WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); - auto merged_stable = Segment::prepareMerge(dm_context, schema_snap, ordered_segments, ordered_snapshots, wbs); + const auto prepare_result + = Segment::prepareMerge(dm_context, schema_snap, ordered_segments, ordered_snapshots, wbs); + auto merged_stable = prepare_result.stable; wbs.writeLogAndData(); merged_stable->enableDMFilesGC(dm_context); @@ -437,9 +445,13 @@ SegmentPtr DeltaMergeStore::segmentMerge( LOG_INFO( log, - "Merge - Finish, {} segments are merged into one, reason={} merged={} segments_to_merge={}", + "Merge - Finish, {} segments are merged into one, reason={} " + "prepare_seconds={:.3f} remote_upload_seconds={:.3f} " + "merged={} segments_to_merge={}", ordered_segments.size(), magic_enum::enum_name(reason), + prepare_result.prepare_seconds, + prepare_result.remote_upload_seconds, merged->info(), Segment::info(ordered_segments)); } @@ -775,7 +787,7 @@ void DeltaMergeStore::segmentEnsureStableLocalIndex( DMFile::info(index_build_info.dm_files)); // 3. Update the meta version of the segments to the latest one. - // To avoid logical split between step 2 and 3, get lastest segments to update again. + // To avoid logical split between step 2 and 3, get latest segments to update again. // If TiFlash crashes during updating the meta version, some segments' meta are updated and some are not. // So after TiFlash restarts, we will update meta versions to latest versions again. { @@ -1273,7 +1285,8 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); - auto new_stable = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs); + const auto prepare_result = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs); + auto new_stable = prepare_result.stable; wbs.writeLogAndData(); new_stable->enableDMFilesGC(dm_context); @@ -1307,7 +1320,12 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( LOG_INFO( log, - "MergeDelta - Finish, delta is merged, old_segment={} new_segment={}", + "MergeDelta - Finish, delta is merged, reason={} " + "prepare_seconds={:.3f} remote_upload_seconds={:.3f} " + "old_segment={} new_segment={}", + magic_enum::enum_name(reason), + prepare_result.prepare_seconds, + prepare_result.remote_upload_seconds, segment->info(), new_segment->info()); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index f72876149f6..601284b3d6b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -44,6 +44,12 @@ class DMFileMetaV2Test; class DMStoreForSegmentReadTaskTest; } // namespace tests +struct LocalReadObject; +std::vector collectMetaV2MergedFilesForLocalRead( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + const LoggerPtr & log, + const String & tracing_id); class DMFile : private boost::noncopyable { @@ -132,6 +138,7 @@ class DMFile : private boost::noncopyable const DMFileMeta::PackProperties & getPackProperties() const { return meta->pack_properties; } const ColumnStats & getColumnStats() const { return meta->column_stats; } const std::unordered_set & getColumnIndices() const { return meta->column_indices; } + size_t getNumColumns() const { return meta->column_stats.size(); } // only used in gtest void clearPackProperties() const { meta->pack_properties.clear_property(); } @@ -361,6 +368,11 @@ class DMFile : private boost::noncopyable friend class ColumnReadStream; friend class DMFilePackFilter; friend class DMFileBlockInputStreamBuilder; + friend std::vector collectMetaV2MergedFilesForLocalRead( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + const LoggerPtr & log, + const String & tracing_id); friend class tests::DMFileTest; friend class tests::DMFileMetaV2Test; friend class tests::DMStoreForSegmentReadTaskTest; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 93efb0fe9a3..f716e2950ad 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -15,6 +15,7 @@ #include // For ENABLE_CLARA #include #include +#include #include #include #include @@ -91,6 +92,13 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::buildNoLocalIndex( tracing_id); } + auto local_read_files = tryDownloadMetaV2MergedFilesForLocalRead( + dmfile, + read_columns, + enable_write_filecache_local_read, + Logger::get(tracing_id), + tracing_id); + DMFileReader reader( dmfile, read_columns, @@ -111,7 +119,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::buildNoLocalIndex( tracing_id, max_sharing_column_bytes_for_all, scan_context, - read_tag); + read_tag, + std::move(local_read_files)); return std::make_shared(std::move(reader), max_sharing_column_bytes_for_all > 0); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index 5ec5f660765..3088b6d3e07 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -192,6 +192,12 @@ class DMFileBlockInputStreamBuilder return *this; } + DMFileBlockInputStreamBuilder & enableWriteFileCacheLocalRead(bool enable) + { + enable_write_filecache_local_read = enable; + return *this; + } + private: DMFileBlockInputStreamPtr buildNoLocalIndex( const DMFilePtr & dmfile, @@ -234,8 +240,8 @@ class DMFileBlockInputStreamBuilder private: FileProviderPtr file_provider; + bool enable_write_filecache_local_read = false; // clean read - bool enable_handle_clean_read = false; bool is_fast_scan = false; bool enable_del_clean_read = false; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileLocalStaging.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileLocalStaging.cpp new file mode 100644 index 00000000000..7a381f476c7 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileLocalStaging.cpp @@ -0,0 +1,269 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace DB::DM +{ +namespace +{ +struct LogicalReadFile +{ + ColId col_id; + String filename; +}; + +std::optional getMergedFileSize(const DMFileMetaV2 & dmfile_meta, UInt32 number) +{ + for (const auto & merged_file : dmfile_meta.merged_files) + { + if (merged_file.number == number) + return merged_file.size; + } + return std::nullopt; +} + +void tryCollectS3Object( + std::unordered_map & objects_by_key, + const String & file_path, + UInt64 file_size, + const DMFilePtr & dmfile, + const LoggerPtr & log, + const String & tracing_id, + const String & logical_filename) +{ + if (file_size == 0) + return; + + const auto s3_fname = S3::S3FilenameView::fromKeyWithPrefix(file_path); + if (!s3_fname.isValid()) + { + LOG_DEBUG( + log, + "Skip local staging collection for non-S3 path, tracing_id={} dmfile={} logical_file={} path={}", + tracing_id, + dmfile->parentPath(), + logical_filename, + file_path); + return; + } + + objects_by_key.emplace( + s3_fname.toFullKey(), + LocalReadObject{ + .s3_key = s3_fname.toFullKey(), + .file_size = file_size, + }); +} +} // namespace + +std::vector collectMetaV2MergedFilesForLocalRead( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + const LoggerPtr & log, + const String & tracing_id) +{ + if (!dmfile->useMetaV2()) + return {}; + + const auto * dmfile_meta = typeid_cast(dmfile->meta.get()); + if (dmfile_meta == nullptr) + return {}; + + // Logical subfile names (e.g. `c1.dat`, `c1.mrk`) required by read_columns. + // MetaV2 stores many logical files inside physical `.merged` blobs; this list is + // the per-column/substream view before resolving to merged file numbers or standalone paths. + std::vector logical_files; + for (const auto & cd : read_columns) + { + if (!dmfile->isColumnExist(cd.id)) + continue; + + const auto data_type = dmfile->getColumnStat(cd.id).type; + data_type->enumerateStreams( + [&](const IDataType::SubstreamPath & substream) { + const auto stream_name = DMFile::getFileNameBase(cd.id, substream); + logical_files.push_back({cd.id, colDataFileName(stream_name)}); + logical_files.push_back({cd.id, colMarkFileName(stream_name)}); + }, + {}); + } + + // Physical S3 objects to stage locally, keyed by full S3 key. + // Multiple logical subfiles may map to the same merged blob; dedup here so + // each object is downloaded at most once. + std::unordered_map objects_by_key; + for (const auto & logical_file : logical_files) + { + const auto & logical_filename = logical_file.filename; + const auto info_iter = dmfile_meta->merged_sub_file_infos.find(logical_filename); + if (info_iter == dmfile_meta->merged_sub_file_infos.end()) + { + // Large column subfiles (e.g. `.dat`) are kept as standalone S3 objects when they + // exceed `small_file_size_threshold`; only their marks/indexes may live in `.merged`. + const auto file_path = dmfile->subFilePath(logical_filename); + const auto file_size = dmfile->getReadFileSize(logical_file.col_id, logical_filename); + tryCollectS3Object(objects_by_key, file_path, file_size, dmfile, log, tracing_id, logical_filename); + continue; + } + + const auto & merged_file_info = info_iter->second; + const auto merged_file_size = getMergedFileSize(*dmfile_meta, merged_file_info.number); + if (!merged_file_size.has_value()) + { + LOG_DEBUG( + log, + "Skip local staging collection for unknown merged file, tracing_id={} dmfile={} logical_file={} " + "merged_number={}", + tracing_id, + dmfile->parentPath(), + logical_filename, + merged_file_info.number); + continue; + } + + const auto merged_path = dmfile_meta->mergedPath(merged_file_info.number); + tryCollectS3Object( + objects_by_key, + merged_path, + merged_file_size.value(), + dmfile, + log, + tracing_id, + logical_filename); + } + + std::vector objects; + objects.reserve(objects_by_key.size()); + for (auto & [_, object] : objects_by_key) + objects.emplace_back(std::move(object)); + return objects; +} + +namespace +{ +constexpr Int32 local_staging_download_retry_count = 3; +} // namespace + +std::vector tryDownloadMetaV2MergedFilesForLocalRead( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + bool enable_write_filecache_local_read, + const LoggerPtr & log, + const String & tracing_id) +{ + if (!enable_write_filecache_local_read) + return {}; + + auto * file_cache = FileCache::instance(); + if (file_cache == nullptr) + return {}; + + const auto objects = collectMetaV2MergedFilesForLocalRead(dmfile, read_columns, log, tracing_id); + if (objects.empty()) + return {}; + + GET_METRIC(tiflash_storage_write_filecache_staging, type_attempt).Increment(); + GET_METRIC(tiflash_storage_write_filecache_staging, type_object).Increment(objects.size()); + + std::vector local_read_files; + local_read_files.reserve(objects.size()); + + size_t downloaded_count = 0; + size_t failed_count = 0; + for (const auto & object : objects) + { + const auto s3_fname = S3::S3FilenameView::fromKey(object.s3_key); + if (!s3_fname.isValid()) + { + ++failed_count; + LOG_DEBUG( + log, + "Skip write FileCache local staging for invalid S3 key, tracing_id={} dmfile={} key={}", + tracing_id, + dmfile->parentPath(), + object.s3_key); + continue; + } + + try + { + auto [file_seg, has_s3_download] = file_cache->downloadFileForLocalReadWithRetry( + s3_fname, + object.file_size, + local_staging_download_retry_count); + (void)has_s3_download; + if (!file_seg) + { + ++failed_count; + LOG_WARNING( + log, + "Write FileCache local staging download returned empty segment, tracing_id={} dmfile={} key={}", + tracing_id, + dmfile->parentPath(), + object.s3_key); + continue; + } + local_read_files.emplace_back(std::move(file_seg)); + GET_METRIC(tiflash_storage_write_filecache_staging, type_download_ok).Increment(); + // Cumulative bytes of physical objects successfully staged (metadata file_size). + // Counter only increases; reader pin release does not decrement it. For live FileCache + // occupancy, use tiflash_storage_remote_cache_bytes instead. + GET_METRIC(tiflash_storage_write_filecache_staging_bytes, type_staged).Increment(object.file_size); + ++downloaded_count; + } + catch (...) + { + ++failed_count; + tryLogCurrentException( + log, + fmt::format( + "Write FileCache local staging download failed, tracing_id={} dmfile={} key={}", + tracing_id, + dmfile->parentPath(), + object.s3_key)); + } + } + + if (failed_count > 0) + GET_METRIC(tiflash_storage_write_filecache_staging, type_download_failed).Increment(failed_count); + + LOG_DEBUG( + log, + "Write FileCache local staging finished, tracing_id={} dmfile={} objects={} downloaded={} failed={}", + tracing_id, + dmfile->parentPath(), + objects.size(), + downloaded_count, + failed_count); + + return local_read_files; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileLocalStaging.h b/dbms/src/Storages/DeltaMerge/File/DMFileLocalStaging.h new file mode 100644 index 00000000000..0d209024120 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileLocalStaging.h @@ -0,0 +1,61 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include +#include + +namespace DB +{ +class FileSegment; +using FileSegmentPtr = std::shared_ptr; +} // namespace DB + +namespace DB::DM +{ + +struct LocalReadObject +{ + String s3_key; + UInt64 file_size = 0; +}; + +/// Collect S3 objects to stage locally before reading a MetaV2 DMFile. +/// +/// Collects physical `.merged` blobs referenced by `read_columns`, as well as +/// standalone column subfiles (e.g. large `.dat` files not merged into `.merged`). +/// +/// Returns empty for non-MetaV2 DMFiles or when paths are not valid remote S3 keys. +std::vector collectMetaV2MergedFilesForLocalRead( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + const LoggerPtr & log, + const String & tracing_id); + +/// Download collected MetaV2 objects into FileCache and return pins for reader lifetime. +/// Returns empty when staging is disabled, FileCache is unavailable, or nothing to stage. +/// Per-object download failures are logged and counted; successful pins are still returned. +std::vector tryDownloadMetaV2MergedFilesForLocalRead( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + bool enable_write_filecache_local_read, + const LoggerPtr & log, + const String & tracing_id); + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 62ae9a3d34a..d5718aa5618 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -73,7 +73,8 @@ DMFileReader::DMFileReader( const String & tracing_id_, size_t max_sharing_column_bytes_, const ScanContextPtr & scan_context_, - const ReadTag read_tag_) + const ReadTag read_tag_, + std::vector local_read_files_) : dmfile(dmfile_) , read_columns(read_columns_) , is_common_handle(is_common_handle_) @@ -92,6 +93,7 @@ DMFileReader::DMFileReader( , max_sharing_column_bytes(max_sharing_column_bytes_) , file_provider(file_provider_) , log(Logger::get(tracing_id_)) + , local_read_files(std::move(local_read_files_)) , read_block_infos(ReadBlockInfo::create( pack_filter->getPackRes(), dmfile->getPackStats(), diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 0c686e773a5..5d1e27b2d1d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -29,6 +29,15 @@ #include #include +#include +#include + +namespace DB +{ +class FileSegment; +using FileSegmentPtr = std::shared_ptr; +} // namespace DB + namespace DB::DM { @@ -69,7 +78,8 @@ class DMFileReader const String & tracing_id_, size_t max_sharing_column_bytes_, const ScanContextPtr & scan_context_, - ReadTag read_tag_); + ReadTag read_tag_, + std::vector local_read_files_ = {}); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -191,6 +201,9 @@ class DMFileReader // DataSharing ColumnCachePtr data_sharing_col_data_cache; + // Pin FileCache local files for reader lifetime. Not used in read logic directly. + std::vector local_read_files; + std::deque read_block_infos; // row_offset of the given pack_id const std::vector pack_offset; diff --git a/dbms/src/Storages/DeltaMerge/File/tests/gtest_dm_meta_version.cpp b/dbms/src/Storages/DeltaMerge/File/tests/gtest_dm_meta_version.cpp index c1f1bcb3487..6fc739e7eab 100644 --- a/dbms/src/Storages/DeltaMerge/File/tests/gtest_dm_meta_version.cpp +++ b/dbms/src/Storages/DeltaMerge/File/tests/gtest_dm_meta_version.cpp @@ -12,28 +12,73 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include +#include #include #include #include #include +#include #include +#include +#include +#include #include #include +#include #include #include #include #include #include +#include #include #include #include +#include namespace DB::DM::tests { +namespace +{ +// Keep in sync with gtests_dbms_main.cpp. +constexpr size_t s3_file_cache_pool_max_threads = 20; +constexpr size_t s3_file_cache_pool_max_free_threads = 10; +constexpr size_t s3_file_cache_pool_queue_size = 1000; + +void reinitS3FileCachePool() +{ + S3FileCachePool::initialize( + s3_file_cache_pool_max_threads, + s3_file_cache_pool_max_free_threads, + s3_file_cache_pool_queue_size); +} + +void shutdownWriteFileCache() +{ + if (FileCache::instance() == nullptr) + return; + FileCache::shutdown(); + // FileCache::shutdown() tears down S3FileCachePool; restore it for other tests. + reinitS3FileCachePool(); +} + +void initWriteFileCache(PathCapacityMetricsPtr capacity_metrics, IORateLimiter & rate_limiter, String cache_dir) +{ + StorageRemoteCacheConfig file_cache_config{ + .dir = std::move(cache_dir), + .capacity = 1 * 1000 * 1000 * 1000, + }; + FileCache::initialize(capacity_metrics, file_cache_config, /*logical_cores=*/8, rate_limiter); +} +} // namespace + class DMFileMetaVersionTestBase : public DB::base::TiFlashStorageTestBasic { public: @@ -353,6 +398,8 @@ class S3DMFile void TearDown() override { + shutdownWriteFileCache(); + DMFileMetaVersionTestBase::TearDown(); auto & global_context = db_context->getGlobalContext(); @@ -385,6 +432,53 @@ class S3DMFile return dm_file; } + DMFilePtr prepareDMFileWithStandaloneColumnData(UInt64 file_id) + { + constexpr UInt64 small_file_size_threshold = 4096; + auto dm_file = DMFile::create( + file_id, + parent_path, + std::make_optional(), + small_file_size_threshold, + 16 * 1024 * 1024, + DMFileFormat::V3); + + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + constexpr size_t num_rows = 20000; + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, num_rows); + + auto writer = DMFileWriter( + dm_file, + *cols, + file_provider_maybe_encrypted, + db_context->getWriteLimiter(), + DMFileWriter::Options()); + writer.write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + writer.finalize(); + + const auto * dmfile_meta = typeid_cast(dm_file->meta.get()); + RUNTIME_CHECK(dmfile_meta != nullptr); + const auto handle_stream_name = IDataType::getFileNameForStream(DB::toString(MutSup::extra_handle_id), {}); + const auto handle_dat_fname = colDataFileName(handle_stream_name); + RUNTIME_CHECK( + dm_file->getColumnStat(MutSup::extra_handle_id).data_bytes > small_file_size_threshold, + dm_file->getColumnStat(MutSup::extra_handle_id).data_bytes); + RUNTIME_CHECK( + dmfile_meta->merged_sub_file_infos.find(handle_dat_fname) == dmfile_meta->merged_sub_file_infos.end(), + handle_dat_fname); + + dataStore()->putDMFile( + dm_file, + S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = dm_file->fileId(), + }, + true); + return dm_file; + } + protected: const StoreID store_id = 17; @@ -469,16 +563,10 @@ CATCH TEST_P(S3DMFile, WithFileCache) try { - StorageRemoteCacheConfig file_cache_config{ - .dir = fmt::format("{}/fs_cache", getTemporaryPath()), - .capacity = 1 * 1000 * 1000 * 1000, - }; - UInt16 vcores = 8; - FileCache::initialize( + initWriteFileCache( db_context->getGlobalContext().getPathCapacity(), - file_cache_config, - vcores, - db_context->getGlobalContext().getIORateLimiter()); + db_context->getGlobalContext().getIORateLimiter(), + fmt::format("{}/fs_cache", getTemporaryPath())); auto dm_file = prepareDMFileRemote(/* file_id= */ 1); ASSERT_TRUE(dm_file->path().starts_with("s3://")); @@ -526,8 +614,221 @@ try cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 1); ASSERT_EQ(1, cn_dmf->metaVersion()); ASSERT_STREQ("test", cn_dmf->meta->getColumnStats()[MutSup::extra_handle_id].additional_data_for_test.c_str()); +} +CATCH + +namespace +{ +std::unordered_set collectObjectKeys( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + const LoggerPtr & log) +{ + const auto objects = collectMetaV2MergedFilesForLocalRead(dmfile, read_columns, log, "DMFileLocalStagingTest"); + std::unordered_set keys; + for (const auto & object : objects) + keys.insert(object.s3_key); + return keys; +} +} // namespace + +TEST_P(LocalDMFile, LocalStagingNonMetaV2Noop) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = DMFile::create( + /*file_id=*/2, + parent_path, + std::make_optional(), + /*small_file_size_threshold=*/0, + 16 * 1024 * 1024, + DMFileFormat::V2); + ASSERT_FALSE(dm_file->useMetaV2()); + ASSERT_TRUE(collectMetaV2MergedFilesForLocalRead(dm_file, *cols, log, "DMFileLocalStagingTest").empty()); +} +CATCH + +TEST_P(LocalDMFile, LocalStagingInvalidS3PathNoop) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFile(/* file_id= */ 3); + ASSERT_TRUE(dm_file->useMetaV2()); + ASSERT_TRUE(collectMetaV2MergedFilesForLocalRead(dm_file, *cols, log, "DMFileLocalStagingTest").empty()); +} +CATCH + +TEST_P(S3DMFile, LocalStagingDedupMergedFiles) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileRemote(/* file_id= */ 10); + const auto * dmfile_meta = typeid_cast(dm_file->meta.get()); + ASSERT_NE(dmfile_meta, nullptr); + + const auto objects = collectMetaV2MergedFilesForLocalRead(dm_file, *cols, log, "DMFileLocalStagingTest"); + ASSERT_FALSE(objects.empty()); + ASSERT_EQ(objects.size(), dmfile_meta->merged_files.size()); + + std::unordered_set keys; + for (const auto & object : objects) + { + EXPECT_TRUE(S3::S3FilenameView::fromKey(object.s3_key).isValid()); + EXPECT_TRUE(object.s3_key.ends_with(".merged")); + EXPECT_GT(object.file_size, 0); + EXPECT_TRUE(keys.insert(object.s3_key).second); + } +} +CATCH + +TEST_P(S3DMFile, LocalStagingCollectStandaloneDatFiles) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileWithStandaloneColumnData(/* file_id= */ 16); + const auto * dmfile_meta = typeid_cast(dm_file->meta.get()); + ASSERT_NE(dmfile_meta, nullptr); + + const auto handle_stream_name = IDataType::getFileNameForStream(DB::toString(MutSup::extra_handle_id), {}); + const auto handle_dat_fname = colDataFileName(handle_stream_name); + ASSERT_EQ(dmfile_meta->merged_sub_file_infos.find(handle_dat_fname), dmfile_meta->merged_sub_file_infos.end()); + + const auto objects = collectMetaV2MergedFilesForLocalRead(dm_file, *cols, log, "DMFileLocalStagingTest"); + ASSERT_FALSE(objects.empty()); + + bool has_merged = false; + bool has_standalone_dat = false; + for (const auto & object : objects) + { + EXPECT_TRUE(S3::S3FilenameView::fromKey(object.s3_key).isValid()); + EXPECT_GT(object.file_size, 0); + if (object.s3_key.ends_with(".merged")) + has_merged = true; + if (object.s3_key.ends_with(handle_dat_fname)) + has_standalone_dat = true; + } + EXPECT_TRUE(has_merged); + EXPECT_TRUE(has_standalone_dat); + + const ColumnDefines handle_only{getExtraHandleColumnDefine(/*is_common_handle=*/false)}; + const auto handle_objects + = collectMetaV2MergedFilesForLocalRead(dm_file, handle_only, log, "DMFileLocalStagingTest"); + ASSERT_FALSE(handle_objects.empty()); + for (const auto & object : handle_objects) + { + EXPECT_TRUE(object.s3_key.ends_with(".merged") || object.s3_key.ends_with(handle_dat_fname)); + } +} +CATCH + +TEST_P(S3DMFile, LocalStagingOnlyReadColumns) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto all_cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileRemote(/* file_id= */ 11); + + const ColumnDefines handle_only{getExtraHandleColumnDefine(/*is_common_handle=*/false)}; + const ColumnDefines nullable_only{ColumnDefine{ + 1, + "Nullable(UInt64)", + DataTypeFactory::instance().get("Nullable(UInt64)"), + }}; + + const auto all_keys = collectObjectKeys(dm_file, *all_cols, log); + const auto handle_keys = collectObjectKeys(dm_file, handle_only, log); + const auto nullable_keys = collectObjectKeys(dm_file, nullable_only, log); + + ASSERT_FALSE(all_keys.empty()); + ASSERT_FALSE(handle_keys.empty()); + ASSERT_FALSE(nullable_keys.empty()); + for (const auto & key : handle_keys) + ASSERT_TRUE(all_keys.contains(key)); + for (const auto & key : nullable_keys) + ASSERT_TRUE(all_keys.contains(key)); + + ColumnDefines missing_column{ColumnDefine{ + 9999, + "missing", + DataTypeFactory::instance().get("Int64"), + }}; + ASSERT_TRUE(collectObjectKeys(dm_file, missing_column, log).empty()); +} +CATCH + +TEST_P(S3DMFile, LocalStagingDownloadDisabled) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileRemote(/* file_id= */ 12); + ASSERT_TRUE( + tryDownloadMetaV2MergedFilesForLocalRead(dm_file, *cols, /*enable=*/false, log, "DMFileLocalStagingTest") + .empty()); +} +CATCH + +TEST_P(S3DMFile, LocalStagingDownloadWithoutFileCache) +try +{ + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileRemote(/* file_id= */ 13); + ASSERT_EQ(FileCache::instance(), nullptr); + ASSERT_TRUE(tryDownloadMetaV2MergedFilesForLocalRead(dm_file, *cols, /*enable=*/true, log, "DMFileLocalStagingTest") + .empty()); +} +CATCH + +TEST_P(S3DMFile, LocalStagingDownloadMergedFiles) +try +{ + initWriteFileCache( + db_context->getGlobalContext().getPathCapacity(), + db_context->getGlobalContext().getIORateLimiter(), + fmt::format("{}/write_filecache_staging", getTemporaryPath())); + + auto log = Logger::get("DMFileLocalStagingTest"); + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileRemote(/* file_id= */ 14); + + const auto attempt_before = GET_METRIC(tiflash_storage_write_filecache_staging, type_attempt).Value(); + const auto local_read_files + = tryDownloadMetaV2MergedFilesForLocalRead(dm_file, *cols, /*enable=*/true, log, "DMFileLocalStagingTest"); + ASSERT_FALSE(local_read_files.empty()); + ASSERT_EQ(attempt_before + 1, GET_METRIC(tiflash_storage_write_filecache_staging, type_attempt).Value()); + ASSERT_GE(GET_METRIC(tiflash_storage_write_filecache_staging, type_download_ok).Value(), local_read_files.size()); + ASSERT_GT(GET_METRIC(tiflash_storage_write_filecache_staging_bytes, type_staged).Value(), 0); + + auto * file_cache = FileCache::instance(); + ASSERT_NE(file_cache, nullptr); + ASSERT_FALSE(file_cache->getAll().empty()); +} +CATCH + +TEST_P(S3DMFile, LocalStagingBuildWithWriteFileCache) +try +{ + initWriteFileCache( + db_context->getGlobalContext().getPathCapacity(), + db_context->getGlobalContext().getIORateLimiter(), + fmt::format("{}/write_filecache_build", getTemporaryPath())); + + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + auto dm_file = prepareDMFileRemote(/* file_id= */ 15); + ASSERT_TRUE(FileCache::instance()->getAll().empty()); + + DMFileBlockInputStreamBuilder builder(*db_context); + auto stream = builder.enableWriteFileCacheLocalRead(true) + .build(dm_file, *cols, {RowKeyRange::newAll(false, 1)}, std::make_shared()); + ASSERT_FALSE(FileCache::instance()->getAll().empty()); - SCOPE_EXIT({ FileCache::shutdown(); }); + auto block = stream->read(); + ASSERT_GT(block.rows(), 0); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 46041df77ac..cfad67bded9 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -76,6 +76,7 @@ #include #include +#include #include #if ENABLE_CLARA @@ -147,6 +148,7 @@ extern const char pause_when_building_fap_segments[]; } // namespace FailPoints namespace DM { + String SegmentSnapshot::detailInfo() const { return fmt::format( @@ -232,9 +234,16 @@ DMFilePtr writeIntoNewDMFile( return dmfile; } +struct CreateNewStableResult +{ + StableValueSpacePtr stable; + /// Seconds spent uploading DMFile to remote store. Zero when writing locally. + double remote_upload_seconds = 0; +}; + // Create a new stable, the DMFile will write as External Page to disk, but the meta will not be written to disk. // The caller should write the meta to disk if needed. -StableValueSpacePtr createNewStable( // +CreateNewStableResult createNewStable( // DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, @@ -252,6 +261,7 @@ StableValueSpacePtr createNewStable( // auto stable = std::make_shared(stable_id); stable->setFiles({dtfile}, RowKeyRange::newAll(dm_context.is_common_handle, dm_context.rowkey_column_size)); + double remote_upload_seconds = 0; if (auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; !data_store) { wbs.data.putExternal(dtfile_id, 0); @@ -266,7 +276,11 @@ StableValueSpacePtr createNewStable( // .table_id = dm_context.physical_table_id, .file_id = dtfile_id, }; + Stopwatch upload_watch; data_store->putDMFile(dtfile, oid, /*switch_to_remote*/ true); + remote_upload_seconds = upload_watch.elapsedSeconds(); + GET_METRIC(tiflash_storage_subtask_count, type_remote_upload).Increment(); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_remote_upload).Observe(remote_upload_seconds); PS::V3::CheckpointLocation loc{ .data_file_id = std::make_shared(S3::S3Filename::fromDMFileOID(oid).toFullKey()), .offset_in_file = 0, @@ -275,7 +289,7 @@ StableValueSpacePtr createNewStable( // delegator.addRemoteDTFileWithGCDisabled(dtfile_id, dtfile->getBytesOnDisk()); wbs.data.putRemoteExternal(dtfile_id, loc); } - return stable; + return {.stable = stable, .remote_upload_seconds = remote_upload_seconds}; } catch (...) { @@ -332,7 +346,8 @@ SegmentPtr Segment::newSegment( // auto delta = std::make_shared(delta_id); auto stable - = createNewStable(context, schema, std::make_shared(*schema), stable_id, wbs); + = createNewStable(context, schema, std::make_shared(*schema), stable_id, wbs) + .stable; auto segment = std::make_shared(parent_log, INITIAL_EPOCH, range, segment_id, next_segment_id, delta, stable); @@ -1202,18 +1217,33 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( expected_block_size); } -BlockInputStreamPtr Segment::getInputStreamForDataExport( +namespace +{ +/** + * Return a sorted stream which is suitable for exporting data. Unlike `getInputStream`, deletes will be preserved. + * But outdated versions (exceeds GC safe point) will still be removed. + * When `dt_enable_write_filecache` is true, the underlying `DMFileBlockInputStreamBuilder` is configured to try + * local FileCache staging for stable reads. The actual staging is performed in the builder/read path when enabled. + * + * - `read_info` must be prepared by the caller via `getReadInfo`. It controls delta place/index + * - `data_range` controls which rows are read from stable+delta + * + * The returned stream also reorganizes block boundaries via `PKSquashingBlockInputStream`, so rows with the same + * primary key are placed in the same block. + */ +BlockInputStreamPtr getInputStreamForDataExport( const DMContext & dm_context, - const ColumnDefines & columns_to_read, + const Segment::ReadInfo & read_info, const SegmentSnapshotPtr & segment_snap, - const RowKeyRange & data_range, - size_t expected_block_size, - bool reorganize_block) const + const RowKeyRange & data_range) { RowKeyRanges data_ranges{data_range}; - auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges, ReadTag::Internal); + const bool enable_write_filecache_local_read = dm_context.global_context.getSettingsRef().dt_enable_write_filecache; + auto additional_builder_opt = [enable_write_filecache_local_read](DMFileBlockInputStreamBuilder & builder) { + builder.enableWriteFileCacheLocalRead(enable_write_filecache_local_read); + }; - BlockInputStreamPtr data_stream = getPlacedStream( + BlockInputStreamPtr data_stream = Segment::getPlacedStream( dm_context, *read_info.read_columns, data_ranges, @@ -1221,26 +1251,27 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport( read_info.getDeltaReader(ReadTag::Internal), read_info.index_begin, read_info.index_end, - expected_block_size, - ReadTag::Internal); - + dm_context.stable_pack_rows, + ReadTag::Internal, + {}, + std::numeric_limits::max(), + false, + additional_builder_opt); data_stream = std::make_shared>(data_stream, data_ranges, 0); - if (reorganize_block) - { - data_stream = std::make_shared>( - data_stream, - MutSup::extra_handle_id, - is_common_handle); - } + data_stream = std::make_shared>( + data_stream, + MutSup::extra_handle_id, + dm_context.is_common_handle); data_stream = std::make_shared>( data_stream, *read_info.read_columns, dm_context.min_version, - is_common_handle); + dm_context.is_common_handle); return data_stream; } +} // namespace /// We call getInputStreamModeRaw in 'selraw xxxx' statement, which is always in test for debug. /// In this case, we will read all the data without mvcc filtering and sorted merge. @@ -1320,7 +1351,8 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & if (!segment_snap) return {}; - auto new_stable = prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs); + auto prepare_result = prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs); + auto new_stable = prepare_result.stable; wbs.writeLogAndData(); new_stable->enableDMFilesGC(dm_context); @@ -1334,12 +1366,15 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & return new_segment; } -StableValueSpacePtr Segment::prepareMergeDelta( +Segment::PrepareMergeDeltaResult Segment::prepareMergeDelta( DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const { + GET_METRIC(tiflash_storage_subtask_count, type_prepare_merge_delta).Increment(); + Stopwatch watch; + LOG_DEBUG( log, "MergeDelta - Begin prepare, delta_column_files={} delta_rows={} delta_bytes={}", @@ -1349,19 +1384,27 @@ StableValueSpacePtr Segment::prepareMergeDelta( EventRecorder recorder(ProfileEvents::DMDeltaMerge, ProfileEvents::DMDeltaMergeNS); - auto data_stream = getInputStreamForDataExport( - dm_context, - *schema_snap, - segment_snap, - rowkey_range, - dm_context.stable_pack_rows, - /*reorganize_block*/ true); + const auto read_info = getReadInfo(dm_context, *schema_snap, segment_snap, {rowkey_range}, ReadTag::Internal); + auto data_stream = getInputStreamForDataExport(dm_context, read_info, segment_snap, rowkey_range); + + const auto create_result + = createNewStable(dm_context, schema_snap, data_stream, segment_snap->stable->getId(), wbs); - auto new_stable = createNewStable(dm_context, schema_snap, data_stream, segment_snap->stable->getId(), wbs); + const auto prepare_seconds = watch.elapsedSeconds(); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_prepare_merge_delta).Observe(prepare_seconds); - LOG_DEBUG(log, "MergeDelta - Finish prepare, segment={}", info()); + LOG_DEBUG( + log, + "MergeDelta - Finish prepare, segment={} prepare_seconds={:.3f} remote_upload_seconds={:.3f}", + info(), + prepare_seconds, + create_result.remote_upload_seconds); - return new_stable; + return { + .stable = create_result.stable, + .prepare_seconds = prepare_seconds, + .remote_upload_seconds = create_result.remote_upload_seconds, + }; } SegmentPtr Segment::applyMergeDelta( @@ -1939,6 +1982,8 @@ Segment::prepareSplitLogical( // std::optional opt_split_point, WriteBatches & wbs) const { + Stopwatch watch; + LOG_DEBUG( log, "Split - SplitLogical - Begin prepare, opt_split_point={}", @@ -2032,18 +2077,23 @@ Segment::prepareSplitLogical( // my_stable->setFiles(my_stable_files, my_range, &dm_context); other_stable->setFiles(other_stable_files, other_range, &dm_context); + const auto prepare_seconds = watch.elapsedSeconds(); + LOG_DEBUG( log, - "Split - SplitLogical - Finish prepare, segment={} split_point={}", + "Split - SplitLogical - Finish prepare, segment={} split_point={} prepare_seconds={:.3f}", info(), - opt_split_point->toDebugString()); + opt_split_point->toDebugString(), + prepare_seconds); return { SplitInfo{ .is_logical = true, .split_point = opt_split_point.value(), .my_stable = my_stable, - .other_stable = other_stable}, + .other_stable = other_stable, + .prepare_seconds = prepare_seconds, + .remote_upload_seconds = 0}, PrepareSplitLogicalStatus::Success}; } @@ -2054,6 +2104,9 @@ std::optional Segment::prepareSplitPhysical( // std::optional opt_split_point, WriteBatches & wbs) const { + GET_METRIC(tiflash_storage_subtask_count, type_prepare_split_physical).Increment(); + Stopwatch watch; + LOG_DEBUG( log, "Split - SplitPhysical - Begin prepare, opt_split_point={}", @@ -2090,65 +2143,24 @@ std::optional Segment::prepareSplitPhysical( // StableValueSpacePtr my_new_stable; StableValueSpacePtr other_stable; + double remote_upload_seconds = 0; { - auto my_delta_reader = read_info.getDeltaReader(schema_snap, ReadTag::Internal); - - RowKeyRanges my_ranges{my_range}; - BlockInputStreamPtr my_data = getPlacedStream( - dm_context, - *read_info.read_columns, - my_ranges, - segment_snap->stable, - my_delta_reader, - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows, - ReadTag::Internal); - - - my_data = std::make_shared>(my_data, my_ranges, 0); - my_data - = std::make_shared>(my_data, MutSup::extra_handle_id, is_common_handle); - my_data = std::make_shared>( - my_data, - *read_info.read_columns, - dm_context.min_version, - is_common_handle); + auto my_data = getInputStreamForDataExport(dm_context, read_info, segment_snap, my_range); auto my_stable_id = segment_snap->stable->getId(); - my_new_stable = createNewStable(dm_context, schema_snap, my_data, my_stable_id, wbs); + const auto create_result = createNewStable(dm_context, schema_snap, my_data, my_stable_id, wbs); + my_new_stable = create_result.stable; + remote_upload_seconds += create_result.remote_upload_seconds; } LOG_DEBUG(log, "Split - SplitPhysical - Finish prepare my_new_stable"); { - // Write new segment's data - auto other_delta_reader = read_info.getDeltaReader(schema_snap, ReadTag::Internal); - - RowKeyRanges other_ranges{other_range}; - BlockInputStreamPtr other_data = getPlacedStream( - dm_context, - *read_info.read_columns, - other_ranges, - segment_snap->stable, - other_delta_reader, - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows, - ReadTag::Internal); - - other_data = std::make_shared>(other_data, other_ranges, 0); - other_data = std::make_shared>( - other_data, - MutSup::extra_handle_id, - is_common_handle); - other_data = std::make_shared>( - other_data, - *read_info.read_columns, - dm_context.min_version, - is_common_handle); + auto other_data = getInputStreamForDataExport(dm_context, read_info, segment_snap, other_range); auto other_stable_id = dm_context.storage_pool->newMetaPageId(); - other_stable = createNewStable(dm_context, schema_snap, other_data, other_stable_id, wbs); + const auto create_result = createNewStable(dm_context, schema_snap, other_data, other_stable_id, wbs); + other_stable = create_result.stable; + remote_upload_seconds += create_result.remote_upload_seconds; } LOG_DEBUG(log, "Split - SplitPhysical - Finish prepare other_stable"); @@ -2161,17 +2173,25 @@ std::optional Segment::prepareSplitPhysical( // wbs.removed_data.delPage(file->pageId()); } + const auto prepare_seconds = watch.elapsedSeconds(); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_prepare_split_physical).Observe(prepare_seconds); + LOG_DEBUG( log, - "Split - SplitPhysical - Finish prepare, segment={} split_point={}", + "Split - SplitPhysical - Finish prepare, segment={} split_point={} prepare_seconds={:.3f} " + "remote_upload_seconds={:.3f}", info(), - split_point.toDebugString()); + split_point.toDebugString(), + prepare_seconds, + remote_upload_seconds); return SplitInfo{ .is_logical = false, .split_point = split_point, .my_stable = my_new_stable, .other_stable = other_stable, + .prepare_seconds = prepare_seconds, + .remote_upload_seconds = remote_upload_seconds, }; } @@ -2271,7 +2291,8 @@ SegmentPtr Segment::merge( ordered_snapshots.emplace_back(snap); } - auto merged_stable = prepareMerge(dm_context, schema_snap, ordered_segments, ordered_snapshots, wbs); + const auto prepare_result = prepareMerge(dm_context, schema_snap, ordered_segments, ordered_snapshots, wbs); + auto merged_stable = prepare_result.stable; wbs.writeLogAndData(); merged_stable->enableDMFilesGC(dm_context); @@ -2289,13 +2310,16 @@ SegmentPtr Segment::merge( return merged; } -StableValueSpacePtr Segment::prepareMerge( +Segment::PrepareMergeResult Segment::prepareMerge( DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const std::vector & ordered_segments, const std::vector & ordered_snapshots, WriteBatches & wbs) { + GET_METRIC(tiflash_storage_subtask_count, type_prepare_merge).Increment(); + Stopwatch watch; + RUNTIME_CHECK(ordered_segments.size() >= 2, ordered_snapshots.size()); RUNTIME_CHECK( ordered_segments.size() == ordered_snapshots.size(), @@ -2319,43 +2343,22 @@ StableValueSpacePtr Segment::prepareMerge( ordered_segments[i]->info()); } - auto get_stream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) { - auto read_info = segment->getReadInfo( + std::vector input_streams; + input_streams.reserve(ordered_segments.size()); + for (size_t i = 0; i < ordered_segments.size(); ++i) + { + const auto & segment = ordered_segments[i]; + const auto & segment_snap = ordered_snapshots[i]; + const auto read_info = segment->getReadInfo( dm_context, *schema_snap, segment_snap, + // place all rows in the segment {RowKeyRange::newAll(segment->is_common_handle, segment->rowkey_column_size)}, ReadTag::Internal); - RowKeyRanges rowkey_ranges{segment->rowkey_range}; - BlockInputStreamPtr stream = getPlacedStream( - dm_context, - *read_info.read_columns, - rowkey_ranges, - segment_snap->stable, - read_info.getDeltaReader(ReadTag::Internal), - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows, - ReadTag::Internal); - - stream = std::make_shared>(stream, rowkey_ranges, 0); - stream = std::make_shared>( - stream, - MutSup::extra_handle_id, - dm_context.is_common_handle); - stream = std::make_shared>( - stream, - *read_info.read_columns, - dm_context.min_version, - dm_context.is_common_handle); - - return stream; - }; - - std::vector input_streams; - input_streams.reserve(ordered_segments.size()); - for (size_t i = 0; i < ordered_segments.size(); i++) - input_streams.emplace_back(get_stream(ordered_segments[i], ordered_snapshots[i])); + input_streams.emplace_back( + getInputStreamForDataExport(dm_context, read_info, segment_snap, segment->rowkey_range)); + } BlockInputStreamPtr merged_stream = std::make_shared(input_streams, /*req_id=*/""); // for the purpose to calculate StableProperty of the new segment @@ -2366,11 +2369,23 @@ StableValueSpacePtr Segment::prepareMerge( dm_context.is_common_handle); auto merged_stable_id = ordered_segments[0]->stable->getId(); - auto merged_stable = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs); + const auto create_result = createNewStable(dm_context, schema_snap, merged_stream, merged_stable_id, wbs); - LOG_DEBUG(log, "Merge - Finish prepare, segments_to_merge={}", info(ordered_segments)); + const auto prepare_seconds = watch.elapsedSeconds(); + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_prepare_merge).Observe(prepare_seconds); - return merged_stable; + LOG_DEBUG( + log, + "Merge - Finish prepare, segments_to_merge={} prepare_seconds={:.3f} remote_upload_seconds={:.3f}", + info(ordered_segments), + prepare_seconds, + create_result.remote_upload_seconds); + + return { + .stable = create_result.stable, + .prepare_seconds = prepare_seconds, + .remote_upload_seconds = create_result.remote_upload_seconds, + }; } SegmentPtr Segment::applyMerge( @@ -2555,7 +2570,8 @@ String Segment::info() const "", + "stable_cols={} " + "dmf_rows={} dmf_bytes={} dmf_disk_bytes={} dmf_packs={}>", segment_id, epoch, rowkey_range.toDebugString(), @@ -2569,9 +2585,11 @@ String Segment::info() const stable->getDMFilesString(), stable->getRows(), stable->getBytes(), + stable->getDMFilesNumColumns(), stable->getDMFilesRows(), stable->getDMFilesBytes(), + stable->getDMFilesBytesOnDisk(), stable->getDMFilesPacks()); } @@ -2717,7 +2735,8 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( ReadTag read_tag, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts, - bool need_row_id) + bool need_row_id, + std::function additional_builder_opt) { if (unlikely(rowkey_ranges.empty())) throw Exception("rowkey ranges shouldn't be empty", ErrorCodes::LOGICAL_ERROR); @@ -2732,7 +2751,9 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( read_tag, pack_filter_results, /* is_fast_scan */ false, - /* enable_del_clean_read */ false); + /* enable_del_clean_read */ false, + /* read_packs */ {}, + additional_builder_opt); RowKeyRange rowkey_range = rowkey_ranges.size() == 1 ? rowkey_ranges[0] : mergeRanges(rowkey_ranges, rowkey_ranges[0].is_common_handle, rowkey_ranges[0].rowkey_column_size); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index d42e8f1b78c..9b03a6efd00 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -33,6 +33,8 @@ #include #include +#include + #if ENABLE_CLARA #include #endif @@ -128,6 +130,11 @@ class Segment StableValueSpacePtr my_stable; StableValueSpacePtr other_stable; + + /// Wall-clock seconds spent in prepareSplit (logical or physical). + double prepare_seconds = 0; + /// Seconds spent uploading DMFiles to remote store in createNewStable. Zero when writing locally or logical split. + double remote_upload_seconds = 0; }; DISALLOW_COPY_AND_MOVE(Segment); @@ -252,19 +259,6 @@ class Segment UInt64 start_ts = std::numeric_limits::max(), size_t expected_block_size = DEFAULT_BLOCK_SIZE); - /** - * Return a sorted stream which is suitable for exporting data. Unlike `getInputStream`, deletes will be preserved. - * But outdated versions (exceeds GC safe point) will still be removed. - * @param reorganize_block put those rows with the same pk rows into the same block or not. - */ - BlockInputStreamPtr getInputStreamForDataExport( - const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const SegmentSnapshotPtr & segment_snap, - const RowKeyRange & data_range, - size_t expected_block_size = DEFAULT_BLOCK_SIZE, - bool reorganize_block = true) const; - BlockInputStreamPtr getInputStreamModeRaw( const DMContext & dm_context, const ColumnDefines & columns_to_read, @@ -376,7 +370,16 @@ class Segment const ColumnDefinesPtr & schema_snap, const std::vector & ordered_segments); - static StableValueSpacePtr prepareMerge( + struct PrepareMergeResult + { + StableValueSpacePtr stable; + /// Wall-clock seconds spent in prepareMerge. + double prepare_seconds = 0; + /// Seconds spent uploading DMFile to remote store in createNewStable. Zero when writing locally. + double remote_upload_seconds = 0; + }; + + static PrepareMergeResult prepareMerge( DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const std::vector & ordered_segments, @@ -400,7 +403,16 @@ class Segment */ [[nodiscard]] SegmentPtr mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const; - StableValueSpacePtr prepareMergeDelta( + struct PrepareMergeDeltaResult + { + StableValueSpacePtr stable; + /// Wall-clock seconds spent in prepareMergeDelta. + double prepare_seconds = 0; + /// Seconds spent uploading DMFile to remote store in createNewStable. Zero when writing locally. + double remote_upload_seconds = 0; + }; + + PrepareMergeDeltaResult prepareMergeDelta( DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentSnapshotPtr & segment_snap, @@ -668,21 +680,6 @@ class Segment const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts); -#ifndef DBMS_PUBLIC_GTEST -private: -#else -public: -#endif - ReadInfo getReadInfo( - const DMContext & dm_context, - const ColumnDefines & read_columns, - const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & read_ranges, - ReadTag read_tag, - UInt64 start_ts = std::numeric_limits::max()) const; - - static ColumnDefinesPtr arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); - /// Create a stream which merged delta and stable streams together. template static SkippableBlockInputStreamPtr getPlacedStream( @@ -697,7 +694,23 @@ class Segment ReadTag read_tag, const DMFilePackFilterResults & pack_filter_results = {}, UInt64 start_ts = std::numeric_limits::max(), - bool need_row_id = false); + bool need_row_id = false, + std::function additional_builder_opt = nullptr); + +#ifndef DBMS_PUBLIC_GTEST +private: +#else +public: +#endif + ReadInfo getReadInfo( + const DMContext & dm_context, + const ColumnDefines & read_columns, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + ReadTag read_tag, + UInt64 start_ts = std::numeric_limits::max()) const; + + static ColumnDefinesPtr arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); /// Make sure that all delta packs have been placed. /// Note that the index returned could be partial index, and cannot be updated to shared index. diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index b0b1a3367b4..9a492a7d608 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -267,6 +267,14 @@ size_t StableValueSpace::getDMFilesBytes() const return bytes; } +size_t StableValueSpace::getDMFilesNumColumns() const +{ + size_t num_columns = 0; + for (const auto & file : files) + num_columns = std::max(num_columns, file->getNumColumns()); + return num_columns; +} + String StableValueSpace::getDMFilesString() { return DMFile::info(files); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index 2b1b29e6bf1..350974f1c43 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -86,6 +86,11 @@ class StableValueSpace : public std::enable_shared_from_this String getDMFilesString(); + /** + * Return the number of columns of the underlying DTFiles. + */ + size_t getDMFilesNumColumns() const; + /** * Return the total on-disk size of the underlying DTFiles. * DTFiles are not fully included in the segment range will be also counted in. diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index ae121b21a31..a91da0f5b9f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1617,12 +1617,9 @@ try write_100_rows(other_segment); segment->flushCache(dmContext()); - auto merged_stable = Segment::prepareMerge( - dmContext(), - tableColumns(), - {segment, other_segment}, - {left_snap, right_snap}, - wbs); + auto merged_stable + = Segment::prepareMerge(dmContext(), tableColumns(), {segment, other_segment}, {left_snap, right_snap}, wbs) + .stable; wbs.writeLogAndData(); merged_stable->enableDMFilesGC(dmContext()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp index 0edd58ba0fb..181be957afb 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include +#include #include #include -#include #include +#include +#include #include #include #include @@ -28,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -39,6 +44,12 @@ #include #include +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfSegmentSplit; +extern const Metric DT_SnapshotOfSegmentMerge; +} // namespace CurrentMetrics + namespace DB { namespace FailPoints @@ -49,6 +60,22 @@ namespace DM { namespace tests { +namespace +{ +// Keep in sync with gtests_dbms_main.cpp. +constexpr size_t s3_file_cache_pool_max_threads = 20; +constexpr size_t s3_file_cache_pool_max_free_threads = 10; +constexpr size_t s3_file_cache_pool_queue_size = 1000; + +void reinitS3FileCachePool() +{ + S3FileCachePool::initialize( + s3_file_cache_pool_max_threads, + s3_file_cache_pool_max_free_threads, + s3_file_cache_pool_queue_size); +} +} // namespace + class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic { public: @@ -105,6 +132,8 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic void TearDown() override { + shutdownWriteFileCache(); + FailPointHelper::disableFailPoint(FailPoints::force_use_dmfile_format_v3); auto & global_context = TiFlashTestEnv::getGlobalContext(); if (!already_initialize_data_store) @@ -170,6 +199,95 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic DMContext & dmContext() { return *dm_context; } + void initWriteFileCache() + { + StorageRemoteCacheConfig file_cache_config{ + .dir = fmt::format("{}/wn_filecache", getTemporaryPath()), + .capacity = 1 * 1000 * 1000 * 1000, + }; + UInt16 vcores = 8; + FileCache::initialize( + db_context->getGlobalContext().getPathCapacity(), + file_cache_config, + vcores, + db_context->getGlobalContext().getIORateLimiter()); + } + + static void shutdownWriteFileCache() + { + if (FileCache::instance() == nullptr) + return; + FileCache::shutdown(); + // FileCache::shutdown() tears down S3FileCachePool; restore it for other tests. + reinitS3FileCachePool(); + } + + void setDtEnableWriteFileCache(bool enabled) + { + // DMContext stores global context, not session context. + db_context->getGlobalContext().getSettingsRef().dt_enable_write_filecache = enabled; + } + + static double stagingAttempt() { return GET_METRIC(tiflash_storage_write_filecache_staging, type_attempt).Value(); } + + static bool fileCacheHasMergedFile() + { + auto * file_cache = FileCache::instance(); + if (file_cache == nullptr) + return false; + for (const auto & file_seg : file_cache->getAll()) + { + if (file_seg->getLocalFileName().contains(".merged")) + return true; + } + return false; + } + + void writeRows(size_t start, size_t end) + { + Block block = DMTestEnv::prepareSimpleWriteBlock(start, end, false); + segment->write(dmContext(), std::move(block), /* flush */ true); + } + + void assertRemoteStableHasMergedSubFiles(const SegmentPtr & seg) + { + const auto files = seg->stable->getDMFiles(); + ASSERT_FALSE(files.empty()); + const auto & dmfile = files[0]; + ASSERT_TRUE(dmfile->useMetaV2()); + ASSERT_TRUE(dmfile->path().starts_with("s3://")); + const auto * dmfile_meta = typeid_cast(dmfile->meta.get()); + ASSERT_NE(dmfile_meta, nullptr); + ASSERT_FALSE(dmfile_meta->merged_sub_file_infos.empty()); + const auto objects = collectMetaV2MergedFilesForLocalRead( + dmfile, + *table_columns, + Logger::get("SegmentTestS3"), + "SegmentTestS3"); + ASSERT_FALSE(objects.empty()); + } + + /// Create a remote MetaV2 stable on S3, then leave delta data for a follow-up mergeDelta. + void writeRemoteStableWithDelta(size_t stable_end, size_t delta_end) + { + writeRows(0, stable_end); + segment = segment->mergeDelta(dmContext(), tableColumns()); + if (delta_end > stable_end) + writeRows(stable_end, delta_end); + } + + SegmentPtr mergeDeltaToRemoteStable() + { + writeRemoteStableWithDelta(100, 100); + return segment; + } + + size_t readRows(const SegmentPtr & seg) + { + auto in = seg->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + return getInputStreamNRows(in); + } + protected: /// all these var lives as ref in dm_context GlobalPageIdAllocatorPtr page_id_allocator; @@ -236,6 +354,115 @@ try } CATCH +TEST_F(SegmentTestS3, WriteFileCacheDisabledWithFileCache) +try +{ + initWriteFileCache(); + + setDtEnableWriteFileCache(false); + ASSERT_TRUE(FileCache::instance()->getAll().empty()); + + const auto attempt_before = stagingAttempt(); + segment = mergeDeltaToRemoteStable(); + ASSERT_EQ(attempt_before, stagingAttempt()); + ASSERT_EQ(100, readRows(segment)); +} +CATCH + +TEST_F(SegmentTestS3, WriteFileCacheEnabledWithoutFileCache) +try +{ + ASSERT_EQ(FileCache::instance(), nullptr); + setDtEnableWriteFileCache(true); + + segment = mergeDeltaToRemoteStable(); + ASSERT_EQ(0, stagingAttempt()); + ASSERT_EQ(100, readRows(segment)); +} +CATCH + +TEST_F(SegmentTestS3, WriteFileCacheMergeDeltaStaging) +try +{ + initWriteFileCache(); + + setDtEnableWriteFileCache(true); + ASSERT_TRUE(FileCache::instance()->getAll().empty()); + + // Staging reads the existing remote stable; the first mergeDelta only creates stable. + writeRemoteStableWithDelta(100, 200); + assertRemoteStableHasMergedSubFiles(segment); + + const auto attempt_before = stagingAttempt(); + segment = segment->mergeDelta(dmContext(), tableColumns()); + + ASSERT_GT(stagingAttempt(), attempt_before); + ASSERT_TRUE(fileCacheHasMergedFile()); + ASSERT_EQ(200, readRows(segment)); +} +CATCH + +TEST_F(SegmentTestS3, WriteFileCachePrepareMergeStaging) +try +{ + initWriteFileCache(); + + setDtEnableWriteFileCache(true); + segment = mergeDeltaToRemoteStable(); + assertRemoteStableHasMergedSubFiles(segment); + + auto [left, right] = segment->split( + dmContext(), + tableColumns(), + /*opt_split_at=*/std::nullopt, + Segment::SplitMode::Logical); + ASSERT_NE(left, nullptr); + ASSERT_NE(right, nullptr); + + { + Block block = DMTestEnv::prepareSimpleWriteBlock(100, 200, false); + right->write(dmContext(), std::move(block), /* flush */ true); + right->flushCache(dmContext()); + } + + WriteBatches wbs(*storage_pool); + auto left_snap = left->createSnapshot(dmContext(), true, CurrentMetrics::DT_SnapshotOfSegmentMerge); + auto right_snap = right->createSnapshot(dmContext(), true, CurrentMetrics::DT_SnapshotOfSegmentMerge); + ASSERT_NE(left_snap, nullptr); + ASSERT_NE(right_snap, nullptr); + + const auto attempt_before = stagingAttempt(); + auto merged_stable + = Segment::prepareMerge(dmContext(), tableColumns(), {left, right}, {left_snap, right_snap}, wbs).stable; + ASSERT_NE(merged_stable, nullptr); + ASSERT_GT(stagingAttempt(), attempt_before); + ASSERT_TRUE(fileCacheHasMergedFile()); +} +CATCH + +TEST_F(SegmentTestS3, WriteFileCachePhysicalSplitStaging) +try +{ + initWriteFileCache(); + + setDtEnableWriteFileCache(true); + segment = mergeDeltaToRemoteStable(); + assertRemoteStableHasMergedSubFiles(segment); + + WriteBatches wbs(*storage_pool); + auto segment_snap = segment->createSnapshot(dmContext(), true, CurrentMetrics::DT_SnapshotOfSegmentSplit); + ASSERT_NE(segment_snap, nullptr); + + const auto split_at = RowKeyValue::fromIntHandle(50); + const auto attempt_before = stagingAttempt(); + auto split_info + = segment->prepareSplit(dmContext(), tableColumns(), segment_snap, split_at, Segment::SplitMode::Physical, wbs); + ASSERT_TRUE(split_info.has_value()); + ASSERT_GT(stagingAttempt(), attempt_before); + ASSERT_TRUE(fileCacheHasMergedFile()); +} +CATCH + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index aaa3963f488..ca9ebe33e6c 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// FIXME: Disabled because the flaky test result errors in CI. +// And we don't plan to support enable FAP in short time. +#if 0 #include #include #include @@ -1206,3 +1209,4 @@ CATCH } // namespace tests } // namespace DB +#endif diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index a29740b9711..477c501a1e4 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -1302,7 +1302,11 @@ void downloadToLocal( wbuf.sync(); } -void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter) +void FileCache::downloadImpl( + const String & s3_key, + FileSegmentPtr & file_seg, + const WriteLimiterPtr & write_limiter, + DownloadType download_type) { Stopwatch sw; auto client = S3::ClientFactory::instance().sharedTiFlashClient(); @@ -1368,7 +1372,8 @@ void FileCache::downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, c file_seg->setComplete(content_length); LOG_INFO( log, - "Download success, s3_key={} local={} size={} cost={}ms", + "Download success, type={} s3_key={} local={} size={} cost={}ms", + magic_enum::enum_name(download_type), s3_key, local_fname, content_length, @@ -1393,7 +1398,7 @@ void FileCache::bgDownloadExecutor( SYNC_FOR("before_FileCache::bgDownloadExecutor_fail_point"); FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::file_cache_bg_download_fail); GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download).Increment(); - downloadImpl(s3_key, file_seg, write_limiter); + downloadImpl(s3_key, file_seg, write_limiter, DownloadType::Background); } catch (...) { @@ -1481,7 +1486,7 @@ void FileCache::fgDownload(const String & s3_key, FileSegmentPtr & file_seg) FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::file_cache_fg_download_fail); GET_METRIC(tiflash_storage_remote_cache, type_dtfile_download).Increment(); // not limit write speed for foreground download now - downloadImpl(s3_key, file_seg, nullptr); + downloadImpl(s3_key, file_seg, nullptr, DownloadType::Foreground); } catch (...) { diff --git a/dbms/src/Storages/S3/FileCache.h b/dbms/src/Storages/S3/FileCache.h index e64ba8f1ff9..af93a3d1cf2 100644 --- a/dbms/src/Storages/S3/FileCache.h +++ b/dbms/src/Storages/S3/FileCache.h @@ -374,7 +374,18 @@ class FileCache Int64 running_limit); void finishBgDownload(const String & s3_key, Int64 running_limit); void cleanupFailedDownload(const String & s3_key, FileSegmentPtr & file_seg); - void downloadImpl(const String & s3_key, FileSegmentPtr & file_seg, const WriteLimiterPtr & write_limiter); + + enum class DownloadType + { + Foreground, + Background, + }; + + void downloadImpl( + const String & s3_key, + FileSegmentPtr & file_seg, + const WriteLimiterPtr & write_limiter, + DownloadType download_type); static String toTemporaryFilename(const String & fname); static bool isTemporaryFilename(const String & fname); diff --git a/docs/design/2026-06-27-disagg-write-node-local-staging.md b/docs/design/2026-06-27-disagg-write-node-local-staging.md new file mode 100644 index 00000000000..918c4abba82 --- /dev/null +++ b/docs/design/2026-06-27-disagg-write-node-local-staging.md @@ -0,0 +1,436 @@ +# Proposal: Localized Reads for Remote DTFiles in Background Merge Tasks in Disaggregated write node + +Date: 2026-06-27 + +Purpose: Reduce the risk of `Segment::prepareMerge`, `Segment::prepareMergeDelta`, and `Segment::prepareSplitPhysical` opening a large number of S3 streams directly for wide tables in disaggregated mode, and define a first-version local read design with controlled complexity. + +## Summary + +This proposal introduces an explicit switch on write/storage nodes: + +```text +dt_enable_write_filecache = false +``` + +When this switch is enabled, remote DTFile localized reads are enabled only for the following background tasks: + +1. `Segment::prepareMerge` +2. `Segment::prepareMergeDelta` +3. `Segment::prepareSplitPhysical` + +The first version does not introduce `RemoteReadPolicy::Auto`, does not make automatic decisions based on thresholds, does not add a batch download API, and does not support non-MetaV2 files. The reason is that newly generated DMFiles in the disaggregated architecture use MetaV2, while the core problem to solve is that background tasks on wide tables construct readers by column and substream, amplifying a small number of physical `.merged` files into many direct S3 streams. + +The first-version design decisions are: + +- Use `dt_enable_write_filecache` to explicitly control whether write-side FileCache and background-task localized reads are enabled. +- Trigger the behavior only in `prepareMerge`, `prepareMergeDelta`, and `prepareSplitPhysical`. +- Let `DMFileBlockInputStream` or `DMFileReader` directly hold `std::vector local_read_files` to pin local cached files. +- Collect the physical objects that will actually be read under MetaV2 precisely according to `read_columns`, and deduplicate them by S3 key. +- Reuse the existing single-file API `FileCache::downloadFileForLocalReadWithRetry(...)` to download files one by one, without adding a batch API. +- If localization fails, always fall back to the existing direct S3 read path, and record logs and metrics. + +This design keeps the implementation surface small. `Segment` only decides which background tasks enable localization. `DMFileBlockInputStreamBuilder` / `DMFileReader` are responsible for object collection and pinning. `FileCache` continues to handle downloads, capacity, deduplication, failure cleanup, and local file lifetime. + +## Context + +### Current State + +The current `prepareMergeDelta` path is roughly: + +```text +Segment::prepareMergeDelta + -> getInputStreamForDataExport + -> getReadInfo + -> getPlacedStream + -> StableValueSpace::Snapshot::getInputStream + -> DMFileBlockInputStreamBuilder::build + -> DMFileReader + -> ColumnReadStream per column/substream +``` + +The `prepareMerge` path constructs internal read streams for the segments to merge: + +```text +Segment::prepareMerge + -> for each segment: getReadInfo + getPlacedStream + -> ConcatBlockInputStream + -> createNewStable +``` + +`prepareSplitPhysical` also constructs internal read streams, writing the original segment data into new stables according to the split point. + +The relevant confirmed code boundaries are: + +- `Segment::prepareMergeDelta` reads stable and delta data in the current segment through `getInputStreamForDataExport(...)`, then writes a new stable. +- `Segment::prepareMerge` constructs internal read streams for multiple segments and eventually writes the merged stable through `createNewStable(...)`. +- `Segment::prepareSplitPhysical` constructs data streams for both sides of a physical split and writes stables separately. +- `StableValueSpace::Snapshot::getInputStream(...)` creates a `DMFileBlockInputStream` for each DMFile in the stable. +- The `DMFileReader` constructor iterates over `read_columns` and creates a `ColumnReadStream` for every existing column and its substreams. +- When `ColumnReadStream` is constructed, it reads marks and creates a read buffer for column data. If the underlying path is S3, `FileProvider::newRandomAccessFile(...)` enters `S3RandomAccessFile::create(...)`. +- `S3RandomAccessFile::create(...)` first tries `FileCache::getRandomAccessFile(...)`. On miss, it constructs `S3RandomAccessFile` and opens an S3 `GetObject` stream during construction. +- `FileCache` is currently mainly initialized on disaggregated compute nodes when remote cache is enabled. Background merge/split happens on write/storage nodes, so these internal tasks usually cannot benefit from `FileCache`. + +Under Format V3 / MetaV2, multiple logical column files are recorded in DMFile metadata and may be written into physical `.merged` files. Today, `ColumnReadStream::buildColDataReadBuffByMetaV2(...)` uses `merged_sub_file_infos` to find the merged file, offset, and size for a logical subfile, then creates a random read object, seeks to the offset, and reads that subfile's raw data into memory. This layout reduces the number of physical files. However, if every column substream independently creates a read buffer, background tasks on wide tables can still amplify the number of columns into many S3 streams or range reads. + +### Problem Statement + +In a wide-table scenario, for example a table with 500 columns, `prepareMerge`, `prepareMergeDelta`, or `prepareSplitPhysical` on a segment often needs to read the full schema rather than a small projection from a query. The current read stream construction eagerly initializes `ColumnReadStream` by column and substream, causing: + +- A large number of S3 `GetObject` streams to be opened in a short time. +- Each stream may hold a connection, body, buffer, retry state, and rate-limiting path. +- S3-side latency or network jitter can affect many column readers at the same time. +- Background tasks and foreground queries share remote read resources, which can amplify system jitter. +- Even though `FileCache` already has capacity control and download deduplication, background tasks on write/storage nodes may not have it enabled. + +Background merge/split tasks are internal batch jobs that are known to perform broad data reads and rewrite stables. They are more suitable for pulling the relevant physical objects to local storage in a controlled way before constructing column readers, and then completing subsequent column-level seek/read operations from local files, instead of letting each column reader independently connect to S3 directly. + +### Constraints and Decision Drivers + +- DeltaMerge visible semantics must not change. Background merge/split only rewrites physical layout, and must not change row visibility, MVCC filtering, delete marks, or handle semantics. +- Existing remote object lifetime management must not be bypassed. In disaggregated mode, the remote location of DMFiles, GC enablement/disablement, and PageStorage external page references must remain managed by the existing mechanisms. +- Local disk must not be treated as unlimited capacity. Any localized download must go through `FileCache` capacity control, failure cleanup, and eviction/pin semantics. +- The first version must be easy to roll back. After disabling `dt_enable_write_filecache`, the behavior should return to the existing direct S3 read path. +- The first version only handles MetaV2. Newly generated DMFiles in the disaggregated architecture use MetaV2, so there is no need to introduce extra branches for old formats. +- The first version always falls back to direct reads, preventing local cache capacity shortage or download failures from reducing the success rate of background tasks. + +## Terminology Baseline + +| Term | Meaning | +| --- | --- | +| write FileCache | FileCache explicitly enabled on write/storage nodes by `dt_enable_write_filecache` | +| local staging | Downloading remote physical objects locally before a background task starts reading, and pinning them for the reader lifetime | +| direct S3 read | Reading directly from a remote stream after `S3RandomAccessFile` misses `FileCache` | +| physical object | An actual object on S3, for example `N.merged` | +| logical subfile | A column data, mark, or index file described by DMFile MetaV2 metadata; it may reside inside the same `.merged` physical object | + +## Goals + +1. Reduce S3 stream fan-out caused by per-column reads in `prepareMerge`, `prepareMergeDelta`, and `prepareSplitPhysical` for wide tables. +2. Control the behavior explicitly through `dt_enable_write_filecache`, with the default kept disabled. +3. For MetaV2 DMFiles, precisely collect the physical objects to localize according to the actual read columns, and deduplicate them by key. +4. Reuse the existing `FileCache::downloadFileForLocalReadWithRetry(...)` without adding a batch download API. +5. Pin downloaded local files for the lifetime of the reader/input stream, preventing eviction during reads. +6. Always fall back to direct S3 reads if localization fails, so the first version does not reduce the success rate of background tasks. + +## Non-Goals + +- Do not introduce `RemoteReadPolicy::Auto` or any heuristic automatic policy. +- Do not automatically decide enablement based on thresholds such as column count, bytes, or segment size. Enablement is decided only by `dt_enable_write_filecache` and the call site. +- Do not add a batch download API to `FileCache`. +- Do not support non-MetaV2 DMFiles. +- Do not cover internal read paths other than `prepareMerge`, `prepareMergeDelta`, and `prepareSplitPhysical`. +- Do not implement fallback=`fail`. +- Do not rewrite `DMFileReader` column-level decoding, block organization logic, or the eager `ColumnReadStream` initialization model. +- Do not force local staging for normal query paths. + +## Architecture Overview + +The target structure is: + +```text +Segment::prepareMerge / prepareMergeDelta / prepareSplitPhysical + -> build internal stream with write-filecache enabled flag + -> DMFileBlockInputStreamBuilder + -> collect MetaV2 physical objects for read_columns + -> FileCache::downloadFileForLocalReadWithRetry for each object + -> store returned FileSegmentPtr in local_read_files + -> DMFileReader / ColumnReadStream + -> FileProvider::newRandomAccessFile + -> S3RandomAccessFile::create + -> FileCache hit: PosixRandomAccessFile + -> miss/failure: S3RandomAccessFile direct read +``` + +Key points: + +- Localization happens before `DMFileReader` eagerly creates all `ColumnReadStream` instances. +- Subsequent column readers do not need a special read interface. They still go through `FileProvider` / `S3RandomAccessFile::create`. +- `local_read_files` holds `FileSegmentPtr`, ensuring that local files are not evicted during the reader lifetime. +- Localization failures do not block read stream construction. They always fall back to the existing direct read path. + +## Design + +### 1. Explicit Switch `dt_enable_write_filecache` + +Add a new setting: + +```text +dt_enable_write_filecache = false +``` + +Semantics: + +- `false`: the write/storage node does not actively use FileCache for these three background tasks, and all logic related to this design degenerates to the existing behavior. +- `true`: the write/storage node initializes an available FileCache, and attempts to localize MetaV2 physical objects before constructing DMFile readers in `prepareMerge`, `prepareMergeDelta`, and `prepareSplitPhysical`. + +This switch controls both write-side FileCache enablement and localized reads for the three background task types. The first version does not introduce a separate `RemoteReadPolicy` or automatic policy, to avoid too many configuration and behavior combinations. + +### 2. Cover Only Three Call Sites + +The first version only passes the "enable write FileCache local staging" flag from the following paths: + +- `Segment::prepareMerge` +- `Segment::prepareMergeDelta` +- `Segment::prepareSplitPhysical` + +In implementation, these paths can set a bool on `DMFileBlockInputStreamBuilder` when constructing internal read streams, for example: + +```cpp +builder.enableWriteFileCacheLocalRead(dm_context.global_context.getSettingsRef().dt_enable_write_filecache); +``` + +Alternatively, the flag can be passed through an explicit field in `DMContext`. In either approach, the trigger scope must be limited to these three call sites to avoid automatically changing the behavior of all `ReadTag::Internal` paths. + +### 3. Handle Only MetaV2 + +Localized object collection only supports files where `dmfile->useMetaV2()` is true. + +If `dt_enable_write_filecache=true` but the file is not MetaV2: + +- Record a debug or trace log. +- Do not localize it. +- Continue through the existing direct read path. + +There is no need to collect `.dat`, `.mrk`, or `.idx` files for non-MetaV2. Newly generated remote DMFiles in the disaggregated architecture use MetaV2, so old formats are not a first-version target. + +### 4. Precisely Collect Physical Objects by Read Columns + +Before `DMFileBlockInputStreamBuilder::buildNoLocalIndex(...)` creates `DMFileReader`, collect the physical objects to localize from `dmfile` and `read_columns`. + +The object collection flow is: + +```text +DMFile MetaV2 + read_columns + -> enumerate each column's substreams by IDataType::enumerateStreams + -> for each stream_name: + logical data file = colDataFileName(stream_name) + logical mark file = colMarkFileName(stream_name) + optional logical index file = colIndexFileName(stream_name) + -> find each logical file in merged_sub_file_infos + -> convert merged file number to physical mergedPath(number) + -> deduplicate physical S3 key + -> download each physical key through FileCache +``` + +Collection rules: + +- For every read column that actually exists, enumerate substreams in the same way the current `DMFileReader` constructs `ColumnReadStream`. +- For every substream, collect the corresponding data and mark logical files. +- If pack filter or index loading reads index files, also collect the corresponding logical index files. +- Map every logical file to a physical `.merged` file through `DMFileMetaV2::merged_sub_file_infos`. +- Deduplicate by physical S3 key to avoid downloading the same `.merged` file repeatedly. + +If a logical file is not found in `merged_sub_file_infos`, the first version does not perform any extra file download. It records a debug log and allows the subsequent read path to read directly. This fallback is compatible with abnormal or transitional states and avoids expanding the first version to non-merged file handling. + +### 5. Reuse the Single-File Download API + +The first version does not add a batch download API. For the deduplicated physical keys, call the following API one by one: + +```cpp +FileCache::downloadFileForLocalReadWithRetry(s3_file_name, file_size, retry_count) +``` + +Requirements: + +- `retry_count` can use a fixed value such as 3, or reuse an existing configuration. +- Files already Complete in FileCache should be returned directly. +- Files that are currently being downloaded should reuse the existing wait logic. +- On download failure, catch the exception, record a warning and metric, and continue with direct-read fallback. +- Every successfully returned `FileSegmentPtr` must be saved for the lifetime of the reader/input stream. + +Sequential downloads are not optimal for throughput, but the first-version implementation is simple, has controlled behavior, and can reuse the existing FileCache reservation, failure cleanup, retry, and local file opening logic. + +### 6. Hold `local_read_files` Directly in the Reader/Input Stream + +The first version does not add a `LocalReadGuard` abstraction. Add a member directly to `DMFileBlockInputStream` or `DMFileReader`: + +```cpp +std::vector local_read_files; +``` + +Responsibilities of this member: + +- Hold the `FileSegmentPtr` returned by FileCache. +- Prevent FileCache from evicting the corresponding local files during the reader/input stream lifetime. +- Not participate in remote object lifetime management. +- After destruction, the corresponding files return to normal FileCache LRU management. + +The recommended location is `DMFileReader`. The reason is that localized object collection happens before reader construction or at the reader construction entry point, while `ColumnReadStream` is managed by `DMFileReader`; the reader lifetime covers the column data read process. + +### 7. Always Fall Back to Direct Reads + +The first version does not add fallback configuration. The behavior on localization failure is fixed: + +```text +log warning + metric +continue with existing direct S3 read path +``` + +The following cases should all fall back to direct reads: + +- `dt_enable_write_filecache=false` +- FileCache is not initialized +- The DMFile is not MetaV2 +- A logical file cannot be mapped to a physical `.merged` file +- FileCache capacity is insufficient +- Download fails +- Download succeeds but opening the local file fails + +This ensures that the first version does not reduce background task success rate due to local cache capacity, download failures, or implementation gaps. If S3 protection is needed later, a fail-fast policy can be added. + +### 8. Observability + +Add or reuse the following metrics / scan context fields: + +- Number of write FileCache local staging attempts. +- Number of physical objects collected for staging. +- Number of objects and bytes hit in FileCache during staging. +- Number of objects and bytes downloaded during staging. +- Number of staging failures, classified by reason. +- Number of direct-read fallbacks. +- Total staging time for `prepareMerge`, `prepareMergeDelta`, and `prepareSplitPhysical`. +- Changes in `S3RandomAccessFile` `GetObject` count and current-open count. + +Logs should output summaries at task or DMFile granularity, avoiding per-column log spam: + +```text +Write FileCache local staging begin: + task=prepareMergeDelta dmfile=... columns=... physical_objects=... + +Write FileCache local staging finish: + dmfile=... hit=... downloaded=... failed=... fallback=... cost_ms=... +``` + +## Compatibility and Invariants + +- When `dt_enable_write_filecache=false`, behavior remains unchanged. +- Non-disaggregated mode behavior remains unchanged. +- Non-MetaV2 files remain unchanged. +- Only `prepareMerge`, `prepareMergeDelta`, and `prepareSplitPhysical` attempt localization. +- Localization only changes the read source. It does not change DMFile metadata, PageStorage external page references, or remote GC semantics. +- `local_read_files` only pins local cached files. It does not extend the lifetime of remote objects. Remote object lifetime is still controlled by checkpoint locations, external pages, and DMFile GC. +- Localization failures always fall back to direct reads and should not introduce extra prepare-stage failures. +- FileCache eviction must not delete local files still held by `FileSegmentPtr`. + +## Incremental Plan + +### Phase 1: Optional FileCache Initialization on Write/Storage Nodes + +1. Add `dt_enable_write_filecache=false`. +2. When this configuration is enabled and remote cache configuration is available, initialize `FileCache` on write/storage nodes. +3. Keep the existing behavior when the configuration is disabled. +4. Add startup logs that clearly show whether write FileCache is enabled, along with cache paths and capacity. + +### Phase 2: MetaV2 Physical Object Collection + +1. Implement a MetaV2 object collector in `DMFileBlockInputStreamBuilder` or a nearby helper. +2. Use `DMFile` and `read_columns` as inputs. +3. Output deduplicated physical `.merged` S3 keys and file sizes. +4. Skip non-MetaV2 files and unmappable logical files directly, and record debug logs. +5. Add unit tests covering deduplication when multiple logical subfiles across many columns map to the same `.merged` file. + +### Phase 3: Single-File Download and Pinning + +1. Before constructing `DMFileReader`, call `FileCache::downloadFileForLocalReadWithRetry(...)` in order for the deduplicated physical keys. +2. Store successfully returned `FileSegmentPtr` objects in `DMFileReader::local_read_files` or `DMFileBlockInputStream::local_read_files`. +3. Catch download exceptions and fall back to direct reads. +4. Add metrics and summary logs. + +### Phase 4: Integrate the Three Segment Call Sites + +1. Enable write FileCache local staging when `Segment::prepareMerge` constructs internal read streams. +2. Enable it when `Segment::prepareMergeDelta` constructs the data export stream. +3. Enable it when `Segment::prepareSplitPhysical` constructs the data streams for both sides. +4. Confirm other `ReadTag::Internal` paths are unaffected. + +## Validation Strategy + +### Unit Tests + +- When multiple logical subfiles in a MetaV2 DMFile map to the same `.merged` file, collect only one physical object. +- Collect only the substreams corresponding to `read_columns`, and do not collect unread columns. +- Non-MetaV2 DMFiles do not trigger localization. +- If a logical file cannot be mapped to `merged_sub_file_infos`, fall back to direct reads. +- When `local_read_files` holds `FileSegmentPtr`, FileCache eviction does not delete a local file that is being read. +- FileCache download failures do not prevent reader construction and can fall back to direct reads. + +### Integration Tests + +- Use mock S3 and FileCache to construct a wide-table MetaV2 DMFile, run `prepareMergeDelta`, and confirm that the S3 `GetObject` count changes from per-column amplification to deduplicated physical `.merged` objects. +- Run `prepareMerge`, and confirm that stable DMFiles from multiple segments can all trigger localization and pinning. +- Run `prepareSplitPhysical`, and confirm that read streams for both sides can hit the local cache before writing stables. +- When `dt_enable_write_filecache=false`, confirm that the three paths behave the same as the existing direct read path. +- When FileCache capacity is small, confirm that download failures fall back to direct reads and the background task still completes. + +### Benchmark Tests + +For a 500-column wide table with cold FileCache, compare direct S3 reads and write FileCache local staging on: + +- Total `prepareMergeDelta` time. +- Total `prepareMerge` time. +- Total `prepareSplitPhysical` time. +- Number of S3 `GetObject` calls. +- Number of currently opened `S3RandomAccessFile` objects. +- FileCache download bytes. +- Local disk read bytes. +- Impact on foreground query P99 latency. + +For a hot FileCache scenario, confirm that staging mostly produces hits and does not introduce significant extra latency. + +## Risks and Mitigations + +1. Background tasks may fill local disk capacity. + - Downloads must go through FileCache reservation and capacity quotas. Failures always fall back to direct reads. `dt_enable_write_filecache` is disabled by default. + +2. Downloading entire `.merged` files may cause read amplification. + - The first version only covers three broad-read background tasks. Metrics record downloaded bytes and fallback counts so read amplification can be evaluated. + +3. Background downloads may compete with foreground query resources. + - Reuse the existing FileCache download path and S3 read limiter. The first version does not perform concurrent batch downloads, reducing instantaneous impact. + +4. Enabling FileCache on write/storage nodes may increase operational complexity. + - `dt_enable_write_filecache=false` keeps the feature disabled by default. Startup logs make the state explicit. Disabling it fully falls back to existing behavior. + +5. Object collection may miss logical files that are actually read. + - The first version always falls back to direct reads. The collector covers data, mark, and necessary index files. Mock S3 tests verify direct fallback counts. + +6. Eager `ColumnReadStream` creation may still create many local file descriptors. + - The first version only addresses S3 stream amplification. Local file descriptor optimization is left as future work. + +## Alternatives Considered + +### 1. Introduce `RemoteReadPolicy::Auto` + +Not adopted. `Auto` requires heuristics based on column count, bytes, segment size, FileCache state, and other conditions. Its behavior is hard to explain and increases rollout and troubleshooting complexity. The first version uses the explicit `dt_enable_write_filecache` switch and fixes the covered scope to three background tasks. + +### 2. Add a FileCache Batch Download API + +Not adopted for the first version. A batch API could improve throughput and centralize statistics, but it would expand FileCache interface and concurrency-control changes. The first version calls the existing single-file API one by one to validate benefits and semantic correctness first. + +### 3. Support Non-MetaV2 Files + +Not adopted for the first version. Newly generated remote DMFiles in the disaggregated architecture use MetaV2. Non-MetaV2 support would introduce separate collection logic for `.dat`, `.mrk`, and `.idx` files, expanding the implementation surface. + +### 4. Download to a Handwritten Temporary Directory in `Segment` + +Not recommended. This approach bypasses FileCache capacity control, deduplication, LRU, failure cleanup, download throttling, and pin semantics. It would also require reimplementing temporary file cleanup, S3 retry, disk-full handling, and task failure recovery. + +### 5. Immediately Make `DMFileReader` Lazily Create `ColumnReadStream` + +Valuable, but not part of the first version. Lazy initialization can reduce local file descriptors and some memory peaks, but it does not directly solve the core problem of direct S3 reads amplifying streams by column. + +## Future Optimizations + +The following items are not included in the first version and are kept as future optimizations: + +- Add `RemoteReadPolicy::Auto` or threshold strategies to enable the feature automatically based on column count, bytes, or segment size. +- Add `FileCache::downloadFilesForLocalReadWithRetry(...)` as a batch API with task-level deduplication, concurrency control, and statistics. +- Support localization for non-MetaV2 DMFiles, including `.dat`, `.mrk`, and `.idx`. +- Add fallback=`fail`, allowing background tasks to fail and retry instead of continuing direct reads when S3 pressure is high. +- Make `ColumnReadStream` lazily initialized to reduce local file descriptors and memory peaks. +- Configure independent concurrency, priority, or byte limits for write FileCache downloads. +- Integrate more internal read paths into the same mechanism, such as replace stable data and local index build. + +## Open Questions + +There are no open questions blocking the first-version design. The first-version behavior is explicitly controlled by `dt_enable_write_filecache`, disabled by default, and always falls back to direct reads when enabled.