|
4 | 4 | from flask_cors import CORS
|
5 | 5 | from apscheduler.schedulers.background import BackgroundScheduler
|
6 | 6 | from datetime import datetime, timedelta
|
| 7 | +import time |
7 | 8 |
|
8 | 9 | app = Flask(__name__)
|
9 | 10 | CORS(app, resources={r"/api/*": {"origins": "http://localhost:3000"}}, methods=["POST", "OPTIONS", "PUT"])
|
|
23 | 24 | 'user': 'root',
|
24 | 25 | 'password': 'root',
|
25 | 26 | 'host': 'warehouse_db',
|
26 |
| - 'port': '3308', |
| 27 | + 'port': '3309', |
27 | 28 | 'database': 'BrewandBrain_warehouse'
|
28 | 29 | }
|
| 30 | +try: |
| 31 | + connection = mysql.connector.connect(**warehouse_db_config) |
| 32 | + print("Connected to the warehouse database successfully!") |
| 33 | + connection.close() |
| 34 | +except mysql.connector.Error as err: |
| 35 | + print(f"Error connecting to the warehouse database: {err}") |
| 36 | +def wait_for_databases(): |
| 37 | + max_retries = 30 |
| 38 | + retry_interval = 2 # seconds |
| 39 | + |
| 40 | + operational_connection = None |
| 41 | + warehouse_connection = None |
| 42 | + |
| 43 | + for _ in range(max_retries): |
| 44 | + try: |
| 45 | + operational_connection = mysql.connector.connect(**db_config) |
| 46 | + print("Connected to the operational database successfully!") |
| 47 | + |
| 48 | + warehouse_connection = mysql.connector.connect(**warehouse_db_config) |
| 49 | + print("Connected to the warehouse database successfully!") |
| 50 | + |
| 51 | + operational_connection.close() |
| 52 | + warehouse_connection.close() |
| 53 | + return |
| 54 | + except mysql.connector.Error as err: |
| 55 | + print(f"Error connecting to databases: {err}") |
| 56 | + time.sleep(retry_interval) |
| 57 | + |
| 58 | + print("Unable to connect to databases after retries.") |
29 | 59 |
|
| 60 | +wait_for_databases() |
30 | 61 |
|
31 | 62 | def get_db_connection(config):
|
| 63 | + connection = None |
32 | 64 | try:
|
33 | 65 | connection = mysql.connector.connect(**config)
|
34 |
| - return connection |
35 | 66 | except mysql.connector.Error as err:
|
36 | 67 | print(f"Error connecting to the database: {err}")
|
37 |
| - return None |
| 68 | + return connection |
| 69 | + |
| 70 | +def close_connection(connection): |
| 71 | + if connection: |
| 72 | + connection.close() |
38 | 73 |
|
39 | 74 | # User-related functions
|
40 | 75 | def write_to_users(data):
|
@@ -144,69 +179,79 @@ def get_all_reservations():
|
144 | 179 | return results
|
145 | 180 | except mysql.connector.Error as err:
|
146 | 181 | print(f"Error Fetching Reservations: {err}")
|
| 182 | + |
147 | 183 | def perform_warehouse_process():
|
| 184 | + operational_connection = get_db_connection(db_config) |
| 185 | + warehouse_connection = get_db_connection(warehouse_db_config) |
| 186 | + |
148 | 187 | try:
|
149 |
| - # Connect to operational database |
150 |
| - operational_connection = get_db_connection(db_config) |
151 |
| - if operational_connection: |
152 |
| - operational_cursor = operational_connection.cursor(dictionary=True) |
| 188 | + if not operational_connection or not warehouse_connection: |
| 189 | + return # Return early if unable to connect to either database |
| 190 | + |
| 191 | + with operational_connection.cursor(dictionary=True) as operational_cursor, \ |
| 192 | + warehouse_connection.cursor() as warehouse_cursor: |
153 | 193 |
|
154 |
| - # Connect to warehouse database |
155 |
| - warehouse_connection = get_db_connection(warehouse_db_config) |
156 |
| - if warehouse_connection: |
157 |
| - warehouse_cursor = warehouse_connection.cursor() |
| 194 | + operational_cursor.execute(""" |
| 195 | + SELECT Users.UserID, Users.School, Users.Occupation, Reservations.StartTime, Reservations.EndTime |
| 196 | + FROM Users |
| 197 | + LEFT JOIN Reservations ON Users.UserID = Reservations.UserID |
| 198 | + """) |
| 199 | + users_data = operational_cursor.fetchall() |
158 | 200 |
|
| 201 | + for user in users_data: |
159 | 202 | try:
|
160 |
| - # Extract data from the operational database |
161 |
| - operational_cursor.execute(""" |
162 |
| - SELECT UserID, School, Occupation |
163 |
| - FROM Users |
164 |
| - """) |
165 |
| - users_data = operational_cursor.fetchall() |
166 |
| - |
167 |
| - # Transform and load data into the warehouse |
168 |
| - for user in users_data: |
169 |
| - # Fetch reservations for the current user |
170 |
| - operational_cursor.execute(""" |
171 |
| - SELECT StartTime, EndTime |
172 |
| - FROM Reservations |
173 |
| - WHERE UserID = %s |
174 |
| - """, (user['UserID'],)) |
175 |
| - reservations_data = operational_cursor.fetchall() |
176 |
| - |
177 |
| - # Insert user and reservation data into the warehouse |
178 |
| - for reservation in reservations_data: |
179 |
| - warehouse_cursor.execute( |
180 |
| - """ |
181 |
| - INSERT INTO UserSummary (UserID, School, Occupation, StartTime, EndTime) |
182 |
| - VALUES (%s, %s, %s, %s, %s) |
183 |
| - """, |
184 |
| - (user['UserID'], user['School'], user['Occupation'], |
185 |
| - reservation['StartTime'], reservation['EndTime']) |
186 |
| - ) |
187 |
| - |
188 |
| - # Commit changes to the warehouse database |
189 |
| - warehouse_connection.commit() |
190 |
| - print("Warehouse process completed successfully.") |
191 |
| - except Exception as e: |
192 |
| - print(f"Error during ETL process: {e}") |
193 |
| - warehouse_connection.rollback() # Rollback changes in case of an error |
194 |
| - |
195 |
| - warehouse_cursor.close() |
196 |
| - warehouse_connection.close() |
197 |
| - |
198 |
| - operational_cursor.close() |
199 |
| - operational_connection.close() |
| 203 | + warehouse_cursor.execute( |
| 204 | + """ |
| 205 | + INSERT INTO UserSummary (UserID, School, Occupation, StartTime, EndTime) |
| 206 | + VALUES (%s, %s, %s, %s, %s) |
| 207 | + ON DUPLICATE KEY UPDATE School=VALUES(School), Occupation=VALUES(Occupation), |
| 208 | + StartTime=VALUES(StartTime), EndTime=VALUES(EndTime) |
| 209 | + """, |
| 210 | + (user['UserID'], user['School'], user['Occupation'], |
| 211 | + user['StartTime'], user['EndTime']) |
| 212 | + ) |
| 213 | + except mysql.connector.Error as e: |
| 214 | + print(f"Error processing user {user['UserID']}: {e}") |
| 215 | + |
| 216 | + warehouse_connection.commit() |
| 217 | + print("Warehouse process completed successfully.") |
| 218 | + except mysql.connector.Error as e: |
| 219 | + print(f"Error during ETL process: {e}") |
| 220 | + warehouse_connection.rollback() |
| 221 | + finally: |
| 222 | + close_connection(operational_connection) |
| 223 | + close_connection(warehouse_connection) |
200 | 224 |
|
201 |
| - except Exception as e: |
202 |
| - print(f"Error performing warehouse process: {e}") |
203 | 225 |
|
204 | 226 | # Initialize the BackgroundScheduler
|
205 | 227 | scheduler = BackgroundScheduler()
|
206 | 228 |
|
207 |
| -# Add the scheduled job to run the ETL process every 168 hours or 1 week |
208 |
| -scheduler.add_job(perform_warehouse_process, 'interval', hours=168) |
| 229 | +# Add the scheduled job to run the ETL process every 1 week |
| 230 | +scheduler.add_job(perform_warehouse_process, 'interval', weeks=1) |
| 231 | + |
| 232 | +@app.route('/api/get-warehouse-data', methods=['GET']) |
| 233 | +def get_warehouse_data(): |
| 234 | + try: |
| 235 | + warehouse_connection = get_db_connection(warehouse_db_config) |
| 236 | + if not warehouse_connection: |
| 237 | + return jsonify(error='Unable to connect to the warehouse database'), 500 |
| 238 | + |
| 239 | + with warehouse_connection.cursor(dictionary=True) as warehouse_cursor: |
| 240 | + warehouse_cursor.execute("SELECT * FROM UserSummary") |
| 241 | + warehouse_data = warehouse_cursor.fetchall() |
209 | 242 |
|
| 243 | + return jsonify({'message': 'Warehouse data fetched successfully', 'warehouse_data': warehouse_data}) |
| 244 | + |
| 245 | + except Exception as e: |
| 246 | + print(f"Error fetching warehouse data: {e}") |
| 247 | + return jsonify(error='Error fetching warehouse data'), 500 |
| 248 | + |
| 249 | + finally: |
| 250 | + if warehouse_connection: |
| 251 | + warehouse_connection.close() |
| 252 | + print("Warehouse connection closed.") |
| 253 | + |
| 254 | + |
210 | 255 | @app.route('/api/create-account', methods=['POST'])
|
211 | 256 | def create_account():
|
212 | 257 | try:
|
|
0 commit comments