forked from mcstastney/TeamGreen
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_utils.py
305 lines (238 loc) · 9.28 KB
/
db_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import mysql.connector
from config import HOST, USER, PASSWORD
db_name = "online_shop"
# Create an exception class called DbConnectionError
class DbConnectionError(Exception):
pass
# Connect to 'shop_online' MySQL database.
# Ensure you have entered your details in the config.py file
def _connect_to_specific_db(database_name):
mydb = mysql.connector.connect(
host=HOST,
user=USER,
password=PASSWORD,
auth_plugin='mysql_native_password',
database = database_name
)
print("Connected to DB: %s" % db_name)
return mydb
# Show all tables in database
def _show_all_table_in_specific_db(database_name):
specific_db = _connect_to_specific_db(database_name)
mycursor = specific_db.cursor()
mycursor.execute("SHOW TABLES")
for x in mycursor:
print(x)
# Select all records in the 'products' table in the 'online_shop' DB
def get_all_records():
try:
# Connect to DB
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
# Add cursor object to execute queries / actions in DB
my_cursor = db_connection.cursor()
print("Connected to DB: %s" % db_name)
# Query to select all data in 'products' table
query = """select p.product_id, p.product_name, c.category_name, p.price, p.stock_quantity
from products as p
inner join categories as c
where p.product_category = c.category_id
order by p.product_name;"""
my_cursor.execute(query)
result = my_cursor.fetchall() # this is a list with db records where each record is a tuple
# Close the cursor after query executed
my_cursor.close()
# Print and Return the fetched records
print(result)
return result
except Exception:
raise DbConnectionError("Failed to read data from DB")
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
# Get categories of products from the database
def get_categories():
try:
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
print("Connected to DB: %s" % db_name)
# Query to select all categories of product
query = """select c.category_id, c.category_name
from categories as c;"""
my_cursor.execute(query)
result = my_cursor.fetchall() # this is a list with db records where each record is a tuple
my_cursor.close()
print(result)
return result
except Exception:
raise DbConnectionError("Failed to read data from DB")
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
# Get all products in a specific category
def get_products_by_cat(category):
try:
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
print("Connected to DB: %s" % db_name)
# Query to select all product within a given category
query = """select p.product_id, p.product_name, c.category_name, p.price, p.stock_quantity
from products as p
inner join categories as c on p.product_category = c.category_id
where c.category_name = '{category}'
order by p.product_name;""".format(category=category)
my_cursor.execute(query)
result = my_cursor.fetchall() # this is a list with db records where each record is a tuple
my_cursor.close()
print(result)
return result
except Exception:
raise DbConnectionError("Failed to read data from DB")
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
# Add a new product to the database
def insert_new_product(record):
try:
# connect to db
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
# query
query = """INSERT INTO products ({}) VALUES ('{}', '{}', '{}', {})""".format(
', '.join(record.keys()),
record['product_name'],
record['product_category'],
record['price'],
record['stock_quantity'],
)
my_cursor.execute(query)
db_connection.commit() # VERY IMPORTANT, otherwise, rows would not be added or reflected in the DB!
my_cursor.close()
except Exception:
raise DbConnectionError()
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
print("{} added to database".format(record['product_name']))
# Add a new review to the 'reviews' table
def add_review(review):
try:
# connect to db
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
# query
query = """INSERT INTO reviews ({}) VALUES ('{}', '{}', '{}')""".format(
', '.join(review.keys()),
review['product_id'],
review['rating'],
review['review_text'],
)
my_cursor.execute(query)
db_connection.commit()
my_cursor.close()
except Exception:
raise DbConnectionError("Failed to read data from DB")
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
print("Review for product ID: {} added to database".format(review['product_id']))
# Get all customer records
def get_all_customer_details():
try:
#connect to db
db_name="online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
print("Connected to DB: %s" % db_name)
query = "SELECT * from online_shop.customers"
my_cursor.execute(query)
result = my_cursor.fetchall()
my_cursor.close()
print(result)
return result
except Exception:
raise DbConnectionError("Fail to read data from DB")
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
# Get the record of a specific customer
def get_specific_customer_detail(email_address):
try:
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
print("Connected to DB: %s" % db_name)
# Query to select all product within a given category
query = """select * from online_shop.customers
where email_address = '{email_address}';""".format(email_address=email_address)
print(query)
my_cursor.execute(query)
result = my_cursor.fetchall() # this is a list with db records where each record is a tuple
my_cursor.close()
print(result)
return result
except Exception:
raise DbConnectionError("Failed to read data from DB")
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
# Add a new customer record to the database
def insert_new_customer(record):
try:
# connect to db
db_name = "online_shop"
db_connection = _connect_to_specific_db(db_name)
my_cursor = db_connection.cursor()
# query
query = """INSERT INTO customers ({}) VALUES ('{}', '{}', '{}', '{}', '{}', '{}', '{}')""".format(
', '.join(record.keys()),
record['first_name'],
record['last_name'],
record['email_address'],
record['address1'],
record['address2'],
record['postcode'],
record['mobile'],
)
my_cursor.execute(query)
db_connection.commit() # VERY IMPORTANT, otherwise, rows would not be added or reflected in the DB!
my_cursor.close()
except Exception:
raise DbConnectionError()
finally:
if db_connection:
db_connection.close()
print("DB connection is closed")
print("{} added to database".format(record['email_address']))
# Testing the functions work as intended
# _connect_to_specific_db(db_name)
# _show_all_table_in_specific_db(db_name)
#get_all_records()
#get_categories()
#get_products_by_cat("Plant")
#get_specific_customer_details("daisy@hotmail.com")
# Sample records for testing purposes
# testrecord = {
# 'product_name': 'Gooseberry bush',
# 'product_category': 1,
# 'price': 5.00,
# 'stock_quantity': 35}
# insert_new_product(testrecord)
# testreview = {
# 'product_id': 10,
# 'rating': 5,
# 'review_text': 'Great compost for germinating seedings.'}
# add_review(testreview)
# IF TIME - get timestamp working in add_review
# # review_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')