-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport_hub_main.py
169 lines (122 loc) · 6.35 KB
/
import_hub_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import streamlit as st
import pandas as pd
import difflib
import re
from fuzzywuzzy import process
from utils import load_config
from w4h_db_utils import create_w4h_instance, get_existing_databases, populate_tables, populate_subject_table
# Path of the configuration file; main() loads it with load_config and also
# forwards the same path to the w4h_db_utils helpers.
CONFIG_FILE = 'config.yaml'
def preprocess_string(s: str) -> str:
    """Normalize a string for loose name comparison.

    The string is lowercased, underscores are turned into spaces, and the
    result is rebuilt from its word tokens only, separated by single spaces.
    Punctuation and repeated separators therefore disappear.

    Args:
        s (str): String to preprocess

    Returns:
        str: Preprocessed string
    """
    lowered = s.lower().replace('_', ' ')
    # Every run of word characters is one token; anything else is a separator.
    word_matches = re.finditer(r'\b\w+\b', lowered)
    return ' '.join(found.group() for found in word_matches)
def find_closest_name(col_names: list, targets: str) -> str:
    """Pick the column name that best fuzzy-matches the target keywords.

    Column names and ``targets`` are both normalized with
    ``preprocess_string`` and then scored with fuzzywuzzy's
    ``process.extractOne``. The best match is accepted only when its score is
    strictly greater than 60; otherwise the first entry of ``col_names`` is
    returned as the default.

    Args:
        col_names (list): List of column names to search through
        targets (str): String containing the target column names

    Returns:
        str: The original (un-normalized) best-matching column name, or
            ``col_names[0]`` when no candidate scores above the threshold
    """
    fallback = col_names[0]
    min_score = 60

    # Remember which original name produced each normalized name so the
    # caller gets back a real column name. If two names normalize to the
    # same string, the later one wins.
    original_by_normalized = {}
    for name in col_names:
        original_by_normalized[preprocess_string(name)] = name
    candidates = list(original_by_normalized)

    best, best_score = process.extractOne(preprocess_string(targets), candidates)
    if best_score > min_score:
        return original_by_normalized[best]
    return fallback
def populate_db(
    df: pd.DataFrame,
    db_name: str,
    mappings: dict,
    config_path: str = 'config.yaml',
) -> None:
    """Populate the database with the given dataframe.

    Writes a progress message and the mappings to the Streamlit page, then
    hands the data to ``populate_tables``.

    Args:
        df (pd.DataFrame): Dataframe containing the data to be inserted into the database
        db_name (str): Name of the database to insert the data into
        mappings (dict): Dictionary containing the mappings between the CSV columns and the database tables
        config_path (str, optional): Path to the config file. Defaults to 'config.yaml'.
    """
    # The default is spelled out as a literal so the signature matches the
    # documented default; it is the same value as the module's CONFIG_FILE.
    st.write("Populating database...")
    st.write(mappings)
    populate_tables(df, db_name, mappings, config_path)
def _preview_uploaded_csv(uploaded_file) -> pd.DataFrame:
    """Read the uploaded CSV into a dataframe and show its columns on the page."""
    st.success("File uploaded!")
    df = pd.read_csv(uploaded_file)
    st.write("Columns in your CSV:")
    st.write(df.columns)
    return df


def _require_database(selected_db) -> bool:
    """Return True when a database is selected; otherwise show an error on the page."""
    if selected_db:
        return True
    st.error("Please select or create a database before populating it.")
    return False


def main():
    """Main function for the streamlit app.

    Page flow:
      1. Pick an existing W4H database instance or create a new one.
      2. Upload a CSV file.
      3. Either populate the subject table directly from the CSV, or map the
         CSV columns (timestamp, user id and one column per configured
         time-series / geo table) and populate the feature tables.

    The column and table names come from the ``mapping`` section of the
    config loaded from ``CONFIG_FILE``.
    """
    # Load the config
    config = load_config(config_file=CONFIG_FILE)
    config_path = CONFIG_FILE

    st.title("W4H Import Hub")
    selected_db = None

    # Choose between existing or new database
    db_selection_options = ["Choose existing W4H database instance", "Create new W4H database instance"]
    database_option = st.radio("Select an option", db_selection_options)

    if database_option == db_selection_options[0]:
        existing_databases = get_existing_databases(config_path)
        selected_db = st.selectbox("**Select an existing database**", existing_databases)
    elif database_option == db_selection_options[1]:
        new_db_name = st.text_input("Enter new w4h database instance name").strip()
        if st.button("Create"):
            if new_db_name:
                create_w4h_instance(new_db_name, config_path)
                st.success(f"Database '{new_db_name}' created!")
            else:
                st.error("Please enter a name for the new database instance.")
        # Streamlit reruns the script on every interaction and the button is
        # only True on the rerun caused by its own click, so the selection
        # must come from the text input rather than from the button branch.
        selected_db = new_db_name

    uploaded_file = st.file_uploader("Choose a CSV file", type="csv")

    # option to populate subject table or feature time series tables
    is_subjects_populated = st.checkbox("Populate subject table?")

    if uploaded_file is None:
        return

    if is_subjects_populated:
        # set subject table name
        subject_tbl_name = st.text_input("Enter subject table name", value="subjects")
        df = _preview_uploaded_csv(uploaded_file)

        if st.button("Populate Database") and _require_database(selected_db):
            populate_subject_table(df, selected_db, config_path, user_tbl_name=subject_tbl_name)
            st.success("Database populated!")
        return

    df = _preview_uploaded_csv(uploaded_file)

    # Map columns
    st.subheader("Mapping")

    # Default selections based on column name similarity
    default_timestamp = find_closest_name(df.columns, 'time timestamp date start_time end_time')
    default_user_id = find_closest_name(df.columns, 'user id email')

    timestamp_col = st.selectbox(
        "**Select Timestamp Column**", df.columns, index=df.columns.get_loc(default_timestamp)
    )
    user_id_col = st.selectbox(
        "**Select User ID Column**", df.columns, index=df.columns.get_loc(default_user_id)
    )

    column_keys = config['mapping']['columns']
    mappings = {
        column_keys['timestamp']: timestamp_col,
        column_keys['user_id']: user_id_col,
    }

    # Foldable block for optional mappings
    table_mappings = {}
    with st.expander("**Map Features to W4H Tables**", expanded=True):
        st.write("Map your CSV columns to corresponding W4H tables.")
        # "None" comes first so it is the default when no column matches.
        choices = ["None"] + list(df.columns)
        tables = config['mapping']['tables']
        for target_table_name in tables['time_series'] + tables['geo']:
            target_table_label = ' '.join(
                label.capitalize() for label in target_table_name.replace('_', ' ').split()
            )
            st.subheader(target_table_label)
            def_choice = find_closest_name(choices, target_table_label)
            # The same label is reused for every table, so each selectbox
            # is given the table name as its widget key.
            mapped_col = st.selectbox(
                "Select Corresponding Column",
                choices,
                key=target_table_name,
                index=choices.index(def_choice),
            )
            table_mappings[target_table_name] = mapped_col if mapped_col != "None" else None

    # Once mappings are set, allow the user to populate the database
    if st.button("Populate Database") and _require_database(selected_db):
        populate_db(df, selected_db, {**mappings, **table_mappings}, config_path)
        st.success("Database populated!")
# Start the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()