Skip to content

Commit 1f40e11

Browse files
authored
VER: Release 0.20.1
2 parents d9fad40 + 37e9534 commit 1f40e11

File tree

10 files changed

+154
-29
lines changed

10 files changed

+154
-29
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# Changelog
22

3+
## 0.20.1 - 2024-08-26
4+
5+
### Enhancements
6+
- Added `DynAsyncBufWriter` for buffering compressed or uncompressed async output
7+
- Added new publisher values for `XCIS.BBOTRADES` and `XNYS.BBOTRADES`
8+
9+
### Bug fixes
10+
- Added missing Python type stub for `pretty_ts_ref` in `StatMsg`
11+
312
## 0.20.0 - 2024-07-30
413

514
### Enhancements

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ resolver = "2"
1111
[workspace.package]
1212
authors = ["Databento <[email protected]>"]
1313
edition = "2021"
14-
version = "0.20.0"
14+
version = "0.20.1"
1515
documentation = "https://docs.databento.com"
1616
repository = "https://github.com/databento/dbn"
1717
license = "Apache-2.0"

python/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "databento-dbn"
3-
version = "0.20.0"
3+
version = "0.20.1"
44
description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)"
55
authors = ["Databento <[email protected]>"]
66
license = "Apache-2.0"
@@ -17,7 +17,7 @@ build-backend = "maturin"
1717

1818
[project]
1919
name = "databento-dbn"
20-
version = "0.20.0"
20+
version = "0.20.1"
2121
authors = [
2222
{ name = "Databento", email = "[email protected]" }
2323
]

python/python/databento_dbn/_lib.pyi

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4258,6 +4258,18 @@ class StatMsg(Record):
42584258
42594259
"""
42604260

4261+
@property
4262+
def pretty_ts_ref(self) -> dt.datetime:
4263+
"""
4264+
Reference timestamp expressed as the number of nanoseconds since the
4265+
UNIX epoch as a datetime or `pandas.Timestamp`, if available.
4266+
4267+
Returns
4268+
-------
4269+
datetime.datetime
4270+
4271+
"""
4272+
42614273
@property
42624274
def ts_ref(self) -> int:
42634275
"""

rust/dbn-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ name = "dbn"
1616
path = "src/main.rs"
1717

1818
[dependencies]
19-
dbn = { path = "../dbn", version = "=0.20.0", default-features = false }
19+
dbn = { path = "../dbn", version = "=0.20.1", default-features = false }
2020

2121
anyhow = { workspace = true }
2222
clap = { version = "4.5", features = ["derive", "wrap_help"] }

rust/dbn/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"]
2525
trivial_copy = []
2626

2727
[dependencies]
28-
dbn-macros = { version = "=0.20.0", path = "../dbn-macros" }
28+
dbn-macros = { version = "=0.20.1", path = "../dbn-macros" }
2929

3030
async-compression = { version = "0.4.11", features = ["tokio", "zstd"], optional = true }
3131
csv = { workspace = true }

rust/dbn/src/encode.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub use self::{
2727
AsyncEncoder as AsyncDbnEncoder, AsyncMetadataEncoder as AsyncDbnMetadataEncoder,
2828
AsyncRecordEncoder as AsyncDbnRecordEncoder,
2929
},
30-
dyn_writer::DynAsyncWriter,
30+
dyn_writer::{DynAsyncBufWriter, DynAsyncWriter},
3131
json::AsyncEncoder as AsyncJsonEncoder,
3232
};
3333

rust/dbn/src/encode/dyn_writer.rs

Lines changed: 97 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ where
1414
W: io::Write,
1515
{
1616
Uncompressed(W),
17-
ZStd(zstd::stream::AutoFinishEncoder<'a, W>),
17+
Zstd(zstd::stream::AutoFinishEncoder<'a, W>),
1818
}
1919

2020
impl<'a, W> DynWriter<'a, W>
@@ -28,15 +28,15 @@ where
2828
pub fn new(writer: W, compression: Compression) -> Result<Self> {
2929
match compression {
3030
Compression::None => Ok(Self(DynWriterImpl::Uncompressed(writer))),
31-
Compression::ZStd => zstd_encoder(writer).map(|enc| Self(DynWriterImpl::ZStd(enc))),
31+
Compression::ZStd => zstd_encoder(writer).map(|enc| Self(DynWriterImpl::Zstd(enc))),
3232
}
3333
}
3434

3535
/// Returns a mutable reference to the underlying writer.
3636
pub fn get_mut(&mut self) -> &mut W {
3737
match &mut self.0 {
3838
DynWriterImpl::Uncompressed(w) => w,
39-
DynWriterImpl::ZStd(enc) => enc.get_mut(),
39+
DynWriterImpl::Zstd(enc) => enc.get_mut(),
4040
}
4141
}
4242
}
@@ -48,39 +48,41 @@ where
4848
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
4949
match &mut self.0 {
5050
DynWriterImpl::Uncompressed(writer) => writer.write(buf),
51-
DynWriterImpl::ZStd(writer) => writer.write(buf),
51+
DynWriterImpl::Zstd(writer) => writer.write(buf),
5252
}
5353
}
5454

5555
fn flush(&mut self) -> io::Result<()> {
5656
match &mut self.0 {
5757
DynWriterImpl::Uncompressed(writer) => writer.flush(),
58-
DynWriterImpl::ZStd(writer) => writer.flush(),
58+
DynWriterImpl::Zstd(writer) => writer.flush(),
5959
}
6060
}
6161

6262
fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
6363
match &mut self.0 {
6464
DynWriterImpl::Uncompressed(writer) => writer.write_vectored(bufs),
65-
DynWriterImpl::ZStd(writer) => writer.write_vectored(bufs),
65+
DynWriterImpl::Zstd(writer) => writer.write_vectored(bufs),
6666
}
6767
}
6868

6969
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
7070
match &mut self.0 {
7171
DynWriterImpl::Uncompressed(writer) => writer.write_all(buf),
72-
DynWriterImpl::ZStd(writer) => writer.write_all(buf),
72+
DynWriterImpl::Zstd(writer) => writer.write_all(buf),
7373
}
7474
}
7575

7676
fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> {
7777
match &mut self.0 {
7878
DynWriterImpl::Uncompressed(writer) => writer.write_fmt(fmt),
79-
DynWriterImpl::ZStd(writer) => writer.write_fmt(fmt),
79+
DynWriterImpl::Zstd(writer) => writer.write_fmt(fmt),
8080
}
8181
}
8282
}
8383

84+
#[cfg(feature = "async")]
85+
pub use r#async::DynBufWriter as DynAsyncBufWriter;
8486
#[cfg(feature = "async")]
8587
pub use r#async::DynWriter as DynAsyncWriter;
8688

@@ -92,11 +94,91 @@ mod r#async {
9294
};
9395

9496
use async_compression::tokio::write::ZstdEncoder;
95-
use tokio::io;
97+
use tokio::io::{self, BufWriter};
9698

9799
use crate::{encode::async_zstd_encoder, enums::Compression};
98100

101+
/// An object that allows for abstracting over compressed and uncompressed output
102+
/// with buffering.
103+
pub struct DynBufWriter<W, B = W>(DynBufWriterImpl<W, B>)
104+
where
105+
W: io::AsyncWriteExt + Unpin,
106+
B: io::AsyncWriteExt + Unpin;
107+
108+
enum DynBufWriterImpl<W, B>
109+
where
110+
W: io::AsyncWriteExt + Unpin,
111+
B: io::AsyncWriteExt + Unpin,
112+
{
113+
Uncompressed(B),
114+
Zstd(ZstdEncoder<W>),
115+
}
116+
117+
impl<W> DynBufWriter<W>
118+
where
119+
W: io::AsyncWriteExt + Unpin,
120+
{
121+
/// Creates a new instance of [`DynWriter`] which will wrap `writer` with
122+
/// `compression`.
123+
pub fn new(writer: W, compression: Compression) -> Self {
124+
Self(match compression {
125+
Compression::None => DynBufWriterImpl::Uncompressed(writer),
126+
Compression::ZStd => DynBufWriterImpl::Zstd(async_zstd_encoder(writer)),
127+
})
128+
}
129+
}
130+
131+
impl<W> DynBufWriter<W, BufWriter<W>>
132+
where
133+
W: io::AsyncWriteExt + Unpin,
134+
{
135+
/// Creates a new instance of [`DynWriter`], wrapping `writer` in a `BufWriter`.
136+
pub fn new_buffered(writer: W, compression: Compression) -> Self {
137+
Self(match compression {
138+
Compression::None => DynBufWriterImpl::Uncompressed(BufWriter::new(writer)),
139+
// `ZstdEncoder` already wraps `W` in a `BufWriter`, cf.
140+
// https://github.com/Nullus157/async-compression/blob/main/src/tokio/write/generic/encoder.rs
141+
Compression::ZStd => DynBufWriterImpl::Zstd(async_zstd_encoder(writer)),
142+
})
143+
}
144+
}
145+
146+
impl<W> io::AsyncWrite for DynBufWriter<W>
147+
where
148+
W: io::AsyncWrite + io::AsyncWriteExt + Unpin,
149+
{
150+
fn poll_write(
151+
mut self: Pin<&mut Self>,
152+
cx: &mut Context<'_>,
153+
buf: &[u8],
154+
) -> Poll<io::Result<usize>> {
155+
match &mut self.0 {
156+
DynBufWriterImpl::Uncompressed(w) => {
157+
io::AsyncWrite::poll_write(Pin::new(w), cx, buf)
158+
}
159+
DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_write(Pin::new(enc), cx, buf),
160+
}
161+
}
162+
163+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
164+
match &mut self.0 {
165+
DynBufWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_flush(Pin::new(w), cx),
166+
DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_flush(Pin::new(enc), cx),
167+
}
168+
}
169+
170+
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
171+
match &mut self.0 {
172+
DynBufWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_shutdown(Pin::new(w), cx),
173+
DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx),
174+
}
175+
}
176+
}
177+
99178
/// An object that allows for abstracting over compressed and uncompressed output.
179+
///
180+
/// Compared with [`DynBufWriter`], only the compressed output is buffered, as it is
181+
/// required by the async Zstd implementation.
100182
pub struct DynWriter<W>(DynWriterImpl<W>)
101183
where
102184
W: io::AsyncWriteExt + Unpin;
@@ -106,7 +188,7 @@ mod r#async {
106188
W: io::AsyncWriteExt + Unpin,
107189
{
108190
Uncompressed(W),
109-
ZStd(ZstdEncoder<W>),
191+
Zstd(ZstdEncoder<W>),
110192
}
111193

112194
impl<W> DynWriter<W>
@@ -118,15 +200,15 @@ mod r#async {
118200
pub fn new(writer: W, compression: Compression) -> Self {
119201
Self(match compression {
120202
Compression::None => DynWriterImpl::Uncompressed(writer),
121-
Compression::ZStd => DynWriterImpl::ZStd(async_zstd_encoder(writer)),
203+
Compression::ZStd => DynWriterImpl::Zstd(async_zstd_encoder(writer)),
122204
})
123205
}
124206

125207
/// Returns a mutable reference to the underlying writer.
126208
pub fn get_mut(&mut self) -> &mut W {
127209
match &mut self.0 {
128210
DynWriterImpl::Uncompressed(w) => w,
129-
DynWriterImpl::ZStd(enc) => enc.get_mut(),
211+
DynWriterImpl::Zstd(enc) => enc.get_mut(),
130212
}
131213
}
132214
}
@@ -142,21 +224,21 @@ mod r#async {
142224
) -> Poll<io::Result<usize>> {
143225
match &mut self.0 {
144226
DynWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_write(Pin::new(w), cx, buf),
145-
DynWriterImpl::ZStd(enc) => io::AsyncWrite::poll_write(Pin::new(enc), cx, buf),
227+
DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_write(Pin::new(enc), cx, buf),
146228
}
147229
}
148230

149231
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
150232
match &mut self.0 {
151233
DynWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_flush(Pin::new(w), cx),
152-
DynWriterImpl::ZStd(enc) => io::AsyncWrite::poll_flush(Pin::new(enc), cx),
234+
DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_flush(Pin::new(enc), cx),
153235
}
154236
}
155237

156238
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
157239
match &mut self.0 {
158240
DynWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_shutdown(Pin::new(w), cx),
159-
DynWriterImpl::ZStd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx),
241+
DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx),
160242
}
161243
}
162244
}

0 commit comments

Comments
 (0)