Skip to content

Commit 03cc546

Browse files
committed
FIX: Fix support for multi-frame in AsyncDynRead
1 parent 4bdeb51 commit 03cc546

File tree

4 files changed

+56
-13
lines changed

4 files changed

+56
-13
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.19.2 - TBD
4+
5+
### Bug fixes
6+
- Fixed issue where `AsyncDynReader` would only decode the first frame of multi-frame
7+
Zstandard files
8+
39
## 0.19.1 - 2024-07-16
410

511
### Bug fixes

rust/dbn/src/decode.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,8 @@ mod r#async {
570570

571571
use crate::enums::Compression;
572572

573+
use super::zstd::zstd_decoder;
574+
573575
/// A type for runtime polymorphism on compressed and uncompressed input.
574576
/// The async version of [`DynReader`](super::DynReader).
575577
pub struct DynReader<R>(DynReaderImpl<R>)
@@ -629,7 +631,7 @@ mod r#async {
629631
.await
630632
.map_err(|e| crate::Error::io(e, "creating buffer to infer encoding"))?;
631633
Ok(if super::zstd::starts_with_prefix(first_bytes) {
632-
Self(DynReaderImpl::ZStd(ZstdDecoder::new(reader)))
634+
Self(DynReaderImpl::ZStd(zstd_decoder(reader)))
633635
} else {
634636
Self(DynReaderImpl::Uncompressed(reader))
635637
})
@@ -693,4 +695,35 @@ mod r#async {
693695
}
694696
}
695697
}
698+
699+
#[cfg(test)]
700+
mod tests {
701+
use crate::{
702+
compat::InstrumentDefMsgV1,
703+
decode::{tests::TEST_DATA_PATH, AsyncDbnRecordDecoder},
704+
VersionUpgradePolicy,
705+
};
706+
707+
use super::*;
708+
709+
#[tokio::test]
710+
async fn test_decode_multiframe_zst() {
711+
let mut decoder = AsyncDbnRecordDecoder::with_version(
712+
DynReader::from_file(&format!(
713+
"{TEST_DATA_PATH}/multi-frame.definition.v1.dbn.frag.zst"
714+
))
715+
.await
716+
.unwrap(),
717+
1,
718+
VersionUpgradePolicy::AsIs,
719+
false,
720+
)
721+
.unwrap();
722+
let mut count = 0;
723+
while let Some(_rec) = decoder.decode::<InstrumentDefMsgV1>().await.unwrap() {
724+
count += 1;
725+
}
726+
assert_eq!(count, 8);
727+
}
728+
}
696729
}

rust/dbn/src/decode/dbn/async.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,13 @@ use tokio::{
88

99
use crate::{
1010
compat,
11-
decode::{r#async::ZSTD_FILE_BUFFER_CAPACITY, FromLittleEndianSlice, VersionUpgradePolicy},
11+
decode::{
12+
r#async::ZSTD_FILE_BUFFER_CAPACITY, zstd::zstd_decoder, FromLittleEndianSlice,
13+
VersionUpgradePolicy,
14+
},
1215
HasRType, Metadata, Record, RecordHeader, RecordRef, Result, DBN_VERSION, METADATA_FIXED_LEN,
1316
};
1417

15-
/// Helper to always set multiple members.
16-
fn zstd_decoder<R>(reader: R) -> ZstdDecoder<R>
17-
where
18-
R: io::AsyncBufReadExt + Unpin,
19-
{
20-
let mut zstd_decoder = ZstdDecoder::new(reader);
21-
// explicitly enable decoding multiple frames
22-
zstd_decoder.multiple_members(true);
23-
zstd_decoder
24-
}
25-
2618
/// An async decoder for Databento Binary Encoding (DBN), both metadata and records.
2719
pub struct Decoder<R>
2820
where

rust/dbn/src/decode/zstd.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ pub fn starts_with_prefix(bytes: &[u8]) -> bool {
1515
ZSTD_MAGIC_NUMBER == magic
1616
}
1717

18+
/// Helper to always set multiple members.
19+
#[cfg(feature = "async")]
20+
pub(crate) fn zstd_decoder<R>(reader: R) -> async_compression::tokio::bufread::ZstdDecoder<R>
21+
where
22+
R: tokio::io::AsyncBufReadExt + Unpin,
23+
{
24+
let mut zstd_decoder = async_compression::tokio::bufread::ZstdDecoder::new(reader);
25+
// explicitly enable decoding multiple frames
26+
zstd_decoder.multiple_members(true);
27+
zstd_decoder
28+
}
29+
1830
#[cfg(test)]
1931
mod tests {
2032
use std::{fs::File, io::Read};

0 commit comments

Comments
 (0)