Impl Hot Tier to fetch data from ingesters #804

Closed · wants to merge 7 commits

4 changes: 2 additions & 2 deletions server/src/banner.rs
@@ -109,9 +109,9 @@ async fn storage_info(config: &Config) {
         config.staging_dir().to_string_lossy(),
     );

-    if let Some(path) = &config.parseable.local_cache_path {
+    if let Some(path) = &config.parseable.hot_tier_storage_path {
         let size: SpecificSize<human_size::Gigibyte> =
-            SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
+            SpecificSize::new(config.parseable.hot_tier_size as f64, human_size::Byte)
                 .unwrap()
                 .into();
2 changes: 1 addition & 1 deletion server/src/catalog.rs
@@ -300,7 +300,7 @@ pub async fn get_first_event(
     let time_partition = meta.time_partition;
     if manifests.is_empty() {
         log::info!("No manifest found for stream {stream_name}");
-        return Err(ObjectStorageError::Custom("No manifest found".to_string()));
+        return Err(ObjectStorageError::Custom("No manifest found"));
     }
     let manifest = &manifests[0];
     let path = partition_path(
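The new `Custom("No manifest found")` only compiles if the `Custom` variant now carries a `&'static str`, and the cluster changes below lean on a `ReqwestError` variant with a `From` conversion. A sketch of the enum shape this implies; the actual definition lives elsewhere in the PR and is not shown in these hunks:

// Assumed shape, inferred from usage in this diff; not the PR's actual definition.
#[derive(Debug, thiserror::Error)]
pub enum ObjectStorageError {
    #[error("{0}")]
    Custom(&'static str),
    // Using `?` on a bare reqwest::Error (see send_retention_cleanup_request
    // below) needs a #[from] conversion like this one.
    #[error(transparent)]
    ReqwestError(#[from] reqwest::Error),
    // ...remaining variants unchanged
}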
48 changes: 31 additions & 17 deletions server/src/cli.rs
@@ -45,11 +45,14 @@ pub struct Cli {
     /// for incoming events and local cache
     pub local_staging_path: PathBuf,

-    /// The local cache path is used for speeding up query on latest data
-    pub local_cache_path: Option<PathBuf>,
+    /// The local hot_tier path is used for speeding up query on latest data
+    pub hot_tier_storage_path: Option<PathBuf>,

-    /// Size for local cache
-    pub local_cache_size: u64,
+    /// Size for local hot_tier
+    pub hot_tier_size: u64,
+
+    /// Time range (in days) for the local hot_tier
+    pub hot_tier_time_range: Option<i64>,

     /// Username for the basic authentication on the server
     pub username: String,
@@ -107,10 +110,11 @@ impl Cli {
     pub const ADDRESS: &'static str = "address";
     pub const DOMAIN_URI: &'static str = "origin";
     pub const STAGING: &'static str = "local-staging-path";
-    pub const CACHE: &'static str = "cache-path";
     pub const QUERY_CACHE: &'static str = "query-cache-path";
     pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
-    pub const CACHE_SIZE: &'static str = "cache-size";
+    pub const HOT_TIER: &'static str = "hot-tier-path";
+    pub const HOT_TIER_TIME_RANGE: &'static str = "hot-tier-time-range";
+    pub const HOT_TIER_SIZE: &'static str = "hot-tier-size";
     pub const USERNAME: &'static str = "username";
     pub const PASSWORD: &'static str = "password";
     pub const CHECK_UPDATE: &'static str = "check-update";
@@ -180,25 +184,33 @@ impl Cli {
                 .next_line_help(true),
         )
         .arg(
-            Arg::new(Self::CACHE)
-                .long(Self::CACHE)
-                .env("P_CACHE_DIR")
+            Arg::new(Self::HOT_TIER)
+                .long(Self::HOT_TIER)
+                .env("P_HOT_TIER_DIR")
                 .value_name("DIR")
                 .value_parser(validation::canonicalize_path)
                 .help("Local path on this device to be used for caching data")
                 .next_line_help(true),
         )
         .arg(
-            Arg::new(Self::CACHE_SIZE)
-                .long(Self::CACHE_SIZE)
-                .env("P_CACHE_SIZE")
+            Arg::new(Self::HOT_TIER_SIZE)
+                .long(Self::HOT_TIER_SIZE)
+                .env("P_HOT_TIER_SIZE")
                 .value_name("size")
                 .default_value("1GiB")
                 .value_parser(validation::cache_size)
-                .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
+                .help("Maximum allowed cache size for all streams combined (In human readable format, e.g. 1GiB, 2GiB)")
                 .next_line_help(true),
         )
+        .arg(
+            Arg::new(Self::HOT_TIER_TIME_RANGE)
+                .long(Self::HOT_TIER_TIME_RANGE)
+                .env("P_HOT_TIER_TIME_RANGE")
+                .value_name("days")
+                .value_parser(value_parser!(i64))
+                .help("Maximum allowed time range in days for all streams combined (e.g. 1, 2)")
+                .next_line_help(true),
+        )
         .arg(
             Arg::new(Self::QUERY_CACHE)
                 .long(Self::QUERY_CACHE)
@@ -398,12 +410,14 @@ impl FromArgMatches for Cli {
     }

     fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
-        self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
+        self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER).cloned();
         self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
         self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
         self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
         self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();

+        self.hot_tier_time_range = m.get_one::<i64>(Self::HOT_TIER_TIME_RANGE).cloned();
+
         self.address = m
             .get_one::<String>(Self::ADDRESS)
             .cloned()
@@ -418,8 +432,8 @@ impl FromArgMatches for Cli {
             .get_one::<PathBuf>(Self::STAGING)
             .cloned()
             .expect("default value for staging");
-        self.local_cache_size = m
-            .get_one::<u64>(Self::CACHE_SIZE)
+        self.hot_tier_size = m
+            .get_one::<u64>(Self::HOT_TIER_SIZE)
             .cloned()
             .expect("default value for cache size");
         self.query_cache_size = m
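Put together, the renamed options would be exercised roughly like this; an illustrative invocation (path, size, and the object-store subcommand are made-up values, not from this diff):

    P_HOT_TIER_DIR=/mnt/hot-tier parseable s3-store --hot-tier-size 10GiB --hot-tier-time-range 7

Note that --hot-tier-time-range takes a plain integer number of days (value_parser!(i64)), while --hot-tier-size accepts human-readable sizes via validation::cache_size.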
11 changes: 9 additions & 2 deletions server/src/handlers/airplane.rs
@@ -46,8 +46,8 @@ use crate::handlers::http::query::{
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::querycache::QueryCacheManager;
 use crate::utils::arrow::flight::{
-    append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
-    send_to_ingester,
+    append_temporary_events, get_from_ingester_cache, get_query_from_ticket, into_flight_data,
+    run_do_get_rpc, send_to_ingester,
 };
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
@@ -203,6 +203,13 @@ impl FlightService for AirServiceImpl {
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;

+        // Serve the query straight from the ingesters' hot tier cache when possible.
+        if let Some(early) =
+            get_from_ingester_cache(&query.start, &query.end, &stream_name, ticket.clone()).await
+        {
+            return into_flight_data(early);
+        }
+
         let event =
             if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
                 let sql = format!("select * from {}", &stream_name);
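This hunk only imports and calls get_from_ingester_cache; its body lands in server/src/utils/arrow/flight.rs in this PR and is not shown here. A minimal sketch of the shape such a helper could take; the internal signatures used below (send_to_ingester, get_ingestor_info, run_do_get_rpc) are all assumptions for illustration, not the PR's actual code:

use arrow_array::RecordBatch;
use chrono::{DateTime, Utc};

// Hypothetical sketch only; every crate-internal call here is an assumed signature.
pub async fn get_from_ingester_cache(
    start: &DateTime<Utc>,
    end: &DateTime<Utc>,
    stream_name: &str,
    _ticket: Query, // carried through for query flags/filters; unused in this sketch
) -> Option<Vec<RecordBatch>> {
    // Only short-circuit when the whole window is recent enough to still be
    // sitting in the ingesters' hot tier.
    if !send_to_ingester(start.timestamp_millis(), end.timestamp_millis()) {
        return None;
    }
    let sql = format!("select * from {}", stream_name);
    let mut batches = Vec::new();
    // Fan the query out to every ingester and collect what each returns.
    for ingester in get_ingestor_info().await.ok()? {
        if let Ok(mut rb) = run_do_get_rpc(ingester, sql.clone()).await {
            batches.append(&mut rb);
        }
    }
    // An empty result means the hot tier could not answer; the caller then
    // falls back to the normal object-storage path.
    (!batches.is_empty()).then_some(batches)
}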
4 changes: 2 additions & 2 deletions server/src/handlers/http.rs
@@ -95,7 +95,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {

 /// unused for now, might need it later
 #[allow(unused)]
-pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
+pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Value> {
     // send the query request to the ingestor
     let mut res = vec![];
     let ima = get_ingestor_info().await?;
@@ -128,5 +128,5 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
         }
     }

-    Ok(res)
+    Ok(Value::Array(res))
 }
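A hypothetical call site, just to show the difference the return type makes; `query` and the surrounding error handling are stand-ins:

// The ingestors' responses now come back as a single JSON array
// instead of a Vec of separate values.
let merged = send_query_request_to_ingestor(&query).await?;
if let Value::Array(responses) = merged {
    log::info!("received {} ingestor responses", responses.len());
}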
27 changes: 10 additions & 17 deletions server/src/handlers/http/cluster/mod.rs
@@ -59,16 +59,11 @@ pub async fn sync_cache_with_ingestors(
     ingestor: IngestorMetadata,
     body: bool,
 ) -> Result<(), StreamError> {
-    if !utils::check_liveness(&ingestor.domain_name).await {
-        return Ok(());
-    }
-    let request_body: Bytes = Bytes::from(body.to_string());
-    let client = reqwest::Client::new();
-    let resp = client
+    reqwest::Client::new()
         .put(url)
         .header(header::CONTENT_TYPE, "application/json")
         .header(header::AUTHORIZATION, ingestor.token)
-        .body(request_body)
+        .body(Bytes::from(body.to_string()))
         .send()
         .await
         .map_err(|err| {
@@ -83,13 +78,15 @@

     // if the response is not successful, log the error and return a custom error
     // this could be a bit too much, but we need to be sure it covers all cases
+    /*
     if !resp.status().is_success() {
         log::error!(
             "failed to set cache: {}\nResponse Returned: {:?}",
             ingestor.domain_name,
             resp.text().await
         );
     }
+    */

     Ok(())
 }
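The chained call drops the resp binding, which is presumably why the status check is commented out rather than deleted. A standalone sketch of how the same pattern can keep the check, with the error type simplified to reqwest::Error and the URL/token as plain strings (illustrative only, not the PR's code):

use bytes::Bytes;
use reqwest::header;

async fn put_cache_flag(url: &str, token: &str, body: bool) -> Result<(), reqwest::Error> {
    // Keep the response bound so the status can still be inspected.
    let resp = reqwest::Client::new()
        .put(url)
        .header(header::CONTENT_TYPE, "application/json")
        .header(header::AUTHORIZATION, token)
        .body(Bytes::from(body.to_string()))
        .send()
        .await?;

    if !resp.status().is_success() {
        // Same logging as the commented-out block above.
        log::error!("failed to set cache: {}\nResponse Returned: {:?}", url, resp.text().await);
    }
    Ok(())
}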
@@ -345,7 +342,7 @@ pub async fn send_retention_cleanup_request(
             ingestor.domain_name,
             err
         );
-        ObjectStorageError::Custom(err.to_string())
+        ObjectStorageError::ReqwestError(err)
     })?;

     // if the response is not successful, log the error and return a custom error
@@ -360,7 +357,7 @@

     let resp_data = resp.bytes().await.map_err(|err| {
         log::error!("Fatal: failed to parse response to bytes: {:?}", err);
-        ObjectStorageError::Custom(err.to_string())
+        err
     })?;

     first_event_at = String::from_utf8_lossy(&resp_data).to_string();
@@ -576,14 +573,10 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
                 if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
                     let stream_name = INTERNAL_STREAM_NAME;

-                    if matches!(
-                        ingest_internal_stream(
-                            stream_name.to_string(),
-                            bytes::Bytes::from(metrics_bytes),
-                        )
-                        .await,
-                        Ok(())
-                    ) {
+                    if ingest_internal_stream(stream_name, metrics_bytes)
+                        .await
+                        .is_ok()
+                    {
                         log::info!(
                             "Cluster metrics successfully ingested into internal stream"
                         );
10 changes: 5 additions & 5 deletions server/src/handlers/http/ingest.rs
@@ -71,16 +71,16 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     }
 }

-pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    create_stream_if_not_exists(&stream_name).await?;
+pub async fn ingest_internal_stream(stream_name: &str, body: Vec<u8>) -> Result<(), PostError> {
+    create_stream_if_not_exists(stream_name).await?;
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
         let body_val: Value = serde_json::from_slice(&body)?;
         let hash_map = STREAM_INFO.read().unwrap();
         let schema = hash_map
-            .get(&stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
+            .get(stream_name)
+            .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
             .schema
             .clone();
         let event = format::json::Event {
@@ -92,7 +92,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     };
     event::Event {
         rb,
-        stream_name,
+        stream_name: stream_name.to_string(),
         origin_format: "json",
         origin_size: size as u64,
         is_first_event: is_first,
18 changes: 9 additions & 9 deletions server/src/handlers/http/logstream.rs
@@ -395,7 +395,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {

     match CONFIG.parseable.mode {
         Mode::Ingest | Mode::All => {
-            if CONFIG.parseable.local_cache_path.is_none() {
+            if CONFIG.parseable.hot_tier_storage_path.is_none() {
                 return Err(StreamError::CacheNotEnabled(stream_name));
             }
         }
@@ -430,31 +430,31 @@ pub async fn put_enable_cache(
                     stream_name
                 );

-                super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
+                super::cluster::sync_cache_with_ingestors(&url, ingestor, *body).await?;
             }
         }
         Mode::Ingest => {
-            if CONFIG.parseable.local_cache_path.is_none() {
+            if CONFIG.parseable.hot_tier_storage_path.is_none() {
                 return Err(StreamError::CacheNotEnabled(stream_name));
             }
             // here the ingest server has not found the stream
             // so it should check if the stream exists in storage
-            let check = storage
+            let check_if_stream_exists = storage
                 .list_streams()
                 .await?
                 .iter()
-                .map(|stream| stream.name.clone())
+                .map(|stream| &stream.name)
                 .contains(&stream_name);

-            if !check {
-                log::error!("Stream {} not found", stream_name.clone());
+            if !check_if_stream_exists {
+                log::error!("Stream {} not found", &stream_name);
                 return Err(StreamError::StreamNotFound(stream_name.clone()));
             }
             metadata::STREAM_INFO
                 .upsert_stream_info(
                     &*storage,
                     LogStream {
-                        name: stream_name.clone().to_owned(),
+                        name: stream_name.clone(),
                     },
                 )
                 .await
@@ -464,7 +464,7 @@
         if !metadata::STREAM_INFO.stream_exists(&stream_name) {
             return Err(StreamError::StreamNotFound(stream_name));
         }
-        if CONFIG.parseable.local_cache_path.is_none() {
+        if CONFIG.parseable.hot_tier_storage_path.is_none() {
             return Err(StreamError::CacheNotEnabled(stream_name));
         }