From 544a1073233d772f98ca2d7495ed129fb72b7845 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 15:41:24 +0530 Subject: [PATCH 1/9] feat(bigquery): add BigQuery native datasource Add BigQuery as a native data source with table discovery and data fetching via the gcp-bigquery-client crate. Includes Source enum variant, credential support, and integration into the NativeFetcher. --- Cargo.lock | 186 +++++- Cargo.toml | 1 + src/datafetch/native/bigquery.rs | 1004 ++++++++++++++++++++++++++++++ src/datafetch/native/mod.rs | 5 + src/source.rs | 85 +++ 5 files changed, 1280 insertions(+), 1 deletion(-) create mode 100644 src/datafetch/native/bigquery.rs diff --git a/Cargo.lock b/Cargo.lock index b57cb3f..3c14964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2882,7 +2882,7 @@ dependencies = [ "itertools 0.14.0", "parking_lot 0.12.5", "paste", - "petgraph", + "petgraph 0.8.3", ] [[package]] @@ -3067,6 +3067,24 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "delegate" version = "0.13.5" @@ -3614,6 +3632,39 @@ dependencies = [ "slab", ] +[[package]] +name = "gcp-bigquery-client" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b776bd336303c2dcd66a872d3a9f5c2a4678162dc3c90617e89ea665e2ce79cc" +dependencies = [ + "async-stream", + "async-trait", + "deadpool", + "dyn-clone", + "flate2", + "futures", + "hyper-util", + "log", + "pin-project", + "prost", + "prost-build", + "prost-types", + "reqwest 0.12.24", + "serde", + 
"serde_json", + "thiserror 2.0.17", + "time", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tonic-prost", + "tonic-prost-build", + "url", + "yup-oauth2", +] + [[package]] name = "generic-array" version = "0.14.9" @@ -4752,6 +4803,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "murmur3" version = "0.5.2" @@ -4883,6 +4940,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" +dependencies = [ + "libc", +] + [[package]] name = "object" version = "0.32.2" @@ -5351,6 +5417,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.12.0", +] + [[package]] name = "petgraph" version = "0.8.3" @@ -5542,6 +5618,28 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +dependencies = [ + "heck", + "itertools 0.14.0", + "log", + "multimap", + "once_cell", + "petgraph 0.7.1", + "prettyplease", + "prost", + "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", + "regex", + "syn 2.0.110", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.14.1" @@ -5594,6 +5692,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags 2.10.0", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8246feae3db61428fd0bb94285c690b460e4517d83152377543ca802357785f1" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "quad-rand" version = "0.2.3" @@ -6196,6 +6314,7 @@ dependencies = [ "datafusion-tracing", "duckdb", "futures", + "gcp-bigquery-client", "http 1.3.1", "iceberg", "iceberg-catalog-glue", @@ -7399,7 +7518,9 @@ checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", "itoa", + "libc", "num-conv", + "num_threads", "powerfmt", "serde", "time-core", @@ -7622,6 +7743,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", @@ -7631,14 +7753,29 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", + "rustls-native-certs", "socket2 0.6.1", "sync_wrapper", "tokio", + "tokio-rustls 0.26.4", "tokio-stream", "tower", "tower-layer", "tower-service", "tracing", + "zstd", +] + +[[package]] +name = "tonic-build" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27aac809edf60b741e2d7db6367214d078856b8a5bff0087e94ff330fb97b6fc" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.110", ] [[package]] @@ -7652,6 +7789,22 @@ dependencies = [ "tonic", ] +[[package]] +name = "tonic-prost-build" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4556786613791cfef4ed134aa670b61a85cfcacf71543ef33e8d801abae988f" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.110", + "tempfile", + "tonic-build", +] + [[package]] name = "tower" version = "0.5.2" @@ -7842,6 +7995,12 @@ version = "0.1.7" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-bidi" version = "0.3.18" @@ -8667,6 +8826,31 @@ dependencies = [ "synstructure", ] +[[package]] +name = "yup-oauth2" +version = "12.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef19a12dfb29fe39f78e1547e1be49717b84aef8762a4001359ed4f94d3accc1" +dependencies = [ + "async-trait", + "base64 0.22.1", + "http 1.3.1", + "http-body-util", + "hyper 1.8.1", + "hyper-rustls 0.27.7", + "hyper-util", + "log", + "percent-encoding", + "rustls 0.23.35", + "seahash", + "serde", + "serde_json", + "thiserror 2.0.17", + "time", + "tokio", + "url", +] + [[package]] name = "zerocopy" version = "0.8.27" diff --git a/Cargo.toml b/Cargo.toml index f0e7ad3..68aa187 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ iceberg = "0.7" iceberg-catalog-rest = "0.7" iceberg-catalog-glue = "0.7" snowflake-api = "0.11" +gcp-bigquery-client = "0.28.0" # Arrow 55 for iceberg compatibility (iceberg uses arrow 55, datafusion uses arrow 57) arrow-array-55 = { package = "arrow-array", version = "55" } arrow-ipc-55 = { package = "arrow-ipc", version = "55" } diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs new file mode 100644 index 0000000..0787a74 --- /dev/null +++ b/src/datafetch/native/bigquery.rs @@ -0,0 +1,1004 @@ +//! 
BigQuery native driver implementation using gcp-bigquery-client + +use datafusion::arrow::array::{ + ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, + StringBuilder, TimestampMicrosecondBuilder, +}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use datafusion::arrow::record_batch::RecordBatch; +use gcp_bigquery_client::model::query_request::QueryRequest; +use gcp_bigquery_client::model::table_cell::TableCell; +use gcp_bigquery_client::Client; +use std::sync::Arc; +use tracing::warn; + +use crate::datafetch::batch_writer::BatchWriter; +use crate::datafetch::{ColumnMetadata, DataFetchError, TableMetadata}; +use crate::secrets::SecretManager; +use crate::source::Source; + +/// Build a BigQuery client from source credentials. +/// The credential is expected to be a GCP service account key JSON string. +async fn build_client( + source: &Source, + secrets: &SecretManager, +) -> Result { + let credential = match source { + Source::Bigquery { credential, .. } => credential, + _ => { + return Err(DataFetchError::Connection( + "Expected BigQuery source".to_string(), + )) + } + }; + + let cred_json = credential + .resolve(secrets) + .await + .map_err(|e| DataFetchError::Connection(e.to_string()))?; + + let sa_key = serde_json::from_str(&cred_json).map_err(|e| { + DataFetchError::Connection(format!("Invalid service account key JSON: {}", e)) + })?; + + Client::from_service_account_key(sa_key, false) + .await + .map_err(|e| DataFetchError::Connection(format!("Failed to create BigQuery client: {}", e))) +} + +/// Discover tables and columns from BigQuery +pub async fn discover_tables( + source: &Source, + secrets: &SecretManager, +) -> Result, DataFetchError> { + let client = build_client(source, secrets).await?; + + let (project_id, dataset_filter) = match source { + Source::Bigquery { + project_id, + dataset, + .. 
+ } => (project_id.as_str(), dataset.as_deref()), + _ => { + return Err(DataFetchError::Connection( + "Expected BigQuery source".to_string(), + )) + } + }; + + // Build discovery query with optional dataset filter + let query_sql = if let Some(dataset) = dataset_filter { + format!( + r#" + SELECT + c.table_catalog, + c.table_schema, + c.table_name, + t.table_type, + c.column_name, + c.data_type, + c.is_nullable, + c.ordinal_position + FROM `{project_id}`.`{dataset}`.INFORMATION_SCHEMA.COLUMNS c + JOIN `{project_id}`.`{dataset}`.INFORMATION_SCHEMA.TABLES t + ON c.table_catalog = t.table_catalog + AND c.table_schema = t.table_schema + AND c.table_name = t.table_name + ORDER BY c.table_schema, c.table_name, c.ordinal_position + "#, + project_id = project_id.replace('`', "\\`"), + dataset = dataset.replace('`', "\\`"), + ) + } else { + // Without a dataset filter, query all datasets via region-level INFORMATION_SCHEMA + format!( + r#" + SELECT + c.table_catalog, + c.table_schema, + c.table_name, + t.table_type, + c.column_name, + c.data_type, + c.is_nullable, + c.ordinal_position + FROM `{project_id}`.`region-us`.INFORMATION_SCHEMA.COLUMNS c + JOIN `{project_id}`.`region-us`.INFORMATION_SCHEMA.TABLES t + ON c.table_catalog = t.table_catalog + AND c.table_schema = t.table_schema + AND c.table_name = t.table_name + WHERE c.table_schema NOT IN ('INFORMATION_SCHEMA') + ORDER BY c.table_schema, c.table_name, c.ordinal_position + "#, + project_id = project_id.replace('`', "\\`"), + ) + }; + + let query_request = QueryRequest::new(&query_sql); + + let response = client + .job() + .query(project_id, query_request) + .await + .map_err(|e| DataFetchError::Query(format!("Discovery query failed: {}", e)))?; + + let rows = match &response.rows { + Some(rows) => rows, + None => return Ok(Vec::new()), + }; + + let mut tables: Vec = Vec::new(); + + for row in rows { + let cells = match &row.columns { + Some(cells) => cells, + None => continue, + }; + if cells.len() < 8 { + continue; + 
} + + let catalog = cell_to_string(&cells[0]); + let schema_name = cell_to_string(&cells[1]).unwrap_or_default(); + let table_name = cell_to_string(&cells[2]).unwrap_or_default(); + let table_type = cell_to_string(&cells[3]).unwrap_or_else(|| "BASE TABLE".to_string()); + let col_name = cell_to_string(&cells[4]).unwrap_or_default(); + let data_type_str = cell_to_string(&cells[5]).unwrap_or_else(|| "STRING".to_string()); + let is_nullable = cell_to_string(&cells[6]) + .map(|s| s.to_uppercase() == "YES") + .unwrap_or(true); + let ordinal = cell_to_string(&cells[7]) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + + let column = ColumnMetadata { + name: col_name, + data_type: bigquery_type_to_arrow(&data_type_str), + nullable: is_nullable, + ordinal_position: ordinal, + }; + + if let Some(existing) = tables.iter_mut().find(|t| { + t.catalog_name.as_deref() == catalog.as_deref() + && t.schema_name == schema_name + && t.table_name == table_name + }) { + existing.columns.push(column); + } else { + tables.push(TableMetadata { + catalog_name: catalog, + schema_name, + table_name, + table_type, + columns: vec![column], + }); + } + } + + Ok(tables) +} + +/// Fetch table data and write to Parquet +pub async fn fetch_table( + source: &Source, + secrets: &SecretManager, + _catalog: Option<&str>, + schema: &str, + table: &str, + writer: &mut dyn BatchWriter, +) -> Result<(), DataFetchError> { + let client = build_client(source, secrets).await?; + + let project_id = match source { + Source::Bigquery { project_id, .. 
} => project_id.as_str(), + _ => { + return Err(DataFetchError::Connection( + "Expected BigQuery source".to_string(), + )) + } + }; + + // First, get the schema from INFORMATION_SCHEMA + let schema_sql = format!( + r#" + SELECT column_name, data_type, is_nullable + FROM `{project_id}`.`{dataset}`.INFORMATION_SCHEMA.COLUMNS + WHERE table_name = '{table}' + ORDER BY ordinal_position + "#, + project_id = project_id.replace('`', "\\`"), + dataset = schema.replace('`', "\\`"), + table = table.replace('\'', "\\'"), + ); + + let schema_request = QueryRequest::new(&schema_sql); + + let schema_response = client + .job() + .query(project_id, schema_request) + .await + .map_err(|e| DataFetchError::Query(format!("Schema query failed: {}", e)))?; + + let fields: Vec = match &schema_response.rows { + Some(rows) if !rows.is_empty() => rows + .iter() + .filter_map(|row| { + let cells = row.columns.as_ref()?; + if cells.len() < 3 { + return None; + } + let col_name = cell_to_string(&cells[0])?; + let data_type_str = cell_to_string(&cells[1]).unwrap_or_else(|| "STRING".to_string()); + let is_nullable = cell_to_string(&cells[2]) + .map(|s| s.to_uppercase() == "YES") + .unwrap_or(true); + Some(Field::new( + col_name, + bigquery_type_to_arrow(&data_type_str), + is_nullable, + )) + }) + .collect(), + _ => { + return Err(DataFetchError::Query(format!( + "Table {}.{} not found or has no columns", + schema, table + ))) + } + }; + + let arrow_schema = Schema::new(fields); + writer.init(&arrow_schema)?; + + // Now fetch the actual data + let data_sql = format!( + "SELECT * FROM `{}`.`{}`.`{}`", + project_id.replace('`', "\\`"), + schema.replace('`', "\\`"), + table.replace('`', "\\`"), + ); + + let data_request = QueryRequest::new(&data_sql); + + let data_response = client + .job() + .query(project_id, data_request) + .await + .map_err(|e| DataFetchError::Query(format!("Fetch query failed: {}", e)))?; + + match &data_response.rows { + Some(rows) if !rows.is_empty() => { + let batch = 
rows_to_batch(rows, &arrow_schema)?; + writer.write_batch(&batch)?; + } + _ => { + let empty_batch = RecordBatch::new_empty(Arc::new(arrow_schema)); + writer.write_batch(&empty_batch)?; + } + } + + Ok(()) +} + +/// Extract a string value from a BigQuery TableCell +fn cell_to_string(cell: &TableCell) -> Option { + match &cell.value { + Some(serde_json::Value::String(s)) => Some(s.clone()), + Some(serde_json::Value::Number(n)) => Some(n.to_string()), + Some(serde_json::Value::Bool(b)) => Some(b.to_string()), + Some(serde_json::Value::Null) | None => None, + Some(other) => Some(other.to_string()), + } +} + +/// Convert BigQuery INFORMATION_SCHEMA data_type string to Arrow DataType +pub fn bigquery_type_to_arrow(bq_type: &str) -> DataType { + let type_upper = bq_type.to_uppercase(); + let base_type = type_upper.split('<').next().unwrap_or(&type_upper).trim(); + let base_type = base_type.split('(').next().unwrap_or(base_type).trim(); + + match base_type { + "BOOL" | "BOOLEAN" => DataType::Boolean, + "INT64" | "INT" | "SMALLINT" | "INTEGER" | "BIGINT" | "TINYINT" | "BYTEINT" => { + DataType::Int64 + } + "FLOAT" | "FLOAT64" => DataType::Float64, + "NUMERIC" | "DECIMAL" => DataType::Utf8, + "BIGNUMERIC" | "BIGDECIMAL" => DataType::Utf8, + "STRING" => DataType::Utf8, + "BYTES" => DataType::Binary, + "DATE" => DataType::Date32, + "TIME" => DataType::Utf8, + "DATETIME" => DataType::Timestamp(TimeUnit::Microsecond, None), + "TIMESTAMP" => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + "GEOGRAPHY" => DataType::Utf8, + "JSON" => DataType::Utf8, + "INTERVAL" => DataType::Utf8, + "STRUCT" | "RECORD" => DataType::Utf8, + "ARRAY" => DataType::Utf8, + "RANGE" => DataType::Utf8, + _ => DataType::Utf8, + } +} + +/// Build a RecordBatch from BigQuery rows +fn rows_to_batch( + rows: &[gcp_bigquery_client::model::table_row::TableRow], + schema: &Schema, +) -> Result { + let num_cols = schema.fields().len(); + let num_rows = rows.len(); + + let mut builders: Vec> = 
schema + .fields() + .iter() + .map(|f| make_builder(f.data_type(), num_rows)) + .collect(); + + for row in rows { + let cells = row.columns.as_ref().map(|c| c.as_slice()).unwrap_or(&[]); + for col_idx in 0..num_cols { + let cell = cells.get(col_idx); + let data_type = schema.field(col_idx).data_type(); + append_cell(&mut builders[col_idx], cell, data_type); + } + } + + let arrays: Vec> = + builders.iter_mut().map(|b| b.finish()).collect(); + + RecordBatch::try_new(Arc::new(schema.clone()), arrays) + .map_err(|e| DataFetchError::Query(e.to_string())) +} + +fn make_builder(data_type: &DataType, capacity: usize) -> Box { + match data_type { + DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)), + DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)), + DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)), + DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)), + DataType::Binary => Box::new(BinaryBuilder::with_capacity(capacity, capacity * 32)), + DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)), + DataType::Timestamp(_, tz) => { + let mut b = TimestampMicrosecondBuilder::with_capacity(capacity); + if let Some(tz) = tz { + b = b.with_timezone(tz.as_ref()); + } + Box::new(b) + } + _ => Box::new(StringBuilder::with_capacity(capacity, capacity * 32)), + } +} + +/// Append a BigQuery cell value to the appropriate Arrow builder. +/// BigQuery REST API returns all values as JSON strings, so we parse them. 
+fn append_cell( + builder: &mut Box, + cell: Option<&TableCell>, + data_type: &DataType, +) { + let value_str = cell + .and_then(|c| c.value.as_ref()) + .and_then(|v| match v { + serde_json::Value::String(s) => Some(s.as_str()), + serde_json::Value::Null => None, + _ => None, + }); + + match data_type { + DataType::Boolean => { + let b = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + match value_str { + Some(s) => b.append_value(s.eq_ignore_ascii_case("true")), + None => b.append_null(), + } + } + DataType::Int64 => { + let b = builder.as_any_mut().downcast_mut::().unwrap(); + match value_str.and_then(|s| s.parse::().ok()) { + Some(v) => b.append_value(v), + None => b.append_null(), + } + } + DataType::Float64 => { + let b = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + match value_str.and_then(|s| s.parse::().ok()) { + Some(v) => b.append_value(v), + None => b.append_null(), + } + } + DataType::Date32 => { + let b = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + match value_str.and_then(|s| chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d").ok()) { + Some(date) => { + let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let days = (date - epoch).num_days() as i32; + b.append_value(days); + } + None => b.append_null(), + } + } + DataType::Timestamp(_, tz) => { + let b = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + match value_str { + Some(s) => { + // BigQuery TIMESTAMP returns epoch seconds as float string + // BigQuery DATETIME returns ISO format string + if tz.is_some() { + // TIMESTAMP: value is epoch seconds (e.g. 
"1.609459200E9" or "1609459200.0") + if let Ok(epoch_secs) = s.parse::() { + let micros = (epoch_secs * 1_000_000.0) as i64; + b.append_value(micros); + } else { + warn!(value = s, "Failed to parse BigQuery TIMESTAMP value"); + b.append_null(); + } + } else { + // DATETIME: value is ISO-like "2021-01-01 12:00:00" or "2021-01-01T12:00:00" + let parsed = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") + .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")) + .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) + .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")); + match parsed { + Ok(dt) => b.append_value(dt.and_utc().timestamp_micros()), + Err(_) => { + warn!(value = s, "Failed to parse BigQuery DATETIME value"); + b.append_null(); + } + } + } + } + None => b.append_null(), + } + } + DataType::Binary => { + let b = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + match value_str { + // BigQuery BYTES are base64 encoded + Some(s) => match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s) { + Ok(bytes) => b.append_value(&bytes), + Err(_) => b.append_value(s.as_bytes()), + }, + None => b.append_null(), + } + } + _ => { + // Default: store as string (Utf8) + let b = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + match value_str { + Some(s) => b.append_value(s), + None => b.append_null(), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use gcp_bigquery_client::model::field_type::FieldType; + use gcp_bigquery_client::model::table_field_schema::TableFieldSchema; + + /// Convert BigQuery FieldType enum to Arrow DataType + fn field_type_to_arrow(ft: &FieldType) -> DataType { + match ft { + FieldType::Bool | FieldType::Boolean => DataType::Boolean, + FieldType::Int64 | FieldType::Integer => DataType::Int64, + FieldType::Float64 | FieldType::Float => DataType::Float64, + FieldType::Numeric => DataType::Utf8, + FieldType::Bignumeric 
=> DataType::Utf8, + FieldType::String => DataType::Utf8, + FieldType::Bytes => DataType::Binary, + FieldType::Date => DataType::Date32, + FieldType::Time => DataType::Utf8, + FieldType::Datetime => DataType::Timestamp(TimeUnit::Microsecond, None), + FieldType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + FieldType::Geography => DataType::Utf8, + FieldType::Json => DataType::Utf8, + FieldType::Interval => DataType::Utf8, + FieldType::Record | FieldType::Struct => DataType::Utf8, + } + } + + /// Build Arrow schema from BigQuery TableFieldSchema + fn schema_from_fields(fields: &[TableFieldSchema]) -> Schema { + let arrow_fields: Vec = fields + .iter() + .map(|f| { + let nullable = f + .mode + .as_ref() + .map(|m| m.to_uppercase() != "REQUIRED") + .unwrap_or(true); + Field::new(&f.name, field_type_to_arrow(&f.r#type), nullable) + }) + .collect(); + Schema::new(arrow_fields) + } + + // ========================================================================= + // Type mapping tests + // ========================================================================= + + #[test] + fn test_bigquery_type_boolean() { + assert!(matches!(bigquery_type_to_arrow("BOOL"), DataType::Boolean)); + assert!(matches!( + bigquery_type_to_arrow("BOOLEAN"), + DataType::Boolean + )); + assert!(matches!(bigquery_type_to_arrow("bool"), DataType::Boolean)); + } + + #[test] + fn test_bigquery_type_integer() { + assert!(matches!(bigquery_type_to_arrow("INT64"), DataType::Int64)); + assert!(matches!(bigquery_type_to_arrow("INT"), DataType::Int64)); + assert!(matches!(bigquery_type_to_arrow("INTEGER"), DataType::Int64)); + assert!(matches!(bigquery_type_to_arrow("BIGINT"), DataType::Int64)); + assert!(matches!( + bigquery_type_to_arrow("SMALLINT"), + DataType::Int64 + )); + assert!(matches!( + bigquery_type_to_arrow("TINYINT"), + DataType::Int64 + )); + assert!(matches!( + bigquery_type_to_arrow("BYTEINT"), + DataType::Int64 + )); + } + + #[test] + fn 
test_bigquery_type_float() { + assert!(matches!( + bigquery_type_to_arrow("FLOAT64"), + DataType::Float64 + )); + assert!(matches!(bigquery_type_to_arrow("FLOAT"), DataType::Float64)); + } + + #[test] + fn test_bigquery_type_numeric() { + assert!(matches!(bigquery_type_to_arrow("NUMERIC"), DataType::Utf8)); + assert!(matches!(bigquery_type_to_arrow("DECIMAL"), DataType::Utf8)); + assert!(matches!( + bigquery_type_to_arrow("BIGNUMERIC"), + DataType::Utf8 + )); + assert!(matches!( + bigquery_type_to_arrow("BIGDECIMAL"), + DataType::Utf8 + )); + } + + #[test] + fn test_bigquery_type_string() { + assert!(matches!(bigquery_type_to_arrow("STRING"), DataType::Utf8)); + assert!(matches!(bigquery_type_to_arrow("string"), DataType::Utf8)); + } + + #[test] + fn test_bigquery_type_bytes() { + assert!(matches!(bigquery_type_to_arrow("BYTES"), DataType::Binary)); + } + + #[test] + fn test_bigquery_type_date() { + assert!(matches!(bigquery_type_to_arrow("DATE"), DataType::Date32)); + } + + #[test] + fn test_bigquery_type_time() { + assert!(matches!(bigquery_type_to_arrow("TIME"), DataType::Utf8)); + } + + #[test] + fn test_bigquery_type_datetime() { + match bigquery_type_to_arrow("DATETIME") { + DataType::Timestamp(unit, tz) => { + assert!(matches!(unit, TimeUnit::Microsecond)); + assert!(tz.is_none()); + } + other => panic!("Expected Timestamp without tz, got {:?}", other), + } + } + + #[test] + fn test_bigquery_type_timestamp() { + match bigquery_type_to_arrow("TIMESTAMP") { + DataType::Timestamp(unit, tz) => { + assert!(matches!(unit, TimeUnit::Microsecond)); + assert_eq!(tz.as_deref(), Some("UTC")); + } + other => panic!("Expected Timestamp with UTC, got {:?}", other), + } + } + + #[test] + fn test_bigquery_type_geography() { + assert!(matches!( + bigquery_type_to_arrow("GEOGRAPHY"), + DataType::Utf8 + )); + } + + #[test] + fn test_bigquery_type_json() { + assert!(matches!(bigquery_type_to_arrow("JSON"), DataType::Utf8)); + } + + #[test] + fn 
test_bigquery_type_struct_and_record() { + assert!(matches!(bigquery_type_to_arrow("STRUCT"), DataType::Utf8)); + assert!(matches!(bigquery_type_to_arrow("RECORD"), DataType::Utf8)); + } + + #[test] + fn test_bigquery_type_array() { + assert!(matches!(bigquery_type_to_arrow("ARRAY"), DataType::Utf8)); + // Parameterized ARRAY + assert!(matches!( + bigquery_type_to_arrow("ARRAY"), + DataType::Utf8 + )); + } + + #[test] + fn test_bigquery_type_parameterized() { + // NUMERIC(10,2) should still map to Utf8 (base type extraction) + assert!(matches!( + bigquery_type_to_arrow("NUMERIC(10,2)"), + DataType::Utf8 + )); + assert!(matches!( + bigquery_type_to_arrow("STRING(100)"), + DataType::Utf8 + )); + } + + #[test] + fn test_bigquery_type_case_insensitive() { + assert!(matches!(bigquery_type_to_arrow("int64"), DataType::Int64)); + assert!(matches!( + bigquery_type_to_arrow("Float64"), + DataType::Float64 + )); + assert!(matches!( + bigquery_type_to_arrow("Timestamp"), + DataType::Timestamp(_, _) + )); + } + + #[test] + fn test_bigquery_type_unknown_fallback() { + assert!(matches!( + bigquery_type_to_arrow("UNKNOWN_TYPE"), + DataType::Utf8 + )); + } + + // ========================================================================= + // FieldType enum mapping tests + // ========================================================================= + + #[test] + fn test_field_type_to_arrow_boolean() { + assert!(matches!( + field_type_to_arrow(&FieldType::Bool), + DataType::Boolean + )); + assert!(matches!( + field_type_to_arrow(&FieldType::Boolean), + DataType::Boolean + )); + } + + #[test] + fn test_field_type_to_arrow_integer() { + assert!(matches!( + field_type_to_arrow(&FieldType::Int64), + DataType::Int64 + )); + assert!(matches!( + field_type_to_arrow(&FieldType::Integer), + DataType::Int64 + )); + } + + #[test] + fn test_field_type_to_arrow_float() { + assert!(matches!( + field_type_to_arrow(&FieldType::Float64), + DataType::Float64 + )); + assert!(matches!( + 
field_type_to_arrow(&FieldType::Float), + DataType::Float64 + )); + } + + #[test] + fn test_field_type_to_arrow_string() { + assert!(matches!( + field_type_to_arrow(&FieldType::String), + DataType::Utf8 + )); + } + + #[test] + fn test_field_type_to_arrow_timestamp() { + match field_type_to_arrow(&FieldType::Timestamp) { + DataType::Timestamp(unit, tz) => { + assert!(matches!(unit, TimeUnit::Microsecond)); + assert_eq!(tz.as_deref(), Some("UTC")); + } + other => panic!("Expected Timestamp with UTC, got {:?}", other), + } + } + + #[test] + fn test_field_type_to_arrow_datetime() { + match field_type_to_arrow(&FieldType::Datetime) { + DataType::Timestamp(unit, tz) => { + assert!(matches!(unit, TimeUnit::Microsecond)); + assert!(tz.is_none()); + } + other => panic!("Expected Timestamp without tz, got {:?}", other), + } + } + + // ========================================================================= + // Cell extraction tests + // ========================================================================= + + #[test] + fn test_cell_to_string_values() { + let string_cell = TableCell { + value: Some(serde_json::Value::String("hello".to_string())), + }; + assert_eq!(cell_to_string(&string_cell), Some("hello".to_string())); + + let number_cell = TableCell { + value: Some(serde_json::Value::Number(serde_json::Number::from(42))), + }; + assert_eq!(cell_to_string(&number_cell), Some("42".to_string())); + + let bool_cell = TableCell { + value: Some(serde_json::Value::Bool(true)), + }; + assert_eq!(cell_to_string(&bool_cell), Some("true".to_string())); + + let null_cell = TableCell { + value: Some(serde_json::Value::Null), + }; + assert_eq!(cell_to_string(&null_cell), None); + + let none_cell = TableCell { value: None }; + assert_eq!(cell_to_string(&none_cell), None); + } + + // ========================================================================= + // Schema building tests + // ========================================================================= + + #[test] + fn 
test_schema_from_fields() { + let fields = vec![ + TableFieldSchema::string("name"), + TableFieldSchema::integer("age"), + TableFieldSchema::bool("active"), + ]; + + let schema = schema_from_fields(&fields); + assert_eq!(schema.fields().len(), 3); + assert_eq!(schema.field(0).name(), "name"); + assert_eq!(*schema.field(0).data_type(), DataType::Utf8); + assert_eq!(schema.field(1).name(), "age"); + assert_eq!(*schema.field(1).data_type(), DataType::Int64); + assert_eq!(schema.field(2).name(), "active"); + assert_eq!(*schema.field(2).data_type(), DataType::Boolean); + } + + // ========================================================================= + // Row conversion tests + // ========================================================================= + + #[test] + fn test_rows_to_batch_basic() { + use gcp_bigquery_client::model::table_row::TableRow; + + let schema = Schema::new(vec![ + Field::new("name", DataType::Utf8, true), + Field::new("age", DataType::Int64, true), + ]); + + let rows = vec![ + TableRow { + columns: Some(vec![ + TableCell { + value: Some(serde_json::Value::String("Alice".to_string())), + }, + TableCell { + value: Some(serde_json::Value::String("30".to_string())), + }, + ]), + }, + TableRow { + columns: Some(vec![ + TableCell { + value: Some(serde_json::Value::String("Bob".to_string())), + }, + TableCell { + value: Some(serde_json::Value::String("25".to_string())), + }, + ]), + }, + ]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let name_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + + let age_col = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(age_col.value(0), 30); + assert_eq!(age_col.value(1), 25); + } + + #[test] + fn test_rows_to_batch_with_nulls() { + use gcp_bigquery_client::model::table_row::TableRow; + + let 
schema = Schema::new(vec![ + Field::new("value", DataType::Utf8, true), + Field::new("count", DataType::Int64, true), + ]); + + let rows = vec![TableRow { + columns: Some(vec![ + TableCell { + value: Some(serde_json::Value::Null), + }, + TableCell { value: None }, + ]), + }]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + assert_eq!(batch.num_rows(), 1); + assert!(batch.column(0).is_null(0)); + assert!(batch.column(1).is_null(0)); + } + + #[test] + fn test_rows_to_batch_boolean() { + use gcp_bigquery_client::model::table_row::TableRow; + + let schema = Schema::new(vec![Field::new("flag", DataType::Boolean, true)]); + + let rows = vec![ + TableRow { + columns: Some(vec![TableCell { + value: Some(serde_json::Value::String("true".to_string())), + }]), + }, + TableRow { + columns: Some(vec![TableCell { + value: Some(serde_json::Value::String("false".to_string())), + }]), + }, + ]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(col.value(0)); + assert!(!col.value(1)); + } + + #[test] + fn test_rows_to_batch_date() { + use gcp_bigquery_client::model::table_row::TableRow; + + let schema = Schema::new(vec![Field::new("d", DataType::Date32, true)]); + + let rows = vec![TableRow { + columns: Some(vec![TableCell { + value: Some(serde_json::Value::String("2021-06-15".to_string())), + }]), + }]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + // 2021-06-15 is 18793 days since epoch + let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let expected = chrono::NaiveDate::from_ymd_opt(2021, 6, 15).unwrap(); + let expected_days = (expected - epoch).num_days() as i32; + assert_eq!(col.value(0), expected_days); + } + + #[test] + fn test_rows_to_batch_timestamp() { + use gcp_bigquery_client::model::table_row::TableRow; + + let schema = Schema::new(vec![Field::new( + "ts", + 
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + )]); + + // BigQuery TIMESTAMP returns epoch seconds as string + let rows = vec![TableRow { + columns: Some(vec![TableCell { + value: Some(serde_json::Value::String("1609459200.0".to_string())), + }]), + }]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + // 2021-01-01 00:00:00 UTC = 1609459200 seconds = 1609459200000000 microseconds + assert_eq!(col.value(0), 1609459200000000); + } + + #[test] + fn test_rows_to_batch_float() { + use gcp_bigquery_client::model::table_row::TableRow; + + let schema = Schema::new(vec![Field::new("f", DataType::Float64, true)]); + + let rows = vec![TableRow { + columns: Some(vec![TableCell { + value: Some(serde_json::Value::String("3.14".to_string())), + }]), + }]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert!((col.value(0) - 3.14).abs() < f64::EPSILON); + } +} diff --git a/src/datafetch/native/mod.rs b/src/datafetch/native/mod.rs index db8b8c6..2d59a1f 100644 --- a/src/datafetch/native/mod.rs +++ b/src/datafetch/native/mod.rs @@ -1,5 +1,6 @@ // Backend modules - public for type mapping function access in tests // The type mapping functions are the authoritative implementations that tests validate against. +pub mod bigquery; pub mod duckdb; pub mod iceberg; pub mod mysql; @@ -178,6 +179,7 @@ impl DataFetcher for NativeFetcher { Source::Iceberg { .. } => iceberg::discover_tables(source, secrets).await, Source::Mysql { .. } => mysql::discover_tables(source, secrets).await, Source::Snowflake { .. } => snowflake::discover_tables(source, secrets).await, + Source::Bigquery { .. 
} => bigquery::discover_tables(source, secrets).await, }?; tracing::Span::current().record("runtimedb.tables_found", tables.len()); Ok(tables) @@ -217,6 +219,9 @@ impl DataFetcher for NativeFetcher { Source::Snowflake { .. } => { snowflake::fetch_table(source, secrets, catalog, schema, table, writer).await } + Source::Bigquery { .. } => { + bigquery::fetch_table(source, secrets, catalog, schema, table, writer).await + } } } } diff --git a/src/source.rs b/src/source.rs index d885991..9120503 100644 --- a/src/source.rs +++ b/src/source.rs @@ -124,6 +124,13 @@ pub enum Source { #[serde(default)] credential: Credential, }, + Bigquery { + project_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + dataset: Option<String>, + #[serde(default)] + credential: Credential, + }, } impl Source { @@ -136,6 +143,7 @@ impl Source { Source::Duckdb { .. } => "duckdb", Source::Iceberg { .. } => "iceberg", Source::Mysql { .. } => "mysql", + Source::Bigquery { .. } => "bigquery", } } @@ -160,6 +168,7 @@ impl Source { IcebergCatalogType::Glue { credential, .. } => credential, }, Source::Mysql { credential, .. } => credential, + Source::Bigquery { credential, .. } => credential, } } @@ -239,6 +248,15 @@ impl Source { database, credential, }, + Source::Bigquery { + project_id, + dataset, + ..
+ } => Source::Bigquery { + project_id, + dataset, + credential, + }, } } } @@ -694,4 +712,71 @@ mod tests { let cred = Credential::None; assert_eq!(cred.secret_id(), None); } + + #[test] + fn test_bigquery_serialization() { + let source = Source::Bigquery { + project_id: "my-gcp-project".to_string(), + dataset: Some("my_dataset".to_string()), + credential: Credential::SecretRef { + id: "secr_bq".to_string(), + }, + }; + + let json = serde_json::to_string(&source).unwrap(); + assert!(json.contains(r#""type":"bigquery""#)); + assert!(json.contains(r#""project_id":"my-gcp-project""#)); + assert!(json.contains(r#""dataset":"my_dataset""#)); + + let parsed: Source = serde_json::from_str(&json).unwrap(); + assert_eq!(source, parsed); + } + + #[test] + fn test_bigquery_without_dataset() { + let source = Source::Bigquery { + project_id: "my-gcp-project".to_string(), + dataset: None, + credential: Credential::None, + }; + + let json = serde_json::to_string(&source).unwrap(); + assert!(!json.contains(r#""dataset""#)); + + let parsed: Source = serde_json::from_str(&json).unwrap(); + assert_eq!(source, parsed); + } + + #[test] + fn test_bigquery_source_type() { + let bq = Source::Bigquery { + project_id: "proj".to_string(), + dataset: None, + credential: Credential::None, + }; + assert_eq!(bq.source_type(), "bigquery"); + } + + #[test] + fn test_bigquery_credential_accessor() { + let with_secret = Source::Bigquery { + project_id: "proj".to_string(), + dataset: None, + credential: Credential::SecretRef { + id: "secr_bq".to_string(), + }, + }; + assert!(matches!( + with_secret.credential(), + Credential::SecretRef { id } if id == "secr_bq" + )); + assert_eq!(with_secret.secret_id(), Some("secr_bq")); + + let without_cred = Source::Bigquery { + project_id: "proj".to_string(), + dataset: None, + credential: Credential::None, + }; + assert!(matches!(without_cred.credential(), Credential::None)); + } } From e5829873d8570434566dd3980c0913d7c45dd574 Mon Sep 17 00:00:00 2001 From: 
Anoop Narang Date: Tue, 10 Feb 2026 16:05:56 +0530 Subject: [PATCH 2/9] feat(bigquery): add configurable region for cross-dataset discovery Replace hardcoded "region-us" with a configurable region field on the BigQuery source config, defaulting to "us" when omitted. --- src/datafetch/native/bigquery.rs | 10 +++++--- src/source.rs | 42 ++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index 0787a74..d664fc0 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -53,12 +53,13 @@ pub async fn discover_tables( ) -> Result, DataFetchError> { let client = build_client(source, secrets).await?; - let (project_id, dataset_filter) = match source { + let (project_id, dataset_filter, region) = match source { Source::Bigquery { project_id, dataset, + region, .. - } => (project_id.as_str(), dataset.as_deref()), + } => (project_id.as_str(), dataset.as_deref(), region.as_str()), _ => { return Err(DataFetchError::Connection( "Expected BigQuery source".to_string(), @@ -102,8 +103,8 @@ pub async fn discover_tables( c.data_type, c.is_nullable, c.ordinal_position - FROM `{project_id}`.`region-us`.INFORMATION_SCHEMA.COLUMNS c - JOIN `{project_id}`.`region-us`.INFORMATION_SCHEMA.TABLES t + FROM `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.COLUMNS c + JOIN `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.TABLES t ON c.table_catalog = t.table_catalog AND c.table_schema = t.table_schema AND c.table_name = t.table_name @@ -111,6 +112,7 @@ pub async fn discover_tables( ORDER BY c.table_schema, c.table_name, c.ordinal_position "#, project_id = project_id.replace('`', "\\`"), + region = region.replace('`', "\\`"), ) }; diff --git a/src/source.rs b/src/source.rs index 9120503..9181c19 100644 --- a/src/source.rs +++ b/src/source.rs @@ -72,6 +72,10 @@ impl Credential { } } +fn default_bigquery_region() -> String { + "us".to_string() +} + /// 
Represents a data source connection with typed configuration. /// The `type` field is used as the JSON discriminator via serde's tag attribute. /// @@ -128,6 +132,10 @@ pub enum Source { project_id: String, #[serde(skip_serializing_if = "Option::is_none")] dataset: Option<String>, + /// Region for cross-dataset discovery (e.g., "us", "eu", "us-central1"). + /// Only used when `dataset` is not specified. Defaults to "us". + #[serde(default = "default_bigquery_region")] + region: String, #[serde(default)] credential: Credential, }, @@ -251,10 +259,12 @@ impl Source { Source::Bigquery { project_id, dataset, + region, .. } => Source::Bigquery { project_id, dataset, + region, credential, }, } @@ -718,6 +728,7 @@ mod tests { let source = Source::Bigquery { project_id: "my-gcp-project".to_string(), dataset: Some("my_dataset".to_string()), + region: "us".to_string(), credential: Credential::SecretRef { id: "secr_bq".to_string(), }, @@ -727,6 +738,7 @@ assert!(json.contains(r#""type":"bigquery""#)); assert!(json.contains(r#""project_id":"my-gcp-project""#)); assert!(json.contains(r#""dataset":"my_dataset""#)); + assert!(json.contains(r#""region":"us""#)); let parsed: Source = serde_json::from_str(&json).unwrap(); assert_eq!(source, parsed); @@ -737,6 +749,7 @@ let source = Source::Bigquery { project_id: "my-gcp-project".to_string(), dataset: None, + region: "us".to_string(), credential: Credential::None, }; @@ -747,11 +760,38 @@ assert_eq!(source, parsed); } + #[test] + fn test_bigquery_with_region() { + let source = Source::Bigquery { + project_id: "my-gcp-project".to_string(), + dataset: None, + region: "europe-west1".to_string(), + credential: Credential::None, + }; + + let json = serde_json::to_string(&source).unwrap(); + assert!(json.contains(r#""region":"europe-west1""#)); + + let parsed: Source = serde_json::from_str(&json).unwrap(); + assert_eq!(source, parsed); + } + + #[test] + fn test_bigquery_region_defaults_to_us() { + let json =
r#"{"type":"bigquery","project_id":"proj"}"#; + let parsed: Source = serde_json::from_str(json).unwrap(); + match parsed { + Source::Bigquery { region, .. } => assert_eq!(region, "us"), + _ => panic!("Expected Bigquery variant"), + } + } + #[test] fn test_bigquery_source_type() { let bq = Source::Bigquery { project_id: "proj".to_string(), dataset: None, + region: "us".to_string(), credential: Credential::None, }; assert_eq!(bq.source_type(), "bigquery"); @@ -762,6 +802,7 @@ mod tests { let with_secret = Source::Bigquery { project_id: "proj".to_string(), dataset: None, + region: "us".to_string(), credential: Credential::SecretRef { id: "secr_bq".to_string(), }, @@ -775,6 +816,7 @@ mod tests { let without_cred = Source::Bigquery { project_id: "proj".to_string(), dataset: None, + region: "us".to_string(), credential: Credential::None, }; assert!(matches!(without_cred.credential(), Credential::None)); From 0f3be03ca71b5f547b82b024f0ac42e55b679b5e Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 17:22:52 +0530 Subject: [PATCH 3/9] fix(bigquery): add pagination and batched writes for data fetching Paginate through all result pages via get_query_results instead of only reading the first page. Buffer rows and flush in 10k-row batches to bound memory usage, matching the postgres/mysql fetcher pattern. 
--- src/datafetch/native/bigquery.rs | 73 ++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index d664fc0..f06db29 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -6,8 +6,10 @@ use datafusion::arrow::array::{ }; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; +use gcp_bigquery_client::model::get_query_results_parameters::GetQueryResultsParameters; use gcp_bigquery_client::model::query_request::QueryRequest; use gcp_bigquery_client::model::table_cell::TableCell; +use gcp_bigquery_client::model::table_row::TableRow; use gcp_bigquery_client::Client; use std::sync::Arc; use tracing::warn; @@ -180,6 +182,8 @@ pub async fn discover_tables( Ok(tables) } +const BATCH_SIZE: usize = 10_000; + /// Fetch table data and write to Parquet pub async fn fetch_table( source: &Source, @@ -268,15 +272,64 @@ pub async fn fetch_table( .await .map_err(|e| DataFetchError::Query(format!("Fetch query failed: {}", e)))?; - match &data_response.rows { - Some(rows) if !rows.is_empty() => { - let batch = rows_to_batch(rows, &arrow_schema)?; - writer.write_batch(&batch)?; + let job_id = data_response + .job_reference + .as_ref() + .and_then(|jr| jr.job_id.clone()) + .ok_or_else(|| DataFetchError::Query("No job_id in query response".to_string()))?; + + // Process first page + paginate through remaining results + let mut row_buffer: Vec<TableRow> = Vec::with_capacity(BATCH_SIZE); + let mut page_token = data_response.page_token.clone(); + + // Buffer rows from the first page + if let Some(rows) = data_response.rows { + for row in rows { + row_buffer.push(row); + if row_buffer.len() >= BATCH_SIZE { + let batch = rows_to_batch(&row_buffer, &arrow_schema)?; + writer.write_batch(&batch)?; + row_buffer.clear(); + } } - _ => { - let empty_batch =
RecordBatch::new_empty(Arc::new(arrow_schema)); - writer.write_batch(&empty_batch)?; + } + + // Fetch subsequent pages + while let Some(token) = page_token { + let result = client + .job() + .get_query_results( + project_id, + &job_id, + GetQueryResultsParameters { + page_token: Some(token), + ..Default::default() + }, + ) + .await + .map_err(|e| DataFetchError::Query(format!("Pagination query failed: {}", e)))?; + + if let Some(rows) = result.rows { + for row in rows { + row_buffer.push(row); + if row_buffer.len() >= BATCH_SIZE { + let batch = rows_to_batch(&row_buffer, &arrow_schema)?; + writer.write_batch(&batch)?; + row_buffer.clear(); + } + } } + + page_token = result.page_token; + } + + // Flush remaining rows + if row_buffer.is_empty() { + let empty_batch = RecordBatch::new_empty(Arc::new(arrow_schema)); + writer.write_batch(&empty_batch)?; + } else { + let batch = rows_to_batch(&row_buffer, &arrow_schema)?; + writer.write_batch(&batch)?; } Ok(()) @@ -827,7 +880,6 @@ mod tests { #[test] fn test_rows_to_batch_basic() { - use gcp_bigquery_client::model::table_row::TableRow; let schema = Schema::new(vec![ Field::new("name", DataType::Utf8, true), @@ -880,7 +932,6 @@ mod tests { #[test] fn test_rows_to_batch_with_nulls() { - use gcp_bigquery_client::model::table_row::TableRow; let schema = Schema::new(vec![ Field::new("value", DataType::Utf8, true), @@ -904,7 +955,6 @@ mod tests { #[test] fn test_rows_to_batch_boolean() { - use gcp_bigquery_client::model::table_row::TableRow; let schema = Schema::new(vec![Field::new("flag", DataType::Boolean, true)]); @@ -933,7 +983,6 @@ mod tests { #[test] fn test_rows_to_batch_date() { - use gcp_bigquery_client::model::table_row::TableRow; let schema = Schema::new(vec![Field::new("d", DataType::Date32, true)]); @@ -958,7 +1007,6 @@ mod tests { #[test] fn test_rows_to_batch_timestamp() { - use gcp_bigquery_client::model::table_row::TableRow; let schema = Schema::new(vec![Field::new( "ts", @@ -985,7 +1033,6 @@ mod tests { 
#[test] fn test_rows_to_batch_float() { - use gcp_bigquery_client::model::table_row::TableRow; let schema = Schema::new(vec![Field::new("f", DataType::Float64, true)]); From fe4a22feb43deb38a6e509ff4de81bf236d1d963 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 17:38:24 +0530 Subject: [PATCH 4/9] refactor(bigquery): move job_id instead of cloning --- src/datafetch/native/bigquery.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index f06db29..423c9bf 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -274,8 +274,7 @@ pub async fn fetch_table( let job_id = data_response .job_reference - .as_ref() - .and_then(|jr| jr.job_id.clone()) + .and_then(|jr| jr.job_id) .ok_or_else(|| DataFetchError::Query("No job_id in query response".to_string()))?; // Process first page + paginate through remaining results From 642e3c9e458c9a5aa7a3ee8726dd5c0920e56e7d Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 17:42:47 +0530 Subject: [PATCH 5/9] refactor(bigquery): move page_token instead of cloning --- src/datafetch/native/bigquery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index 423c9bf..bcf3c1f 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -279,7 +279,7 @@ pub async fn fetch_table( // Process first page + paginate through remaining results let mut row_buffer: Vec = Vec::with_capacity(BATCH_SIZE); - let mut page_token = data_response.page_token.clone(); + let mut page_token = data_response.page_token; // Buffer rows from the first page if let Some(rows) = data_response.rows { From dc1aad1c97d2ef84fac08b04547cd421eba474d5 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 19:49:59 +0530 Subject: [PATCH 6/9] fix(bigquery): serialize non-string JSON values instead of 
dropping as null BigQuery returns ARRAY, STRUCT, and JSON columns as non-string JSON values (arrays, objects). Previously these were silently dropped as nulls, causing Arrow validation errors on non-nullable columns. Now they are serialized to their JSON string representation. --- src/datafetch/native/bigquery.rs | 109 ++++++++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index bcf3c1f..4bdf4f6 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -533,13 +533,18 @@ fn append_cell( } _ => { // Default: store as string (Utf8) + // For complex types (ARRAY, STRUCT, JSON), BigQuery may return non-string + // JSON values. Fall back to serializing the raw JSON when value_str is None. let b = builder .as_any_mut() .downcast_mut::() .unwrap(); match value_str { Some(s) => b.append_value(s), - None => b.append_null(), + None => match cell.and_then(|c| c.value.as_ref()) { + Some(serde_json::Value::Null) | None => b.append_null(), + Some(v) => b.append_value(v.to_string()), + }, } } } @@ -548,6 +553,7 @@ fn append_cell( #[cfg(test)] mod tests { use super::*; + use datafusion::arrow::array::Array; use gcp_bigquery_client::model::field_type::FieldType; use gcp_bigquery_client::model::table_field_schema::TableFieldSchema; @@ -1049,4 +1055,105 @@ mod tests { .unwrap(); assert!((col.value(0) - 3.14).abs() < f64::EPSILON); } + + #[test] + fn test_rows_to_batch_complex_json_values_with_nulls() { + // BigQuery returns ARRAY, STRUCT, and JSON columns as non-string JSON values. + // These map to Utf8 and must be serialized rather than dropped as null. + // Test multiple rows with nulls interspersed to cover all branches. 
+ let schema = Schema::new(vec![ + Field::new("tags", DataType::Utf8, true), + Field::new("metadata", DataType::Utf8, true), + Field::new("count", DataType::Utf8, true), + ]); + + let rows = vec![ + // Row 0: all non-string JSON values (array, object, number) + TableRow { + columns: Some(vec![ + TableCell { + value: Some(serde_json::json!(["tag_a", "tag_b"])), + }, + TableCell { + value: Some(serde_json::json!({"level": 1, "tier": "bronze"})), + }, + TableCell { + value: Some(serde_json::json!(42)), + }, + ]), + }, + // Row 1: all nulls (JSON null, None, JSON null) + TableRow { + columns: Some(vec![ + TableCell { + value: Some(serde_json::Value::Null), + }, + TableCell { value: None }, + TableCell { + value: Some(serde_json::Value::Null), + }, + ]), + }, + // Row 2: mix of values and nulls + TableRow { + columns: Some(vec![ + TableCell { + value: Some(serde_json::json!(["tag_c"])), + }, + TableCell { value: None }, + TableCell { + value: Some(serde_json::Value::String("plain_string".to_string())), + }, + ]), + }, + // Row 3: boolean JSON value (non-string primitive) + TableRow { + columns: Some(vec![ + TableCell { value: None }, + TableCell { + value: Some(serde_json::json!({"nested": [1, 2, 3]})), + }, + TableCell { + value: Some(serde_json::json!(true)), + }, + ]), + }, + ]; + + let batch = rows_to_batch(&rows, &schema).unwrap(); + assert_eq!(batch.num_rows(), 4); + + let tags = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(tags.value(0), r#"["tag_a","tag_b"]"#); + assert!(tags.is_null(1)); + assert_eq!(tags.value(2), r#"["tag_c"]"#); + assert!(tags.is_null(3)); + + let meta = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let meta0: serde_json::Value = serde_json::from_str(meta.value(0)).unwrap(); + assert_eq!(meta0["level"], 1); + assert_eq!(meta0["tier"], "bronze"); + assert!(meta.is_null(1)); + assert!(meta.is_null(2)); + let meta3: serde_json::Value = serde_json::from_str(meta.value(3)).unwrap(); + 
assert_eq!(meta3["nested"], serde_json::json!([1, 2, 3])); + + let count = batch + .column(2) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + assert_eq!(count.value(0), "42"); + assert!(count.is_null(1)); + assert_eq!(count.value(2), "plain_string"); + assert_eq!(count.value(3), "true"); + } } From cc6987a56f95d8065471dd07caa904569a82e4e3 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 19:52:52 +0530 Subject: [PATCH 7/9] style(bigquery): fix clippy and fmt warnings --- src/datafetch/native/bigquery.rs | 68 ++++++++++++++------------- 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index 4bdf4f6..f487ea1 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -21,10 +21,7 @@ use crate::source::Source; /// Build a BigQuery client from source credentials. /// The credential is expected to be a GCP service account key JSON string. -async fn build_client( - source: &Source, - secrets: &SecretManager, -) -> Result<Client, DataFetchError> { +async fn build_client(source: &Source, secrets: &SecretManager) -> Result<Client, DataFetchError> { let credential = match source { Source::Bigquery { credential, ..
} => credential, _ => { @@ -234,7 +231,8 @@ pub async fn fetch_table( return None; } let col_name = cell_to_string(&cells[0])?; - let data_type_str = cell_to_string(&cells[1]).unwrap_or_else(|| "STRING".to_string()); + let data_type_str = + cell_to_string(&cells[1]).unwrap_or_else(|| "STRING".to_string()); let is_nullable = cell_to_string(&cells[2]) .map(|s| s.to_uppercase() == "YES") .unwrap_or(true); @@ -380,7 +378,6 @@ fn rows_to_batch( rows: &[gcp_bigquery_client::model::table_row::TableRow], schema: &Schema, ) -> Result { - let num_cols = schema.fields().len(); let num_rows = rows.len(); let mut builders: Vec> = schema @@ -390,11 +387,11 @@ fn rows_to_batch( .collect(); for row in rows { - let cells = row.columns.as_ref().map(|c| c.as_slice()).unwrap_or(&[]); - for col_idx in 0..num_cols { + let cells = row.columns.as_deref().unwrap_or(&[]); + for (col_idx, builder) in builders.iter_mut().enumerate() { let cell = cells.get(col_idx); let data_type = schema.field(col_idx).data_type(); - append_cell(&mut builders[col_idx], cell, data_type); + append_cell(builder, cell, data_type); } } @@ -431,13 +428,11 @@ fn append_cell( cell: Option<&TableCell>, data_type: &DataType, ) { - let value_str = cell - .and_then(|c| c.value.as_ref()) - .and_then(|v| match v { - serde_json::Value::String(s) => Some(s.as_str()), - serde_json::Value::Null => None, - _ => None, - }); + let value_str = cell.and_then(|c| c.value.as_ref()).and_then(|v| match v { + serde_json::Value::String(s) => Some(s.as_str()), + serde_json::Value::Null => None, + _ => None, + }); match data_type { DataType::Boolean => { @@ -501,10 +496,17 @@ fn append_cell( } } else { // DATETIME: value is ISO-like "2021-01-01 12:00:00" or "2021-01-01T12:00:00" - let parsed = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") - .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")) - .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) - .or_else(|_| 
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")); + let parsed = + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") + }) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") + }) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") + }); match parsed { Ok(dt) => b.append_value(dt.and_utc().timestamp_micros()), Err(_) => { @@ -524,10 +526,12 @@ fn append_cell( .unwrap(); match value_str { // BigQuery BYTES are base64 encoded - Some(s) => match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s) { - Ok(bytes) => b.append_value(&bytes), - Err(_) => b.append_value(s.as_bytes()), - }, + Some(s) => { + match base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s) { + Ok(bytes) => b.append_value(&bytes), + Err(_) => b.append_value(s.as_bytes()), + } + } None => b.append_null(), } } @@ -618,14 +622,8 @@ mod tests { bigquery_type_to_arrow("SMALLINT"), DataType::Int64 )); - assert!(matches!( - bigquery_type_to_arrow("TINYINT"), - DataType::Int64 - )); - assert!(matches!( - bigquery_type_to_arrow("BYTEINT"), - DataType::Int64 - )); + assert!(matches!(bigquery_type_to_arrow("TINYINT"), DataType::Int64)); + assert!(matches!(bigquery_type_to_arrow("BYTEINT"), DataType::Int64)); } #[test] @@ -885,7 +883,6 @@ mod tests { #[test] fn test_rows_to_batch_basic() { - let schema = Schema::new(vec![ Field::new("name", DataType::Utf8, true), Field::new("age", DataType::Int64, true), @@ -937,7 +934,6 @@ mod tests { #[test] fn test_rows_to_batch_with_nulls() { - let schema = Schema::new(vec![ Field::new("value", DataType::Utf8, true), Field::new("count", DataType::Int64, true), @@ -960,7 +956,6 @@ mod tests { #[test] fn test_rows_to_batch_boolean() { - let schema = Schema::new(vec![Field::new("flag", DataType::Boolean, true)]); let rows = vec![ @@ -988,7 +983,6 @@ mod tests { #[test] fn 
test_rows_to_batch_date() { - let schema = Schema::new(vec![Field::new("d", DataType::Date32, true)]); let rows = vec![TableRow { @@ -1012,7 +1006,6 @@ mod tests { #[test] fn test_rows_to_batch_timestamp() { - let schema = Schema::new(vec![Field::new( "ts", DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), @@ -1038,7 +1031,6 @@ mod tests { #[test] fn test_rows_to_batch_float() { - let schema = Schema::new(vec![Field::new("f", DataType::Float64, true)]); let rows = vec![TableRow { From a3f413885c1daef9aea64bb98d6a11a4665cbc48 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 20:10:32 +0530 Subject: [PATCH 8/9] style(bigquery): fix approx_constant clippy lint in float test --- src/datafetch/native/bigquery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/datafetch/native/bigquery.rs b/src/datafetch/native/bigquery.rs index f487ea1..5416457 100644 --- a/src/datafetch/native/bigquery.rs +++ b/src/datafetch/native/bigquery.rs @@ -1035,7 +1035,7 @@ mod tests { let rows = vec![TableRow { columns: Some(vec![TableCell { - value: Some(serde_json::Value::String("3.14".to_string())), + value: Some(serde_json::Value::String("2.72".to_string())), }]), }]; @@ -1045,7 +1045,7 @@ mod tests { .as_any() .downcast_ref::() .unwrap(); - assert!((col.value(0) - 3.14).abs() < f64::EPSILON); + assert_eq!(col.value(0), 2.72_f64); } #[test] From 54428d1c7809d94663cd546c4ad3d7428bf45d92 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Tue, 10 Feb 2026 22:57:05 +0530 Subject: [PATCH 9/9] fix(bigquery): auto-create secret for inline credentials_json The connection handler only recognized password, token, and bearer_token as inline credential fields. BigQuery's credentials_json was not being extracted, stored as a secret, or linked to the connection. 
--- .../controllers/connections_controller.rs | 1 + tests/http_server_tests.rs | 80 +++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/src/http/controllers/connections_controller.rs b/src/http/controllers/connections_controller.rs index cb8aebb..3bccb75 100644 --- a/src/http/controllers/connections_controller.rs +++ b/src/http/controllers/connections_controller.rs @@ -80,6 +80,7 @@ pub async fn create_connection_handler( ("password", "password"), ("token", "token"), ("bearer_token", "bearer_token"), + ("credentials_json", "credentials_json"), ]; // Skip inline credential processing if user already provided a secret_name diff --git a/tests/http_server_tests.rs b/tests/http_server_tests.rs index 5be58bb..23e6b45 100644 --- a/tests/http_server_tests.rs +++ b/tests/http_server_tests.rs @@ -1267,6 +1267,86 @@ async fn test_create_connection_with_password_auto_creates_secret() -> Result<() Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_create_bigquery_connection_with_credentials_json_auto_creates_secret() -> Result<()> { + let (app, _tempdir) = setup_test().await?; + + let fake_sa_json = serde_json::to_string(&json!({ + "type": "service_account", + "project_id": "test-project", + "private_key_id": "key123", + "private_key": "-----BEGIN RSA PRIVATE KEY-----\nfake\n-----END RSA PRIVATE KEY-----\n", + "client_email": "test@test-project.iam.gserviceaccount.com", + "client_id": "123456789", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token" + }))?; + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(PATH_CONNECTIONS) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_string(&json!({ + "name": "my_bq", + "source_type": "bigquery", + "config": { + "project_id": "test-project", + "credentials_json": fake_sa_json, + "dataset": "my_dataset" + } + }))?))?, + ) + .await?; + + assert_eq!(response.status(), 
StatusCode::CREATED); + + let body = axum::body::to_bytes(response.into_body(), usize::MAX).await?; + let json: serde_json::Value = serde_json::from_slice(&body)?; + + assert_eq!(json["name"], "my_bq"); + assert_eq!(json["source_type"], "bigquery"); + // Discovery fails because no BigQuery server, but the important thing is it's not + // "no credential configured" - it should be a connection/auth error + assert_eq!(json["discovery_status"], "failed"); + let error = json["discovery_error"].as_str().unwrap(); + assert!( + !error.contains("no credential configured"), + "Error should not be 'no credential configured', got: {}", + error + ); + + // Verify the secret was auto-created with the expected name + let secrets_response = app + .oneshot( + Request::builder() + .method("GET") + .uri(PATH_SECRETS) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(secrets_response.status(), StatusCode::OK); + + let secrets_body = axum::body::to_bytes(secrets_response.into_body(), usize::MAX).await?; + let secrets_json: serde_json::Value = serde_json::from_slice(&secrets_body)?; + + let secrets = secrets_json["secrets"].as_array().unwrap(); + assert_eq!( + secrets.len(), + 1, + "Expected exactly one secret to be created" + ); + assert_eq!( + secrets[0]["name"], "conn-my_bq-credentials_json", + "Secret should be named 'conn-my_bq-credentials_json'" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_create_connection_with_secret_name() -> Result<()> { let (app, _tempdir) = setup_test().await?;