Skip to content

Commit 938d3e2

Browse files
env to collect stats, refactor
1 parent d78169f commit 938d3e2

File tree

4 files changed

+18
-10
lines changed

4 files changed

+18
-10
lines changed

src/cli.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,15 @@ pub struct Options {
397397
help = "Maximum number of field statistics to store"
398398
)]
399399
pub max_field_statistics: usize,
400+
401+
// collect dataset stats
402+
#[arg(
403+
long,
404+
env = "P_COLLECT_DATASET_STATS",
405+
default_value = "false",
406+
help = "Enable/Disable collecting dataset stats"
407+
)]
408+
pub collect_dataset_stats: bool,
400409
}
401410

402411
#[derive(Parser, Debug)]

src/handlers/http/prism_home.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ pub async fn home_api(req: HttpRequest) -> Result<impl Responder, PrismHomeError
3939
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
4040
.map_err(|_| PrismHomeError::InvalidQueryParameter(HOME_QUERY_PARAM.to_string()))?;
4141

42-
let include_internal = query_map
43-
.get(HOME_QUERY_PARAM)
44-
.map_or(false, |v| v == "true");
42+
let include_internal = query_map.get(HOME_QUERY_PARAM).is_some_and(|v| v == "true");
4543

4644
let res = generate_home_response(&key, include_internal).await?;
4745

src/parseable/streams.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,9 @@ impl Stream {
518518

519519
if let Some(mut schema) = schema {
520520
// calculate field stats for all user defined streams
521-
if self.get_stream_type() != StreamType::Internal {
521+
if self.get_stream_type() != StreamType::Internal
522+
&& PARSEABLE.options.collect_dataset_stats
523+
{
522524
let stats_schema = schema.clone();
523525
let stream_name = self.stream_name.clone();
524526
let stats_rbs = rbs.clone();
@@ -1199,11 +1201,10 @@ async fn query_distinct_stats(
11991201
if let Ok(df) = ctx.sql(&sql).await {
12001202
if let Ok(batches) = df.collect().await {
12011203
for rb in batches {
1202-
let counts = rb
1203-
.column(0)
1204-
.as_any()
1205-
.downcast_ref::<Int64Array>()
1206-
.expect("Counts should be Int64Array");
1204+
let Some(counts) = rb.column(0).as_any().downcast_ref::<Int64Array>() else {
1205+
warn!("Unexpected type for count column in stats query");
1206+
continue;
1207+
};
12071208
let values = rb.column(1).as_ref();
12081209
for i in 0..rb.num_rows() {
12091210
let value = format_arrow_value(values, i);

src/prism/home/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub async fn generate_home_response(
127127
if !include_internal
128128
&& metadata
129129
.iter()
130-
.any(|m| m.stream_type == StreamType::Internal)
130+
.all(|m| m.stream_type == StreamType::Internal)
131131
{
132132
continue;
133133
}

0 commit comments

Comments
 (0)