Skip to content

Commit 9e35684

Browse files
committed
impl scheduler for cleaning up hot tier files
changed env vars 1. P_CACHE_DIR -> P_HOT_TIER_DIR 2. P_CACHE_SIZE -> P_HOT_TIER_SIZE 3. P_CACHE_TIME_RANGE -> P_HOT_TIER_TIME_RANGE (does not have a default value)
1 parent 566730e commit 9e35684

File tree

14 files changed

+159
-58
lines changed

14 files changed

+159
-58
lines changed

server/src/banner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ async fn storage_info(config: &Config) {
109109
config.staging_dir().to_string_lossy(),
110110
);
111111

112-
if let Some(path) = &config.parseable.local_cache_path {
112+
if let Some(path) = &config.parseable.hot_tier_storage_path {
113113
let size: SpecificSize<human_size::Gigibyte> =
114-
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
114+
SpecificSize::new(config.parseable.hot_tier_size as f64, human_size::Byte)
115115
.unwrap()
116116
.into();
117117

server/src/cli.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ pub struct Cli {
4545
/// for incoming events and local cache
4646
pub local_staging_path: PathBuf,
4747

48-
/// The local cache path is used for speeding up query on latest data
49-
pub local_cache_path: Option<PathBuf>,
48+
/// The local hot_tier path is used for speeding up query on latest data
49+
pub hot_tier_storage_path: Option<PathBuf>,
5050

51-
/// Size for local cache
52-
pub local_cache_size: u64,
51+
/// Size for local hot_tier
52+
pub hot_tier_size: u64,
5353

54-
/// Size for local cache
55-
pub local_cache_time_range: i64,
54+
/// Time range (in days) for local hot_tier
55+
pub hot_tier_time_range: Option<i64>,
5656

5757
/// Username for the basic authentication on the server
5858
pub username: String,
@@ -110,11 +110,11 @@ impl Cli {
110110
pub const ADDRESS: &'static str = "address";
111111
pub const DOMAIN_URI: &'static str = "origin";
112112
pub const STAGING: &'static str = "local-staging-path";
113-
pub const CACHE: &'static str = "cache-path";
114-
pub const CACHE_TIME_RANGE: &'static str = "cache-time-range";
115113
pub const QUERY_CACHE: &'static str = "query-cache-path";
116114
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
117-
pub const CACHE_SIZE: &'static str = "cache-size";
115+
pub const HOT_TIER: &'static str = "hot-tier-path";
116+
pub const HOT_TIER_TIME_RANGE: &'static str = "hot-tier-time-range";
117+
pub const HOT_TIER_SIZE: &'static str = "hot-tier-size";
118118
pub const USERNAME: &'static str = "username";
119119
pub const PASSWORD: &'static str = "password";
120120
pub const CHECK_UPDATE: &'static str = "check-update";
@@ -184,30 +184,29 @@ impl Cli {
184184
.next_line_help(true),
185185
)
186186
.arg(
187-
Arg::new(Self::CACHE)
188-
.long(Self::CACHE)
189-
.env("P_CACHE_DIR")
187+
Arg::new(Self::HOT_TIER)
188+
.long(Self::HOT_TIER)
189+
.env("P_HOT_TIER_DIR")
190190
.value_name("DIR")
191191
.value_parser(validation::canonicalize_path)
192192
.help("Local path on this device to be used for caching data")
193193
.next_line_help(true),
194194
)
195195
.arg(
196-
Arg::new(Self::CACHE_SIZE)
197-
.long(Self::CACHE_SIZE)
198-
.env("P_CACHE_SIZE")
196+
Arg::new(Self::HOT_TIER_SIZE)
197+
.long(Self::HOT_TIER_SIZE)
198+
.env("P_HOT_TIER_SIZE")
199199
.value_name("size")
200200
.default_value("1GiB")
201201
.value_parser(validation::cache_size)
202202
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB)")
203203
.next_line_help(true),
204204
)
205205
.arg(
206-
Arg::new(Self::CACHE_TIME_RANGE)
207-
.long(Self::CACHE_TIME_RANGE)
208-
.env("P_CACHE_TIME_RANGE")
206+
Arg::new(Self::HOT_TIER_TIME_RANGE)
207+
.long(Self::HOT_TIER_TIME_RANGE)
208+
.env("P_HOT_TIER_TIME_RANGE")
209209
.value_name("days")
210-
.default_value("10")
211210
.value_parser(value_parser!(i64))
212211
.help("Maximum allowed time in days for all streams combined (In human readable format, e.g 1, 2)")
213212
.next_line_help(true),
@@ -411,16 +410,13 @@ impl FromArgMatches for Cli {
411410
}
412411

413412
fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
414-
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
413+
self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER).cloned();
415414
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
416415
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
417416
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
418417
self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
419418

420-
self.local_cache_time_range = m
421-
.get_one::<i64>(Self::CACHE_TIME_RANGE)
422-
.cloned()
423-
.expect("default value for cache time range");
419+
self.hot_tier_time_range = m.get_one::<i64>(Self::HOT_TIER_TIME_RANGE).cloned();
424420

425421
self.address = m
426422
.get_one::<String>(Self::ADDRESS)
@@ -436,8 +432,8 @@ impl FromArgMatches for Cli {
436432
.get_one::<PathBuf>(Self::STAGING)
437433
.cloned()
438434
.expect("default value for staging");
439-
self.local_cache_size = m
440-
.get_one::<u64>(Self::CACHE_SIZE)
435+
self.hot_tier_size = m
436+
.get_one::<u64>(Self::HOT_TIER_SIZE)
441437
.cloned()
442438
.expect("default value for cache size");
443439
self.query_cache_size = m

server/src/handlers/airplane.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ use crate::handlers::http::query::{
4646
use crate::query::{TableScanVisitor, QUERY_SESSION};
4747
use crate::querycache::QueryCacheManager;
4848
use crate::utils::arrow::flight::{
49-
append_temporary_events, get_from_ingester_cache, get_query_from_ticket, into_flight_data, run_do_get_rpc,
50-
51-
send_to_ingester,
49+
append_temporary_events, get_from_ingester_cache, get_query_from_ticket, into_flight_data,
50+
run_do_get_rpc, send_to_ingester,
5251
};
5352
use arrow_flight::{
5453
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
@@ -204,7 +203,6 @@ impl FlightService for AirServiceImpl {
204203
.await
205204
.map_err(|_| Status::internal("Failed to parse query"))?;
206205

207-
208206
// deal with ingester local cache
209207
if let Some(early) =
210208
get_from_ingester_cache(&query.start, &query.end, &stream_name, ticket.clone()).await

server/src/handlers/http/cluster/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -567,17 +567,16 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
567567
.run(move || async {
568568
let result: Result<(), PostError> = async {
569569
let cluster_metrics = fetch_cluster_metrics().await;
570-
if let Ok(metrics) = cluster_metrics{
570+
if let Ok(metrics) = cluster_metrics {
571571
if !metrics.is_empty() {
572572
log::info!("Cluster metrics fetched successfully from all ingestors");
573573
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
574574
let stream_name = INTERNAL_STREAM_NAME;
575575

576-
if ingest_internal_stream(
577-
stream_name,
578-
metrics_bytes,
579-
)
580-
.await.is_ok() {
576+
if ingest_internal_stream(stream_name, metrics_bytes)
577+
.await
578+
.is_ok()
579+
{
581580
log::info!(
582581
"Cluster metrics successfully ingested into internal stream"
583582
);

server/src/handlers/http/logstream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea
395395

396396
match CONFIG.parseable.mode {
397397
Mode::Ingest | Mode::All => {
398-
if CONFIG.parseable.local_cache_path.is_none() {
398+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
399399
return Err(StreamError::CacheNotEnabled(stream_name));
400400
}
401401
}
@@ -434,7 +434,7 @@ pub async fn put_enable_cache(
434434
}
435435
}
436436
Mode::Ingest => {
437-
if CONFIG.parseable.local_cache_path.is_none() {
437+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
438438
return Err(StreamError::CacheNotEnabled(stream_name));
439439
}
440440
// here the ingest server has not found the stream
@@ -464,7 +464,7 @@ pub async fn put_enable_cache(
464464
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
465465
return Err(StreamError::StreamNotFound(stream_name));
466466
}
467-
if CONFIG.parseable.local_cache_path.is_none() {
467+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
468468
return Err(StreamError::CacheNotEnabled(stream_name));
469469
}
470470
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::migration::metadata_migration::migrate_ingester_metadata;
2929
use crate::rbac;
3030
use crate::rbac::role::Action;
3131
use crate::storage;
32+
use crate::storage::hot_tier;
3233
use crate::storage::object_storage::ingestor_metadata_path;
3334
use crate::storage::object_storage::parseable_json_path;
3435
use crate::storage::staging;
@@ -293,10 +294,10 @@ impl IngestServer {
293294
match store.get_object(&path).await {
294295
Ok(bytes) => {
295296
let size = serde_json::from_slice::<StorageMetadata>(&bytes)?.hot_tier_capacity;
296-
let hot_tier_enabled = CONFIG.is_hot_tier_active();
297+
let hot_tier_enabled = CONFIG.is_hot_tier_enabled();
297298
match size {
298299
Some(size) => {
299-
if hot_tier_enabled && CONFIG.parseable.local_cache_size != size {
300+
if hot_tier_enabled && CONFIG.parseable.hot_tier_size != size {
300301
return Err(ObjectStorageError::Custom("Hot Tier Capacity does not match with Other Nodes. Please check the hot tier capacity and try again."));
301302
}
302303
}
@@ -310,8 +311,7 @@ impl IngestServer {
310311
Ok(())
311312
}
312313
Err(_) => Err(ObjectStorageError::Custom(
313-
"Query Server has not been started yet. Please start the querier server first."
314-
,
314+
"Query Server has not been started yet. Please start the querier server first.",
315315
)),
316316
}
317317
}
@@ -353,7 +353,7 @@ impl IngestServer {
353353
async fn initialize(&self) -> anyhow::Result<()> {
354354
if let Some(cache_manager) = LocalCacheManager::global() {
355355
cache_manager
356-
.validate(CONFIG.parseable.local_cache_size)
356+
.validate(CONFIG.parseable.hot_tier_size)
357357
.await?;
358358
};
359359

@@ -374,6 +374,8 @@ impl IngestServer {
374374
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
375375
sync::object_store_sync();
376376

377+
hot_tier::setup_hot_tier_scheduler().await?;
378+
377379
tokio::spawn(airplane::server());
378380

379381
let app = self.start(prometheus, CONFIG.parseable.openid.clone());

server/src/handlers/http/modal/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::metrics;
3434
use crate::migration;
3535
use crate::rbac;
3636
use crate::storage;
37+
use crate::storage::hot_tier;
3738
use crate::sync;
3839
use crate::users::dashboards::DASHBOARDS;
3940
use crate::users::filters::FILTERS;
@@ -486,7 +487,7 @@ impl Server {
486487
async fn initialize(&self) -> anyhow::Result<()> {
487488
if let Some(cache_manager) = LocalCacheManager::global() {
488489
cache_manager
489-
.validate(CONFIG.parseable.local_cache_size)
490+
.validate(CONFIG.parseable.hot_tier_size)
490491
.await?;
491492
};
492493

@@ -515,6 +516,8 @@ impl Server {
515516
analytics::init_analytics_scheduler()?;
516517
}
517518

519+
hot_tier::setup_hot_tier_scheduler().await?;
520+
518521
tokio::spawn(handlers::livetail::server());
519522
tokio::spawn(handlers::airplane::server());
520523

server/src/localcache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ impl LocalCacheManager {
7979
pub fn global() -> Option<&'static LocalCacheManager> {
8080
static INSTANCE: OnceCell<LocalCacheManager> = OnceCell::new();
8181

82-
let cache_path = CONFIG.parseable.local_cache_path.as_ref()?;
82+
let cache_path = CONFIG.parseable.hot_tier_storage_path.as_ref()?;
8383

8484
Some(INSTANCE.get_or_init(|| {
8585
let cache_path = cache_path.clone();
8686
std::fs::create_dir_all(&cache_path).unwrap();
8787
LocalCacheManager {
8888
filesystem: LocalFileSystem::new(),
8989
cache_path,
90-
cache_capacity: CONFIG.parseable.local_cache_size,
90+
cache_capacity: CONFIG.parseable.hot_tier_size,
9191
copy_options: CopyOptions {
9292
overwrite: true,
9393
skip_exist: false,

server/src/migration/metadata_migration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
125125
if hot_tier_capacity.is_none() {
126126
metadata.insert(
127127
"hot_tier_capacity".to_string(),
128-
JsonValue::Bool(CONFIG.is_hot_tier_active()),
128+
JsonValue::Bool(CONFIG.is_hot_tier_enabled()),
129129
);
130130
}
131131

server/src/option.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ Log Lake for the cloud-native world
7474
.exit()
7575
}
7676

77-
if cli.local_cache_path.is_some() {
77+
if cli.hot_tier_storage_path.is_some() {
7878
create_parseable_cli_command()
7979
.error(
8080
ErrorKind::ValueValidation,
@@ -147,11 +147,11 @@ Log Lake for the cloud-native world
147147
}
148148

149149
pub fn cache_size(&self) -> u64 {
150-
self.parseable.local_cache_size
150+
self.parseable.hot_tier_size
151151
}
152152

153153
pub fn cache_dir(&self) -> &Option<PathBuf> {
154-
&self.parseable.local_cache_path
154+
&self.parseable.hot_tier_storage_path
155155
}
156156

157157
pub fn is_default_creds(&self) -> bool {
@@ -177,8 +177,9 @@ Log Lake for the cloud-native world
177177
}
178178
}
179179

180-
pub fn is_hot_tier_active(&self) -> bool {
181-
self.parseable.local_cache_path.is_some()
180+
pub fn is_hot_tier_enabled(&self) -> bool {
181+
self.parseable.hot_tier_storage_path.is_some()
182+
&& self.parseable.hot_tier_time_range.is_some()
182183
}
183184
}
184185

server/src/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use chrono::Local;
2222

2323
use std::fmt::Debug;
2424

25+
pub mod hot_tier;
2526
mod localfs;
2627
mod metrics_layer;
2728
pub(crate) mod object_storage;

0 commit comments

Comments
 (0)