47 changes: 47 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered by default.)

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -75,6 +75,10 @@ bincode = { version = "2.0", features = ["serde"] }
 walrus-rust = "0.2.0"
 thiserror = "2.0"
 strum = { version = "0.27", features = ["derive"] }
+# Parquet Variant support for proper semi-structured data encoding
+parquet-variant = "0.2.0"
+parquet-variant-compute = "0.2.0"
+parquet-variant-json = "0.2.0"
 
 [dev-dependencies]
 sqllogictest = { git = "https://github.com/risinglightdb/sqllogictest-rs.git" }
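The three parquet-variant crates are the arrow-rs implementations of the Parquet Variant spec: the core binary encoding, Arrow compute kernels, and JSON interop, respectively. As a minimal sketch of the round trip they enable (API names taken from the 0.2.0 crate docs; treat exact signatures as assumptions, not as this PR's code):

```rust
use parquet_variant::{Variant, VariantBuilder};
use parquet_variant_json::{json_to_variant, variant_to_json_string};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Encode a JSON document into the Variant binary format:
    // a metadata buffer (field-name dictionary) plus a value buffer.
    let mut builder = VariantBuilder::new();
    json_to_variant(r#"{"level": "info", "attrs": {"retries": 2}}"#, &mut builder)?;
    let (metadata, value) = builder.finish();

    // Decode the two buffers back into a Variant and render it as JSON.
    let variant = Variant::try_new(&metadata, &value)?;
    println!("{}", variant_to_json_string(&variant)?);
    Ok(())
}
```

The `(metadata, value)` pair is exactly what lands in the two BinaryView child columns used for Variant throughout this PR.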
14 changes: 7 additions & 7 deletions schemas/otel_logs_and_spans.yaml
@@ -49,7 +49,7 @@ fields:
     data_type: Int32
     nullable: true
   - name: body
-    data_type: Utf8
+    data_type: Variant
     nullable: true
   - name: duration
     data_type: Int64
@@ -61,7 +61,7 @@
     data_type: 'Timestamp(Microsecond, Some("UTC"))'
     nullable: true
   - name: context
-    data_type: Utf8
+    data_type: Variant
     nullable: true
   - name: context___trace_id
     data_type: Utf8
@@ -79,13 +79,13 @@
     data_type: Utf8
     nullable: true
   - name: events
-    data_type: Utf8
+    data_type: Variant
     nullable: true
   - name: links
-    data_type: Utf8
+    data_type: Variant
     nullable: true
   - name: attributes
-    data_type: Utf8
+    data_type: Variant
     nullable: true
   - name: attributes___client___address
     data_type: Utf8
@@ -235,7 +235,7 @@ fields:
     data_type: Utf8
     nullable: true
   - name: resource
-    data_type: Utf8
+    data_type: Variant
     nullable: true
   - name: resource___service___name
     data_type: Utf8
@@ -268,7 +268,7 @@
data_type: "List(Utf8)"
nullable: false
- name: errors
data_type: Utf8
data_type: Variant
nullable: true
- name: log_pattern
data_type: Utf8
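With these seven columns retyped from Utf8 JSON to Variant, nested fields are still addressed through the JSON UDFs registered in src/database.rs below. A hypothetical query for illustration only (the table name is inferred from the schema file name; json_get_str and json_contains are the datafusion-functions-json UDFs that the new wrappers shadow):

```rust
use datafusion::prelude::SessionContext;

// Assumes `ctx` was prepared by Database::register_json_functions and a
// table was registered from the schema above.
async fn sample_query(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let df = ctx
        .sql(
            "SELECT json_get_str(attributes, 'client', 'address') AS client_address \
             FROM otel_logs_and_spans \
             WHERE json_contains(attributes, 'client')",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```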
37 changes: 34 additions & 3 deletions src/database.rs
@@ -31,6 +31,7 @@ use deltalake::PartitionFilter;
 use deltalake::datafusion::parquet::file::metadata::SortingColumn;
 use deltalake::datafusion::parquet::file::properties::WriterProperties;
 use deltalake::kernel::transaction::CommitProperties;
+use deltalake::kernel::{Action, Protocol};
 use deltalake::operations::create::CreateBuilder;
 use deltalake::{DeltaTable, DeltaTableBuilder};
 use futures::StreamExt;
@@ -66,6 +67,23 @@ pub fn extract_project_id(batch: &RecordBatch) -> Option<String> {
 // Compression level for parquet files - kept for WriterProperties fallback
 const ZSTD_COMPRESSION_LEVEL: i32 = 3;
 
+/// Create a Protocol with the variantType feature enabled.
+/// Required for tables with Variant columns per the Delta Lake protocol spec.
+/// Note: currently unused because delta-rs's ProtocolChecker doesn't support variantType yet.
+/// When delta-rs adds support, this can be enabled in the CreateBuilder::with_actions() call.
+#[allow(dead_code)]
+fn create_variant_protocol() -> Protocol {
+    // Protocol::try_new is pub(crate) in delta-kernel, so we use serde_json
+    // to create it (the same approach delta-rs uses internally)
+    serde_json::from_value(serde_json::json!({
+        "minReaderVersion": 3,
+        "minWriterVersion": 7,
+        "readerFeatures": ["variantType"],
+        "writerFeatures": ["variantType"]
+    }))
+    .expect("Valid protocol JSON")
+}
+
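Once delta-rs accepts the feature, wiring it in should be a one-liner on the existing builder chain. A sketch only, since today's ProtocolChecker rejects the commit (which is why the function carries #[allow(dead_code)]); Action::Protocol is assumed to wrap the kernel Protocol as it does in current delta-rs:

```rust
// Not yet enabled: delta-rs's ProtocolChecker doesn't know variantType.
let actions = vec![Action::Protocol(create_variant_protocol())];
// ...then hand `actions` to the CreateBuilder::with_actions() call further down.
```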
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
struct StorageConfig {
project_id: String,
@@ -765,10 +783,13 @@ impl Database {
         ctx.register_udf(set_config_udf);
     }
 
-    /// Register JSON functions from datafusion-functions-json
+    /// Register JSON functions from datafusion-functions-json with Variant-aware wrappers
     pub fn register_json_functions(&self, ctx: &mut SessionContext) {
         datafusion_functions_json::register_all(ctx).expect("Failed to register JSON functions");
-        info!("Registered JSON functions with SessionContext");
+        // Register variant-aware wrappers that override the standard JSON functions.
+        // These handle both Variant (Struct) and Utf8 inputs transparently.
+        crate::variant_utils::register_variant_json_functions(ctx);
+        info!("Registered JSON functions with Variant support");
     }
 
     #[instrument(
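The variant_utils module itself isn't part of this diff, so the following is a guess at the core dispatch trick rather than the PR's actual implementation: a wrapper can recognize a Struct<metadata, value> argument, render each row to JSON text, and delegate to the stock Utf8-based UDF. A self-contained helper for that conversion, using only arrow and parquet-variant-json APIs (signatures assumed per the 0.2.0 crates):

```rust
use arrow::array::{Array, ArrayRef, BinaryViewArray, StringArray, StructArray};
use arrow::error::ArrowError;
use parquet_variant::Variant;
use parquet_variant_json::variant_to_json_string;
use std::sync::Arc;

/// Hypothetical helper: render a Variant column
/// (Struct<metadata: BinaryView, value: BinaryView>) as JSON strings so the
/// stock datafusion-functions-json UDFs can consume it.
fn variant_column_to_json(col: &StructArray) -> Result<ArrayRef, ArrowError> {
    let metadata = col
        .column_by_name("metadata")
        .and_then(|c| c.as_any().downcast_ref::<BinaryViewArray>())
        .ok_or_else(|| ArrowError::InvalidArgumentError("missing metadata".into()))?;
    let value = col
        .column_by_name("value")
        .and_then(|c| c.as_any().downcast_ref::<BinaryViewArray>())
        .ok_or_else(|| ArrowError::InvalidArgumentError("missing value".into()))?;

    // Decode each row's (metadata, value) pair and serialize it to JSON text.
    let json: StringArray = (0..col.len())
        .map(|i| {
            if col.is_null(i) {
                Ok(None)
            } else {
                let v = Variant::try_new(metadata.value(i), value.value(i))?;
                variant_to_json_string(&v).map(Some)
            }
        })
        .collect::<Result<_, ArrowError>>()?;
    Ok(Arc::new(json))
}
```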
@@ -993,7 +1014,15 @@

         let mut config = HashMap::new();
         config.insert("delta.checkpointInterval".to_string(), Some(checkpoint_interval));
-        config.insert("delta.checkpointPolicy".to_string(), Some("v2".to_string()));
+        // Note: the v2 checkpoint policy requires the v2Checkpoint feature, which delta-rs doesn't support yet
+        // config.insert("delta.checkpointPolicy".to_string(), Some("v2".to_string()));
+
+        // Note: delta-rs doesn't yet support variantType in its ProtocolChecker.
+        // Variant columns are stored as Struct<metadata: BinaryView, value: BinaryView>,
+        // which is the correct binary representation per the Parquet Variant spec.
+        // When delta-rs adds variantType support, we can enable the Protocol action.
+        // See: https://github.com/delta-io/delta-rs/blob/main/crates/core/src/kernel/transaction/protocol.rs
+        let actions: Vec<Action> = vec![];
 
         match CreateBuilder::new()
             .with_location(&storage_uri)
@@ -1002,6 +1031,7 @@
             .with_storage_options(storage_options.clone())
             .with_commit_properties(commit_properties)
             .with_configuration(config)
+            .with_actions(actions)
             .await
         {
             Ok(table) => break table,
@@ -2004,6 +2034,7 @@ mod tests {
         let db_arc = Arc::new(db.clone());
         let mut ctx = db_arc.create_session_context();
         datafusion_functions_json::register_all(&mut ctx)?;
+        crate::variant_utils::register_variant_json_functions(&mut ctx);
         db.setup_session_context(&mut ctx)?;
         Ok((db, ctx, test_prefix))
     }
1 change: 1 addition & 0 deletions src/lib.rs
@@ -14,4 +14,5 @@ pub mod schema_loader;
 pub mod statistics;
 pub mod telemetry;
 pub mod test_utils;
+pub mod variant_utils;
 pub mod wal;
23 changes: 23 additions & 0 deletions src/schema_loader.rs
@@ -90,6 +90,21 @@ impl TableSchema {
             })
             .collect()
     }
+
+    /// Check if this schema contains any Variant type columns
+    pub fn has_variant_columns(&self) -> bool {
+        self.fields.iter().any(|f| f.data_type == "Variant")
+    }
 }
+
+/// Get the Arrow DataType for Variant (a Struct with metadata and value BinaryView fields).
+/// Uses BinaryView to match parquet-variant-compute output.
+pub fn variant_arrow_type() -> ArrowDataType {
+    use arrow::datatypes::Fields;
+    ArrowDataType::Struct(Fields::from(vec![
+        Arc::new(Field::new("metadata", ArrowDataType::BinaryView, false)),
+        Arc::new(Field::new("value", ArrowDataType::BinaryView, false)),
+    ]))
+}
 
 fn parse_arrow_data_type(s: &str) -> anyhow::Result<ArrowDataType> {
@@ -103,10 +118,17 @@
"List(Utf8)" => ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Utf8, true))),
"Timestamp(Microsecond, None)" => ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
"Timestamp(Microsecond, Some(\"UTC\"))" => ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, Some("UTC".into())),
"Variant" => variant_arrow_type(),
_ => anyhow::bail!("Unknown type: {}", s),
})
}

/// Create a proper Delta Variant type using delta-kernel's unshredded_variant()
/// This represents a Struct<metadata: Binary, value: Binary> that delta-kernel recognizes as Variant
fn variant_delta_type() -> DeltaDataType {
DeltaDataType::unshredded_variant()
}

fn parse_delta_data_type(s: &str) -> anyhow::Result<DeltaDataType> {
use PrimitiveType::*;
Ok(match s {
@@ -115,6 +137,7 @@
"Int32" | "UInt32" => DeltaDataType::Primitive(Integer),
"Int64" | "UInt64" => DeltaDataType::Primitive(Long),
"List(Utf8)" => DeltaDataType::Array(Box::new(ArrayType::new(DeltaDataType::Primitive(String), true))),
"Variant" => variant_delta_type(),
_ if s.starts_with("Timestamp") => DeltaDataType::Primitive(Timestamp),
_ => anyhow::bail!("Unknown type: {}", s),
})
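To make the physical layout concrete, here is a sketch (not code from this PR) that encodes one JSON document and places the resulting buffers into exactly the Struct<metadata: BinaryView, value: BinaryView> shape that variant_arrow_type() declares; crate APIs assumed per arrow-rs 0.2.0:

```rust
use arrow::array::{ArrayRef, BinaryViewBuilder, StructArray};
use arrow::datatypes::{DataType, Field, Fields};
use parquet_variant::VariantBuilder;
use parquet_variant_json::json_to_variant;
use std::sync::Arc;

// Encode one JSON document and lay it out as a single-row Variant column.
fn single_row_variant_column(json: &str) -> anyhow::Result<StructArray> {
    let mut b = VariantBuilder::new();
    json_to_variant(json, &mut b)?;
    let (metadata, value) = b.finish();

    // One row per child column: metadata bytes and value bytes.
    let mut meta_col = BinaryViewBuilder::new();
    meta_col.append_value(&metadata);
    let mut value_col = BinaryViewBuilder::new();
    value_col.append_value(&value);

    let fields = Fields::from(vec![
        Arc::new(Field::new("metadata", DataType::BinaryView, false)),
        Arc::new(Field::new("value", DataType::BinaryView, false)),
    ]);
    let arrays: Vec<ArrayRef> = vec![Arc::new(meta_col.finish()), Arc::new(value_col.finish())];
    // None: no struct-level nulls in this one-row example.
    Ok(StructArray::new(fields, arrays, None))
}
```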
21 changes: 18 additions & 3 deletions src/test_utils.rs
@@ -1,19 +1,34 @@
 pub mod test_helpers {
     use crate::schema_loader::get_default_schema;
+    use crate::variant_utils::{VARIANT_COLUMNS, convert_batch_json_to_variant, variant_data_type};
+    use arrow::datatypes::{DataType, Field, Schema};
     use arrow_json::ReaderBuilder;
     use datafusion::arrow::record_batch::RecordBatch;
     use serde_json::{Value, json};
     use std::collections::HashMap;
+    use std::sync::Arc;
 
     pub fn json_to_batch(records: Vec<Value>) -> anyhow::Result<RecordBatch> {
         let schema = get_default_schema().schema_ref();
+        // Create a parsing schema with Utf8 for Variant columns (arrow_json can't parse Variant)
+        let parse_schema = Arc::new(Schema::new(
+            schema.fields().iter().map(|f| {
+                if VARIANT_COLUMNS.contains(&f.name().as_str()) && *f.data_type() == variant_data_type() {
+                    Arc::new(Field::new(f.name(), DataType::Utf8, f.is_nullable()))
+                } else {
+                    f.clone()
+                }
+            }).collect::<Vec<_>>()
+        ));
         let json_data = records.into_iter().map(|v| v.to_string()).collect::<Vec<_>>().join("\n");
 
-        ReaderBuilder::new(schema.clone())
+        let batch = ReaderBuilder::new(parse_schema)
             .build(std::io::Cursor::new(json_data.as_bytes()))?
             .next()
-            .ok_or_else(|| anyhow::anyhow!("Failed to read batch"))?
-            .map_err(Into::into)
+            .ok_or_else(|| anyhow::anyhow!("Failed to read batch"))??;
+
+        // Convert Utf8 JSON columns to Variant
+        convert_batch_json_to_variant(batch).map_err(|e| anyhow::anyhow!("{}", e))
     }
 
     pub fn create_default_record() -> HashMap<String, Value> {
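For illustration only: the Variant columns are supplied as JSON text, which the helper parses into Utf8 and then re-encodes as Variant structs. Field names come from the YAML schema above; any other columns the default schema requires are glossed over here.

```rust
use serde_json::json;

// Hypothetical test usage of json_to_batch: `body` and `attributes` are
// Variant columns in the schema, supplied here as JSON strings.
let batch = test_helpers::json_to_batch(vec![json!({
    "body": r#"{"message": "hello", "code": 200}"#,
    "attributes": r#"{"client": {"address": "10.0.0.1"}}"#,
    "duration": 1200,
})])?;
assert_eq!(batch.num_rows(), 1);
```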