From aa3df8e775921e46c94976437cedf9597ea263d4 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Mon, 6 Jan 2025 18:37:47 +0800
Subject: [PATCH] feat(iceberg): support iceberg on gcs (#19999)

---
 Cargo.toml                                      |   2 +-
 .../risingwave-sink-iceberg/pom.xml             |   5 +
 .../src/connector_common/iceberg/mod.rs         | 140 ++++++++++++------
 .../iceberg/storage_catalog.rs                  | 123 ++++++---------
 src/connector/src/sink/iceberg/mod.rs           |   1 +
 src/connector/with_options_sink.yaml            |   3 +
 src/connector/with_options_source.yaml          |   3 +
 7 files changed, 153 insertions(+), 124 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c676c8522c1dd..14038ef0dff87 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -154,7 +154,7 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f
     "prometheus",
 ] }
 # branch dev_rebase_main_20241230
-iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
+iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496", features = ["storage-s3", "storage-gcs"] }
 iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
 iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" }
 opendal = "0.49"
diff --git a/java/connector-node/risingwave-sink-iceberg/pom.xml b/java/connector-node/risingwave-sink-iceberg/pom.xml
index 186aceae7e711..49500676896b3 100644
--- a/java/connector-node/risingwave-sink-iceberg/pom.xml
+++ b/java/connector-node/risingwave-sink-iceberg/pom.xml
@@ -104,6 +104,11 @@
             <artifactId>iceberg-aws</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-gcp</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>s3</artifactId>
diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs
index e8047e6a33959..ff53d80e89598 100644
--- a/src/connector/src/connector_common/iceberg/mod.rs
+++ b/src/connector/src/connector_common/iceberg/mod.rs
@@ -23,6 +23,7 @@ use ::iceberg::spec::TableMetadata;
 use ::iceberg::table::Table;
 use ::iceberg::{Catalog, TableIdent};
 use anyhow::{anyhow, Context};
+use iceberg::io::{GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, S3_DISABLE_CONFIG_LOAD};
 use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
 use risingwave_common::bail;
 use serde_derive::Deserialize;
@@ -30,6 +31,7 @@ use serde_with::serde_as;
 use url::Url;
 use with_options::WithOptions;
 
+use crate::connector_common::iceberg::storage_catalog::StorageCatalogConfig;
 use crate::deserialize_optional_bool_from_string;
 use crate::error::ConnectorResult;
 
@@ -48,6 +50,10 @@ pub struct IcebergCommon {
     pub access_key: Option<String>,
     #[serde(rename = "s3.secret.key")]
     pub secret_key: Option<String>,
+
+    #[serde(rename = "gcs.credential")]
+    pub gcs_credential: Option<String>,
+
     /// Path of iceberg warehouse, only applicable in storage catalog.
#[serde(rename = "warehouse.path")] pub warehouse_path: Option, @@ -130,6 +136,9 @@ impl IcebergCommon { if let Some(secret_key) = &self.secret_key { iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone()); } + if let Some(gcs_credential) = &self.gcs_credential { + iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone()); + } match &self.warehouse_path { Some(warehouse_path) => { @@ -169,8 +178,12 @@ impl IcebergCommon { } let enable_config_load = self.enable_config_load.unwrap_or(false); iceberg_configs.insert( - // TODO: `disable_config_load` is outdated. - "disable_config_load".to_owned(), + S3_DISABLE_CONFIG_LOAD.to_owned(), + (!enable_config_load).to_string(), + ); + + iceberg_configs.insert( + GCS_DISABLE_CONFIG_LOAD.to_owned(), (!enable_config_load).to_string(), ); @@ -288,37 +301,62 @@ impl IcebergCommon { ) -> ConnectorResult> { match self.catalog_type() { "storage" => { - let config = - storage_catalog::StorageCatalogConfig::builder() - .warehouse(self.warehouse_path.clone().ok_or_else(|| { - anyhow!("`warehouse.path` must be set in storage catalog") - })?) - .access_key(self.access_key.clone().ok_or_else(|| { - anyhow!("`s3.access.key` must be set in storage catalog") - })?) - .secret_key(self.secret_key.clone().ok_or_else(|| { - anyhow!("`s3.secret.key` must be set in storage catalog") - })?) - .region(self.region.clone()) - .endpoint(self.endpoint.clone()) - .build(); + let warehouse = self + .warehouse_path + .clone() + .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?; + let url = Url::parse(warehouse.as_ref()) + .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?; + + let config = match url.scheme() { + "s3" | "s3a" => StorageCatalogConfig::S3( + storage_catalog::StorageCatalogS3Config::builder() + .warehouse(warehouse) + .access_key(self.access_key.clone().ok_or_else(|| { + anyhow!("`s3.access.key` must be set in storage catalog") + })?) + .secret_key(self.secret_key.clone().ok_or_else(|| { + anyhow!("`s3.secret.key` must be set in storage catalog") + })?) + .region(self.region.clone()) + .endpoint(self.endpoint.clone()) + .build(), + ), + "gs" | "gcs" => StorageCatalogConfig::Gcs( + storage_catalog::StorageCatalogGcsConfig::builder() + .warehouse(warehouse) + .credential(self.gcs_credential.clone().ok_or_else(|| { + anyhow!("`gcs.credential` must be set in storage catalog") + })?) 
+                            .build(),
+                    ),
+                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
+                };
+
                 let catalog = storage_catalog::StorageCatalog::new(config)?;
                 Ok(Arc::new(catalog))
             }
             "rest_rust" => {
                 let mut iceberg_configs = HashMap::new();
-                if let Some(region) = &self.region {
-                    iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
-                }
-                if let Some(endpoint) = &self.endpoint {
-                    iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
-                }
-                if let Some(access_key) = &self.access_key {
-                    iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
-                }
-                if let Some(secret_key) = &self.secret_key {
-                    iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
-                }
+
+                // Prefer the GCS credential when set; otherwise fall back to
+                // the S3 access key and secret key.
+                if let Some(gcs_credential) = &self.gcs_credential {
+                    iceberg_configs.insert(GCS_CREDENTIALS_JSON.to_owned(), gcs_credential.clone());
+                } else {
+                    if let Some(region) = &self.region {
+                        iceberg_configs.insert(S3_REGION.to_owned(), region.clone());
+                    }
+                    if let Some(endpoint) = &self.endpoint {
+                        iceberg_configs.insert(S3_ENDPOINT.to_owned(), endpoint.clone());
+                    }
+                    if let Some(access_key) = &self.access_key {
+                        iceberg_configs.insert(S3_ACCESS_KEY_ID.to_owned(), access_key.clone());
+                    }
+                    if let Some(secret_key) = &self.secret_key {
+                        iceberg_configs.insert(S3_SECRET_ACCESS_KEY.to_owned(), secret_key.clone());
+                    }
+                };
+
                 if let Some(credential) = &self.credential {
                     iceberg_configs.insert("credential".to_owned(), credential.clone());
                 }
@@ -442,20 +480,38 @@ impl IcebergCommon {
     ) -> ConnectorResult<Table> {
         match self.catalog_type() {
             "storage" => {
-                let config =
-                    storage_catalog::StorageCatalogConfig::builder()
-                        .warehouse(self.warehouse_path.clone().ok_or_else(|| {
-                            anyhow!("`warehouse.path` must be set in storage catalog")
-                        })?)
-                        .access_key(self.access_key.clone().ok_or_else(|| {
-                            anyhow!("`s3.access.key` must be set in storage catalog")
-                        })?)
-                        .secret_key(self.secret_key.clone().ok_or_else(|| {
-                            anyhow!("`s3.secret.key` must be set in storage catalog")
-                        })?)
-                        .region(self.region.clone())
-                        .endpoint(self.endpoint.clone())
-                        .build();
+                let warehouse = self
+                    .warehouse_path
+                    .clone()
+                    .ok_or_else(|| anyhow!("`warehouse.path` must be set in storage catalog"))?;
+                let url = Url::parse(warehouse.as_ref())
+                    .map_err(|_| anyhow!("Invalid warehouse path: {}", warehouse))?;
+
+                let config = match url.scheme() {
+                    "s3" | "s3a" => StorageCatalogConfig::S3(
+                        storage_catalog::StorageCatalogS3Config::builder()
+                            .warehouse(warehouse)
+                            .access_key(self.access_key.clone().ok_or_else(|| {
+                                anyhow!("`s3.access.key` must be set in storage catalog")
+                            })?)
+                            .secret_key(self.secret_key.clone().ok_or_else(|| {
+                                anyhow!("`s3.secret.key` must be set in storage catalog")
+                            })?)
+                            .region(self.region.clone())
+                            .endpoint(self.endpoint.clone())
+                            .build(),
+                    ),
+                    "gs" | "gcs" => StorageCatalogConfig::Gcs(
+                        storage_catalog::StorageCatalogGcsConfig::builder()
+                            .warehouse(warehouse)
+                            .credential(self.gcs_credential.clone().ok_or_else(|| {
+                                anyhow!("`gcs.credential` must be set in storage catalog")
+                            })?)
+                            .build(),
+                    ),
+                    scheme => bail!("Unsupported warehouse scheme: {}", scheme),
+                };
+
                 let storage_catalog = storage_catalog::StorageCatalog::new(config)?;
 
                 let table_id = self
diff --git a/src/connector/src/connector_common/iceberg/storage_catalog.rs b/src/connector/src/connector_common/iceberg/storage_catalog.rs
index b924c22669328..da7a337c67686 100644
--- a/src/connector/src/connector_common/iceberg/storage_catalog.rs
+++ b/src/connector/src/connector_common/iceberg/storage_catalog.rs
@@ -17,20 +17,26 @@ use std::collections::HashMap;
 
 use async_trait::async_trait;
-use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::io::{
+    FileIO, GCS_CREDENTIALS_JSON, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
+};
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
     TableIdent,
 };
-use opendal::Operator;
 use thiserror_ext::AsReport;
-use tokio_stream::StreamExt;
 use typed_builder::TypedBuilder;
 
+#[derive(Debug)]
+pub enum StorageCatalogConfig {
+    S3(StorageCatalogS3Config),
+    Gcs(StorageCatalogGcsConfig),
+}
+
 #[derive(Clone, Debug, TypedBuilder)]
-pub struct StorageCatalogConfig {
+pub struct StorageCatalogS3Config {
     warehouse: String,
     access_key: String,
     secret_key: String,
@@ -38,35 +44,46 @@ pub struct StorageCatalogConfig {
     region: Option<String>,
 }
 
+#[derive(Clone, Debug, TypedBuilder)]
+pub struct StorageCatalogGcsConfig {
+    warehouse: String,
+    credential: String,
+}
+
 /// File system catalog.
 #[derive(Debug)]
 pub struct StorageCatalog {
     warehouse: String,
-    config: StorageCatalogConfig,
     file_io: FileIO,
 }
 
 impl StorageCatalog {
     pub fn new(config: StorageCatalogConfig) -> Result<Self> {
-        let mut file_io_builder = FileIO::from_path(&config.warehouse)?
-            .with_prop(S3_ACCESS_KEY_ID, &config.access_key)
-            .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key);
-        file_io_builder = if let Some(endpoint) = &config.endpoint {
-            file_io_builder.with_prop(S3_ENDPOINT, endpoint)
-        } else {
-            file_io_builder
-        };
-        file_io_builder = if let Some(region) = &config.region {
-            file_io_builder.with_prop(S3_REGION, region)
-        } else {
-            file_io_builder
+        let (warehouse, file_io) = match config {
+            StorageCatalogConfig::S3(config) => {
+                let mut file_io_builder = FileIO::from_path(&config.warehouse)?
+                    .with_prop(S3_ACCESS_KEY_ID, &config.access_key)
+                    .with_prop(S3_SECRET_ACCESS_KEY, &config.secret_key);
+                file_io_builder = if let Some(endpoint) = &config.endpoint {
+                    file_io_builder.with_prop(S3_ENDPOINT, endpoint)
+                } else {
+                    file_io_builder
+                };
+                file_io_builder = if let Some(region) = &config.region {
+                    file_io_builder.with_prop(S3_REGION, region)
+                } else {
+                    file_io_builder
+                };
+                (config.warehouse.clone(), file_io_builder.build()?)
+            }
+            StorageCatalogConfig::Gcs(config) => {
+                let file_io_builder = FileIO::from_path(&config.warehouse)?
+                    .with_prop(GCS_CREDENTIALS_JSON, &config.credential);
+                (config.warehouse.clone(), file_io_builder.build()?)
+            }
         };
 
-        Ok(StorageCatalog {
-            warehouse: config.warehouse.clone(),
-            config,
-            file_io: file_io_builder.build()?,
-        })
+        Ok(StorageCatalog { warehouse, file_io })
     }
 
     /// Check if version hint file exist.
@@ -108,60 +125,6 @@ impl StorageCatalog {
         .map_err(|_| Error::new(ErrorKind::DataInvalid, "parse version hint failed"))
     }
 
-    /// List all paths of table metadata files.
-    ///
-    /// The returned paths are sorted by name.
-    ///
-    /// TODO: we can improve this by only fetch the latest metadata.
-    ///
-    /// `table_path`: relative path of table dir under warehouse root.
-    async fn list_table_metadata_paths(&self, table_path: &str) -> Result<Vec<String>> {
-        // create s3 operator
-        let mut builder = opendal::services::S3::default()
-            .root(&self.warehouse)
-            .access_key_id(&self.config.access_key)
-            .secret_access_key(&self.config.secret_key);
-        if let Some(endpoint) = &self.config.endpoint {
-            builder = builder.endpoint(endpoint);
-        }
-        if let Some(region) = &self.config.region {
-            builder = builder.region(region);
-        }
-        let op: Operator = Operator::new(builder)
-            .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_report_string()))?
-            .finish();
-
-        // list metadata files
-        let mut lister = op
-            .lister(format!("{table_path}/metadata/").as_str())
-            .await
-            .map_err(|err| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    format!("list metadata failed: {}", err.as_report()),
-                )
-            })?;
-        let mut paths = vec![];
-        while let Some(entry) = lister.next().await {
-            let entry = entry.map_err(|err| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    format!("list metadata entry failed: {}", err.as_report()),
-                )
-            })?;
-
-            // Only push into paths if the entry is a metadata file.
-            if entry.path().ends_with(".metadata.json") {
-                paths.push(entry.path().to_owned());
-            }
-        }
-
-        // Make the returned paths sorted by name.
-        paths.sort();
-
-        Ok(paths)
-    }
-
     pub fn file_io(&self) -> &FileIO {
         &self.file_io
     }
@@ -303,12 +266,10 @@ impl Catalog for StorageCatalog {
             let version_hint = self.read_version_hint(&table_path).await?;
             format!("{table_path}/metadata/v{}.metadata.json", version_hint)
         } else {
-            let files = self.list_table_metadata_paths(&table_path).await?;
-
-            files.into_iter().last().ok_or(Error::new(
+            return Err(Error::new(
                 ErrorKind::DataInvalid,
-                "no table metadata found",
-            ))?
+                "no version hint found for table",
+            ));
         };
 
         let metadata_file = self.file_io.new_input(path)?;
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index c52ab3936da7e..f84d0aa2cb339 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -1430,6 +1430,7 @@ mod test {
             endpoint: Some("http://127.0.0.1:9301".to_owned()),
             access_key: Some("hummockadmin".to_owned()),
             secret_key: Some("hummockadmin".to_owned()),
+            gcs_credential: None,
             catalog_type: Some("jdbc".to_owned()),
             catalog_name: Some("demo".to_owned()),
             database_name: Some("demo_db".to_owned()),
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index a6bdd6ced3139..d7291c8ecc61d 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -437,6 +437,9 @@ IcebergConfig:
   - name: s3.secret.key
     field_type: String
     required: false
+  - name: gcs.credential
+    field_type: String
+    required: false
   - name: warehouse.path
     field_type: String
     comments: Path of iceberg warehouse, only applicable in storage catalog.
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index f2547998a5c9c..4cb45b983f0fa 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -96,6 +96,9 @@ IcebergProperties:
   - name: s3.secret.key
     field_type: String
    required: false
+  - name: gcs.credential
+    field_type: String
+    required: false
   - name: warehouse.path
     field_type: String
     comments: Path of iceberg warehouse, only applicable in storage catalog.
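
A minimal usage sketch of the GCS-backed storage catalog this patch introduces, mirroring the `gs`/`gcs` arm added in `create_catalog`. It assumes code living inside the connector crate, where the `storage_catalog` module is visible; the bucket path and credential JSON are placeholders, not values from the patch.

```rust
use crate::connector_common::iceberg::storage_catalog::{
    self, StorageCatalog, StorageCatalogConfig,
};

// Build a StorageCatalog over a GCS warehouse. A `gs://` (or `gcs://`)
// warehouse scheme is what selects the `StorageCatalogConfig::Gcs` variant
// in `create_catalog`; the values below are illustrative placeholders.
fn gcs_storage_catalog() -> iceberg::Result<StorageCatalog> {
    let config = StorageCatalogConfig::Gcs(
        storage_catalog::StorageCatalogGcsConfig::builder()
            .warehouse("gs://my-bucket/warehouse".to_owned())
            // Service-account JSON, forwarded as `GCS_CREDENTIALS_JSON`
            // when `StorageCatalog::new` builds the `FileIO`.
            .credential(r#"{"type": "service_account"}"#.to_owned())
            .build(),
    );
    StorageCatalog::new(config)
}
```

In SQL, the same credential reaches this path through the new `gcs.credential` option registered in `with_options_sink.yaml` and `with_options_source.yaml`, alongside a `warehouse.path` using the `gs://` scheme.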