feat(iceberg): support iceberg on gcs (#19999)
chenzl25 authored Jan 6, 2025
1 parent 1d6a421 commit aa3df8e
Showing 7 changed files with 153 additions and 124 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -154,7 +154,7 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f
"prometheus",
] }
# branch dev_rebase_main_20241230
- iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
+ iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496", features = ["storage-s3", "storage-gcs"] }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
opendal = "0.49"
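The new `storage-s3` / `storage-gcs` feature flags gate iceberg-rust's object-store backends. A minimal sketch of what `storage-gcs` unlocks, using the same `FileIO` builder API this commit relies on (the bucket path is a placeholder):

use iceberg::io::{FileIO, GCS_CREDENTIALS_JSON};

fn gcs_file_io(credential_json: &str) -> iceberg::Result<FileIO> {
    // With "storage-gcs" enabled, a gs:// path resolves to the GCS backend;
    // the service-account JSON is passed as an explicit property.
    FileIO::from_path("gs://my-bucket/warehouse")?
        .with_prop(GCS_CREDENTIALS_JSON, credential_json)
        .build()
}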
5 changes: 5 additions & 0 deletions java/connector-node/risingwave-sink-iceberg/pom.xml
@@ -104,6 +104,11 @@
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-gcp</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
140 changes: 98 additions & 42 deletions src/connector/src/connector_common/iceberg/mod.rs
@@ -23,13 +23,15 @@ use ::iceberg::spec::TableMetadata;
use ::iceberg::table::Table;
use ::iceberg::{Catalog, TableIdent};
use anyhow::{anyhow, Context};
+ use iceberg::io::{GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD};
use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
use risingwave_common::bail;
use serde_derive::Deserialize;
use serde_with::serde_as;
use url::Url;
use with_options::WithOptions;

+ use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
use crate::deserialize_optional_bool_from_string;
use crate::error::ConnectorResult;

@@ -48,6 +50,10 @@ pub struct IcebergCommon {
pub access_key: Option<String>,
#[serde(rename = "s3.secret.key")]
pub secret_key: Option<String>,

+ #[serde(rename = "gcs.credential")]
+ pub gcs_credential: Option<String>,

/// Path of iceberg warehouse, only applicable in storage catalog.
#[serde(rename = "warehouse.path")]
pub warehouse_path: Option<String>,
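The new `gcs.credential` option rides on the same serde renames as the existing S3 options to map user-facing keys onto struct fields. A self-contained sketch (abbreviated struct, placeholder values):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Opts {
    #[serde(rename = "gcs.credential")]
    gcs_credential: Option<String>,
    #[serde(rename = "warehouse.path")]
    warehouse_path: Option<String>,
}

fn main() {
    // The keys below are exactly what a user would write as options.
    let opts: Opts = serde_json::from_str(
        r#"{"gcs.credential": "<service-account-json>", "warehouse.path": "gs://my-bucket/warehouse"}"#,
    )
    .unwrap();
    assert_eq!(opts.warehouse_path.as_deref(), Some("gs://my-bucket/warehouse"));
}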
@@ -130,6 +136,9 @@ impl IcebergCommon {
if let Some(secret_key) = &self.secret_key {
iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
}
+ if let Some(gcs_credential) = &self.gcs_credential {
+ iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
+ }

match &self.warehouse_path {
Some(warehouse_path) => {
@@ -169,8 +178,12 @@ impl IcebergCommon {
}
let enable_config_load = self.enable_config_load.unwrap_or(false);
iceberg_configs.insert(
- // TODO: `disable_config_load` is outdated.
- "disable_config_load".to_owned(),
+ S3_DISABLE_CONFIG_LOAD.to_owned(),
(!enable_config_load).to_string(),
);

+ iceberg_configs.insert(
+ GCS_DISABLE_CONFIG_LOAD.to_owned(),
+ (!enable_config_load).to_string(),
+ );
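`enable_config_load` defaults to false, so both properties above are normally set to "true", telling each backend to use only explicitly supplied settings rather than loading configuration from the environment. A hedged sketch of the equivalent direct `FileIO` setup (the helper name is ours):

use iceberg::io::{FileIO, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD};

fn file_io_without_ambient_config(warehouse: &str) -> iceberg::Result<FileIO> {
    // Both disable flags are string-typed properties, hence "true".
    FileIO::from_path(warehouse)?
        .with_prop(S3_DISABLE_CONFIG_LOAD, "true")
        .with_prop(GCS_DISABLE_CONFIG_LOAD, "true")
        .build()
}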

@@ -288,37 +301,62 @@ impl IcebergCommon {
) -> ConnectorResult<Arc<dyn Catalog>> {
match self.catalog_type() {
"storage" => {
- let config =
- storage_catalog::StorageCatalogConfig::builder()
- .warehouse(self.warehouse_path.clone().ok_or_else(|| {
- anyhow!("`warehouse.path` must be set in storage catalog")
- })?)
- .access_key(self.access_key.clone().ok_or_else(|| {
- anyhow!("`s3.access.key` must be set in storage catalog")
- })?)
- .secret_key(self.secret_key.clone().ok_or_else(|| {
- anyhow!("`s3.secret.key` must be set in storage catalog")
- })?)
- .region(self.region.clone())
- .endpoint(self.endpoint.clone())
- .build();
+ let warehouse = self
+ .warehouse_path
+ .clone()
+ .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
+ let url = Url::parse(warehouse.as_ref())
+ .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
+
+ let config = match url.scheme() {
+ "s3" | "s3a" => StorageCatalogConfig::S3(
+ storage_catalog::StorageCatalogS3Config::builder()
+ .warehouse(warehouse)
+ .access_key(self.access_key.clone().ok_or_else(|| {
+ anyhow!("`s3.access.key` must be set in storage catalog")
+ })?)
+ .secret_key(self.secret_key.clone().ok_or_else(|| {
+ anyhow!("`s3.secret.key` must be set in storage catalog")
+ })?)
+ .region(self.region.clone())
+ .endpoint(self.endpoint.clone())
+ .build(),
+ ),
+ "gs" | "gcs" => StorageCatalogConfig::Gcs(
+ storage_catalog::StorageCatalogGcsConfig::builder()
+ .warehouse(warehouse)
+ .credential(self.gcs_credential.clone().ok_or_else(|| {
+ anyhow!("`gcs.credential` must be set in storage catalog")
+ })?)
+ .build(),
+ ),
+ scheme => bail!("Unsupported warehouse scheme: {}", scheme),
+ };
+
let catalog = storage_catalog::StorageCatalog::new(config)?;
Ok(Arc::new(catalog))
}
"rest_rust" => {
let mut iceberg_configs = HashMap::new();
- if let Some(region) = &self.region {
- iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
- }
- if let Some(endpoint) = &self.endpoint {
- iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
- }
- if let Some(access_key) = &self.access_key {
- iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
- }
- if let Some(secret_key) = &self.secret_key {
- iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
- }

+ // check gcs credential or s3 access key and secret key
+ if let Some(gcs_credential) = &self.gcs_credential {
+ iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
+ } else {
+ if let Some(region) = &self.region {
+ iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
+ }
+ if let Some(endpoint) = &self.endpoint {
+ iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
+ }
+ if let Some(access_key) = &self.access_key {
+ iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
+ }
+ if let Some(secret_key) = &self.secret_key {
+ iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
+ }
+ };

if let Some(credential) = &self.credential {
iceberg_configs.insert("credential".to_owned(), credential.clone());
}
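Note the precedence in the branch above: when `gcs.credential` is present, the S3 settings are skipped entirely. A compact distillation (standalone function and names are ours, not from this commit):

use std::collections::HashMap;

use iceberg::io::{GCS_CREDENTIALS_JSON, S3_ACCESS_KEY_ID};

fn object_store_props(
    gcs_credential: Option<&str>,
    s3_access_key: Option<&str>,
) -> HashMap<String, String> {
    let mut props = HashMap::new();
    if let Some(cred) = gcs_credential {
        // GCS wins; any S3 settings are ignored.
        props.insert(GCS_CREDENTIALS_JSON.to_owned(), cred.to_owned());
    } else if let Some(key) = s3_access_key {
        props.insert(S3_ACCESS_KEY_ID.to_owned(), key.to_owned());
    }
    props
}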
@@ -442,20 +480,38 @@
) -> ConnectorResult<Table> {
match self.catalog_type() {
"storage" => {
- let config =
- storage_catalog::StorageCatalogConfig::builder()
- .warehouse(self.warehouse_path.clone().ok_or_else(|| {
- anyhow!("`warehouse.path` must be set in storage catalog")
- })?)
- .access_key(self.access_key.clone().ok_or_else(|| {
- anyhow!("`s3.access.key` must be set in storage catalog")
- })?)
- .secret_key(self.secret_key.clone().ok_or_else(|| {
- anyhow!("`s3.secret.key` must be set in storage catalog")
- })?)
- .region(self.region.clone())
- .endpoint(self.endpoint.clone())
- .build();
+ let warehouse = self
+ .warehouse_path
+ .clone()
+ .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
+ let url = Url::parse(warehouse.as_ref())
+ .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
+
+ let config = match url.scheme() {
+ "s3" | "s3a" => StorageCatalogConfig::S3(
+ storage_catalog::StorageCatalogS3Config::builder()
+ .warehouse(warehouse)
+ .access_key(self.access_key.clone().ok_or_else(|| {
+ anyhow!("`s3.access.key` must be set in storage catalog")
+ })?)
+ .secret_key(self.secret_key.clone().ok_or_else(|| {
+ anyhow!("`s3.secret.key` must be set in storage catalog")
+ })?)
+ .region(self.region.clone())
+ .endpoint(self.endpoint.clone())
+ .build(),
+ ),
+ "gs" | "gcs" => StorageCatalogConfig::Gcs(
+ storage_catalog::StorageCatalogGcsConfig::builder()
+ .warehouse(warehouse)
+ .credential(self.gcs_credential.clone().ok_or_else(|| {
+ anyhow!("`gcs.credential` must be set in storage catalog")
+ })?)
+ .build(),
+ ),
+ scheme => bail!("Unsupported warehouse scheme: {}", scheme),
+ };

let storage_catalog = storage_catalog::StorageCatalog::new(config)?;

let table_id = self
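Both hunks above (the catalog-creation and table-loading paths) apply the same dispatch rule: the warehouse URL's scheme alone decides which storage config to build. A standalone distillation (the function name is ours):

use anyhow::{anyhow, bail};
use url::Url;

fn backend_for(warehouse: &str) -> anyhow::Result<&'static str> {
    let url =
        Url::parse(warehouse).map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
    match url.scheme() {
        "s3" | "s3a" => Ok("s3"),
        "gs" | "gcs" => Ok("gcs"),
        scheme => bail!("Unsupported warehouse scheme: {}", scheme),
    }
}

#[test]
fn gcs_scheme_is_recognized() {
    assert_eq!(backend_for("gs://my-bucket/warehouse").unwrap(), "gcs");
}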
123 changes: 42 additions & 81 deletions src/connector/src/connector_common/iceberg/storage_catalog.rs
@@ -17,56 +17,73 @@
use std::collections::HashMap;

use async_trait::async_trait;
- use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+ use iceberg::io::{
+ FileIO, GCS_CREDENTIALS_JSON, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
+ };
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
};
- use opendal::Operator;
- use thiserror_ext::AsReport;
- use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;

+ #[derive(Debug)]
+ pub enum StorageCatalogConfig {
+ S3(StorageCatalogS3Config),
+ Gcs(StorageCatalogGcsConfig),
+ }

#[derive(Clone, Debug, TypedBuilder)]
- pub struct StorageCatalogConfig {
+ pub struct StorageCatalogS3Config {
warehouse: String,
access_key: String,
secret_key: String,
endpoint: Option<String>,
region: Option<String>,
}

+ #[derive(Clone, Debug, TypedBuilder)]
+ pub struct StorageCatalogGcsConfig {
+ warehouse: String,
+ credential: String,
+ }

/// File system catalog.
#[derive(Debug)]
pub struct StorageCatalog {
warehouse: String,
- config: StorageCatalogConfig,
file_io: FileIO,
}

impl StorageCatalog {
pub fn new(config: StorageCatalogConfig) -> Result<Self> {
- let mut file_io_builder = FileIO::from_path(&config.warehouse)?
- .with_prop(S3_ACCESS_KEY_ID, &config.access_key)
- .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key);
- file_io_builder = if let Some(endpoint) = &config.endpoint {
- file_io_builder.with_prop(S3_ENDPOINT, endpoint)
- } else {
- file_io_builder
- };
- file_io_builder = if let Some(region) = &config.region {
- file_io_builder.with_prop(S3_REGION, region)
- } else {
- file_io_builder
+ let (warehouse, file_io) = match config {
+ StorageCatalogConfig::S3(config) => {
+ let mut file_io_builder = FileIO::from_path(&config.warehouse)?
+ .with_prop(S3_ACCESS_KEY_ID, &config.access_key)
+ .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key);
+ file_io_builder = if let Some(endpoint) = &config.endpoint {
+ file_io_builder.with_prop(S3_ENDPOINT, endpoint)
+ } else {
+ file_io_builder
+ };
+ file_io_builder = if let Some(region) = &config.region {
+ file_io_builder.with_prop(S3_REGION, region)
+ } else {
+ file_io_builder
+ };
+ (config.warehouse.clone(), file_io_builder.build()?)
+ }
+ StorageCatalogConfig::Gcs(config) => {
+ let file_io_builder = FileIO::from_path(&config.warehouse)?
+ .with_prop(GCS_CREDENTIALS_JSON, &config.credential);
+ (config.warehouse.clone(), file_io_builder.build()?)
+ }
+ };

- Ok(StorageCatalog {
- warehouse: config.warehouse.clone(),
- config,
- file_io: file_io_builder.build()?,
- })
+ Ok(StorageCatalog { warehouse, file_io })
}

/// Check if version hint file exist.
@@ -108,60 +125,6 @@ impl StorageCatalog {
.map_err(|_| Error::new(ErrorKind::DataInvalid, "parse version hint failed"))
}

- /// List all paths of table metadata files.
- ///
- /// The returned paths are sorted by name.
- ///
- /// TODO: we can improve this by only fetch the latest metadata.
- ///
- /// `table_path`: relative path of table dir under warehouse root.
- async fn list_table_metadata_paths(&self, table_path: &str) -> Result<Vec<String>> {
- // create s3 operator
- let mut builder = opendal::services::S3::default()
- .root(&self.warehouse)
- .access_key_id(&self.config.access_key)
- .secret_access_key(&self.config.secret_key);
- if let Some(endpoint) = &self.config.endpoint {
- builder = builder.endpoint(endpoint);
- }
- if let Some(region) = &self.config.region {
- builder = builder.region(region);
- }
- let op: Operator = Operator::new(builder)
- .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))?
- .finish();
-
- // list metadata files
- let mut lister = op
- .lister(format!("{table_path}/metadata/").as_str())
- .await
- .map_err(|err| {
- Error::new(
- ErrorKind::Unexpected,
- format!("list metadata failed: {}", err.as_report()),
- )
- })?;
- let mut paths = vec![];
- while let Some(entry) = lister.next().await {
- let entry = entry.map_err(|err| {
- Error::new(
- ErrorKind::Unexpected,
- format!("list metadata entry failed: {}", err.as_report()),
- )
- })?;
-
- // Only push into paths if the entry is a metadata file.
- if entry.path().ends_with(".metadata.json") {
- paths.push(entry.path().to_owned());
- }
- }
-
- // Make the returned paths sorted by name.
- paths.sort();
-
- Ok(paths)
- }

pub fn file_io(&self) -> &FileIO {
&self.file_io
}
@@ -303,12 +266,10 @@ impl Catalog for StorageCatalog {
let version_hint = self.read_version_hint(&table_path).await?;
format!("{table_path}/metadata/v{}.metadata.json", version_hint)
} else {
- let files = self.list_table_metadata_paths(&table_path).await?;
-
- files.into_iter().last().ok_or(Error::new(
+ return Err(Error::new(
ErrorKind::DataInvalid,
- "no table metadata found",
- ))?
+ "no version hint found for table",
+ ));
};

let metadata_file = self.file_io.new_input(path)?;
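Putting the refactor together, a hedged usage sketch of the new config enum (bucket and credential are placeholders):

fn gcs_storage_catalog(service_account_json: String) -> iceberg::Result<StorageCatalog> {
    let config = StorageCatalogConfig::Gcs(
        StorageCatalogGcsConfig::builder()
            .warehouse("gs://my-bucket/warehouse".to_owned())
            .credential(service_account_json)
            .build(),
    );
    // FileIO is derived from the gs:// path. Note that with the S3-only
    // listing fallback removed, loading a table now requires a version-hint
    // file in its metadata directory.
    StorageCatalog::new(config)
}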
1 change: 1 addition & 0 deletions src/connector/src/sink/iceberg/mod.rs
@@ -1430,6 +1430,7 @@ mod test {
endpoint: Some("http://127.0.0.1:9301".to_owned()),
access_key: Some("hummockadmin".to_owned()),
secret_key: Some("hummockadmin".to_owned()),
+ gcs_credential: None,
catalog_type: Some("jdbc".to_owned()),
catalog_name: Some("demo".to_owned()),
database_name: Some("demo_db".to_owned()),
Expand Down