
Commit bf51964

stats calculation in non-blocking task
1 parent 197fe51 commit bf51964

1 file changed: +175, -169 lines

src/parseable/streams.rs

```diff
@@ -519,12 +519,19 @@ impl Stream {
         if let Some(mut schema) = schema {
             // calculate field stats for all user defined streams
             if self.get_stream_type() != StreamType::Internal {
-                if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
-                    warn!(
-                        "Error calculating field stats for stream {}: {}",
-                        self.stream_name, err
-                    );
-                }
+                let stats_schema = schema.clone();
+                let stream_name = self.stream_name.clone();
+                let stats_rbs = rbs.clone();
+                tokio::spawn(async move {
+                    if let Err(err) =
+                        calculate_field_stats(&stream_name, stats_rbs, stats_schema.into()).await
+                    {
+                        warn!(
+                            "Error calculating field stats for stream {}: {}",
+                            &stream_name, err
+                        );
+                    }
+                });
             }
             let static_schema_flag = self.get_static_schema_flag();
             if !static_schema_flag {
```
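This hunk is the heart of the change: instead of awaiting the stats computation on the flush path, the code clones the owned data it needs (`schema`, `stream_name`, the record batches) and moves those clones into a detached `tokio::spawn` task, so a slow or failing stats run can no longer block ingestion; errors are only logged. Below is a minimal, self-contained sketch of the same clone-and-move, fire-and-forget pattern; everything in it (`heavy_stats_work`, the demo payloads) is a hypothetical stand-in, not code from this commit:

```rust
use std::sync::Arc;
use tokio::time::{sleep, Duration};

// Hypothetical stand-in for calculate_field_stats: any slow, fallible job.
async fn heavy_stats_work(stream: &str, rows: Arc<Vec<i64>>) -> Result<(), String> {
    sleep(Duration::from_millis(50)).await; // simulate expensive queries
    if rows.is_empty() {
        return Err(format!("no rows for stream {stream}"));
    }
    println!("stats for {stream}: {} rows", rows.len());
    Ok(())
}

#[tokio::main]
async fn main() {
    let stream_name = String::from("demo");
    let rows = Arc::new(vec![1, 2, 3]);

    // Clone owned values so the spawned future is 'static (it must not
    // borrow from the caller), then detach it without awaiting.
    let task_stream = stream_name.clone();
    let task_rows = Arc::clone(&rows);
    tokio::spawn(async move {
        if let Err(err) = heavy_stats_work(&task_stream, task_rows).await {
            eprintln!("Error calculating field stats for stream {task_stream}: {err}");
        }
    });

    // The caller continues immediately; the stats task finishes on its own.
    println!("flush path continues for {stream_name}");
    sleep(Duration::from_millis(100)).await; // only so the demo lives long enough to see the task run
}
```

The trade-off of fire-and-forget is that nothing awaits the `JoinHandle`: failures surface only in logs, and a burst of flushes can spawn several concurrent stats tasks. The second hunk makes this possible by turning the stats helpers into free functions that no longer borrow `&self`: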
```diff
@@ -1022,196 +1029,195 @@ impl Stream {
 
         Ok(())
     }
+}
 
-    /// Calculates field statistics for the stream and pushes them to the internal stats dataset.
-    /// This function creates a new internal stream for stats if it doesn't exist.
-    /// It collects statistics for each field in the stream
-    async fn calculate_field_stats(
-        &self,
-        record_batches: Vec<RecordBatch>,
-        schema: Arc<Schema>,
-    ) -> Result<(), PostError> {
-        let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
-        PARSEABLE
-            .create_stream_if_not_exists(
-                DATASET_STATS_STREAM_NAME,
-                StreamType::Internal,
-                vec![log_source_entry],
-            )
-            .await?;
-        let mem_table = MemTable::try_new(schema.clone(), vec![record_batches])
-            .map_err(|e| PostError::Invalid(e.into()))?;
-        let ctx = SessionContext::new();
-        ctx.register_table(&self.stream_name, Arc::new(mem_table))
-            .map_err(|e| PostError::Invalid(e.into()))?;
-
-        let field_stats = self.collect_all_field_stats(&ctx, &schema).await;
-        drop(ctx);
-
-        let stats = DatasetStats {
-            dataset_name: self.stream_name.clone(),
-            field_stats,
-        };
-        if stats.field_stats.is_empty() {
-            return Ok(());
-        }
-        let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?;
-
-        flatten_and_push_logs(
-            stats_value,
+/// Calculates field statistics for the stream and pushes them to the internal stats dataset.
+/// This function creates a new internal stream for stats if it doesn't exist.
+/// It collects statistics for each field in the stream
+async fn calculate_field_stats(
+    stream_name: &str,
+    record_batches: Vec<RecordBatch>,
+    schema: Arc<Schema>,
+) -> Result<(), PostError> {
+    let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
+    PARSEABLE
+        .create_stream_if_not_exists(
             DATASET_STATS_STREAM_NAME,
-            &LogSource::Json,
-            &HashMap::new(),
+            StreamType::Internal,
+            vec![log_source_entry],
         )
         .await?;
-        Ok(())
-    }
-
-    /// Collects statistics for all fields in the stream.
-    /// Returns a vector of `FieldStat` for each field with non-zero count.
-    /// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently.
-    async fn collect_all_field_stats(
-        &self,
-        ctx: &SessionContext,
-        schema: &Arc<Schema>,
-    ) -> Vec<FieldStat> {
-        // Collect field names into an owned Vec<String> to avoid lifetime issues
-        let field_names: Vec<String> = schema
-            .fields()
-            .iter()
-            .map(|field| field.name().clone())
-            .collect();
-
-        let field_futures = field_names.into_iter().map(|field_name| {
-            let ctx = ctx.clone();
-            let stream_name = self.stream_name.clone();
-            async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await }
-        });
+    let mem_table = MemTable::try_new(schema.clone(), vec![record_batches])
+        .map_err(|e| PostError::Invalid(e.into()))?;
+    let ctx = SessionContext::new();
+    ctx.register_table(stream_name, Arc::new(mem_table))
+        .map_err(|e| PostError::Invalid(e.into()))?;
+
+    let field_stats = collect_all_field_stats(stream_name, &ctx, &schema).await;
+    drop(ctx);
+
+    let stats = DatasetStats {
+        dataset_name: stream_name.to_string(),
+        field_stats,
+    };
+    if stats.field_stats.is_empty() {
+        return Ok(());
+    }
+    let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?;
+
+    flatten_and_push_logs(
+        stats_value,
+        DATASET_STATS_STREAM_NAME,
+        &LogSource::Json,
+        &HashMap::new(),
+    )
+    .await?;
+    Ok(())
+}
 
-        futures::stream::iter(field_futures)
-            .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
-            .filter_map(std::future::ready)
-            .collect::<Vec<_>>()
-            .await
-    }
+/// Collects statistics for all fields in the stream.
+/// Returns a vector of `FieldStat` for each field with non-zero count.
+/// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently.
+async fn collect_all_field_stats(
+    stream_name: &str,
+    ctx: &SessionContext,
+    schema: &Arc<Schema>,
+) -> Vec<FieldStat> {
+    // Collect field names into an owned Vec<String> to avoid lifetime issues
+    let field_names: Vec<String> = schema
+        .fields()
+        .iter()
+        .map(|field| field.name().clone())
+        .collect();
+
+    let field_futures = field_names.into_iter().map(|field_name| {
+        let ctx = ctx.clone();
+        async move { calculate_single_field_stats(ctx, stream_name, &field_name).await }
+    });
+
+    futures::stream::iter(field_futures)
+        .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
+        .filter_map(std::future::ready)
+        .collect::<Vec<_>>()
+        .await
+}
 
-    /// Calculates statistics for a single field in the stream.
-    /// Returns `None` if the count query returns 0.
-    async fn calculate_single_field_stats(
-        ctx: SessionContext,
-        stream_name: String,
-        field_name: String,
-    ) -> Option<FieldStat> {
-        let count = Self::query_single_i64(
+/// Calculates statistics for a single field in the stream.
+/// Returns `None` if the count query returns 0.
+async fn calculate_single_field_stats(
+    ctx: SessionContext,
+    stream_name: &str,
+    field_name: &str,
+) -> Option<FieldStat> {
+    let count = query_single_i64(
         &ctx,
         &format!(
             "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null"
         ),
     )
     .await?;
-        if count == 0 {
-            return None;
-        }
-
-        let distinct_count = Self::query_single_i64(
-            &ctx,
-            &format!(
-                "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\""
-            ),
-        )
-        .await?;
+    if count == 0 {
+        return None;
+    }
 
-        let distinct_stats = Self::query_distinct_stats(&ctx, &stream_name, &field_name).await;
+    let distinct_count = query_single_i64(
+        &ctx,
+        &format!(
+            "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\""
+        ),
+    )
+    .await?;
 
-        Some(FieldStat {
-            field_name,
-            count,
-            distinct_count,
-            distinct_stats,
-        })
-    }
+    let distinct_stats = query_distinct_stats(&ctx, stream_name, field_name).await;
 
-    /// Queries a single integer value from the DataFusion context.
-    /// Returns `None` if the query fails or returns no rows.
-    /// This is used for fetching record count for a field and distinct count.
-    async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
-        let df = ctx.sql(sql).await.ok()?;
-        let batches = df.collect().await.ok()?;
-        let batch = batches.first()?;
-        if batch.num_rows() == 0 {
-            return None;
-        }
-        let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
+    Some(FieldStat {
+        field_name: field_name.to_string(),
+        count,
+        distinct_count,
+        distinct_stats,
+    })
+}
 
-        Some(array.value(0))
+/// Queries a single integer value from the DataFusion context.
+/// Returns `None` if the query fails or returns no rows.
+/// This is used for fetching record count for a field and distinct count.
+async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
+    let df = ctx.sql(sql).await.ok()?;
+    let batches = df.collect().await.ok()?;
+    let batch = batches.first()?;
+    if batch.num_rows() == 0 {
+        return None;
     }
+    let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
 
-    /// Helper function to format an Arrow value at a given index into a string.
-    /// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean.
-    fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
-        if array.is_null(idx) {
-            return "NULL".to_string();
-        }
-        if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
-            arr.value(idx).to_string()
-        } else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
-            arr.value(idx).to_string()
-        } else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
-            arr.value(idx).to_string()
-        } else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
-            let timestamp = arr.value(idx);
-            DateTime::from_timestamp_millis(timestamp)
-                .map(|dt| dt.to_string())
-                .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
-        } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
-            return arr.value(idx).to_string();
-        } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
-            arr.value(idx).to_string()
-        } else if array.as_any().downcast_ref::<NullArray>().is_some() {
-            "NULL".to_string()
-        } else {
-            warn!(
-                "Unsupported array type for statistics: {:?}",
-                array.data_type()
-            );
-            "UNSUPPORTED".to_string()
-        }
+    Some(array.value(0))
+}
+
+/// Helper function to format an Arrow value at a given index into a string.
+/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean.
+fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
+    if array.is_null(idx) {
+        return "NULL".to_string();
+    }
+    if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
+        arr.value(idx).to_string()
+    } else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
+        arr.value(idx).to_string()
+    } else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
+        arr.value(idx).to_string()
+    } else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
+        let timestamp = arr.value(idx);
+        DateTime::from_timestamp_millis(timestamp)
+            .map(|dt| dt.to_string())
+            .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
+    } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
+        return arr.value(idx).to_string();
+    } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
+        arr.value(idx).to_string()
+    } else if array.as_any().downcast_ref::<NullArray>().is_some() {
+        "NULL".to_string()
+    } else {
+        warn!(
+            "Unsupported array type for statistics: {:?}",
+            array.data_type()
+        );
+        "UNSUPPORTED".to_string()
     }
+}
 
-    /// This function is used to fetch distinct values and their counts for a field in the stream.
-    /// Returns a vector of `DistinctStat` containing distinct values and their counts.
-    /// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`.
-    async fn query_distinct_stats(
-        ctx: &SessionContext,
-        stream_name: &str,
-        field_name: &str,
-    ) -> Vec<DistinctStat> {
-        let sql = format!(
+/// This function is used to fetch distinct values and their counts for a field in the stream.
+/// Returns a vector of `DistinctStat` containing distinct values and their counts.
+/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`.
+async fn query_distinct_stats(
+    ctx: &SessionContext,
+    stream_name: &str,
+    field_name: &str,
+) -> Vec<DistinctStat> {
+    let sql = format!(
         "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" group by \"{field_name}\" order by distinct_count desc limit {}",
         PARSEABLE.options.max_field_statistics
     );
-        let mut distinct_stats = Vec::new();
-        if let Ok(df) = ctx.sql(&sql).await {
-            if let Ok(batches) = df.collect().await {
-                for rb in batches {
-                    let counts = rb
-                        .column(0)
-                        .as_any()
-                        .downcast_ref::<Int64Array>()
-                        .expect("Counts should be Int64Array");
-                    let values = rb.column(1).as_ref();
-                    for i in 0..rb.num_rows() {
-                        let value = Self::format_arrow_value(values, i);
-                        distinct_stats.push(DistinctStat {
-                            distinct_value: value,
-                            count: counts.value(i),
-                        });
-                    }
+    let mut distinct_stats = Vec::new();
+    if let Ok(df) = ctx.sql(&sql).await {
+        if let Ok(batches) = df.collect().await {
+            for rb in batches {
+                let counts = rb
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .expect("Counts should be Int64Array");
+                let values = rb.column(1).as_ref();
+                for i in 0..rb.num_rows() {
+                    let value = format_arrow_value(values, i);
+                    distinct_stats.push(DistinctStat {
+                        distinct_value: value,
+                        count: counts.value(i),
+                    });
                 }
             }
         }
-        distinct_stats
     }
+    distinct_stats
 }
 
 #[derive(Deref, DerefMut, Default)]
```
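One detail worth pulling out of the refactor: `collect_all_field_stats` fans out one DataFusion query per field but caps the fan-out with `futures::stream::iter(...).buffer_unordered(MAX_CONCURRENT_FIELD_STATS)`, and `filter_map(std::future::ready)` drops the `None`s that `calculate_single_field_stats` returns for empty fields. Here is a standalone sketch of that bounded-concurrency shape; the per-field job, field names, and the constant's value are made up for illustration and stand in for the real SQL queries:

```rust
use futures::StreamExt; // brings buffer_unordered / filter_map / collect into scope

// Assumed value for the demo; the real constant lives in streams.rs.
const MAX_CONCURRENT_FIELD_STATS: usize = 10;

// Hypothetical stand-in for calculate_single_field_stats: returns None
// for fields that should be skipped, Some(stat) otherwise.
async fn field_stat(field: String) -> Option<(String, usize)> {
    if field.starts_with('_') {
        None // mirrors the "count == 0 => return None" early exit
    } else {
        let len = field.len();
        Some((field, len))
    }
}

#[tokio::main]
async fn main() {
    let fields = vec!["status".to_string(), "_meta".to_string(), "latency".to_string()];

    // Build lazy futures, then poll at most MAX_CONCURRENT_FIELD_STATS at a
    // time; completed Some(..) results are kept, None results are dropped.
    let stats: Vec<(String, usize)> = futures::stream::iter(fields.into_iter().map(field_stat))
        .buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
        .filter_map(std::future::ready)
        .collect()
        .await;

    println!("collected {} field stats: {stats:?}", stats.len());
}
```

`buffer_unordered` yields results in completion order rather than input order, which is fine here because each `FieldStat` carries its own `field_name`; the constant simply keeps the number of in-flight queries bounded.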