 from .util.helper import asbool, Helper
 from .util.escape import escape_params
 from .result import QueryResult
-import json, operator, csv, uuid, requests, time, os
+import json, csv, uuid, requests, time


 class Client(object):
@@ -19,6 +19,7 @@ def __init__(self, *args, **kwargs):
         self.connection = Connection(*args, **kwargs)
         self.query_result_cls = QueryResult
         self.helper = Helper
+        self._debug = asbool(self.settings.get('debug', False))

     def __enter__(self):
         return self
@@ -29,39 +30,39 @@ def disconnect(self):
     def disconnect_connection(self):
         self.connection.disconnect()

-    def data_generator(self, raw_data):
+    def _data_generator(self, raw_data):
         while raw_data['next_uri'] is not None:
             try:
-                raw_data = self.receive_data(raw_data['next_uri'])
+                raw_data = self._receive_data(raw_data['next_uri'])
                 yield raw_data
             except (Exception, KeyboardInterrupt):
                 self.disconnect()
                 raise

-    def receive_data(self, next_uri: str):
+    def _receive_data(self, next_uri: str):
         resp = self.connection.next_page(next_uri)
         raw_data = json.loads(resp.content)
         helper = self.helper()
         helper.response = raw_data
         helper.check_error()
         return raw_data

-    def receive_result(self, query, query_id=None, with_column_types=False):
+    def _receive_result(self, query, query_id=None, with_column_types=False):
         raw_data = self.connection.query(query)
         helper = self.helper()
         helper.response = raw_data
         helper.check_error()
-        gen = self.data_generator(raw_data)
+        gen = self._data_generator(raw_data)
         result = self.query_result_cls(
             gen, raw_data, with_column_types=with_column_types)
         return result.get_result()

-    def iter_receive_result(self, query, query_id=None, with_column_types=False):
+    def _iter_receive_result(self, query, query_id=None, with_column_types=False):
         raw_data = self.connection.query(query)
         helper = self.helper()
         helper.response = raw_data
         helper.check_error()
-        gen = self.data_generator(raw_data)
+        gen = self._data_generator(raw_data)
         result = self.query_result_cls(
             gen, raw_data, with_column_types=with_column_types)
         _, rows = result.get_result()
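
The renamed `_iter_receive_result` backs the public `execute_iter`, pulling result pages lazily through `_data_generator` and the server's `next_uri` links rather than buffering everything up front. A minimal usage sketch; the package import, DSN shape, host, and credentials are assumptions, not taken from this diff:

```python
from databend_py import Client  # assumed package import

# Hypothetical DSN; substitute real host and credentials.
client = Client.from_url('databend://user:password@localhost:8000/default')

# execute_iter returns a row generator: pages are fetched on demand via
# _iter_receive_result / _data_generator instead of all at once.
for row in client.execute_iter('SELECT number FROM numbers(10000)'):
    print(row)
```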
@@ -104,16 +105,16 @@ def execute(self, query, params=None, with_column_types=False,
         if is_insert:
             # remove the `\n` '\s' `\t` in the SQL
             query = " ".join([s.strip() for s in query.splitlines()]).strip()
-            rv = self.process_insert_query(query, params)
+            rv = self._process_insert_query(query, params)
             return [], rv

-        column_types, rv = self.process_ordinary_query(
+        column_types, rv = self._process_ordinary_query(
             query, params=params, with_column_types=with_column_types,
             query_id=query_id)
         return column_types, rv

     # params = [(1,),(2,)] or params = [(1,2),(2,3)]
-    def process_insert_query(self, query, params):
+    def _process_insert_query(self, query, params):
         insert_rows = 0
         if "values" in query:
             query = query.split("values")[0] + 'values'
@@ -128,32 +129,32 @@ def process_insert_query(self, query, params):
         batch_size = query.count(',') + 1
         if params is not None:
             tuple_ls = [tuple(params[i:i + batch_size]) for i in range(0, len(params), batch_size)]
-            csv_data, filename = self.generate_csv_data(tuple_ls)
-            self.sync_csv_file_into_table(filename, csv_data, table_name, "CSV")
+            csv_data, filename = self._generate_csv_data(tuple_ls)
+            self._sync_csv_file_into_table(filename, csv_data, table_name, "CSV")
             insert_rows = len(tuple_ls)

         return insert_rows

-    def process_ordinary_query(self, query, params=None, with_column_types=False,
+    def _process_ordinary_query(self, query, params=None, with_column_types=False,
                                 query_id=None):
         if params is not None:
-            query = self.substitute_params(
+            query = self._substitute_params(
                 query, params, self.connection.context
             )
-        return self.receive_result(query, query_id=query_id, with_column_types=with_column_types, )
+        return self._receive_result(query, query_id=query_id, with_column_types=with_column_types, )

     def execute_iter(self, query, params=None, with_column_types=False,
                      query_id=None, settings=None):
         if params is not None:
-            query = self.substitute_params(
+            query = self._substitute_params(
                 query, params, self.connection.context
             )
-        return self.iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)
+        return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)

-    def iter_process_ordinary_query(self, query, with_column_types=False, query_id=None):
-        return self.iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)
+    def _iter_process_ordinary_query(self, query, with_column_types=False, query_id=None):
+        return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)

-    def substitute_params(self, query, params, context):
+    def _substitute_params(self, query, params, context):
         if not isinstance(params, dict):
             raise ValueError('Parameters are expected in dict form')

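
For inserts, `execute` routes to the renamed `_process_insert_query`, which truncates the statement at the literal lowercase token `values` and (per the surrounding code, including lines elided between these hunks) appears to infer the row width from the comma count before re-chunking the flat parameter list into row tuples and staging them as CSV. A hedged sketch of the calling convention, with a hypothetical two-column table:

```python
# Hypothetical table t(a, b): the single comma before 'values' yields
# batch_size = 2, so the flat params list is re-chunked into the row
# tuples (1, 'x') and (2, 'y') and synced through a staged CSV file.
client.execute('INSERT INTO t (a, b) values', params=[1, 'x', 2, 'y'])
```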
@@ -197,6 +198,8 @@ def from_url(cls, url):
             elif name == 'copy_purge':
                 kwargs[name] = asbool(value)
                 settings[name] = asbool(value)
+            elif name == 'debug':
+                settings[name] = asbool(value)
             elif name in timeouts:
                 kwargs[name] = float(value)
             else:
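
With this branch, a `debug` key in the DSN query string lands in `settings`, where `__init__` reads it into `self._debug` with the same `asbool` coercion used for `copy_purge`. A small sketch; the DSN shape is assumed, as above:

```python
# asbool accepts the usual spellings ('1', 'true', 'yes', 'on', ...).
client = Client.from_url(
    'databend://user:password@localhost:8000/default?debug=true'
)
# __init__ then picks it up from settings:
#     self._debug = asbool(self.settings.get('debug', False))
```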
@@ -224,37 +227,48 @@ def from_url(cls, url):

         return cls(host, **kwargs)

-    def generate_csv_data(self, bindings):
+    def _generate_csv_data(self, bindings):
         file_name = f'{uuid.uuid4()}.csv'
         buffer = io.StringIO()
         csvwriter = csv.writer(buffer, delimiter=',', quoting=csv.QUOTE_MINIMAL)
         csvwriter.writerows(bindings)
         buffer.seek(0)  # Move the buffer's position to the beginning
         return buffer.getvalue(), file_name

-    def get_file_data(self, filename):
+    def _get_file_data(self, filename):
         with open(filename, "r") as f:
             return io.StringIO(f.read())

     def stage_csv_file(self, filename, data):
         stage_path = "@~/%s" % filename
+
+        start_presign_time = time.time()
         _, row = self.execute('presign upload %s' % stage_path)
+        if self._debug:
+            print("upload: presign file:%s duration:%ss" % (filename, time.time() - start_presign_time))
+
         presigned_url = row[0][2]
         headers = json.loads(row[0][1])
-        resp = requests.put(presigned_url, headers=headers, data=data)
-        resp.raise_for_status()
+
+        start_upload_time = time.time()
+        try:
+            resp = requests.put(presigned_url, headers=headers, data=data)
+            resp.raise_for_status()
+        finally:
+            if self._debug:
+                print("upload: put file:%s duration:%ss" % (filename, time.time() - start_upload_time))
         return stage_path

-    def sync_csv_file_into_table(self, filename, data, table, file_type):
+    def _sync_csv_file_into_table(self, filename, data, table, file_type):
         start = time.time()
         stage_path = self.stage_csv_file(filename, data)
-        copy_options = self.generate_copy_options()
+        copy_options = self._generate_copy_options()
         _, _ = self.execute(
             f"COPY INTO {table} FROM {stage_path} FILE_FORMAT = (type = {file_type} RECORD_DELIMITER = '\r\n') \
             PURGE = {copy_options['PURGE']} FORCE = {copy_options['FORCE']} \
             SIZE_LIMIT={copy_options['SIZE_LIMIT']} ON_ERROR = {copy_options['ON_ERROR']}")
-        print("sync %s duration:%ss" % (filename, int(time.time() - start)))
-        # os.remove(filename)
+        if self._debug:
+            print("upload: copy %s duration:%ss" % (filename, int(time.time() - start)))

     def upload(self, file_name, table_name, file_type=None):
         """
@@ -268,10 +282,10 @@ def upload(self, file_name, table_name, file_type=None):
             file_type = file_name.split(".")[1].upper()
         else:
             file_type = "CSV"
-        file_data = self.get_file_data(file_name)
-        self.sync_csv_file_into_table(file_name, file_data, table_name, file_type)
+        file_data = self._get_file_data(file_name)
+        self._sync_csv_file_into_table(file_name, file_data, table_name, file_type)

-    def generate_copy_options(self):
+    def _generate_copy_options(self):
         # copy options docs: https://databend.rs/doc/sql-commands/dml/dml-copy-into-table#copyoptions
         copy_options = {}
         if "copy_purge" in self.settings:
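
Taken together, the timing changes mean a debug-enabled client reports each stage of an upload (presign, PUT, COPY INTO) separately, replacing the old unconditional `sync ...` print. A usage sketch; the file name, table, and durations are illustrative:

```python
client = Client.from_url(
    'databend://user:password@localhost:8000/default?debug=1'
)

# Stages ./events.csv to the user stage (@~/events.csv) and copies it
# into the target table; file_type is derived from the extension.
client.upload('events.csv', 'events')

# With debug on, output along these lines is printed:
#   upload: presign file:events.csv duration:0.12s
#   upload: put file:events.csv duration:0.84s
#   upload: copy events.csv duration:1s
```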