From 3c9fb53b85ef53946a6c5cda48273a78a60a8a09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charlotte=20=F0=9F=A6=9D=20Delenk?= Date: Fri, 7 Apr 2023 14:44:13 +0100 Subject: [PATCH] Add transport compression to attic push --- Cargo.lock | 109 ++++++++++++++++++++---------------- client/Cargo.toml | 13 +++++ client/src/api/mod.rs | 56 ++++++++++++++++-- client/src/command/login.rs | 3 +- client/src/compression.rs | 88 +++++++++++++++++++++++++++++ client/src/config.rs | 34 +++++++++++ client/src/main.rs | 1 + 7 files changed, 250 insertions(+), 54 deletions(-) create mode 100644 client/src/compression.rs diff --git a/Cargo.lock b/Cargo.lock index 0f5e2df..ebc53a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ dependencies = [ "brotli", "flate2", "futures-core", + "futures-io", "memchr", "pin-project-lite", "tokio", @@ -128,9 +129,9 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad445822218ce64be7a341abfb0b1ea43b5c23aa83902542a4542e78309d8e5e" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ "async-stream-impl", "futures-core", @@ -139,13 +140,13 @@ dependencies = [ [[package]] name = "async-stream-impl" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4655ae1a7b0cdf149156f780c5bf3f1352bc53cbd9e0a361a7ef7b22947e965" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.13", ] [[package]] @@ -156,7 +157,7 @@ checksum = "095183a3539c7c7649b2beb87c2d3f0591f3a7fed07761cc546d244e27e0238c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -206,6 +207,8 @@ version = "0.1.0" dependencies = [ "anyhow", "async-channel", + "async-compression", + "async-stream", "attic", "bytes", "clap 4.1.8", @@ -219,6 +222,7 @@ 
dependencies = [ "indicatif", "lazy_static", "notify", + "pin-project", "regex", "reqwest", "serde", @@ -750,7 +754,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -763,7 +767,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -818,7 +822,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn", + "syn 1.0.109", "which", ] @@ -863,7 +867,7 @@ dependencies = [ "borsh-schema-derive-internal", "proc-macro-crate", "proc-macro2", - "syn", + "syn 1.0.109", ] [[package]] @@ -874,7 +878,7 @@ checksum = "186b734fa1c9f6743e90c95d7233c9faab6360d1a96d4ffa19d9cfd1e9350f8a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -885,7 +889,7 @@ checksum = "99b7ff1008316626f485991b960ade129253d4034014616b94f309a15366cc49" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -934,7 +938,7 @@ checksum = "e31225543cb46f81a7e224762764f4a6a0f097b1db0b175f69e8065efaa42de5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1061,7 +1065,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1074,7 +1078,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1323,7 +1327,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" dependencies = [ "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1350,7 +1354,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.109", ] [[package]] @@ -1367,7 +1371,7 @@ checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1391,7 +1395,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 1.0.109", ] 
[[package]] @@ -1402,7 +1406,7 @@ checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685" dependencies = [ "darling_core", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1424,7 +1428,7 @@ checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1485,7 +1489,7 @@ checksum = "3bf95dc3f046b9da4f2d51833c0d3547d8564ef6910f5c1ed130306a75b92886" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1568,7 +1572,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1607,7 +1611,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1751,7 +1755,7 @@ checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2607,7 +2611,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2724,7 +2728,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2818,7 +2822,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "version_check", ] @@ -2835,9 +2839,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.51" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" dependencies = [ "unicode-ident", ] @@ -2862,7 +2866,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2891,14 +2895,14 @@ checksum = 
"16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] name = "quote" -version = "1.0.23" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ "proc-macro2", ] @@ -3078,7 +3082,7 @@ checksum = "ff26ed6c7c4dfc2aa9480b86a60e3c7233543a270a680e10758a507c5a4ce476" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3294,7 +3298,7 @@ dependencies = [ "heck 0.3.3", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3350,7 +3354,7 @@ dependencies = [ "heck 0.3.3", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "thiserror", ] @@ -3374,7 +3378,7 @@ dependencies = [ "heck 0.3.3", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3396,7 +3400,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn", + "syn 1.0.109", ] [[package]] @@ -3465,7 +3469,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3534,7 +3538,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3786,7 +3790,7 @@ dependencies = [ "sha2", "sqlx-core", "sqlx-rt", - "syn", + "syn 1.0.109", "url", ] @@ -3834,6 +3838,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c9da457c5285ac1f936ebd076af6dac17a61cfe7826f2076b4d015cf47bc8ec" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -3885,7 +3900,7 @@ checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = 
[ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3989,7 +4004,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4189,7 +4204,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4438,7 +4453,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "wasm-bindgen-shared", ] @@ -4472,7 +4487,7 @@ checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/client/Cargo.toml b/client/Cargo.toml index 1026c96..d594fd9 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -13,6 +13,7 @@ attic = { path = "../attic" } anyhow = "1.0.69" async-channel = "1.8.0" +async-stream = "0.3.5" bytes = "1.4.0" clap = { version = "4.1", features = ["derive"] } clap_complete = "4.1.4" @@ -25,6 +26,7 @@ humantime = "2.1.0" indicatif = "0.17.3" lazy_static = "1.4.0" notify = { version = "5.1.0", default-features = false, features = ["macos_kqueue"] } +pin-project = "1.0.12" regex = "1.7.1" reqwest = { version = "0.11.14", default-features = false, features = ["json", "rustls-tls", "rustls-tls-native-roots", "stream"] } serde = { version = "1.0.152", features = ["derive"] } @@ -45,3 +47,14 @@ features = [ "rt-multi-thread", "sync", ] + +[dependencies.async-compression] +version = "0.3.15" +features = [ + "futures-io", + "xz", + "zstd", + "brotli", + "deflate", + "gzip" +] diff --git a/client/src/api/mod.rs b/client/src/api/mod.rs index 399627b..f7f2508 100644 --- a/client/src/api/mod.rs +++ b/client/src/api/mod.rs @@ -8,15 +8,19 @@ use displaydoc::Display; use futures::{ future, stream::{self, StreamExt, TryStream, TryStreamExt}, + AsyncReadExt, }; use reqwest::{ - 
header::{HeaderMap, HeaderValue, AUTHORIZATION, USER_AGENT}, + header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, USER_AGENT}, Body, Client as HttpClient, Response, StatusCode, Url, }; use serde::Deserialize; -use crate::config::ServerConfig; use crate::version::ATTIC_DISTRIBUTOR; +use crate::{ + compression::StreamingCompressor, + config::{CompressionConfig, ServerConfig}, +}; use attic::api::v1::cache_config::{CacheConfig, CreateCacheRequest}; use attic::api::v1::get_missing_paths::{GetMissingPathsRequest, GetMissingPathsResponse}; use attic::api::v1::upload_path::{ @@ -40,6 +44,9 @@ pub struct ApiClient { /// An initialized HTTP client. client: HttpClient, + + /// The compression algorithm to use. + compression: CompressionConfig, } /// An API error. @@ -67,6 +74,7 @@ impl ApiClient { Ok(Self { endpoint: Url::parse(&config.endpoint)?, client, + compression: config.compression, }) } @@ -176,7 +184,7 @@ impl ApiClient { force_preamble: bool, ) -> Result> where - S: TryStream + Send + Sync + 'static, + S: TryStream + Send + Sync + std::marker::Unpin + 'static, S::Error: Into> + Send + Sync, { let endpoint = self.endpoint.join("_api/v1/upload-path")?; @@ -185,7 +193,11 @@ impl ApiClient { let mut req = self .client .put(endpoint) - .header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?); + .header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?) 
+ .header( + CONTENT_ENCODING, + HeaderValue::from_static(self.compression.http_value()), + ); if force_preamble || upload_info_json.len() >= NAR_INFO_PREAMBLE_THRESHOLD { let preamble = Bytes::from(upload_info_json); @@ -193,13 +205,45 @@ impl ApiClient { let preamble_stream = stream::once(future::ok(preamble)); let chained = preamble_stream.chain(stream.into_stream()); + let chained = chained.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + let mut compressed = + StreamingCompressor::new_unbuffered(chained.into_async_read(), self.compression); + let final_stream = async_stream::stream! { + loop { + let mut buf = vec![0; 4096]; + match compressed.read(&mut buf).await { + Ok(0) => {break;} + Ok(n) => { + buf.truncate(n); + yield Ok(buf); + } + Err(e) => {yield Err(e);} + } + } + }; req = req .header(ATTIC_NAR_INFO_PREAMBLE_SIZE, preamble_len) - .body(Body::wrap_stream(chained)); + .body(Body::wrap_stream(final_stream)); } else { + let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); + let mut compressed = + StreamingCompressor::new_unbuffered(stream.into_async_read(), self.compression); + let final_stream = async_stream::stream! { + loop { + let mut buf = vec![0; 4096]; + match compressed.read(&mut buf).await { + Ok(0) => {break;} + Ok(n) => { + buf.truncate(n); + yield Ok(buf); + } + Err(e) => {yield Err(e);} + } + } + }; req = req .header(ATTIC_NAR_INFO, HeaderValue::from_str(&upload_info_json)?) - .body(Body::wrap_stream(stream)); + .body(Body::wrap_stream(final_stream)); } let res = req.send().await?; diff --git a/client/src/command/login.rs b/client/src/command/login.rs index 4cb9281..7205a61 100644 --- a/client/src/command/login.rs +++ b/client/src/command/login.rs @@ -3,7 +3,7 @@ use clap::Parser; use crate::cache::ServerName; use crate::cli::Opts; -use crate::config::{Config, ServerConfig}; +use crate::config::{CompressionConfig, Config, ServerConfig}; /// Log into an Attic server. 
#[derive(Debug, Parser)] @@ -43,6 +43,7 @@ pub async fn run(opts: Opts) -> Result<()> { ServerConfig { endpoint: sub.endpoint.to_owned(), token: sub.token.to_owned(), + compression: CompressionConfig::Zstd, }, ); } diff --git a/client/src/compression.rs b/client/src/compression.rs new file mode 100644 index 0000000..151fbb3 --- /dev/null +++ b/client/src/compression.rs @@ -0,0 +1,88 @@ +//! Module for implementing streaming compression across multiple +//! algorithms + +use async_compression::futures::bufread::{ + BrotliEncoder, DeflateEncoder, GzipEncoder, XzEncoder, ZstdEncoder, +}; +use futures::io::{AsyncBufRead, AsyncRead, BufReader}; +use pin_project::pin_project; +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::config::CompressionConfig; + +/// A streaming multi-codec compressor +#[pin_project(project = SCProj)] +pub enum StreamingCompressor { + /// No compression + None(#[pin] S), + /// Brotli compression + Brotli(#[pin] BrotliEncoder), + /// Deflate compression + Deflate(#[pin] DeflateEncoder), + /// Gzip compression + Gzip(#[pin] GzipEncoder), + /// XZ compression + Xz(#[pin] XzEncoder), + /// Zstd compression + Zstd(#[pin] ZstdEncoder), +} + +impl StreamingCompressor { + /// Creates a new streaming compressor from a buffered stream and compression type. + pub fn new(inner: S, kind: CompressionConfig) -> Self { + match kind { + CompressionConfig::None => Self::None(inner), + CompressionConfig::Brotli => Self::Brotli(BrotliEncoder::new(inner)), + CompressionConfig::Deflate => Self::Deflate(DeflateEncoder::new(inner)), + CompressionConfig::Gzip => Self::Gzip(GzipEncoder::new(inner)), + CompressionConfig::Xz => Self::Xz(XzEncoder::new(inner)), + CompressionConfig::Zstd => Self::Zstd(ZstdEncoder::new(inner)), + } + } +} + +impl StreamingCompressor> { + /// Creates a new streaming compressor from an unbuffered stream and compression type.
+ /// + /// The reader is wrapped in a `BufReader` before being passed on to `new`. + /// This constructor is infallible: it always returns `Self`. + pub fn new_unbuffered(inner: U, kind: CompressionConfig) -> Self { + Self::new(BufReader::new(inner), kind) + } +} + +impl AsyncRead for StreamingCompressor { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + match self.project() { + SCProj::None(i) => i.poll_read(cx, buf), + SCProj::Brotli(i) => i.poll_read(cx, buf), + SCProj::Deflate(i) => i.poll_read(cx, buf), + SCProj::Gzip(i) => i.poll_read(cx, buf), + SCProj::Xz(i) => i.poll_read(cx, buf), + SCProj::Zstd(i) => i.poll_read(cx, buf), + } + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [io::IoSliceMut<'_>], + ) -> Poll> { + match self.project() { + SCProj::None(i) => i.poll_read_vectored(cx, bufs), + SCProj::Brotli(i) => i.poll_read_vectored(cx, bufs), + SCProj::Deflate(i) => i.poll_read_vectored(cx, bufs), + SCProj::Gzip(i) => i.poll_read_vectored(cx, bufs), + SCProj::Xz(i) => i.poll_read_vectored(cx, bufs), + SCProj::Zstd(i) => i.poll_read_vectored(cx, bufs), + } + } +} diff --git a/client/src/config.rs b/client/src/config.rs index 9ca727f..d5077e0 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -48,11 +48,45 @@ pub struct ConfigData { pub servers: HashMap, } +/// Compression Settings +#[derive(Debug, Copy, Clone, Deserialize, Serialize, Default)] +pub enum CompressionConfig { + /// No compression. + None, + /// Brotli compression. + Brotli, + /// Deflate compression. + Deflate, + /// Gzip compression. + Gzip, + /// Xz compression. + Xz, + /// Zstd compression. + #[default] + Zstd, +} + +impl CompressionConfig { + /// Returns the HTTP representation of this compression setting.
+ pub fn http_value(self) -> &'static str { + match self { + Self::None => "identity", + Self::Brotli => "br", + Self::Deflate => "deflate", + Self::Gzip => "gzip", + Self::Xz => "xz", + Self::Zstd => "zstd", + } + } +} + /// Configuration of a server. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ServerConfig { pub endpoint: String, pub token: Option, + #[serde(default)] + pub compression: CompressionConfig, } /// Wrapper that automatically saves the config once dropped. diff --git a/client/src/main.rs b/client/src/main.rs index ca85460..a190579 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -17,6 +17,7 @@ mod api; mod cache; mod cli; mod command; +mod compression; mod config; mod nix_config; mod nix_netrc;