@@ -67,12 +67,16 @@ use super::{
 };
 
 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
-fn arrow_path_to_parquet(staging_path: &Path, path: &Path, random_string: &str) -> Option<PathBuf> {
+fn arrow_path_to_parquet(
+    stream_staging_path: &Path,
+    path: &Path,
+    random_string: &str,
+) -> Option<PathBuf> {
     let filename = path.file_stem()?.to_str()?;
     let (_, front) = filename.split_once('.')?;
     assert!(front.contains('.'), "contains the delim `.`");
     let filename_with_random_number = format!("{front}.{random_string}.parquet");
-    let mut parquet_path = staging_path.to_owned();
+    let mut parquet_path = stream_staging_path.to_owned();
     parquet_path.push(filename_with_random_number);
     Some(parquet_path)
 }
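For illustration, here is a self-contained sketch of the renamed helper with a hypothetical arrows filename (the real staging layout may differ): `file_stem` drops the `.arrows` extension, and `split_once('.')` discards everything up to the first dot before the random string and `.parquet` suffix are appended.

```rust
use std::path::{Path, PathBuf};

fn arrow_path_to_parquet(
    stream_staging_path: &Path,
    path: &Path,
    random_string: &str,
) -> Option<PathBuf> {
    let filename = path.file_stem()?.to_str()?; // strips ".arrows"
    let (_, front) = filename.split_once('.')?; // drops the leading segment
    assert!(front.contains('.'), "contains the delim `.`");
    let mut parquet_path = stream_staging_path.to_owned();
    parquet_path.push(format!("{front}.{random_string}.parquet"));
    Some(parquet_path)
}

fn main() {
    // Hypothetical staging dir and arrows filename, for illustration only.
    let parquet = arrow_path_to_parquet(
        Path::new("/staging/mystream"),
        Path::new("/staging/mystream/abc.date=2025-01-01.hour=00.arrows"),
        "x4f2",
    );
    assert_eq!(
        parquet,
        Some(PathBuf::from(
            "/staging/mystream/date=2025-01-01.hour=00.x4f2.parquet"
        ))
    );
}
```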
@@ -345,9 +349,10 @@ impl Stream {
         arrow_files.retain(|path| {
             let creation = path
                 .metadata()
-                .expect("Arrow file should exist on disk")
-                .created()
-                .expect("Creation time should be accessible");
+                .ok()
+                .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok())
+                .expect("Arrow file should have a valid creation or modified time");
+
             // Compare if creation time is actually from previous minute
             minute_from_system_time(creation) < minute_from_system_time(exclude)
         });
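This change replaces the hard `created()` expectation with a fallback, since `std::fs::Metadata::created()` returns an error on platforms and filesystems that don't record a birth time. A minimal standalone sketch of the same fallback, with a hypothetical helper name:

```rust
use std::fs;
use std::path::Path;
use std::time::SystemTime;

// Prefer the creation time; fall back to mtime where birth time is
// unavailable (e.g. some Linux filesystems). None if the file is gone.
fn creation_or_modified(path: &Path) -> Option<SystemTime> {
    fs::metadata(path)
        .ok()
        .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok())
}

fn main() {
    if let Some(t) = creation_or_modified(Path::new("Cargo.toml")) {
        println!("timestamp: {t:?}");
    }
}
```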
@@ -594,7 +599,7 @@ impl Stream {
         .values()
         .map(|v| {
             v.iter()
-                .map(|file| file.metadata().unwrap().len())
+                .filter_map(|file| file.metadata().ok().map(|meta| meta.len()))
                 .sum::<u64>()
         })
         .sum::<u64>();
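The size sum now tolerates files that disappear between the directory listing and the `metadata()` call: `filter_map` silently skips them instead of unwrapping and panicking. The pattern in isolation (function name is illustrative):

```rust
use std::path::PathBuf;

// Sum on-disk sizes, skipping any file deleted since it was listed.
fn total_size(files: &[PathBuf]) -> u64 {
    files
        .iter()
        .filter_map(|file| file.metadata().ok().map(|meta| meta.len()))
        .sum()
}

fn main() {
    let files = vec![
        PathBuf::from("Cargo.toml"),
        PathBuf::from("this-file-was-deleted-meanwhile"),
    ];
    println!("{} bytes on disk", total_size(&files));
}
```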
@@ -624,92 +629,129 @@ impl Stream {
             return Ok(None);
         }
 
-        //find sum of arrow files in staging directory for a stream
         self.update_staging_metrics(&staging_files);
 
-        // warn!("staging files-\n{staging_files:?}\n");
         for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
             let merged_schema = record_reader.merged_schema();
-
             let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
             schemas.push(merged_schema.clone());
             let schema = Arc::new(merged_schema);
-            let mut part_path = parquet_path.to_owned();
-            part_path.set_extension("part");
-            let mut part_file = OpenOptions::new()
-                .create(true)
-                .append(true)
-                .open(&part_path)
-                .map_err(|_| StagingError::Create)?;
-            let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
-            for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
-                writer.write(record)?;
-            }
-            writer.close()?;
 
-            if part_file.metadata().expect("File was just created").len()
-                < parquet::file::FOOTER_SIZE as u64
-            {
-                error!(
-                    "Invalid parquet file {part_path:?} detected for stream {}, removing it",
-                    &self.stream_name
-                );
-                remove_file(part_path).expect("File should be removable if it is invalid");
+            let part_path = parquet_path.with_extension("part");
+            if !self.write_parquet_part_file(
+                &part_path,
+                record_reader,
+                &schema,
+                &props,
+                time_partition,
+            )? {
                 continue;
             }
-            trace!("Parquet file successfully constructed");
 
-            if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
+            if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) {
                 error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
             } else {
-                // delete the files that were grouped to create parquet file
-                for (i, file) in arrow_files.iter().enumerate() {
-                    match file.metadata() {
-                        Ok(meta) => {
-                            let file_size = meta.len();
-                            match remove_file(file) {
-                                Ok(_) => {
-                                    metrics::STORAGE_SIZE
-                                        .with_label_values(&[
-                                            "staging",
-                                            &self.stream_name,
-                                            ARROW_FILE_EXTENSION,
-                                        ])
-                                        .sub(file_size as i64);
-                                }
-                                Err(e) => {
-                                    warn!("Failed to delete file {}: {e}", file.display());
-                                }
-                            }
+                self.cleanup_arrow_files_and_dir(&arrow_files);
+            }
+        }
+
+        if schemas.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(Schema::try_merge(schemas).unwrap()))
+    }
+
+    fn write_parquet_part_file(
+        &self,
+        part_path: &Path,
+        record_reader: MergedReverseRecordReader,
+        schema: &Arc<Schema>,
+        props: &WriterProperties,
+        time_partition: Option<&String>,
+    ) -> Result<bool, StagingError> {
+        let mut part_file = OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(part_path)
+            .map_err(|_| StagingError::Create)?;
+        let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
+        for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
+            writer.write(record)?;
+        }
+        writer.close()?;
+
+        if part_file.metadata().expect("File was just created").len()
+            < parquet::file::FOOTER_SIZE as u64
+        {
+            error!(
+                "Invalid parquet file {part_path:?} detected for stream {}, removing it",
+                &self.stream_name
+            );
+            remove_file(part_path).expect("File should be removable if it is invalid");
+            return Ok(false);
+        }
+        trace!("Parquet file successfully constructed");
+        Ok(true)
+    }
+
+    fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> {
+        std::fs::rename(part_path, parquet_path)
+    }
+
+    fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) {
+        for (i, file) in arrow_files.iter().enumerate() {
+            match file.metadata() {
+                Ok(meta) => {
+                    let file_size = meta.len();
+                    match remove_file(file) {
+                        Ok(_) => {
+                            metrics::STORAGE_SIZE
+                                .with_label_values(&[
+                                    "staging",
+                                    &self.stream_name,
+                                    ARROW_FILE_EXTENSION,
+                                ])
+                                .sub(file_size as i64);
                         }
-                        Err(err) => {
-                            warn!("File ({}) not found; Error = {err}", file.display());
+                        Err(e) => {
+                            warn!("Failed to delete file {}: {e}", file.display());
                         }
                     }
+                }
+                Err(err) => {
+                    warn!("File ({}) not found; Error = {err}", file.display());
+                }
+            }
 
-                    // After deleting the last file, try to remove the inprocess directory
-                    if i == arrow_files.len() - 1 {
-                        if let Some(parent_dir) = file.parent() {
-                            if let Err(err) = fs::remove_dir(parent_dir) {
-                                warn!(
-                                    "Failed to remove inprocess directory {}: {err}",
-                                    parent_dir.display()
-                                );
+            // After deleting the last file, try to remove the inprocess directory if empty
+            if i == arrow_files.len() - 1 {
+                if let Some(parent_dir) = file.parent() {
+                    match fs::read_dir(parent_dir) {
+                        Ok(mut entries) => {
+                            if entries.next().is_none() {
+                                if let Err(err) = fs::remove_dir(parent_dir) {
+                                    warn!(
+                                        "Failed to remove inprocess directory {}: {err}",
+                                        parent_dir.display()
+                                    );
+                                }
                             }
                         }
+                        Err(err) => {
+                            warn!(
+                                "Failed to read inprocess directory {}: {err}",
+                                parent_dir.display()
+                            );
+                        }
                     }
                 }
             }
         }
-        if schemas.is_empty() {
-            return Ok(None);
-        }
-
-        Ok(Some(Schema::try_merge(schemas).unwrap()))
     }
 
     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
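The extracted `write_parquet_part_file`/`finalize_parquet_file` pair keeps the existing write-to-`.part`-then-rename flow: write into a `.part` file, sanity-check that it is at least as large as a parquet footer, then rename so readers never observe a half-written `.parquet` file. A minimal sketch of that pattern, assuming a hypothetical `finalize` helper and hard-coding the footer size (`parquet::file::FOOTER_SIZE` is 8 bytes: a 4-byte metadata length plus the `PAR1` magic):

```rust
use std::{fs, io, path::Path};

// Stand-in for parquet::file::FOOTER_SIZE: 4-byte metadata length + "PAR1".
const FOOTER_SIZE: u64 = 8;

// Hypothetical helper illustrating the validate-then-promote step.
fn finalize(part_path: &Path, parquet_path: &Path) -> io::Result<bool> {
    if fs::metadata(part_path)?.len() < FOOTER_SIZE {
        // Too small to hold even the footer, so it cannot be valid parquet.
        fs::remove_file(part_path)?;
        return Ok(false);
    }
    // A same-filesystem rename is atomic: readers see either no ".parquet"
    // file or a complete one, never a partial write.
    fs::rename(part_path, parquet_path)?;
    Ok(true)
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir();
    let part = dir.join("demo.part");
    fs::write(&part, b"PAR1....PAR1")?; // stand-in bytes, not real parquet
    let promoted = finalize(&part, &dir.join("demo.parquet"))?;
    println!("promoted: {promoted}");
    Ok(())
}
```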
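`cleanup_arrow_files_and_dir` also changes the inprocess-directory removal: it now peeks at the directory contents and only calls `remove_dir` when the directory is empty, so a still-populated directory no longer produces a spurious warning. The check-then-remove pattern in isolation (a benign race remains between the peek and the `remove_dir`, which the real code tolerates by logging a warning):

```rust
use std::{fs, io, path::Path};

// Remove a directory only if it has no entries; read_dir never yields
// "." or "..", so the first entry being absent means the dir is empty.
fn remove_dir_if_empty(dir: &Path) -> io::Result<()> {
    if fs::read_dir(dir)?.next().is_none() {
        fs::remove_dir(dir)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("inprocess-demo");
    fs::create_dir_all(&dir)?;
    remove_dir_if_empty(&dir)?; // empty, so removed without a warning
    Ok(())
}
```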