-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabaseConnection.py
85 lines (70 loc) · 2.85 KB
/
databaseConnection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import mysql.connector
from mysql.connector import errorcode
from dotenv import load_dotenv
import os
# Load environment variables from the .env file at import time, so the DB_*
# settings that DatabaseConnection.connect() reads via os.getenv are available.
load_dotenv()
class DatabaseConnection:
    """Thin wrapper around a ``mysql.connector`` connection.

    Connection settings are read from the ``DB_USER``, ``DB_PASSWORD``,
    ``DB_HOST``, ``DB_NAME`` and ``DB_PORT`` environment variables.  The class
    is usable as a context manager: entering the ``with`` block connects and
    leaving it closes the connection.
    """

    def __init__(self):
        # Both attributes stay None until connect() / get_cursor() set them.
        self.connection = None
        self.cursor = None

    def get_cursor(self):
        """Create a cursor on the open connection and remember it.

        Returns:
            The new cursor (also stored on ``self.cursor``), or ``None`` when
            there is no open connection.
        """
        if self.connection and self.connection.is_connected():
            self.cursor = self.connection.cursor()
            return self.cursor
        print("Not connected to the database.")
        return None

    def connect(self):
        """Open the connection described by the ``DB_*`` environment variables.

        ``DB_PORT`` defaults to 3306 when it is unset or empty.

        Raises:
            mysql.connector.Error: Re-raised after a short diagnostic message
                has been printed.
            ValueError: If ``DB_PORT`` is set to a non-numeric value.
        """
        # ``or`` also covers DB_PORT being present but empty (e.g. ``DB_PORT=``
        # in the .env file), which int() would otherwise reject.
        port = int(os.getenv('DB_PORT') or 3306)
        try:
            self.connection = mysql.connector.connect(
                user=os.getenv('DB_USER'),
                password=os.getenv('DB_PASSWORD'),
                host=os.getenv('DB_HOST'),
                database=os.getenv('DB_NAME'),
                port=port,
            )
        except mysql.connector.Error as db_error:
            if db_error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Invalid username or password.")
            elif db_error.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist.")
            else:
                print(f"Error: {db_error}")
            raise  # Re-raise exception after logging

    def execute_query(self, query, parameters=None):
        """Run *query* and either return its rows or commit its changes.

        Args:
            query: The SQL statement to execute.
            parameters: Optional values bound to the statement's placeholders.
                A single value must still be passed as a one-element tuple
                with a trailing comma, e.g. ``(user_id,)``.

        Returns:
            The rows from ``cursor.fetchall()`` when the statement produced a
            result set; otherwise ``None`` after the change has been committed.

        Raises:
            ValueError: If there is no open database connection.
            Exception: Any error raised while executing, fetching or
                committing is re-raised after a rollback has been attempted.
        """
        try:
            cursor = self.get_cursor()
            if cursor is None:
                raise ValueError("Cannot execute query without a database connection.")

            # Execute the query with or without parameters
            if parameters:
                cursor.execute(query, parameters)
            else:
                cursor.execute(query)

            # Per the DB-API, ``description`` is None for statements that do
            # not return rows.  Checking it (rather than a "SELECT" prefix)
            # also handles SHOW, DESCRIBE, WITH ... and similar statements,
            # whose rows must be read before anything else runs.
            if cursor.description is not None:
                return cursor.fetchall()

            # No result set: persist the change.
            self.connection.commit()
            return None
        except Exception:
            print("Something went wrong... here")
            # Roll back only if there is a connection at all, and never let a
            # failing rollback replace the error that brought us here.
            if self.connection is not None:
                try:
                    self.connection.rollback()
                except Exception as rollback_error:
                    print(f"Rollback failed: {rollback_error}")
            raise

    def close_connection(self):
        """Close the database connection if one is currently open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
        else:
            print("There is no open connection to DB.")

    def __enter__(self):
        """Connect and return this instance for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block.

        Returns ``None``, so an exception raised inside the block propagates.
        """
        self.close_connection()