Merged

22 commits
876ed88
test
samikshya-db Nov 21, 2025
0687a29
perf: Optimize telemetry latency logging to reduce overhead
samikshya-db Nov 24, 2025
695ea7f
perf: Optimize telemetry latency logging and feature flag caching
samikshya-db Nov 24, 2025
9f19c88
perf: Skip feature flag fetch when telemetry is force enabled
samikshya-db Nov 24, 2025
c76610e
style: Fix Black formatting for latency_logger.py
samikshya-db Nov 24, 2025
b97359d
test: Update telemetry test for lock-free queue
samikshya-db Nov 24, 2025
581394a
style: Apply Black formatting (single to double quotes)
samikshya-db Nov 24, 2025
564827f
fix: Prevent exception suppression in log_latency decorator
samikshya-db Nov 24, 2025
7a779c7
fix: Black formatting
samikshya-db Nov 24, 2025
4a0f81b
Add ignore_transactions config to disable transaction operations (#711)
jayantsing-db Nov 20, 2025
ac4bc8e
Ready for 4.2.1 release (#713)
vikrantpuppala Nov 20, 2025
77bac01
Change default use_hybrid_disposition to False (#714)
samikshya-db Nov 21, 2025
d6e5d85
Merge branch 'main' of https://github.com/databricks/databricks-sql-p…
samikshya-db Nov 26, 2025
f98d22a
docs: Update timing function description - remove performance claim
samikshya-db Nov 26, 2025
648aa96
docs: Simplify log_latency docstring
samikshya-db Nov 26, 2025
b793967
test: Add unit tests for FeatureFlagsContextFactory host-level caching
samikshya-db Nov 26, 2025
2bbc3e6
Merge branch 'main' into improvePerf
samikshya-db Nov 26, 2025
37d1162
fix: Add type annotation for _events_queue to resolve mypy error
samikshya-db Nov 26, 2025
97e2bea
fix: Format line to comply with Black 88-char limit
samikshya-db Nov 26, 2025
91012ad
fix: Break long comment line to comply with Black
samikshya-db Nov 26, 2025
82b74a5
fix: Apply Black formatting
samikshya-db Nov 26, 2025
d8e47db
fix: Apply Black 22.12.0 formatting (matching project version)
samikshya-db Nov 26, 2025
8 changes: 5 additions & 3 deletions src/databricks/sql/common/feature_flag.py
@@ -165,8 +165,9 @@ def get_instance(cls, connection: "Connection") -> FeatureFlagsContext:
             cls._initialize()
             assert cls._executor is not None

-            # Use the unique session ID as the key
-            key = connection.get_session_id_hex()
+            # Cache at HOST level - share feature flags across connections to same host
Contributor: Can you please share your findings on what improved post this change? Do we see a memory drop after caching it at host level?

Contributor Author: This is a general optimization, replicating what we have in JDBC. I did not test memory changes after this change.
+            # Feature flags are per-host, not per-session
+            key = connection.session.host
Contributor: Is this similar to how we cache feature flag values in Java?

Contributor: Can we test this change via unit tests?

Contributor Author (samikshya-db, Nov 26, 2025):

> Is this similar to how we cache feature flag values in Java?

Yes, this is the key we use in Java now (updated after the RL issue). Let me also add unit tests. Thanks.

             if key not in cls._context_map:
                 cls._context_map[key] = FeatureFlagsContext(
                     connection, cls._executor, connection.session.http_client
@@ -177,7 +178,8 @@ def get_instance(cls, connection: "Connection") -> FeatureFlagsContext:
     def remove_instance(cls, connection: "Connection"):
         """Removes the context for a given connection and shuts down the executor if no clients remain."""
         with cls._lock:
-            key = connection.get_session_id_hex()
+            # Use host as key to match get_instance
+            key = connection.session.host
             if key in cls._context_map:
                 cls._context_map.pop(key, None)
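The review thread above asks for unit-test coverage of the host-level cache; commit b793967 adds the real tests. A minimal pytest sketch of what such a test can assert, using mocked connections (the `_fake_connection` helper is hypothetical; only `session.host` and `session.http_client` are stubbed, matching the attributes the factory touches in the diff above):

from unittest.mock import MagicMock

from databricks.sql.common.feature_flag import FeatureFlagsContextFactory


def _fake_connection(host: str) -> MagicMock:
    # Hypothetical stand-in for a Connection: only the attributes the
    # factory touches (session.host, session.http_client) need to exist.
    conn = MagicMock()
    conn.session.host = host
    return conn


def test_feature_flag_context_cached_per_host():
    conn_a = _fake_connection("adb-1.example.cloud.databricks.com")
    conn_b = _fake_connection("adb-1.example.cloud.databricks.com")  # same host
    conn_c = _fake_connection("adb-2.example.cloud.databricks.com")  # other host

    ctx_a = FeatureFlagsContextFactory.get_instance(conn_a)
    ctx_b = FeatureFlagsContextFactory.get_instance(conn_b)
    ctx_c = FeatureFlagsContextFactory.get_instance(conn_c)

    assert ctx_a is ctx_b  # one context shared by connections to the same host
    assert ctx_a is not ctx_c  # a different host gets its own context

    # Clean up so the executor can shut down once no contexts remain.
    FeatureFlagsContextFactory.remove_instance(conn_a)
    FeatureFlagsContextFactory.remove_instance(conn_c)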
289 changes: 146 additions & 143 deletions src/databricks/sql/telemetry/latency_logger.py
@@ -1,6 +1,6 @@
 import time
 import functools
-from typing import Optional
+from typing import Optional, Dict, Any
 import logging
 from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
 from databricks.sql.telemetry.models.event import (
@@ -11,127 +11,141 @@
 logger = logging.getLogger(__name__)

-class TelemetryExtractor:
-    """
-    Base class for extracting telemetry information from various object types.
-
-    This class serves as a proxy that delegates attribute access to the wrapped object
-    while providing a common interface for extracting telemetry-related data.
-    """
-
-    def __init__(self, obj):
-        self._obj = obj
-
-    def __getattr__(self, name):
-        return getattr(self._obj, name)
-
-    def get_session_id_hex(self):
-        pass
-
-    def get_statement_id(self):
-        pass
-
-    def get_is_compressed(self):
-        pass
-
-    def get_execution_result_format(self):
-        pass
-
-    def get_retry_count(self):
-        pass
-
-    def get_chunk_id(self):
-        pass
-
-
-class CursorExtractor(TelemetryExtractor):
-    """
-    Telemetry extractor specialized for Cursor objects.
-
-    Extracts telemetry information from database cursor objects, including
-    statement IDs, session information, compression settings, and result formats.
-    """
-
-    def get_statement_id(self) -> Optional[str]:
-        return self.query_id
-
-    def get_session_id_hex(self) -> Optional[str]:
-        return self.connection.get_session_id_hex()
-
-    def get_is_compressed(self) -> bool:
-        return self.connection.lz4_compression
-
-    def get_execution_result_format(self) -> ExecutionResultFormat:
-        if self.active_result_set is None:
-            return ExecutionResultFormat.FORMAT_UNSPECIFIED
-
-        from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
-
-        if isinstance(self.active_result_set.results, ColumnQueue):
-            return ExecutionResultFormat.COLUMNAR_INLINE
-        elif isinstance(self.active_result_set.results, CloudFetchQueue):
-            return ExecutionResultFormat.EXTERNAL_LINKS
-        elif isinstance(self.active_result_set.results, ArrowQueue):
-            return ExecutionResultFormat.INLINE_ARROW
-        return ExecutionResultFormat.FORMAT_UNSPECIFIED
-
-    def get_retry_count(self) -> int:
-        if hasattr(self.backend, "retry_policy") and self.backend.retry_policy:
-            return len(self.backend.retry_policy.history)
-        return 0
-
-    def get_chunk_id(self):
-        return None
+def _extract_cursor_data(cursor) -> Dict[str, Any]:
+    """
+    Extract telemetry data directly from a Cursor object.
+
+    OPTIMIZATION: Uses direct attribute access instead of wrapper objects.
+    This eliminates object creation overhead and method call indirection.
+
+    Args:
+        cursor: The Cursor object to extract data from
+
+    Returns:
+        Dict with telemetry data (values may be None if extraction fails)
+    """
+    data = {}
+
+    # Extract statement_id (query_id) - direct attribute access
+    try:
+        data["statement_id"] = cursor.query_id
+    except (AttributeError, Exception):
+        data["statement_id"] = None
+
+    # Extract session_id_hex - direct method call
+    try:
+        data["session_id_hex"] = cursor.connection.get_session_id_hex()
+    except (AttributeError, Exception):
+        data["session_id_hex"] = None
+
+    # Extract is_compressed - direct attribute access
+    try:
+        data["is_compressed"] = cursor.connection.lz4_compression
+    except (AttributeError, Exception):
+        data["is_compressed"] = False
+
+    # Extract execution_result_format - inline logic
+    try:
+        if cursor.active_result_set is None:
+            data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
+        else:
+            from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
+
+            results = cursor.active_result_set.results
+            if isinstance(results, ColumnQueue):
+                data["execution_result"] = ExecutionResultFormat.COLUMNAR_INLINE
+            elif isinstance(results, CloudFetchQueue):
+                data["execution_result"] = ExecutionResultFormat.EXTERNAL_LINKS
+            elif isinstance(results, ArrowQueue):
+                data["execution_result"] = ExecutionResultFormat.INLINE_ARROW
+            else:
+                data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
+    except (AttributeError, Exception):
+        data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
+
+    # Extract retry_count - direct attribute access
+    try:
+        if hasattr(cursor.backend, "retry_policy") and cursor.backend.retry_policy:
+            data["retry_count"] = len(cursor.backend.retry_policy.history)
+        else:
+            data["retry_count"] = 0
+    except (AttributeError, Exception):
+        data["retry_count"] = 0
+
+    # chunk_id is always None for Cursor
+    data["chunk_id"] = None
+
+    return data


-class ResultSetDownloadHandlerExtractor(TelemetryExtractor):
-    """
-    Telemetry extractor specialized for ResultSetDownloadHandler objects.
-    """
-
-    def get_session_id_hex(self) -> Optional[str]:
-        return self._obj.session_id_hex
-
-    def get_statement_id(self) -> Optional[str]:
-        return self._obj.statement_id
-
-    def get_is_compressed(self) -> bool:
-        return self._obj.settings.is_lz4_compressed
-
-    def get_execution_result_format(self) -> ExecutionResultFormat:
-        return ExecutionResultFormat.EXTERNAL_LINKS
-
-    def get_retry_count(self) -> Optional[int]:
-        # standard requests and urllib3 libraries don't expose retry count
-        return None
-
-    def get_chunk_id(self) -> Optional[int]:
-        return self._obj.chunk_id
+def _extract_result_set_handler_data(handler) -> Dict[str, Any]:
+    """
+    Extract telemetry data directly from a ResultSetDownloadHandler object.
+
+    OPTIMIZATION: Uses direct attribute access instead of wrapper objects.
+
+    Args:
+        handler: The ResultSetDownloadHandler object to extract data from
+
+    Returns:
+        Dict with telemetry data (values may be None if extraction fails)
+    """
+    data = {}
+
+    # Extract session_id_hex - direct attribute access
+    try:
+        data["session_id_hex"] = handler.session_id_hex
+    except (AttributeError, Exception):
+        data["session_id_hex"] = None
+
+    # Extract statement_id - direct attribute access
+    try:
+        data["statement_id"] = handler.statement_id
+    except (AttributeError, Exception):
+        data["statement_id"] = None
+
+    # Extract is_compressed - direct attribute access
+    try:
+        data["is_compressed"] = handler.settings.is_lz4_compressed
+    except (AttributeError, Exception):
+        data["is_compressed"] = False
+
+    # execution_result is always EXTERNAL_LINKS for result set handlers
+    data["execution_result"] = ExecutionResultFormat.EXTERNAL_LINKS
+
+    # retry_count is not available for result set handlers
+    data["retry_count"] = None
+
+    # Extract chunk_id - direct attribute access
+    try:
+        data["chunk_id"] = handler.chunk_id
+    except (AttributeError, Exception):
+        data["chunk_id"] = None
+
+    return data


-def get_extractor(obj):
-    """
-    Factory function to create the appropriate telemetry extractor for an object.
-
-    Determines the object type and returns the corresponding specialized extractor
-    that can extract telemetry information from that object type.
-
-    Args:
-        obj: The object to create an extractor for. Can be a Cursor,
-            ResultSetDownloadHandler, or any other object.
-
-    Returns:
-        TelemetryExtractor: A specialized extractor instance:
-            - CursorExtractor for Cursor objects
-            - ResultSetDownloadHandlerExtractor for ResultSetDownloadHandler objects
-            - None for all other objects
-    """
-    if obj.__class__.__name__ == "Cursor":
-        return CursorExtractor(obj)
-    elif obj.__class__.__name__ == "ResultSetDownloadHandler":
-        return ResultSetDownloadHandlerExtractor(obj)
-    else:
-        logger.debug("No extractor found for %s", obj.__class__.__name__)
-        return None
+def _extract_telemetry_data(obj) -> Optional[Dict[str, Any]]:
+    """
+    Extract telemetry data from an object based on its type.
+
+    OPTIMIZATION: Returns a simple dict instead of creating wrapper objects.
+    This dict will be used to create the SqlExecutionEvent in the background thread.
+
+    Args:
+        obj: The object to extract data from (Cursor, ResultSetDownloadHandler, etc.)
+
+    Returns:
+        Dict with telemetry data, or None if object type is not supported
+    """
+    obj_type = obj.__class__.__name__
+
+    if obj_type == "Cursor":
+        return _extract_cursor_data(obj)
+    elif obj_type == "ResultSetDownloadHandler":
+        return _extract_result_set_handler_data(obj)
+    else:
+        logger.debug("No telemetry extraction available for %s", obj_type)
+        return None
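The overhead this rewrite removes is one wrapper-object allocation plus `__getattr__` indirection per logged call. A standalone micro-benchmark sketch (independent of the connector; all class and function names here are illustrative) that contrasts the two access patterns:

import timeit


class _Proxy:
    # Old shape: a wrapper whose attribute reads go through __getattr__.
    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, name):
        return getattr(self._obj, name)

    def get_query_id(self):
        return self.query_id  # resolved via __getattr__ on every call


class _Cursor:
    query_id = "abc-123"


def _via_proxy(cursor):
    return _Proxy(cursor).get_query_id()  # allocation + indirection


def _via_dict(cursor):
    # New shape: direct attribute access into a plain dict.
    return {"statement_id": cursor.query_id}["statement_id"]


cursor = _Cursor()
print("proxy:", timeit.timeit(lambda: _via_proxy(cursor), number=1_000_000))
print("dict: ", timeit.timeit(lambda: _via_dict(cursor), number=1_000_000))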


@@ -143,12 +157,6 @@ def log_latency(statement_type: StatementType = StatementType.NONE):
     data about the operation, including latency, statement information, and
     execution context.

-    The decorator automatically:
-    - Measures execution time using high-precision performance counters
-    - Extracts telemetry information from the method's object (self)
-    - Creates a SqlExecutionEvent with execution details
-    - Sends the telemetry data asynchronously via TelemetryClient
-
     Args:
         statement_type (StatementType): The type of SQL statement being executed.
@@ -162,54 +170,49 @@ def execute(self, query):
         function: A decorator that wraps methods to add latency logging.

     Note:
-        The wrapped method's object (self) must be compatible with the
-        telemetry extractor system (e.g., Cursor or ResultSet objects).
+        The wrapped method's object (self) must be a Cursor or
+        ResultSetDownloadHandler for telemetry data extraction.
     """

     def decorator(func):
         @functools.wraps(func)
         def wrapper(self, *args, **kwargs):
-            start_time = time.perf_counter()
-            result = None
+            start_time = time.monotonic()
             try:
-                result = func(self, *args, **kwargs)
-                return result
+                return func(self, *args, **kwargs)
             finally:
-
-                def _safe_call(func_to_call):
-                    """Calls a function and returns a default value on any exception."""
-                    try:
-                        return func_to_call()
-                    except Exception:
-                        return None
-
-                end_time = time.perf_counter()
-                duration_ms = int((end_time - start_time) * 1000)
-
-                extractor = get_extractor(self)
-
-                if extractor is not None:
-                    session_id_hex = _safe_call(extractor.get_session_id_hex)
-                    statement_id = _safe_call(extractor.get_statement_id)
-
-                    sql_exec_event = SqlExecutionEvent(
-                        statement_type=statement_type,
-                        is_compressed=_safe_call(extractor.get_is_compressed),
-                        execution_result=_safe_call(
-                            extractor.get_execution_result_format
-                        ),
-                        retry_count=_safe_call(extractor.get_retry_count),
-                        chunk_id=_safe_call(extractor.get_chunk_id),
-                    )
-
-                    telemetry_client = TelemetryClientFactory.get_telemetry_client(
-                        session_id_hex
-                    )
-                    telemetry_client.export_latency_log(
-                        latency_ms=duration_ms,
-                        sql_execution_event=sql_exec_event,
-                        sql_statement_id=statement_id,
-                    )
+                duration_ms = int((time.monotonic() - start_time) * 1000)
+
+                # Always log for debugging
+                logger.debug("%s completed in %dms", func.__name__, duration_ms)
+
+                # Fast check: use cached telemetry_enabled flag from connection
+                # Avoids dictionary lookup + instance check on every operation
+                connection = getattr(self, "connection", None)
+                if connection and getattr(connection, "telemetry_enabled", False):
+                    session_id_hex = connection.get_session_id_hex()
+                    if session_id_hex:
+                        # Telemetry enabled - extract and send
+                        telemetry_data = _extract_telemetry_data(self)
+                        if telemetry_data:
+                            sql_exec_event = SqlExecutionEvent(
+                                statement_type=statement_type,
+                                is_compressed=telemetry_data.get("is_compressed"),
+                                execution_result=telemetry_data.get("execution_result"),
+                                retry_count=telemetry_data.get("retry_count"),
+                                chunk_id=telemetry_data.get("chunk_id"),
+                            )
+
+                            telemetry_client = (
+                                TelemetryClientFactory.get_telemetry_client(
+                                    session_id_hex
+                                )
+                            )
+                            telemetry_client.export_latency_log(
+                                latency_ms=duration_ms,
+                                sql_execution_event=sql_exec_event,
+                                sql_statement_id=telemetry_data.get("statement_id"),
+                            )

         return wrapper
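One subtlety worth calling out (see commit 564827f, "Prevent exception suppression in log_latency decorator"): the telemetry work lives in a `finally` block and the wrapped call is returned directly, so return values and exceptions propagate unchanged as long as the `finally` body itself cannot raise. A reduced, self-contained sketch of that pattern, not the connector's actual decorator:

import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            # Return directly: no `result = None` placeholder that could
            # mask an exception raised inside func.
            return func(*args, **kwargs)
        finally:
            duration_ms = int((time.monotonic() - start) * 1000)
            logger.debug("%s completed in %dms", func.__name__, duration_ms)

    return wrapper


@timed
def boom():
    raise ValueError("propagates to the caller; timing is still logged")


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    try:
        boom()
    except ValueError as exc:
        print("caught:", exc)  # the decorator did not swallow the exception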
