Skip to content

Commit 8ce3850

Browse files
refactor
1 parent ce20362 commit 8ce3850

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

src/cli.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,15 @@ pub struct Options {
388388
help = "Maximum level of flattening allowed for events"
389389
)]
390390
pub event_flatten_level: usize,
391+
392+
// maximum number of distinct-value statistics to store for each field
393+
#[arg(
394+
long,
395+
env = "P_MAX_FIELD_STATISTICS",
396+
default_value = "50",
397+
help = "Maximum number of field statistics to store"
398+
)]
399+
pub max_field_statistics: usize,
391400
}
392401

393402
#[derive(Parser, Debug)]

src/parseable/streams.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ use std::{
2626
time::{Instant, SystemTime, UNIX_EPOCH},
2727
};
2828

29-
use arrow_array::{Array, Float64Array, Int64Array, NullArray, StringArray};
29+
use arrow_array::{Array, Date32Array, Float64Array, Int64Array, NullArray, StringArray};
3030
use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray};
3131
use arrow_schema::{Field, Fields, Schema};
3232
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
3333
use datafusion::{datasource::MemTable, prelude::SessionContext};
3434
use derive_more::{Deref, DerefMut};
35-
use futures::stream::{FuturesUnordered, StreamExt};
35+
use futures_util::StreamExt;
3636
use itertools::Itertools;
3737
use parquet::{
3838
arrow::ArrowWriter,
@@ -75,6 +75,8 @@ use super::{
7575
LogStream, ARROW_FILE_EXTENSION,
7676
};
7777

78+
// Upper bound on how many per-field statistics futures are driven concurrently
// (passed to `buffer_unordered` when computing field stats).
const MAX_CONCURRENT_FIELD_STATS: usize = 10;
79+
7880
#[derive(Serialize, Debug)]
7981
struct DistinctStat {
8082
distinct_value: String,
@@ -517,7 +519,7 @@ impl Stream {
517519
// if yes, then merge them and save
518520

519521
if let Some(mut schema) = schema {
520-
if !&self.stream_name.contains(INTERNAL_STREAM_NAME) {
522+
if self.get_stream_type() != StreamType::Internal {
521523
if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
522524
warn!(
523525
"Error calculating field stats for stream {}: {}",
@@ -1067,19 +1069,25 @@ impl Stream {
10671069
ctx: &SessionContext,
10681070
schema: &Arc<Schema>,
10691071
) -> Vec<FieldStat> {
1070-
let field_futures = schema.fields().iter().map(|field| {
1072+
// Collect field names into an owned Vec<String> to avoid lifetime issues
1073+
let field_names: Vec<String> = schema
1074+
.fields()
1075+
.iter()
1076+
.map(|field| field.name().clone())
1077+
.collect();
1078+
1079+
let field_futures = field_names.into_iter().map(|field_name| {
10711080
let ctx = ctx.clone();
10721081
let stream_name = self.stream_name.clone();
1073-
let field_name = field.name().clone();
10741082
async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await }
10751083
});
10761084

1077-
FuturesUnordered::from_iter(field_futures)
1085+
futures::stream::iter(field_futures)
1086+
.buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
10781087
.filter_map(|x| async { x })
10791088
.collect::<Vec<_>>()
10801089
.await
10811090
}
1082-
10831091
async fn calculate_single_field_stats(
10841092
ctx: SessionContext,
10851093
stream_name: String,
@@ -1117,11 +1125,12 @@ impl Stream {
11171125
async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
11181126
let df = ctx.sql(sql).await.ok()?;
11191127
let batches = df.collect().await.ok()?;
1120-
let array = batches
1121-
.first()?
1122-
.column(0)
1123-
.as_any()
1124-
.downcast_ref::<arrow_array::Int64Array>()?;
1128+
let batch = batches.first()?;
1129+
if batch.num_rows() == 0 {
1130+
return None;
1131+
}
1132+
let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
1133+
11251134
Some(array.value(0))
11261135
}
11271136

@@ -1140,11 +1149,17 @@ impl Stream {
11401149
DateTime::from_timestamp_millis(timestamp)
11411150
.map(|dt| dt.to_string())
11421151
.unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
1152+
} else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
1153+
return arr.value(idx).to_string();
11431154
} else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
11441155
arr.value(idx).to_string()
11451156
} else if array.as_any().downcast_ref::<NullArray>().is_some() {
11461157
"NULL".to_string()
11471158
} else {
1159+
warn!(
1160+
"Unsupported array type for statistics: {:?}",
1161+
array.data_type()
1162+
);
11481163
"UNSUPPORTED".to_string()
11491164
}
11501165
}
@@ -1155,7 +1170,8 @@ impl Stream {
11551170
field_name: &str,
11561171
) -> Vec<DistinctStat> {
11571172
let sql = format!(
1158-
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit 50"
1173+
"select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit {}",
1174+
PARSEABLE.options.max_field_statistics
11591175
);
11601176
let mut distinct_stats = Vec::new();
11611177
if let Ok(df) = ctx.sql(&sql).await {

0 commit comments

Comments
 (0)