Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transport compression to attic push #43

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
973 changes: 543 additions & 430 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ attic = { path = "../attic" }

anyhow = "1.0.71"
async-channel = "1.8.0"
async-stream = "0.3.5"
bytes = "1.4.0"
clap = { version = "4.3", features = ["derive"] }
clap_complete = "4.3.0"
const_format = "0.2.30"
dialoguer = "0.10.4"
displaydoc = "0.2.4"
enum-as-inner = "0.5.2"
enum-as-inner = "0.5.1"
futures = "0.3.28"
humantime = "2.1.0"
indicatif = "0.17.3"
lazy_static = "1.4.0"
notify = { version = "6.0.0", default-features = false, features = ["macos_kqueue"] }
pin-project = "1.0.12"
regex = "1.8.3"
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
serde = { version = "1.0.163", features = ["derive"] }
Expand All @@ -45,3 +47,14 @@ features = [
"rt-multi-thread",
"sync",
]

[dependencies.async-compression]
version = "0.3.15"
features = [
"futures-io",
"xz",
"zstd",
"brotli",
"deflate",
"gzip"
]
56 changes: 50 additions & 6 deletions client/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ use displaydoc::Display;
use futures::{
future,
stream::{self, StreamExt, TryStream, TryStreamExt},
AsyncReadExt,
};
use reqwest::{
header::{HeaderMap, HeaderValue, AUTHORIZATION, USER_AGENT},
header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, USER_AGENT},
Body, Client as HttpClient, Response, StatusCode, Url,
};
use serde::Deserialize;

use crate::config::ServerConfig;
use crate::version::ATTIC_DISTRIBUTOR;
use crate::{
compression::StreamingCompressor,
config::{CompressionConfig, ServerConfig},
};
use attic::api::v1::cache_config::{CacheConfig, CreateCacheRequest};
use attic::api::v1::get_missing_paths::{GetMissingPathsRequest, GetMissingPathsResponse};
use attic::api::v1::upload_path::{
Expand All @@ -40,6 +44,9 @@ pub struct ApiClient {

/// An initialized HTTP client.
client: HttpClient,

/// The compression algorithm to use.
compression: CompressionConfig,
}

/// An API error.
Expand Down Expand Up @@ -67,6 +74,7 @@ impl ApiClient {
Ok(Self {
endpoint: Url::parse(&config.endpoint)?,
client,
compression: config.compression,
})
}

Expand Down Expand Up @@ -176,7 +184,7 @@ impl ApiClient {
force_preamble: bool,
) -> Result<Option<UploadPathResult>>
where
S: TryStream<Ok = Bytes> + Send + Sync + 'static,
S: TryStream<Ok = Bytes> + Send + Sync + std::marker::Unpin + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>> + Send + Sync,
{
let endpoint = self.endpoint.join("_api/v1/upload-path")?;
Expand All @@ -185,21 +193,57 @@ impl ApiClient {
let mut req = self
.client
.put(endpoint)
.header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?);
.header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?)
.header(
CONTENT_ENCODING,
HeaderValue::from_static(self.compression.http_value()),
);

if force_preamble || upload_info_json.len() >= NAR_INFO_PREAMBLE_THRESHOLD {
let preamble = Bytes::from(upload_info_json);
let preamble_len = preamble.len();
let preamble_stream = stream::once(future::ok(preamble));

let chained = preamble_stream.chain(stream.into_stream());
let chained = chained.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let mut compressed =
StreamingCompressor::new_unbuffered(chained.into_async_read(), self.compression);
let final_stream = async_stream::stream! {
loop {
let mut buf = vec![0; 4096];
match compressed.read(&mut buf).await {
Ok(0) => {break;}
Ok(n) => {
buf.truncate(n);
yield Ok(buf);
}
Err(e) => {yield Err(e);}
}
}
};
req = req
.header(ATTIC_NAR_INFO_PREAMBLE_SIZE, preamble_len)
.body(Body::wrap_stream(chained));
.body(Body::wrap_stream(final_stream));
} else {
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let mut compressed =
StreamingCompressor::new_unbuffered(stream.into_async_read(), self.compression);
let final_stream = async_stream::stream! {
loop {
let mut buf = vec![0; 4096];
match compressed.read(&mut buf).await {
Ok(0) => {break;}
Ok(n) => {
buf.truncate(n);
yield Ok(buf);
}
Err(e) => {yield Err(e);}
}
}
};
req = req
.header(ATTIC_NAR_INFO, HeaderValue::from_str(&upload_info_json)?)
.body(Body::wrap_stream(stream));
.body(Body::wrap_stream(final_stream));
}

let res = req.send().await?;
Expand Down
3 changes: 2 additions & 1 deletion client/src/command/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::Parser;

use crate::cache::ServerName;
use crate::cli::Opts;
use crate::config::{Config, ServerConfig};
use crate::config::{CompressionConfig, Config, ServerConfig};

/// Log into an Attic server.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -43,6 +43,7 @@ pub async fn run(opts: Opts) -> Result<()> {
ServerConfig {
endpoint: sub.endpoint.to_owned(),
token: sub.token.to_owned(),
compression: CompressionConfig::Zstd,
},
);
}
Expand Down
88 changes: 88 additions & 0 deletions client/src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! Module for implementing streaming compression across multiple
//! algorithms.

use async_compression::futures::bufread::{
BrotliEncoder, DeflateEncoder, GzipEncoder, XzEncoder, ZstdEncoder,
};
use futures::io::{AsyncBufRead, AsyncRead, BufReader};
use pin_project::pin_project;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

use crate::config::CompressionConfig;

/// A streaming multi-codec compressor.
///
/// Wraps an underlying `AsyncBufRead` and yields its contents
/// compressed with the algorithm selected at construction time.
/// Note the variants hold `async-compression` *encoders*, so this
/// type performs compression, not decompression.
#[pin_project(project = SCProj)]
pub enum StreamingCompressor<S: AsyncBufRead> {
    /// Pass-through: no compression applied.
    None(#[pin] S),
    /// Brotli compression.
    Brotli(#[pin] BrotliEncoder<S>),
    /// Deflate compression.
    Deflate(#[pin] DeflateEncoder<S>),
    /// Gzip compression.
    Gzip(#[pin] GzipEncoder<S>),
    /// XZ compression.
    Xz(#[pin] XzEncoder<S>),
    /// Zstd compression.
    Zstd(#[pin] ZstdEncoder<S>),
}

impl<S: AsyncBufRead> StreamingCompressor<S> {
    /// Creates a new streaming compressor from a buffered stream and compression type.
    ///
    /// Each `CompressionConfig` variant maps to the corresponding
    /// encoder; `None` passes the stream through unchanged. Bytes read
    /// from the returned value are the compressed form of `inner`.
    pub fn new(inner: S, kind: CompressionConfig) -> Self {
        match kind {
            CompressionConfig::None => Self::None(inner),
            CompressionConfig::Brotli => Self::Brotli(BrotliEncoder::new(inner)),
            CompressionConfig::Deflate => Self::Deflate(DeflateEncoder::new(inner)),
            CompressionConfig::Gzip => Self::Gzip(GzipEncoder::new(inner)),
            CompressionConfig::Xz => Self::Xz(XzEncoder::new(inner)),
            CompressionConfig::Zstd => Self::Zstd(ZstdEncoder::new(inner)),
        }
    }
}

impl<U: AsyncRead> StreamingCompressor<BufReader<U>> {
    /// Creates a new streaming compressor from an unbuffered stream and compression type.
    ///
    /// Convenience wrapper that buffers `inner` with a default-capacity
    /// `BufReader` before delegating to [`StreamingCompressor::new`].
    /// This function is infallible: every `CompressionConfig` variant
    /// maps to a supported encoder, so no error path exists.
    pub fn new_unbuffered(inner: U, kind: CompressionConfig) -> Self {
        Self::new(BufReader::new(inner), kind)
    }
}

impl<S: AsyncBufRead> AsyncRead for StreamingCompressor<S> {
    /// Vectored read, forwarded to the active variant's reader.
    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [io::IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        // Project through pin_project so each variant's inner reader is
        // pinned, then hand the poll straight through.
        match self.project() {
            SCProj::None(passthrough) => passthrough.poll_read_vectored(cx, bufs),
            SCProj::Brotli(encoder) => encoder.poll_read_vectored(cx, bufs),
            SCProj::Deflate(encoder) => encoder.poll_read_vectored(cx, bufs),
            SCProj::Gzip(encoder) => encoder.poll_read_vectored(cx, bufs),
            SCProj::Xz(encoder) => encoder.poll_read_vectored(cx, bufs),
            SCProj::Zstd(encoder) => encoder.poll_read_vectored(cx, bufs),
        }
    }

    /// Reads compressed bytes (or raw input for `None`) into `buf` by
    /// delegating to whichever encoder variant is active.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.project() {
            SCProj::None(passthrough) => passthrough.poll_read(cx, buf),
            SCProj::Brotli(encoder) => encoder.poll_read(cx, buf),
            SCProj::Deflate(encoder) => encoder.poll_read(cx, buf),
            SCProj::Gzip(encoder) => encoder.poll_read(cx, buf),
            SCProj::Xz(encoder) => encoder.poll_read(cx, buf),
            SCProj::Zstd(encoder) => encoder.poll_read(cx, buf),
        }
    }
}
34 changes: 34 additions & 0 deletions client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,45 @@ pub struct ConfigData {
pub servers: HashMap<ServerName, ServerConfig>,
}

/// Compression settings for upload transport.
///
/// Selects the `Content-Encoding` applied to request bodies sent by
/// the client (see `CompressionConfig::http_value`). Defaults to
/// `Zstd` via the `Default` derive.
///
/// NOTE(review): serde's default representation uses the variant names
/// verbatim, so config files must spell e.g. `Zstd` with this exact
/// casing — confirm this matches the intended config format.
#[derive(Debug, Copy, Clone, Deserialize, Serialize, Default)]
pub enum CompressionConfig {
    /// No compression.
    None,
    /// Brotli compression.
    Brotli,
    /// Deflate compression.
    Deflate,
    /// Gzip compression.
    Gzip,
    /// Xz compression.
    Xz,
    /// Zstd compression (the default).
    #[default]
    Zstd,
}

impl CompressionConfig {
/// Returns the HTTP representation of this compression setting.
pub fn http_value(self) -> &'static str {
match self {
Self::None => "identity",
Self::Brotli => "br",
Self::Deflate => "deflate",
Self::Gzip => "gzip",
Self::Xz => "xz",
Self::Zstd => "zstd",
}
}
}

/// Configuration of a server.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServerConfig {
    /// Base URL of the server's API endpoint.
    pub endpoint: String,
    /// Authentication token, if configured.
    // NOTE(review): how the token is attached to requests is not
    // visible here — verify against the API client.
    pub token: Option<String>,
    /// Transport compression for uploads; falls back to
    /// `CompressionConfig`'s `Default` when absent from the config file.
    #[serde(default)]
    pub compression: CompressionConfig,
}

/// Wrapper that automatically saves the config once dropped.
Expand Down
1 change: 1 addition & 0 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod api;
mod cache;
mod cli;
mod command;
mod compression;
mod config;
mod nix_config;
mod nix_netrc;
Expand Down
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ clap = { version = "4.3", features = ["derive"] }
derivative = "2.2.0"
digest = "0.10.7"
displaydoc = "0.2.4"
enum-as-inner = "0.5.2"
enum-as-inner = "0.5.1"
fastcdc = "3.0.3"
futures = "0.3.28"
hex = "0.4.3"
humantime = "2.1.0"
humantime-serde = "1.1.1"
itoa = "=1.0.5"
maybe-owned = "0.3.4"
pin-project = "1.0.12"
rand = "0.8.5"
regex = "1.8.3"
ryu = "1.0.13"
Expand All @@ -68,6 +69,8 @@ features = [
"xz",
"zstd",
"brotli",
"deflate",
"gzip"
]

[dependencies.sea-orm]
Expand Down
10 changes: 9 additions & 1 deletion server/src/api/v1/upload_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tracing::instrument;
use uuid::Uuid;

use crate::config::CompressionType;
use crate::decompression::StreamingDecompressor;
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::narinfo::Compression;
use crate::{RequestState, State};
Expand Down Expand Up @@ -122,10 +123,17 @@ pub(crate) async fn upload_path(
headers: HeaderMap,
stream: BodyStream,
) -> ServerResult<Json<UploadPathResult>> {
let mut stream = StreamReader::new(
let compression_format = headers
.get("Content-Encoding")
.and_then(|e| e.to_str().ok())
.unwrap_or("");

let stream = StreamReader::new(
stream.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);

let mut stream = StreamingDecompressor::new_unbuffered(stream, compression_format)?;

let upload_info: UploadPathNarInfo = {
if let Some(preamble_size_bytes) = headers.get(ATTIC_NAR_INFO_PREAMBLE_SIZE) {
// Read from the beginning of the PUT body
Expand Down
Loading