Optionally prepending s3 compatible storage's key with a hash of the key.

The point is to work around S3 rate limiting. Since rate limits are applied per key prefix, our ULID naming scheme can lead to hotspots in the keyspace.

This solution has a downside: external scripts that list files have their job multiplied by the number of prefixes. For this reason, the prefix cardinality is configurable.

Closes #4824
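
A simplified sketch of the scheme, for illustration only: it is not the committed build_key implementation shown in the diff below, and the helper names prefixed_key and all_hash_prefixes are hypothetical. The idea is to hash the full object key, reduce it modulo the configured cardinality, and prepend the resulting bucket as a hex prefix. The committed code uses murmurhash3 and a fixed 8-character prefix (e.g. "d0000000/" for a cardinality of 16, per the new unit tests); this sketch uses the standard library's DefaultHasher and a variable-width prefix to stay short.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: spread ULID-named keys across hash buckets so they do not
// all hit the same S3 key prefix. DefaultHasher stands in for the
// murmurhash3 call used by the real build_key.
fn prefixed_key(key: &str, hash_prefix_cardinality: usize) -> String {
    if hash_prefix_cardinality < 2 {
        // A cardinality of 0 or 1 cannot spread anything: no prefix.
        return key.to_string();
    }
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    let bucket = (hasher.finish() as usize) % hash_prefix_cardinality;
    format!("{bucket:x}/{key}")
}

// The downside mentioned above: an external script that lists files must
// now scan one prefix per bucket, so its work grows with the cardinality.
fn all_hash_prefixes(hash_prefix_cardinality: usize) -> Vec<String> {
    if hash_prefix_cardinality < 2 {
        return vec![String::new()];
    }
    (0..hash_prefix_cardinality)
        .map(|bucket| format!("{bucket:x}/"))
        .collect()
}

fn main() {
    println!("{}", prefixed_key("indexes/my-index/some-ulid.split", 16));
    println!("{} prefixes to list", all_hash_prefixes(16).len());
}

Keeping the cardinality small bounds that listing overhead; the validation added below caps it at 16^3 = 4096.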
fulmicoton committed Oct 17, 2024
1 parent e08eb9f commit 8c90035
Showing 6 changed files with 114 additions and 19 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

(Generated file: diff not rendered.)

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
@@ -152,6 +152,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
murmurhash32 = "0.3"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" }
new_string_template = "1.5.1"
nom = "7.1.3"
37 changes: 36 additions & 1 deletion quickwit/quickwit-config/src/storage_config.rs
@@ -26,6 +26,7 @@ use itertools::Itertools;
use quickwit_common::get_bool_from_env;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, EnumMap};
use tracing::warn;

/// Lists the storage backends supported by Quickwit.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
@@ -93,6 +94,9 @@ impl StorageConfigs {
}

pub fn validate(&self) -> anyhow::Result<()> {
for storage_config in self.0.iter() {
storage_config.validate()?;
}
let backends: Vec<StorageBackend> = self
.0
.iter()
@@ -216,6 +220,14 @@ impl StorageConfig {
_ => None,
}
}

pub fn validate(&self) -> anyhow::Result<()> {
if let StorageConfig::S3(config) = self {
config.validate()
} else {
Ok(())
}
}
}

impl From<AzureStorageConfig> for StorageConfig {
@@ -313,6 +325,8 @@ impl fmt::Debug for AzureStorageConfig {
}
}

const MAX_S3_HASH_PREFIX_CARDINALITY: usize = 16usize.pow(3);

#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct S3StorageConfig {
@@ -334,9 +348,30 @@ pub struct S3StorageConfig {
pub disable_multi_object_delete: bool,
#[serde(default)]
pub disable_multipart_upload: bool,
#[serde(default)]
#[serde(skip_serializing_if = "lower_than_2")]
pub hash_prefix_cardinality: usize,
}

fn lower_than_2(n: &usize) -> bool {
*n < 2
}

impl S3StorageConfig {
fn validate(&self) -> anyhow::Result<()> {
if self.hash_prefix_cardinality == 1 {
warn!("A hash prefix of 1 will be ignored");
}
if self.hash_prefix_cardinality > MAX_S3_HASH_PREFIX_CARDINALITY {
anyhow::bail!(
"hash_prefix_cardinality can take values of at most \
{MAX_S3_HASH_PREFIX_CARDINALITY}, currently set to {}",
self.hash_prefix_cardinality
);
}
Ok(())
}

fn apply_flavor(&mut self) {
match self.flavor {
Some(StorageBackendFlavor::DigitalOcean) => {
@@ -383,7 +418,7 @@ impl S3StorageConfig {
}

impl fmt::Debug for S3StorageConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("S3StorageConfig")
.field("access_key_id", &self.access_key_id)
.field(
1 change: 1 addition & 0 deletions quickwit/quickwit-storage/Cargo.toml
@@ -26,6 +26,7 @@ once_cell = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
murmurhash32 = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = { workspace = true }
91 changes: 74 additions & 17 deletions quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -86,11 +86,13 @@ pub struct S3CompatibleObjectStorage {
s3_client: S3Client,
uri: Uri,
bucket: String,
prefix: PathBuf,
prefix: String,
multipart_policy: MultiPartPolicy,
retry_params: RetryParams,
disable_multi_object_delete: bool,
disable_multipart_upload: bool,
// If 0, we don't have any prefix
hash_prefix_cardinality: usize,
}

impl fmt::Debug for S3CompatibleObjectStorage {
@@ -99,6 +101,7 @@ impl fmt::Debug for S3CompatibleObjectStorage {
.debug_struct("S3CompatibleObjectStorage")
.field("bucket", &self.bucket)
.field("prefix", &self.prefix)
.field("hash_prefix_cardinality", &self.hash_prefix_cardinality)
.finish()
}
}
@@ -181,19 +184,20 @@ impl S3CompatibleObjectStorage {
s3_client,
uri: uri.clone(),
bucket,
prefix,
prefix: prefix.to_string_lossy().to_string(),
multipart_policy: MultiPartPolicy::default(),
retry_params,
disable_multi_object_delete,
disable_multipart_upload,
hash_prefix_cardinality: s3_storage_config.hash_prefix_cardinality,
})
}

/// Sets a specific prefix for all buckets.
///
/// This method overrides any existing prefix. (It does NOT
/// append the argument to any existing prefix.)
pub fn with_prefix(self, prefix: PathBuf) -> Self {
pub fn with_prefix(self, prefix: String) -> Self {
Self {
s3_client: self.s3_client,
uri: self.uri,
@@ -203,6 +207,7 @@
retry_params: self.retry_params,
disable_multi_object_delete: self.disable_multi_object_delete,
disable_multipart_upload: self.disable_multipart_upload,
hash_prefix_cardinality: self.hash_prefix_cardinality,
}
}

@@ -262,12 +267,49 @@ async fn compute_md5<T: AsyncRead + std::marker::Unpin>(mut read: T) -> io::Resu
}
}
}
const HEX_ALPHABET: [u8; 16] = *b"0123456789abcdef";
const UNINITIALIZED_HASH_PREFIX: &str = "00000000";

fn build_key(prefix: &str, relative_path: &str, hash_prefix_cardinality: usize) -> String {
let mut key = String::with_capacity(
UNINITIALIZED_HASH_PREFIX.len() + 1 + prefix.len() + 1 + relative_path.len(),
);
if hash_prefix_cardinality > 1 {
key.push_str(UNINITIALIZED_HASH_PREFIX);
key.push('/');
}
key.push_str(prefix);
if key.as_bytes().last().copied() != Some(b'/') {
key.push('/');
}
key.push_str(relative_path);
// We then set up the prefix.
if hash_prefix_cardinality > 1 {
let key_without_prefix = &key.as_bytes()[UNINITIALIZED_HASH_PREFIX.len() + 1..];
let mut prefix_hash: usize =
murmurhash32::murmurhash3(key_without_prefix) as usize % hash_prefix_cardinality;
unsafe {
let prefix_buf: &mut [u8] = &mut key.as_bytes_mut()[..UNINITIALIZED_HASH_PREFIX.len()];
for prefix_byte in prefix_buf {
let hex: u8 = HEX_ALPHABET[(prefix_hash % 16) as usize];
*prefix_byte = hex;
if prefix_hash < 16 {
break;
}
prefix_hash /= 16;
}
}
}
key
}

impl S3CompatibleObjectStorage {
fn key(&self, relative_path: &Path) -> String {
// FIXME: This may not work on Windows.
let key_path = self.prefix.join(relative_path);
key_path.to_string_lossy().to_string()
build_key(
&self.prefix,
relative_path.to_string_lossy().as_ref(),
self.hash_prefix_cardinality,
)
}

fn relative_path(&self, key: &str) -> PathBuf {
@@ -945,13 +987,13 @@ mod tests {
let s3_client = S3Client::new(&sdk_config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let mut s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
@@ -962,7 +1004,7 @@
PathBuf::from("indexes/foo")
);

s3_storage.prefix = PathBuf::from("indexes");
s3_storage.prefix = "indexes".to_string();

assert_eq!(
s3_storage.relative_path("indexes/foo"),
@@ -1000,13 +1042,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: true,
@@ -1041,13 +1083,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
@@ -1123,13 +1165,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
@@ -1216,13 +1258,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
@@ -1233,4 +1275,19 @@
.await
.unwrap();
}

#[test]
fn test_build_key() {
assert_eq!(build_key("hello", "coucou", 0), "hello/coucou");
assert_eq!(build_key("hello/", "coucou", 0), "hello/coucou");
assert_eq!(build_key("hello/", "coucou", 1), "hello/coucou");
assert_eq!(build_key("hello", "coucou", 1), "hello/coucou");
assert_eq!(build_key("hello/", "coucou", 2), "10000000/hello/coucou");
assert_eq!(build_key("hello", "coucou", 2), "10000000/hello/coucou");
assert_eq!(build_key("hello/", "coucou", 16), "d0000000/hello/coucou");
assert_eq!(build_key("hello", "coucou", 16), "d0000000/hello/coucou");
assert_eq!(build_key("hello/", "coucou", 17), "50000000/hello/coucou");
assert_eq!(build_key("hello", "coucou", 17), "50000000/hello/coucou");
assert_eq!(build_key("hello/", "coucou", 70), "f0000000/hello/coucou");
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/tests/s3_storage.rs
@@ -69,7 +69,7 @@ pub mod s3_storage_test_suite {
S3CompatibleObjectStorage::from_uri(&s3_storage_config, &storage_uri)
.await
.unwrap()
.with_prefix(PathBuf::from("test-s3-compatible-storage"));
.with_prefix("test-s3-compatible-storage".to_string());

quickwit_storage::storage_test_single_part_upload(&mut object_storage)
.await
