@@ -27,10 +27,13 @@ use std::{
     time::{Instant, SystemTime, UNIX_EPOCH},
 };

-use arrow_array::RecordBatch;
+use arrow_array::{Array, Float64Array, Int64Array, NullArray, StringArray};
+use arrow_array::{BooleanArray, RecordBatch, TimestampMillisecondArray};
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use datafusion::{datasource::MemTable, prelude::SessionContext};
 use derive_more::{Deref, DerefMut};
+use futures::stream::{FuturesUnordered, StreamExt};
 use itertools::Itertools;
 use parquet::{
     arrow::ArrowWriter,
@@ -41,6 +44,7 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use serde::Serialize;
 use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};

@@ -50,9 +54,14 @@ use crate::{
         format::{LogSource, LogSourceEntry},
         DEFAULT_TIMESTAMP_KEY,
     },
+    handlers::http::{
+        cluster::INTERNAL_STREAM_NAME, ingest::PostError,
+        modal::utils::ingest_utils::flatten_and_push_logs,
+    },
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
+    parseable::PARSEABLE,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
@@ -67,6 +76,26 @@ use super::{
     LogStream, ARROW_FILE_EXTENSION,
 };

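+/// One distinct value of a field and the number of rows that carry it.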
+#[derive(Serialize, Debug)]
+struct DistinctStat {
+    distinct_value: String,
+    count: i64,
+}
+
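+/// Per-field summary: non-null row count, distinct-value count, and the most
+/// frequent distinct values for that field.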
+#[derive(Serialize, Debug)]
+struct FieldStat {
+    field_name: String,
+    count: i64,
+    distinct_count: i64,
+    distinct_stats: Vec<DistinctStat>,
+}
+
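+/// Stats for an entire dataset, serialized with serde_json and pushed into the
+/// internal stats stream. Illustrative payload (names are examples, not real data):
+/// {"dataset_name":"app_logs","field_stats":[{"field_name":"status","count":120,
+/// "distinct_count":2,"distinct_stats":[{"distinct_value":"200","count":100}]}]}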
+#[derive(Serialize, Debug)]
+struct DatasetStats {
+    dataset_name: String,
+    field_stats: Vec<FieldStat>,
+}
+
 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
 fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
     let filename = path.file_stem()?.to_str()?;
@@ -114,7 +143,7 @@ impl Stream {
         let data_path = options.local_stream_data_path(&stream_name);

         Arc::new(Self {
-            stream_name,
+            stream_name: stream_name.clone(),
             metadata: RwLock::new(metadata),
             data_path,
             options,
@@ -306,7 +335,7 @@ impl Stream {
     }

     /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
-    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    pub async fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
         info!(
             "Starting arrow_conversion job for stream- {}",
             self.stream_name
@@ -317,18 +346,23 @@ impl Stream {

         // read arrow files on disk
         // convert them to parquet
-        let schema = self
-            .convert_disk_files_to_parquet(
-                time_partition.as_ref(),
-                custom_partition.as_ref(),
-                shutdown_signal,
-            )
-            .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?;
-
+        let (schema, rbs) = self.convert_disk_files_to_parquet(
+            time_partition.as_ref(),
+            custom_partition.as_ref(),
+            shutdown_signal,
+        )?;
         // check if there is already a schema file in staging pertaining to this stream
         // if yes, then merge them and save

         if let Some(mut schema) = schema {
+            if !self.stream_name.contains(INTERNAL_STREAM_NAME) {
+                if let Err(err) = self.calculate_field_stats(rbs, schema.clone().into()).await {
+                    warn!(
+                        "Error calculating field stats for stream {}: {}",
+                        self.stream_name, err
+                    );
+                }
+            }
             let static_schema_flag = self.get_static_schema_flag();
             if !static_schema_flag {
                 // schema is dynamic, read from staging and merge if present
@@ -429,7 +463,7 @@ impl Stream {
         time_partition: Option<&String>,
         custom_partition: Option<&String>,
         shutdown_signal: bool,
-    ) -> Result<Option<Schema>, StagingError> {
+    ) -> Result<(Option<Schema>, Vec<RecordBatch>), StagingError> {
         let mut schemas = Vec::new();

         let now = SystemTime::now();
@@ -464,8 +498,7 @@ impl Stream {
         metrics::STORAGE_SIZE
             .with_label_values(&["staging", &self.stream_name, "arrows"])
             .set(total_arrow_files_size as i64);
-
-        // warn!("staging files-\n{staging_files:?}\n");
+        let mut record_batches = Vec::new();
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
@@ -486,6 +519,7 @@ impl Stream {
             let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
             for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
                 writer.write(record)?;
+                record_batches.push(record.clone());
             }
             writer.close()?;
@@ -525,10 +559,10 @@ impl Stream {
         }

         if schemas.is_empty() {
-            return Ok(None);
+            return Ok((None, record_batches));
         }

-        Ok(Some(Schema::try_merge(schemas).unwrap()))
+        Ok((Some(Schema::try_merge(schemas)?), record_batches))
     }

     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
@@ -725,7 +759,7 @@ impl Stream {
     }

     /// First flushes arrows onto disk and then converts the arrow into parquet
-    pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    pub async fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
         let start_flush = Instant::now();
         self.flush(shutdown_signal);
         trace!(
@@ -735,7 +769,8 @@ impl Stream {
         );

         let start_convert = Instant::now();
-        self.prepare_parquet(shutdown_signal)?;
+
+        self.prepare_parquet(shutdown_signal).await?;
         trace!(
             "Converting arrows to parquet on stream ({}) took: {}s",
             self.stream_name,
@@ -744,6 +779,165 @@ impl Stream {

         Ok(())
     }
+
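+    /// Computes field-level stats for the freshly converted record batches and
+    /// ingests them as JSON into the companion internal stream named
+    /// "<stream>_<INTERNAL_STREAM_NAME>". The batches are queried in memory through
+    /// a DataFusion `MemTable`; any failure is returned as `PostError`, which the
+    /// caller only logs, so stats collection never blocks parquet conversion.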
+    async fn calculate_field_stats(
+        &self,
+        record_batches: Vec<RecordBatch>,
+        schema: Arc<Schema>,
+    ) -> Result<(), PostError> {
+        let dataset_meta = format!("{}_{INTERNAL_STREAM_NAME}", &self.stream_name);
+        let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
+        PARSEABLE
+            .create_stream_if_not_exists(
+                &dataset_meta,
+                StreamType::Internal,
+                vec![log_source_entry],
+            )
+            .await?;
+        let mem_table = MemTable::try_new(schema.clone(), vec![record_batches])
+            .map_err(|e| PostError::Invalid(e.into()))?;
+        let ctx = SessionContext::new();
+        ctx.register_table(&self.stream_name, Arc::new(mem_table))
+            .map_err(|e| PostError::Invalid(e.into()))?;
+
+        let field_stats = self.collect_all_field_stats(&ctx, &schema).await;
+
+        let stats = DatasetStats {
+            dataset_name: self.stream_name.clone(),
+            field_stats,
+        };
+        if stats.field_stats.is_empty() {
+            return Ok(());
+        }
+        let stats_value = serde_json::to_value(&stats).map_err(|e| PostError::Invalid(e.into()))?;
+
+        flatten_and_push_logs(
+            stats_value,
+            &dataset_meta,
+            &LogSource::Json,
+            &HashMap::new(),
+        )
+        .await?;
+        Ok(())
+    }
+
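+    /// Runs the per-field stats queries concurrently (one future per schema field,
+    /// driven through `FuturesUnordered`) and keeps only the fields that produced a
+    /// result; the order of the returned stats is therefore not guaranteed.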
+    async fn collect_all_field_stats(
+        &self,
+        ctx: &SessionContext,
+        schema: &Arc<Schema>,
+    ) -> Vec<FieldStat> {
+        let field_futures = schema.fields().iter().map(|field| {
+            let ctx = ctx.clone();
+            let stream_name = self.stream_name.clone();
+            let field_name = field.name().clone();
+            async move { Self::calculate_single_field_stats(ctx, stream_name, field_name).await }
+        });
+
+        FuturesUnordered::from_iter(field_futures)
+            .filter_map(|x| async { x })
+            .collect::<Vec<_>>()
+            .await
+    }
+
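+    /// Builds the stats for one field: its non-null count, distinct count, and top
+    /// distinct values. Returns `None` when the field has no non-null values or when
+    /// either count query fails.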
+    async fn calculate_single_field_stats(
+        ctx: SessionContext,
+        stream_name: String,
+        field_name: String,
+    ) -> Option<FieldStat> {
+        let count = Self::query_single_i64(
+            &ctx,
+            &format!(
+                "select count(\"{field_name}\") as count from \"{stream_name}\" where \"{field_name}\" is not null"
+            ),
+        )
+        .await?;
+        if count == 0 {
+            return None;
+        }
+
+        let distinct_count = Self::query_single_i64(
+            &ctx,
+            &format!(
+                "select COUNT(DISTINCT \"{field_name}\") as distinct_count from \"{stream_name}\""
+            ),
+        )
+        .await?;
+
+        let distinct_stats = Self::query_distinct_stats(&ctx, &stream_name, &field_name).await;
+
+        Some(FieldStat {
+            field_name,
+            count,
+            distinct_count,
+            distinct_stats,
+        })
+    }
+
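+    /// Runs a SQL query expected to yield a single Int64 value (e.g. a COUNT) and
+    /// returns it, or `None` if the query, collection, or downcast fails.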
+    async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
+        let df = ctx.sql(sql).await.ok()?;
+        let batches = df.collect().await.ok()?;
+        let array = batches
+            .first()?
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow_array::Int64Array>()?;
+        Some(array.value(0))
+    }
+
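+    /// Renders a single array cell as a string for reporting. Only a handful of
+    /// Arrow types are handled (strings, 64-bit ints and floats, millisecond
+    /// timestamps, booleans, nulls); anything else is reported as "UNSUPPORTED".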
+    fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
+        if array.is_null(idx) {
+            return "NULL".to_string();
+        }
+        if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
+            arr.value(idx).to_string()
+        } else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
+            arr.value(idx).to_string()
+        } else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
+            arr.value(idx).to_string()
+        } else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
+            let timestamp = arr.value(idx);
+            DateTime::from_timestamp_millis(timestamp)
+                .map(|dt| dt.to_string())
+                .unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
+        } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
+            arr.value(idx).to_string()
+        } else if array.as_any().downcast_ref::<NullArray>().is_some() {
+            "NULL".to_string()
+        } else {
+            "UNSUPPORTED".to_string()
+        }
+    }
+
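+    /// Returns up to the 50 most frequent distinct values of a field, counted with a
+    /// GROUP BY query against the in-memory table; query failures yield an empty list.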
+    async fn query_distinct_stats(
+        ctx: &SessionContext,
+        stream_name: &str,
+        field_name: &str,
+    ) -> Vec<DistinctStat> {
+        let sql = format!(
+            "select count(*) as distinct_count, \"{field_name}\" from \"{stream_name}\" where \"{field_name}\" is not null group by \"{field_name}\" order by distinct_count desc limit 50"
+        );
+        let mut distinct_stats = Vec::new();
+        if let Ok(df) = ctx.sql(&sql).await {
+            if let Ok(batches) = df.collect().await {
+                for rb in batches {
+                    let counts = rb
+                        .column(0)
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .expect("Counts should be Int64Array");
+                    let values = rb.column(1).as_ref();
+                    for i in 0..rb.num_rows() {
+                        let value = Self::format_arrow_value(values, i);
+                        distinct_stats.push(DistinctStat {
+                            distinct_value: value,
+                            count: counts.value(i),
+                        });
+                    }
+                }
+            }
+        }
+        distinct_stats
+    }
 }

 #[derive(Deref, DerefMut, Default)]
@@ -829,7 +1023,7 @@ impl Streams {
             .map(Arc::clone)
             .collect();
         for stream in streams {
-            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
+            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal).await });
         }
     }
 }
@@ -1019,7 +1213,7 @@ mod tests {
             None,
         )
         .convert_disk_files_to_parquet(None, None, false)?;
-        assert!(result.is_none());
+        assert!(result.0.is_none());
         // Verify metrics were set to 0
         let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get();
         assert_eq!(staging_files, 0);
@@ -1100,8 +1294,8 @@ mod tests {
             .convert_disk_files_to_parquet(None, None, true)
             .unwrap();

-        assert!(result.is_some());
-        let result_schema = result.unwrap();
+        assert!(result.0.is_some());
+        let result_schema = result.0.unwrap();
         assert_eq!(result_schema.fields().len(), 3);

         // Verify parquet files were created and the arrow files deleted
@@ -1149,8 +1343,8 @@ mod tests {
             .convert_disk_files_to_parquet(None, None, true)
             .unwrap();

-        assert!(result.is_some());
-        let result_schema = result.unwrap();
+        assert!(result.0.is_some());
+        let result_schema = result.0.unwrap();
         assert_eq!(result_schema.fields().len(), 3);

         // Verify parquet files were created and the arrow files deleted
@@ -1203,8 +1397,8 @@ mod tests {
             .convert_disk_files_to_parquet(None, None, false)
             .unwrap();

-        assert!(result.is_some());
-        let result_schema = result.unwrap();
+        assert!(result.0.is_some());
+        let result_schema = result.0.unwrap();
         assert_eq!(result_schema.fields().len(), 3);

         // Verify parquet files were created and the arrow file left
0 commit comments