Skip to content

Commit

Permalink
Fix flow-control window
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
  • Loading branch information
passaro committed Sep 25, 2024
1 parent ce7183e commit b5b3846
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use bytes::BytesMut;
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::PutObjectParams;
use mountpoint_s3_client::types::{GetObjectRequest, PutObjectParams};
use mountpoint_s3_client::{ObjectClient, PutObjectRequest};
use sha2::{Digest, Sha256};
use tracing::Instrument;
Expand Down Expand Up @@ -87,6 +87,10 @@ where
return Err(DataCacheError::InvalidBlockOffset);
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
// TODO: review if/when we add a header to the block.
result.as_mut().increment_read_window(self.block_size as usize);
}
let buffer = buffer.freeze();
DataCacheResult::Ok(Some(buffer.into()))
Expand Down Expand Up @@ -140,17 +144,16 @@ mod tests {
use crate::checksums::ChecksummedBytes;
use crate::sync::Arc;

use test_case::test_case;

use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use mountpoint_s3_client::types::ETag;

#[test_case(1024, 512 * 1024; "block_size smaller than part_size")]
#[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")]
#[tokio::test]
async fn test_put_get() {
let data_1 = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let data_3 = ChecksummedBytes::new("Baz".into());

async fn test_put_get(part_size: usize, block_size: u64) {
let bucket = "test-bucket";
let part_size = 8 * 1024 * 1024;
let config = MockClientConfig {
bucket: bucket.to_string(),
part_size,
Expand All @@ -160,8 +163,12 @@ mod tests {
};
let client = Arc::new(MockClient::new(config));

let block_size = 512 * 1024;
let cache = ExpressDataCache::new(bucket, client, "unique source description", block_size);

let data_1 = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let data_3 = ChecksummedBytes::new("a".repeat(block_size as usize).into());

let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_2 = ObjectId::new(
"longkey_".repeat(128), // 1024 bytes, max length for S3 keys
Expand Down

0 comments on commit b5b3846

Please sign in to comment.