Skip to content

Commit

Permalink
Test pr 1707 (#1720)
Browse files Browse the repository at this point in the history
* Add support for use_logical_type in write_pandas.

use_logical_type is a new file format option of Snowflake.
It is a Boolean that specifies whether Snowflake interprets Parquet logical types during data loading.
The default behavior of write_pandas is unchanged.
When users write a dataframe that contains datetimes with timezones and do not pass use_logical_type = True as an argument, a warning is raised (see #1687).
Providing this option also fixes issue #1687

* FIX: removed pandas import and used descriptive
naming over concise naming for is_datetime64tz_dtype.

STYLE: if statement to idiomatic form.

STYLE: broke copy_into_sql command into multiple lines,
with each file_format argument on a separate line.

* STYLE rearranged imports test_pandas_tools.py

* REFAC: Utilized 'equal sign specifier' in f-string for
improved use_logical_type warning

* changelog updates

---------

Co-authored-by: Dennis Van de Vorst <87502756+dvorst@users.noreply.github.com>
  • Loading branch information
sfc-gh-aalam and dvorst authored Oct 19, 2023
1 parent 5b61af7 commit 186a186
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 7 deletions.
4 changes: 4 additions & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne

# Release Notes

- v3.3.2(TBD)

- Added support for `use_logical_type` in `write_pandas`.

- v3.3.1(October 16,2023)

- Added for non-Windows platforms command suggestions (chown/chmod) for insufficient file permissions of config files.
Expand Down
57 changes: 51 additions & 6 deletions src/snowflake/connector/pandas_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,15 @@ def _create_temp_stage(


def _do_create_temp_file_format(
    cursor: SnowflakeCursor,
    file_format_location: str,
    compression: str,
    sql_use_logical_type: str,
) -> None:
    """Create a temporary Parquet file format for write_pandas staging.

    Args:
        cursor: Cursor used to execute the DDL statement.
        file_format_location: (Possibly schema-qualified) name for the new
            temporary file format.
        compression: Parquet compression scheme to declare (e.g. ``snappy``).
        sql_use_logical_type: Pre-rendered SQL fragment appended verbatim to
            the format options — either an empty string (use Snowflake's
            default) or ``" USE_LOGICAL_TYPE = TRUE"``/``" USE_LOGICAL_TYPE = FALSE"``.
    """
    file_format_sql = (
        f"CREATE TEMP FILE FORMAT {file_format_location} "
        f"/* Python:snowflake.connector.pandas_tools.write_pandas() */ "
        f"TYPE=PARQUET COMPRESSION={compression}{sql_use_logical_type}"
    )
    logger.debug(f"creating file format with '{file_format_sql}'")
    cursor.execute(file_format_sql, _is_internal=True)
Expand All @@ -135,6 +138,7 @@ def _create_temp_file_format(
schema: str | None,
quote_identifiers: bool,
compression: str,
sql_use_logical_type: str,
) -> str:
file_format_name = random_string()
file_format_location = build_location_helper(
Expand All @@ -144,15 +148,19 @@ def _create_temp_file_format(
quote_identifiers=quote_identifiers,
)
try:
_do_create_temp_file_format(cursor, file_format_location, compression)
_do_create_temp_file_format(
cursor, file_format_location, compression, sql_use_logical_type
)
except ProgrammingError as e:
# User may not have the privilege to create file format on the target schema, so fall back to use current schema
# as the old behavior.
logger.debug(
f"creating stage {file_format_location} failed. Exception {str(e)}. Fall back to use current schema"
)
file_format_location = file_format_name
_do_create_temp_file_format(cursor, file_format_location, compression)
_do_create_temp_file_format(
cursor, file_format_location, compression, sql_use_logical_type
)

return file_format_location

Expand All @@ -172,6 +180,7 @@ def write_pandas(
create_temp_table: bool = False,
overwrite: bool = False,
table_type: Literal["", "temp", "temporary", "transient"] = "",
use_logical_type: bool | None = None,
**kwargs: Any,
) -> tuple[
bool,
Expand Down Expand Up @@ -232,6 +241,11 @@ def write_pandas(
Pandas DataFrame.
table_type: The table type of to-be-created table. The supported table types include ``temp``/``temporary``
and ``transient``. Empty means permanent table as per SQL convention.
use_logical_type: Boolean that specifies whether to use Parquet logical types. With this file format option,
Snowflake can interpret Parquet logical types during data loading. To enable Parquet logical types,
        set use_logical_type as True. Set to None to use Snowflake's default. For more information, see:
https://docs.snowflake.com/en/sql-reference/sql/create-file-format
Returns:
Returns the COPY INTO command's results to verify ingestion in the form of a tuple of whether all chunks were
Expand Down Expand Up @@ -280,6 +294,27 @@ def write_pandas(
stacklevel=2,
)

# use_logical_type should be True when the dataframe contains datetimes
# with timezone information; without it timestamps may load incorrectly.
# https://github.com/snowflakedb/snowflake-connector-python/issues/1687
if not use_logical_type and any(
    pandas.api.types.is_datetime64tz_dtype(df[c]) for c in df.columns
):
    warnings.warn(
        "Dataframe contains a datetime with timezone column, but "
        f"'{use_logical_type=}'. This can result in datetimes "
        "being incorrectly written to Snowflake. Consider setting "
        "'use_logical_type = True'",
        UserWarning,
        stacklevel=2,
    )

# Translate the tri-state option into a SQL fragment appended to every
# Parquet file-format clause: None -> omit (Snowflake default),
# True/False -> explicit USE_LOGICAL_TYPE setting.
if use_logical_type is None:
    sql_use_logical_type = ""
elif use_logical_type:
    sql_use_logical_type = " USE_LOGICAL_TYPE = TRUE"
else:
    sql_use_logical_type = " USE_LOGICAL_TYPE = FALSE"

cursor = conn.cursor()
stage_location = _create_temp_stage(
cursor,
Expand Down Expand Up @@ -329,7 +364,12 @@ def drop_object(name: str, object_type: str) -> None:

if auto_create_table or overwrite:
file_format_location = _create_temp_file_format(
cursor, database, schema, quote_identifiers, compression_map[compression]
cursor,
database,
schema,
quote_identifiers,
compression_map[compression],
sql_use_logical_type,
)
infer_schema_sql = f"SELECT COLUMN_NAME, TYPE FROM table(infer_schema(location=>'@{stage_location}', file_format=>'{file_format_location}'))"
logger.debug(f"inferring schema with '{infer_schema_sql}'")
Expand Down Expand Up @@ -381,7 +421,12 @@ def drop_object(name: str, object_type: str) -> None:
f"COPY INTO {target_table_location} /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
f"({columns}) "
f"FROM (SELECT {parquet_columns} FROM @{stage_location}) "
f"FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression_map[compression]}{' BINARY_AS_TEXT=FALSE' if auto_create_table or overwrite else ''}) "
f"FILE_FORMAT=("
f"TYPE=PARQUET "
f"COMPRESSION={compression_map[compression]}"
f"{' BINARY_AS_TEXT=FALSE' if auto_create_table or overwrite else ''}"
f"{sql_use_logical_type}"
f") "
f"PURGE=TRUE ON_ERROR={on_error}"
)
logger.debug(f"copying into with '{copy_into_sql}'")
Expand Down
50 changes: 49 additions & 1 deletion test/integ/pandas/test_pandas_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

import math
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Callable, Generator
from unittest import mock

Expand Down Expand Up @@ -417,6 +417,54 @@ def test_write_pandas_create_temp_table_deprecation_warning(
cnx.execute_string(drop_sql)


@pytest.mark.parametrize("use_logical_type", [None, True, False])
def test_write_pandas_use_logical_type(
    conn_cnx: Callable[..., Generator[SnowflakeConnection, None, None]],
    use_logical_type: bool | None,
):
    """Verify the ``use_logical_type`` option of ``write_pandas``.

    With ``use_logical_type=True`` a timezone-aware datetime must round-trip
    unchanged; with ``None`` or ``False`` a ``UserWarning`` must be raised
    about potentially incorrect datetime writes.
    """
    # Prefix fixed from "USE_LOCAL_TYPE_" (typo) to name the option under test.
    table_name = random_string(5, "USE_LOGICAL_TYPE_").upper()
    col_name = "DT"
    create_sql = f"CREATE OR REPLACE TABLE {table_name} ({col_name} TIMESTAMP_TZ)"
    select_sql = f"SELECT * FROM {table_name}"
    drop_sql = f"DROP TABLE IF EXISTS {table_name}"
    # Non-UTC offset so a write that drops the timezone is detectable.
    timestamp = datetime(
        year=2020,
        month=1,
        day=2,
        hour=3,
        minute=4,
        second=5,
        microsecond=6,
        tzinfo=timezone(timedelta(hours=2)),
    )
    df_write = pandas.DataFrame({col_name: [timestamp]})

    with conn_cnx() as cnx:  # type: SnowflakeConnection
        cnx.cursor().execute(create_sql).fetchall()

        write_pandas_kwargs = dict(
            conn=cnx,
            df=df_write,
            use_logical_type=use_logical_type,
            auto_create_table=False,
            table_name=table_name,
        )

        try:
            # When use_logical_type = True, datetimes with timezones should be
            # correctly written to Snowflake.
            if use_logical_type:
                write_pandas(**write_pandas_kwargs)
                df_read = cnx.cursor().execute(select_sql).fetch_pandas_all()
                assert all(df_write == df_read)
            # For other use_logical_type values, a UserWarning should be displayed.
            else:
                with pytest.warns(UserWarning, match="Dataframe contains a datetime.*"):
                    write_pandas(**write_pandas_kwargs)
        finally:
            cnx.execute_string(drop_sql)


def test_invalid_table_type_write_pandas(
conn_cnx: Callable[..., Generator[SnowflakeConnection, None, None]],
):
Expand Down

0 comments on commit 186a186

Please sign in to comment.