Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1652349: Add support for iceberg to write_pandas #2056

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne

# Release Notes

- v3.12.3(TBD)
- Added support for iceberg tables to `write_pandas`

- v3.12.2(September 11,2024)
- Improved error handling for asynchronous queries, providing more detailed and informative error messages when an async query fails.
- Improved inference of top-level domains for accounts specifying a region in China, now defaulting to snowflakecomputing.cn.
Expand Down
52 changes: 50 additions & 2 deletions src/snowflake/connector/pandas_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,40 @@ def _create_temp_file_format(
return file_format_location


def _convert_value_to_sql_option(value: Union[str, bool, int, float] | None) -> str:
if isinstance(value, str):
if len(value) > 1 and value.startswith("'") and value.endswith("'"):
return value
else:
value = value.replace(
"'", "''"
) # escape single quotes before adding a pair of quotes
return f"'{value}'"
else:
return str(value)


def _iceberg_config_statement_helper(iceberg_config: dict[str, str]) -> str:
ALLOWED_CONFIGS = {
"EXTERNAL_VOLUME",
"CATALOG",
"BASE_LOCATION",
"CATALOG_SYNC",
"STORAGE_SERIALIZATION_POLICY",
}

normalized = {
k.upper(): _convert_value_to_sql_option(v) for k, v in iceberg_config.items()
}

if invalid_configs := set(normalized.keys()) - ALLOWED_CONFIGS:
raise ProgrammingError(
f"Invalid iceberg configurations option(s) provided {', '.join(sorted(invalid_configs))}"
)

return " ".join(f"{k}={v}" for k, v in normalized.items())


def write_pandas(
conn: SnowflakeConnection,
df: pandas.DataFrame,
Expand All @@ -181,6 +215,7 @@ def write_pandas(
overwrite: bool = False,
table_type: Literal["", "temp", "temporary", "transient"] = "",
use_logical_type: bool | None = None,
iceberg_config: dict[str, str] | None = None,
**kwargs: Any,
) -> tuple[
bool,
Expand Down Expand Up @@ -245,6 +280,14 @@ def write_pandas(
Snowflake can interpret Parquet logical types during data loading. To enable Parquet logical types,
set use_logical_type as True. Set to None to use Snowflakes default. For more information, see:
https://docs.snowflake.com/en/sql-reference/sql/create-file-format
iceberg_config: A dictionary that can contain the following iceberg configuration values:
* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format
* catalog: specifies either Snowflake or a catalog integration to use for this table
* base_location: the base directory that snowflake can write iceberg metadata and files to
* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
* storage_serialization_policy: specifies the storage serialization policy for the table



Returns:
Expand Down Expand Up @@ -393,9 +436,14 @@ def drop_object(name: str, object_type: str) -> None:
quote_identifiers,
)

iceberg = "ICEBERG " if iceberg_config else ""
iceberg_config_statement = _iceberg_config_statement_helper(
iceberg_config or {}
)

create_table_sql = (
f"CREATE {table_type.upper()} TABLE IF NOT EXISTS {target_table_location} "
f"({create_table_columns})"
f"CREATE {table_type.upper()} {iceberg}TABLE IF NOT EXISTS {target_table_location} "
f"({create_table_columns}) {iceberg_config_statement}"
f" /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
)
logger.debug(f"auto creating table with '{create_table_sql}'")
Expand Down
5 changes: 5 additions & 0 deletions test/integ/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ def conn_cnx() -> Callable[..., ContextManager[SnowflakeConnection]]:
return db


@pytest.fixture(scope="module")
def module_conn_cnx() -> Callable[..., ContextManager[SnowflakeConnection]]:
return db


@pytest.fixture()
def negative_conn_cnx() -> Callable[..., ContextManager[SnowflakeConnection]]:
"""Use this if an incident is expected and we don't want GS to create a dump file about the incident."""
Expand Down
29 changes: 28 additions & 1 deletion test/integ/pandas/test_pandas_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import math
import re
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Callable, Generator
from unittest import mock
Expand All @@ -26,10 +27,14 @@

try:
from snowflake.connector.options import pandas
from snowflake.connector.pandas_tools import write_pandas
from snowflake.connector.pandas_tools import (
_iceberg_config_statement_helper,
write_pandas,
)
except ImportError:
pandas = None
write_pandas = None
_iceberg_config_statement_helper = None

if TYPE_CHECKING:
from snowflake.connector import SnowflakeConnection
Expand Down Expand Up @@ -975,3 +980,25 @@ def mock_execute(*args, **kwargs):
finally:
cnx.execute_string(f"drop schema if exists {source_schema}")
cnx.execute_string(f"drop schema if exists {target_schema}")


def test__iceberg_config_statement_helper():
config = {
"EXTERNAL_VOLUME": "vol",
"CATALOG": "'SNOWFLAKE'",
"BASE_LOCATION": "/root",
"CATALOG_SYNC": "foo",
"STORAGE_SERIALIZATION_POLICY": "bar",
}
assert (
_iceberg_config_statement_helper(config)
== "EXTERNAL_VOLUME='vol' CATALOG='SNOWFLAKE' BASE_LOCATION='/root' CATALOG_SYNC='foo' STORAGE_SERIALIZATION_POLICY='bar'"
)

config["foo"] = True
config["bar"] = True
with pytest.raises(
ProgrammingError,
match=re.escape("Invalid iceberg configurations option(s) provided BAR, FOO"),
):
_iceberg_config_statement_helper(config)
Loading
Loading