Skip to content

Commit 4cfcbdd

Browse files
deepsource analysis fix
1 parent 8ce3850 commit 4cfcbdd

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

src/parseable/streams.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ impl Stream {
519519
// if yes, then merge them and save
520520

521521
if let Some(mut schema) = schema {
522+
// calculate field stats for all user defined streams
522523
if self.get_stream_type() != StreamType::Internal {
523524
if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
524525
warn!(
@@ -731,6 +732,7 @@ impl Stream {
731732
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
732733
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
733734
writer.write(record)?;
735+
// Collect record batches for finding statistics later
734736
record_batches.push(record.clone());
735737
}
736738
writer.close()?;
@@ -1023,6 +1025,9 @@ impl Stream {
10231025
Ok(())
10241026
}
10251027

1028+
/// Calculates field statistics for the stream and pushes them to the internal stats dataset.
1029+
/// This function creates a new internal stream for stats if it doesn't exist.
1030+
/// It collects statistics for each field in the stream.
10261031
async fn calculate_field_stats(
10271032
&self,
10281033
record_batches: Vec<RecordBatch>,
@@ -1064,6 +1069,9 @@ impl Stream {
10641069
Ok(())
10651070
}
10661071

1072+
/// Collects statistics for all fields in the stream.
1073+
/// Returns a vector of `FieldStat` for each field with non-zero count.
1074+
/// Uses `buffer_unordered` to run up to `MAX_CONCURRENT_FIELD_STATS` queries concurrently.
10671075
async fn collect_all_field_stats(
10681076
&self,
10691077
ctx: &SessionContext,
@@ -1084,10 +1092,13 @@ impl Stream {
10841092

10851093
futures::stream::iter(field_futures)
10861094
.buffer_unordered(MAX_CONCURRENT_FIELD_STATS)
1087-
.filter_map(|x| async { x })
1095+
.filter_map(std::future::ready)
10881096
.collect::<Vec<_>>()
10891097
.await
10901098
}
1099+
1100+
/// Calculates statistics for a single field in the stream.
1101+
/// Returns `None` if the count query returns 0.
10911102
async fn calculate_single_field_stats(
10921103
ctx: SessionContext,
10931104
stream_name: String,
@@ -1122,6 +1133,9 @@ impl Stream {
11221133
})
11231134
}
11241135

1136+
/// Queries a single integer value from the DataFusion context.
1137+
/// Returns `None` if the query fails or returns no rows.
1138+
/// This is used for fetching both the record count and the distinct count for a field.
11251139
async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
11261140
let df = ctx.sql(sql).await.ok()?;
11271141
let batches = df.collect().await.ok()?;
@@ -1134,6 +1148,8 @@ impl Stream {
11341148
Some(array.value(0))
11351149
}
11361150

1151+
/// Helper function to format an Arrow value at a given index into a string.
1152+
/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean.
11371153
fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
11381154
if array.is_null(idx) {
11391155
return "NULL".to_string();
@@ -1164,6 +1180,9 @@ impl Stream {
11641180
}
11651181
}
11661182

1183+
/// This function is used to fetch distinct values and their counts for a field in the stream.
1184+
/// Returns a vector of `DistinctStat` containing distinct values and their counts.
1185+
/// The query groups by the field and orders by the count in descending order, limiting the results to `PARSEABLE.options.max_field_statistics`.
11671186
async fn query_distinct_stats(
11681187
ctx: &SessionContext,
11691188
stream_name: &str,

0 commit comments

Comments
 (0)