Skip to content

Commit

Permalink
Add support for Create Iceberg Table statement for Snowflake parser (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Vedin authored Jan 20, 2025
1 parent 183274e commit c7c0de6
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 5 deletions.
48 changes: 46 additions & 2 deletions src/ast/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use super::{
CommentDef, Expr, FileFormat, FromTable, HiveDistributionStyle, HiveFormat, HiveIOFormat,
HiveRowFormat, Ident, InsertAliases, MysqlInsertPriority, ObjectName, OnCommit, OnInsert,
OneOrManyWithParens, OrderByExpr, Query, RowAccessPolicy, SelectItem, Setting, SqlOption,
SqliteOnConflict, TableEngine, TableObject, TableWithJoins, Tag, WrappedCollection,
SqliteOnConflict, StorageSerializationPolicy, TableEngine, TableObject, TableWithJoins, Tag,
WrappedCollection,
};

/// CREATE INDEX statement.
Expand Down Expand Up @@ -117,6 +118,7 @@ pub struct CreateTable {
pub if_not_exists: bool,
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
/// Table name
#[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
pub name: ObjectName,
Expand Down Expand Up @@ -192,6 +194,21 @@ pub struct CreateTable {
/// Snowflake "WITH TAG" clause
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
pub with_tags: Option<Vec<Tag>>,
/// Snowflake "EXTERNAL_VOLUME" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub external_volume: Option<String>,
/// Snowflake "BASE_LOCATION" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub base_location: Option<String>,
/// Snowflake "CATALOG" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub catalog: Option<String>,
/// Snowflake "CATALOG_SYNC" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub catalog_sync: Option<String>,
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
}

impl Display for CreateTable {
Expand All @@ -205,7 +222,7 @@ impl Display for CreateTable {
// `CREATE TABLE t (a INT) AS SELECT a from t2`
write!(
f,
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}TABLE {if_not_exists}{name}",
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}TABLE {if_not_exists}{name}",
or_replace = if self.or_replace { "OR REPLACE " } else { "" },
external = if self.external { "EXTERNAL " } else { "" },
global = self.global
Expand All @@ -221,6 +238,8 @@ impl Display for CreateTable {
temporary = if self.temporary { "TEMPORARY " } else { "" },
transient = if self.transient { "TRANSIENT " } else { "" },
volatile = if self.volatile { "VOLATILE " } else { "" },
// Only for Snowflake
iceberg = if self.iceberg { "ICEBERG " } else { "" },
name = self.name,
)?;
if let Some(on_cluster) = &self.on_cluster {
Expand Down Expand Up @@ -382,6 +401,31 @@ impl Display for CreateTable {
)?;
}

if let Some(external_volume) = self.external_volume.as_ref() {
write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?;
}

if let Some(catalog) = self.catalog.as_ref() {
write!(f, " CATALOG = '{catalog}'")?;
}

if self.iceberg {
if let Some(base_location) = self.base_location.as_ref() {
write!(f, " BASE_LOCATION = '{base_location}'")?;
}
}

if let Some(catalog_sync) = self.catalog_sync.as_ref() {
write!(f, " CATALOG_SYNC = '{catalog_sync}'")?;
}

if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
write!(
f,
" STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}"
)?;
}

if self.copy_grants {
write!(f, " COPY GRANTS")?;
}
Expand Down
65 changes: 64 additions & 1 deletion src/ast/helpers/stmt_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::super::dml::CreateTable;
use crate::ast::{
ClusteredBy, ColumnDef, CommentDef, Expr, FileFormat, HiveDistributionStyle, HiveFormat, Ident,
ObjectName, OnCommit, OneOrManyWithParens, Query, RowAccessPolicy, SqlOption, Statement,
TableConstraint, TableEngine, Tag, WrappedCollection,
StorageSerializationPolicy, TableConstraint, TableEngine, Tag, WrappedCollection,
};
use crate::parser::ParserError;

Expand Down Expand Up @@ -71,6 +71,7 @@ pub struct CreateTableBuilder {
pub if_not_exists: bool,
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
pub name: ObjectName,
pub columns: Vec<ColumnDef>,
pub constraints: Vec<TableConstraint>,
Expand Down Expand Up @@ -107,6 +108,11 @@ pub struct CreateTableBuilder {
pub with_aggregation_policy: Option<ObjectName>,
pub with_row_access_policy: Option<RowAccessPolicy>,
pub with_tags: Option<Vec<Tag>>,
pub base_location: Option<String>,
pub external_volume: Option<String>,
pub catalog: Option<String>,
pub catalog_sync: Option<String>,
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
}

impl CreateTableBuilder {
Expand All @@ -119,6 +125,7 @@ impl CreateTableBuilder {
if_not_exists: false,
transient: false,
volatile: false,
iceberg: false,
name,
columns: vec![],
constraints: vec![],
Expand Down Expand Up @@ -155,6 +162,11 @@ impl CreateTableBuilder {
with_aggregation_policy: None,
with_row_access_policy: None,
with_tags: None,
base_location: None,
external_volume: None,
catalog: None,
catalog_sync: None,
storage_serialization_policy: None,
}
}
pub fn or_replace(mut self, or_replace: bool) -> Self {
Expand Down Expand Up @@ -192,6 +204,11 @@ impl CreateTableBuilder {
self
}

pub fn iceberg(mut self, iceberg: bool) -> Self {
self.iceberg = iceberg;
self
}

pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
self.columns = columns;
self
Expand Down Expand Up @@ -371,6 +388,34 @@ impl CreateTableBuilder {
self
}

pub fn base_location(mut self, base_location: Option<String>) -> Self {
self.base_location = base_location;
self
}

pub fn external_volume(mut self, external_volume: Option<String>) -> Self {
self.external_volume = external_volume;
self
}

pub fn catalog(mut self, catalog: Option<String>) -> Self {
self.catalog = catalog;
self
}

pub fn catalog_sync(mut self, catalog_sync: Option<String>) -> Self {
self.catalog_sync = catalog_sync;
self
}

pub fn storage_serialization_policy(
mut self,
storage_serialization_policy: Option<StorageSerializationPolicy>,
) -> Self {
self.storage_serialization_policy = storage_serialization_policy;
self
}

pub fn build(self) -> Statement {
Statement::CreateTable(CreateTable {
or_replace: self.or_replace,
Expand All @@ -380,6 +425,7 @@ impl CreateTableBuilder {
if_not_exists: self.if_not_exists,
transient: self.transient,
volatile: self.volatile,
iceberg: self.iceberg,
name: self.name,
columns: self.columns,
constraints: self.constraints,
Expand Down Expand Up @@ -416,6 +462,11 @@ impl CreateTableBuilder {
with_aggregation_policy: self.with_aggregation_policy,
with_row_access_policy: self.with_row_access_policy,
with_tags: self.with_tags,
base_location: self.base_location,
external_volume: self.external_volume,
catalog: self.catalog,
catalog_sync: self.catalog_sync,
storage_serialization_policy: self.storage_serialization_policy,
})
}
}
Expand All @@ -435,6 +486,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
if_not_exists,
transient,
volatile,
iceberg,
name,
columns,
constraints,
Expand Down Expand Up @@ -471,6 +523,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
with_aggregation_policy,
with_row_access_policy,
with_tags,
base_location,
external_volume,
catalog,
catalog_sync,
storage_serialization_policy,
}) => Ok(Self {
or_replace,
temporary,
Expand Down Expand Up @@ -505,6 +562,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
clustered_by,
options,
strict,
iceberg,
copy_grants,
enable_schema_evolution,
change_tracking,
Expand All @@ -515,6 +573,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
with_row_access_policy,
with_tags,
volatile,
base_location,
external_volume,
catalog,
catalog_sync,
storage_serialization_policy,
}),
_ => Err(ParserError::ParserError(format!(
"Expected create table statement, but received: {stmt}"
Expand Down
23 changes: 23 additions & 0 deletions src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8396,6 +8396,29 @@ impl fmt::Display for SessionParamValue {
}
}

/// Snowflake StorageSerializationPolicy for Iceberg Tables
/// ```sql
/// [ STORAGE_SERIALIZATION_POLICY = { COMPATIBLE | OPTIMIZED } ]
/// ```
///
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum StorageSerializationPolicy {
Compatible,
Optimized,
}

impl Display for StorageSerializationPolicy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StorageSerializationPolicy::Compatible => write!(f, "COMPATIBLE"),
StorageSerializationPolicy::Optimized => write!(f, "OPTIMIZED"),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 6 additions & 0 deletions src/ast/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ impl Spanned for CreateTable {
if_not_exists: _, // bool
transient: _, // bool
volatile: _, // bool
iceberg: _, // bool, Snowflake specific
name,
columns,
constraints,
Expand Down Expand Up @@ -568,6 +569,11 @@ impl Spanned for CreateTable {
with_aggregation_policy: _, // todo, Snowflake specific
with_row_access_policy: _, // todo, Snowflake specific
with_tags: _, // todo, Snowflake specific
external_volume: _, // todo, Snowflake specific
base_location: _, // todo, Snowflake specific
catalog: _, // todo, Snowflake specific
catalog_sync: _, // todo, Snowflake specific
storage_serialization_policy: _, // todo, Snowflake specific
} = self;

union_spans(
Expand Down
Loading

0 comments on commit c7c0de6

Please sign in to comment.