Add transport compression for UploadPath on server
DarkKirb committed Apr 7, 2023
1 parent efa15b9 commit a6bd8ac
Showing 5 changed files with 100 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions server/Cargo.toml
@@ -44,6 +44,7 @@ humantime = "2.1.0"
humantime-serde = "1.1.1"
itoa = "=1.0.5"
maybe-owned = "0.3.4"
pin-project = "1.0.12"
rand = "0.8.5"
regex = "1.7.1"
ryu = "1.0.13"
@@ -68,6 +69,8 @@ features = [
"xz",
"zstd",
"brotli",
"deflate",
"gzip"
]

[dependencies.sea-orm]
10 changes: 9 additions & 1 deletion server/src/api/v1/upload_path.rs
@@ -29,6 +29,7 @@ use tracing::instrument;
use uuid::Uuid;

use crate::config::CompressionType;
use crate::decompression::StreamingDecompressor;
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::narinfo::Compression;
use crate::{RequestState, State};
@@ -120,10 +121,17 @@ pub(crate) async fn upload_path(
     headers: HeaderMap,
     stream: BodyStream,
 ) -> ServerResult<Json<UploadPathResult>> {
-    let mut stream = StreamReader::new(
+    let compression_format = headers
+        .get("Content-Encoding")
+        .and_then(|e| e.to_str().ok())
+        .unwrap_or("");
+
+    let stream = StreamReader::new(
         stream.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
     );
 
+    let mut stream = StreamingDecompressor::new_unbuffered(stream, compression_format)?;
+
     let upload_info: UploadPathNarInfo = {
         if let Some(preamble_size_bytes) = headers.get(ATTIC_NAR_INFO_PREAMBLE_SIZE) {
             // Read from the beginning of the PUT body
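For context, here is a hedged client-side sketch (not part of this commit) of what the handler change enables: the PUT body of an upload can now be compressed in transit, with the codec declared via the Content-Encoding header. The reqwest and zstd crates, the endpoint URL, the auth scheme, and the omitted attic-specific headers (e.g. the NAR-info preamble) are assumptions for illustration only.

use reqwest::Client;

// Hypothetical client-side helper; crate choices and endpoint are assumptions,
// and the attic-specific headers (NAR-info preamble, etc.) are omitted.
async fn upload_compressed(nar: Vec<u8>, url: &str, token: &str) -> reqwest::Result<()> {
    // Compress the body up front; the server now transparently undoes this.
    let compressed = zstd::encode_all(nar.as_slice(), 3).expect("zstd compression failed");

    Client::new()
        .put(url)
        .bearer_auth(token)
        // The new handler code reads this header and wraps the body stream
        // in StreamingDecompressor before parsing the upload.
        .header("Content-Encoding", "zstd")
        .body(compressed)
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}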
85 changes: 85 additions & 0 deletions server/src/decompression.rs
@@ -0,0 +1,85 @@
//! Module for implementing streaming decompression across multiple
//! algorithms

use std::{
    io,
    pin::Pin,
    task::{Context, Poll},
};

use anyhow::anyhow;
use async_compression::tokio::bufread::{
    BrotliDecoder, DeflateDecoder, GzipDecoder, XzDecoder, ZstdDecoder,
};
use pin_project::pin_project;
use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};

use crate::error::{ErrorKind, ServerResult};

/// A streaming multi-codec decompressor
#[pin_project(project = SDProj)]
pub enum StreamingDecompressor<S: AsyncBufRead> {
    /// None decompression
    None(#[pin] S),
    /// Brotli decompression
    Brotli(#[pin] BrotliDecoder<S>),
    /// Deflate decompression
    Deflate(#[pin] DeflateDecoder<S>),
    /// Gzip decompression
    Gzip(#[pin] GzipDecoder<S>),
    /// XZ decompression
    Xz(#[pin] XzDecoder<S>),
    /// Zstd decompression
    Zstd(#[pin] ZstdDecoder<S>),
}

impl<S: AsyncBufRead> StreamingDecompressor<S> {
    /// Creates a new streaming decompressor from a buffered stream and compression type.
    ///
    /// An empty string or "identity" corresponds to no decompression.
    ///
    /// # Errors
    /// This function will return an error if the compression type is invalid
    pub fn new(inner: S, kind: &str) -> ServerResult<Self> {
        match kind {
            "" | "identity" => Ok(Self::None(inner)),
            "br" => Ok(Self::Brotli(BrotliDecoder::new(inner))),
            "deflate" => Ok(Self::Deflate(DeflateDecoder::new(inner))),
            "gzip" => Ok(Self::Gzip(GzipDecoder::new(inner))),
            "xz" => Ok(Self::Xz(XzDecoder::new(inner))),
            "zstd" => Ok(Self::Zstd(ZstdDecoder::new(inner))),
            _ => Err(ErrorKind::RequestError(anyhow!(
                "{} is unsupported transport compression",
                kind
            ))
            .into()),
        }
    }
}

impl<U: AsyncRead> StreamingDecompressor<BufReader<U>> {
    /// Creates a new streaming decompressor from an unbuffered stream and compression type.
    ///
    /// # Errors
    /// This function will return an error if the compression type is invalid
    pub fn new_unbuffered(inner: U, kind: &str) -> ServerResult<Self> {
        Self::new(BufReader::new(inner), kind)
    }
}

impl<S: AsyncBufRead> AsyncRead for StreamingDecompressor<S> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        match self.project() {
            SDProj::None(i) => i.poll_read(cx, buf),
            SDProj::Brotli(i) => i.poll_read(cx, buf),
            SDProj::Deflate(i) => i.poll_read(cx, buf),
            SDProj::Gzip(i) => i.poll_read(cx, buf),
            SDProj::Xz(i) => i.poll_read(cx, buf),
            SDProj::Zstd(i) => i.poll_read(cx, buf),
        }
    }
}
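A usage sketch (not part of the commit): a test-style round trip that could sit alongside this module, assuming tokio's test macro and async-compression's zstd encoder are available under the existing feature flags.

#[cfg(test)]
mod tests {
    use super::*;
    use async_compression::tokio::bufread::ZstdEncoder;
    use tokio::io::AsyncReadExt;

    #[tokio::test]
    async fn zstd_round_trip() {
        let plain = b"hello nar".as_slice();

        // Compress in memory with the matching encoder.
        let mut encoder = ZstdEncoder::new(plain);
        let mut compressed = Vec::new();
        encoder
            .read_to_end(&mut compressed)
            .await
            .expect("compression failed");

        // Decompress through the wrapper, selecting the codec by its
        // Content-Encoding token.
        let Ok(mut decoder) =
            StreamingDecompressor::new_unbuffered(compressed.as_slice(), "zstd")
        else {
            panic!("zstd should be a recognized codec");
        };
        let mut out = Vec::new();
        decoder
            .read_to_end(&mut out)
            .await
            .expect("decompression failed");
        assert_eq!(out, plain);
    }
}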
1 change: 1 addition & 0 deletions server/src/lib.rs
@@ -18,6 +18,7 @@ mod api;
mod chunking;
pub mod config;
pub mod database;
mod decompression;
pub mod error;
pub mod gc;
mod middleware;
