Skip to content

Commit

Permalink
SNOW-1709861: table clean up uses its own cursor (#2448)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-aling authored Oct 17, 2024
1 parent 47cadd2 commit 7dc2670
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@

### Snowpark Python API Updates

#### New Features

- Added support for 'Service' domain to `session.lineage.trace` API.
- Added support for `copy_grants` parameter when registering UDxF and stored procedures.

#### New Features

#### Improvements

- Disables sql simplification when sort is performed after limit.
- Previously, `df.sort().limit()` and `df.limit().sort()` generates the same query with sort in front of limit. Now, `df.limit().sort()` will generate query that reads `df.limit().sort()`.
- Improve performance of generated query for `df.limit().sort()`, because limit stops table scanning as soon as the number of records is satisfied.

#### Bug Fixes

- Fixed a bug where the automatic cleanup of temporary tables could interfere with the results of async query execution.

### Snowpark pandas API Updates

#### New Features
Expand Down
31 changes: 23 additions & 8 deletions src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import TYPE_CHECKING, Dict

from snowflake.snowpark._internal.analyzer.snowflake_plan_node import SnowflakeTable
from snowflake.snowpark._internal.utils import create_rlock
from snowflake.snowpark._internal.utils import create_rlock, is_in_stored_procedure

if TYPE_CHECKING:
from snowflake.snowpark.session import Session # pragma: no cover
Expand Down Expand Up @@ -51,6 +51,20 @@ def _delete_ref_count(self, name: str) -> None: # pragma: no cover
self.ref_count_map[name] -= 1
current_ref_count = self.ref_count_map[name]
if current_ref_count == 0:
if (
is_in_stored_procedure()
and not self.session._conn._get_client_side_session_parameter(
"ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCS", False
)
):
warning_message = "Drop table requires async query which is not supported in stored procedure yet"
logging.warning(warning_message)
self.session._conn._telemetry_client.send_temp_table_cleanup_abnormal_exception_telemetry(
self.session.session_id,
name,
warning_message,
)
return
if (
self.session.auto_clean_up_temp_table_enabled
# if the session is already closed before garbage collection,
Expand All @@ -68,13 +82,14 @@ def drop_table(self, name: str) -> None: # pragma: no cover
logging.debug(f"Ready to drop {common_log_text}")
query_id = None
try:
async_job = self.session.sql(
f"drop table if exists {name} /* internal query to drop unused temp table */",
)._internal_collect_with_tag_no_telemetry(
block=False, statement_params={DROP_TABLE_STATEMENT_PARAM_NAME: name}
)
query_id = async_job.query_id
logging.debug(f"Dropping {common_log_text} with query id {query_id}")
with self.session.connection.cursor() as cursor:
async_job_query_id = cursor.execute_async(
command=f"drop table if exists {name}",
_statement_params={DROP_TABLE_STATEMENT_PARAM_NAME: name},
)["queryId"]
logging.debug(
f"Dropping {common_log_text} with query id {async_job_query_id}"
)
except Exception as ex: # pragma: no cover
warning_message = f"Failed to drop {common_log_text}, exception: {ex}"
logging.warning(warning_message)
Expand Down
3 changes: 0 additions & 3 deletions tests/integ/test_deepcopy.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,6 @@ def test_deep_nested_select(session):
[
lambda session_: session_.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]),
lambda session_: session_.sql("select 1 as a, 2 as b"),
lambda session_: session_.table(
session_.sql("select 1 as a, 2 as b").cache_result().table_name
),
],
)
def test_deepcopy_no_duplicate(session, generator):
Expand Down

0 comments on commit 7dc2670

Please sign in to comment.