-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
123 lines (92 loc) · 4.22 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import pandas as pd
from tqdm import tqdm
# Target columns whose mapped values are unconditionally replaced with a
# fixed constant by map_dataframe (key = target column name).
overwrite_mapping = {
    'server_name': 'your_servername',  # placeholder — set to the real server name
}
# Homeserver name — presumably a placeholder to be edited before running; confirm.
server_name = 'your_servername'
# Worker-thread count — not referenced in this chunk; likely consumed elsewhere in the file.
thread_amount = 8
def escape_pg_value(value):
    """Render *value* for embedding in a PostgreSQL string literal.

    Strings get every single quote doubled (standard SQL escaping);
    any other type is simply converted with ``str()``.
    """
    if not isinstance(value, str):
        return str(value)
    return value.replace("'", "''")
def map_dataframe(data, column_names, mapping):
    """Build a DataFrame from raw rows and translate it to the target schema.

    ``mapping`` is ``{target_column: (source_column_or_None, transform_fn)}``.
    Entries with a source column are computed from it; entries whose source
    is None are assumed to already exist in the frame and are transformed in
    place in a second pass.  Targets listed in ``overwrite_mapping`` are
    forced to that constant (NOTE(review): reconstructed from an
    indentation-stripped scrape — the override is applied per mapped column;
    confirm against the original layout).

    Returns a frame restricted to the mapped columns, duplicates removed.
    """
    frame = pd.DataFrame(data, columns=column_names)

    # First pass: derive targets that have a source column, honouring overrides.
    for target, (source, transform) in mapping.items():
        if source is not None:
            frame[target] = frame[source].apply(transform)
        if target in overwrite_mapping:
            frame[target] = overwrite_mapping[target]

    # Second pass: source-less entries transform an already-present column.
    for target, (_, transform) in _filter_mapping_by_none(mapping).items():
        frame[target] = frame[target].apply(transform)

    # Keep only mapped columns, preserving first-seen order.
    ordered = list(dict.fromkeys(
        list(mapping.keys()) + [col for col in column_names if col in mapping]
    ))
    return frame[ordered]
def generate_sql_insert_statement(df, table_name):
    """Render *df* as batched PostgreSQL INSERT statements.

    Rows are emitted in batches of 500 as
    ``INSERT INTO <table> (cols) VALUES (...), ... ON CONFLICT DO NOTHING;``.
    NULL-ish values (None / NaN / NaT and the literal strings 'None', 'NaN',
    'nan') become SQL NULL; values containing 'nextval' are passed through
    verbatim (sequence expressions); strings are single-quoted with embedded
    quotes doubled.

    Fixes over the original:
    - batching now uses an explicit row counter — the old code used the
      DataFrame *index label* (``(index + 1) % 500``), which miscounts or
      crashes for any non-default integer index;
    - string values are quote-escaped, so values containing ``'`` no longer
      produce broken SQL;
    - the unreachable ``"NaN"`` branch (NaN was already caught by
      ``value != value``) is gone.

    Returns a list of SQL statement strings.
    """

    def _render(value):
        # NULL-ish: genuine missing values and their common string spellings.
        if value is None or (isinstance(value, str) and value in ('None', 'NaN', 'nan')):
            return "NULL"
        try:
            if pd.isna(value):  # catches float NaN and pd.NaT
                return "NULL"
        except (TypeError, ValueError):
            pass  # non-scalar values cannot be NaN-tested; render them below
        text = str(value)
        if 'nextval' in text:
            return text  # sequence expression — emit unquoted, as-is
        if isinstance(value, str):
            # Double embedded single quotes so the literal stays valid SQL.
            return "'" + value.replace("'", "''") + "'"
        return text

    columns = ', '.join(df.columns)
    statements = []
    batch = []
    # enumerate() gives a reliable 1-based row counter; the DataFrame index
    # may be non-sequential (e.g. after filtering) and must not drive batching.
    for row_count, (_, row) in enumerate(
            tqdm(df.iterrows(), total=df.shape[0], desc="Generating SQL statements"),
            start=1):
        batch.append("(" + ', '.join(_render(value) for value in row) + ")")
        if row_count % 500 == 0:
            values_str = ',\n'.join(batch)
            statements.append(
                f"INSERT INTO {table_name} ({columns})\nVALUES\n{values_str} ON CONFLICT DO NOTHING;")
            batch = []
    if batch:
        values_str = ',\n'.join(batch)
        statements.append(
            f"INSERT INTO {table_name} ({columns})\nVALUES\n{values_str} ON CONFLICT DO NOTHING;")
    return statements
def get_df_from_table(cursor, table_name, where=None):
    """Run ``SELECT * FROM table_name`` (optionally filtered) into a DataFrame.

    NOTE(review): ``table_name`` and ``where`` are interpolated directly into
    the SQL — pass trusted, internally-generated values only.
    """
    sql = f"SELECT * FROM {table_name}"
    if where:
        sql = f"{sql} WHERE {where}"
    cursor.execute(sql + ";")
    column_names = [column[0] for column in cursor.description]
    return pd.DataFrame(cursor.fetchall(), columns=column_names)
def transform_data(conn_synapse, mapping, source_table_names, target_table_name, merge_on=None):
    """Extract one or two source tables, remap them, and build INSERT SQL.

    With a single source table its rows are mapped directly; with two, they
    are left-joined on *merge_on* (right-hand duplicate columns suffixed
    '_y' — e.g. joining access_tokens and devices on device_id) before
    mapping.  Returns the statement list from generate_sql_insert_statement.

    Raises ValueError for zero or more than two source tables — the original
    left ``source_data_df`` as None in that case and crashed with an opaque
    TypeError inside map_dataframe.
    """
    cursor_synapse = conn_synapse.cursor()

    def _fetch(table_name):
        # Pull the whole table into a DataFrame, keeping column names.
        cursor_synapse.execute(f"SELECT * FROM {table_name};")
        cols = [desc[0] for desc in cursor_synapse.description]
        return pd.DataFrame(cursor_synapse.fetchall(), columns=cols)

    if len(source_table_names) == 1:
        source_data_df = _fetch(source_table_names[0])
    elif len(source_table_names) == 2:
        left_df = _fetch(source_table_names[0])
        right_df = _fetch(source_table_names[1])
        source_data_df = pd.merge(left_df, right_df, on=merge_on, how='left',
                                  suffixes=('', '_y'))
    else:
        raise ValueError(
            f"transform_data supports 1 or 2 source tables, got {len(source_table_names)}")

    df = map_dataframe(source_data_df, source_data_df.columns.tolist(), mapping)
    return generate_sql_insert_statement(df, target_table_name)
def get_nid_from_table(cursor, table_name, column_name, column_value, nid_column_name):
    """Return *nid_column_name* of the first row where *column_name* == *column_value*.

    ``column_value`` is now passed as a bound query parameter instead of
    being interpolated into an f-string — the original broke (and was
    injectable) for values containing a single quote.  ``table_name`` and
    ``column_name`` are still interpolated; pass trusted names only.

    Raises IndexError when no row matches (same as the original).
    """
    cursor.execute(
        f"SELECT * FROM {table_name} WHERE {column_name} = %s;",
        (column_value,),
    )
    columns = [desc[0] for desc in cursor.description]
    df = pd.DataFrame(cursor.fetchall(), columns=columns)
    return df[nid_column_name].values[0]
def _filter_mapping_by_none(mapping_dict):
return {k: v for k, v in mapping_dict.items() if v[0] is None}