Skip to content

Commit 45122d9

Browse files
include null, add query param in prism home api
1 parent 4cfcbdd commit 45122d9

File tree

3 files changed

+34
-19
lines changed

3 files changed

+34
-19
lines changed

src/handlers/http/prism_home.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
};
2727

2828
const HOME_SEARCH_QUERY_PARAM: &str = "key";
29-
29+
const HOME_QUERY_PARAM: &str = "includeInternal";
3030
/// Fetches the data to populate Prism's home
3131
///
3232
///
@@ -36,8 +36,14 @@ const HOME_SEARCH_QUERY_PARAM: &str = "key";
3636
pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError> {
3737
let key = extract_session_key_from_req(&req)
3838
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
39+
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
40+
.map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?;
3941

40-
let res = generate_home_response(&key).await?;
42+
let include_internal = query_map
43+
.get(HOME_QUERY_PARAM)
44+
.map_or(false, |v| v == "true");
45+
46+
let res = generate_home_response(&key, include_internal).await?;
4147

4248
Ok(web::Json(res))
4349
}
@@ -52,11 +58,12 @@ pub async fn home_search(req: HttpRequest) -> Result<impl Responder, PrismHomeEr
5258
return Ok(web::Json(serde_json::json!({})));
5359
}
5460

55-
let query_value = query_map
61+
let query_key = query_map
5662
.get(HOME_SEARCH_QUERY_PARAM)
5763
.ok_or_else(|| PrismHomeError::InvalidQueryParameter(HOME_SEARCH_QUERY_PARAM.to_string()))?
5864
.to_lowercase();
59-
let res = generate_home_search_response(&key, &query_value).await?;
65+
66+
let res = generate_home_search_response(&key, &query_key).await?;
6067
let json_res = serde_json::to_value(res)
6168
.map_err(|err| PrismHomeError::Anyhow(anyhow::Error::msg(err.to_string())))?;
6269

src/parseable/streams.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,8 @@ use crate::{
5252
format::{LogSource, LogSourceEntry},
5353
DEFAULT_TIMESTAMP_KEY,
5454
},
55-
handlers::http::{
56-
cluster::INTERNAL_STREAM_NAME, ingest::PostError,
57-
modal::utils::ingest_utils::flatten_and_push_logs,
58-
},
5955
handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META},
56+
handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs},
6057
metadata::{LogStreamMetadata, SchemaVersion},
6158
metrics,
6259
option::Mode,
@@ -74,7 +71,7 @@ use super::{
7471
},
7572
LogStream, ARROW_FILE_EXTENSION,
7673
};
77-
74+
const DATASET_STATS_STREAM_NAME: &str = "pstats";
7875
const MAX_CONCURRENT_FIELD_STATS: usize = 10;
7976

8077
#[derive(Serialize, Debug)]
@@ -512,7 +509,7 @@ impl Stream {
512509
let (schema, rbs) = self.convert_disk_files_to_parquet(
513510
time_partition.as_ref(),
514511
custom_partition.as_ref(),
515-
init_signal,
512+
init_signal,
516513
shutdown_signal,
517514
)?;
518515
// check if there is already a schema file in staging pertaining to this stream
@@ -732,8 +729,8 @@ impl Stream {
732729
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
733730
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
734731
writer.write(record)?;
735-
// Collect record batches for finding statistics later
736-
record_batches.push(record.clone());
732+
// Collect record batches for finding statistics later
733+
record_batches.push(record.clone());
737734
}
738735
writer.close()?;
739736

@@ -1033,11 +1030,10 @@ impl Stream {
10331030
record_batches: Vec<RecordBatch>,
10341031
schema: Arc<Schema>,
10351032
) -> Result<(), PostError> {
1036-
let stats_dataset_name = format!("dataset_{INTERNAL_STREAM_NAME}");
10371033
let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
10381034
PARSEABLE
10391035
.create_stream_if_not_exists(
1040-
&stats_dataset_name,
1036+
DATASET_STATS_STREAM_NAME,
10411037
StreamType::Internal,
10421038
vec![log_source_entry],
10431039
)
@@ -1061,7 +1057,7 @@ impl Stream {
10611057

10621058
flatten_and_push_logs(
10631059
stats_value,
1064-
&stats_dataset_name,
1060+
DATASET_STATS_STREAM_NAME,
10651061
&LogSource::Json,
10661062
&HashMap::new(),
10671063
)
@@ -1189,7 +1185,7 @@ impl Stream {
11891185
field_name: &str,
11901186
) -> Vec<DistinctStat> {
11911187
let sql = format!(
1192-
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}",
1188+
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}",
11931189
PARSEABLE.options.max_field_statistics
11941190
);
11951191
let mut distinct_stats = Vec::new();
@@ -1300,7 +1296,8 @@ impl Streams {
13001296
.map(Arc::clone)
13011297
.collect();
13021298
for stream in streams {
1303-
joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await });
1299+
joinset
1300+
.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await });
13041301
}
13051302
}
13061303
}

src/prism/home/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
3434
parseable::PARSEABLE,
3535
rbac::{map::SessionKey, role::Action, Users},
36-
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
36+
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
3737
users::{dashboards::DASHBOARDS, filters::FILTERS},
3838
};
3939

@@ -88,7 +88,10 @@ pub struct HomeSearchResponse {
8888
resources: Vec<Resource>,
8989
}
9090

91-
pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
91+
pub async fn generate_home_response(
92+
key: &SessionKey,
93+
include_internal: bool,
94+
) -> Result<HomeResponse, PrismHomeError> {
9295
// Execute these operations concurrently
9396
let (stream_titles_result, alerts_info_result) =
9497
tokio::join!(get_stream_titles(key), get_alerts_info());
@@ -120,6 +123,14 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
120123
for result in stream_metadata_results {
121124
match result {
122125
Ok((stream, metadata, dataset_type)) => {
126+
// Skip internal streams if the flag is false
127+
if !include_internal
128+
&& metadata
129+
.iter()
130+
.any(|m| m.stream_type == StreamType::Internal)
131+
{
132+
continue;
133+
}
123134
stream_wise_stream_json.insert(stream.clone(), metadata);
124135
datasets.push(DataSet {
125136
title: stream,

0 commit comments

Comments (0)