Skip to content

Commit

Permalink
Add transport compression for UploadPath on server
Browse files Browse the repository at this point in the history
  • Loading branch information
DarkKirb committed Aug 13, 2023
1 parent 4902d57 commit 45d904e
Show file tree
Hide file tree
Showing 11 changed files with 831 additions and 440 deletions.
973 changes: 543 additions & 430 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ attic = { path = "../attic" }

anyhow = "1.0.71"
async-channel = "1.8.0"
async-stream = "0.3.5"
bytes = "1.4.0"
clap = { version = "4.3", features = ["derive"] }
clap_complete = "4.3.0"
const_format = "0.2.30"
dialoguer = "0.10.4"
displaydoc = "0.2.4"
enum-as-inner = "0.5.2"
enum-as-inner = "0.5.1"
futures = "0.3.28"
humantime = "2.1.0"
indicatif = "0.17.3"
lazy_static = "1.4.0"
notify = { version = "6.0.0", default-features = false, features = ["macos_kqueue"] }
pin-project = "1.0.12"
regex = "1.8.3"
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
serde = { version = "1.0.163", features = ["derive"] }
Expand All @@ -45,3 +47,14 @@ features = [
"rt-multi-thread",
"sync",
]

[dependencies.async-compression]
version = "0.3.15"
features = [
"futures-io",
"xz",
"zstd",
"brotli",
"deflate",
"gzip"
]
56 changes: 50 additions & 6 deletions client/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ use displaydoc::Display;
use futures::{
future,
stream::{self, StreamExt, TryStream, TryStreamExt},
AsyncReadExt,
};
use reqwest::{
header::{HeaderMap, HeaderValue, AUTHORIZATION, USER_AGENT},
header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, USER_AGENT},
Body, Client as HttpClient, Response, StatusCode, Url,
};
use serde::Deserialize;

use crate::config::ServerConfig;
use crate::version::ATTIC_DISTRIBUTOR;
use crate::{
compression::StreamingCompressor,
config::{CompressionConfig, ServerConfig},
};
use attic::api::v1::cache_config::{CacheConfig, CreateCacheRequest};
use attic::api::v1::get_missing_paths::{GetMissingPathsRequest, GetMissingPathsResponse};
use attic::api::v1::upload_path::{
Expand All @@ -40,6 +44,9 @@ pub struct ApiClient {

/// An initialized HTTP client.
client: HttpClient,

/// The compression algorithm to use.
compression: CompressionConfig,
}

/// An API error.
Expand Down Expand Up @@ -67,6 +74,7 @@ impl ApiClient {
Ok(Self {
endpoint: Url::parse(&config.endpoint)?,
client,
compression: config.compression,
})
}

Expand Down Expand Up @@ -176,7 +184,7 @@ impl ApiClient {
force_preamble: bool,
) -> Result<Option<UploadPathResult>>
where
S: TryStream<Ok = Bytes> + Send + Sync + 'static,
S: TryStream<Ok = Bytes> + Send + Sync + std::marker::Unpin + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>> + Send + Sync,
{
let endpoint = self.endpoint.join("_api/v1/upload-path")?;
Expand All @@ -185,21 +193,57 @@ impl ApiClient {
let mut req = self
.client
.put(endpoint)
.header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?);
.header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?)
.header(
CONTENT_ENCODING,
HeaderValue::from_static(self.compression.http_value()),
);

if force_preamble || upload_info_json.len() >= NAR_INFO_PREAMBLE_THRESHOLD {
let preamble = Bytes::from(upload_info_json);
let preamble_len = preamble.len();
let preamble_stream = stream::once(future::ok(preamble));

let chained = preamble_stream.chain(stream.into_stream());
let chained = chained.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let mut compressed =
StreamingCompressor::new_unbuffered(chained.into_async_read(), self.compression);
let final_stream = async_stream::stream! {
loop {
let mut buf = vec![0; 4096];
match compressed.read(&mut buf).await {
Ok(0) => {break;}
Ok(n) => {
buf.truncate(n);
yield Ok(buf);
}
Err(e) => {yield Err(e);}
}
}
};
req = req
.header(ATTIC_NAR_INFO_PREAMBLE_SIZE, preamble_len)
.body(Body::wrap_stream(chained));
.body(Body::wrap_stream(final_stream));
} else {
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let mut compressed =
StreamingCompressor::new_unbuffered(stream.into_async_read(), self.compression);
let final_stream = async_stream::stream! {
loop {
let mut buf = vec![0; 4096];
match compressed.read(&mut buf).await {
Ok(0) => {break;}
Ok(n) => {
buf.truncate(n);
yield Ok(buf);
}
Err(e) => {yield Err(e);}
}
}
};
req = req
.header(ATTIC_NAR_INFO, HeaderValue::from_str(&upload_info_json)?)
.body(Body::wrap_stream(stream));
.body(Body::wrap_stream(final_stream));
}

let res = req.send().await?;
Expand Down
3 changes: 2 additions & 1 deletion client/src/command/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::Parser;

use crate::cache::ServerName;
use crate::cli::Opts;
use crate::config::{Config, ServerConfig};
use crate::config::{CompressionConfig, Config, ServerConfig};

/// Log into an Attic server.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -43,6 +43,7 @@ pub async fn run(opts: Opts) -> Result<()> {
ServerConfig {
endpoint: sub.endpoint.to_owned(),
token: sub.token.to_owned(),
compression: CompressionConfig::Zstd,
},
);
}
Expand Down
88 changes: 88 additions & 0 deletions client/src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! Module for implementing streaming decompression across multiple
//! algorithms

use async_compression::futures::bufread::{
BrotliEncoder, DeflateEncoder, GzipEncoder, XzEncoder, ZstdEncoder,
};
use futures::io::{AsyncBufRead, AsyncRead, BufReader};
use pin_project::pin_project;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

use crate::config::CompressionConfig;

/// A streaming multi-codec decompressor
#[pin_project(project = SCProj)]
pub enum StreamingCompressor<S: AsyncBufRead> {
/// None decompression
None(#[pin] S),
/// Brotli decompression
Brotli(#[pin] BrotliEncoder<S>),
/// Deflate decompression
Deflate(#[pin] DeflateEncoder<S>),
/// Gzip decompression
Gzip(#[pin] GzipEncoder<S>),
/// XZ decompression
Xz(#[pin] XzEncoder<S>),
/// Zstd decompression
Zstd(#[pin] ZstdEncoder<S>),
}

impl<S: AsyncBufRead> StreamingCompressor<S> {
/// Creates a new streaming decompressor from a buffered stream and compression type.
pub fn new(inner: S, kind: CompressionConfig) -> Self {
match kind {
CompressionConfig::None => Self::None(inner),
CompressionConfig::Brotli => Self::Brotli(BrotliEncoder::new(inner)),
CompressionConfig::Deflate => Self::Deflate(DeflateEncoder::new(inner)),
CompressionConfig::Gzip => Self::Gzip(GzipEncoder::new(inner)),
CompressionConfig::Xz => Self::Xz(XzEncoder::new(inner)),
CompressionConfig::Zstd => Self::Zstd(ZstdEncoder::new(inner)),
}
}
}

impl<U: AsyncRead> StreamingCompressor<BufReader<U>> {
/// Creates a new streaming decompressor from an unbuffered stream and compression type.
///
/// # Errors
/// This function will return an error if the compression type is invalid
pub fn new_unbuffered(inner: U, kind: CompressionConfig) -> Self {
Self::new(BufReader::new(inner), kind)
}
}

impl<S: AsyncBufRead> AsyncRead for StreamingCompressor<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.project() {
SCProj::None(i) => i.poll_read(cx, buf),
SCProj::Brotli(i) => i.poll_read(cx, buf),
SCProj::Deflate(i) => i.poll_read(cx, buf),
SCProj::Gzip(i) => i.poll_read(cx, buf),
SCProj::Xz(i) => i.poll_read(cx, buf),
SCProj::Zstd(i) => i.poll_read(cx, buf),
}
}

fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [io::IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
match self.project() {
SCProj::None(i) => i.poll_read_vectored(cx, bufs),
SCProj::Brotli(i) => i.poll_read_vectored(cx, bufs),
SCProj::Deflate(i) => i.poll_read_vectored(cx, bufs),
SCProj::Gzip(i) => i.poll_read_vectored(cx, bufs),
SCProj::Xz(i) => i.poll_read_vectored(cx, bufs),
SCProj::Zstd(i) => i.poll_read_vectored(cx, bufs),
}
}
}
34 changes: 34 additions & 0 deletions client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,45 @@ pub struct ConfigData {
pub servers: HashMap<ServerName, ServerConfig>,
}

/// Compression Settings
#[derive(Debug, Copy, Clone, Deserialize, Serialize, Default)]
pub enum CompressionConfig {
/// No compression.
None,
/// Brotli compression.
Brotli,
/// Deflate compression.
Deflate,
/// Gzip compression.
Gzip,
/// Xz compression.
Xz,
/// Zstd compression.
#[default]
Zstd,
}

impl CompressionConfig {
/// Returns the HTTP representation of this compression setting.
pub fn http_value(self) -> &'static str {
match self {
Self::None => "identity",
Self::Brotli => "br",
Self::Deflate => "deflate",
Self::Gzip => "gzip",
Self::Xz => "xz",
Self::Zstd => "zstd",
}
}
}

/// Configuration of a server.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServerConfig {
pub endpoint: String,
pub token: Option<String>,
#[serde(default)]
pub compression: CompressionConfig,
}

/// Wrapper that automatically saves the config once dropped.
Expand Down
1 change: 1 addition & 0 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod api;
mod cache;
mod cli;
mod command;
mod compression;
mod config;
mod nix_config;
mod nix_netrc;
Expand Down
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ clap = { version = "4.3", features = ["derive"] }
derivative = "2.2.0"
digest = "0.10.7"
displaydoc = "0.2.4"
enum-as-inner = "0.5.2"
enum-as-inner = "0.5.1"
fastcdc = "3.0.3"
futures = "0.3.28"
hex = "0.4.3"
humantime = "2.1.0"
humantime-serde = "1.1.1"
itoa = "=1.0.5"
maybe-owned = "0.3.4"
pin-project = "1.0.12"
rand = "0.8.5"
regex = "1.8.3"
ryu = "1.0.13"
Expand All @@ -68,6 +69,8 @@ features = [
"xz",
"zstd",
"brotli",
"deflate",
"gzip"
]

[dependencies.sea-orm]
Expand Down
10 changes: 9 additions & 1 deletion server/src/api/v1/upload_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tracing::instrument;
use uuid::Uuid;

use crate::config::CompressionType;
use crate::decompression::StreamingDecompressor;
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::narinfo::Compression;
use crate::{RequestState, State};
Expand Down Expand Up @@ -122,10 +123,17 @@ pub(crate) async fn upload_path(
headers: HeaderMap,
stream: BodyStream,
) -> ServerResult<Json<UploadPathResult>> {
let mut stream = StreamReader::new(
let compression_format = headers
.get("Content-Encoding")
.and_then(|e| e.to_str().ok())
.unwrap_or("");

let stream = StreamReader::new(
stream.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
);

let mut stream = StreamingDecompressor::new_unbuffered(stream, compression_format)?;

let upload_info: UploadPathNarInfo = {
if let Some(preamble_size_bytes) = headers.get(ATTIC_NAR_INFO_PREAMBLE_SIZE) {
// Read from the beginning of the PUT body
Expand Down
Loading

0 comments on commit 45d904e

Please sign in to comment.