Skip to content

Commit

Permalink
Fixing idle issue and improving debug logging (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored May 1, 2024
1 parent a33829e commit cdf5803
Show file tree
Hide file tree
Showing 13 changed files with 516 additions and 86 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## dbt-databricks 1.7.14 (TBD)

### Fixes

- Auth headers should now evaluate at call time ([648](https://github.com/databricks/dbt-databricks/pull/648))
- User-configurable OAuth Scopes (currently limited to AWS) (thanks @stevenayers!) ([641](https://github.com/databricks/dbt-databricks/pull/641))

### Under the hood

- Reduce default idle limit for connection reuse to 60s and start organizing event logging ([648](https://github.com/databricks/dbt-databricks/pull/648))

## dbt-databricks 1.7.13 (April 8, 2024)

### Features
Expand Down
162 changes: 82 additions & 80 deletions dbt/adapters/databricks/connections.py

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions dbt/adapters/databricks/events/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from abc import ABC

from databricks.sql.exc import Error


class ErrorEvent(ABC):
    """Base event pairing a descriptive message with the exception that caused it.

    Rendering the event with ``str()`` yields ``"<message>: <exception>"``.
    """

    def __init__(self, exception: Exception, message: str):
        self.exception = exception
        self.message = message

    def __str__(self) -> str:
        prefix, cause = self.message, self.exception
        return f"{prefix}: {cause}"


class SQLErrorEvent:
    """Error event that additionally renders the context of Databricks SQL errors.

    For any exception the string form is ``"<message>: <exception>"``. When the
    exception is a ``databricks.sql.exc.Error``, its ``context`` mapping is
    appended on a second line as sorted ``key=value`` pairs.
    """

    def __init__(self, exception: Exception, message: str):
        self.exception = exception
        self.message = message

    def __str__(self) -> str:
        summary = f"{self.message}: {self.exception}"
        if not isinstance(self.exception, Error):
            return summary

        # Sort the context entries so the rendered output is deterministic.
        pairs = (f"{key}={value}" for key, value in sorted(self.exception.context.items()))
        return summary + "\nError properties: " + ", ".join(pairs)
121 changes: 121 additions & 0 deletions dbt/adapters/databricks/events/connection_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from abc import ABC
from typing import Optional
from typing import Tuple

from databricks.sql.client import Connection
from dbt.adapters.databricks.events.base import SQLErrorEvent
from dbt.contracts.graph.nodes import ResultNode


class ConnectionEvent(ABC):
    """Base event for messages tied to a Databricks SQL connection.

    The session id is taken from ``connection.get_session_id_hex()``; it is
    reported as ``"Unknown"`` when no connection is given or the id is falsy.
    """

    def __init__(self, connection: Connection, message: str):
        self.message = message
        session_hex = connection.get_session_id_hex() if connection else None
        self.session_id = session_hex or "Unknown"

    def __str__(self) -> str:
        session, text = self.session_id, self.message
        return f"Connection(session-id={session}) - {text}"


class ConnectionCancel(ConnectionEvent):
    """Connection event with the fixed message "Cancelling connection"."""

    def __init__(self, connection: Connection):
        message = "Cancelling connection"
        super().__init__(connection, message)


class ConnectionClose(ConnectionEvent):
    """Connection event with the fixed message "Closing connection"."""

    def __init__(self, connection: Connection):
        message = "Closing connection"
        super().__init__(connection, message)


class ConnectionCancelError(ConnectionEvent):
    """Connection event describing an exception hit while cancelling a connection.

    The exception is rendered through ``SQLErrorEvent`` and used as the message.
    """

    def __init__(self, connection: Connection, exception: Exception):
        error = SQLErrorEvent(exception, "Exception while trying to cancel connection")
        super().__init__(connection, str(error))


class ConnectionCloseError(ConnectionEvent):
    """Connection event describing an exception hit while closing a connection.

    The exception is rendered through ``SQLErrorEvent`` and used as the message.
    """

    def __init__(self, connection: Connection, exception: Exception):
        error = SQLErrorEvent(exception, "Exception while trying to close connection")
        super().__init__(connection, str(error))


class ConnectionCreateError(ConnectionEvent):
    """Connection event describing an exception hit while creating a connection.

    The exception is rendered through ``SQLErrorEvent`` and used as the message.
    """

    def __init__(self, connection: Connection, exception: Exception):
        error = SQLErrorEvent(exception, "Exception while trying to create connection")
        super().__init__(connection, str(error))


class ConnectionWrapperEvent(ABC):
    """Base event for messages prefixed with a caller-supplied description.

    Rendering the event with ``str()`` yields ``"<description> - <message>"``.
    """

    def __init__(self, description: str, message: str):
        self.description = description
        self.message = message

    def __str__(self) -> str:
        prefix, text = self.description, self.message
        return f"{prefix} - {text}"


class ConnectionAcquire(ConnectionWrapperEvent):
    """Wrapper event reporting that a connection was acquired.

    The message names the acquiring thread, the compute resource in use (or the
    default one when ``compute_name`` is falsy) and, when a model is supplied,
    the model's relation name.
    """

    def __init__(
        self,
        description: str,
        model: Optional[ResultNode],
        compute_name: Optional[str],
        thread_identifier: Tuple[int, int],
    ):
        if compute_name:
            compute = f"compute resource '{compute_name}'"
        else:
            compute = "default compute resource"
        message = f"Acquired connection on thread {thread_identifier}, using {compute}"

        if model:
            # ResultNode *should* have relation_name attr, but we work around a core
            # issue by checking. The attribute may also be present but None
            # (NOTE(review): believed to happen for nodes without a relation -
            # confirm against dbt-core), so fall back in that case as well instead
            # of logging the literal text 'None'.
            relation_name = getattr(model, "relation_name", None) or "[Unknown]"
            message += f" for model '{relation_name}'"

        super().__init__(description, message)


class ConnectionRelease(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Released connection"."""

    def __init__(self, description: str):
        message = "Released connection"
        super().__init__(description, message)


class ConnectionReset(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Reset connection handle"."""

    def __init__(self, description: str):
        message = "Reset connection handle"
        super().__init__(description, message)


class ConnectionReuse(ConnectionWrapperEvent):
    """Wrapper event reporting reuse of a connection, including its prior name."""

    def __init__(self, description: str, prior_name: str):
        message = f"Reusing connection previously named {prior_name}"
        super().__init__(description, message)


class ConnectionCreate(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Creating connection"."""

    def __init__(self, description: str):
        message = "Creating connection"
        super().__init__(description, message)


class ConnectionIdleCheck(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Checking idleness"."""

    def __init__(self, description: str):
        message = "Checking idleness"
        super().__init__(description, message)


class ConnectionIdleClose(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Closing for idleness"."""

    def __init__(self, description: str):
        message = "Closing for idleness"
        super().__init__(description, message)


class ConnectionRetrieve(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Retrieving connection"."""

    def __init__(self, description: str):
        message = "Retrieving connection"
        super().__init__(description, message)


class ConnectionCreated(ConnectionWrapperEvent):
    """Wrapper event with the fixed message "Connection created"."""

    def __init__(self, description: str):
        message = "Connection created"
        super().__init__(description, message)
19 changes: 19 additions & 0 deletions dbt/adapters/databricks/events/credential_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dbt.adapters.databricks.events.base import ErrorEvent


class CredentialLoadError(ErrorEvent):
    """Error event for an exception raised while loading credentials."""

    def __init__(self, exception: Exception):
        message = "Exception while trying to load credentials"
        super().__init__(exception, message)


class CredentialSaveError(ErrorEvent):
    """Error event for an exception raised while saving credentials."""

    def __init__(self, exception: Exception):
        message = "Exception while trying to save credentials"
        super().__init__(exception, message)


class CredentialShardEvent:
    """Event reporting that a password is being sharded.

    Only the password's length is stored and rendered, never the password itself.
    """

    def __init__(self, password_len: int):
        self.password_len = password_len

    def __str__(self) -> str:
        length = self.password_len
        return f"Password is {length} characters, sharding it"
58 changes: 58 additions & 0 deletions dbt/adapters/databricks/events/cursor_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from abc import ABC
from uuid import UUID

from databricks.sql.client import Cursor
from dbt.adapters.databricks.events.base import SQLErrorEvent


class CursorEvent(ABC):
    """Base event for messages tied to a Databricks SQL cursor.

    The session id is read from the cursor's connection and the command id from
    the cursor's active result set. Either one is reported as ``"Unknown"`` when
    it cannot be determined (no cursor, no connection, no active result set, or
    a falsy session id).
    """

    def __init__(self, cursor: Cursor, message: str):
        self.message = message
        self.session_id = "Unknown"
        self.command_id = "Unknown"
        if not cursor:
            return

        if cursor.connection:
            # Fall back to "Unknown" for a falsy session id, matching the
            # fallback applied to connection events, so the log never shows
            # "session-id=None".
            self.session_id = cursor.connection.get_session_id_hex() or "Unknown"

        result_set = cursor.active_result_set
        if result_set and result_set.command_id and result_set.command_id.operationId:
            # str(UUID) is never empty, so no "Unknown" fallback is needed here.
            self.command_id = str(UUID(bytes=result_set.command_id.operationId.guid))

    def __str__(self) -> str:
        return (
            f"Cursor(session-id={self.session_id}, command-id={self.command_id}) - {self.message}"
        )


class CursorCloseError(CursorEvent):
    """Cursor event describing an exception hit while closing a cursor.

    The exception is rendered through ``SQLErrorEvent`` and used as the message.
    """

    def __init__(self, cursor: Cursor, exception: Exception):
        error = SQLErrorEvent(exception, "Exception while trying to close cursor")
        super().__init__(cursor, str(error))


class CursorCancelError(CursorEvent):
    """Cursor event describing an exception hit while cancelling a cursor.

    The exception is rendered through ``SQLErrorEvent`` and used as the message.
    """

    def __init__(self, cursor: Cursor, exception: Exception):
        error = SQLErrorEvent(exception, "Exception while trying to cancel cursor")
        super().__init__(cursor, str(error))


class CursorCreate(CursorEvent):
    """Cursor event with the fixed message "Created cursor"."""

    def __init__(self, cursor: Cursor):
        message = "Created cursor"
        super().__init__(cursor, message)


class CursorClose(CursorEvent):
    """Cursor event with the fixed message "Closing cursor"."""

    def __init__(self, cursor: Cursor):
        message = "Closing cursor"
        super().__init__(cursor, message)


class CursorCancel(CursorEvent):
    """Cursor event with the fixed message "Cancelling cursor"."""

    def __init__(self, cursor: Cursor):
        message = "Cancelling cursor"
        super().__init__(cursor, message)
6 changes: 6 additions & 0 deletions dbt/adapters/databricks/events/other_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from dbt.adapters.databricks.events.base import SQLErrorEvent


class QueryError(SQLErrorEvent):
    """SQL error event for a failed query; the message embeds the logged SQL text."""

    def __init__(self, log_sql: str, exception: Exception):
        message = f"Exception while trying to execute query\n{log_sql}\n"
        super().__init__(exception, message)
25 changes: 25 additions & 0 deletions dbt/adapters/databricks/events/pipeline_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from abc import ABC


class PipelineEvent(ABC):
    """Base event for messages tied to a pipeline and one of its updates.

    Rendering the event with ``str()`` yields
    ``"Pipeline(pipeline-id=<id>, update-id=<id>) - <message>"``.
    """

    def __init__(self, pipeline_id: str, update_id: str, message: str):
        self.message = message
        self.pipeline_id = pipeline_id
        self.update_id = update_id

    def __str__(self) -> str:
        header = f"Pipeline(pipeline-id={self.pipeline_id}, update-id={self.update_id})"
        return f"{header} - {self.message}"


class PipelineRefresh(PipelineEvent):
    """Pipeline event reporting the refresh of a model together with its state."""

    def __init__(self, pipeline_id: str, update_id: str, model_name: str, state: str):
        message = f"Refreshing model {model_name} with state {state}"
        super().__init__(pipeline_id, update_id, message)


class PipelineRefreshError(PipelineEvent):
    """Pipeline event reporting an error message from a pipeline refresh."""

    def __init__(self, pipeline_id: str, update_id: str, message: str):
        full_message = f"Error refreshing pipeline: {message}"
        super().__init__(pipeline_id, update_id, full_message)
5 changes: 3 additions & 2 deletions tests/functional/adapter/long_sessions/test_long_sessions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import os
from unittest import mock

import pytest
from dbt.tests import util
from tests.functional.adapter.long_sessions import fixtures

Expand Down Expand Up @@ -96,5 +97,5 @@ def test_long_sessions(self, project):
util.run_dbt(["--debug", "seed", "--target", "idle_sessions"])

_, log = util.run_dbt_and_capture(["--debug", "run", "--target", "idle_sessions"])
idle_count = log.count("closing idle connection") / 2
idle_count = log.count("Closing for idleness") / 2
assert idle_count > 0
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ def test_wpm(self, project, profile_dir):
"alternate_warehouse",
]
)
assert "`source` using compute resource 'alternate_warehouse2'" in log

assert (
"using compute resource 'alternate_warehouse2' for model "
f"'`{project.database}`.`{project.test_schema}`.`source`'" in log
)

_, log = util.run_dbt_and_capture(
[
Expand All @@ -132,8 +136,14 @@ def test_wpm(self, project, profile_dir):
"alternate_warehouse",
]
)
assert "`target` using compute resource 'alternate_warehouse'" in log
assert "`target3` using default compute resource" in log
assert (
"using compute resource 'alternate_warehouse' for model "
f"'`{project.database}`.`{project.test_schema}`.`target`'" in log
)
assert (
f"using default compute resource for model '`{project.database}`."
f"`{project.test_schema}`.`target3`'" in log
)

_, log = util.run_dbt_and_capture(
[
Expand All @@ -145,6 +155,9 @@ def test_wpm(self, project, profile_dir):
"alternate_warehouse",
]
)
assert "`target_snap` using compute resource 'alternate_warehouse3'" in log
assert (
"using compute resource 'alternate_warehouse3' for model "
f"'`{project.database}`.`snapshots`.`target_snap`'" in log
)

util.check_relations_equal(project.adapter, ["target", "source"])
34 changes: 34 additions & 0 deletions tests/unit/events/test_base_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from databricks.sql.exc import Error
from dbt.adapters.databricks.events.base import ErrorEvent
from dbt.adapters.databricks.events.base import SQLErrorEvent


class ErrorTestEvent(ErrorEvent):
    """Concrete ``ErrorEvent`` with the fixed message "This is a test"."""

    def __init__(self, exception):
        message = "This is a test"
        super().__init__(exception, message)


class TestErrorEvent:
    """Checks the string rendering of ``ErrorEvent`` through ``ErrorTestEvent``."""

    def test_error_event__without_exception(self):
        rendered = str(ErrorTestEvent(None))
        assert rendered == "This is a test: None"

    def test_error_event__with_exception(self):
        rendered = str(ErrorTestEvent(Exception("This is an exception")))
        assert rendered == "This is a test: This is an exception"


class TestSQLErrorEvent:
    """Checks the string rendering of ``SQLErrorEvent`` for plain and pysql errors."""

    def test_sql_error_event__with_exception(self):
        rendered = str(SQLErrorEvent(Exception("This is an exception"), "This is a test"))
        assert rendered == "This is a test: This is an exception"

    def test_sql_error_event__with_pysql_error(self):
        context = {"key": "value", "other": "other_value"}
        rendered = str(SQLErrorEvent(Error("This is a pysql error", context), "This is a test"))
        expected = (
            "This is a test: This is a pysql error\n"
            "Error properties: key=value, other=other_value"
        )
        assert rendered == expected
Loading

0 comments on commit cdf5803

Please sign in to comment.