-
Notifications
You must be signed in to change notification settings - Fork 2
/
data_loading.py
141 lines (114 loc) · 5.17 KB
/
data_loading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
from urllib.parse import quote

import pandas as pd
import streamlit as st
from sqlalchemy import inspect, create_engine
from sqlalchemy import Table, MetaData
from sqlalchemy.sql import select

from utils import preprocess_sensor_data, infer_aggregation_frequency
def get_db_connection(db_user, db_pass, db_ip, db_port, db_name):
    """Create a SQLAlchemy engine for the LeanXcale database.

    Args:
        db_user: Database user name.
        db_pass: Password for ``db_user``.
        db_ip: Host / IP address of the database server.
        db_port: Port of the database server.
        db_name: Name of the database to connect to.

    Returns:
        The engine, or ``None`` if it could not be created (the error is
        reported in the Streamlit UI).
    """
    try:
        # Percent-encode the credentials so characters such as '@', ':' or '/'
        # in the user name or password cannot corrupt the URL.
        user = quote(str(db_user), safe='')
        password = quote(str(db_pass), safe='')
        # Query parameters must be joined with '&': a second '?' (or a mangled
        # '&') makes SQLAlchemy fold everything after 'autocommit=' into that
        # single value, silently dropping 'parallel' and 'txn_mode'.
        connection_url = (
            f'leanxcale://{user}:{password}@{db_ip}:{db_port}/{db_name}'
            '?autocommit=False&parallel=True&txn_mode=NO_CONFLICTS_NO_LOGGING'
        )
        return create_engine(connection_url)
    except Exception as e:
        st.error(f"Error connecting to the database: {e}")
        return None
def get_table_names(db_connection):
    """Return the names of the tables reachable through ``db_connection``.

    Never raises: any failure, including a missing (``None``) connection, is
    shown in the Streamlit UI and an empty list is returned instead.
    """
    table_names = []
    try:
        if db_connection is None:
            raise ValueError("No database connection available.")
        inspector = inspect(db_connection)
        table_names = inspector.get_table_names()
    except Exception as err:
        st.error(f"Error fetching table names: {err}")
    return table_names
def load_data_from_db(table_name, engine):
    """Load every row of ``table_name`` into a DataFrame indexed by ``timestamp``.

    Column names are lower-cased before the index is set, so the table needs
    a column called ``timestamp`` in any letter case.

    Args:
        table_name: Name of the table to read.
        engine: SQLAlchemy engine, e.g. as returned by ``get_db_connection``.

    Returns:
        The table contents, or an empty DataFrame if anything fails (the
        error is reported in the Streamlit UI).
    """
    try:
        if engine is None:
            raise ValueError("No database connection available.")
        # Reflect the table by passing the engine explicitly. Bound MetaData,
        # ``autoload=True`` and ``select([table])`` were removed in SQLAlchemy
        # 2.0; ``autoload_with`` and ``Table.select()`` work on 1.x and 2.0.
        table = Table(table_name, MetaData(), autoload_with=engine)
        with engine.connect() as connection:
            result = connection.execute(table.select())
            df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        df.columns = df.columns.str.lower()
        df.set_index('timestamp', inplace=True)
        return df
    except Exception as e:
        st.error(f"Error loading data from table {table_name}: {e}")
        return pd.DataFrame()
# Database connection settings. Each value can be overridden through the
# environment variable of the same name; otherwise the default shown is used.
# NOTE(review): hard-coded default credentials - consider requiring them from
# the environment in deployed setups.
DB_USER = os.getenv('DB_USER', 'app')
DB_PASS = os.getenv('DB_PASS', 'app')
DB_IP = os.getenv('DB_IP', '0.0.0.0')
DB_PORT = os.getenv('DB_PORT', '1529')
DB_NAME = os.getenv('DB_NAME', 'MOH')

# Created once when this module is imported; show() reads this module-level
# engine. It is None if get_db_connection failed.
engine = get_db_connection(DB_USER, DB_PASS, DB_IP, DB_PORT, DB_NAME)
def _load_and_preprocess(table_name):
    """Load ``table_name``, preprocess it and publish the results in session state.

    Stores the unprocessed frame under ``raw_readings`` and the two outputs of
    ``preprocess_sensor_data`` under ``readings`` and ``sensors``.

    Returns:
        The preprocessed readings, or ``None`` when the table yielded no rows
        (``load_data_from_db`` returns an empty frame on failure).
    """
    with st.spinner('Loading data...'):
        raw_readings = load_data_from_db(table_name, engine)
        st.session_state['raw_readings'] = raw_readings
        if raw_readings.empty:
            # Nothing to preprocess; do not feed an empty frame downstream.
            st.warning(f"No data could be loaded from table {table_name}.")
            return None
        readings, sensors = preprocess_sensor_data(raw_readings)
        st.session_state['readings'] = readings
        st.session_state['sensors'] = sensors
    return readings


def _report_aggregation_frequency(readings):
    """Infer the aggregation frequency of ``readings`` and store ``AGG_FREQ``."""
    inferred_freq = infer_aggregation_frequency(readings)
    if inferred_freq is None:
        st.warning("Could not infer aggregation frequency from the data")
        return
    st.success(f"Aggregation frequency: {inferred_freq} seconds")
    # Ratio of aggregated period to original sampling period (both in seconds),
    # truncated to an int.
    st.session_state['AGG_FREQ'] = int(inferred_freq / st.session_state['ORIG_FREQ'])


def _show_machine_tags(readings):
    """Show sensor descriptions for the loaded data and let the user pick a machine group.

    Stores the tags of the chosen machine group in ``st.session_state['tags']``.
    """
    # NOTE(review): path is relative to the process working directory.
    tags_path = "data/tags.csv"
    try:
        tags = pd.read_csv(tags_path, header=0)
    except FileNotFoundError:
        st.error(f"Tag description file not found: {tags_path}")
        return
    tags.columns = tags.columns.str.lower()
    tags['tag'] = tags['tag'].str.lower()
    # Keep only tags that correspond to a column of the loaded readings.
    tags = tags[tags['tag'].isin(readings.columns)].reset_index(drop=True)
    tags.columns = [col.replace(' ', '_') for col in tags.columns]
    with st.expander("Show/Hide Data Description"):
        st.dataframe(tags)

    machine_groups = tags['machine_group'].dropna().unique()
    if len(machine_groups) == 0:
        # st.selectbox would return None here and the slicing below would fail.
        st.warning("No sensor descriptions match the loaded data.")
        return
    selected_machine = st.selectbox('Select a Machine:', machine_groups)
    st.write(f'You have selected Equipment: **{selected_machine}**')
    # Case-insensitive *literal* substring match on the group name minus its
    # first character (NOTE(review): the [1:] is kept from the original; its
    # intent is not visible here - confirm). regex=False keeps names containing
    # regex metacharacters such as '(' from mis-matching.
    machine_tags = tags[
        tags['machine_group'].str.contains(
            selected_machine[1:], case=False, na=False, regex=False
        )
    ].reset_index(drop=True)
    st.session_state['tags'] = machine_tags
    st.write(f'**{machine_tags.shape[0]}** Sensors are monitoring the **{selected_machine}** equipment')


def _offer_export():
    """Offer a CSV download of ``st.session_state['readings']`` when the user opts in."""
    if not st.checkbox("Export annotated dataset?"):
        st.success(
            "Data successfully prepared for visualization. Please proceed to the **Visualization** page to "
            "explore your data.")
        return
    if 'readings' not in st.session_state:
        st.warning("No sensor data available. Please select data from the side panel.")
        return
    csv = st.session_state['readings'].to_csv().encode('utf-8')
    st.download_button(
        label="Download data as CSV",
        data=csv,
        file_name='annotated_readings.csv',
        mime='text/csv',
    )


def show():
    """Render the data-loading page.

    Lets the user pick a table from the sidebar, loads and preprocesses it,
    infers the aggregation frequency, shows the sensor descriptions for a chosen
    machine group and offers a CSV export. Results are shared with other pages
    through ``st.session_state`` (``ORIG_FREQ``, ``raw_readings``, ``readings``,
    ``sensors``, ``AGG_FREQ`` and ``tags``).
    """
    st.title('Data Loading, Annotation, and Storage')
    st.write("""
Use this page to load sensor data from the LeanXscale (LXS) database. You can retrieve aggregated data and perform various analyses on the raw data using these aggregations.
""")
    # Only tables whose name contains "hours" (case-insensitive) are offered.
    table_names = [name for name in get_table_names(engine) if "hours" in name.lower()]
    # NOTE(review): unless index=None is passed, Streamlit may preselect the
    # first option, so the placeholder may never be displayed - confirm.
    selected_table = st.sidebar.selectbox(
        "Select a table (machine) to load data from",
        options=table_names,
        placeholder="Choose a machine",
    )
    st.session_state['ORIG_FREQ'] = st.number_input(
        'Set frequency of the original data (in seconds)', min_value=1, value=10
    )
    if not selected_table:
        st.warning("Please select a Machine for analysis.")
        return

    readings = _load_and_preprocess(selected_table)
    if readings is None:
        return
    _report_aggregation_frequency(readings)
    with st.expander("Show/Hide Annotated Data"):
        st.dataframe(readings)
    _show_machine_tags(readings)
    _offer_export()