From 7dc26700fea557bbf09d7e0b58800d3679dd63cd Mon Sep 17 00:00:00 2001
From: Adam Ling
Date: Thu, 17 Oct 2024 15:23:39 -0700
Subject: [PATCH] SNOW-1709861: table clean up uses its own cursor (#2448)

---
 CHANGELOG.md                             |  9 ++++--
 .../_internal/temp_table_auto_cleaner.py | 31 ++++++++++++++-----
 tests/integ/test_deepcopy.py             |  3 --
 3 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9e6038a832..a5e03e6324 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,16 +4,21 @@
 
 ### Snowpark Python API Updates
 
+#### New Features
+
 - Added support for 'Service' domain to `session.lineage.trace` API.
 - Added support for `copy_grants` parameter when registering UDxF and stored procedures.
 
-#### New Features
-
 #### Improvements
+
 - Disables sql simplification when sort is performed after limit.
   - Previously, `df.sort().limit()` and `df.limit().sort()` generates the same query with sort in front of limit. Now, `df.limit().sort()` will generate query that reads `df.limit().sort()`.
   - Improve performance of generated query for `df.limit().sort()`, because limit stops table scanning as soon as the number of records is satisfied.
 
+#### Bug Fixes
+
+- Fixed a bug where the automatic cleanup of temporary tables could interfere with the results of async query execution.
+
 ### Snowpark pandas API Updates
 
 #### New Features
diff --git a/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py b/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
index d5b8387a26..c53491f536 100644
--- a/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
+++ b/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
@@ -7,7 +7,7 @@
 from typing import TYPE_CHECKING, Dict
 
 from snowflake.snowpark._internal.analyzer.snowflake_plan_node import SnowflakeTable
-from snowflake.snowpark._internal.utils import create_rlock
+from snowflake.snowpark._internal.utils import create_rlock, is_in_stored_procedure
 
 if TYPE_CHECKING:
     from snowflake.snowpark.session import Session  # pragma: no cover
@@ -51,6 +51,20 @@ def _delete_ref_count(self, name: str) -> None:  # pragma: no cover
         self.ref_count_map[name] -= 1
         current_ref_count = self.ref_count_map[name]
         if current_ref_count == 0:
+            if (
+                is_in_stored_procedure()
+                and not self.session._conn._get_client_side_session_parameter(
+                    "ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCS", False
+                )
+            ):
+                warning_message = "Drop table requires async query which is not supported in stored procedure yet"
+                logging.warning(warning_message)
+                self.session._conn._telemetry_client.send_temp_table_cleanup_abnormal_exception_telemetry(
+                    self.session.session_id,
+                    name,
+                    warning_message,
+                )
+                return
             if (
                 self.session.auto_clean_up_temp_table_enabled
                 # if the session is already closed before garbage collection,
@@ -68,13 +82,14 @@ def drop_table(self, name: str) -> None:  # pragma: no cover
         logging.debug(f"Ready to drop {common_log_text}")
         query_id = None
         try:
-            async_job = self.session.sql(
-                f"drop table if exists {name} /* internal query to drop unused temp table */",
-            )._internal_collect_with_tag_no_telemetry(
-                block=False, statement_params={DROP_TABLE_STATEMENT_PARAM_NAME: name}
-            )
-            query_id = async_job.query_id
-            logging.debug(f"Dropping {common_log_text} with query id {query_id}")
+            with self.session.connection.cursor() as cursor:
+                async_job_query_id = cursor.execute_async(
+                    command=f"drop table if exists {name}",
+                    _statement_params={DROP_TABLE_STATEMENT_PARAM_NAME: name},
+                )["queryId"]
+                logging.debug(
+                    f"Dropping {common_log_text} with query id {async_job_query_id}"
+                )
         except Exception as ex:  # pragma: no cover
             warning_message = f"Failed to drop {common_log_text}, exception: {ex}"
             logging.warning(warning_message)
diff --git a/tests/integ/test_deepcopy.py b/tests/integ/test_deepcopy.py
index a07aa49950..d834dcd03a 100644
--- a/tests/integ/test_deepcopy.py
+++ b/tests/integ/test_deepcopy.py
@@ -387,9 +387,6 @@ def test_deep_nested_select(session):
     [
         lambda session_: session_.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]),
         lambda session_: session_.sql("select 1 as a, 2 as b"),
-        lambda session_: session_.table(
-            session_.sql("select 1 as a, 2 as b").cache_result().table_name
-        ),
     ],
 )
 def test_deepcopy_no_duplicate(session, generator):