@@ -84,15 +84,37 @@ pub struct MergedReverseRecordReader {
8484impl MergedReverseRecordReader {
8585 pub fn try_new ( files : & [ PathBuf ] ) -> Self {
8686 let mut readers = Vec :: with_capacity ( files. len ( ) ) ;
87- for file in files {
88- let Ok ( reader) = get_reverse_reader ( File :: open ( file) . unwrap ( ) ) else {
89- error ! ( "Invalid file detected, ignoring it: {:?}" , file) ;
90- continue ;
91- } ;
9287
93- readers. push ( reader) ;
88+ for file in files {
89+ match File :: open ( file) {
90+ Ok ( file) => {
91+ // Create two readers - one for counting and one for keeping
92+ match ( get_reverse_reader ( file. try_clone ( ) . unwrap ( ) ) , get_reverse_reader ( file. try_clone ( ) . unwrap ( ) ) ) {
93+ ( Ok ( count_reader) , Ok ( keep_reader) ) => {
94+ // Count records from the first reader
95+ let count: usize = count_reader
96+ . flat_map ( |r| r. ok ( ) )
97+ . map ( |b| b. num_rows ( ) ) . count ( ) ;
98+
99+ println ! ( "File {:?} has {} records" , file, count) ;
100+
101+ // Keep the second reader for actual processing
102+ readers. push ( keep_reader) ;
103+ }
104+ _ => {
105+ error ! ( "Invalid file detected, ignoring it: {:?}" , file) ;
106+ continue ;
107+ }
108+ }
109+ }
110+ Err ( e) => {
111+ error ! ( "Failed to open file {:?}: {}" , file, e) ;
112+ continue ;
113+ }
114+ }
94115 }
95-
116+
117+ println ! ( "Total valid readers: {}" , readers. len( ) ) ;
96118 Self { readers }
97119 }
98120
@@ -248,18 +270,30 @@ pub fn get_reverse_reader<T: Read + Seek>(
248270) -> Result < StreamReader < BufReader < OffsetReader < T > > > , io:: Error > {
249271 let mut offset = 0 ;
250272 let mut messages = Vec :: new ( ) ;
273+ let mut record_count = 0 ;
251274
252275 while let Some ( res) = find_limit_and_type ( & mut reader) . transpose ( ) {
253276 match res {
254277 Ok ( ( header, size) ) => {
278+ // Log message type and size
279+ // println!("Message type: {:?}, size: {}", header, size);
255280 messages. push ( ( header, offset, size) ) ;
281+ if header == MessageHeader :: RecordBatch {
282+ record_count += 1 ;
283+ }
256284 offset += size;
257285 }
258- Err ( err) if err. kind ( ) == io:: ErrorKind :: UnexpectedEof => break ,
286+ Err ( err) if err. kind ( ) == io:: ErrorKind :: UnexpectedEof => {
287+ println ! ( "Reached EOF after {} record batches" , record_count) ;
288+ break ;
289+ }
259290 Err ( err) => return Err ( err) ,
260291 }
261292 }
262293
294+ println ! ( "Total record batches found: {}" , record_count) ;
295+ println ! ( "Total messages found: {}" , messages. len( ) ) ;
296+
263297 // reverse everything leaving the first because it has schema message.
264298 messages[ 1 ..] . reverse ( ) ;
265299 let messages = messages
@@ -273,14 +307,58 @@ pub fn get_reverse_reader<T: Read + Seek>(
273307 Ok ( StreamReader :: try_new ( BufReader :: new ( OffsetReader :: new ( reader, messages) ) , None ) . unwrap ( ) )
274308}
275309
276- // return limit for
277310fn find_limit_and_type (
278311 reader : & mut ( impl Read + Seek ) ,
279312) -> Result < Option < ( MessageHeader , usize ) > , io:: Error > {
280313 let mut size = 0 ;
281314 let marker = reader. read_u32 :: < LittleEndian > ( ) ?;
282315 size += 4 ;
283316
317+ if marker != 0xFFFFFFFF {
318+ println ! ( "Invalid marker: {:x}" , marker) ;
319+ return Err ( io:: Error :: new (
320+ io:: ErrorKind :: InvalidData ,
321+ format ! ( "Invalid Continuation Marker: {:x}" , marker) ,
322+ ) ) ;
323+ }
324+
325+ let metadata_size = reader. read_u32 :: < LittleEndian > ( ) ? as usize ;
326+ size += 4 ;
327+
328+ if metadata_size == 0x00000000 {
329+ println ! ( "Found end of stream (metadata_size = 0)" ) ;
330+ return Ok ( None ) ;
331+ }
332+
333+ // println!("Metadata size: {}", metadata_size);
334+
335+ let mut message = vec ! [ 0u8 ; metadata_size] ;
336+ reader. read_exact ( & mut message) ?;
337+ size += metadata_size;
338+
339+ let message = unsafe { root_as_message_unchecked ( & message) } ;
340+ let header = message. header_type ( ) ;
341+ let message_size = message. bodyLength ( ) ;
342+
343+ // println!("Message header: {:?}, body length: {}", header, message_size);
344+
345+ size += message_size as usize ;
346+
347+ let padding = ( 8 - ( size % 8 ) ) % 8 ;
348+ reader. seek ( SeekFrom :: Current ( padding as i64 + message_size) ) ?;
349+ size += padding;
350+
351+ Ok ( Some ( ( header, size) ) )
352+ }
353+
354+ // return limit for
355+ fn find_limit_and_type1 (
356+ reader : & mut ( impl Read + Seek ) ,
357+ ) -> Result < Option < ( MessageHeader , usize ) > , io:: Error > {
358+ let mut size = 0 ;
359+ let marker = reader. read_u32 :: < LittleEndian > ( ) ?;
360+ size += 4 ;
361+
284362 if marker != 0xFFFFFFFF {
285363 return Err ( io:: Error :: new (
286364 io:: ErrorKind :: InvalidData ,
0 commit comments