From 7185a03b38715cc9615bca5436fc3abae6fcd4cc Mon Sep 17 00:00:00 2001 From: LightQuantum <self@lightquantum.me> Date: Fri, 1 Sep 2023 02:57:45 +0800 Subject: [PATCH] perf(fetcher): generate whole sum payload before send --- rsync-fetcher/src/rsync/generator.rs | 61 ++++++++++++++++++---------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/rsync-fetcher/src/rsync/generator.rs b/rsync-fetcher/src/rsync/generator.rs index 20ed647..d903a42 100644 --- a/rsync-fetcher/src/rsync/generator.rs +++ b/rsync-fetcher/src/rsync/generator.rs @@ -1,5 +1,7 @@ use std::cmp::min; use std::ffi::OsStr; +use std::io; +use std::io::Read; use std::ops::{Deref, DerefMut}; use std::os::unix::ffi::OsStrExt; use std::os::unix::fs::MetadataExt; @@ -9,7 +11,7 @@ use std::sync::Arc; use eyre::Result; use futures::{stream, StreamExt}; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio::sync::{mpsc, Semaphore}; use tracing::{debug, info}; @@ -128,31 +130,46 @@ impl Generator { Ok(()) } - async fn generate_and_send_sums(&mut self, mut file: File) -> Result<()> { + + #[allow(clippy::cast_sign_loss)] // block_len and checksum_count are always positive. + async fn generate_and_send_sums(&mut self, file: File) -> Result<()> { let file_len = file.metadata().await?.size(); let sum_head = SumHead::sum_sizes_sqroot(file_len); sum_head.write_to(&mut **self).await?; - // Sqrt of usize can't be negative. - #[allow(clippy::cast_sign_loss)] - let mut buf = vec![0u8; sum_head.block_len as usize]; - let mut remaining = file_len; - - for _ in 0..sum_head.checksum_count { - // Sqrt of usize must be in u32 range. - #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] - let n1 = min(sum_head.block_len as u64, remaining) as usize; - let buf_slice = &mut buf[..n1]; - file.read_exact(buf_slice).await?; - - let sum1 = checksum_1(buf_slice); - let sum2 = checksum_2(self.seed, buf_slice); - // The original implementation reads sum1 as i32, but casting it to i32 is a no-op anyway. - self.write_u32_le(sum1).await?; - self.write_all(&sum2).await?; - - remaining -= n1 as u64; - } + let mut file = file.into_std().await; + let seed = self.seed; + + let sum_bytes = tokio::task::spawn_blocking(move || { + // Sqrt of usize can't be negative. + let mut buf = vec![0u8; sum_head.block_len as usize]; + let mut remaining = file_len; + + let mut buf_sum = Vec::with_capacity(sum_head.checksum_count as usize * 20); + for _ in 0..sum_head.checksum_count { + // Sqrt of usize must be in u32 range. + #[allow(clippy::cast_possible_truncation)] + let n1 = min(sum_head.block_len as u64, remaining) as usize; + let buf_slice = &mut buf[..n1]; + file.read_exact(buf_slice)?; + + let sum1 = checksum_1(buf_slice); + let sum2 = checksum_2(seed, buf_slice); + // The original implementation reads sum1 as i32, but casting it to i32 is a no-op anyway. + buf_sum.extend_from_slice(&sum1.to_le_bytes()); + buf_sum.extend_from_slice(&sum2); + + remaining -= n1 as u64; + } + debug_assert!( + buf_sum.len() <= sum_head.checksum_count as usize * 20, + "pre-allocated buffer is too small" + ); + Ok::<_, io::Error>(buf_sum) + }) + .await??; + + self.write_all(&sum_bytes).await?; Ok(()) }