Skip to content

Commit

Permalink
VER: Release 0.20.1
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen authored Aug 27, 2024
2 parents d9fad40 + 37e9534 commit 1f40e11
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 29 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## 0.20.1 - 2024-08-26

### Enhancements
- Added `DynAsyncBufWriter` for buffering compressed or uncompressed async output
- Added new publisher values for `XCIS.BBOTRADES` and `XNYS.BBOTRADES`

### Bug fixes
- Added missing Python type stub for `pretty_ts_ref` in `StatMsg`

## 0.20.0 - 2024-07-30

### Enhancements
Expand Down
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ resolver = "2"
[workspace.package]
authors = ["Databento <support@databento.com>"]
edition = "2021"
version = "0.20.0"
version = "0.20.1"
documentation = "https://docs.databento.com"
repository = "https://github.com/databento/dbn"
license = "Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "databento-dbn"
version = "0.20.0"
version = "0.20.1"
description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)"
authors = ["Databento <support@databento.com>"]
license = "Apache-2.0"
Expand All @@ -17,7 +17,7 @@ build-backend = "maturin"

[project]
name = "databento-dbn"
version = "0.20.0"
version = "0.20.1"
authors = [
{ name = "Databento", email = "support@databento.com" }
]
Expand Down
12 changes: 12 additions & 0 deletions python/python/databento_dbn/_lib.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4258,6 +4258,18 @@ class StatMsg(Record):
"""

@property
def pretty_ts_ref(self) -> dt.datetime:
"""
Reference timestamp expressed as the number of nanoseconds since the
UNIX epoch as a datetime or `pandas.Timestamp`, if available.
Returns
-------
datetime.datetime
"""

@property
def ts_ref(self) -> int:
"""
Expand Down
2 changes: 1 addition & 1 deletion rust/dbn-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ name = "dbn"
path = "src/main.rs"

[dependencies]
dbn = { path = "../dbn", version = "=0.20.0", default-features = false }
dbn = { path = "../dbn", version = "=0.20.1", default-features = false }

anyhow = { workspace = true }
clap = { version = "4.5", features = ["derive", "wrap_help"] }
Expand Down
2 changes: 1 addition & 1 deletion rust/dbn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"]
trivial_copy = []

[dependencies]
dbn-macros = { version = "=0.20.0", path = "../dbn-macros" }
dbn-macros = { version = "=0.20.1", path = "../dbn-macros" }

async-compression = { version = "0.4.11", features = ["tokio", "zstd"], optional = true }
csv = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion rust/dbn/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub use self::{
AsyncEncoder as AsyncDbnEncoder, AsyncMetadataEncoder as AsyncDbnMetadataEncoder,
AsyncRecordEncoder as AsyncDbnRecordEncoder,
},
dyn_writer::DynAsyncWriter,
dyn_writer::{DynAsyncBufWriter, DynAsyncWriter},
json::AsyncEncoder as AsyncJsonEncoder,
};

Expand Down
112 changes: 97 additions & 15 deletions rust/dbn/src/encode/dyn_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ where
W: io::Write,
{
Uncompressed(W),
ZStd(zstd::stream::AutoFinishEncoder<'a, W>),
Zstd(zstd::stream::AutoFinishEncoder<'a, W>),
}

impl<'a, W> DynWriter<'a, W>
Expand All @@ -28,15 +28,15 @@ where
pub fn new(writer: W, compression: Compression) -> Result<Self> {
match compression {
Compression::None => Ok(Self(DynWriterImpl::Uncompressed(writer))),
Compression::ZStd => zstd_encoder(writer).map(|enc| Self(DynWriterImpl::ZStd(enc))),
Compression::ZStd => zstd_encoder(writer).map(|enc| Self(DynWriterImpl::Zstd(enc))),
}
}

/// Returns a mutable reference to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
match &mut self.0 {
DynWriterImpl::Uncompressed(w) => w,
DynWriterImpl::ZStd(enc) => enc.get_mut(),
DynWriterImpl::Zstd(enc) => enc.get_mut(),
}
}
}
Expand All @@ -48,39 +48,41 @@ where
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write(buf),
DynWriterImpl::ZStd(writer) => writer.write(buf),
DynWriterImpl::Zstd(writer) => writer.write(buf),
}
}

fn flush(&mut self) -> io::Result<()> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.flush(),
DynWriterImpl::ZStd(writer) => writer.flush(),
DynWriterImpl::Zstd(writer) => writer.flush(),
}
}

fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write_vectored(bufs),
DynWriterImpl::ZStd(writer) => writer.write_vectored(bufs),
DynWriterImpl::Zstd(writer) => writer.write_vectored(bufs),
}
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write_all(buf),
DynWriterImpl::ZStd(writer) => writer.write_all(buf),
DynWriterImpl::Zstd(writer) => writer.write_all(buf),
}
}

fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write_fmt(fmt),
DynWriterImpl::ZStd(writer) => writer.write_fmt(fmt),
DynWriterImpl::Zstd(writer) => writer.write_fmt(fmt),
}
}
}

#[cfg(feature = "async")]
pub use r#async::DynBufWriter as DynAsyncBufWriter;
#[cfg(feature = "async")]
pub use r#async::DynWriter as DynAsyncWriter;

Expand All @@ -92,11 +94,91 @@ mod r#async {
};

use async_compression::tokio::write::ZstdEncoder;
use tokio::io;
use tokio::io::{self, BufWriter};

use crate::{encode::async_zstd_encoder, enums::Compression};

/// An object that allows for abstracting over compressed and uncompressed output
/// with buffering.
pub struct DynBufWriter<W, B = W>(DynBufWriterImpl<W, B>)
where
W: io::AsyncWriteExt + Unpin,
B: io::AsyncWriteExt + Unpin;

enum DynBufWriterImpl<W, B>
where
W: io::AsyncWriteExt + Unpin,
B: io::AsyncWriteExt + Unpin,
{
Uncompressed(B),
Zstd(ZstdEncoder<W>),
}

impl<W> DynBufWriter<W>
where
W: io::AsyncWriteExt + Unpin,
{
/// Creates a new instance of [`DynWriter`] which will wrap `writer` with
/// `compression`.
pub fn new(writer: W, compression: Compression) -> Self {
Self(match compression {
Compression::None => DynBufWriterImpl::Uncompressed(writer),
Compression::ZStd => DynBufWriterImpl::Zstd(async_zstd_encoder(writer)),
})
}
}

impl<W> DynBufWriter<W, BufWriter<W>>
where
W: io::AsyncWriteExt + Unpin,
{
/// Creates a new instance of [`DynWriter`], wrapping `writer` in a `BufWriter`.
pub fn new_buffered(writer: W, compression: Compression) -> Self {
Self(match compression {
Compression::None => DynBufWriterImpl::Uncompressed(BufWriter::new(writer)),
// `ZstdEncoder` already wraps `W` in a `BufWriter`, cf.
// https://github.com/Nullus157/async-compression/blob/main/src/tokio/write/generic/encoder.rs
Compression::ZStd => DynBufWriterImpl::Zstd(async_zstd_encoder(writer)),
})
}
}

impl<W> io::AsyncWrite for DynBufWriter<W>
where
W: io::AsyncWrite + io::AsyncWriteExt + Unpin,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match &mut self.0 {
DynBufWriterImpl::Uncompressed(w) => {
io::AsyncWrite::poll_write(Pin::new(w), cx, buf)
}
DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_write(Pin::new(enc), cx, buf),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut self.0 {
DynBufWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_flush(Pin::new(w), cx),
DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_flush(Pin::new(enc), cx),
}
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut self.0 {
DynBufWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_shutdown(Pin::new(w), cx),
DynBufWriterImpl::Zstd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx),
}
}
}

/// An object that allows for abstracting over compressed and uncompressed output.
///
/// Compared with [`DynBufWriter`], only the compressed output is buffered, as it is
/// required by the async Zstd implementation.
pub struct DynWriter<W>(DynWriterImpl<W>)
where
W: io::AsyncWriteExt + Unpin;
Expand All @@ -106,7 +188,7 @@ mod r#async {
W: io::AsyncWriteExt + Unpin,
{
Uncompressed(W),
ZStd(ZstdEncoder<W>),
Zstd(ZstdEncoder<W>),
}

impl<W> DynWriter<W>
Expand All @@ -118,15 +200,15 @@ mod r#async {
pub fn new(writer: W, compression: Compression) -> Self {
Self(match compression {
Compression::None => DynWriterImpl::Uncompressed(writer),
Compression::ZStd => DynWriterImpl::ZStd(async_zstd_encoder(writer)),
Compression::ZStd => DynWriterImpl::Zstd(async_zstd_encoder(writer)),
})
}

/// Returns a mutable reference to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
match &mut self.0 {
DynWriterImpl::Uncompressed(w) => w,
DynWriterImpl::ZStd(enc) => enc.get_mut(),
DynWriterImpl::Zstd(enc) => enc.get_mut(),
}
}
}
Expand All @@ -142,21 +224,21 @@ mod r#async {
) -> Poll<io::Result<usize>> {
match &mut self.0 {
DynWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_write(Pin::new(w), cx, buf),
DynWriterImpl::ZStd(enc) => io::AsyncWrite::poll_write(Pin::new(enc), cx, buf),
DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_write(Pin::new(enc), cx, buf),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut self.0 {
DynWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_flush(Pin::new(w), cx),
DynWriterImpl::ZStd(enc) => io::AsyncWrite::poll_flush(Pin::new(enc), cx),
DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_flush(Pin::new(enc), cx),
}
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut self.0 {
DynWriterImpl::Uncompressed(w) => io::AsyncWrite::poll_shutdown(Pin::new(w), cx),
DynWriterImpl::ZStd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx),
DynWriterImpl::Zstd(enc) => io::AsyncWrite::poll_shutdown(Pin::new(enc), cx),
}
}
}
Expand Down
Loading

0 comments on commit 1f40e11

Please sign in to comment.