Skip to content

Commit dfba50c

Browse files
committed
impl scheduler for cleaning up hot tier files
changed env vars 1. P_CACHE_DIR -> P_HOT_TIER_DIR 2. P_CACHE_SIZE -> P_HOT_TIER_SIZE 3. P_CACHE_TIME_RANGE -> P_HOT_TIER_TIME_RANGE (does not have a default value)
1 parent 566730e commit dfba50c

File tree

13 files changed

+155
-52
lines changed

13 files changed

+155
-52
lines changed

server/src/banner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ async fn storage_info(config: &Config) {
109109
config.staging_dir().to_string_lossy(),
110110
);
111111

112-
if let Some(path) = &config.parseable.local_cache_path {
112+
if let Some(path) = &config.parseable.hot_tier_storage_path {
113113
let size: SpecificSize<human_size::Gigibyte> =
114-
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
114+
SpecificSize::new(config.parseable.hot_tier_size as f64, human_size::Byte)
115115
.unwrap()
116116
.into();
117117

server/src/cli.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ pub struct Cli {
4545
/// for incoming events and local cache
4646
pub local_staging_path: PathBuf,
4747

48-
/// The local cache path is used for speeding up query on latest data
49-
pub local_cache_path: Option<PathBuf>,
48+
/// The local hot_tier path is used for speeding up query on latest data
49+
pub hot_tier_storage_path: Option<PathBuf>,
5050

51-
/// Size for local cache
52-
pub local_cache_size: u64,
51+
/// Size for local hot_tier
52+
pub hot_tier_size: u64,
5353

54-
/// Size for local cache
55-
pub local_cache_time_range: i64,
54+
/// Time range (in days) for local hot_tier; unset when not configured
55+
pub hot_tier_time_range: Option<i64>,
5656

5757
/// Username for the basic authentication on the server
5858
pub username: String,
@@ -110,11 +110,11 @@ impl Cli {
110110
pub const ADDRESS: &'static str = "address";
111111
pub const DOMAIN_URI: &'static str = "origin";
112112
pub const STAGING: &'static str = "local-staging-path";
113-
pub const CACHE: &'static str = "cache-path";
114-
pub const CACHE_TIME_RANGE: &'static str = "cache-time-range";
115113
pub const QUERY_CACHE: &'static str = "query-cache-path";
116114
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
117-
pub const CACHE_SIZE: &'static str = "cache-size";
115+
pub const HOT_TIER: &'static str = "hot-tier-path";
116+
pub const HOT_TIER_TIME_RANGE: &'static str = "hot-tier-time-range";
117+
pub const HOT_TIER_SIZE: &'static str = "hot-tier-size";
118118
pub const USERNAME: &'static str = "username";
119119
pub const PASSWORD: &'static str = "password";
120120
pub const CHECK_UPDATE: &'static str = "check-update";
@@ -184,30 +184,29 @@ impl Cli {
184184
.next_line_help(true),
185185
)
186186
.arg(
187-
Arg::new(Self::CACHE)
188-
.long(Self::CACHE)
189-
.env("P_CACHE_DIR")
187+
Arg::new(Self::HOT_TIER)
188+
.long(Self::HOT_TIER)
189+
.env("P_HOT_TIER_DIR")
190190
.value_name("DIR")
191191
.value_parser(validation::canonicalize_path)
192192
.help("Local path on this device to be used for caching data")
193193
.next_line_help(true),
194194
)
195195
.arg(
196-
Arg::new(Self::CACHE_SIZE)
197-
.long(Self::CACHE_SIZE)
198-
.env("P_CACHE_SIZE")
196+
Arg::new(Self::HOT_TIER_SIZE)
197+
.long(Self::HOT_TIER_SIZE)
198+
.env("P_HOT_TIER_SIZE")
199199
.value_name("size")
200200
.default_value("1GiB")
201201
.value_parser(validation::cache_size)
202202
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB)")
203203
.next_line_help(true),
204204
)
205205
.arg(
206-
Arg::new(Self::CACHE_TIME_RANGE)
207-
.long(Self::CACHE_TIME_RANGE)
208-
.env("P_CACHE_TIME_RANGE")
206+
Arg::new(Self::HOT_TIER_TIME_RANGE)
207+
.long(Self::HOT_TIER_TIME_RANGE)
208+
.env("P_HOT_TIER_TIME_RANGE")
209209
.value_name("days")
210-
.default_value("10")
211210
.value_parser(value_parser!(i64))
212211
.help("Maximum allowed time in days for all streams combined (In human readable format, e.g 1, 2)")
213212
.next_line_help(true),
@@ -411,16 +410,13 @@ impl FromArgMatches for Cli {
411410
}
412411

413412
fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
414-
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
413+
self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER).cloned();
415414
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
416415
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
417416
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
418417
self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
419418

420-
self.local_cache_time_range = m
421-
.get_one::<i64>(Self::CACHE_TIME_RANGE)
422-
.cloned()
423-
.expect("default value for cache time range");
419+
self.hot_tier_time_range = m.get_one::<i64>(Self::HOT_TIER_TIME_RANGE).cloned();
424420

425421
self.address = m
426422
.get_one::<String>(Self::ADDRESS)
@@ -436,8 +432,8 @@ impl FromArgMatches for Cli {
436432
.get_one::<PathBuf>(Self::STAGING)
437433
.cloned()
438434
.expect("default value for staging");
439-
self.local_cache_size = m
440-
.get_one::<u64>(Self::CACHE_SIZE)
435+
self.hot_tier_size = m
436+
.get_one::<u64>(Self::HOT_TIER_SIZE)
441437
.cloned()
442438
.expect("default value for cache size");
443439
self.query_cache_size = m

server/src/handlers/http/cluster/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -567,17 +567,16 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
567567
.run(move || async {
568568
let result: Result<(), PostError> = async {
569569
let cluster_metrics = fetch_cluster_metrics().await;
570-
if let Ok(metrics) = cluster_metrics{
570+
if let Ok(metrics) = cluster_metrics {
571571
if !metrics.is_empty() {
572572
log::info!("Cluster metrics fetched successfully from all ingestors");
573573
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
574574
let stream_name = INTERNAL_STREAM_NAME;
575575

576-
if ingest_internal_stream(
577-
stream_name,
578-
metrics_bytes,
579-
)
580-
.await.is_ok() {
576+
if ingest_internal_stream(stream_name, metrics_bytes)
577+
.await
578+
.is_ok()
579+
{
581580
log::info!(
582581
"Cluster metrics successfully ingested into internal stream"
583582
);

server/src/handlers/http/logstream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea
395395

396396
match CONFIG.parseable.mode {
397397
Mode::Ingest | Mode::All => {
398-
if CONFIG.parseable.local_cache_path.is_none() {
398+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
399399
return Err(StreamError::CacheNotEnabled(stream_name));
400400
}
401401
}
@@ -434,7 +434,7 @@ pub async fn put_enable_cache(
434434
}
435435
}
436436
Mode::Ingest => {
437-
if CONFIG.parseable.local_cache_path.is_none() {
437+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
438438
return Err(StreamError::CacheNotEnabled(stream_name));
439439
}
440440
// here the ingest server has not found the stream
@@ -464,7 +464,7 @@ pub async fn put_enable_cache(
464464
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
465465
return Err(StreamError::StreamNotFound(stream_name));
466466
}
467-
if CONFIG.parseable.local_cache_path.is_none() {
467+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
468468
return Err(StreamError::CacheNotEnabled(stream_name));
469469
}
470470
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::migration::metadata_migration::migrate_ingester_metadata;
2929
use crate::rbac;
3030
use crate::rbac::role::Action;
3131
use crate::storage;
32+
use crate::storage::hot_tier;
3233
use crate::storage::object_storage::ingestor_metadata_path;
3334
use crate::storage::object_storage::parseable_json_path;
3435
use crate::storage::staging;
@@ -293,10 +294,10 @@ impl IngestServer {
293294
match store.get_object(&path).await {
294295
Ok(bytes) => {
295296
let size = serde_json::from_slice::<StorageMetadata>(&bytes)?.hot_tier_capacity;
296-
let hot_tier_enabled = CONFIG.is_hot_tier_active();
297+
let hot_tier_enabled = CONFIG.is_hot_tier_enabled();
297298
match size {
298299
Some(size) => {
299-
if hot_tier_enabled && CONFIG.parseable.local_cache_size != size {
300+
if hot_tier_enabled && CONFIG.parseable.hot_tier_size != size {
300301
return Err(ObjectStorageError::Custom("Hot Tier Capacity does not match with Other Nodes. Please check the hot tier capacity and try again."));
301302
}
302303
}
@@ -353,7 +354,7 @@ impl IngestServer {
353354
async fn initialize(&self) -> anyhow::Result<()> {
354355
if let Some(cache_manager) = LocalCacheManager::global() {
355356
cache_manager
356-
.validate(CONFIG.parseable.local_cache_size)
357+
.validate(CONFIG.parseable.hot_tier_size)
357358
.await?;
358359
};
359360

@@ -374,6 +375,8 @@ impl IngestServer {
374375
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
375376
sync::object_store_sync();
376377

378+
hot_tier::setup_hot_tier_scheduler().await?;
379+
377380
tokio::spawn(airplane::server());
378381

379382
let app = self.start(prometheus, CONFIG.parseable.openid.clone());

server/src/handlers/http/modal/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::metrics;
3434
use crate::migration;
3535
use crate::rbac;
3636
use crate::storage;
37+
use crate::storage::hot_tier;
3738
use crate::sync;
3839
use crate::users::dashboards::DASHBOARDS;
3940
use crate::users::filters::FILTERS;
@@ -486,7 +487,7 @@ impl Server {
486487
async fn initialize(&self) -> anyhow::Result<()> {
487488
if let Some(cache_manager) = LocalCacheManager::global() {
488489
cache_manager
489-
.validate(CONFIG.parseable.local_cache_size)
490+
.validate(CONFIG.parseable.hot_tier_size)
490491
.await?;
491492
};
492493

@@ -515,6 +516,8 @@ impl Server {
515516
analytics::init_analytics_scheduler()?;
516517
}
517518

519+
hot_tier::setup_hot_tier_scheduler().await?;
520+
518521
tokio::spawn(handlers::livetail::server());
519522
tokio::spawn(handlers::airplane::server());
520523

server/src/localcache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ impl LocalCacheManager {
7979
pub fn global() -> Option<&'static LocalCacheManager> {
8080
static INSTANCE: OnceCell<LocalCacheManager> = OnceCell::new();
8181

82-
let cache_path = CONFIG.parseable.local_cache_path.as_ref()?;
82+
let cache_path = CONFIG.parseable.hot_tier_storage_path.as_ref()?;
8383

8484
Some(INSTANCE.get_or_init(|| {
8585
let cache_path = cache_path.clone();
8686
std::fs::create_dir_all(&cache_path).unwrap();
8787
LocalCacheManager {
8888
filesystem: LocalFileSystem::new(),
8989
cache_path,
90-
cache_capacity: CONFIG.parseable.local_cache_size,
90+
cache_capacity: CONFIG.parseable.hot_tier_size,
9191
copy_options: CopyOptions {
9292
overwrite: true,
9393
skip_exist: false,

server/src/migration/metadata_migration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
125125
if hot_tier_capacity.is_none() {
126126
metadata.insert(
127127
"hot_tier_capacity".to_string(),
128-
JsonValue::Bool(CONFIG.is_hot_tier_active()),
128+
JsonValue::Bool(CONFIG.is_hot_tier_enabled()),
129129
);
130130
}
131131

server/src/option.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ Log Lake for the cloud-native world
7474
.exit()
7575
}
7676

77-
if cli.local_cache_path.is_some() {
77+
if cli.hot_tier_storage_path.is_some() {
7878
create_parseable_cli_command()
7979
.error(
8080
ErrorKind::ValueValidation,
@@ -147,11 +147,11 @@ Log Lake for the cloud-native world
147147
}
148148

149149
pub fn cache_size(&self) -> u64 {
150-
self.parseable.local_cache_size
150+
self.parseable.hot_tier_size
151151
}
152152

153153
pub fn cache_dir(&self) -> &Option<PathBuf> {
154-
&self.parseable.local_cache_path
154+
&self.parseable.hot_tier_storage_path
155155
}
156156

157157
pub fn is_default_creds(&self) -> bool {
@@ -177,8 +177,8 @@ Log Lake for the cloud-native world
177177
}
178178
}
179179

180-
pub fn is_hot_tier_active(&self) -> bool {
181-
self.parseable.local_cache_path.is_some()
180+
pub fn is_hot_tier_enabled(&self) -> bool {
181+
self.parseable.hot_tier_storage_path.is_some() && self.parseable.hot_tier_time_range.is_some()
182182
}
183183
}
184184

server/src/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use chrono::Local;
2222

2323
use std::fmt::Debug;
2424

25+
pub mod hot_tier;
2526
mod localfs;
2627
mod metrics_layer;
2728
pub(crate) mod object_storage;

server/src/storage/hot_tier.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use chrono::Utc;
20+
use clokwerk::{AsyncScheduler, Job, TimeUnits};
21+
use std::path::PathBuf;
22+
use std::time::Duration;
23+
use tokio::fs as AsyncFs;
24+
25+
use crate::metadata::STREAM_INFO;
26+
use crate::option::CONFIG;
27+
28+
async fn cleanup() {
29+
let path = CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap();
30+
let cleanup_interval = CONFIG
31+
.parseable
32+
.hot_tier_time_range
33+
.expect("alredy checked for none");
34+
let streams = STREAM_INFO.list_streams();
35+
36+
let now = Utc::now().date_naive();
37+
38+
for stream in streams {
39+
let path = PathBuf::from(path).join(stream);
40+
let mut files = AsyncFs::read_dir(path).await.unwrap();
41+
42+
while let Ok(file) = files.next_entry().await {
43+
if let Some(file) = file {
44+
if file.path().extension().expect("should have an extension") == "parquet" {
45+
let file_str = file
46+
.file_name()
47+
.to_str()
48+
.expect("should be valid str")
49+
.to_owned();
50+
// 2024-05-24
51+
52+
let date = file_str
53+
.split_once('.')
54+
.expect("should be valid split")
55+
.0
56+
.split_once('=')
57+
.expect("should be valid split")
58+
.0;
59+
60+
let date_time = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d")
61+
.expect("should be valid date");
62+
63+
let time_delta = now - date_time;
64+
if time_delta.num_days() > cleanup_interval {
65+
if let Err(err) = AsyncFs::remove_file(file.path()).await {
66+
log::error!("Failed to remove file: {:?}", err);
67+
}
68+
}
69+
}
70+
}
71+
}
72+
}
73+
}
74+
75+
async fn run() -> anyhow::Result<()> {
76+
log::info!("Setting up schedular for hot tier files cleanup");
77+
78+
let mut scheduler = AsyncScheduler::new();
79+
scheduler.every(1u32.day()).at("00:00").run(cleanup);
80+
81+
tokio::spawn(async move {
82+
loop {
83+
scheduler.run_pending().await;
84+
tokio::time::sleep(Duration::from_secs(10)).await;
85+
}
86+
});
87+
88+
Ok(())
89+
}
90+
91+
pub async fn setup_hot_tier_scheduler() -> anyhow::Result<()> {
92+
if CONFIG.is_hot_tier_enabled() {
93+
run().await?;
94+
}
95+
96+
Ok(())
97+
}

0 commit comments

Comments
 (0)