Skip to content

enhancement: added internal stream "meta" #801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
};

use crate::{
handlers::http::cluster::INTERNAL_STREAM_NAME,
option::{Mode, CONFIG},
utils,
};
Expand Down Expand Up @@ -132,7 +133,7 @@ impl WriterTable {
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), StreamWriterError> {
if CONFIG.parseable.mode != Mode::Query {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
Expand Down Expand Up @@ -161,7 +162,7 @@ impl WriterTable {
) -> Result<(), StreamWriterError> {
match map.get(stream_name) {
Some(writer) => {
if CONFIG.parseable.mode != Mode::Query {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
writer.lock().unwrap().push(
stream_name,
schema_key,
Expand All @@ -174,7 +175,7 @@ impl WriterTable {
}
}
None => {
if CONFIG.parseable.mode != Mode::Query {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
let mut writer = Writer::default();
writer.push(
stream_name,
Expand Down
156 changes: 113 additions & 43 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod utils;
use crate::handlers::http::cluster::utils::{
check_liveness, to_url_string, IngestionStats, QueriedStats,
};
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::option::CONFIG;
Expand All @@ -46,8 +46,13 @@ type IngestorMetadataArr = Vec<IngestorMetadata>;
use self::utils::StorageStats;

use super::base_path_without_preceding_slash;
use std::time::Duration;

use super::modal::IngestorMetadata;
use clokwerk::{AsyncScheduler, Interval};
pub const INTERNAL_STREAM_NAME: &str = "meta";

const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

pub async fn sync_cache_with_ingestors(
url: &str,
Expand Down Expand Up @@ -432,50 +437,11 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
}

pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
let dresses = fetch_cluster_metrics().await.map_err(|err| {
log::error!("Fatal: failed to fetch cluster metrics: {:?}", err);
PostError::Invalid(err.into())
})?;

let mut dresses = vec![];

for ingestor in ingestor_metadata {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| PostError::CustomError(err.to_string()))?
.samples;

dresses.push(Metrics::from_prometheus_samples(
sample,
ingestor.domain_name,
));
} else {
log::warn!(
"Failed to fetch metrics from ingestor: {}\n",
ingestor.domain_name,
);
}
}

Ok(actix_web::HttpResponse::Ok().json(dresses))
}

Expand Down Expand Up @@ -545,3 +511,107 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
log::info!("{}", &msg);
Ok((msg, StatusCode::OK))
}

async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
})?;

let mut dresses = vec![];

for ingestor in ingestor_metadata {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| PostError::CustomError(err.to_string()))?
.samples;
let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
.await
.map_err(|err| {
log::error!("Fatal: failed to get ingestor metrics: {:?}", err);
PostError::Invalid(err.into())
})?;
dresses.push(ingestor_metrics);
} else {
log::warn!(
"Failed to fetch metrics from ingestor: {}\n",
&ingestor.domain_name,
);
}
}
Ok(dresses)
}

pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
log::info!("Setting up schedular for cluster metrics ingestion");

let mut scheduler = AsyncScheduler::new();
scheduler
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
.run(move || async {
let result: Result<(), PostError> = async {
let cluster_metrics = fetch_cluster_metrics().await;
if let Ok(metrics) = cluster_metrics {
if !metrics.is_empty() {
log::info!("Cluster metrics fetched successfully from all ingestors");
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
let stream_name = INTERNAL_STREAM_NAME;

if matches!(
ingest_internal_stream(
stream_name.to_string(),
bytes::Bytes::from(metrics_bytes),
)
.await,
Ok(())
) {
log::info!(
"Cluster metrics successfully ingested into internal stream"
);
} else {
log::error!(
"Failed to ingest cluster metrics into internal stream"
);
}
} else {
log::error!("Failed to serialize cluster metrics");
}
}
}
Ok(())
}
.await;

if let Err(err) = result {
log::error!("Error in cluster metrics scheduler: {:?}", err);
}
});

tokio::spawn(async move {
loop {
scheduler.run_pending().await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
});

Ok(())
}
48 changes: 47 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

use super::cluster::INTERNAL_STREAM_NAME;
use super::logstream::error::CreateStreamError;
use super::{kinesis, otel};
use crate::event::{
Expand Down Expand Up @@ -52,6 +53,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
if stream_name.eq(INTERNAL_STREAM_NAME) {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} is an internal stream and cannot be ingested into",
stream_name
)));
}
create_stream_if_not_exists(&stream_name).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Expand All @@ -61,6 +68,40 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
}
}

pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
create_stream_if_not_exists(&stream_name).await?;
let size: usize = body.len();
let parsed_timestamp = Utc::now().naive_utc();
let (rb, is_first) = {
let body_val: Value = serde_json::from_slice(&body)?;
let hash_map = STREAM_INFO.read().unwrap();
let schema = hash_map
.get(&stream_name)
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
.schema
.clone();
let event = format::json::Event {
data: body_val,
tags: String::default(),
metadata: String::default(),
};
event.into_recordbatch(schema, None, None)?
};
event::Event {
rb,
stream_name,
origin_format: "json",
origin_size: size as u64,
is_first_event: is_first,
parsed_timestamp,
time_partition: None,
custom_partition_values: HashMap::new(),
}
.process()
.await?;
Ok(())
}

async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
Expand Down Expand Up @@ -93,7 +134,12 @@ async fn flatten_and_push_logs(
// fails if the logstream does not exist
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if stream_name.eq(INTERNAL_STREAM_NAME) {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} is an internal stream and cannot be ingested into",
stream_name
)));
}
flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
}
Expand Down
6 changes: 4 additions & 2 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
Expand Down Expand Up @@ -591,7 +591,9 @@ pub async fn create_stream(
schema: Arc<Schema>,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
validator::stream_name(&stream_name)?;
if stream_name.ne(INTERNAL_STREAM_NAME) {
validator::stream_name(&stream_name)?;
}

// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
Expand Down
3 changes: 3 additions & 0 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::handlers::http::middleware::RouteExt;
use crate::localcache::LocalCacheManager;
use crate::metadata;
use crate::metrics;
use crate::migration;
use crate::migration::metadata_migration::migrate_ingester_metadata;
use crate::rbac;
use crate::rbac::role::Action;
Expand Down Expand Up @@ -328,6 +329,8 @@ impl IngestServer {
let prometheus = metrics::build_metrics_handler();
CONFIG.storage().register_store_metrics(&prometheus);

migration::run_migration(&CONFIG).await?;

let storage = CONFIG.storage().get_object_store();
if let Err(err) = metadata::STREAM_INFO.load(&*storage).await {
log::warn!("could not populate local metadata. {:?}", err);
Expand Down
41 changes: 35 additions & 6 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
*/

use crate::handlers::airplane;
use crate::handlers::http::cluster;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};

use crate::rbac::role::Action;
use crate::sync;
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
use actix_web::web;
use actix_web::web::ServiceConfig;
Expand Down Expand Up @@ -185,11 +186,39 @@ impl QueryServer {
analytics::init_analytics_scheduler()?;
}

tokio::spawn(airplane::server());

self.start(prometheus, CONFIG.parseable.openid.clone())
.await?;
if matches!(init_cluster_metrics_schedular(), Ok(())) {
log::info!("Cluster metrics scheduler started successfully");
}
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync();

Ok(())
tokio::spawn(airplane::server());
let app = self.start(prometheus, CONFIG.parseable.openid.clone());

tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
localsync_handler.join().unwrap_or(());
remote_sync_handler.join().unwrap_or(());
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
remote_sync_handler.join().unwrap_or(());
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
}

};
}
}
}
Loading