diff --git a/server/src/banner.rs b/server/src/banner.rs index fa817eac1..272db6777 100644 --- a/server/src/banner.rs +++ b/server/src/banner.rs @@ -109,9 +109,9 @@ async fn storage_info(config: &Config) { config.staging_dir().to_string_lossy(), ); - if let Some(path) = &config.parseable.local_cache_path { + if let Some(path) = &config.parseable.hot_tier_storage_path { let size: SpecificSize = - SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte) + SpecificSize::new(config.parseable.hot_tier_size as f64, human_size::Byte) .unwrap() .into(); diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 304752ccf..6175cdf11 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -300,7 +300,7 @@ pub async fn get_first_event( let time_partition = meta.time_partition; if manifests.is_empty() { log::info!("No manifest found for stream {stream_name}"); - return Err(ObjectStorageError::Custom("No manifest found".to_string())); + return Err(ObjectStorageError::Custom("No manifest found")); } let manifest = &manifests[0]; let path = partition_path( diff --git a/server/src/cli.rs b/server/src/cli.rs index 961274035..783bf41f5 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -45,11 +45,14 @@ pub struct Cli { /// for incoming events and local cache pub local_staging_path: PathBuf, - /// The local cache path is used for speeding up query on latest data - pub local_cache_path: Option<PathBuf>, + /// The local hot_tier path is used for speeding up query on latest data + pub hot_tier_storage_path: Option<PathBuf>, - /// Size for local cache - pub local_cache_size: u64, + /// Size for local hot_tier + pub hot_tier_size: u64, + + /// Time range (in days) for local hot_tier + pub hot_tier_time_range: Option<i64>, /// Username for the basic authentication on the server pub username: String, @@ -107,10 +110,11 @@ impl Cli { pub const ADDRESS: &'static str = "address"; pub const DOMAIN_URI: &'static str = "origin"; pub const STAGING: &'static str = "local-staging-path"; - pub const CACHE: &'static str = "cache-path"; pub const QUERY_CACHE: &'static str = "query-cache-path"; pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size"; - pub const CACHE_SIZE: &'static str = "cache-size"; + pub const HOT_TIER: &'static str = "hot-tier-path"; + pub const HOT_TIER_TIME_RANGE: &'static str = "hot-tier-time-range"; + pub const HOT_TIER_SIZE: &'static str = "hot-tier-size"; pub const USERNAME: &'static str = "username"; pub const PASSWORD: &'static str = "password"; pub const CHECK_UPDATE: &'static str = "check-update"; @@ -180,25 +184,33 @@ impl Cli { .next_line_help(true), ) .arg( - Arg::new(Self::CACHE) - .long(Self::CACHE) - .env("P_CACHE_DIR") + Arg::new(Self::HOT_TIER) + .long(Self::HOT_TIER) + .env("P_HOT_TIER_DIR") .value_name("DIR") .value_parser(validation::canonicalize_path) .help("Local path on this device to be used for caching data") .next_line_help(true), ) .arg( - Arg::new(Self::CACHE_SIZE) - .long(Self::CACHE_SIZE) - .env("P_CACHE_SIZE") + Arg::new(Self::HOT_TIER_SIZE) + .long(Self::HOT_TIER_SIZE) + .env("P_HOT_TIER_SIZE") .value_name("size") .default_value("1GiB") .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") + .help("Maximum allowed cache size for all streams combined (In human readable format, e.g. 1GiB, 2GiB)") + .next_line_help(true), + ) + .arg( + Arg::new(Self::HOT_TIER_TIME_RANGE) + .long(Self::HOT_TIER_TIME_RANGE) + .env("P_HOT_TIER_TIME_RANGE") + .value_name("days") +
.value_parser(value_parser!(i64)) + .help("Maximum allowed time in days for all streams combined (e.g. 1, 2)") .next_line_help(true), ) - .arg( Arg::new(Self::QUERY_CACHE) .long(Self::QUERY_CACHE) @@ -398,12 +410,14 @@ impl FromArgMatches for Cli { } fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { - self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned(); + self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER).cloned(); self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned(); self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned(); self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned(); + self.hot_tier_time_range = m.get_one::<i64>(Self::HOT_TIER_TIME_RANGE).cloned(); + self.address = m .get_one::<String>(Self::ADDRESS) .cloned() @@ -418,8 +432,8 @@ impl FromArgMatches for Cli { .get_one::<PathBuf>(Self::STAGING) .cloned() .expect("default value for staging"); - self.local_cache_size = m - .get_one::<u64>(Self::CACHE_SIZE) + self.hot_tier_size = m + .get_one::<u64>(Self::HOT_TIER_SIZE) .cloned() .expect("default value for cache size"); self.query_cache_size = m diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index d6b814615..e00a881a7 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -46,8 +46,8 @@ use crate::handlers::http::query::{ use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::querycache::QueryCacheManager; use crate::utils::arrow::flight::{ - append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, - send_to_ingester, + append_temporary_events, get_from_ingester_cache, get_query_from_ticket, into_flight_data, + run_do_get_rpc, send_to_ingester, }; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, @@ -203,6 +203,13 @@ impl FlightService for AirServiceImpl { .await .map_err(|_| Status::internal("Failed to parse query"))?; + // deal with ingester local cache + if let Some(early) = + get_from_ingester_cache(&query.start, &query.end, &stream_name, ticket.clone()).await + { + return into_flight_data(early); + } + let event = if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { let sql = format!("select * from {}", &stream_name); diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 211d7d17d..62122060b 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -95,7 +95,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result> { +pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result { // send the query request to the ingestor let mut res = vec![]; let ima = get_ingestor_info().await?; @@ -128,5 +128,5 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result Result<(), StreamError> { - if !utils::check_liveness(&ingestor.domain_name).await { - return Ok(()); - } - let request_body: Bytes = Bytes::from(body.to_string()); - let client = reqwest::Client::new(); - let resp = client + reqwest::Client::new() .put(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) - .body(request_body) + .body(Bytes::from(body.to_string())) .send() .await .map_err(|err| { @@ -83,6 +78,7 @@ pub async fn sync_cache_with_ingestors( // if the response is not successful, log the error and return a custom error // this could be a bit too much, but we need
to be sure it covers all cases + /* if !resp.status().is_success() { log::error!( "failed to set cache: {}\nResponse Returned: {:?}", @@ -90,6 +86,7 @@ pub async fn sync_cache_with_ingestors( resp.text().await ); } + */ Ok(()) } @@ -345,7 +342,7 @@ pub async fn send_retention_cleanup_request( ingestor.domain_name, err ); - ObjectStorageError::Custom(err.to_string()) + ObjectStorageError::ReqwestError(err) })?; // if the response is not successful, log the error and return a custom error @@ -360,7 +357,7 @@ pub async fn send_retention_cleanup_request( let resp_data = resp.bytes().await.map_err(|err| { log::error!("Fatal: failed to parse response to bytes: {:?}", err); - ObjectStorageError::Custom(err.to_string()) + err })?; first_event_at = String::from_utf8_lossy(&resp_data).to_string(); @@ -576,14 +573,10 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { let stream_name = INTERNAL_STREAM_NAME; - if matches!( - ingest_internal_stream( - stream_name.to_string(), - bytes::Bytes::from(metrics_bytes), - ) - .await, - Ok(()) - ) { + if ingest_internal_stream(stream_name, metrics_bytes) + .await + .is_ok() + { log::info!( "Cluster metrics successfully ingested into internal stream" ); diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 2cb2ffe13..5ea38de01 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -71,16 +71,16 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - create_stream_if_not_exists(&stream_name).await?; +pub async fn ingest_internal_stream(stream_name: &str, body: Vec) -> Result<(), PostError> { + create_stream_if_not_exists(stream_name).await?; let size: usize = body.len(); let parsed_timestamp = Utc::now().naive_utc(); let (rb, is_first) = { let body_val: Value = serde_json::from_slice(&body)?; let hash_map = STREAM_INFO.read().unwrap(); let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .get(stream_name) + .ok_or(PostError::StreamNotFound(stream_name.to_owned()))? .schema .clone(); let event = format::json::Event { @@ -92,7 +92,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< }; event::Event { rb, - stream_name, + stream_name: stream_name.to_string(), origin_format: "json", origin_size: size as u64, is_first_event: is_first, diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 15e343747..ede862c67 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -395,7 +395,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result { - if CONFIG.parseable.local_cache_path.is_none() { + if CONFIG.parseable.hot_tier_storage_path.is_none() { return Err(StreamError::CacheNotEnabled(stream_name)); } } @@ -430,31 +430,31 @@ pub async fn put_enable_cache( stream_name ); - super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?; + super::cluster::sync_cache_with_ingestors(&url, ingestor, *body).await?; } } Mode::Ingest => { - if CONFIG.parseable.local_cache_path.is_none() { + if CONFIG.parseable.hot_tier_storage_path.is_none() { return Err(StreamError::CacheNotEnabled(stream_name)); } // here the ingest server has not found the stream // so it should check if the stream exists in storage - let check = storage + let check_if_stream_exists = storage .list_streams() .await? 
.iter() - .map(|stream| stream.name.clone()) + .map(|stream| &stream.name) .contains(&stream_name); - if !check { - log::error!("Stream {} not found", stream_name.clone()); + if !check_if_stream_exists { + log::error!("Stream {} not found", &stream_name); return Err(StreamError::StreamNotFound(stream_name.clone())); } metadata::STREAM_INFO .upsert_stream_info( &*storage, LogStream { - name: stream_name.clone().to_owned(), + name: stream_name.clone(), }, ) .await @@ -464,7 +464,7 @@ pub async fn put_enable_cache( if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } - if CONFIG.parseable.local_cache_path.is_none() { + if CONFIG.parseable.hot_tier_storage_path.is_none() { return Err(StreamError::CacheNotEnabled(stream_name)); } } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 26ed25d8f..210eb65e8 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -29,10 +29,12 @@ use crate::migration::metadata_migration::migrate_ingester_metadata; use crate::rbac; use crate::rbac::role::Action; use crate::storage; +use crate::storage::hot_tier; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::object_storage::parseable_json_path; use crate::storage::staging; use crate::storage::ObjectStorageError; +use crate::storage::StorageMetadata; use crate::sync; use super::server::Server; @@ -188,6 +190,29 @@ impl IngestServer { .authorize_for_stream(Action::GetStats), ), ) + .service( + web::scope("/retention") + // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream + .route( + "", + web::put() + .to(logstream::put_retention) + .authorize_for_stream(Action::PutRetention), + ) + // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream + .route( + "", + web::get() + .to(logstream::get_retention) + .authorize_for_stream(Action::GetRetention), + ) + .route( + "/cleanup", + web::post() + .to(logstream::retention_cleanup) + .authorize_for_stream(Action::PutRetention), + ), + ) .service( web::resource("/cache") // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream .route( web::put() .to(logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), ) // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream .route( web::get() .to(logstream::get_cache_enabled) .authorize_for_stream(Action::GetCacheEnabled), ), - ) - .service( - web::scope("/retention").service( - web::resource("/cleanup").route( - web::post() - .to(logstream::retention_cleanup) - .authorize_for_stream(Action::PutRetention), - ), - ), ), ) } @@ -276,10 +292,26 @@ impl IngestServer { let path = parseable_json_path(); match store.get_object(&path).await { - Ok(_) => Ok(()), + Ok(bytes) => { + let size = serde_json::from_slice::<StorageMetadata>(&bytes)?.hot_tier_capacity; + let hot_tier_enabled = CONFIG.is_hot_tier_enabled(); + match size { + Some(size) => { + if hot_tier_enabled && CONFIG.parseable.hot_tier_size != size { + return Err(ObjectStorageError::Custom("Hot tier capacity does not match the other nodes. Please check the hot tier capacity and try again.")); + } + } + None => { + if hot_tier_enabled { + return Err(ObjectStorageError::Custom("Hot tier is enabled on this node but disabled on the other nodes. Please set the hot tier and try again.")); + } + } + } + // fall through + Ok(()) + } Err(_) => Err(ObjectStorageError::Custom( - "Query Server has not been started yet. Please start the querier server first." - .to_string(), + "Query Server has not been started yet. 
Please start the querier server first.", )), } } @@ -319,10 +351,9 @@ impl IngestServer { } async fn initialize(&self) -> anyhow::Result<()> { - // ! Undefined and Untested behaviour if let Some(cache_manager) = LocalCacheManager::global() { cache_manager - .validate(CONFIG.parseable.local_cache_size) + .validate(CONFIG.parseable.hot_tier_size) .await?; }; @@ -343,6 +374,8 @@ impl IngestServer { let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync(); + hot_tier::setup_hot_tier_scheduler().await?; + tokio::spawn(airplane::server()); let app = self.start(prometheus, CONFIG.parseable.openid.clone()); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index ad14750b4..ede16c8ff 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -34,6 +34,7 @@ use crate::metrics; use crate::migration; use crate::rbac; use crate::storage; +use crate::storage::hot_tier; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; @@ -304,29 +305,37 @@ impl Server { ), ) .service( - web::resource("/retention") + web::scope("/retention") // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream .route( + "", web::put() .to(logstream::put_retention) .authorize_for_stream(Action::PutRetention), ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream .route( + "", web::get() .to(logstream::get_retention) .authorize_for_stream(Action::GetRetention), + ) + .route( + "/cleanup", + web::post() + .to(logstream::retention_cleanup) + .authorize_for_stream(Action::PutRetention), ), ) .service( web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + // PUT "/logstream/{logstream}/cache" ==> Set cache for given logstream .route( web::put() .to(logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), ) - // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream + // GET "/logstream/{logstream}/cache" ==> Get cache for given logstream .route( web::get() .to(logstream::get_cache_enabled) @@ -478,7 +487,7 @@ impl Server { async fn initialize(&self) -> anyhow::Result<()> { if let Some(cache_manager) = LocalCacheManager::global() { cache_manager - .validate(CONFIG.parseable.local_cache_size) + .validate(CONFIG.parseable.hot_tier_size) .await?; }; @@ -507,6 +516,8 @@ impl Server { analytics::init_analytics_scheduler()?; } + hot_tier::setup_hot_tier_scheduler().await?; + tokio::spawn(handlers::livetail::server()); tokio::spawn(handlers::airplane::server()); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 576f838fe..2ec0db413 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -52,7 +52,7 @@ use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; /// Query Request through http endpoint. 
-#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 07ee3eaaa..835e44242 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -79,7 +79,7 @@ impl LocalCacheManager { pub fn global() -> Option<&'static LocalCacheManager> { static INSTANCE: OnceCell = OnceCell::new(); - let cache_path = CONFIG.parseable.local_cache_path.as_ref()?; + let cache_path = CONFIG.parseable.hot_tier_storage_path.as_ref()?; Some(INSTANCE.get_or_init(|| { let cache_path = cache_path.clone(); @@ -87,7 +87,7 @@ impl LocalCacheManager { LocalCacheManager { filesystem: LocalFileSystem::new(), cache_path, - cache_capacity: CONFIG.parseable.local_cache_size, + cache_capacity: CONFIG.parseable.hot_tier_size, copy_options: CopyOptions { overwrite: true, skip_exist: false, diff --git a/server/src/migration.rs b/server/src/migration.rs index d8c89b578..81ed68ba2 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -230,6 +230,8 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { return Err(err.into()); } + // run metafile migration causes another read from object_store + // this can be removed by switching around the previous if let statement run_meta_file_migration(&object_store, old_meta_file_path).await?; run_stream_files_migration(&object_store).await?; diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 72bab9db6..7ddea9960 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -121,6 +121,14 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue { ); } + let hot_tier_capacity = metadata.get("hot_tier_capacity"); + if hot_tier_capacity.is_none() { + metadata.insert( + "hot_tier_capacity".to_string(), + JsonValue::Bool(CONFIG.is_hot_tier_enabled()), + ); + } + storage_metadata } diff --git a/server/src/option.rs b/server/src/option.rs index e607c2062..71b643c47 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -74,7 +74,7 @@ Log Lake for the cloud-native world .exit() } - if cli.local_cache_path.is_some() { + if cli.hot_tier_storage_path.is_some() { create_parseable_cli_command() .error( ErrorKind::ValueValidation, @@ -131,11 +131,11 @@ Log Lake for the cloud-native world } if self.get_storage_mode_string() == "Local drive" { - return Err(ObjectStorageError::Custom(format!("Could not start the server because directory '{}' contains stale data, please use an empty directory, and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))); + return Err(ObjectStorageError::Invalid(anyhow::anyhow!("Could not start the server because directory '{}' contains stale data, please use an empty directory, and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))); } // S3 bucket mode - Err(ObjectStorageError::Custom(format!("Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))) + Err(ObjectStorageError::Invalid(anyhow::anyhow!("Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))) } pub fn storage(&self) -> Arc { @@ -147,11 +147,11 @@ Log Lake for the cloud-native world } pub 
fn cache_size(&self) -> u64 { - self.parseable.local_cache_size + self.parseable.hot_tier_size } pub fn cache_dir(&self) -> &Option { - &self.parseable.local_cache_path + &self.parseable.hot_tier_storage_path } pub fn is_default_creds(&self) -> bool { @@ -176,6 +176,11 @@ Log Lake for the cloud-native world Mode::All => "Standalone", } } + + pub fn is_hot_tier_enabled(&self) -> bool { + self.parseable.hot_tier_storage_path.is_some() + && self.parseable.hot_tier_time_range.is_some() + } } fn create_parseable_cli_command() -> Command { @@ -282,15 +287,15 @@ pub mod validation { use crate::option::MIN_CACHE_SIZE_BYTES; use human_size::{multiples, SpecificSize}; - pub fn file_path(s: &str) -> Result { + pub fn file_path(s: &str) -> Result { if s.is_empty() { - return Err("empty path".to_owned()); + return Err("empty path"); } let path = PathBuf::from(s); if !path.is_file() { - return Err("path specified does not point to an accessible file".to_string()); + return Err("path specified does not point to an accessible file"); } Ok(path) @@ -308,23 +313,23 @@ pub mod validation { Ok(absolute_path) } - pub fn canonicalize_path(s: &str) -> Result { + pub fn canonicalize_path(s: &str) -> Result { let path = PathBuf::from(s); - Ok(absolute_path(path).unwrap()) + absolute_path(path) } - pub fn socket_addr(s: &str) -> Result { + pub fn socket_addr(s: &str) -> Result { s.to_socket_addrs() .is_ok() .then_some(s.to_string()) - .ok_or_else(|| "Socket Address for server is invalid".to_string()) + .ok_or("Socket Address for server is invalid") } - pub fn url(s: &str) -> Result { - url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string()) + pub fn url(s: &str) -> Result { + url::Url::parse(s).map_err(|_| "Invalid URL provided") } - fn human_size_to_bytes(s: &str) -> Result { + pub fn human_size_to_bytes(s: &str) -> Result { fn parse_and_map( s: &str, ) -> Result { @@ -337,23 +342,19 @@ pub mod validation { .or(parse_and_map::(s)) .or(parse_and_map::(s)) .or(parse_and_map::(s)) - .map_err(|_| "Could not parse given size".to_string())?; + .map_err(|_| "Could not parse given size")?; if size < MIN_CACHE_SIZE_BYTES { - return Err( - "Specified value of cache size is smaller than current minimum of 1GiB".to_string(), - ); + return Err("Specified value of cache size is smaller than current minimum of 1GiB"); } Ok(size) } - pub fn cache_size(s: &str) -> Result { + pub fn cache_size(s: &str) -> Result { let size = human_size_to_bytes(s)?; if size < MIN_CACHE_SIZE_BYTES { - return Err( - "Specified value of cache size is smaller than current minimum of 1GiB".to_string(), - ); + return Err("Specified value of cache size is smaller than current minimum of 1GiB"); } Ok(size) } diff --git a/server/src/storage.rs b/server/src/storage.rs index d28dcb00e..267d68407 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,14 +16,13 @@ * */ -use crate::{ - catalog::snapshot::Snapshot, metadata::error::stream_info::MetadataError, stats::FullStats, -}; - +use crate::metadata::error::stream_info::MetadataError; +use crate::{catalog::snapshot::Snapshot, stats::FullStats}; use chrono::Local; use std::fmt::Debug; +pub mod hot_tier; mod localfs; mod metrics_layer; pub(crate) mod object_storage; @@ -190,10 +189,11 @@ pub enum ObjectStorageError { NoSuchKey(String), #[error("Invalid Request: {0}")] Invalid(#[from] anyhow::Error), - + #[error("Error: {0}")] + ReqwestError(#[from] reqwest::Error), // custom #[error("{0}")] - Custom(String), + Custom(&'static str), // Could not connect to object storage 
#[error("Connection Error: {0}")] diff --git a/server/src/storage/hot_tier.rs b/server/src/storage/hot_tier.rs new file mode 100644 index 000000000..912e59eca --- /dev/null +++ b/server/src/storage/hot_tier.rs @@ -0,0 +1,97 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use chrono::Utc; +use clokwerk::{AsyncScheduler, Job, TimeUnits}; +use std::path::PathBuf; +use std::time::Duration; +use tokio::fs as AsyncFs; + +use crate::metadata::STREAM_INFO; +use crate::option::CONFIG; + +async fn cleanup() { + let path = CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap(); + let cleanup_interval = CONFIG + .parseable + .hot_tier_time_range + .expect("alredy checked for none"); + let streams = STREAM_INFO.list_streams(); + + let now = Utc::now().date_naive(); + + for stream in streams { + let path = PathBuf::from(path).join(stream); + let mut files = AsyncFs::read_dir(path).await.unwrap(); + + while let Ok(file) = files.next_entry().await { + if let Some(file) = file { + if file.path().extension().expect("should have an extension") == "parquet" { + let file_str = file + .file_name() + .to_str() + .expect("should be valid str") + .to_owned(); + // 2024-05-24 + + let date = file_str + .split_once('.') + .expect("should be valid split") + .0 + .split_once('=') + .expect("should be valid split") + .0; + + let date_time = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d") + .expect("should be valid date"); + + let time_delta = now - date_time; + if time_delta.num_days() > cleanup_interval { + if let Err(err) = AsyncFs::remove_file(file.path()).await { + log::error!("Failed to remove file: {:?}", err); + } + } + } + } + } + } +} + +async fn run() -> anyhow::Result<()> { + log::info!("Setting up schedular for hot tier files cleanup"); + + let mut scheduler = AsyncScheduler::new(); + scheduler.every(1u32.day()).at("00:00").run(cleanup); + + tokio::spawn(async move { + loop { + scheduler.run_pending().await; + tokio::time::sleep(Duration::from_secs(10)).await; + } + }); + + Ok(()) +} + +pub async fn setup_hot_tier_scheduler() -> anyhow::Result<()> { + if CONFIG.is_hot_tier_enabled() { + run().await?; + } + + Ok(()) +} diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 00ac3c01c..a04be57eb 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -16,6 +16,7 @@ * */ +use anyhow::anyhow; use async_trait::async_trait; use bytes::Bytes; use datafusion::datasource::listing::ListingTableUrl; @@ -265,9 +266,10 @@ impl S3 { if let Err(object_store::Error::NotFound { source, .. 
}) = &resp { let source_str = source.to_string(); if source_str.contains("NoSuchBucket") { - return Err(ObjectStorageError::Custom( - format!("Bucket '{}' does not exist in S3.", self.bucket).to_string(), - )); + return Err(ObjectStorageError::Invalid(anyhow!( + "Bucket '{}' does not exist in S3.", + self.bucket + ))); } } diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index d3ecd4040..1b2d5c4f0 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -61,10 +61,17 @@ pub struct StorageMetadata { pub roles: HashMap>, #[serde(default)] pub default_role: Option<String>, + pub hot_tier_capacity: Option<u64>, } impl StorageMetadata { pub fn new() -> Self { + let hot_tier_capacity = if CONFIG.is_hot_tier_enabled() { + Some(CONFIG.parseable.hot_tier_size) + } else { + None + }; + Self { version: "v3".to_string(), mode: CONFIG.storage_name.to_owned(), @@ -76,6 +83,7 @@ impl StorageMetadata { streams: Vec::new(), roles: HashMap::default(), default_role: None, + hot_tier_capacity, } } @@ -127,7 +135,7 @@ pub async fn resolve_parseable_metadata() -> Result Result { - standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here")) - .map_err(|err| { - ObjectStorageError::Custom(err.to_string()) - })?; + standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))?; overwrite_remote = true; }, Mode::Query => { diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 3b0b17eba..8465f85e6 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -19,6 +19,7 @@ use crate::event::Event; use crate::handlers::http::ingest::push_logs_unchecked; use crate::handlers::http::query::Query as QueryJson; +use crate::localcache::LocalCacheManager; use crate::metadata::STREAM_INFO; use crate::query::stream_schema_provider::include_now; use crate::{ @@ -31,10 +32,12 @@ use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::{FlightData, Ticket}; use arrow_ipc::writer::IpcWriteOptions; use arrow_select::concat::concat_batches; +use chrono::{DateTime, Utc}; use datafusion::logical_expr::BinaryExpr; use datafusion::prelude::Expr; use datafusion::scalar::ScalarValue; use futures::{stream, TryStreamExt}; +use serde_json::json; use tonic::{Request, Response, Status}; @@ -158,3 +161,62 @@ pub fn into_flight_data(records: Vec<RecordBatch>) -> Result + +pub async fn get_from_ingester_cache( + start: &DateTime<Utc>, + end: &DateTime<Utc>, + stream_name: &str, + ticket: QueryJson, +) -> Option<Vec<RecordBatch>> { + LocalCacheManager::global()?; + + let time_delta = *end - *start; + let goto_ingester = time_delta.num_days() + < CONFIG + .parseable + .hot_tier_time_range + .expect("already checked for none"); + let goto_ingester = goto_ingester + && STREAM_INFO + .read() + .expect("lock should not be poisoned") + .get(stream_name)? 
+ .cache_enabled; + + if CONFIG.parseable.mode == Mode::Query && goto_ingester { + // send the gRPC call to the ingesters; if it fails, fall back to the normal flow + let start_time = ticket.start_time; + let end_time = ticket.end_time; + let sql = ticket.query; + let out_ticket = json!({ + "query": sql, + "startTime": start_time, + "endTime": end_time + }) + .to_string(); + + // todo: cleanup the namespace + let ingester_metadatas = crate::handlers::http::cluster::get_ingestor_info() + .await + .ok()?; + let mut result_from_ingester: Vec<RecordBatch> = vec![]; + + let mut error = false; + for im in ingester_metadatas { + if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await { + result_from_ingester.append(&mut batches); + } else { + error = true; + break; + } + } + + if error { + None + } else { + Some(result_from_ingester) + } + } else { + None + } +}
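Reviewer note on the hot tier cleanup introduced above: `cleanup()` walks each stream's hot tier directory once a day and removes a parquet file when the date encoded in its file name is more than `P_HOT_TIER_TIME_RANGE` days old. A minimal sketch of that retention check, for illustration only — the `is_expired` helper and the example values are hypothetical and not part of this diff:

```rust
use chrono::{NaiveDate, Utc};

/// Hypothetical helper mirroring the check in `cleanup()`: a file becomes
/// eligible for removal once its partition date is strictly older than the
/// configured hot tier time range (in days).
fn is_expired(file_date: NaiveDate, hot_tier_time_range_days: i64) -> bool {
    let today = Utc::now().date_naive();
    (today - file_date).num_days() > hot_tier_time_range_days
}

fn main() {
    // A file whose name encodes the partition date 2024-05-24.
    let file_date = NaiveDate::parse_from_str("2024-05-24", "%Y-%m-%d").unwrap();
    // With P_HOT_TIER_TIME_RANGE=7, the daily job (scheduled at 00:00)
    // removes this file only once more than 7 days have passed since that date.
    println!("expired: {}", is_expired(file_date, 7));
}
```

The same `hot_tier_time_range` value also gates `get_from_ingester_cache`: when the server runs in query mode, a query whose time window is shorter than the configured number of days (and whose stream has caching enabled) is first answered from the ingesters' hot tier via `run_do_get_rpc`, falling back to the regular flow if that call fails.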