4
4
from typing import Any , Optional , Tuple
5
5
6
6
import agate
7
+ import clickhouse_connect
7
8
import dbt .exceptions
8
- from clickhouse_driver import Client , errors
9
+ from clickhouse_connect .driver .client import Client as ChClient
10
+ from clickhouse_connect .driver .exceptions import DatabaseError , Error
9
11
from dbt .adapters .base import Credentials
10
12
from dbt .adapters .sql import SQLConnectionManager
11
13
from dbt .contracts .connection import Connection
12
14
from dbt .events import AdapterLogger
13
- from dbt .version import __version__ as dbt_version
14
15
15
16
logger = AdapterLogger ('clickhouse' )
16
17
17
18
18
19
@dataclass
19
20
class ClickhouseCredentials (Credentials ):
21
+ """
22
+ ClickHouse connectio credentials data class.
23
+ """
24
+
25
+ # pylint: disable=too-many-instance-attributes
20
26
host : str = 'localhost'
21
27
port : Optional [int ] = None
22
28
user : Optional [str ] = 'default'
@@ -52,37 +58,38 @@ def __post_init__(self):
52
58
self .database = None
53
59
54
60
def _connection_keys (self ):
55
- return ( 'host' , 'port' , 'user' , 'schema' , 'secure' , 'verify' )
61
+ return 'host' , 'port' , 'user' , 'schema' , 'secure' , 'verify'
56
62
57
63
58
64
class ClickhouseConnectionManager (SQLConnectionManager ):
65
+ """
66
+ ClickHouse Connector connection manager.
67
+ """
68
+
59
69
TYPE = 'clickhouse'
60
70
61
71
@contextmanager
62
72
def exception_handler (self , sql ):
63
73
try :
64
74
yield
65
-
66
- except errors .ServerException as e :
67
- logger .debug ('Clickhouse error: {}' , str (e ))
68
-
75
+ except DatabaseError as err :
76
+ logger .debug ('Clickhouse error: {}' , str (err ))
69
77
try :
70
78
# attempt to release the connection
71
79
self .release ()
72
- except errors . Error :
80
+ except Error :
73
81
logger .debug ('Failed to release connection!' )
74
- pass
75
82
76
- raise dbt .exceptions .DatabaseException (str (e ).strip ()) from e
83
+ raise dbt .exceptions .DatabaseException (str (err ).strip ()) from err
77
84
78
- except Exception as e :
85
+ except Exception as exp :
79
86
logger .debug ('Error running SQL: {}' , sql )
80
87
logger .debug ('Rolling back transaction.' )
81
88
self .release ()
82
- if isinstance (e , dbt .exceptions .RuntimeException ):
89
+ if isinstance (exp , dbt .exceptions .RuntimeException ):
83
90
raise
84
91
85
- raise dbt .exceptions .RuntimeException (e ) from e
92
+ raise dbt .exceptions .RuntimeException (exp ) from exp
86
93
87
94
@classmethod
88
95
def open (cls , connection ):
@@ -94,34 +101,32 @@ def open(cls, connection):
94
101
kwargs = {}
95
102
96
103
try :
97
- handle = Client (
104
+ handle = clickhouse_connect . get_client (
98
105
host = credentials .host ,
99
106
port = credentials .port ,
100
107
database = 'default' ,
101
- user = credentials .user ,
108
+ username = credentials .user ,
102
109
password = credentials .password ,
103
- client_name = f'dbt-{ dbt_version } ' ,
104
- secure = credentials .secure ,
105
- verify = credentials .verify ,
110
+ interface = 'https' if credentials .secure else 'http' ,
111
+ compress = False if credentials .compression == '' else bool (credentials .compression ),
106
112
connect_timeout = credentials .connect_timeout ,
107
113
send_receive_timeout = credentials .send_receive_timeout ,
108
- sync_request_timeout = credentials .sync_request_timeout ,
109
- compress_block_size = credentials .compress_block_size ,
110
- compression = False if credentials .compression == '' else credentials .compression ,
114
+ verify = credentials .verify ,
115
+ query_limit = 0 ,
111
116
** kwargs ,
112
117
)
113
118
connection .handle = handle
114
119
connection .state = 'open'
115
- except errors . ServerException as e :
120
+ except DatabaseError as err :
116
121
logger .debug (
117
122
'Got an error when attempting to open a clickhouse connection: \' {}\' ' ,
118
- str (e ),
123
+ str (err ),
119
124
)
120
125
121
126
connection .handle = None
122
127
connection .state = 'fail'
123
128
124
- raise dbt .exceptions .FailedToConnectException (str (e ))
129
+ raise dbt .exceptions .FailedToConnectException (str (err ))
125
130
126
131
return connection
127
132
@@ -135,9 +140,12 @@ def cancel(self, connection):
135
140
logger .debug ('Cancel query \' {}\' ' , connection_name )
136
141
137
142
@classmethod
138
- def get_table_from_response (cls , response , columns ) -> agate .Table :
139
- column_names = [x [0 ] for x in columns ]
140
-
143
+ def get_table_from_response (cls , response , column_names ) -> agate .Table :
144
+ """
145
+ Build agate tabel from response.
146
+ :param response: ClickHouse query result
147
+ :param column_names: Table column names
148
+ """
141
149
data = []
142
150
for row in response :
143
151
data .append (dict (zip (column_names , row )))
@@ -152,28 +160,37 @@ def execute(
152
160
client = conn .handle
153
161
154
162
with self .exception_handler (sql ):
155
- logger .debug (
156
- 'On {connection_name}: {sql}' .format (connection_name = conn .name , sql = f'{ sql } ...' ),
157
- )
163
+ logger .debug (f'On { conn .name } : { sql } ...' )
158
164
159
165
pre = time .time ()
160
166
161
- response , columns = client .execute (sql , with_column_types = True )
167
+ if fetch :
168
+ query_result = client .query (sql )
169
+ else :
170
+ query_result = client .command (sql )
162
171
163
172
status = self .get_status (client )
164
173
165
- logger .debug (
166
- 'SQL status: {status} in {elapsed:0.2f} seconds' .format (
167
- status = status , elapsed = (time .time () - pre )
168
- ),
169
- )
174
+ logger .debug (f'SQL status: { status } in { (time .time () - pre ):.2f} seconds' )
170
175
171
176
if fetch :
172
- table = self .get_table_from_response (response , columns )
177
+ table = self .get_table_from_response (
178
+ query_result .result_set , query_result .column_names
179
+ )
173
180
else :
174
181
table = dbt .clients .agate_helper .empty_table ()
175
182
return status , table
176
183
184
+ def insert_table_data (self , table_name , table : agate .Table ):
185
+ """
186
+ Insert data into ClickHouse table
187
+ :param table_name: Target table name
188
+ :param table: Data to be inserted
189
+ """
190
+ client : ChClient = self .get_thread_connection ().handle
191
+ with self .exception_handler (f'INSERT INTO { table_name } ' ):
192
+ client .insert (table_name , table .rows , table .column_names )
193
+
177
194
def add_query (
178
195
self ,
179
196
sql : str ,
@@ -186,33 +203,33 @@ def add_query(
186
203
client = conn .handle
187
204
188
205
with self .exception_handler (sql ):
189
- logger .debug (
190
- 'On {connection_name}: {sql}' .format (connection_name = conn .name , sql = f'{ sql } ...' )
191
- )
206
+ logger .debug (f'On { conn .name } : { sql } ...' )
192
207
193
208
pre = time .time ()
194
- client .execute (sql )
209
+ client .query (sql )
195
210
196
211
status = self .get_status (client )
197
212
198
- logger .debug (
199
- 'SQL status: {status} in {elapsed:0.2f} seconds' .format (
200
- status = status , elapsed = (time .time () - pre )
201
- )
202
- )
213
+ logger .debug (f'SQL status: { status } in { (time .time () - pre ):0.2f} seconds' )
203
214
204
215
return conn , None
205
216
206
217
@classmethod
207
218
def get_credentials (cls , credentials ):
219
+ """
220
+ Returns ClickHouse credentials
221
+ """
208
222
return credentials
209
223
210
224
@classmethod
211
- def get_status (cls , cursor ):
225
+ def get_status (cls , _ ):
226
+ """
227
+ Returns connection status
228
+ """
212
229
return 'OK'
213
230
214
231
@classmethod
215
- def get_response (cls , cursor ):
232
+ def get_response (cls , _ ):
216
233
return 'OK'
217
234
218
235
def begin (self ):
0 commit comments