diff --git a/Cargo.lock b/Cargo.lock index bfd8da5b9f9fd..32440227377ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,16 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adbc_core" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c00daa298739db8f033952bdf3c67ccbcf2ccee9652df7eb4c7cd3397a7646d" +dependencies = [ + "arrow-array 54.3.1", + "arrow-schema 54.3.1", +] + [[package]] name = "addr2line" version = "0.21.0" @@ -657,6 +667,9 @@ name = "arrow-schema" version = "54.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cfaf5e440be44db5413b75b72c2a87c1f8f0627117d110264048f2969b99e9" +dependencies = [ + "bitflags 2.10.0", +] [[package]] name = "arrow-schema" @@ -11775,6 +11788,7 @@ dependencies = [ name = "risingwave_connector" version = "2.7.0-alpha" dependencies = [ + "adbc_core", "anyhow", "apache-avro 0.16.0", "assert_matches", diff --git a/Makefile.toml b/Makefile.toml index 6f3ed7e4e927a..5193bb3d360e3 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -9,6 +9,7 @@ extend = [ { path = "src/risedevtool/connector.toml" }, { path = "src/risedevtool/risedev-components.toml" }, { path = "src/risedevtool/moat.toml" }, + { path = "src/risedevtool/adbc.toml" }, { path = "src/sqlparser/sqlparser_test.toml" }, { path = "src/frontend/planner_test/planner_test.toml" }, { path = "src/tests/compaction_test/Makefile.toml" }, @@ -85,6 +86,30 @@ env_scripts = [ set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features datafusion" end + is_adbc_enabled = get_env ENABLE_ADBC + if ${is_adbc_enabled} + # Set the ADBC driver manager path for runtime loading + set_env ADBC_DRIVER_PATH "${PREFIX_BIN}/adbc" + # Add ADBC library path to LD_LIBRARY_PATH (Linux) or DYLD_LIBRARY_PATH (macOS) + os_name = os_family + if eq ${os_name} "linux" + existing_ld_path = get_env LD_LIBRARY_PATH + if ${existing_ld_path} + set_env LD_LIBRARY_PATH "${PREFIX_BIN}/adbc:${existing_ld_path}" + 
else + set_env LD_LIBRARY_PATH "${PREFIX_BIN}/adbc" + end + else + # macOS + existing_dyld_path = get_env DYLD_LIBRARY_PATH + if ${existing_dyld_path} + set_env DYLD_LIBRARY_PATH "${PREFIX_BIN}/adbc:${existing_dyld_path}" + else + set_env DYLD_LIBRARY_PATH "${PREFIX_BIN}/adbc" + end + end + end + is_ci = get_env RISINGWAVE_CI is_not_ci = not ${is_ci} diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 0c958a718d4e1..807099bbde379 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -7,6 +7,9 @@ keywords = { workspace = true } license = { workspace = true } repository = { workspace = true } +[package.metadata.cargo-machete] +ignored = ["adbc_core"] + [features] default = [] all-sinks = [ @@ -17,7 +20,8 @@ all-sinks = [ "sink-doris", "sink-starrocks", ] -all-sources = [] +all-sources = ["source-adbc_snowflake"] +source-adbc_snowflake = ["dep:adbc_core"] sink-bigquery = [ "dep:gcp-bigquery-client", "dep:google-cloud-bigquery", @@ -35,6 +39,7 @@ sink-doris = [] sink-starrocks = [] [dependencies] +adbc_core = { version = "0.17", optional = true } anyhow = "1" apache-avro = { workspace = true } assert_matches = "1" diff --git a/src/connector/src/source/adbc_snowflake/mod.rs b/src/connector/src/source/adbc_snowflake/mod.rs new file mode 100644 index 0000000000000..15d469286c4c2 --- /dev/null +++ b/src/connector/src/source/adbc_snowflake/mod.rs @@ -0,0 +1,13 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 118efc7f859ec..a2ab87452266e 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -57,6 +57,7 @@ pub mod mqtt; pub mod nats; pub mod nexmark; pub mod pulsar; +pub mod utils; mod util; use std::future::IntoFuture; @@ -70,11 +71,13 @@ pub use kafka::KAFKA_CONNECTOR; pub use kinesis::KINESIS_CONNECTOR; pub use mqtt::MQTT_CONNECTOR; pub use nats::NATS_CONNECTOR; +use utils::feature_gated_source_mod; mod common; pub mod iceberg; mod manager; pub mod reader; pub mod test_source; +feature_gated_source_mod!(adbc_snowflake, "adbc_snowflake"); use async_nats::jetstream::consumer::AckPolicy as JetStreamAckPolicy; use async_nats::jetstream::context::Context as JetStreamContext; diff --git a/src/connector/src/source/utils.rs b/src/connector/src/source/utils.rs new file mode 100644 index 0000000000000..1d7e4d0ebd883 --- /dev/null +++ b/src/connector/src/source/utils.rs @@ -0,0 +1,218 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Dummy trait implementation for a source when the feature is not enabled at compile time. 
+pub(crate) mod dummy {
+    use std::collections::HashMap;
+
+    use anyhow::anyhow;
+    use async_trait::async_trait;
+    use risingwave_common::types::JsonbVal;
+    use serde::{Deserialize, Serialize};
+
+    use crate::error::{ConnectorError, ConnectorResult};
+    use crate::parser::ParserConfig;
+    use crate::source::base::{
+        SourceProperties, SplitId, SplitMetaData, SplitReader, UnknownFields,
+    };
+    use crate::source::{BoxSourceChunkStream, SourceContextRef, SplitEnumerator, SplitImpl};
+    use crate::with_options::WithOptions;
+
+    #[allow(dead_code)]
+    pub fn err_feature_not_enabled(source_name: &'static str) -> ConnectorError {
+        ConnectorError::from(anyhow!(
+            "RisingWave is not compiled with feature `source-{}`",
+            source_name
+        ))
+    }
+
+    /// Implementing this trait brings a dummy source implementation that always returns an error.
+    pub trait FeatureNotEnabledSourceMarker:
+        Send + Sync + Clone + std::fmt::Debug + 'static
+    {
+        #[allow(dead_code)]
+        const SOURCE_NAME: &'static str;
+    }
+
+    /// A dummy split that always returns an error.
+    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
+    pub struct FeatureNotEnabledSplit<S: FeatureNotEnabledSourceMarker> {
+        #[serde(skip)]
+        _marker: std::marker::PhantomData<S>,
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> SplitMetaData for FeatureNotEnabledSplit<S> {
+        fn id(&self) -> SplitId {
+            "feature_not_enabled".into()
+        }
+
+        fn encode_to_json(&self) -> JsonbVal {
+            serde_json::to_value(self).unwrap().into()
+        }
+
+        fn restore_from_json(_value: JsonbVal) -> crate::error::ConnectorResult<Self> {
+            Err(err_feature_not_enabled(S::SOURCE_NAME))
+        }
+
+        fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
+            Err(err_feature_not_enabled(S::SOURCE_NAME))
+        }
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> TryFrom<SplitImpl> for FeatureNotEnabledSplit<S> {
+        type Error = ConnectorError;
+
+        fn try_from(_value: SplitImpl) -> Result<Self, Self::Error> {
+            Err(err_feature_not_enabled(S::SOURCE_NAME))
+        }
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> From<FeatureNotEnabledSplit<S>> for SplitImpl {
+        fn from(_: FeatureNotEnabledSplit<S>) -> Self {
+            unreachable!("FeatureNotEnabledSplit should never be converted to SplitImpl")
+        }
+    }
+
+    /// A dummy split enumerator that always returns an error.
+    #[allow(dead_code)]
+    pub struct FeatureNotEnabledSplitEnumerator<S: FeatureNotEnabledSourceMarker>(
+        std::marker::PhantomData<S>,
+    );
+
+    #[async_trait]
+    impl<S: FeatureNotEnabledSourceMarker> SplitEnumerator for FeatureNotEnabledSplitEnumerator<S> {
+        type Properties = FeatureNotEnabledProperties<S>;
+        type Split = FeatureNotEnabledSplit<S>;
+
+        async fn new(
+            _properties: Self::Properties,
+            _context: crate::source::SourceEnumeratorContextRef,
+        ) -> ConnectorResult<Self> {
+            Err(err_feature_not_enabled(S::SOURCE_NAME))
+        }
+
+        async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
+            Err(err_feature_not_enabled(S::SOURCE_NAME))
+        }
+    }
+
+    /// A dummy split reader that always returns an error.
+    #[allow(dead_code)]
+    pub struct FeatureNotEnabledSplitReader<S: FeatureNotEnabledSourceMarker>(
+        std::marker::PhantomData<S>,
+    );
+
+    #[async_trait]
+    impl<S: FeatureNotEnabledSourceMarker> SplitReader for FeatureNotEnabledSplitReader<S> {
+        type Properties = FeatureNotEnabledProperties<S>;
+        type Split = FeatureNotEnabledSplit<S>;
+
+        async fn new(
+            _properties: Self::Properties,
+            _state: Vec<Self::Split>,
+            _parser_config: ParserConfig,
+            _source_ctx: SourceContextRef,
+            _columns: Option<Vec<crate::source::Column>>,
+        ) -> ConnectorResult<Self> {
+            Err(err_feature_not_enabled(S::SOURCE_NAME))
+        }
+
+        fn into_stream(self) -> BoxSourceChunkStream {
+            Box::pin(futures::stream::once(async {
+                Err(err_feature_not_enabled(S::SOURCE_NAME))
+            }))
+        }
+    }
+
+    /// A dummy source properties that always returns an error.
+    #[allow(dead_code)]
+    #[derive(Clone, Debug, Deserialize, PartialEq)]
+    pub struct FeatureNotEnabledProperties<S: FeatureNotEnabledSourceMarker> {
+        #[serde(skip)]
+        _marker: std::marker::PhantomData<S>,
+        #[serde(flatten)]
+        pub unknown_fields: HashMap<String, String>,
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> Default for FeatureNotEnabledProperties<S> {
+        fn default() -> Self {
+            Self {
+                _marker: std::marker::PhantomData,
+                unknown_fields: HashMap::new(),
+            }
+        }
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> UnknownFields for FeatureNotEnabledProperties<S> {
+        fn unknown_fields(&self) -> HashMap<String, String> {
+            self.unknown_fields.clone()
+        }
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> WithOptions for FeatureNotEnabledProperties<S> {}
+
+    impl<S: FeatureNotEnabledSourceMarker> crate::enforce_secret::EnforceSecret
+        for FeatureNotEnabledProperties<S>
+    {
+        const ENFORCE_SECRET_PROPERTIES: phf::Set<&'static str> = phf::phf_set! {};
+    }
+
+    impl<S: FeatureNotEnabledSourceMarker> SourceProperties
+        for FeatureNotEnabledProperties<S>
+    {
+        type Split = FeatureNotEnabledSplit<S>;
+        type SplitEnumerator = FeatureNotEnabledSplitEnumerator<S>;
+        type SplitReader = FeatureNotEnabledSplitReader<S>;
+
+        const SOURCE_NAME: &'static str = S::SOURCE_NAME;
+    }
+}
+
+/// Define a source module that is gated by a feature.
+///
+/// This is to allow some heavy or unpopular source implementations (and their dependencies) to be disabled
+/// at compile time, in order to decrease compilation time and binary size.
+#[allow(unused_macros)] +macro_rules! feature_gated_source_mod { + ($mod_name:ident, $source_name:literal) => { + crate::source::utils::feature_gated_source_mod!($mod_name, $mod_name, $source_name); + }; + ($mod_name:ident, $struct_prefix:ident, $source_name:literal) => { + paste::paste! { + #[cfg(feature = "source-" $source_name)] + pub mod $mod_name; + #[cfg(not(feature = "source-" $source_name))] + pub mod $mod_name { + use crate::source::utils::dummy::{ + FeatureNotEnabledSourceMarker, FeatureNotEnabledProperties, + FeatureNotEnabledSplit, FeatureNotEnabledSplitEnumerator, FeatureNotEnabledSplitReader, + }; + pub struct [<$struct_prefix:camel NotEnabled>]; + pub const [<$source_name:upper _CONNECTOR>]: &'static str = $source_name; + impl FeatureNotEnabledSourceMarker for [<$struct_prefix:camel NotEnabled>] { + const SOURCE_NAME: &'static str = [<$source_name:upper _CONNECTOR>]; + } + #[doc = "A dummy source properties that always returns an error, as the feature `source-" $source_name "` is currently not enabled."] + pub type [<$struct_prefix:camel Properties>] = FeatureNotEnabledProperties<[<$struct_prefix:camel NotEnabled>]>; + #[doc = "A dummy split that always returns an error, as the feature `source-" $source_name "` is currently not enabled."] + pub type [<$struct_prefix:camel Split>] = FeatureNotEnabledSplit<[<$struct_prefix:camel NotEnabled>]>; + #[doc = "A dummy split enumerator that always returns an error, as the feature `source-" $source_name "` is currently not enabled."] + pub type [<$struct_prefix:camel SplitEnumerator>] = FeatureNotEnabledSplitEnumerator<[<$struct_prefix:camel NotEnabled>]>; + #[doc = "A dummy split reader that always returns an error, as the feature `source-" $source_name "` is currently not enabled."] + pub type [<$struct_prefix:camel SplitReader>] = FeatureNotEnabledSplitReader<[<$struct_prefix:camel NotEnabled>]>; + } + } + }; +} +pub(super) use feature_gated_source_mod; diff --git a/src/risedevtool/adbc.toml 
b/src/risedevtool/adbc.toml new file mode 100644 index 0000000000000..ca5faf4f1255d --- /dev/null +++ b/src/risedevtool/adbc.toml @@ -0,0 +1,107 @@ +extend = "common.toml" + +[env] +ADBC_VERSION = { value = "21", condition = { env_not_set = [ + "ADBC_VERSION", +] } } +# Driver version in wheel filename (e.g., 1.9.0 for ADBC release 21) +ADBC_DRIVER_VERSION = { value = "1.9.0", condition = { env_not_set = [ + "ADBC_DRIVER_VERSION", +] } } +ADBC_LIB_SUFFIX = { source = "${OS}", mapping = { linux = "so", darwin = "dylib" } } +ADBC_SNOWFLAKE_DRIVER_NAME = "libadbc_driver_snowflake.${ADBC_LIB_SUFFIX}" + +[tasks.download-adbc-snowflake] +private = true +category = "RiseDev - Components" +dependencies = ["prepare"] +condition = { env_set = [ + "ENABLE_ADBC", +], files_not_exist = [ + "${PREFIX_BIN}/adbc/${ADBC_SNOWFLAKE_DRIVER_NAME}", +] } +description = "Download ADBC Snowflake driver shared library" +script = ''' +#!/usr/bin/env bash +set -e + +if [ -f "${PREFIX_BIN}/adbc/${ADBC_SNOWFLAKE_DRIVER_NAME}" ]; then + exit 0 +fi + +# Detect OS and architecture +OS_TYPE="$(uname -s)" +ARCH_TYPE="$(uname -m)" + +# Determine wheel filename suffix based on platform +case "${OS_TYPE}" in + Linux) + case "${ARCH_TYPE}" in + x86_64) + WHEEL_SUFFIX="manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64" + ;; + aarch64) + WHEEL_SUFFIX="manylinux2014_aarch64.manylinux_2_17_aarch64" + ;; + *) + echo "Error: Unsupported Linux architecture: ${ARCH_TYPE}" + exit 1 + ;; + esac + ;; + Darwin) + case "${ARCH_TYPE}" in + x86_64) + WHEEL_SUFFIX="macosx_10_15_x86_64" + ;; + arm64) + WHEEL_SUFFIX="macosx_11_0_arm64" + ;; + *) + echo "Error: Unsupported macOS architecture: ${ARCH_TYPE}" + exit 1 + ;; + esac + ;; + *) + echo "Error: Unsupported operating system: ${OS_TYPE}" + exit 1 + ;; +esac + +WHEEL_FILENAME="adbc_driver_snowflake-${ADBC_DRIVER_VERSION}-py3-none-${WHEEL_SUFFIX}.whl" 
+DOWNLOAD_URL="https://github.com/apache/arrow-adbc/releases/download/apache-arrow-adbc-${ADBC_VERSION}/${WHEEL_FILENAME}" + +echo "ADBC Snowflake driver not found, downloading version ${ADBC_VERSION} (driver ${ADBC_DRIVER_VERSION})" +echo "Platform: ${OS_TYPE} ${ARCH_TYPE}" +echo "Download URL: ${DOWNLOAD_URL}" + +mkdir -p "${PREFIX_BIN}/adbc" +curl -fL -o "${PREFIX_TMP}/${WHEEL_FILENAME}" "${DOWNLOAD_URL}" + +# Extract .so/.dylib from wheel (wheel is a zip file) +unzip -j -o "${PREFIX_TMP}/${WHEEL_FILENAME}" "adbc_driver_snowflake/${ADBC_SNOWFLAKE_DRIVER_NAME}" -d "${PREFIX_BIN}/adbc" +rm -f "${PREFIX_TMP}/${WHEEL_FILENAME}" + +if [ -f "${PREFIX_BIN}/adbc/${ADBC_SNOWFLAKE_DRIVER_NAME}" ]; then + echo "ADBC Snowflake driver installed successfully" +else + echo "Warning: ADBC Snowflake driver file not found after extraction" + echo "Please check the archive contents and update the download script" + ls -la "${PREFIX_BIN}/adbc/" +fi +''' + +[tasks.remove-adbc-snowflake] +private = true +dependencies = ["prepare"] +condition = { env_set = ["ENABLE_ADBC"], files_exist = ["${PREFIX_BIN}/adbc"] } +script = ''' +#!/usr/bin/env bash +set -e +rm -rf "${PREFIX_BIN}/adbc" +''' + +[tasks.update-adbc-snowflake] +dependencies = ["prepare", "remove-adbc-snowflake", "download-adbc-snowflake"] +condition = { env_set = ["ENABLE_ADBC"] } diff --git a/src/risedevtool/config/src/main.rs b/src/risedevtool/config/src/main.rs index 9ed323e1bfa8a..cb194b0331f77 100644 --- a/src/risedevtool/config/src/main.rs +++ b/src/risedevtool/config/src/main.rs @@ -79,6 +79,7 @@ pub enum Components { NoDefaultFeatures, Moat, DataFusion, + Adbc, } impl Components { @@ -105,6 +106,7 @@ impl Components { Self::NoDefaultFeatures => "[Build] Disable default features", Self::Moat => "[Component] Enable Moat", Self::DataFusion => "[Build] Enable DataFusion", + Self::Adbc => "[Component] ADBC Snowflake Driver", } .into() } @@ -227,6 +229,12 @@ Enable Moat as distributed hybrid cache service." 
" Enable DataFusion as the optional query engine for Iceberg tables." } + Self::Adbc => { + " +Enable ADBC (Arrow Database Connectivity) Snowflake driver support. +Required if you want to use ADBC Snowflake sink. +This will download the ADBC Snowflake driver shared library (.so/.dylib)." + } } .into() } @@ -254,6 +262,7 @@ Enable DataFusion as the optional query engine for Iceberg tables." "DISABLE_DEFAULT_FEATURES" => Some(Self::NoDefaultFeatures), "ENABLE_MOAT" => Some(Self::Moat), "ENABLE_DATAFUSION" => Some(Self::DataFusion), + "ENABLE_ADBC" => Some(Self::Adbc), _ => None, } } @@ -281,6 +290,7 @@ Enable DataFusion as the optional query engine for Iceberg tables." Self::NoDefaultFeatures => "DISABLE_DEFAULT_FEATURES", Self::Moat => "ENABLE_MOAT", Self::DataFusion => "ENABLE_DATAFUSION", + Self::Adbc => "ENABLE_ADBC", } .into() }