Skip to content

Commit

Permalink
perf(fetcher): generate whole sum payload before send
Browse files Browse the repository at this point in the history
  • Loading branch information
PhotonQuantum committed Aug 31, 2023
1 parent 5965579 commit 7185a03
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions rsync-fetcher/src/rsync/generator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::cmp::min;
use std::ffi::OsStr;
use std::io;
use std::io::Read;
use std::ops::{Deref, DerefMut};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::MetadataExt;
Expand All @@ -9,7 +11,7 @@ use std::sync::Arc;
use eyre::Result;
use futures::{stream, StreamExt};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::{mpsc, Semaphore};
use tracing::{debug, info};
Expand Down Expand Up @@ -128,31 +130,46 @@ impl Generator {

Ok(())
}
async fn generate_and_send_sums(&mut self, mut file: File) -> Result<()> {

#[allow(clippy::cast_sign_loss)] // block_len and checksum_count are always positive.
async fn generate_and_send_sums(&mut self, file: File) -> Result<()> {
let file_len = file.metadata().await?.size();
let sum_head = SumHead::sum_sizes_sqroot(file_len);
sum_head.write_to(&mut **self).await?;

// Sqrt of usize can't be negative.
#[allow(clippy::cast_sign_loss)]
let mut buf = vec![0u8; sum_head.block_len as usize];
let mut remaining = file_len;

for _ in 0..sum_head.checksum_count {
// Sqrt of usize must be in u32 range.
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
let n1 = min(sum_head.block_len as u64, remaining) as usize;
let buf_slice = &mut buf[..n1];
file.read_exact(buf_slice).await?;

let sum1 = checksum_1(buf_slice);
let sum2 = checksum_2(self.seed, buf_slice);
// The original implementation reads sum1 as i32, but casting it to i32 is a no-op anyway.
self.write_u32_le(sum1).await?;
self.write_all(&sum2).await?;

remaining -= n1 as u64;
}
let mut file = file.into_std().await;
let seed = self.seed;

let sum_bytes = tokio::task::spawn_blocking(move || {
// Sqrt of usize can't be negative.
let mut buf = vec![0u8; sum_head.block_len as usize];
let mut remaining = file_len;

let mut buf_sum = Vec::with_capacity(sum_head.checksum_count as usize * 20);
for _ in 0..sum_head.checksum_count {
// Sqrt of usize must be in u32 range.
#[allow(clippy::cast_possible_truncation)]
let n1 = min(sum_head.block_len as u64, remaining) as usize;
let buf_slice = &mut buf[..n1];
file.read_exact(buf_slice)?;

let sum1 = checksum_1(buf_slice);
let sum2 = checksum_2(seed, buf_slice);
// The original implementation reads sum1 as i32, but casting it to i32 is a no-op anyway.
buf_sum.extend_from_slice(&sum1.to_le_bytes());
buf_sum.extend_from_slice(&sum2);

remaining -= n1 as u64;
}
debug_assert!(
buf_sum.len() <= sum_head.checksum_count as usize * 20,
"pre-allocated buffer is too small"
);
Ok::<_, io::Error>(buf_sum)
})
.await??;

self.write_all(&sum_bytes).await?;

Ok(())
}
Expand Down

0 comments on commit 7185a03

Please sign in to comment.