Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally prepending s3 compatible storage's key with a hash of the … #5495

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
murmurhash32 = "0.3"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" }
new_string_template = "1.5.1"
nom = "7.1.3"
Expand Down
37 changes: 36 additions & 1 deletion quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use itertools::Itertools;
use quickwit_common::get_bool_from_env;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, EnumMap};
use tracing::warn;

/// Lists the storage backends supported by Quickwit.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -93,6 +94,9 @@ impl StorageConfigs {
}

pub fn validate(&self) -> anyhow::Result<()> {
for storage_config in self.0.iter() {
storage_config.validate()?;
}
let backends: Vec<StorageBackend> = self
.0
.iter()
Expand Down Expand Up @@ -216,6 +220,14 @@ impl StorageConfig {
_ => None,
}
}

pub fn validate(&self) -> anyhow::Result<()> {
if let StorageConfig::S3(config) = self {
config.validate()
} else {
Ok(())
}
}
}

impl From<AzureStorageConfig> for StorageConfig {
Expand Down Expand Up @@ -313,6 +325,8 @@ impl fmt::Debug for AzureStorageConfig {
}
}

const MAX_S3_HASH_PREFIX_CARDINALITY: usize = 16usize.pow(3);

#[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct S3StorageConfig {
Expand All @@ -334,9 +348,30 @@ pub struct S3StorageConfig {
pub disable_multi_object_delete: bool,
#[serde(default)]
pub disable_multipart_upload: bool,
#[serde(default)]
#[serde(skip_serializing_if = "lower_than_2")]
pub hash_prefix_cardinality: usize,
}

fn lower_than_2(n: &usize) -> bool {
*n < 2
}

impl S3StorageConfig {
fn validate(&self) -> anyhow::Result<()> {
if self.hash_prefix_cardinality == 1 {
warn!("A hash prefix of 1 will be ignored");
}
if self.hash_prefix_cardinality > MAX_S3_HASH_PREFIX_CARDINALITY {
anyhow::bail!(
"hash_prefix_cardinality can take values of at most \
{MAX_S3_HASH_PREFIX_CARDINALITY}, currently set to {}",
self.hash_prefix_cardinality
);
}
Ok(())
}

fn apply_flavor(&mut self) {
match self.flavor {
Some(StorageBackendFlavor::DigitalOcean) => {
Expand Down Expand Up @@ -383,7 +418,7 @@ impl S3StorageConfig {
}

impl fmt::Debug for S3StorageConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("S3StorageConfig")
.field("access_key_id", &self.access_key_id)
.field(
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ once_cell = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
murmurhash32 = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ pub struct S3CompatibleObjectStorage {
s3_client: S3Client,
uri: Uri,
bucket: String,
prefix: PathBuf,
prefix: String,
multipart_policy: MultiPartPolicy,
retry_params: RetryParams,
disable_multi_object_delete: bool,
disable_multipart_upload: bool,
// If 0, we don't have any prefix
hash_prefix_cardinality: usize,
}

impl fmt::Debug for S3CompatibleObjectStorage {
Expand All @@ -99,6 +101,7 @@ impl fmt::Debug for S3CompatibleObjectStorage {
.debug_struct("S3CompatibleObjectStorage")
.field("bucket", &self.bucket)
.field("prefix", &self.prefix)
.field("hash_prefix_cardinality", &self.hash_prefix_cardinality)
.finish()
}
}
Expand Down Expand Up @@ -181,19 +184,20 @@ impl S3CompatibleObjectStorage {
s3_client,
uri: uri.clone(),
bucket,
prefix,
prefix: prefix.to_string_lossy().to_string(),
multipart_policy: MultiPartPolicy::default(),
retry_params,
disable_multi_object_delete,
disable_multipart_upload,
hash_prefix_cardinality: s3_storage_config.hash_prefix_cardinality,
})
}

/// Sets a specific for all buckets.
///
/// This method overrides any existing prefix. (It does NOT
/// append the argument to any existing prefix.)
pub fn with_prefix(self, prefix: PathBuf) -> Self {
pub fn with_prefix(self, prefix: String) -> Self {
Self {
s3_client: self.s3_client,
uri: self.uri,
Expand All @@ -203,6 +207,7 @@ impl S3CompatibleObjectStorage {
retry_params: self.retry_params,
disable_multi_object_delete: self.disable_multi_object_delete,
disable_multipart_upload: self.disable_multipart_upload,
hash_prefix_cardinality: self.hash_prefix_cardinality,
}
}

Expand Down Expand Up @@ -262,12 +267,49 @@ async fn compute_md5<T: AsyncRead + std::marker::Unpin>(mut read: T) -> io::Resu
}
}
}
const HEX_ALPHABET: [u8; 16] = *b"0123456789abcdef";
const UNINITIALIZED_HASH_PREFIX: &str = "00000000";

fn build_key(prefix: &str, relative_path: &str, hash_prefix_cardinality: usize) -> String {
let mut key = String::with_capacity(
UNINITIALIZED_HASH_PREFIX.len() + 1 + prefix.len() + 1 + relative_path.len(),
);
if hash_prefix_cardinality > 1 {
key.push_str(UNINITIALIZED_HASH_PREFIX);
key.push('/');
}
key.push_str(prefix);
if key.as_bytes().last().copied() != Some(b'/') {
key.push('/');
}
key.push_str(relative_path);
// We then set up the prefix.
if hash_prefix_cardinality > 1 {
let key_without_prefix = &key.as_bytes()[UNINITIALIZED_HASH_PREFIX.len() + 1..];
let mut prefix_hash: usize =
murmurhash32::murmurhash3(key_without_prefix) as usize % hash_prefix_cardinality;
unsafe {
let prefix_buf: &mut [u8] = &mut key.as_bytes_mut()[..UNINITIALIZED_HASH_PREFIX.len()];
for prefix_byte in prefix_buf {
let hex: u8 = HEX_ALPHABET[(prefix_hash % 16) as usize];
*prefix_byte = hex;
if prefix_hash < 16 {
break;
}
prefix_hash /= 16;
}
}
}
key
}

impl S3CompatibleObjectStorage {
fn key(&self, relative_path: &Path) -> String {
// FIXME: This may not work on Windows.
let key_path = self.prefix.join(relative_path);
key_path.to_string_lossy().to_string()
build_key(
&self.prefix,
relative_path.to_string_lossy().as_ref(),
self.hash_prefix_cardinality,
)
}

fn relative_path(&self, key: &str) -> PathBuf {
Expand Down Expand Up @@ -945,13 +987,13 @@ mod tests {
let s3_client = S3Client::new(&sdk_config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let mut s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
Expand All @@ -962,7 +1004,7 @@ mod tests {
PathBuf::from("indexes/foo")
);

s3_storage.prefix = PathBuf::from("indexes");
s3_storage.prefix = "indexes".to_string();

assert_eq!(
s3_storage.relative_path("indexes/foo"),
Expand Down Expand Up @@ -1000,13 +1042,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: true,
Expand Down Expand Up @@ -1041,13 +1083,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
Expand Down Expand Up @@ -1123,13 +1165,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
Expand Down Expand Up @@ -1216,13 +1258,13 @@ mod tests {
let s3_client = S3Client::from_conf(config);
let uri = Uri::for_test("s3://bucket/indexes");
let bucket = "bucket".to_string();
let prefix = PathBuf::new();

let s3_storage = S3CompatibleObjectStorage {
s3_client,
uri,
bucket,
prefix,
prefix: String::new(),
hash_prefix_cardinality: 0,
multipart_policy: MultiPartPolicy::default(),
retry_params: RetryParams::for_test(),
disable_multi_object_delete: false,
Expand All @@ -1233,4 +1275,19 @@ mod tests {
.await
.unwrap();
}

#[test]
fn test_build_key() {
assert_eq!(build_key("hello", "coucou", 0), "hello/coucou");
assert_eq!(build_key("hello/", "coucou", 0), "hello/coucou");
assert_eq!(build_key("hello/", "coucou", 1), "hello/coucou");
assert_eq!(build_key("hello", "coucou", 1), "hello/coucou");
assert_eq!(build_key("hello/", "coucou", 2), "10000000/hello/coucou");
assert_eq!(build_key("hello", "coucou", 2), "10000000/hello/coucou");
assert_eq!(build_key("hello/", "coucou", 16), "d0000000/hello/coucou");
assert_eq!(build_key("hello", "coucou", 16), "d0000000/hello/coucou");
assert_eq!(build_key("hello/", "coucou", 17), "50000000/hello/coucou");
assert_eq!(build_key("hello", "coucou", 17), "50000000/hello/coucou");
assert_eq!(build_key("hello/", "coucou", 70), "f0000000/hello/coucou");
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-storage/tests/s3_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub mod s3_storage_test_suite {
S3CompatibleObjectStorage::from_uri(&s3_storage_config, &storage_uri)
.await
.unwrap()
.with_prefix(PathBuf::from("test-s3-compatible-storage"));
.with_prefix("test-s3-compatible-storage".to_string());

quickwit_storage::storage_test_single_part_upload(&mut object_storage)
.await
Expand Down
Loading