Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit cb84a56

Browse files
authored
Merge pull request #14 from databendcloud/feat/support-session
feat: support session settings and copy options
2 parents b05a199 + 1cbf621 commit cb84a56

File tree

5 files changed

+67
-24
lines changed

5 files changed

+67
-24
lines changed

.coveragerc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[run]
2+
plugins = Cython.Coverage
3+
source = databend_py

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Use the next code to check connection:
2626
> user="user",
2727
> port="443",
2828
> password="password",
29+
> settings={"copy_purge":True,"force":True})
2930
> >>> print(client.execute("SELECT 1"))
3031
> ```
3132
@@ -37,7 +38,7 @@ Pure Client example:
3738
> ``` python
3839
> >>> from databend_py import Client
3940
> >>>
40-
> >>> client = Client.from_url('http://root@localhost:8000/db')
41+
> >>> client = Client.from_url('http://root@localhost:8000/db?secure=False&copy_purge=True')
4142
> >>>
4243
> >>> client.execute('SHOW TABLES')
4344
> [('test',)]

databend_py/client.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def receive_data(self, next_uri: str):
4747
return raw_data
4848

4949
def receive_result(self, query, query_id=None, with_column_types=False):
50-
raw_data = self.connection.query(query, None)
50+
raw_data = self.connection.query(query)
5151
helper = self.helper()
5252
helper.response = raw_data
5353
helper.check_error()
@@ -57,7 +57,7 @@ def receive_result(self, query, query_id=None, with_column_types=False):
5757
return result.get_result()
5858

5959
def iter_receive_result(self, query, query_id=None, with_column_types=False):
60-
raw_data = self.connection.query(query, None)
60+
raw_data = self.connection.query(query)
6161
helper = self.helper()
6262
helper.response = raw_data
6363
helper.check_error()
@@ -168,7 +168,7 @@ def from_url(cls, url):
168168
169169
For example::
170170
171-
http://[user:password]@localhost:8000/default
171+
https://[user:password]@localhost:8000/default?secure=True
172172
http://[user:password]@localhost:8000/default
173173
databend://[user:password]@localhost:8000/default
174174
@@ -195,10 +195,12 @@ def from_url(cls, url):
195195
kwargs[name] = value
196196
elif name == 'secure':
197197
kwargs[name] = asbool(value)
198+
elif name == 'copy_purge':
199+
kwargs[name] = asbool(value)
198200
elif name in timeouts:
199201
kwargs[name] = float(value)
200202
else:
201-
settings[name] = value
203+
settings[name] = value # settings={'copy_purge':False}
202204
secure = kwargs.get("secure", False)
203205
kwargs['secure'] = secure
204206

@@ -246,6 +248,35 @@ def stage_csv_file(self, filename, data):
246248
def sync_csv_file_into_table(self, filename, data, table):
247249
start = time.time()
248250
stage_path = self.stage_csv_file(filename, data)
249-
_, _ = self.execute("COPY INTO %s FROM %s FILE_FORMAT = (type = CSV)" % (table, stage_path))
251+
copy_options = self.generate_copy_options()
252+
print(copy_options)
253+
_, _ = self.execute(
254+
f"COPY INTO {table} FROM {stage_path} FILE_FORMAT = (type = CSV)\
255+
PURGE = {copy_options['PURGE']} FORCE = {copy_options['FORCE']}\
256+
SIZE_LIMIT={copy_options['SIZE_LIMIT']} ON_ERROR = {copy_options['ON_ERROR']}")
250257
print("sync %s duration:%ss" % (filename, int(time.time() - start)))
251258
os.remove(filename)
259+
260+
def generate_copy_options(self):
    """Build the COPY INTO copy-options mapping from the client settings.

    Reads the optional ``copy_purge``, ``force``, ``size_limit`` and
    ``on_error`` entries of ``self.settings`` and exposes them under the
    upper-case option names ``PURGE``, ``FORCE``, ``SIZE_LIMIT`` and
    ``ON_ERROR``. A missing entry falls back to ``False``, ``False``, ``0``
    and ``"abort"`` respectively.

    Returns:
        dict: always contains exactly the four keys listed above, in that
        order.

    NOTE(review): ``from_url`` appears to route the ``copy_purge`` URL
    parameter into the constructor kwargs rather than into ``settings`` --
    confirm that it actually reaches ``self.settings`` before relying on it
    here.
    """
    # copy options docs: https://databend.rs/doc/sql-commands/dml/dml-copy-into-table#copyoptions
    # (option name, settings key, default used when the key is absent)
    option_table = (
        ("PURGE", "copy_purge", False),
        ("FORCE", "force", False),
        ("SIZE_LIMIT", "size_limit", 0),
        ("ON_ERROR", "on_error", "abort"),
    )
    # Treat an unset (None) settings object like an empty mapping so the
    # defaults are returned instead of raising TypeError on lookup.
    settings = self.settings or {}
    return {
        option: settings.get(key, default)
        for option, key, default in option_table
    }

databend_py/connection.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,16 @@ class Connection(object):
6767
# 'database': 'default'
6868
# }
6969
def __init__(self, host, port=None, user=defines.DEFAULT_USER, password=defines.DEFAULT_PASSWORD,
70-
database=defines.DEFAULT_DATABASE, secure=False, ):
70+
database=defines.DEFAULT_DATABASE, secure=False, copy_purge=False, session_settings=None):
7171
self.host = host
7272
self.port = port
7373
self.user = user
7474
self.password = password
7575
self.database = database
7676
self.secure = secure
77+
self.copy_purge = copy_purge
7778
self.session_max_idle_time = defines.DEFAULT_SESSION_IDLE_TIME
78-
self.session = {}
79+
self.client_session = session_settings
7980
self.additional_headers = dict()
8081
self.query_option = None
8182
self.context = Context()
@@ -102,26 +103,27 @@ def get_description(self):
102103
return '{}:{}'.format(self.host, self.port)
103104

104105
def disconnect(self):
105-
self.session = {}
106+
self.client_session = dict()
106107

107-
def query(self, statement, session):
108+
def query(self, statement):
108109
url = self.format_url()
109110
log.logger.debug(f"http sql: {statement}")
110111
query_sql = {'sql': statement, "string_fields": True}
111-
if session is not None:
112-
query_sql['session'] = session
112+
if self.client_session is not None and len(self.client_session) != 0:
113+
query_sql['session'] = self.client_session
113114
else:
114-
session = {"database": self.database}
115-
query_sql['session'] = session
115+
self.client_session = {"db": self.database}
116+
query_sql['session'] = self.client_session
116117
log.logger.debug(f"http headers {self.make_headers()}")
117118
response = requests.post(url,
118119
data=json.dumps(query_sql),
119120
headers=self.make_headers(),
120121
auth=HTTPBasicAuth(self.user, self.password),
121122
verify=True)
122-
123123
try:
124-
return json.loads(response.content)
124+
resp_dict = json.loads(response.content)
125+
self.client_session = resp_dict["session"]
126+
return resp_dict
125127
except Exception as err:
126128
log.logger.error(
127129
f"http error on {url}, SQL: {statement} content: {response.content} error msg:{str(err)}"
@@ -136,32 +138,32 @@ def format_url(self):
136138
return f"{self.schema}://{self.host}:{self.port}/v1/query/"
137139

138140
def reset_session(self):
139-
self.session = {}
141+
self.client_session = dict()
140142

141143
def next_page(self, next_uri):
142144
url = "{}://{}:{}{}".format(self.schema, self.host, self.port, next_uri)
143145
return requests.get(url=url, headers=self.make_headers())
144146

145147
# return a list of responses, following next_uri until it is empty
146148
def query_with_session(self, statement):
147-
current_session = self.session
149+
current_session = self.client_session
148150
response_list = list()
149-
response = self.query(statement, current_session)
151+
response = self.query(statement)
150152
log.logger.debug(f"response content: {response}")
151153
response_list.append(response)
152154
start_time = time.time()
153155
time_limit = 12
154156
session = response['session']
155157
if session:
156-
self.session = session
158+
self.client_session = session
157159
while response['next_uri'] is not None:
158160
resp = self.next_page(response['next_uri'])
159161
response = json.loads(resp.content)
160162
log.logger.debug(f"Sql in progress, fetch next_uri content: {response}")
161163
self.check_error(response)
162164
session = response['session']
163165
if session:
164-
self.session = session
166+
self.client_session = session
165167
response_list.append(response)
166168
if time.time() - start_time > time_limit:
167169
log.logger.warning(

tests/test_client.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ def assertHostsEqual(self, client, another, msg=None):
1212
self.assertEqual(client.connection.host, another, msg=msg)
1313

1414
def test_simple(self):
15-
c = Client.from_url('https://app.databend.com:443')
15+
c = Client.from_url('https://app.databend.com:443?copy_purge=True')
1616

1717
self.assertHostsEqual(c, 'app.databend.com')
1818
self.assertEqual(c.connection.database, 'default')
1919
self.assertEqual(c.connection.user, 'root')
20+
self.assertEqual(c.connection.copy_purge, True)
2021

2122
c = Client.from_url('https://host:443/db')
2223

@@ -31,6 +32,11 @@ def test_simple(self):
3132
c = Client.from_url("databend://root:root@localhost:8000/default?secure=false")
3233
self.assertEqual(c.connection.schema, "http")
3334

35+
def test_session_settings(self):
36+
session_settings = {"db": "database"}
37+
c = Client(host="localhost", port=8000, user="root", password="root", session_settings={"db": "database"})
38+
self.assertEqual(c.connection.client_session, session_settings)
39+
3440
def test_ordinary_query(self):
3541
select_test = '''
3642
select
@@ -55,8 +61,8 @@ def test_ordinary_query(self):
5561
self.assertEqual(r, ([('1', 'UInt8')], [(1,)]))
5662

5763
def test_batch_insert(self):
58-
c = Client.from_url(self.databend_url)
59-
64+
# with copy on purge
65+
c = Client(host="localhost", port=8000, user="root", password="root", settings={"copy_purge": True})
6066
c.execute('DROP TABLE IF EXISTS test')
6167
c.execute('CREATE TABLE if not exists test (x Int32,y VARCHAR)')
6268
c.execute('DESC test')

0 commit comments

Comments
 (0)