-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmysql_utils.py
76 lines (68 loc) · 1.93 KB
/
mysql_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import pandas as pd
import common_utils
import mysql.connector
# This is the default MySQL port for running on the server.
# If running on a local machine with ports forwarded to the main
# server, the appropriate port must be passed on the command line.
# If the host, user, or password must be changed, they should be changed
# in the get_connection() function below.
PORT = "3306"


def set_port(new_port: str) -> None:
    """Replace the module-level ``PORT`` used by ``get_connection()``.

    Args:
        new_port: Port to connect to on subsequent ``get_connection()`` calls.
    """
    global PORT
    PORT = new_port
def get_connection() -> mysql.connector.MySQLConnection:
    """Open and return a new MySQL connection.

    Connects to ``localhost`` on the current module-level ``PORT`` as user
    ``web`` with autocommit enabled. The port is read at call time, so a
    prior ``set_port()`` call takes effect here.

    Returns:
        The connection object produced by ``mysql.connector.connect``.
    """
    # NOTE(review): the password is embedded in source; consider moving it
    # to configuration that is kept out of version control.
    settings = {
        "host": "localhost",
        "port": PORT,
        "user": "web",
        "password": "RZhRwsau6HZrMUXf",
        "autocommit": True,
    }
    return mysql.connector.connect(**settings)
def make_sql_values(data) -> str:
    """Render a DataFrame as the tuple list of a multi-row SQL ``VALUES`` clause.

    Each row becomes ``(v1,v2,...)`` and rows are joined with commas. Missing
    values (anything ``pd.isna`` reports as null) are emitted as the bare
    keyword ``NULL``; every other value is converted with ``str()`` and
    emitted as a single-quoted SQL string literal.

    Args:
        data: DataFrame whose cells are scalar values.

    Returns:
        The comma-separated row tuples, or an empty string for a frame with
        no rows.
    """

    def quote(value) -> str:
        if pd.isna(value):
            return "NULL"
        # Escape backslashes before quotes: with backslash escapes enabled
        # (assumed here, as NO_BACKSLASH_ESCAPES is not set by this module),
        # an unescaped "\" in front of a quote would otherwise swallow the
        # quote and terminate the literal early.
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return "'" + text + "'"

    # Plain iteration avoids DataFrame.applymap (deprecated in recent pandas)
    # and behaves the same for empty frames.
    return ",".join(
        "(" + ",".join(quote(cell) for cell in row) + ")"
        for row in data.itertuples(index=False, name=None)
    )
def upload_in_chunks(
    data: pd.DataFrame,
    columns: list[str],
    cursor,
    db: str,
    table_name: str,
    batch_size: int,
):
    """Insert the selected columns of ``data`` into a table in batches.

    One multi-row ``INSERT`` statement is executed per batch, and progress is
    logged through ``common_utils.log`` before each batch.

    Args:
        data: Rows to upload.
        columns: Column names, used both to select from ``data`` and as the
            target column list of the ``INSERT``.
        cursor: Database cursor; only its ``execute`` method is used.
        db: Name of the target database.
        table_name: Name of the target table.
        batch_size: Maximum number of rows per ``INSERT`` statement.

    Raises:
        ValueError: If ``batch_size`` is 0 (raised by ``range``).
    """
    columns_sql = ", ".join([f"`{x}`" for x in columns])
    total_rows = data.shape[0]
    for start in range(0, total_rows, batch_size):
        common_utils.log(f"{start} / {total_rows}")
        # Slice by position rather than by index label so that frames whose
        # index is not the default 0..n-1 (filtered, sorted, concatenated)
        # are still uploaded completely and exactly once.
        batch = data.iloc[start : start + batch_size][columns]
        values = make_sql_values(batch)
        cursor.execute(
            f"""
            INSERT INTO `{db}`.`{table_name}`
            ({columns_sql})
            VALUES {values};
            """
        )
def download_in_chunks(
    db: str,
    table_name: str,
    columns: list[str],
    cursor,  # expects a non-dictionary cursor
    num_rows: int,
    batch_size: int,
):
    """Read the given columns of a table page by page into a DataFrame.

    One ``SELECT ... LIMIT offset, batch_size`` query is issued for each
    offset in ``range(0, num_rows, batch_size)``; progress is logged through
    ``common_utils.log`` before each query.

    Args:
        db: Name of the source database.
        table_name: Name of the source table.
        columns: Column names to select; also used as the DataFrame columns.
        cursor: Database cursor (non-dictionary) providing ``execute`` and
            ``fetchall``.
        num_rows: Upper bound for the paging offsets.
        batch_size: Row count requested per query.

    Returns:
        A DataFrame built from all fetched rows, with ``columns`` as its
        column labels.
    """
    # NOTE(review): the queries page with LIMIT but no ORDER BY, so the
    # database does not guarantee a stable row order between pages.
    sql_columns = ",".join([f"`{name}`" for name in columns])
    fetched = []
    offset = 0
    while offset < num_rows:
        common_utils.log(f"{offset} / {num_rows}")
        query = f"""
            SELECT {sql_columns}
            FROM `{db}`.`{table_name}`
            LIMIT {offset}, {batch_size};
            """
        cursor.execute(query)
        fetched.extend(cursor.fetchall())
        offset += batch_size
    return pd.DataFrame.from_records(fetched, columns=columns)