From 8ae44ad515d545062421d5b7ac9250284fe74d72 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Tue, 18 Nov 2025 06:51:26 +0530 Subject: [PATCH 1/9] basic e2e test for force telemetry verification Signed-off-by: Nikhil Suri --- tests/e2e/test_telemetry_e2e.py | 164 ++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 tests/e2e/test_telemetry_e2e.py diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py new file mode 100644 index 000000000..8f14aaf03 --- /dev/null +++ b/tests/e2e/test_telemetry_e2e.py @@ -0,0 +1,164 @@ +""" +E2E test for telemetry - verifies telemetry successfully sends to backend +""" +import time +import json +import threading +from unittest.mock import patch +import pytest +from concurrent.futures import wait + +from databricks.sql.telemetry.telemetry_client import ( + TelemetryClient, + TelemetryClientFactory, +) +from tests.e2e.test_driver import PySQLPytestTestCase + + +class TestTelemetryE2E(PySQLPytestTestCase): + """E2E tests for telemetry""" + + @pytest.fixture(autouse=True) + def telemetry_setup_teardown(self): + """ + This fixture ensures the TelemetryClientFactory is in a clean state + before each test and shuts it down afterward. + """ + try: + yield + finally: + if TelemetryClientFactory._executor: + TelemetryClientFactory._executor.shutdown(wait=True) + TelemetryClientFactory._executor = None + TelemetryClientFactory._stop_flush_thread() + TelemetryClientFactory._initialized = False + + def test_telemetry_sends_successfully_with_200_response(self): + """ + E2E test to verify telemetry successfully sends to backend and receives 200 response. + + This test: + 1. Enables telemetry with force_enable_telemetry + 2. Sets telemetry_batch_size=1 for immediate flushing + 3. Executes a simple query + 4. Captures the telemetry response + 5. Verifies response status is 200 (success) + + With batch_size=1, telemetry is sent immediately after each event. 
+ """ + capture_lock = threading.Lock() + captured_futures = [] + + # Store original callback + original_callback = TelemetryClient._telemetry_request_callback + + def callback_wrapper(self_client, future, sent_count): + """ + Wraps the original callback to capture the server's response. + """ + with capture_lock: + captured_futures.append(future) + original_callback(self_client, future, sent_count) + + with patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + # Execute a query with telemetry enabled and batch_size=1 + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, # Immediate flushing for test + } + ) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT 1") + result = cursor.fetchone() + assert result[0] == 1 + + # Wait for telemetry to complete (max 30 seconds) + # With batch_size=1, we expect 2 events: initial_log and latency_log + timeout_seconds = 30 + start_time = time.time() + expected_event_count = 2 # initial_log + latency_log + + while ( + len(captured_futures) < expected_event_count + and time.time() - start_time < timeout_seconds + ): + time.sleep(0.1) + + # Wait for all futures to complete + done, not_done = wait(captured_futures, timeout=timeout_seconds) + assert ( + not not_done + ), f"Telemetry requests timed out: {len(not_done)} still pending" + + # Verify all responses are successful (status 200) + captured_exceptions = [] + captured_responses = [] + + for future in done: + try: + response = future.result() + + # Verify status is 200 + assert ( + 200 <= response.status < 300 + ), f"Expected 2xx status, got {response.status}" + + # Parse JSON response + response_data = ( + json.loads(response.data.decode()) if response.data else {} + ) + captured_responses.append(response_data) + + except Exception as e: + captured_exceptions.append(e) + + # Assert no exceptions occurred + assert ( + not captured_exceptions + ), f"Telemetry requests failed with 
exceptions: {captured_exceptions}" + + # Assert we got responses + assert len(captured_responses) > 0, "No telemetry responses received" + + # Verify response structure + for response in captured_responses: + # Should not have errors + assert ( + "errors" not in response or not response["errors"] + ), f"Telemetry response contained errors: {response.get('errors')}" + + def test_telemetry_does_not_break_driver_on_query_execution(self): + """ + E2E test to verify telemetry doesn't break driver functionality. + + This is a simpler test that just ensures: + 1. Driver works fine with telemetry enabled + 2. Query executes successfully + 3. Results are returned correctly + + If telemetry has issues, they're logged but don't break the driver. + """ + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, # Immediate flushing for test + } + ) as conn: + with conn.cursor() as cursor: + # Execute a simple query + cursor.execute("SELECT 1 as col1, 'test' as col2") + result = cursor.fetchone() + + # Verify query worked correctly + assert result[0] == 1 + assert result[1] == "test" + + # Execute another query to generate more telemetry + cursor.execute("SELECT 42") + result = cursor.fetchone() + assert result[0] == 42 + + # Test passes = telemetry didn't break driver ✅ From 07e4fc40110a3617e6112d9b5a93c6d44bf6bf25 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Tue, 18 Nov 2025 09:08:03 +0530 Subject: [PATCH 2/9] Added more integration test scenarios Signed-off-by: Nikhil Suri --- tests/e2e/test_telemetry_e2e.py | 928 ++++++++++++++++++++++++++++---- 1 file changed, 819 insertions(+), 109 deletions(-) diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 8f14aaf03..ad0bda440 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -1,29 +1,55 @@ """ -E2E test for telemetry - verifies telemetry successfully sends to backend +E2E test for telemetry - verifies telemetry 
behavior with different scenarios """ import time import json import threading +import logging +from contextlib import contextmanager from unittest.mock import patch import pytest from concurrent.futures import wait +import databricks.sql as sql from databricks.sql.telemetry.telemetry_client import ( TelemetryClient, TelemetryClientFactory, ) -from tests.e2e.test_driver import PySQLPytestTestCase +log = logging.getLogger(__name__) -class TestTelemetryE2E(PySQLPytestTestCase): - """E2E tests for telemetry""" + +class TelemetryTestBase: + """Simplified test base class for telemetry e2e tests""" + + @pytest.fixture(autouse=True) + def get_details(self, connection_details): + self.arguments = connection_details.copy() + + def connection_params(self): + return { + "server_hostname": self.arguments["host"], + "http_path": self.arguments["http_path"], + "access_token": self.arguments.get("access_token"), + } + + @contextmanager + def connection(self, extra_params=()): + connection_params = dict(self.connection_params(), **dict(extra_params)) + log.info("Connecting with args: {}".format(connection_params)) + conn = sql.connect(**connection_params) + try: + yield conn + finally: + conn.close() + + +class TestTelemetryE2E(TelemetryTestBase): + """E2E tests for telemetry scenarios""" @pytest.fixture(autouse=True) def telemetry_setup_teardown(self): - """ - This fixture ensures the TelemetryClientFactory is in a clean state - before each test and shuts it down afterward. - """ + """Clean up telemetry client state before and after each test""" try: yield finally: @@ -33,132 +59,816 @@ def telemetry_setup_teardown(self): TelemetryClientFactory._stop_flush_thread() TelemetryClientFactory._initialized = False - def test_telemetry_sends_successfully_with_200_response(self): - """ - E2E test to verify telemetry successfully sends to backend and receives 200 response. + # ==================== REUSABLE WRAPPER METHODS ==================== - This test: - 1. 
Enables telemetry with force_enable_telemetry - 2. Sets telemetry_batch_size=1 for immediate flushing - 3. Executes a simple query - 4. Captures the telemetry response - 5. Verifies response status is 200 (success) + @pytest.fixture + def telemetry_interceptors(self): + """ + Setup reusable telemetry interceptors as a fixture. - With batch_size=1, telemetry is sent immediately after each event. + Returns: + Tuple of (captured_events, captured_futures, export_wrapper, callback_wrapper) """ capture_lock = threading.Lock() + captured_events = [] captured_futures = [] - # Store original callback + original_export = TelemetryClient._export_event original_callback = TelemetryClient._telemetry_request_callback + def export_wrapper(self_client, event): + """Intercept telemetry events to capture payload""" + with capture_lock: + captured_events.append(event) + return original_export(self_client, event) + def callback_wrapper(self_client, future, sent_count): - """ - Wraps the original callback to capture the server's response. 
- """ + """Capture HTTP response futures""" with capture_lock: captured_futures.append(future) original_callback(self_client, future, sent_count) + return captured_events, captured_futures, export_wrapper, callback_wrapper + + # ==================== ASSERTION METHODS ==================== + + def assertSystemConfiguration(self, event): + """Assert base system configuration fields""" + sys_config = event.entry.sql_driver_log.system_configuration + + assert sys_config is not None, "system_configuration should not be None" + + # Driver information + assert ( + sys_config.driver_name == "Databricks SQL Python Connector" + ), f"Expected driver_name 'Databricks SQL Python Connector', got '{sys_config.driver_name}'" + assert ( + sys_config.driver_version is not None and len(sys_config.driver_version) > 0 + ), "driver_version should not be None or empty" + + # OS information + assert ( + sys_config.os_name is not None and len(sys_config.os_name) > 0 + ), "os_name should not be None or empty" + assert ( + sys_config.os_version is not None and len(sys_config.os_version) > 0 + ), "os_version should not be None or empty" + assert ( + sys_config.os_arch is not None and len(sys_config.os_arch) > 0 + ), "os_arch should not be None or empty" + + # Runtime information + assert ( + sys_config.runtime_name is not None and len(sys_config.runtime_name) > 0 + ), "runtime_name should not be None or empty" + assert ( + sys_config.runtime_version is not None + and len(sys_config.runtime_version) > 0 + ), "runtime_version should not be None or empty" + assert ( + sys_config.runtime_vendor is not None + and len(sys_config.runtime_vendor) > 0 + ), "runtime_vendor should not be None or empty" + + # Locale and encoding + assert ( + sys_config.locale_name is not None and len(sys_config.locale_name) > 0 + ), "locale_name should not be None or empty" + assert ( + sys_config.char_set_encoding is not None + and len(sys_config.char_set_encoding) > 0 + ), "char_set_encoding should not be None or empty" 
+ + def assertConnectionParams( + self, + event, + expected_http_path=None, + expected_mode=None, + expected_auth_mech=None, + expected_auth_flow=None, + ): + """Assert connection parameters""" + conn_params = event.entry.sql_driver_log.driver_connection_params + + assert conn_params is not None, "driver_connection_params should not be None" + + # HTTP Path + if expected_http_path: + assert ( + conn_params.http_path == expected_http_path + ), f"Expected http_path '{expected_http_path}', got '{conn_params.http_path}'" + assert ( + conn_params.http_path is not None and len(conn_params.http_path) > 0 + ), "http_path should not be None or empty" + + # Mode + if expected_mode: + assert ( + conn_params.mode == expected_mode + ), f"Expected mode '{expected_mode}', got '{conn_params.mode}'" + # Mode is optional, so don't assert it must exist + + # Host Info (HostDetails object) + assert conn_params.host_info is not None, "host_info should not be None" + + # Auth Mechanism (AuthMech object) + if expected_auth_mech: + assert ( + conn_params.auth_mech == expected_auth_mech + ), f"Expected auth_mech '{expected_auth_mech}', got '{conn_params.auth_mech}'" + assert conn_params.auth_mech is not None, "auth_mech should not be None" + + # Auth Flow (optional string) + if expected_auth_flow: + assert ( + conn_params.auth_flow == expected_auth_flow + ), f"Expected auth_flow '{expected_auth_flow}', got '{conn_params.auth_flow}'" + # auth_flow is optional, so don't assert it must exist + + # Socket Timeout + # socket_timeout is optional and can be None + if conn_params.socket_timeout is not None: + assert ( + conn_params.socket_timeout > 0 + ), f"socket_timeout should be positive, got {conn_params.socket_timeout}" + + def assertStatementExecution( + self, event, statement_type=None, execution_result=None + ): + """Assert statement execution details including operation latency""" + sql_op = event.entry.sql_driver_log.sql_operation + + assert sql_op is not None, "sql_operation should not 
be None for SQL execution" + + # Statement Type + if statement_type: + assert ( + sql_op.statement_type == statement_type + ), f"Expected statement_type '{statement_type}', got '{sql_op.statement_type}'" + else: + # At minimum, statement_type should exist + assert ( + sql_op.statement_type is not None + ), "statement_type should not be None" + + # Execution Result + if execution_result: + assert ( + sql_op.execution_result == execution_result + ), f"Expected execution_result '{execution_result}', got '{sql_op.execution_result}'" + else: + # At minimum, execution_result should exist + assert ( + sql_op.execution_result is not None + ), "execution_result should not be None" + + # Retry Count + assert hasattr(sql_op, "retry_count"), "sql_operation should have retry_count" + if sql_op.retry_count is not None: + assert ( + sql_op.retry_count >= 0 + ), f"retry_count should be non-negative, got {sql_op.retry_count}" + + # Operation Latency + latency = event.entry.sql_driver_log.operation_latency_ms + assert latency is not None, "operation_latency_ms should not be None" + assert latency >= 0, f"operation_latency_ms should be non-negative, got {latency}" + + def assertErrorInfo(self, event, expected_error_name=None): + """Assert error information""" + error_info = event.entry.sql_driver_log.error_info + + assert error_info is not None, "error_info should not be None for error events" + + # Error Name + assert ( + error_info.error_name is not None and len(error_info.error_name) > 0 + ), "error_name should not be None or empty" + if expected_error_name: + assert ( + error_info.error_name == expected_error_name + ), f"Expected error_name '{expected_error_name}', got '{error_info.error_name}'" + + # Stack Trace + assert ( + error_info.stack_trace is not None and len(error_info.stack_trace) > 0 + ), "stack_trace should not be None or empty" + + def assertOperationLatency(self, event): + """Assert operation latency exists""" + latency = 
event.entry.sql_driver_log.operation_latency_ms + assert latency is not None, "operation_latency_ms should not be None" + assert latency >= 0, "operation_latency_ms should be non-negative" + + def assertBaseTelemetryEvent(self, captured_events): + """Assert basic telemetry event payload fields""" + assert len(captured_events) > 0, "No events captured to assert" + + for event in captured_events: + telemetry_event = event.entry.sql_driver_log + assert telemetry_event.session_id is not None + + # ==================== TEST SCENARIOS ==================== + + def test_enable_telemetry_on_with_server_on_sends_events( + self, telemetry_interceptors + ): + """ + Scenario: enable_telemetry=ON, force_enable_telemetry=OFF, server=ON + Expected: 2 events (initial_log + latency_log) + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( TelemetryClient, "_telemetry_request_callback", callback_wrapper ): - # Execute a query with telemetry enabled and batch_size=1 with self.connection( extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, # Immediate flushing for test + "enable_telemetry": True, + "force_enable_telemetry": False, + "telemetry_batch_size": 1, } ) as conn: with conn.cursor() as cursor: cursor.execute("SELECT 1") - result = cursor.fetchone() - assert result[0] == 1 - - # Wait for telemetry to complete (max 30 seconds) - # With batch_size=1, we expect 2 events: initial_log and latency_log - timeout_seconds = 30 - start_time = time.time() - expected_event_count = 2 # initial_log + latency_log + cursor.fetchone() + statement_id = cursor.query_id - while ( - len(captured_futures) < expected_event_count - and time.time() - start_time < timeout_seconds - ): - time.sleep(0.1) + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) - # Wait for all futures to complete - done, not_done = 
wait(captured_futures, timeout=timeout_seconds) + # Exact count assertion assert ( - not not_done - ), f"Telemetry requests timed out: {len(not_done)} still pending" + len(captured_events) == 2 + ), f"Expected exactly 2 events, got {len(captured_events)}" + assert len(done) == 2, f"Expected exactly 2 responses, got {len(done)}" + + # Verify HTTP responses + for future in done: + response = future.result() + assert 200 <= response.status < 300 + + # Assert payload for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert latency event (second event) + self.assertStatementExecution(captured_events[1]) + + print(f"\nStatement ID: {statement_id}") - # Verify all responses are successful (status 200) - captured_exceptions = [] - captured_responses = [] + def test_force_enable_on_with_enable_off_sends_events( + self, telemetry_interceptors + ): + """ + Scenario: enable_telemetry=OFF, force_enable_telemetry=ON, server=ON + Expected: 2 events (initial_log + latency_log) + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "enable_telemetry": False, + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + } + ) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT 2") + cursor.fetchone() + statement_id = cursor.query_id + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Exact count assertion + assert ( + len(captured_events) == 2 + ), f"Expected exactly 2 events, got {len(captured_events)}" + assert len(done) == 2, f"Expected exactly 2 responses, got {len(done)}" + + # Verify HTTP responses for future in done: - try: - response = 
future.result() - - # Verify status is 200 - assert ( - 200 <= response.status < 300 - ), f"Expected 2xx status, got {response.status}" - - # Parse JSON response - response_data = ( - json.loads(response.data.decode()) if response.data else {} - ) - captured_responses.append(response_data) - - except Exception as e: - captured_exceptions.append(e) - - # Assert no exceptions occurred - assert ( - not captured_exceptions - ), f"Telemetry requests failed with exceptions: {captured_exceptions}" - - # Assert we got responses - assert len(captured_responses) > 0, "No telemetry responses received" - - # Verify response structure - for response in captured_responses: - # Should not have errors - assert ( - "errors" not in response or not response["errors"] - ), f"Telemetry response contained errors: {response.get('errors')}" - - def test_telemetry_does_not_break_driver_on_query_execution(self): - """ - E2E test to verify telemetry doesn't break driver functionality. - - This is a simpler test that just ensures: - 1. Driver works fine with telemetry enabled - 2. Query executes successfully - 3. Results are returned correctly - - If telemetry has issues, they're logged but don't break the driver. 
- """ - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, # Immediate flushing for test - } - ) as conn: - with conn.cursor() as cursor: - # Execute a simple query - cursor.execute("SELECT 1 as col1, 'test' as col2") - result = cursor.fetchone() - - # Verify query worked correctly - assert result[0] == 1 - assert result[1] == "test" - - # Execute another query to generate more telemetry - cursor.execute("SELECT 42") - result = cursor.fetchone() - assert result[0] == 42 - - # Test passes = telemetry didn't break driver ✅ + response = future.result() + assert 200 <= response.status < 300 + + # Assert payload + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + self.assertStatementExecution(captured_events[1]) + + print(f"\nStatement ID: {statement_id}") + + def test_both_flags_off_does_not_send_events(self, telemetry_interceptors): + """ + Scenario: enable_telemetry=OFF, force_enable_telemetry=OFF, server=ON + Expected: 0 events + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "enable_telemetry": False, + "force_enable_telemetry": False, + "telemetry_batch_size": 1, + } + ) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT 3") + cursor.fetchone() + statement_id = cursor.query_id + + time.sleep(2) + + # Exact count assertion + assert ( + len(captured_events) == 0 + ), f"Expected 0 events, got {len(captured_events)}" + assert ( + len(captured_futures) == 0 + ), f"Expected 0 responses, got {len(captured_futures)}" + + print(f"\nStatement ID: {statement_id}") + + def test_sql_error_sends_telemetry_with_error_info(self, 
telemetry_interceptors): + """ + Scenario: SQL query with invalid syntax causes error + Expected: Telemetry event with error_name and stack_trace + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + } + ) as conn: + with conn.cursor() as cursor: + # Execute query with invalid syntax to trigger immediate parse error + try: + cursor.execute("SELECT * FROM WHERE INVALID SYNTAX 12345") + cursor.fetchone() + assert False, "Query should have failed" + except Exception as e: + # Expected to fail + print(f"\nExpected error occurred: {type(e).__name__}") + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 1 event + assert ( + len(captured_events) >= 1 + ), f"Expected at least 1 event, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events") + + # Find event with error_info (typically the middle event) + error_event = None + for idx, event in enumerate(captured_events): + error_info = event.entry.sql_driver_log.error_info + if error_info: + error_event = event + print(f"\nFound error_info in event {idx}") + break + + assert error_event is not None, "Expected at least one event with error_info" + + # Assert system configuration + self.assertSystemConfiguration(error_event) + + # Assert connection params + self.assertConnectionParams( + error_event, expected_http_path=self.arguments["http_path"] + ) + + # Assert error info with ServerOperationError + self.assertErrorInfo(error_event, expected_error_name="ServerOperationError") + + print(f"✅ Error telemetry successfully captured with error_name and stack_trace") + + def test_non_existent_table_error_sends_telemetry(self, 
telemetry_interceptors): + """ + Scenario: SQL query on non-existent table causes error + Expected: Telemetry event with error_name and stack_trace + Note: This test checks timing - querying non-existent table vs invalid syntax + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + } + ) as conn: + with conn.cursor() as cursor: + # Execute query on non-existent table + try: + cursor.execute("SELECT * FROM non_existent_table_xyz_12345") + cursor.fetchone() + assert False, "Query should have failed" + except Exception as e: + # Expected to fail + print(f"\nExpected error occurred: {type(e).__name__}") + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 1 event + assert ( + len(captured_events) >= 1 + ), f"Expected at least 1 event, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events") + + # Find event with error_info + error_event = None + for idx, event in enumerate(captured_events): + error_info = event.entry.sql_driver_log.error_info + if error_info: + error_event = event + print(f"\nFound error_info in event {idx}") + break + + assert error_event is not None, "Expected at least one event with error_info" + + # Assert system configuration + self.assertSystemConfiguration(error_event) + + # Assert connection params + self.assertConnectionParams( + error_event, expected_http_path=self.arguments["http_path"] + ) + + # Assert error info exists + self.assertErrorInfo(error_event) + + print(f"✅ Non-existent table error telemetry captured") + + def test_metadata_get_catalogs_sends_telemetry(self, telemetry_interceptors): + """ + Scenario: Statement created and metadata 
(getCatalogs) called + Expected: Telemetry events with system config and connection params + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + } + ) as conn: + with conn.cursor() as cursor: + # Call metadata operation + catalogs = cursor.catalogs() + catalogs.fetchall() + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 1 event + assert ( + len(captured_events) >= 1 + ), f"Expected at least 1 event, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events for getCatalogs") + + # Assert system configuration and connection params for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + print(f"✅ Metadata getCatalogs telemetry captured") + + def test_direct_results_sends_telemetry(self, telemetry_interceptors): + """ + Scenario: ResultSet created with directResults (use_cloud_fetch=False) + Expected: Telemetry events with SQL execution metrics + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + "use_cloud_fetch": False, # Force direct results + } + ) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT 100") + result = cursor.fetchall() + assert len(result) == 1 + assert result[0][0] == 100 + + 
time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 2 events (initial + latency) + assert ( + len(captured_events) >= 2 + ), f"Expected at least 2 events, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events for direct results") + + # Assert system configuration and connection params for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert SQL execution metrics on latency event (last event) + latency_event = captured_events[-1] + self.assertStatementExecution(latency_event) + + print(f"✅ Direct results telemetry captured") + + def test_cloudfetch_no_explicit_close_sends_telemetry(self, telemetry_interceptors): + """ + Scenario: ResultSet created with cloudfetch, Statement/Connection not explicitly closed + Expected: Telemetry events sent when context managers exit + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + "use_cloud_fetch": True, # Enable cloud fetch + } + ) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() + assert len(result) == 1000 + # Statement and connection close automatically via context managers + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 2 events (initial + latency) + assert ( + len(captured_events) >= 2 + ), f"Expected at least 2 events, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events for cloudfetch (auto close)") + + # Assert system configuration and connection 
params for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert SQL execution metrics on latency event + latency_event = captured_events[-1] + self.assertStatementExecution(latency_event) + + print(f"✅ Cloudfetch (auto close) telemetry captured") + + def test_cloudfetch_statement_closed_sends_telemetry(self, telemetry_interceptors): + """ + Scenario: ResultSet created with cloudfetch, Statement explicitly closed + Expected: Telemetry events sent when statement closes + Note: With batch_size=1, immediate flush. With larger batch, may need connection close. + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + "use_cloud_fetch": True, + } + ) as conn: + cursor = conn.cursor() + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() + assert len(result) == 1000 + cursor.close() # Explicitly close statement + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 2 events (initial + latency) + assert ( + len(captured_events) >= 2 + ), f"Expected at least 2 events, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events for cloudfetch (statement close)") + + # Assert system configuration and connection params for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert SQL execution metrics on latency event + latency_event = captured_events[-1] + self.assertStatementExecution(latency_event) + + 
print(f"✅ Cloudfetch (statement close) telemetry captured") + + def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_interceptors): + """ + Scenario: ResultSet created with cloudfetch, Connection explicitly closed + Expected: Telemetry events sent when connection closes (forces flush) + Note: Connection.close() always flushes all pending telemetry events. + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + conn = sql.connect( + **self.connection_params(), + force_enable_telemetry=True, + telemetry_batch_size=1, + use_cloud_fetch=True, + ) + cursor = conn.cursor() + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() + assert len(result) == 1000 + conn.close() # Explicitly close connection (forces flush) + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 2 events (initial + latency) + assert ( + len(captured_events) >= 2 + ), f"Expected at least 2 events, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events for cloudfetch (connection close)") + + # Assert system configuration and connection params for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert SQL execution metrics on latency event + latency_event = captured_events[-1] + self.assertStatementExecution(latency_event) + + print(f"✅ Cloudfetch (connection close) telemetry captured") + + def test_cloudfetch_only_resultset_closed_sends_telemetry(self, telemetry_interceptors): + """ + Scenario: ResultSet created with cloudfetch, only ResultSet closed (implicit via fetchall) + Expected: Telemetry events sent (batch_size=1 ensures immediate 
flush) + Note: ResultSet closes after fetchall(). Events flush due to batch_size=1. + """ + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + with self.connection( + extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + "use_cloud_fetch": True, + } + ) as conn: + cursor = conn.cursor() + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() # ResultSet implicitly closed after fetchall + assert len(result) == 1000 + # Don't explicitly close cursor or connection (context manager will) + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # Should have at least 2 events (initial + latency) + assert ( + len(captured_events) >= 2 + ), f"Expected at least 2 events, got {len(captured_events)}" + + print(f"\nCaptured {len(captured_events)} events for cloudfetch (resultset close)") + + # Assert system configuration and connection params for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert SQL execution metrics on latency event + latency_event = captured_events[-1] + self.assertStatementExecution(latency_event) + + print(f"✅ Cloudfetch (resultset close) telemetry captured") From 05b9aa0a3955dc120912d4bfa1aa1460fb129700 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Tue, 18 Nov 2025 11:22:07 +0530 Subject: [PATCH 3/9] default on telemetry + logs to investigate failing test Signed-off-by: Nikhil Suri --- src/databricks/sql/client.py | 2 +- src/databricks/sql/common/feature_flag.py | 9 ++ .../sql/telemetry/telemetry_client.py | 25 ++++-- tests/e2e/test_telemetry_e2e.py | 88 +++++++++++++++++++ 4 files changed, 114 insertions(+), 10 deletions(-) diff 
--git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index fedfafdf3..3b97a5bbf 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -320,7 +320,7 @@ def read(self) -> Optional[OAuthToken]: ) self.force_enable_telemetry = kwargs.get("force_enable_telemetry", False) - self.enable_telemetry = kwargs.get("enable_telemetry", False) + self.enable_telemetry = kwargs.get("enable_telemetry", True) # Default to True for telemetry self.telemetry_enabled = TelemetryHelper.is_telemetry_enabled(self) TelemetryClientFactory.initialize_telemetry_client( diff --git a/src/databricks/sql/common/feature_flag.py b/src/databricks/sql/common/feature_flag.py index 8a1cf5bd5..238319a14 100644 --- a/src/databricks/sql/common/feature_flag.py +++ b/src/databricks/sql/common/feature_flag.py @@ -106,28 +106,37 @@ def get_flag_value(self, name: str, default_value: Any) -> Any: def _refresh_flags(self): """Performs a synchronous network request to fetch and update flags.""" + import logging + logger = logging.getLogger(__name__) + headers = {} try: # Authenticate the request self._connection.session.auth_provider.add_headers(headers) headers["User-Agent"] = self._connection.session.useragent_header + logger.info(f"Fetching feature flags from: {self._feature_flag_endpoint}") response = self._http_client.request( HttpMethod.GET, self._feature_flag_endpoint, headers=headers, timeout=30 ) + logger.info(f"Feature flag response status: {response.status}") if response.status == 200: # Parse JSON response from urllib3 response data response_data = json.loads(response.data.decode()) + logger.info(f"Feature flag response data: {response_data}") ff_response = FeatureFlagsResponse.from_dict(response_data) self._update_cache_from_response(ff_response) + logger.info(f"Feature flags loaded: {self._flags}") else: # On failure, initialize with an empty dictionary to prevent re-blocking. 
+ logger.info(f"Feature flag fetch failed with status {response.status}, initializing empty flags") if self._flags is None: self._flags = {} except Exception as e: # On exception, initialize with an empty dictionary to prevent re-blocking. + logger.info(f"Feature flag fetch exception: {type(e).__name__}: {e}, initializing empty flags") if self._flags is None: self._flags = {} diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 134757fe5..8bc4edd0f 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -110,15 +110,21 @@ def get_auth_flow(auth_provider): @staticmethod def is_telemetry_enabled(connection: "Connection") -> bool: if connection.force_enable_telemetry: + logger.info("Telemetry: force_enable_telemetry=True, telemetry ENABLED") return True if connection.enable_telemetry: + logger.info(f"Telemetry: enable_telemetry=True, checking feature flag: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}") context = FeatureFlagsContextFactory.get_instance(connection) flag_value = context.get_flag_value( TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME, default_value=False ) - return str(flag_value).lower() == "true" + logger.info(f"Telemetry: feature flag value = '{flag_value}'") + enabled = str(flag_value).lower() == "true" + logger.info(f"Telemetry: feature flag check result = {enabled}") + return enabled else: + logger.info("Telemetry: enable_telemetry=False, telemetry DISABLED") return False @@ -191,13 +197,12 @@ def __init__( def _export_event(self, event): """Add an event to the batch queue and flush if batch is full""" - logger.debug("Exporting event for connection %s", self._session_id_hex) + logger.info(f"Exporting telemetry event for connection {self._session_id_hex}") with self._lock: self._events_batch.append(event) + logger.info(f"Event added to batch, batch size now: {len(self._events_batch)}/{self._batch_size}") if 
len(self._events_batch) >= self._batch_size: - logger.debug( - "Batch size limit reached (%s), flushing events", self._batch_size - ) + logger.info(f"Batch size reached ({self._batch_size}), flushing events") self._flush() def _flush(self): @@ -465,9 +470,8 @@ def initialize_telemetry_client( TelemetryClientFactory._initialize() if session_id_hex not in TelemetryClientFactory._clients: - logger.debug( - "Creating new TelemetryClient for connection %s", - session_id_hex, + logger.info( + f"Creating telemetry client for connection {session_id_hex}, telemetry_enabled={telemetry_enabled}" ) if telemetry_enabled: TelemetryClientFactory._clients[ @@ -481,14 +485,17 @@ def initialize_telemetry_client( batch_size=batch_size, client_context=client_context, ) + logger.info(f"Created TelemetryClient for connection {session_id_hex}") else: TelemetryClientFactory._clients[ session_id_hex ] = NoopTelemetryClient() + logger.info(f"Created NoopTelemetryClient for connection {session_id_hex}") except Exception as e: - logger.debug("Failed to initialize telemetry client: %s", e) + logger.info(f"Failed to initialize telemetry client: {type(e).__name__}: {e}") # Fallback to NoopTelemetryClient to ensure connection doesn't fail TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient() + logger.info(f"Fallback to NoopTelemetryClient for connection {session_id_hex}") @staticmethod def get_telemetry_client(session_id_hex): diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index ad0bda440..3f623a851 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -277,6 +277,13 @@ def test_enable_telemetry_on_with_server_on_sends_events( Scenario: enable_telemetry=ON, force_enable_telemetry=OFF, server=ON Expected: 2 events (initial_log + latency_log) """ + from databricks.sql.telemetry.telemetry_client import TelemetryHelper + + print(f"\n{'='*80}") + print(f"TEST: test_enable_telemetry_on_with_server_on_sends_events") + 
print(f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}") + print(f"{'='*80}\n") + ( captured_events, captured_futures, @@ -296,6 +303,11 @@ def test_enable_telemetry_on_with_server_on_sends_events( "telemetry_batch_size": 1, } ) as conn: + print(f"\nConnection created:") + print(f" enable_telemetry: {conn.enable_telemetry}") + print(f" force_enable_telemetry: {conn.force_enable_telemetry}") + print(f" telemetry_enabled (computed): {conn.telemetry_enabled}") + print(f" telemetry_client type: {type(conn._telemetry_client).__name__}\n") with conn.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() @@ -424,6 +436,82 @@ def test_both_flags_off_does_not_send_events(self, telemetry_interceptors): print(f"\nStatement ID: {statement_id}") + def test_default_behavior_sends_events_with_server_flag_on( + self, telemetry_interceptors + ): + """ + Scenario: Neither enable_telemetry nor force_enable_telemetry passed (uses defaults) + Expected: 2 events (initial_log + latency_log) when server feature flag is ON + + Default behavior: + - enable_telemetry defaults to True + - force_enable_telemetry defaults to False + - Telemetry will be sent if server feature flag is enabled + """ + from databricks.sql.telemetry.telemetry_client import TelemetryHelper + + print(f"\n{'='*80}") + print(f"TEST: test_default_behavior_sends_events_with_server_flag_on") + print(f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}") + print(f"Testing DEFAULT behavior (no flags passed explicitly)") + print(f"{'='*80}\n") + + ( + captured_events, + captured_futures, + export_wrapper, + callback_wrapper, + ) = telemetry_interceptors + + with patch.object( + TelemetryClient, "_export_event", export_wrapper + ), patch.object( + TelemetryClient, "_telemetry_request_callback", callback_wrapper + ): + # Connection without explicit telemetry flags - relies on defaults + with self.connection( + extra_params={ + "telemetry_batch_size": 1, + } + ) as 
conn: + # Verify defaults are as expected + print(f"\nConnection created with DEFAULT settings:") + print(f" enable_telemetry (default): {conn.enable_telemetry}") + print(f" force_enable_telemetry (default): {conn.force_enable_telemetry}") + print(f" telemetry_enabled (computed): {conn.telemetry_enabled}") + print(f" telemetry_client type: {type(conn._telemetry_client).__name__}\n") + + with conn.cursor() as cursor: + cursor.execute("SELECT 99") + cursor.fetchone() + statement_id = cursor.query_id + + time.sleep(2) + done, not_done = wait(captured_futures, timeout=10) + + # With default enable_telemetry=True and server flag ON, expect 2 events + assert ( + len(captured_events) == 2 + ), f"Expected exactly 2 events with default settings, got {len(captured_events)}" + assert len(done) == 2, f"Expected exactly 2 responses, got {len(done)}" + + # Verify HTTP responses + for future in done: + response = future.result() + assert 200 <= response.status < 300 + + # Assert payload for all events + for event in captured_events: + self.assertSystemConfiguration(event) + self.assertConnectionParams( + event, expected_http_path=self.arguments["http_path"] + ) + + # Assert latency event (second event) + self.assertStatementExecution(captured_events[1]) + + print(f"\nStatement ID: {statement_id}") + def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors): """ Scenario: SQL query with invalid syntax causes error From a25e3f3aa90418986308523c0b916fe1e29bb4d5 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Tue, 18 Nov 2025 11:32:43 +0530 Subject: [PATCH 4/9] fixed linting issue Signed-off-by: Nikhil Suri --- src/databricks/sql/client.py | 4 +- src/databricks/sql/common/feature_flag.py | 11 +- .../sql/telemetry/telemetry_client.py | 24 +++- tests/e2e/test_telemetry_e2e.py | 111 +++++++++++------- 4 files changed, 98 insertions(+), 52 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 3b97a5bbf..91468eada 100755 --- 
a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -320,7 +320,9 @@ def read(self) -> Optional[OAuthToken]: ) self.force_enable_telemetry = kwargs.get("force_enable_telemetry", False) - self.enable_telemetry = kwargs.get("enable_telemetry", True) # Default to True for telemetry + self.enable_telemetry = kwargs.get( + "enable_telemetry", True + ) # Default to True for telemetry self.telemetry_enabled = TelemetryHelper.is_telemetry_enabled(self) TelemetryClientFactory.initialize_telemetry_client( diff --git a/src/databricks/sql/common/feature_flag.py b/src/databricks/sql/common/feature_flag.py index 238319a14..7f9195b49 100644 --- a/src/databricks/sql/common/feature_flag.py +++ b/src/databricks/sql/common/feature_flag.py @@ -107,8 +107,9 @@ def get_flag_value(self, name: str, default_value: Any) -> Any: def _refresh_flags(self): """Performs a synchronous network request to fetch and update flags.""" import logging + logger = logging.getLogger(__name__) - + headers = {} try: # Authenticate the request @@ -130,13 +131,17 @@ def _refresh_flags(self): logger.info(f"Feature flags loaded: {self._flags}") else: # On failure, initialize with an empty dictionary to prevent re-blocking. - logger.info(f"Feature flag fetch failed with status {response.status}, initializing empty flags") + logger.info( + f"Feature flag fetch failed with status {response.status}, initializing empty flags" + ) if self._flags is None: self._flags = {} except Exception as e: # On exception, initialize with an empty dictionary to prevent re-blocking. 
- logger.info(f"Feature flag fetch exception: {type(e).__name__}: {e}, initializing empty flags") + logger.info( + f"Feature flag fetch exception: {type(e).__name__}: {e}, initializing empty flags" + ) if self._flags is None: self._flags = {} diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 8bc4edd0f..cbb004011 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -114,7 +114,9 @@ def is_telemetry_enabled(connection: "Connection") -> bool: return True if connection.enable_telemetry: - logger.info(f"Telemetry: enable_telemetry=True, checking feature flag: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}") + logger.info( + f"Telemetry: enable_telemetry=True, checking feature flag: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" + ) context = FeatureFlagsContextFactory.get_instance(connection) flag_value = context.get_flag_value( TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME, default_value=False @@ -200,7 +202,9 @@ def _export_event(self, event): logger.info(f"Exporting telemetry event for connection {self._session_id_hex}") with self._lock: self._events_batch.append(event) - logger.info(f"Event added to batch, batch size now: {len(self._events_batch)}/{self._batch_size}") + logger.info( + f"Event added to batch, batch size now: {len(self._events_batch)}/{self._batch_size}" + ) if len(self._events_batch) >= self._batch_size: logger.info(f"Batch size reached ({self._batch_size}), flushing events") self._flush() @@ -485,17 +489,25 @@ def initialize_telemetry_client( batch_size=batch_size, client_context=client_context, ) - logger.info(f"Created TelemetryClient for connection {session_id_hex}") + logger.info( + f"Created TelemetryClient for connection {session_id_hex}" + ) else: TelemetryClientFactory._clients[ session_id_hex ] = NoopTelemetryClient() - logger.info(f"Created NoopTelemetryClient for connection {session_id_hex}") + logger.info( 
+ f"Created NoopTelemetryClient for connection {session_id_hex}" + ) except Exception as e: - logger.info(f"Failed to initialize telemetry client: {type(e).__name__}: {e}") + logger.info( + f"Failed to initialize telemetry client: {type(e).__name__}: {e}" + ) # Fallback to NoopTelemetryClient to ensure connection doesn't fail TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient() - logger.info(f"Fallback to NoopTelemetryClient for connection {session_id_hex}") + logger.info( + f"Fallback to NoopTelemetryClient for connection {session_id_hex}" + ) @staticmethod def get_telemetry_client(session_id_hex): diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 3f623a851..09bbd57e8 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -97,7 +97,7 @@ def assertSystemConfiguration(self, event): sys_config = event.entry.sql_driver_log.system_configuration assert sys_config is not None, "system_configuration should not be None" - + # Driver information assert ( sys_config.driver_name == "Databricks SQL Python Connector" @@ -105,7 +105,7 @@ def assertSystemConfiguration(self, event): assert ( sys_config.driver_version is not None and len(sys_config.driver_version) > 0 ), "driver_version should not be None or empty" - + # OS information assert ( sys_config.os_name is not None and len(sys_config.os_name) > 0 @@ -116,7 +116,7 @@ def assertSystemConfiguration(self, event): assert ( sys_config.os_arch is not None and len(sys_config.os_arch) > 0 ), "os_arch should not be None or empty" - + # Runtime information assert ( sys_config.runtime_name is not None and len(sys_config.runtime_name) > 0 @@ -126,10 +126,9 @@ def assertSystemConfiguration(self, event): and len(sys_config.runtime_version) > 0 ), "runtime_version should not be None or empty" assert ( - sys_config.runtime_vendor is not None - and len(sys_config.runtime_vendor) > 0 + sys_config.runtime_vendor is not None and len(sys_config.runtime_vendor) 
> 0 ), "runtime_vendor should not be None or empty" - + # Locale and encoding assert ( sys_config.locale_name is not None and len(sys_config.locale_name) > 0 @@ -151,7 +150,7 @@ def assertConnectionParams( conn_params = event.entry.sql_driver_log.driver_connection_params assert conn_params is not None, "driver_connection_params should not be None" - + # HTTP Path if expected_http_path: assert ( @@ -160,31 +159,31 @@ def assertConnectionParams( assert ( conn_params.http_path is not None and len(conn_params.http_path) > 0 ), "http_path should not be None or empty" - + # Mode if expected_mode: assert ( conn_params.mode == expected_mode ), f"Expected mode '{expected_mode}', got '{conn_params.mode}'" # Mode is optional, so don't assert it must exist - + # Host Info (HostDetails object) assert conn_params.host_info is not None, "host_info should not be None" - + # Auth Mechanism (AuthMech object) if expected_auth_mech: assert ( conn_params.auth_mech == expected_auth_mech ), f"Expected auth_mech '{expected_auth_mech}', got '{conn_params.auth_mech}'" assert conn_params.auth_mech is not None, "auth_mech should not be None" - + # Auth Flow (optional string) if expected_auth_flow: assert ( conn_params.auth_flow == expected_auth_flow ), f"Expected auth_flow '{expected_auth_flow}', got '{conn_params.auth_flow}'" # auth_flow is optional, so don't assert it must exist - + # Socket Timeout # socket_timeout is optional and can be None if conn_params.socket_timeout is not None: @@ -199,7 +198,7 @@ def assertStatementExecution( sql_op = event.entry.sql_driver_log.sql_operation assert sql_op is not None, "sql_operation should not be None for SQL execution" - + # Statement Type if statement_type: assert ( @@ -210,7 +209,7 @@ def assertStatementExecution( assert ( sql_op.statement_type is not None ), "statement_type should not be None" - + # Execution Result if execution_result: assert ( @@ -221,25 +220,27 @@ def assertStatementExecution( assert ( sql_op.execution_result is not None ), 
"execution_result should not be None" - + # Retry Count assert hasattr(sql_op, "retry_count"), "sql_operation should have retry_count" if sql_op.retry_count is not None: assert ( sql_op.retry_count >= 0 ), f"retry_count should be non-negative, got {sql_op.retry_count}" - + # Operation Latency latency = event.entry.sql_driver_log.operation_latency_ms assert latency is not None, "operation_latency_ms should not be None" - assert latency >= 0, f"operation_latency_ms should be non-negative, got {latency}" + assert ( + latency >= 0 + ), f"operation_latency_ms should be non-negative, got {latency}" def assertErrorInfo(self, event, expected_error_name=None): """Assert error information""" error_info = event.entry.sql_driver_log.error_info assert error_info is not None, "error_info should not be None for error events" - + # Error Name assert ( error_info.error_name is not None and len(error_info.error_name) > 0 @@ -248,7 +249,7 @@ def assertErrorInfo(self, event, expected_error_name=None): assert ( error_info.error_name == expected_error_name ), f"Expected error_name '{expected_error_name}', got '{error_info.error_name}'" - + # Stack Trace assert ( error_info.stack_trace is not None and len(error_info.stack_trace) > 0 @@ -278,12 +279,14 @@ def test_enable_telemetry_on_with_server_on_sends_events( Expected: 2 events (initial_log + latency_log) """ from databricks.sql.telemetry.telemetry_client import TelemetryHelper - + print(f"\n{'='*80}") print(f"TEST: test_enable_telemetry_on_with_server_on_sends_events") - print(f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}") + print( + f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" + ) print(f"{'='*80}\n") - + ( captured_events, captured_futures, @@ -307,7 +310,9 @@ def test_enable_telemetry_on_with_server_on_sends_events( print(f" enable_telemetry: {conn.enable_telemetry}") print(f" force_enable_telemetry: {conn.force_enable_telemetry}") print(f" telemetry_enabled (computed): 
{conn.telemetry_enabled}") - print(f" telemetry_client type: {type(conn._telemetry_client).__name__}\n") + print( + f" telemetry_client type: {type(conn._telemetry_client).__name__}\n" + ) with conn.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() @@ -339,9 +344,7 @@ def test_enable_telemetry_on_with_server_on_sends_events( print(f"\nStatement ID: {statement_id}") - def test_force_enable_on_with_enable_off_sends_events( - self, telemetry_interceptors - ): + def test_force_enable_on_with_enable_off_sends_events(self, telemetry_interceptors): """ Scenario: enable_telemetry=OFF, force_enable_telemetry=ON, server=ON Expected: 2 events (initial_log + latency_log) @@ -442,20 +445,22 @@ def test_default_behavior_sends_events_with_server_flag_on( """ Scenario: Neither enable_telemetry nor force_enable_telemetry passed (uses defaults) Expected: 2 events (initial_log + latency_log) when server feature flag is ON - + Default behavior: - enable_telemetry defaults to True - force_enable_telemetry defaults to False - Telemetry will be sent if server feature flag is enabled """ from databricks.sql.telemetry.telemetry_client import TelemetryHelper - + print(f"\n{'='*80}") print(f"TEST: test_default_behavior_sends_events_with_server_flag_on") - print(f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}") + print( + f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" + ) print(f"Testing DEFAULT behavior (no flags passed explicitly)") print(f"{'='*80}\n") - + ( captured_events, captured_futures, @@ -477,10 +482,14 @@ def test_default_behavior_sends_events_with_server_flag_on( # Verify defaults are as expected print(f"\nConnection created with DEFAULT settings:") print(f" enable_telemetry (default): {conn.enable_telemetry}") - print(f" force_enable_telemetry (default): {conn.force_enable_telemetry}") + print( + f" force_enable_telemetry (default): {conn.force_enable_telemetry}" + ) print(f" telemetry_enabled (computed): 
{conn.telemetry_enabled}") - print(f" telemetry_client type: {type(conn._telemetry_client).__name__}\n") - + print( + f" telemetry_client type: {type(conn._telemetry_client).__name__}\n" + ) + with conn.cursor() as cursor: cursor.execute("SELECT 99") cursor.fetchone() @@ -564,7 +573,9 @@ def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors) print(f"\nFound error_info in event {idx}") break - assert error_event is not None, "Expected at least one event with error_info" + assert ( + error_event is not None + ), "Expected at least one event with error_info" # Assert system configuration self.assertSystemConfiguration(error_event) @@ -575,9 +586,13 @@ def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors) ) # Assert error info with ServerOperationError - self.assertErrorInfo(error_event, expected_error_name="ServerOperationError") + self.assertErrorInfo( + error_event, expected_error_name="ServerOperationError" + ) - print(f"✅ Error telemetry successfully captured with error_name and stack_trace") + print( + f"✅ Error telemetry successfully captured with error_name and stack_trace" + ) def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): """ @@ -632,7 +647,9 @@ def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): print(f"\nFound error_info in event {idx}") break - assert error_event is not None, "Expected at least one event with error_info" + assert ( + error_event is not None + ), "Expected at least one event with error_info" # Assert system configuration self.assertSystemConfiguration(error_event) @@ -785,7 +802,9 @@ def test_cloudfetch_no_explicit_close_sends_telemetry(self, telemetry_intercepto len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events for cloudfetch (auto close)") + print( + f"\nCaptured {len(captured_events)} events for cloudfetch (auto close)" + ) # Assert 
system configuration and connection params for all events for event in captured_events: @@ -839,7 +858,9 @@ def test_cloudfetch_statement_closed_sends_telemetry(self, telemetry_interceptor len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events for cloudfetch (statement close)") + print( + f"\nCaptured {len(captured_events)} events for cloudfetch (statement close)" + ) # Assert system configuration and connection params for all events for event in captured_events: @@ -892,7 +913,9 @@ def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_intercepto len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events for cloudfetch (connection close)") + print( + f"\nCaptured {len(captured_events)} events for cloudfetch (connection close)" + ) # Assert system configuration and connection params for all events for event in captured_events: @@ -907,7 +930,9 @@ def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_intercepto print(f"✅ Cloudfetch (connection close) telemetry captured") - def test_cloudfetch_only_resultset_closed_sends_telemetry(self, telemetry_interceptors): + def test_cloudfetch_only_resultset_closed_sends_telemetry( + self, telemetry_interceptors + ): """ Scenario: ResultSet created with cloudfetch, only ResultSet closed (implicit via fetchall) Expected: Telemetry events sent (batch_size=1 ensures immediate flush) @@ -946,7 +971,9 @@ def test_cloudfetch_only_resultset_closed_sends_telemetry(self, telemetry_interc len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events for cloudfetch (resultset close)") + print( + f"\nCaptured {len(captured_events)} events for cloudfetch (resultset close)" + ) # Assert system configuration and connection params for all events for event in captured_events: From 
b005890f34135d029874c1fd220dee58553cca72 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Wed, 19 Nov 2025 05:51:20 +0530 Subject: [PATCH 5/9] added more logs to identify server side flag evaluation Signed-off-by: Nikhil Suri --- src/databricks/sql/common/feature_flag.py | 18 +++++--- .../sql/telemetry/telemetry_client.py | 41 +++++++++++++++---- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/databricks/sql/common/feature_flag.py b/src/databricks/sql/common/feature_flag.py index 7f9195b49..9e26ea183 100644 --- a/src/databricks/sql/common/feature_flag.py +++ b/src/databricks/sql/common/feature_flag.py @@ -116,23 +116,29 @@ def _refresh_flags(self): self._connection.session.auth_provider.add_headers(headers) headers["User-Agent"] = self._connection.session.useragent_header - logger.info(f"Fetching feature flags from: {self._feature_flag_endpoint}") + logger.info( + f"Feature Flags: Sending GET request to endpoint: {self._feature_flag_endpoint}" + ) response = self._http_client.request( HttpMethod.GET, self._feature_flag_endpoint, headers=headers, timeout=30 ) - logger.info(f"Feature flag response status: {response.status}") + logger.info(f"Feature Flags: HTTP Response status code: {response.status}") + if response.status == 200: # Parse JSON response from urllib3 response data response_data = json.loads(response.data.decode()) - logger.info(f"Feature flag response data: {response_data}") + logger.info( + f"Feature Flags: ✓ SUCCESS - Received {len(response_data.get('flags', []))} flags from server" + ) + logger.info(f"Feature Flags: Response data: {response_data}") ff_response = FeatureFlagsResponse.from_dict(response_data) self._update_cache_from_response(ff_response) - logger.info(f"Feature flags loaded: {self._flags}") + logger.info(f"Feature Flags: Loaded into cache: {self._flags}") else: # On failure, initialize with an empty dictionary to prevent re-blocking. 
logger.info( - f"Feature flag fetch failed with status {response.status}, initializing empty flags" + f"Feature Flags: ✗ FAILED - Non-200 status code {response.status}, using empty flags" ) if self._flags is None: self._flags = {} @@ -140,7 +146,7 @@ def _refresh_flags(self): except Exception as e: # On exception, initialize with an empty dictionary to prevent re-blocking. logger.info( - f"Feature flag fetch exception: {type(e).__name__}: {e}, initializing empty flags" + f"Feature Flags: ✗ EXCEPTION - {type(e).__name__}: {e}, using empty flags" ) if self._flags is None: self._flags = {} diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index cbb004011..90d561ca5 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -114,17 +114,42 @@ def is_telemetry_enabled(connection: "Connection") -> bool: return True if connection.enable_telemetry: + from databricks.sql import __version__ + + endpoint = f"https://{connection.session.host}/api/2.0/connector-service/feature-flags/PYTHON/{__version__}" logger.info( - f"Telemetry: enable_telemetry=True, checking feature flag: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" + f"Telemetry: enable_telemetry=True, checking server-side feature flag" ) - context = FeatureFlagsContextFactory.get_instance(connection) - flag_value = context.get_flag_value( - TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME, default_value=False + logger.info(f"Telemetry: Feature flag endpoint: {endpoint}") + logger.info( + f"Telemetry: Looking for flag: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" ) - logger.info(f"Telemetry: feature flag value = '{flag_value}'") - enabled = str(flag_value).lower() == "true" - logger.info(f"Telemetry: feature flag check result = {enabled}") - return enabled + + try: + context = FeatureFlagsContextFactory.get_instance(connection) + flag_value = context.get_flag_value( + 
TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME, default_value=False + ) + logger.info( + f"Telemetry: Feature flag fetch SUCCESS - received value: '{flag_value}'" + ) + + enabled = str(flag_value).lower() == "true" + if enabled: + logger.info( + f"Telemetry: ✓ Server flag is 'true', telemetry ENABLED" + ) + else: + logger.info( + f"Telemetry: ✗ Server flag is '{flag_value}' (not 'true'), telemetry DISABLED" + ) + return enabled + except Exception as e: + logger.info( + f"Telemetry: Feature flag fetch FAILED with exception: {type(e).__name__}: {e}" + ) + logger.info("Telemetry: Defaulting to DISABLED due to fetch failure") + return False else: logger.info("Telemetry: enable_telemetry=False, telemetry DISABLED") return False From 785d12c6198bd37a9858583652ef38e9baf0fbd3 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Thu, 20 Nov 2025 11:25:13 +0530 Subject: [PATCH 6/9] remove unused logs Signed-off-by: Nikhil Suri --- src/databricks/sql/client.py | 4 +- src/databricks/sql/common/feature_flag.py | 20 ------ .../sql/telemetry/telemetry_client.py | 68 ++++--------------- 3 files changed, 13 insertions(+), 79 deletions(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 91468eada..fedfafdf3 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -320,9 +320,7 @@ def read(self) -> Optional[OAuthToken]: ) self.force_enable_telemetry = kwargs.get("force_enable_telemetry", False) - self.enable_telemetry = kwargs.get( - "enable_telemetry", True - ) # Default to True for telemetry + self.enable_telemetry = kwargs.get("enable_telemetry", False) self.telemetry_enabled = TelemetryHelper.is_telemetry_enabled(self) TelemetryClientFactory.initialize_telemetry_client( diff --git a/src/databricks/sql/common/feature_flag.py b/src/databricks/sql/common/feature_flag.py index 9e26ea183..8a1cf5bd5 100644 --- a/src/databricks/sql/common/feature_flag.py +++ b/src/databricks/sql/common/feature_flag.py @@ -106,48 +106,28 @@ def 
get_flag_value(self, name: str, default_value: Any) -> Any: def _refresh_flags(self): """Performs a synchronous network request to fetch and update flags.""" - import logging - - logger = logging.getLogger(__name__) - headers = {} try: # Authenticate the request self._connection.session.auth_provider.add_headers(headers) headers["User-Agent"] = self._connection.session.useragent_header - logger.info( - f"Feature Flags: Sending GET request to endpoint: {self._feature_flag_endpoint}" - ) response = self._http_client.request( HttpMethod.GET, self._feature_flag_endpoint, headers=headers, timeout=30 ) - logger.info(f"Feature Flags: HTTP Response status code: {response.status}") - if response.status == 200: # Parse JSON response from urllib3 response data response_data = json.loads(response.data.decode()) - logger.info( - f"Feature Flags: ✓ SUCCESS - Received {len(response_data.get('flags', []))} flags from server" - ) - logger.info(f"Feature Flags: Response data: {response_data}") ff_response = FeatureFlagsResponse.from_dict(response_data) self._update_cache_from_response(ff_response) - logger.info(f"Feature Flags: Loaded into cache: {self._flags}") else: # On failure, initialize with an empty dictionary to prevent re-blocking. - logger.info( - f"Feature Flags: ✗ FAILED - Non-200 status code {response.status}, using empty flags" - ) if self._flags is None: self._flags = {} except Exception as e: # On exception, initialize with an empty dictionary to prevent re-blocking. 
- logger.info( - f"Feature Flags: ✗ EXCEPTION - {type(e).__name__}: {e}, using empty flags" - ) if self._flags is None: self._flags = {} diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 90d561ca5..134757fe5 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -110,48 +110,15 @@ def get_auth_flow(auth_provider): @staticmethod def is_telemetry_enabled(connection: "Connection") -> bool: if connection.force_enable_telemetry: - logger.info("Telemetry: force_enable_telemetry=True, telemetry ENABLED") return True if connection.enable_telemetry: - from databricks.sql import __version__ - - endpoint = f"https://{connection.session.host}/api/2.0/connector-service/feature-flags/PYTHON/{__version__}" - logger.info( - f"Telemetry: enable_telemetry=True, checking server-side feature flag" - ) - logger.info(f"Telemetry: Feature flag endpoint: {endpoint}") - logger.info( - f"Telemetry: Looking for flag: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" + context = FeatureFlagsContextFactory.get_instance(connection) + flag_value = context.get_flag_value( + TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME, default_value=False ) - - try: - context = FeatureFlagsContextFactory.get_instance(connection) - flag_value = context.get_flag_value( - TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME, default_value=False - ) - logger.info( - f"Telemetry: Feature flag fetch SUCCESS - received value: '{flag_value}'" - ) - - enabled = str(flag_value).lower() == "true" - if enabled: - logger.info( - f"Telemetry: ✓ Server flag is 'true', telemetry ENABLED" - ) - else: - logger.info( - f"Telemetry: ✗ Server flag is '{flag_value}' (not 'true'), telemetry DISABLED" - ) - return enabled - except Exception as e: - logger.info( - f"Telemetry: Feature flag fetch FAILED with exception: {type(e).__name__}: {e}" - ) - logger.info("Telemetry: Defaulting to DISABLED due to fetch failure") - 
return False + return str(flag_value).lower() == "true" else: - logger.info("Telemetry: enable_telemetry=False, telemetry DISABLED") return False @@ -224,14 +191,13 @@ def __init__( def _export_event(self, event): """Add an event to the batch queue and flush if batch is full""" - logger.info(f"Exporting telemetry event for connection {self._session_id_hex}") + logger.debug("Exporting event for connection %s", self._session_id_hex) with self._lock: self._events_batch.append(event) - logger.info( - f"Event added to batch, batch size now: {len(self._events_batch)}/{self._batch_size}" - ) if len(self._events_batch) >= self._batch_size: - logger.info(f"Batch size reached ({self._batch_size}), flushing events") + logger.debug( + "Batch size limit reached (%s), flushing events", self._batch_size + ) self._flush() def _flush(self): @@ -499,8 +465,9 @@ def initialize_telemetry_client( TelemetryClientFactory._initialize() if session_id_hex not in TelemetryClientFactory._clients: - logger.info( - f"Creating telemetry client for connection {session_id_hex}, telemetry_enabled={telemetry_enabled}" + logger.debug( + "Creating new TelemetryClient for connection %s", + session_id_hex, ) if telemetry_enabled: TelemetryClientFactory._clients[ @@ -514,25 +481,14 @@ def initialize_telemetry_client( batch_size=batch_size, client_context=client_context, ) - logger.info( - f"Created TelemetryClient for connection {session_id_hex}" - ) else: TelemetryClientFactory._clients[ session_id_hex ] = NoopTelemetryClient() - logger.info( - f"Created NoopTelemetryClient for connection {session_id_hex}" - ) except Exception as e: - logger.info( - f"Failed to initialize telemetry client: {type(e).__name__}: {e}" - ) + logger.debug("Failed to initialize telemetry client: %s", e) # Fallback to NoopTelemetryClient to ensure connection doesn't fail TelemetryClientFactory._clients[session_id_hex] = NoopTelemetryClient() - logger.info( - f"Fallback to NoopTelemetryClient for connection {session_id_hex}" - ) 
@staticmethod def get_telemetry_client(session_id_hex): From 8394fabbdfea83720deb2d1d58161feb515bcc38 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Fri, 21 Nov 2025 15:52:20 +0530 Subject: [PATCH 7/9] fix broken test case for default enable telemetry Signed-off-by: Nikhil Suri --- tests/e2e/test_telemetry_e2e.py | 110 ++++---------------------------- 1 file changed, 12 insertions(+), 98 deletions(-) diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index 09bbd57e8..a42873aa4 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -278,15 +278,6 @@ def test_enable_telemetry_on_with_server_on_sends_events( Scenario: enable_telemetry=ON, force_enable_telemetry=OFF, server=ON Expected: 2 events (initial_log + latency_log) """ - from databricks.sql.telemetry.telemetry_client import TelemetryHelper - - print(f"\n{'='*80}") - print(f"TEST: test_enable_telemetry_on_with_server_on_sends_events") - print( - f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" - ) - print(f"{'='*80}\n") - ( captured_events, captured_futures, @@ -306,13 +297,6 @@ def test_enable_telemetry_on_with_server_on_sends_events( "telemetry_batch_size": 1, } ) as conn: - print(f"\nConnection created:") - print(f" enable_telemetry: {conn.enable_telemetry}") - print(f" force_enable_telemetry: {conn.force_enable_telemetry}") - print(f" telemetry_enabled (computed): {conn.telemetry_enabled}") - print( - f" telemetry_client type: {type(conn._telemetry_client).__name__}\n" - ) with conn.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() @@ -342,8 +326,6 @@ def test_enable_telemetry_on_with_server_on_sends_events( # Assert latency event (second event) self.assertStatementExecution(captured_events[1]) - print(f"\nStatement ID: {statement_id}") - def test_force_enable_on_with_enable_off_sends_events(self, telemetry_interceptors): """ Scenario: enable_telemetry=OFF, force_enable_telemetry=ON, server=ON @@ -396,7 
+378,6 @@ def test_force_enable_on_with_enable_off_sends_events(self, telemetry_intercepto self.assertStatementExecution(captured_events[1]) - print(f"\nStatement ID: {statement_id}") def test_both_flags_off_does_not_send_events(self, telemetry_interceptors): """ @@ -437,30 +418,19 @@ def test_both_flags_off_does_not_send_events(self, telemetry_interceptors): len(captured_futures) == 0 ), f"Expected 0 responses, got {len(captured_futures)}" - print(f"\nStatement ID: {statement_id}") - def test_default_behavior_sends_events_with_server_flag_on( + def test_default_behavior_does_not_send_events( self, telemetry_interceptors ): """ Scenario: Neither enable_telemetry nor force_enable_telemetry passed (uses defaults) - Expected: 2 events (initial_log + latency_log) when server feature flag is ON + Expected: 0 events (telemetry disabled by default) Default behavior: - - enable_telemetry defaults to True + - enable_telemetry defaults to False - force_enable_telemetry defaults to False - - Telemetry will be sent if server feature flag is enabled + - Telemetry will NOT be sent (NoopTelemetryClient used) """ - from databricks.sql.telemetry.telemetry_client import TelemetryHelper - - print(f"\n{'='*80}") - print(f"TEST: test_default_behavior_sends_events_with_server_flag_on") - print( - f"Feature flag being checked: {TelemetryHelper.TELEMETRY_FEATURE_FLAG_NAME}" - ) - print(f"Testing DEFAULT behavior (no flags passed explicitly)") - print(f"{'='*80}\n") - ( captured_events, captured_futures, @@ -479,47 +449,19 @@ def test_default_behavior_sends_events_with_server_flag_on( "telemetry_batch_size": 1, } ) as conn: - # Verify defaults are as expected - print(f"\nConnection created with DEFAULT settings:") - print(f" enable_telemetry (default): {conn.enable_telemetry}") - print( - f" force_enable_telemetry (default): {conn.force_enable_telemetry}" - ) - print(f" telemetry_enabled (computed): {conn.telemetry_enabled}") - print( - f" telemetry_client type: 
{type(conn._telemetry_client).__name__}\n" - ) - with conn.cursor() as cursor: cursor.execute("SELECT 99") cursor.fetchone() - statement_id = cursor.query_id time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - # With default enable_telemetry=True and server flag ON, expect 2 events + # With default enable_telemetry=False, expect 0 events assert ( - len(captured_events) == 2 - ), f"Expected exactly 2 events with default settings, got {len(captured_events)}" - assert len(done) == 2, f"Expected exactly 2 responses, got {len(done)}" - - # Verify HTTP responses - for future in done: - response = future.result() - assert 200 <= response.status < 300 - - # Assert payload for all events - for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - # Assert latency event (second event) - self.assertStatementExecution(captured_events[1]) - - print(f"\nStatement ID: {statement_id}") + len(captured_events) == 0 + ), f"Expected 0 events with default settings, got {len(captured_events)}" + assert ( + len(captured_futures) == 0 + ), f"Expected 0 responses with default settings, got {len(captured_futures)}" def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors): """ @@ -552,7 +494,7 @@ def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors) assert False, "Query should have failed" except Exception as e: # Expected to fail - print(f"\nExpected error occurred: {type(e).__name__}") + pass time.sleep(2) done, not_done = wait(captured_futures, timeout=10) @@ -562,7 +504,6 @@ def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors) len(captured_events) >= 1 ), f"Expected at least 1 event, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events") # Find event with error_info (typically the middle event) error_event = None @@ -570,7 +511,6 @@ def 
test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors) error_info = event.entry.sql_driver_log.error_info if error_info: error_event = event - print(f"\nFound error_info in event {idx}") break assert ( @@ -590,9 +530,6 @@ def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors) error_event, expected_error_name="ServerOperationError" ) - print( - f"✅ Error telemetry successfully captured with error_name and stack_trace" - ) def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): """ @@ -626,7 +563,7 @@ def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): assert False, "Query should have failed" except Exception as e: # Expected to fail - print(f"\nExpected error occurred: {type(e).__name__}") + pass time.sleep(2) done, not_done = wait(captured_futures, timeout=10) @@ -636,7 +573,6 @@ def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): len(captured_events) >= 1 ), f"Expected at least 1 event, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events") # Find event with error_info error_event = None @@ -644,7 +580,6 @@ def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): error_info = event.entry.sql_driver_log.error_info if error_info: error_event = event - print(f"\nFound error_info in event {idx}") break assert ( @@ -662,7 +597,6 @@ def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): # Assert error info exists self.assertErrorInfo(error_event) - print(f"✅ Non-existent table error telemetry captured") def test_metadata_get_catalogs_sends_telemetry(self, telemetry_interceptors): """ @@ -700,7 +634,6 @@ def test_metadata_get_catalogs_sends_telemetry(self, telemetry_interceptors): len(captured_events) >= 1 ), f"Expected at least 1 event, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events for getCatalogs") # Assert system configuration and 
connection params for all events for event in captured_events: @@ -709,7 +642,6 @@ def test_metadata_get_catalogs_sends_telemetry(self, telemetry_interceptors): event, expected_http_path=self.arguments["http_path"] ) - print(f"✅ Metadata getCatalogs telemetry captured") def test_direct_results_sends_telemetry(self, telemetry_interceptors): """ @@ -749,7 +681,6 @@ def test_direct_results_sends_telemetry(self, telemetry_interceptors): len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print(f"\nCaptured {len(captured_events)} events for direct results") # Assert system configuration and connection params for all events for event in captured_events: @@ -762,7 +693,6 @@ def test_direct_results_sends_telemetry(self, telemetry_interceptors): latency_event = captured_events[-1] self.assertStatementExecution(latency_event) - print(f"✅ Direct results telemetry captured") def test_cloudfetch_no_explicit_close_sends_telemetry(self, telemetry_interceptors): """ @@ -802,9 +732,6 @@ def test_cloudfetch_no_explicit_close_sends_telemetry(self, telemetry_intercepto len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print( - f"\nCaptured {len(captured_events)} events for cloudfetch (auto close)" - ) # Assert system configuration and connection params for all events for event in captured_events: @@ -817,7 +744,6 @@ def test_cloudfetch_no_explicit_close_sends_telemetry(self, telemetry_intercepto latency_event = captured_events[-1] self.assertStatementExecution(latency_event) - print(f"✅ Cloudfetch (auto close) telemetry captured") def test_cloudfetch_statement_closed_sends_telemetry(self, telemetry_interceptors): """ @@ -858,9 +784,6 @@ def test_cloudfetch_statement_closed_sends_telemetry(self, telemetry_interceptor len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print( - f"\nCaptured {len(captured_events)} events for cloudfetch (statement close)" - ) # Assert system 
configuration and connection params for all events for event in captured_events: @@ -873,7 +796,6 @@ def test_cloudfetch_statement_closed_sends_telemetry(self, telemetry_interceptor latency_event = captured_events[-1] self.assertStatementExecution(latency_event) - print(f"✅ Cloudfetch (statement close) telemetry captured") def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_interceptors): """ @@ -913,9 +835,6 @@ def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_intercepto len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print( - f"\nCaptured {len(captured_events)} events for cloudfetch (connection close)" - ) # Assert system configuration and connection params for all events for event in captured_events: @@ -928,7 +847,6 @@ def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_intercepto latency_event = captured_events[-1] self.assertStatementExecution(latency_event) - print(f"✅ Cloudfetch (connection close) telemetry captured") def test_cloudfetch_only_resultset_closed_sends_telemetry( self, telemetry_interceptors @@ -971,9 +889,6 @@ def test_cloudfetch_only_resultset_closed_sends_telemetry( len(captured_events) >= 2 ), f"Expected at least 2 events, got {len(captured_events)}" - print( - f"\nCaptured {len(captured_events)} events for cloudfetch (resultset close)" - ) # Assert system configuration and connection params for all events for event in captured_events: @@ -986,4 +901,3 @@ def test_cloudfetch_only_resultset_closed_sends_telemetry( latency_event = captured_events[-1] self.assertStatementExecution(latency_event) - print(f"✅ Cloudfetch (resultset close) telemetry captured") From 1fa38efe16af4d52569997717aa0229d32a6bf45 Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Tue, 25 Nov 2025 16:17:03 +0530 Subject: [PATCH 8/9] redcude test length and made more reusable code Signed-off-by: Nikhil Suri --- tests/e2e/test_telemetry_e2e.py | 972 +++++++------------------------- 
1 file changed, 206 insertions(+), 766 deletions(-) diff --git a/tests/e2e/test_telemetry_e2e.py b/tests/e2e/test_telemetry_e2e.py index a42873aa4..917c8e5eb 100644 --- a/tests/e2e/test_telemetry_e2e.py +++ b/tests/e2e/test_telemetry_e2e.py @@ -2,7 +2,6 @@ E2E test for telemetry - verifies telemetry behavior with different scenarios """ import time -import json import threading import logging from contextlib import contextmanager @@ -59,16 +58,9 @@ def telemetry_setup_teardown(self): TelemetryClientFactory._stop_flush_thread() TelemetryClientFactory._initialized = False - # ==================== REUSABLE WRAPPER METHODS ==================== - @pytest.fixture def telemetry_interceptors(self): - """ - Setup reusable telemetry interceptors as a fixture. - - Returns: - Tuple of (captured_events, captured_futures, export_wrapper, callback_wrapper) - """ + """Setup reusable telemetry interceptors as a fixture""" capture_lock = threading.Lock() captured_events = [] captured_futures = [] @@ -77,827 +69,275 @@ def telemetry_interceptors(self): original_callback = TelemetryClient._telemetry_request_callback def export_wrapper(self_client, event): - """Intercept telemetry events to capture payload""" with capture_lock: captured_events.append(event) return original_export(self_client, event) def callback_wrapper(self_client, future, sent_count): - """Capture HTTP response futures""" with capture_lock: captured_futures.append(future) original_callback(self_client, future, sent_count) return captured_events, captured_futures, export_wrapper, callback_wrapper - # ==================== ASSERTION METHODS ==================== + # ==================== ASSERTION HELPERS ==================== - def assertSystemConfiguration(self, event): - """Assert base system configuration fields""" + def assert_system_config(self, event): + """Assert system configuration fields""" sys_config = event.entry.sql_driver_log.system_configuration - - assert sys_config is not None, "system_configuration 
should not be None" - - # Driver information - assert ( - sys_config.driver_name == "Databricks SQL Python Connector" - ), f"Expected driver_name 'Databricks SQL Python Connector', got '{sys_config.driver_name}'" - assert ( - sys_config.driver_version is not None and len(sys_config.driver_version) > 0 - ), "driver_version should not be None or empty" - - # OS information - assert ( - sys_config.os_name is not None and len(sys_config.os_name) > 0 - ), "os_name should not be None or empty" - assert ( - sys_config.os_version is not None and len(sys_config.os_version) > 0 - ), "os_version should not be None or empty" - assert ( - sys_config.os_arch is not None and len(sys_config.os_arch) > 0 - ), "os_arch should not be None or empty" - - # Runtime information - assert ( - sys_config.runtime_name is not None and len(sys_config.runtime_name) > 0 - ), "runtime_name should not be None or empty" - assert ( - sys_config.runtime_version is not None - and len(sys_config.runtime_version) > 0 - ), "runtime_version should not be None or empty" - assert ( - sys_config.runtime_vendor is not None and len(sys_config.runtime_vendor) > 0 - ), "runtime_vendor should not be None or empty" - - # Locale and encoding - assert ( - sys_config.locale_name is not None and len(sys_config.locale_name) > 0 - ), "locale_name should not be None or empty" - assert ( - sys_config.char_set_encoding is not None - and len(sys_config.char_set_encoding) > 0 - ), "char_set_encoding should not be None or empty" - - def assertConnectionParams( - self, - event, - expected_http_path=None, - expected_mode=None, - expected_auth_mech=None, - expected_auth_flow=None, - ): + assert sys_config is not None + + # Check all required fields are non-empty + for field in ['driver_name', 'driver_version', 'os_name', 'os_version', + 'os_arch', 'runtime_name', 'runtime_version', 'runtime_vendor', + 'locale_name', 'char_set_encoding']: + value = getattr(sys_config, field) + assert value and len(value) > 0, f"{field} should not 
be None or empty" + + assert sys_config.driver_name == "Databricks SQL Python Connector" + + def assert_connection_params(self, event, expected_http_path=None): """Assert connection parameters""" conn_params = event.entry.sql_driver_log.driver_connection_params - - assert conn_params is not None, "driver_connection_params should not be None" - - # HTTP Path + assert conn_params is not None + assert conn_params.http_path + assert conn_params.host_info is not None + assert conn_params.auth_mech is not None + if expected_http_path: - assert ( - conn_params.http_path == expected_http_path - ), f"Expected http_path '{expected_http_path}', got '{conn_params.http_path}'" - assert ( - conn_params.http_path is not None and len(conn_params.http_path) > 0 - ), "http_path should not be None or empty" - - # Mode - if expected_mode: - assert ( - conn_params.mode == expected_mode - ), f"Expected mode '{expected_mode}', got '{conn_params.mode}'" - # Mode is optional, so don't assert it must exist - - # Host Info (HostDetails object) - assert conn_params.host_info is not None, "host_info should not be None" - - # Auth Mechanism (AuthMech object) - if expected_auth_mech: - assert ( - conn_params.auth_mech == expected_auth_mech - ), f"Expected auth_mech '{expected_auth_mech}', got '{conn_params.auth_mech}'" - assert conn_params.auth_mech is not None, "auth_mech should not be None" - - # Auth Flow (optional string) - if expected_auth_flow: - assert ( - conn_params.auth_flow == expected_auth_flow - ), f"Expected auth_flow '{expected_auth_flow}', got '{conn_params.auth_flow}'" - # auth_flow is optional, so don't assert it must exist - - # Socket Timeout - # socket_timeout is optional and can be None + assert conn_params.http_path == expected_http_path + if conn_params.socket_timeout is not None: - assert ( - conn_params.socket_timeout > 0 - ), f"socket_timeout should be positive, got {conn_params.socket_timeout}" - - def assertStatementExecution( - self, event, statement_type=None, 
execution_result=None - ): - """Assert statement execution details including operation latency""" - sql_op = event.entry.sql_driver_log.sql_operation - - assert sql_op is not None, "sql_operation should not be None for SQL execution" + assert conn_params.socket_timeout > 0 - # Statement Type - if statement_type: - assert ( - sql_op.statement_type == statement_type - ), f"Expected statement_type '{statement_type}', got '{sql_op.statement_type}'" - else: - # At minimum, statement_type should exist - assert ( - sql_op.statement_type is not None - ), "statement_type should not be None" - - # Execution Result - if execution_result: - assert ( - sql_op.execution_result == execution_result - ), f"Expected execution_result '{execution_result}', got '{sql_op.execution_result}'" - else: - # At minimum, execution_result should exist - assert ( - sql_op.execution_result is not None - ), "execution_result should not be None" - - # Retry Count - assert hasattr(sql_op, "retry_count"), "sql_operation should have retry_count" + def assert_statement_execution(self, event): + """Assert statement execution details""" + sql_op = event.entry.sql_driver_log.sql_operation + assert sql_op is not None + assert sql_op.statement_type is not None + assert sql_op.execution_result is not None + assert hasattr(sql_op, "retry_count") + if sql_op.retry_count is not None: - assert ( - sql_op.retry_count >= 0 - ), f"retry_count should be non-negative, got {sql_op.retry_count}" + assert sql_op.retry_count >= 0 - # Operation Latency latency = event.entry.sql_driver_log.operation_latency_ms - assert latency is not None, "operation_latency_ms should not be None" - assert ( - latency >= 0 - ), f"operation_latency_ms should be non-negative, got {latency}" + assert latency is not None and latency >= 0 - def assertErrorInfo(self, event, expected_error_name=None): + def assert_error_info(self, event, expected_error_name=None): """Assert error information""" error_info = event.entry.sql_driver_log.error_info - 
- assert error_info is not None, "error_info should not be None for error events" - - # Error Name - assert ( - error_info.error_name is not None and len(error_info.error_name) > 0 - ), "error_name should not be None or empty" + assert error_info is not None + assert error_info.error_name and len(error_info.error_name) > 0 + assert error_info.stack_trace and len(error_info.stack_trace) > 0 + if expected_error_name: - assert ( - error_info.error_name == expected_error_name - ), f"Expected error_name '{expected_error_name}', got '{error_info.error_name}'" - - # Stack Trace - assert ( - error_info.stack_trace is not None and len(error_info.stack_trace) > 0 - ), "stack_trace should not be None or empty" - - def assertOperationLatency(self, event): - """Assert operation latency exists""" - latency = event.entry.sql_driver_log.operation_latency_ms - assert latency is not None, "operation_latency_ms should not be None" - assert latency >= 0, "operation_latency_ms should be non-negative" - - def assertBaseTelemetryEvent(self, captured_events): - """Assert basic telemetry event payload fields""" - assert len(captured_events) > 0, "No events captured to assert" - - for event in captured_events: - telemetry_event = event.entry.sql_driver_log - assert telemetry_event.session_id is not None - - # ==================== TEST SCENARIOS ==================== - - def test_enable_telemetry_on_with_server_on_sends_events( - self, telemetry_interceptors - ): - """ - Scenario: enable_telemetry=ON, force_enable_telemetry=OFF, server=ON - Expected: 2 events (initial_log + latency_log) - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "enable_telemetry": True, - "force_enable_telemetry": False, - "telemetry_batch_size": 1, 
- } - ) as conn: - with conn.cursor() as cursor: - cursor.execute("SELECT 1") - cursor.fetchone() - statement_id = cursor.query_id - - time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Exact count assertion - assert ( - len(captured_events) == 2 - ), f"Expected exactly 2 events, got {len(captured_events)}" - assert len(done) == 2, f"Expected exactly 2 responses, got {len(done)}" - - # Verify HTTP responses - for future in done: - response = future.result() - assert 200 <= response.status < 300 - - # Assert payload for all events - for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - # Assert latency event (second event) - self.assertStatementExecution(captured_events[1]) - - def test_force_enable_on_with_enable_off_sends_events(self, telemetry_interceptors): - """ - Scenario: enable_telemetry=OFF, force_enable_telemetry=ON, server=ON - Expected: 2 events (initial_log + latency_log) - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "enable_telemetry": False, - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - } - ) as conn: - with conn.cursor() as cursor: - cursor.execute("SELECT 2") - cursor.fetchone() - statement_id = cursor.query_id + assert error_info.error_name == expected_error_name + def verify_events(self, captured_events, captured_futures, expected_count): + """Common verification for event count and HTTP responses""" + if expected_count == 0: + assert len(captured_events) == 0, f"Expected 0 events, got {len(captured_events)}" + assert len(captured_futures) == 0, f"Expected 0 responses, got {len(captured_futures)}" + else: + assert 
len(captured_events) == expected_count, \ + f"Expected {expected_count} events, got {len(captured_events)}" + time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Exact count assertion - assert ( - len(captured_events) == 2 - ), f"Expected exactly 2 events, got {len(captured_events)}" - assert len(done) == 2, f"Expected exactly 2 responses, got {len(done)}" - - # Verify HTTP responses + done, _ = wait(captured_futures, timeout=10) + assert len(done) == expected_count, \ + f"Expected {expected_count} responses, got {len(done)}" + for future in done: response = future.result() assert 200 <= response.status < 300 - - # Assert payload + + # Assert common fields for all events for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - self.assertStatementExecution(captured_events[1]) - - - def test_both_flags_off_does_not_send_events(self, telemetry_interceptors): - """ - Scenario: enable_telemetry=OFF, force_enable_telemetry=OFF, server=ON - Expected: 0 events - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "enable_telemetry": False, - "force_enable_telemetry": False, - "telemetry_batch_size": 1, - } - ) as conn: + self.assert_system_config(event) + self.assert_connection_params(event, self.arguments["http_path"]) + + # ==================== PARAMETERIZED TESTS ==================== + + @pytest.mark.parametrize("enable_telemetry,force_enable,expected_count,test_id", [ + (True, False, 2, "enable_on_force_off"), + (False, True, 2, "enable_off_force_on"), + (False, False, 0, "both_off"), + (None, None, 0, "default_behavior"), + ]) + def test_telemetry_flags(self, 
telemetry_interceptors, enable_telemetry, + force_enable, expected_count, test_id): + """Test telemetry behavior with different flag combinations""" + captured_events, captured_futures, export_wrapper, callback_wrapper = \ + telemetry_interceptors + + with patch.object(TelemetryClient, "_export_event", export_wrapper), \ + patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + + extra_params = {"telemetry_batch_size": 1} + if enable_telemetry is not None: + extra_params["enable_telemetry"] = enable_telemetry + if force_enable is not None: + extra_params["force_enable_telemetry"] = force_enable + + with self.connection(extra_params=extra_params) as conn: with conn.cursor() as cursor: - cursor.execute("SELECT 3") - cursor.fetchone() - statement_id = cursor.query_id - - time.sleep(2) - - # Exact count assertion - assert ( - len(captured_events) == 0 - ), f"Expected 0 events, got {len(captured_events)}" - assert ( - len(captured_futures) == 0 - ), f"Expected 0 responses, got {len(captured_futures)}" - - - def test_default_behavior_does_not_send_events( - self, telemetry_interceptors - ): - """ - Scenario: Neither enable_telemetry nor force_enable_telemetry passed (uses defaults) - Expected: 0 events (telemetry disabled by default) - - Default behavior: - - enable_telemetry defaults to False - - force_enable_telemetry defaults to False - - Telemetry will NOT be sent (NoopTelemetryClient used) - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - # Connection without explicit telemetry flags - relies on defaults - with self.connection( - extra_params={ - "telemetry_batch_size": 1, - } - ) as conn: - with conn.cursor() as cursor: - cursor.execute("SELECT 99") + cursor.execute("SELECT 1") cursor.fetchone() - time.sleep(2) 
- - # With default enable_telemetry=False, expect 0 events - assert ( - len(captured_events) == 0 - ), f"Expected 0 events with default settings, got {len(captured_events)}" - assert ( - len(captured_futures) == 0 - ), f"Expected 0 responses with default settings, got {len(captured_futures)}" - - def test_sql_error_sends_telemetry_with_error_info(self, telemetry_interceptors): - """ - Scenario: SQL query with invalid syntax causes error - Expected: Telemetry event with error_name and stack_trace - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - } - ) as conn: + self.verify_events(captured_events, captured_futures, expected_count) + + # Assert statement execution on latency event (if events exist) + if expected_count > 0: + self.assert_statement_execution(captured_events[-1]) + + @pytest.mark.parametrize("query,expected_error", [ + ("SELECT * FROM WHERE INVALID SYNTAX 12345", "ServerOperationError"), + ("SELECT * FROM non_existent_table_xyz_12345", None), + ]) + def test_sql_errors(self, telemetry_interceptors, query, expected_error): + """Test telemetry captures error information for different SQL errors""" + captured_events, captured_futures, export_wrapper, callback_wrapper = \ + telemetry_interceptors + + with patch.object(TelemetryClient, "_export_event", export_wrapper), \ + patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + + with self.connection(extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + }) as conn: with conn.cursor() as cursor: - # Execute query with invalid syntax to trigger immediate parse error - try: - cursor.execute("SELECT * FROM WHERE INVALID SYNTAX 
12345") + with pytest.raises(Exception): + cursor.execute(query) cursor.fetchone() - assert False, "Query should have failed" - except Exception as e: - # Expected to fail - pass time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 1 event - assert ( - len(captured_events) >= 1 - ), f"Expected at least 1 event, got {len(captured_events)}" - - - # Find event with error_info (typically the middle event) - error_event = None - for idx, event in enumerate(captured_events): - error_info = event.entry.sql_driver_log.error_info - if error_info: - error_event = event - break - - assert ( - error_event is not None - ), "Expected at least one event with error_info" - - # Assert system configuration - self.assertSystemConfiguration(error_event) - - # Assert connection params - self.assertConnectionParams( - error_event, expected_http_path=self.arguments["http_path"] - ) - - # Assert error info with ServerOperationError - self.assertErrorInfo( - error_event, expected_error_name="ServerOperationError" - ) - - - def test_non_existent_table_error_sends_telemetry(self, telemetry_interceptors): - """ - Scenario: SQL query on non-existent table causes error - Expected: Telemetry event with error_name and stack_trace - Note: This test checks timing - querying non-existent table vs invalid syntax - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - } - ) as conn: - with conn.cursor() as cursor: - # Execute query on non-existent table - try: - cursor.execute("SELECT * FROM non_existent_table_xyz_12345") - cursor.fetchone() - assert False, "Query should have failed" - except Exception as e: - # Expected to fail - pass 
- - time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 1 event - assert ( - len(captured_events) >= 1 - ), f"Expected at least 1 event, got {len(captured_events)}" + wait(captured_futures, timeout=10) + assert len(captured_events) >= 1 # Find event with error_info - error_event = None - for idx, event in enumerate(captured_events): - error_info = event.entry.sql_driver_log.error_info - if error_info: - error_event = event - break - - assert ( - error_event is not None - ), "Expected at least one event with error_info" - - # Assert system configuration - self.assertSystemConfiguration(error_event) - - # Assert connection params - self.assertConnectionParams( - error_event, expected_http_path=self.arguments["http_path"] - ) - - # Assert error info exists - self.assertErrorInfo(error_event) - - - def test_metadata_get_catalogs_sends_telemetry(self, telemetry_interceptors): - """ - Scenario: Statement created and metadata (getCatalogs) called - Expected: Telemetry events with system config and connection params - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - } - ) as conn: + error_event = next((e for e in captured_events + if e.entry.sql_driver_log.error_info), None) + assert error_event is not None + + self.assert_system_config(error_event) + self.assert_connection_params(error_event, self.arguments["http_path"]) + self.assert_error_info(error_event, expected_error) + + def test_metadata_operation(self, telemetry_interceptors): + """Test telemetry for metadata operations (getCatalogs)""" + captured_events, captured_futures, export_wrapper, callback_wrapper = \ + telemetry_interceptors + + 
with patch.object(TelemetryClient, "_export_event", export_wrapper), \ + patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + + with self.connection(extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + }) as conn: with conn.cursor() as cursor: - # Call metadata operation catalogs = cursor.catalogs() catalogs.fetchall() time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 1 event - assert ( - len(captured_events) >= 1 - ), f"Expected at least 1 event, got {len(captured_events)}" + wait(captured_futures, timeout=10) - - # Assert system configuration and connection params for all events + assert len(captured_events) >= 1 for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - - def test_direct_results_sends_telemetry(self, telemetry_interceptors): - """ - Scenario: ResultSet created with directResults (use_cloud_fetch=False) - Expected: Telemetry events with SQL execution metrics - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - "use_cloud_fetch": False, # Force direct results - } - ) as conn: + self.assert_system_config(event) + self.assert_connection_params(event, self.arguments["http_path"]) + + def test_direct_results(self, telemetry_interceptors): + """Test telemetry with direct results (use_cloud_fetch=False)""" + captured_events, captured_futures, export_wrapper, callback_wrapper = \ + telemetry_interceptors + + with patch.object(TelemetryClient, "_export_event", export_wrapper), \ + patch.object(TelemetryClient, 
"_telemetry_request_callback", callback_wrapper): + + with self.connection(extra_params={ + "force_enable_telemetry": True, + "telemetry_batch_size": 1, + "use_cloud_fetch": False, + }) as conn: with conn.cursor() as cursor: cursor.execute("SELECT 100") result = cursor.fetchall() - assert len(result) == 1 - assert result[0][0] == 100 - - time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 2 events (initial + latency) - assert ( - len(captured_events) >= 2 - ), f"Expected at least 2 events, got {len(captured_events)}" - - - # Assert system configuration and connection params for all events - for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - # Assert SQL execution metrics on latency event (last event) - latency_event = captured_events[-1] - self.assertStatementExecution(latency_event) - - - def test_cloudfetch_no_explicit_close_sends_telemetry(self, telemetry_interceptors): - """ - Scenario: ResultSet created with cloudfetch, Statement/Connection not explicitly closed - Expected: Telemetry events sent when context managers exit - """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - "use_cloud_fetch": True, # Enable cloud fetch - } - ) as conn: - with conn.cursor() as cursor: - cursor.execute("SELECT * FROM range(1000)") - result = cursor.fetchall() - assert len(result) == 1000 - # Statement and connection close automatically via context managers + assert len(result) == 1 and result[0][0] == 100 time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) + wait(captured_futures, 
timeout=10) - # Should have at least 2 events (initial + latency) - assert ( - len(captured_events) >= 2 - ), f"Expected at least 2 events, got {len(captured_events)}" - - - # Assert system configuration and connection params for all events + assert len(captured_events) >= 2 for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] + self.assert_system_config(event) + self.assert_connection_params(event, self.arguments["http_path"]) + + self.assert_statement_execution(captured_events[-1]) + + @pytest.mark.parametrize("close_type", [ + "context_manager", + "explicit_cursor", + "explicit_connection", + "implicit_fetchall", + ]) + def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors, + close_type): + """Test telemetry with cloud fetch using different resource closing patterns""" + captured_events, captured_futures, export_wrapper, callback_wrapper = \ + telemetry_interceptors + + with patch.object(TelemetryClient, "_export_event", export_wrapper), \ + patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper): + + if close_type == "explicit_connection": + # Test explicit connection close + conn = sql.connect( + **self.connection_params(), + force_enable_telemetry=True, + telemetry_batch_size=1, + use_cloud_fetch=True, ) - - # Assert SQL execution metrics on latency event - latency_event = captured_events[-1] - self.assertStatementExecution(latency_event) - - - def test_cloudfetch_statement_closed_sends_telemetry(self, telemetry_interceptors): - """ - Scenario: ResultSet created with cloudfetch, Statement explicitly closed - Expected: Telemetry events sent when statement closes - Note: With batch_size=1, immediate flush. With larger batch, may need connection close. 
- """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ - "force_enable_telemetry": True, - "telemetry_batch_size": 1, - "use_cloud_fetch": True, - } - ) as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM range(1000)") result = cursor.fetchall() assert len(result) == 1000 - cursor.close() # Explicitly close statement - - time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 2 events (initial + latency) - assert ( - len(captured_events) >= 2 - ), f"Expected at least 2 events, got {len(captured_events)}" - - - # Assert system configuration and connection params for all events - for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - # Assert SQL execution metrics on latency event - latency_event = captured_events[-1] - self.assertStatementExecution(latency_event) - - - def test_cloudfetch_connection_closed_sends_telemetry(self, telemetry_interceptors): - """ - Scenario: ResultSet created with cloudfetch, Connection explicitly closed - Expected: Telemetry events sent when connection closes (forces flush) - Note: Connection.close() always flushes all pending telemetry events. 
- """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - conn = sql.connect( - **self.connection_params(), - force_enable_telemetry=True, - telemetry_batch_size=1, - use_cloud_fetch=True, - ) - cursor = conn.cursor() - cursor.execute("SELECT * FROM range(1000)") - result = cursor.fetchall() - assert len(result) == 1000 - conn.close() # Explicitly close connection (forces flush) - - time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 2 events (initial + latency) - assert ( - len(captured_events) >= 2 - ), f"Expected at least 2 events, got {len(captured_events)}" - - - # Assert system configuration and connection params for all events - for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - # Assert SQL execution metrics on latency event - latency_event = captured_events[-1] - self.assertStatementExecution(latency_event) - - - def test_cloudfetch_only_resultset_closed_sends_telemetry( - self, telemetry_interceptors - ): - """ - Scenario: ResultSet created with cloudfetch, only ResultSet closed (implicit via fetchall) - Expected: Telemetry events sent (batch_size=1 ensures immediate flush) - Note: ResultSet closes after fetchall(). Events flush due to batch_size=1. 
- """ - ( - captured_events, - captured_futures, - export_wrapper, - callback_wrapper, - ) = telemetry_interceptors - - with patch.object( - TelemetryClient, "_export_event", export_wrapper - ), patch.object( - TelemetryClient, "_telemetry_request_callback", callback_wrapper - ): - with self.connection( - extra_params={ + conn.close() + else: + # Other patterns use connection context manager + with self.connection(extra_params={ "force_enable_telemetry": True, "telemetry_batch_size": 1, "use_cloud_fetch": True, - } - ) as conn: - cursor = conn.cursor() - cursor.execute("SELECT * FROM range(1000)") - result = cursor.fetchall() # ResultSet implicitly closed after fetchall - assert len(result) == 1000 - # Don't explicitly close cursor or connection (context manager will) + }) as conn: + if close_type == "context_manager": + with conn.cursor() as cursor: + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() + assert len(result) == 1000 + + elif close_type == "explicit_cursor": + cursor = conn.cursor() + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() + assert len(result) == 1000 + cursor.close() + + elif close_type == "implicit_fetchall": + cursor = conn.cursor() + cursor.execute("SELECT * FROM range(1000)") + result = cursor.fetchall() + assert len(result) == 1000 time.sleep(2) - done, not_done = wait(captured_futures, timeout=10) - - # Should have at least 2 events (initial + latency) - assert ( - len(captured_events) >= 2 - ), f"Expected at least 2 events, got {len(captured_events)}" - + wait(captured_futures, timeout=10) - # Assert system configuration and connection params for all events + assert len(captured_events) >= 2 for event in captured_events: - self.assertSystemConfiguration(event) - self.assertConnectionParams( - event, expected_http_path=self.arguments["http_path"] - ) - - # Assert SQL execution metrics on latency event - latency_event = captured_events[-1] - self.assertStatementExecution(latency_event) - + 
self.assert_system_config(event) + self.assert_connection_params(event, self.arguments["http_path"]) + + self.assert_statement_execution(captured_events[-1]) From a805f8f104f5ced25e52b58ffb9374da1c9a2b6d Mon Sep 17 00:00:00 2001 From: Nikhil Suri Date: Fri, 28 Nov 2025 12:17:48 +0530 Subject: [PATCH 9/9] removed telemetry e2e to daily single run Signed-off-by: Nikhil Suri --- .github/workflows/daily-telemetry-e2e.yml | 87 +++++++++++++++++++++++ .github/workflows/integration.yml | 8 ++- 2 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/daily-telemetry-e2e.yml diff --git a/.github/workflows/daily-telemetry-e2e.yml b/.github/workflows/daily-telemetry-e2e.yml new file mode 100644 index 000000000..3d61cf177 --- /dev/null +++ b/.github/workflows/daily-telemetry-e2e.yml @@ -0,0 +1,87 @@ +name: Daily Telemetry E2E Tests + +on: + schedule: + - cron: '0 0 * * 0' # Run every Sunday at midnight UTC + + workflow_dispatch: # Allow manual triggering + inputs: + test_pattern: + description: 'Test pattern to run (default: tests/e2e/test_telemetry_e2e.py)' + required: false + default: 'tests/e2e/test_telemetry_e2e.py' + type: string + +jobs: + telemetry-e2e-tests: + runs-on: ubuntu-latest + environment: azure-prod + + env: + DATABRICKS_SERVER_HOSTNAME: ${{ secrets.DATABRICKS_HOST }} + DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }} + DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }} + DATABRICKS_CATALOG: peco + DATABRICKS_USER: ${{ secrets.TEST_PECO_SP_ID }} + + steps: + #---------------------------------------------- + # check-out repo and set-up python + #---------------------------------------------- + - name: Check out repository + uses: actions/checkout@v4 + + - name: Set up python + id: setup-python + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + #---------------------------------------------- + # ----- install & configure poetry ----- + #---------------------------------------------- + - name: 
Install Poetry + uses: snok/install-poetry@v1 + with: + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + #---------------------------------------------- + # load cached venv if cache exists + #---------------------------------------------- + - name: Load cached venv + id: cached-poetry-dependencies + uses: actions/cache@v4 + with: + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }} + + #---------------------------------------------- + # install dependencies if cache does not exist + #---------------------------------------------- + - name: Install dependencies + run: poetry install --no-interaction --all-extras + + #---------------------------------------------- + # run telemetry E2E tests + #---------------------------------------------- + - name: Run telemetry E2E tests + run: | + TEST_PATTERN="${{ github.event.inputs.test_pattern || 'tests/e2e/test_telemetry_e2e.py' }}" + echo "Running tests: $TEST_PATTERN" + poetry run python -m pytest $TEST_PATTERN -v -s + + #---------------------------------------------- + # upload test results on failure + #---------------------------------------------- + - name: Upload test results on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: telemetry-test-results + path: | + .pytest_cache/ + tests-unsafe.log + retention-days: 7 + diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 9c9e30a24..ad5369997 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -54,5 +54,9 @@ jobs: #---------------------------------------------- # run test suite #---------------------------------------------- - - name: Run e2e tests - run: poetry run python -m pytest tests/e2e -n auto \ No newline at end of file + - name: Run e2e tests (excluding daily-only tests) + run: | + # Exclude telemetry E2E tests from PR 
runs (they run in the scheduled daily-telemetry-e2e workflow instead) + poetry run python -m pytest tests/e2e \ + --ignore=tests/e2e/test_telemetry_e2e.py \ + -n auto \ No newline at end of file