Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1663726 make temp table cleaner thread safe #2309

Merged
Show file tree
Hide file tree
Changes from 71 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
56fb566
init
sfc-gh-aalam Sep 11, 2024
66003d1
make udf/sproc related files thread-safe
sfc-gh-aalam Sep 11, 2024
0e58205
Merge branch 'main' into aalam-SNOW-1418523-make-udf-sproc-thread-safe
sfc-gh-aalam Sep 11, 2024
e75dde1
init
sfc-gh-aalam Sep 11, 2024
68a8c1c
make query listener thread-safe
sfc-gh-aalam Sep 11, 2024
31a5734
Fix query_tag and last_action_id
sfc-gh-aalam Sep 11, 2024
b4dadda
core updates done
sfc-gh-aalam Sep 11, 2024
b8c6496
Add tests
sfc-gh-aalam Sep 12, 2024
f39837e
Fix local tests
sfc-gh-aalam Sep 12, 2024
31a196f
Merge branch 'main' into aalam-SNOW-1418523-make-analyzer-server_conn…
sfc-gh-aalam Sep 12, 2024
723bdf7
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Sep 12, 2024
37c0419
add file IO tests
sfc-gh-aalam Sep 12, 2024
8a2d433
Merge branch 'aalam-SNOW-1418523-concurrent-file-operations' into aal…
sfc-gh-aalam Sep 12, 2024
a083989
make session._runtime_version_from_requirement safe
sfc-gh-aalam Sep 13, 2024
947d384
add sp/udf concurrent tests
sfc-gh-aalam Sep 13, 2024
fd51720
fix broken test
sfc-gh-aalam Sep 13, 2024
3077853
add udtf/udaf tests
sfc-gh-aalam Sep 13, 2024
65c3186
fix broken test
sfc-gh-aalam Sep 13, 2024
94412cf
sql_simplifier, cte_optimization, eliminate_numeric, query_compilatio…
sfc-gh-aalam Sep 13, 2024
638dd09
cover more configs
sfc-gh-aalam Sep 17, 2024
7ae2c33
fix SnowflakePlan copy
sfc-gh-aalam Sep 17, 2024
1689ebf
minor update
sfc-gh-aalam Sep 17, 2024
5e8a2d2
add description
sfc-gh-aalam Sep 17, 2024
e5b3f83
init
sfc-gh-aalam Sep 17, 2024
1c83ef2
use _package_lock to protect Session._packages
sfc-gh-aalam Sep 17, 2024
a649761
undo refactor
sfc-gh-aalam Sep 17, 2024
f03d618
undo refactor
sfc-gh-aalam Sep 17, 2024
5f398d5
fix test
sfc-gh-aalam Sep 17, 2024
3807087
fix test
sfc-gh-aalam Sep 17, 2024
4eef3e9
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Sep 17, 2024
df3263c
add file IO tests
sfc-gh-aalam Sep 12, 2024
6769c54
merge with base
sfc-gh-aalam Sep 17, 2024
af86f67
merge with base
sfc-gh-aalam Sep 17, 2024
a737f33
fix test
sfc-gh-aalam Sep 17, 2024
9f2c707
merge with base
sfc-gh-aalam Sep 17, 2024
8ca2730
protect complexity bounds setter with lock
sfc-gh-aalam Sep 17, 2024
5c8389b
Merge branch 'aalam-SNOW-1663726-make-session-config-updates-thread-s…
sfc-gh-aalam Sep 18, 2024
39ea350
add tests
sfc-gh-aalam Sep 18, 2024
b616424
fix test
sfc-gh-aalam Sep 18, 2024
c10daf6
fix test
sfc-gh-aalam Sep 18, 2024
81417a3
add config context
sfc-gh-aalam Sep 19, 2024
e340567
add tests
sfc-gh-aalam Sep 19, 2024
30952bb
update documentation
sfc-gh-aalam Sep 20, 2024
03f25b5
use config context in plan compiler
sfc-gh-aalam Sep 20, 2024
6deb402
add comments
sfc-gh-aalam Sep 20, 2024
8e1dfe0
minor refactor
sfc-gh-aalam Sep 20, 2024
10bfeb4
fix test
sfc-gh-aalam Sep 20, 2024
879940a
update documentation
sfc-gh-aalam Sep 20, 2024
5aad2d9
simplify context config
sfc-gh-aalam Sep 25, 2024
669eb91
merge with base
sfc-gh-aalam Sep 25, 2024
a85a144
add config context to repeated subquery elimination resolution stage
sfc-gh-aalam Sep 25, 2024
a79ffb4
fix tests
sfc-gh-aalam Sep 26, 2024
4420350
refactor
sfc-gh-aalam Sep 26, 2024
5f1eaa6
remove do_analyze
sfc-gh-aalam Sep 27, 2024
9d62017
fix
sfc-gh-aalam Sep 27, 2024
b58aa8b
fix
sfc-gh-aalam Sep 27, 2024
db37033
fix
sfc-gh-aalam Sep 27, 2024
dddd15f
fix unit tests
sfc-gh-aalam Sep 27, 2024
57ee9e8
simplify
sfc-gh-aalam Sep 27, 2024
809a86e
simplify
sfc-gh-aalam Sep 27, 2024
6021ab8
simplify
sfc-gh-aalam Sep 27, 2024
43986f6
simplify
sfc-gh-aalam Sep 27, 2024
0430e92
simplify
sfc-gh-aalam Sep 27, 2024
095b04e
remove config context
sfc-gh-aalam Sep 30, 2024
32707f9
min-diff
sfc-gh-aalam Sep 30, 2024
3bf678d
min-diff
sfc-gh-aalam Sep 30, 2024
3eade1a
min-diff
sfc-gh-aalam Sep 30, 2024
1850d5d
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Oct 2, 2024
1fa6ad2
add warnings
sfc-gh-aalam Oct 2, 2024
095708c
Merge branch 'aalam-SNOW-1663726-make-session-config-updates-thread-s…
sfc-gh-aalam Oct 2, 2024
f2ce7b7
less flaky changes
sfc-gh-aalam Oct 2, 2024
ee79515
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Oct 4, 2024
20fc937
fix
sfc-gh-aalam Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,14 @@ def __init__(
session: Session,
query_generator: QueryGenerator,
logical_plans: List[LogicalPlan],
complexity_bounds: Tuple[int, int],
) -> None:
self.session = session
self._query_generator = query_generator
self.logical_plans = logical_plans
self._parent_map = defaultdict(set)
self.complexity_score_lower_bound = (
session.large_query_breakdown_complexity_bounds[0]
)
self.complexity_score_upper_bound = (
session.large_query_breakdown_complexity_bounds[1]
)
self.complexity_score_lower_bound = complexity_bounds[0]
self.complexity_score_upper_bound = complexity_bounds[1]

def apply(self) -> List[LogicalPlan]:
if is_active_transaction(self.session):
Expand Down
34 changes: 20 additions & 14 deletions src/snowflake/snowpark/_internal/compiler/plan_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ class PlanCompiler:

def __init__(self, plan: SnowflakePlan) -> None:
self._plan = plan
session = plan.session
self.cte_optimization_enabled = session.cte_optimization_enabled
self.large_query_breakdown_enabled = session.large_query_breakdown_enabled
self.large_query_breakdown_complexity_bounds = (
session.large_query_breakdown_complexity_bounds
)
self.query_compilation_stage_enabled = session._query_compilation_stage_enabled

def should_start_query_compilation(self) -> bool:
"""
Expand All @@ -68,15 +75,13 @@ def should_start_query_compilation(self) -> bool:
return (
not isinstance(current_session._conn, MockServerConnection)
and (self._plan.source_plan is not None)
and current_session._query_compilation_stage_enabled
and (
current_session.cte_optimization_enabled
or current_session.large_query_breakdown_enabled
)
and self.query_compilation_stage_enabled
and (self.cte_optimization_enabled or self.large_query_breakdown_enabled)
)

def compile(self) -> Dict[PlanQueryType, List[Query]]:
if self.should_start_query_compilation():
session = self._plan.session
# preparation for compilation
# 1. make a copy of the original plan
start_time = time.time()
Expand All @@ -93,7 +98,7 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
# 3. apply each optimizations if needed
# CTE optimization
cte_start_time = time.time()
if self._plan.session.cte_optimization_enabled:
if self.cte_optimization_enabled:
repeated_subquery_eliminator = RepeatedSubqueryElimination(
logical_plans, query_generator
)
Expand All @@ -108,9 +113,12 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
plot_plan_if_enabled(plan, f"cte_optimized_plan_{i}")

# Large query breakdown
if self._plan.session.large_query_breakdown_enabled:
if self.large_query_breakdown_enabled:
large_query_breakdown = LargeQueryBreakdown(
self._plan.session, query_generator, logical_plans
session,
query_generator,
logical_plans,
self.large_query_breakdown_complexity_bounds,
)
logical_plans = large_query_breakdown.apply()

Expand All @@ -130,11 +138,10 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
cte_time = cte_end_time - cte_start_time
large_query_breakdown_time = large_query_breakdown_end_time - cte_end_time
total_time = time.time() - start_time
session = self._plan.session
summary_value = {
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: session.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: session.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: session.large_query_breakdown_complexity_bounds,
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: self.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: self.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: self.large_query_breakdown_complexity_bounds,
CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time,
Expand All @@ -151,8 +158,7 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
return queries
else:
final_plan = self._plan
if self._plan.session.cte_optimization_enabled:
final_plan = final_plan.replace_repeated_subquery_with_cte()
final_plan = final_plan.replace_repeated_subquery_with_cte()
return {
PlanQueryType.QUERIES: final_plan.queries,
PlanQueryType.POST_ACTIONS: final_plan.post_actions,
Expand Down
22 changes: 15 additions & 7 deletions src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#
import logging
import threading
import weakref
from collections import defaultdict
from typing import TYPE_CHECKING, Dict
Expand Down Expand Up @@ -31,9 +32,12 @@ def __init__(self, session: "Session") -> None:
# to its reference count for later temp table management
# this dict will still be maintained even if the cleaner is stopped (`stop()` is called)
self.ref_count_map: Dict[str, int] = defaultdict(int)
# Lock to protect the ref_count_map
self.lock = threading.RLock()

def add(self, table: SnowflakeTable) -> None:
self.ref_count_map[table.name] += 1
with self.lock:
self.ref_count_map[table.name] += 1
# the finalizer will be triggered when it gets garbage collected
# and this table will be dropped finally
_ = weakref.finalize(table, self._delete_ref_count, table.name)
Expand All @@ -43,13 +47,15 @@ def _delete_ref_count(self, name: str) -> None: # pragma: no cover
Decrements the reference count of a temporary table,
and if the count reaches zero, puts this table in the queue for cleanup.
"""
self.ref_count_map[name] -= 1
if self.ref_count_map[name] == 0:
with self.lock:
self.ref_count_map[name] -= 1
current_ref_count = self.ref_count_map[name]
if current_ref_count == 0:
if self.session.auto_clean_up_temp_table_enabled:
self.drop_table(name)
elif self.ref_count_map[name] < 0:
elif current_ref_count < 0:
logging.debug(
f"Unexpected reference count {self.ref_count_map[name]} for table {name}"
f"Unexpected reference count {current_ref_count} for table {name}"
)

def drop_table(self, name: str) -> None: # pragma: no cover
Expand Down Expand Up @@ -89,9 +95,11 @@ def stop(self) -> None:

@property
def num_temp_tables_created(self) -> int:
return len(self.ref_count_map)
with self.lock:
return len(self.ref_count_map)

@property
def num_temp_tables_cleaned(self) -> int:
# TODO SNOW-1662536: we may need a separate counter for the number of tables cleaned when parameter is enabled
return sum(v == 0 for v in self.ref_count_map.values())
with self.lock:
return sum(v == 0 for v in self.ref_count_map.values())
145 changes: 86 additions & 59 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,38 +339,44 @@ def __init__(self, session: "Session", conf: Dict[str, Any]) -> None:
"use_constant_subquery_alias": True,
"flatten_select_after_filter_and_orderby": True,
} # For config that's temporary/to be removed soon
self._lock = self._session._lock
for key, val in conf.items():
if self.is_mutable(key):
self.set(key, val)

def get(self, key: str, default=None) -> Any:
if hasattr(Session, key):
return getattr(self._session, key)
if hasattr(self._session._conn._conn, key):
return getattr(self._session._conn._conn, key)
return self._conf.get(key, default)
with self._lock:
if hasattr(Session, key):
return getattr(self._session, key)
if hasattr(self._session._conn._conn, key):
return getattr(self._session._conn._conn, key)
return self._conf.get(key, default)

def is_mutable(self, key: str) -> bool:
if hasattr(Session, key) and isinstance(getattr(Session, key), property):
return getattr(Session, key).fset is not None
if hasattr(SnowflakeConnection, key) and isinstance(
getattr(SnowflakeConnection, key), property
):
return getattr(SnowflakeConnection, key).fset is not None
return key in self._conf
with self._lock:
if hasattr(Session, key) and isinstance(
getattr(Session, key), property
):
return getattr(Session, key).fset is not None
if hasattr(SnowflakeConnection, key) and isinstance(
getattr(SnowflakeConnection, key), property
):
return getattr(SnowflakeConnection, key).fset is not None
return key in self._conf

def set(self, key: str, value: Any) -> None:
if self.is_mutable(key):
if hasattr(Session, key):
setattr(self._session, key, value)
if hasattr(SnowflakeConnection, key):
setattr(self._session._conn._conn, key, value)
if key in self._conf:
self._conf[key] = value
else:
raise AttributeError(
f'Configuration "{key}" does not exist or is not mutable in runtime'
)
with self._lock:
if self.is_mutable(key):
if hasattr(Session, key):
setattr(self._session, key, value)
if hasattr(SnowflakeConnection, key):
setattr(self._session._conn._conn, key, value)
if key in self._conf:
self._conf[key] = value
else:
raise AttributeError(
f'Configuration "{key}" does not exist or is not mutable in runtime'
)

class SessionBuilder:
"""
Expand Down Expand Up @@ -538,11 +544,6 @@ def __init__(
self._udtf_registration = UDTFRegistration(self)
self._udaf_registration = UDAFRegistration(self)

self._plan_builder = (
SnowflakePlanBuilder(self)
if isinstance(self._conn, ServerConnection)
else MockSnowflakePlanBuilder(self)
)
self._last_action_id = 0
self._last_canceled_id = 0
self._use_scoped_temp_objects: bool = (
Expand Down Expand Up @@ -637,6 +638,16 @@ def _analyzer(self) -> Analyzer:
)
return self._thread_store.analyzer

@property
def _plan_builder(self):
if not hasattr(self._thread_store, "plan_builder"):
self._thread_store.plan_builder = (
SnowflakePlanBuilder(self)
if isinstance(self._conn, ServerConnection)
else MockSnowflakePlanBuilder(self)
)
return self._thread_store.plan_builder

def close(self) -> None:
"""Close this session."""
if is_in_stored_procedure():
Expand Down Expand Up @@ -770,36 +781,45 @@ def custom_package_usage_config(self) -> Dict:

@sql_simplifier_enabled.setter
def sql_simplifier_enabled(self, value: bool) -> None:
self._conn._telemetry_client.send_sql_simplifier_telemetry(
self._session_id, value
)
try:
self._conn._cursor.execute(
f"alter session set {_PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER_STRING} = {value}"
with self._lock:
self._conn._telemetry_client.send_sql_simplifier_telemetry(
self._session_id, value
)
except Exception:
pass
self._sql_simplifier_enabled = value
try:
self._conn._cursor.execute(
f"alter session set {_PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER_STRING} = {value}"
)
except Exception:
pass
self._sql_simplifier_enabled = value

@cte_optimization_enabled.setter
@experimental_parameter(version="1.15.0")
def cte_optimization_enabled(self, value: bool) -> None:
if value:
self._conn._telemetry_client.send_cte_optimization_telemetry(
self._session_id
if threading.active_count() > 1:
# TODO (SNOW-1541096): Remove the limitation once old cte implementation is removed.
_logger.warning(
"Setting cte_optimization_enabled is not currently thread-safe. Ignoring the update"
)
self._cte_optimization_enabled = value
return
with self._lock:
if value:
self._conn._telemetry_client.send_cte_optimization_telemetry(
self._session_id
)
self._cte_optimization_enabled = value

@eliminate_numeric_sql_value_cast_enabled.setter
@experimental_parameter(version="1.20.0")
def eliminate_numeric_sql_value_cast_enabled(self, value: bool) -> None:
"""Set the value for eliminate_numeric_sql_value_cast_enabled"""

if value in [True, False]:
self._conn._telemetry_client.send_eliminate_numeric_sql_value_cast_telemetry(
self._session_id, value
)
self._eliminate_numeric_sql_value_cast_enabled = value
with self._lock:
self._conn._telemetry_client.send_eliminate_numeric_sql_value_cast_telemetry(
self._session_id, value
)
self._eliminate_numeric_sql_value_cast_enabled = value
else:
raise ValueError(
"value for eliminate_numeric_sql_value_cast_enabled must be True or False!"
Expand All @@ -810,10 +830,11 @@ def eliminate_numeric_sql_value_cast_enabled(self, value: bool) -> None:
def auto_clean_up_temp_table_enabled(self, value: bool) -> None:
"""Set the value for auto_clean_up_temp_table_enabled"""
if value in [True, False]:
self._conn._telemetry_client.send_auto_clean_up_temp_table_telemetry(
self._session_id, value
)
self._auto_clean_up_temp_table_enabled = value
with self._lock:
self._conn._telemetry_client.send_auto_clean_up_temp_table_telemetry(
self._session_id, value
)
self._auto_clean_up_temp_table_enabled = value
else:
raise ValueError(
"value for auto_clean_up_temp_table_enabled must be True or False!"
Expand All @@ -829,10 +850,11 @@ def large_query_breakdown_enabled(self, value: bool) -> None:
"""

if value in [True, False]:
self._conn._telemetry_client.send_large_query_breakdown_telemetry(
self._session_id, value
)
self._large_query_breakdown_enabled = value
with self._lock:
self._conn._telemetry_client.send_large_query_breakdown_telemetry(
self._session_id, value
)
self._large_query_breakdown_enabled = value
else:
raise ValueError(
"value for large_query_breakdown_enabled must be True or False!"
Expand All @@ -850,16 +872,20 @@ def large_query_breakdown_complexity_bounds(self, value: Tuple[int, int]) -> Non
raise ValueError(
f"Expecting a tuple of lower and upper bound with the lower bound less than the upper bound. Got (lower, upper) = ({value[0], value[1]})"
)
self._conn._telemetry_client.send_large_query_breakdown_update_complexity_bounds(
self._session_id, value[0], value[1]
)
with self._lock:
self._conn._telemetry_client.send_large_query_breakdown_update_complexity_bounds(
self._session_id, value[0], value[1]
)

self._large_query_breakdown_complexity_bounds = value
self._large_query_breakdown_complexity_bounds = value

@custom_package_usage_config.setter
@experimental_parameter(version="1.6.0")
def custom_package_usage_config(self, config: Dict) -> None:
self._custom_package_usage_config = {k.lower(): v for k, v in config.items()}
with self._lock:
self._custom_package_usage_config = {
k.lower(): v for k, v in config.items()
}

def cancel_all(self) -> None:
"""
Expand Down Expand Up @@ -1458,7 +1484,8 @@ def _get_dependency_packages(
statement_params=statement_params,
)

custom_package_usage_config = self._custom_package_usage_config.copy()
with self._lock:
custom_package_usage_config = self._custom_package_usage_config.copy()

unsupported_packages: List[str] = []
for package, package_info in package_dict.items():
Expand Down
Loading
Loading