Skip to content

Commit

Permalink
perf(fetcher/receiver): parallelize data transfer and io
Browse files Browse the repository at this point in the history
  • Loading branch information
PhotonQuantum committed Aug 31, 2023
1 parent 7185a03 commit 7e1677a
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 44 deletions.
144 changes: 102 additions & 42 deletions rsync-fetcher/src/rsync/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use std::cmp::Ordering;
use std::io::SeekFrom;
use std::fs::File;
use std::io;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::sync::Arc;

use blake2::Blake2b;
use digest::consts::U20;
use digest::Digest;
use eyre::{ensure, Result};
use md4::Md4;
use tempfile::{tempfile_in, TempDir};
use tempfile::{tempfile_in, TempDir, TempPath};
use tokio::fs;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::tcp::OwnedReadHalf;
use tokio::sync::Semaphore;
use tokio::sync::{mpsc, Semaphore};
use tracing::{debug, info, instrument};

use rsync_core::utils::ToHex;
Expand Down Expand Up @@ -80,12 +80,43 @@ enum FileToken {
Done,
}

enum IOChunk {
Data(Vec<u8>),
Copied { offset: u64, data_len: i32 },
}

#[derive(Debug)]
struct RecvResult {
target_file: File,
blake2b_hash: [u8; 20],
}

struct BasisFile {
_path: TempPath,
file: File,
pb: ProgressDisplay,
}

impl Deref for BasisFile {
type Target = File;

fn deref(&self) -> &Self::Target {
&self.file
}
}

impl DerefMut for BasisFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}

impl Drop for BasisFile {
fn drop(&mut self) {
self.pb.dec_basis(1);
}
}

impl Receiver {
pub async fn recv_task(mut self) -> Result<Self> {
self.recv_task_mut().await?;
Expand Down Expand Up @@ -124,21 +155,14 @@ impl Receiver {

// Get basis file if exists. It should be created by generator if delta transfer is
// enabled and an old version of the file exists.
let mut basis_file = self.try_get_basis_file(&entry.name).await?;
let basis_file = self.try_get_basis_file(&entry.name)?;

// Receive file data.
let RecvResult {
target_file,
blake2b_hash,
} = self.recv_data(basis_file.as_mut().map(|(_, f)| f)).await?;

// Delete basis file once it's not needed.
// TODO use NamedTempFile or some kind of RAII handle instead?
if let Some((p, f)) = basis_file {
drop(f);
fs::remove_file(p).await?;
self.pb.dec_basis(1);
}
} = self.recv_data(basis_file).await?;

// Release permit.
self.permits.add_permits(1);
self.pb.dec_pending(1);
Expand All @@ -149,58 +173,101 @@ impl Receiver {
.send_async(UploadTask {
idx,
blake2b_hash,
file: target_file,
file: fs::File::from_std(target_file),
})
.await?;
self.pb.inc_uploading(1);

Ok(())
}

async fn try_get_basis_file(&self, path: &[u8]) -> Result<Option<(PathBuf, File)>> {
fn try_get_basis_file(&self, path: &[u8]) -> Result<Option<BasisFile>> {
let basis_path = self
.basis_dir
.path()
.join(format!("{:x}", hash(path).as_hex()));
Ok(File::open(&basis_path)
.await
.map(|f| Some((basis_path, f)))
.map(|f| {
Some(BasisFile {
_path: TempPath::from_path(basis_path),
file: f,
pb: self.pb.clone(),
})
})
.or_else(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
if e.kind() == io::ErrorKind::NotFound {
Ok(None)
} else {
Err(e)
}
})?)
}

async fn recv_data(&mut self, mut local_basis: Option<&mut File>) -> Result<RecvResult> {
async fn recv_data(&mut self, mut local_basis: Option<BasisFile>) -> Result<RecvResult> {
let SumHead {
checksum_count,
block_len,
checksum_len: _,
remainder_len,
} = SumHead::read_from(&mut **self).await?;

let mut target_file = File::from_std(tempfile_in(&self.basis_dir)?);
let mut target_file = tempfile_in(&self.basis_dir)?;

// Hasher for final file consistency check.
let mut md4_hasher = Md4::default();
md4_hasher.update(self.seed.to_le_bytes());
// Hasher for content addressing. Hash function is blake2b-160.
let mut blake2b_hasher = Blake2b::<U20>::default();

let (tx, mut rx) = mpsc::channel(1024);

#[allow(clippy::read_zero_byte_vec)] // false positive
let io_task = tokio::task::spawn_blocking(move || {
let mut buf = vec![];
while let Some(token) = rx.blocking_recv() {
match token {
IOChunk::Data(data) => {
md4_hasher.update(&data);
blake2b_hasher.update(&data);
target_file.write_all(&data)?;
}
IOChunk::Copied { offset, data_len } => {
#[allow(clippy::cast_sign_loss)] // data_len is always positive.
if buf.len() != data_len as usize {
buf.resize(data_len as usize, 0);
}

let local_basis = local_basis.as_mut().expect("incremental");
local_basis.seek(SeekFrom::Start(offset))?;
local_basis.read_exact(&mut buf)?;

md4_hasher.update(&buf);
blake2b_hasher.update(&buf);
target_file.write_all(&buf)?;
}
}
}

let md4: [u8; 16] = md4_hasher.finalize().into();
let blake2b: [u8; 20] = blake2b_hasher.finalize().into();

target_file.seek(SeekFrom::Start(0))?;
Ok::<_, io::Error>((md4, blake2b, target_file))
});

let (mut transferred, mut copied) = (0u64, 0u64);
loop {
let token = self.recv_token().await?;
match token {
FileToken::Data(data) => {
// Protect against malicious input.
// The maximum memory usage is 128 * 1024 * 1024 = 128 MiB.
assert!(data.len() <= 128 * 1024, "data chunk too large");

transferred += data.len() as u64;
self.pb.inc(data.len() as u64);

md4_hasher.update(&data);
blake2b_hasher.update(&data);
target_file.write_all(&data).await?;
tx.send(IOChunk::Data(data)).await?;
}
// We interpret sum head values as unsigned ints anyway.
#[allow(clippy::cast_sign_loss)]
Expand All @@ -215,24 +282,19 @@ impl Receiver {
copied += data_len as u64;
self.pb.inc(data_len as u64);

let mut buf = vec![0; data_len as usize];
let local_basis = local_basis.as_deref_mut().expect("incremental");
local_basis.seek(SeekFrom::Start(offset)).await?;
local_basis.read_exact(&mut buf).await?;

md4_hasher.update(&buf);
blake2b_hasher.update(&buf);
target_file.write_all(&buf).await?;
tx.send(IOChunk::Copied { offset, data_len }).await?;
}
FileToken::Done => break,
}
}

let local_checksum = md4_hasher.finalize(); // we don't need constant time comparison here
let mut remote_checksum = vec![0; local_checksum.len()];
drop(tx);
let (md4, blake2b, mut target_file) = io_task.await??;

let mut remote_md4 = [0; 16];

self.read_exact(&mut remote_checksum).await?;
ensure!(*local_checksum == remote_checksum, "checksum mismatch");
self.read_exact(&mut remote_md4).await?;
ensure!(md4 == remote_md4, "checksum mismatch");

// A debug log anyway.
#[allow(clippy::cast_precision_loss)]
Expand All @@ -241,12 +303,10 @@ impl Receiver {

// No need to set perms because we'll upload it to s3.

let hash: [u8; 20] = blake2b_hasher.finalize().into();

target_file.seek(SeekFrom::Start(0)).await?;
target_file.seek(SeekFrom::Start(0))?;
Ok(RecvResult {
target_file,
blake2b_hash: hash,
blake2b_hash: blake2b,
})
}

Expand Down
3 changes: 1 addition & 2 deletions rsync-fetcher/src/rsync/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Uploader {
let UploadTask {
idx,
blake2b_hash,
file,
mut file,
} = task;

// Avoid repeatedly uploading the same file to address
Expand All @@ -89,7 +89,6 @@ impl Uploader {
// REMARK: If a file is soft/hard linked, users may see a content-disposition with a
// different filename instead of the one they expect.
// TODO since we no longer need static file path, we can use presign on the gateway.
let mut file = file.try_clone().await.expect("unable to dup file");
file.seek(SeekFrom::Start(0))
.await
.expect("unable to seek file");
Expand Down

0 comments on commit 7e1677a

Please sign in to comment.