@@ -10,9 +10,10 @@ use crate::{
1010 decode:: {
1111 dbn:: fsm:: { DbnFsm , ProcessResult } ,
1212 zstd:: zstd_decoder,
13- AsyncSkipBytes , DbnMetadata , VersionUpgradePolicy , ZSTD_FILE_BUFFER_CAPACITY ,
13+ AsyncDecodeRecord , AsyncDecodeRecordRef , AsyncSkipBytes , DbnMetadata , VersionUpgradePolicy ,
14+ ZSTD_FILE_BUFFER_CAPACITY ,
1415 } ,
15- HasRType , Metadata , Record , RecordRef , Result , DBN_VERSION ,
16+ HasRType , Metadata , RecordRef , Result , DBN_VERSION ,
1617} ;
1718
1819/// An async decoder for Databento Binary Encoding (DBN), both metadata and records.
@@ -240,6 +241,24 @@ where
240241 }
241242}
242243
244+ impl < R > AsyncDecodeRecordRef for Decoder < R >
245+ where
246+ R : io:: AsyncReadExt + Unpin ,
247+ {
248+ async fn decode_record_ref ( & mut self ) -> crate :: Result < Option < RecordRef > > {
249+ self . decoder . decode_ref ( ) . await
250+ }
251+ }
252+
253+ impl < R > AsyncDecodeRecord for Decoder < R >
254+ where
255+ R : io:: AsyncReadExt + Unpin ,
256+ {
257+ async fn decode_record < ' a , T : HasRType + ' a > ( & ' a mut self ) -> crate :: Result < Option < & ' a T > > {
258+ self . decoder . decode ( ) . await
259+ }
260+ }
261+
243262/// An async decoder for files and streams of Databento Binary Encoding (DBN) records.
244263pub struct RecordDecoder < R >
245264where
@@ -343,20 +362,13 @@ where
343362 /// This method is cancel safe. It can be used within a `tokio::select!` statement
344363 /// without the potential for corrupting the input stream.
345364 pub async fn decode < ' a , T : HasRType + ' a > ( & ' a mut self ) -> Result < Option < & ' a T > > {
346- let rec_ref = self . decode_ref ( ) . await ?;
347- if let Some ( rec_ref) = rec_ref {
348- rec_ref
349- . get :: < T > ( )
350- . ok_or_else ( || {
351- crate :: Error :: conversion :: < T > ( format ! (
352- "record with rtype {:#04X}" ,
353- rec_ref. header( ) . rtype
354- ) )
355- } )
356- . map ( Some )
357- } else {
358- Ok ( None )
359- }
365+ self . decode_ref ( ) . await . and_then ( |rec| {
366+ if let Some ( rec) = rec {
367+ rec. try_get ( ) . map ( Some )
368+ } else {
369+ Ok ( None )
370+ }
371+ } )
360372 }
361373
362374 /// Tries to decode all records into a `Vec`. This eagerly decodes the data.
@@ -428,6 +440,24 @@ where
428440 }
429441}
430442
443+ impl < R > AsyncDecodeRecordRef for RecordDecoder < R >
444+ where
445+ R : io:: AsyncReadExt + Unpin ,
446+ {
447+ async fn decode_record_ref ( & mut self ) -> crate :: Result < Option < RecordRef > > {
448+ self . decode_ref ( ) . await
449+ }
450+ }
451+
452+ impl < R > AsyncDecodeRecord for RecordDecoder < R >
453+ where
454+ R : io:: AsyncReadExt + Unpin ,
455+ {
456+ async fn decode_record < ' a , T : HasRType + ' a > ( & ' a mut self ) -> crate :: Result < Option < & ' a T > > {
457+ self . decode ( ) . await
458+ }
459+ }
460+
431461impl < R > RecordDecoder < R >
432462where
433463 R : AsyncSkipBytes + io:: AsyncReadExt + Unpin ,
@@ -665,8 +695,8 @@ mod tests {
665695 AsyncEncodeRecord , DbnEncodable ,
666696 } ,
667697 rtype, v1, v2, Bbo1SMsg , CbboMsg , Cmbp1Msg , Error , ErrorMsg , ImbalanceMsg ,
668- InstrumentDefMsg , MboMsg , Mbp10Msg , Mbp1Msg , OhlcvMsg , RecordHeader , Result , Schema ,
669- StatMsg , StatusMsg , TbboMsg , TradeMsg , WithTsOut ,
698+ InstrumentDefMsg , MboMsg , Mbp10Msg , Mbp1Msg , OhlcvMsg , Record , RecordHeader , Result ,
699+ Schema , StatMsg , StatusMsg , TbboMsg , TradeMsg , WithTsOut ,
670700 } ;
671701
672702 #[ rstest]
0 commit comments