
Commit

Fix merge conflicts
Signed-off-by: Simon Beal <simobeal@amazon.com>
muddyfish committed Nov 14, 2024
1 parent 7e441ca commit db3d9a2
Showing 1 changed file with 8 additions and 119 deletions.
mountpoint-s3/src/data_cache/express_data_cache.rs: 127 changes (8 additions & 119 deletions)
@@ -38,114 +38,6 @@ impl Default for ExpressDataCacheConfig {
}
}

/// Metadata about the cached object, used to verify that the object we've retrieved is the one we
/// wanted (and to avoid collisions on the key).
/// On miss, bypass the cache and go to the main data source.
#[cfg_attr(test, derive(Arbitrary))]
#[derive(Debug, PartialEq, Eq)]
struct BlockHeader {
version: String,
block_idx: BlockIndex,
block_offset: u64,
etag: String,
source_key: String,
source_bucket_name: String,
header_checksum: u32,
}

impl BlockHeader {
pub fn new(block_idx: BlockIndex, block_offset: u64, cache_key: &ObjectId, source_bucket_name: &str) -> Self {
let header_checksum = Self::get_header_checksum(block_idx, block_offset, cache_key, source_bucket_name).value();
Self {
version: CACHE_VERSION.to_string(),
block_idx,
block_offset,
etag: cache_key.etag().as_str().to_string(),
source_key: cache_key.key().to_string(),
source_bucket_name: source_bucket_name.to_string(),
header_checksum,
}
}

/// Get the S3 key this block should be written to or read from.
pub fn to_s3_key(&self, prefix: &str) -> String {
let hashed_cache_key = hex::encode(
Sha256::new()
.chain_update(&self.source_key)
.chain_update(&self.etag)
.finalize(),
);
format!("{}/{}/{:010}", prefix, hashed_cache_key, self.block_idx)
}

/// Convert to object metadata that is HTTP header safe (ASCII only)
pub fn to_headers(&self) -> HashMap<String, String> {
let source_key_encoded = Base64::encode_string(self.source_key.as_bytes());
HashMap::from([
("cache-version".to_string(), self.version.clone()),
("block-idx".to_string(), format!("{}", self.block_idx)),
("block-offset".to_string(), format!("{}", self.block_offset)),
("etag".to_string(), self.etag.clone()),
("source-key".to_string(), source_key_encoded),
("source-bucket-name".to_string(), self.source_bucket_name.clone()),
("header-checksum".to_string(), format!("{}", self.header_checksum)),
])
}

/// Validate the object metadata headers received match this BlockHeader object.
pub fn validate_headers(&self, headers: &HashMap<String, String>) -> Result<(), DataCacheError> {
self.validate_header(headers, "cache-version", |version| version == self.version)?;
self.validate_header(headers, "block-idx", |block_idx| {
block_idx.parse() == Ok(self.block_idx)
})?;
self.validate_header(headers, "block-offset", |block_offset| {
block_offset.parse() == Ok(self.block_offset)
})?;
self.validate_header(headers, "etag", |etag| etag == self.etag)?;
self.validate_header(headers, "source-key", |source_key| {
source_key == Base64::encode_string(self.source_key.as_bytes())
})?;
self.validate_header(headers, "source-bucket-name", |source_bucket_name| {
source_bucket_name == self.source_bucket_name
})?;
self.validate_header(headers, "header-checksum", |header_checksum| {
header_checksum.parse() == Ok(self.header_checksum)
})?;

Ok(())
}

fn validate_header<F: Fn(&str) -> bool>(
&self,
headers: &HashMap<String, String>,
header: &str,
is_valid: F,
) -> Result<(), DataCacheError> {
let value = headers
.get(header)
.ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))?;
is_valid(value)
.then_some(())
.ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))
}

fn get_header_checksum(
block_idx: BlockIndex,
block_offset: u64,
cache_key: &ObjectId,
source_bucket_name: &str,
) -> Crc32c {
let mut hasher = crc32c::Hasher::new();
hasher.update(CACHE_VERSION.as_bytes());
hasher.update(&block_idx.to_be_bytes());
hasher.update(&block_offset.to_be_bytes());
hasher.update(cache_key.etag().as_str().as_bytes());
hasher.update(cache_key.key().as_bytes());
hasher.update(source_bucket_name.as_bytes());
hasher.finalize()
}
}
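
Taken together, the removed helpers pair to_headers with validate_headers: the same metadata written alongside a cached block is re-checked when the block is read back. A minimal sketch of that round trip, using only the names above (the ObjectId and ETag construction is illustrative and not part of this diff):

// Sketch only: how the removed BlockHeader was meant to be used.
// The ObjectId/ETag constructors below are assumptions for illustration.
let cache_key = ObjectId::new("data/a.txt".to_string(), ETag::for_tests());
let header = BlockHeader::new(0, 0, &cache_key, "source-bucket");

// Metadata stored next to the cached block as S3 object metadata...
let headers = header.to_headers();

// ...and validated again on read-back; any mismatch is reported as
// DataCacheError::InvalidBlockHeader naming the offending header.
header.validate_headers(&headers).expect("freshly written headers should validate");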

/// A data cache on S3 Express One Zone that can be shared across Mountpoint instances.
pub struct ExpressDataCache<Client: ObjectClient> {
client: Client,
@@ -175,13 +67,13 @@ where
pub fn new(client: Client, config: ExpressDataCacheConfig, source_bucket_name: &str, bucket_name: &str) -> Self {
Self {
client,
prefix: Self::build_prefix(source_description, config.block_size),
prefix: Self::build_prefix(source_bucket_name, config.block_size),
config,
bucket_name: bucket_name.to_owned(),
}
}

fn build_prefix(source_description: &str, block_size: u64) -> String {
fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
hex::encode(
Sha256::new()
.chain_update(CACHE_VERSION.as_bytes())
@@ -581,18 +473,15 @@ mod tests {
#[tokio::test]
async fn test_get_validate_failure() {
let bucket = "test-bucket";
let part_size = 8 * 1024 * 1024;
let block_size = 512 * 1024;
let config = MockClientConfig {
bucket: bucket.to_string(),
part_size,
part_size: 8 * 1024 * 1024,
enable_backpressure: true,
initial_read_window_size: part_size,
initial_read_window_size: 8 * 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(config));

let cache = ExpressDataCache::new(bucket, client.clone(), "unique source description", block_size);
let cache = ExpressDataCache::new(client.clone(), Default::default(), "unique source description", bucket);

let data = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
@@ -607,7 +496,7 @@ mod tests {
.await
.unwrap();
let err = cache
.get_block(&cache_key, 0, 0)
.get_block(&cache_key, 0, 0, data.len())
.await
.expect_err("cache should return error if checksum isn't present");
assert!(matches!(err, DataCacheError::BlockChecksumMissing));
@@ -624,7 +513,7 @@ mod tests {
.await
.unwrap();
let err = cache
.get_block(&cache_key, 0, 0)
.get_block(&cache_key, 0, 0, data.len())
.await
.expect_err("cache should return error if object metadata isn't present");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
@@ -643,7 +532,7 @@ mod tests {
.await
.unwrap();
let err = cache
.get_block(&cache_key, 0, 0)
.get_block(&cache_key, 0, 0, data.len())
.await
.expect_err("cache should return error if object headers don't match data");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
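
The test updates above reflect two interface changes: the cache is now constructed as ExpressDataCache::new(client, config, source_bucket_name, bucket_name), and get_block takes the expected data length as a fourth argument. A minimal usage sketch under those assumptions, mirroring the updated test calls (values and the cache_key setup are illustrative; nothing beyond what this diff shows is confirmed):

// Sketch only: mirrors the updated calls in the tests above.
let config = MockClientConfig {
    bucket: "test-bucket".to_string(),
    part_size: 8 * 1024 * 1024,
    enable_backpressure: true,
    initial_read_window_size: 8 * 1024 * 1024,
    ..Default::default()
};
let client = Arc::new(MockClient::new(config));
let cache = ExpressDataCache::new(client.clone(), Default::default(), "source-bucket", "test-bucket");

// Reads now pass the expected length alongside the block index and offset.
let cache_key = ObjectId::new("a".to_string(), ETag::for_tests()); // assumed constructor
let data = ChecksummedBytes::new("Foo".into());
let block = cache.get_block(&cache_key, 0, 0, data.len()).await?;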