|
37 | 37 | /// `reader` or the input is encoded in a newer version of DBN. |
38 | 38 | /// |
39 | 39 | /// # Cancel safety |
40 | | - /// This method is not cancellation safe. If this method is used in a |
41 | | - /// `tokio::select!` statement and another branch completes first, the metadata |
42 | | - /// may have been partially read, corrupting the stream. |
| 40 | + /// This method is cancel safe. It can be used within a `tokio::select!` statement |
| 41 | + /// without the potential for corrupting the input stream. |
43 | 42 | pub async fn new(reader: R) -> crate::Result<Self> { |
44 | 43 | let mut metadata_decoder = MetadataDecoder::new(reader); |
45 | 44 | let mut metadata = metadata_decoder.decode().await?; |
|
58 | 57 | /// `reader` or the input is encoded in a newer version of DBN. |
59 | 58 | /// |
60 | 59 | /// # Cancel safety |
61 | | - /// This method is not cancellation safe. If this method is used in a |
62 | | - /// `tokio::select!` statement and another branch completes first, the metadata |
63 | | - /// may have been partially read, corrupting the stream. |
| 60 | + /// This method is cancel safe. It can be used within a `tokio::select!` statement |
| 61 | + /// without the potential for corrupting the input stream. |
64 | 62 | pub async fn with_upgrade_policy( |
65 | 63 | reader: R, |
66 | 64 | upgrade_policy: VersionUpgradePolicy, |
@@ -102,7 +100,7 @@ where |
102 | 100 | /// incompatible. |
103 | 101 | pub fn set_upgrade_policy(&mut self, upgrade_policy: VersionUpgradePolicy) -> Result<()> { |
104 | 102 | self.decoder.set_upgrade_policy(upgrade_policy)?; |
105 | | - self.metadata.set_version(upgrade_policy); |
| 103 | + self.metadata.upgrade(upgrade_policy); |
106 | 104 | Ok(()) |
107 | 105 | } |
108 | 106 |
|
@@ -293,6 +291,12 @@ where |
293 | 291 | Ok(Self { reader, fsm }) |
294 | 292 | } |
295 | 293 |
|
| 294 | + /// Creates a new `RecordDecoder` that will decode from `reader` and use the state |
| 295 | + /// and buffered data already in `fsm`. |
| 296 | + pub fn with_fsm(reader: R, fsm: DbnFsm) -> Self { |
| 297 | + Self { reader, fsm } |
| 298 | + } |
| 299 | + |
296 | 300 | /// Sets the DBN version to expect when decoding. |
297 | 301 | /// |
298 | 302 | /// # Errors |
@@ -496,45 +500,23 @@ where |
496 | 500 | Self { reader, fsm } |
497 | 501 | } |
498 | 502 |
|
| 503 | + /// Creates a new `MetadataDecoder` that will decode from `reader` and use the state |
| 504 | + /// and buffered data already in `fsm`. |
| 505 | + pub fn with_fsm(reader: R, fsm: DbnFsm) -> Self { |
| 506 | + Self { reader, fsm } |
| 507 | + } |
| 508 | + |
499 | 509 | /// Decodes and returns a DBN [`Metadata`]. |
500 | 510 | /// |
501 | 511 | /// # Errors |
502 | 512 | /// This function will return an error if it is unable to parse the metadata or the |
503 | 513 | /// input is encoded in a newere version of DBN. |
504 | 514 | /// |
505 | 515 | /// # Cancel safety |
506 | | - /// This method is not cancellation safe. If this method is used in a |
507 | | - /// `tokio::select!` statement and another branch completes first, the metadata |
508 | | - /// may have been partially read, corrupting the stream. |
| 516 | + /// This method is cancel safe. It can be used within a `tokio::select!` statement |
| 517 | + /// without the potential for corrupting the input stream. |
509 | 518 | pub async fn decode(&mut self) -> Result<Metadata> { |
510 | | - let io_err = |err| crate::Error::io(err, "decoding metadata"); |
511 | | - let nbytes = self.reader.read(self.fsm.space()).await.map_err(io_err)?; |
512 | | - self.fsm.fill(nbytes); |
513 | | - loop { |
514 | | - match self.fsm.process() { |
515 | | - ProcessResult::ReadMore(n) => { |
516 | | - // asm guarantees there's at least `n` bytes available in `space()` |
517 | | - let mut total_read = 0; |
518 | | - loop { |
519 | | - let read = self.reader.read(self.fsm.space()).await.map_err(io_err)?; |
520 | | - if read == 0 { |
521 | | - return Err(crate::Error::io( |
522 | | - io::Error::from(io::ErrorKind::UnexpectedEof), |
523 | | - "decoding metadata", |
524 | | - )); |
525 | | - } |
526 | | - self.fsm.fill(read); |
527 | | - total_read += read; |
528 | | - if total_read >= n { |
529 | | - break; |
530 | | - } |
531 | | - } |
532 | | - } |
533 | | - ProcessResult::Metadata(metadata) => return Ok(metadata), |
534 | | - ProcessResult::Record(_) => unreachable!("metadata precedes records"), |
535 | | - ProcessResult::Err(error) => return Err(error), |
536 | | - } |
537 | | - } |
| 519 | + decode_metadata_with_fsm(&mut self.reader, &mut self.fsm).await |
538 | 520 | } |
539 | 521 |
|
540 | 522 | /// Returns a mutable reference to the inner reader. |
@@ -568,6 +550,99 @@ where |
568 | 550 | } |
569 | 551 | } |
570 | 552 |
|
| 553 | +/// A low-level function for decoding DBN [`Metadata`]. Generally, the [`Decoder`] or |
| 554 | +/// [`MetadataDecoder`] should be used instead of this function. |
| 555 | +/// |
| 556 | +/// # Errors |
| 557 | +/// This function will return an error if it is unable to parse the metadata or the |
| 558 | +/// input is encoded in a newere version of DBN. |
| 559 | +/// |
| 560 | +/// # Cancel safety |
| 561 | +/// This method is cancel safe. It can be used within a `tokio::select!` statement |
| 562 | +/// without the potential for corrupting the input stream. |
| 563 | +/// |
| 564 | +/// # Panics |
| 565 | +/// This function will panic if it encounters DBN records. The caller the initial data |
| 566 | +/// from `reader` contains DBN metadata. |
| 567 | +pub async fn decode_metadata_with_fsm<R>(mut reader: R, fsm: &mut DbnFsm) -> crate::Result<Metadata> |
| 568 | +where |
| 569 | + R: io::AsyncReadExt + Unpin, |
| 570 | +{ |
| 571 | + let io_err = |err| crate::Error::io(err, "decoding metadata"); |
| 572 | + let nbytes = reader.read(fsm.space()).await.map_err(io_err)?; |
| 573 | + fsm.fill(nbytes); |
| 574 | + loop { |
| 575 | + match fsm.process() { |
| 576 | + ProcessResult::ReadMore(n) => { |
| 577 | + // asm guarantees there's at least `n` bytes available in `space()` |
| 578 | + let mut total_read = 0; |
| 579 | + loop { |
| 580 | + let read = reader.read(fsm.space()).await.map_err(io_err)?; |
| 581 | + if read == 0 { |
| 582 | + return Err(crate::Error::io( |
| 583 | + io::Error::from(io::ErrorKind::UnexpectedEof), |
| 584 | + "decoding metadata", |
| 585 | + )); |
| 586 | + } |
| 587 | + fsm.fill(read); |
| 588 | + total_read += read; |
| 589 | + if total_read >= n { |
| 590 | + break; |
| 591 | + } |
| 592 | + } |
| 593 | + } |
| 594 | + ProcessResult::Metadata(metadata) => return Ok(metadata), |
| 595 | + ProcessResult::Record(_) => panic!("DBN metadata should precede records"), |
| 596 | + ProcessResult::Err(error) => return Err(error), |
| 597 | + } |
| 598 | + } |
| 599 | +} |
| 600 | + |
| 601 | +/// A low-level function for decoding the next [`RecordRef`]. Returns `None` if `reader` |
| 602 | +/// has been exhausted. |
| 603 | +/// |
| 604 | +/// Generally [`Decoder`] and [`RecordDecoder`] should be used instead of this function. |
| 605 | +/// |
| 606 | +/// # Errors |
| 607 | +/// This function returns an error if the underlying reader returns an |
| 608 | +/// error of a kind other than `io::ErrorKind::UnexpectedEof` upon reading. |
| 609 | +/// It will also return an error if it encounters an invalid record. |
| 610 | +/// |
| 611 | +/// # Cancel safety |
| 612 | +/// This method is cancel safe. It can be used within a `tokio::select!` statement |
| 613 | +/// without the potential for corrupting the input stream. |
| 614 | +/// |
| 615 | +/// # Panics |
| 616 | +/// This function will panic if it encounters DBN metadata. The caller must ensure |
| 617 | +/// the metadata has already been decoded. |
| 618 | +pub async fn decode_record_ref_with_fsm<'a, R>( |
| 619 | + mut reader: R, |
| 620 | + fsm: &'a mut DbnFsm, |
| 621 | +) -> Result<Option<RecordRef<'a>>> |
| 622 | +where |
| 623 | + R: io::AsyncReadExt + Unpin, |
| 624 | +{ |
| 625 | + loop { |
| 626 | + match fsm.process() { |
| 627 | + ProcessResult::ReadMore(_) => match reader.read(fsm.space()).await { |
| 628 | + Ok(0) => return Ok(None), |
| 629 | + Ok(nbytes) => { |
| 630 | + fsm.fill(nbytes); |
| 631 | + } |
| 632 | + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { |
| 633 | + return Ok(None); |
| 634 | + } |
| 635 | + Err(err) => { |
| 636 | + return Err(crate::Error::io(err, "decoding record reference")); |
| 637 | + } |
| 638 | + }, |
| 639 | + ProcessResult::Record(_) => return Ok(fsm.last_record()), |
| 640 | + ProcessResult::Err(error) => return Err(error), |
| 641 | + ProcessResult::Metadata(_) => panic!("found DBN metadata when expected records"), |
| 642 | + } |
| 643 | + } |
| 644 | +} |
| 645 | + |
571 | 646 | #[cfg(test)] |
572 | 647 | mod tests { |
573 | 648 | #![allow(clippy::clone_on_copy)] |
@@ -770,7 +845,7 @@ mod tests { |
770 | 845 | VersionUpgradePolicy::UpgradeToV2, |
771 | 846 | ) |
772 | 847 | .await?; |
773 | | - assert_eq!(decoder.metadata().version, crate::DBN_VERSION); |
| 848 | + assert_eq!(decoder.metadata().version, 2); |
774 | 849 | assert_eq!(decoder.metadata().symbol_cstr_len, crate::SYMBOL_CSTR_LEN); |
775 | 850 | let mut has_decoded = false; |
776 | 851 | while let Some(_rec) = decoder.decode_record::<v2::InstrumentDefMsg>().await? { |
|
0 commit comments