Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 246e2c5

Browse files
authored
Merge pull request #90 from databendlabs/fix/batch-insert
fix: batch insert using attachment
2 parents 15c18ce + 95ad606 commit 246e2c5

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

databend_py/client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ def _process_insert_query(self, query, params):
133133
query = query.split("VALUES")[0] + "VALUES"
134134
if len(query.split(" ")) < 3:
135135
raise Exception("Not standard insert/replace statement")
136-
table_name = query.split(" ")[2]
137136
batch_size = query.count(",") + 1
138137
if params is not None and len(params) > 0:
139138
if isinstance(params[0], tuple):
@@ -147,7 +146,7 @@ def _process_insert_query(self, query, params):
147146
for i in range(0, len(params), batch_size)
148147
]
149148
insert_rows = len(tuple_ls)
150-
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
149+
self._uploader.upload_to_table_by_attachment(query, tuple_ls)
151150
return insert_rows
152151

153152
def _process_ordinary_query(

databend_py/uploader.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010

1111
class DataUploader:
1212
def __init__(
13-
self,
14-
client,
15-
connection,
16-
settings,
17-
default_stage_dir="@~",
18-
debug=False,
19-
compress=False,
13+
self,
14+
client,
15+
connection,
16+
settings,
17+
default_stage_dir="@~",
18+
debug=False,
19+
compress=False,
2020
):
2121
# TODO: make it depends on Connection instead of Client
2222
self.client = client
@@ -34,6 +34,14 @@ def upload_to_table_by_copy(self, table_name, data):
3434
self._upload_to_presigned_url(presigned_url, headers, data)
3535
self._execute_copy(table_name, stage_path, "CSV")
3636

37+
def upload_to_table_by_attachment(self, sql_statement, data):
38+
if len(data) == 0:
39+
return
40+
stage_path = self._gen_stage_path(self.default_stage_dir)
41+
presigned_url, headers = self._execute_presign(stage_path)
42+
self._upload_to_presigned_url(presigned_url, headers, data)
43+
self._execute_with_attachment(sql_statement, stage_path, "CSV")
44+
3745
def replace_into_table(self, table_name, conflict_keys, data):
3846
"""
3947
:param table_name: table name
@@ -175,6 +183,8 @@ def _make_attachment(self, sql_statement, stage_path, file_type):
175183

176184
file_format_options = {}
177185
file_format_options["type"] = file_type
186+
file_format_options["RECORD_DELIMITER"] = '\r\n'
187+
file_format_options["COMPRESSION"] = "AUTO"
178188

179189
data = {
180190
"sql": sql_statement,

tests/test_client.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,16 @@ def test_batch_insert_with_dict_list(self):
141141
_, ss = c.execute("select * from test")
142142
self.assertEqual(ss, [(5, "cc"), (6, "dd")])
143143

144+
def test_batch_insert_with_dict_multi_fields(self):
145+
c = Client.from_url(self.databend_url)
146+
c.execute("DROP TABLE IF EXISTS test")
147+
c.execute("CREATE TABLE if not exists test (id int, x Int32, y VARCHAR, z Int32)")
148+
c.execute("DESC test")
149+
_, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 7, "y": "ee"}, {"x": 8, "y": "ff"}])
150+
self.assertEqual(r1, 2)
151+
_, ss = c.execute("select * from test")
152+
self.assertEqual(ss, [('NULL', 7, 'ee', 'NULL'), ('NULL', 8, 'ff', 'NULL')])
153+
144154
def test_iter_query(self):
145155
client = Client.from_url(self.databend_url)
146156
result = client.execute_iter("select 1", with_column_types=False)
@@ -167,7 +177,7 @@ def test_replace(self):
167177
client.replace("default", "test_replace", ["x"], [(1, "a"), (2, "b")])
168178
client.replace("default", "test_replace", ["x"], [(1, "c"), (2, "d")])
169179
_, upload_res = client.execute("select * from test_replace")
170-
self.assertEqual(upload_res, [(1, "c\r"), (2, "d\r")])
180+
self.assertEqual(upload_res, [(1, "c"), (2, "d")])
171181

172182
def test_insert_with_compress(self):
173183
client = Client.from_url(self.databend_url + "?compress=True&debug=True")

0 commit comments

Comments
 (0)