forked from Varsha-1605/SocioSell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase_setup.py
354 lines (318 loc) · 17.7 KB
/
database_setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
from pymongo import MongoClient, ASCENDING
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from pymongo.server_api import ServerApi
import logging
import os
import time
from models.listing import ProductListing
from models.analytics import Analytics, SalesPerformance, CustomerBehavior, MarketingMetrics, Demographics
from models.review import RecentReview
from models.videoListing import VideoListing, ProductLink
from models.analyticsVideo import VideoAnalytics, VideoAudience, VideoEngagement, VideoPerformance
from image_data import(
sample_products,
product_listings,
product_reviews,
product_analytics,
)
from video_data import(
sample_videos,
video_listings,
video_analytics,
)
from dotenv import load_dotenv
load_dotenv()
# Root logging at INFO; this module logs through its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Retry policy used when connecting to MongoDB. The delays are in seconds
# (they are passed to time.sleep by the connection routine).
MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 1
MAX_RETRY_DELAY = 30

# Keyword options forwarded to MongoClient; values are in milliseconds.
# NOTE(review): serverSelectionTimeoutMS is not set here, so the driver
# default applies to server selection - confirm that is intended.
TIMEOUT_CONFIG = dict(connectTimeoutMS=5000, socketTimeoutMS=10000)
def exponential_backoff(attempt):
    """Return the retry delay for a zero-based attempt number.

    The delay starts at INITIAL_RETRY_DELAY, doubles with every attempt,
    and never exceeds MAX_RETRY_DELAY.
    """
    uncapped_delay = INITIAL_RETRY_DELAY * (2 ** attempt)
    if uncapped_delay > MAX_RETRY_DELAY:
        return MAX_RETRY_DELAY
    return uncapped_delay
def connect_to_mongodb():
    """Establish a MongoDB connection with retries and timeouts.

    The connection string is read from the ``MONGODB_URL`` environment
    variable. Each attempt builds a client with ``TIMEOUT_CONFIG`` and
    verifies it with a ``ping`` command. Connection failures are retried up
    to ``MAX_RETRIES`` times, sleeping ``exponential_backoff(attempt)``
    seconds between attempts.

    Returns:
        A connected ``MongoClient``. The caller is responsible for closing it.

    Raises:
        ValueError: If ``MONGODB_URL`` is unset or empty.
        ConnectionFailure, ServerSelectionTimeoutError: If the final attempt
            still cannot reach the server.
        Exception: Any other error is logged and re-raised immediately,
            without retrying.
    """
    uri = os.getenv("MONGODB_URL")
    if not uri:
        logger.error("MONGODB_URL is not set in the environment.")
        raise ValueError("MONGODB_URL is missing")

    for attempt in range(MAX_RETRIES):
        # Reset per attempt so the handlers only close a client that this
        # attempt actually created.
        client = None
        try:
            logger.info(f"Attempting to connect to MongoDB (Attempt {attempt + 1}/{MAX_RETRIES})")
            client = MongoClient(uri, server_api=ServerApi('1'), **TIMEOUT_CONFIG)
            # Verify connection
            client.admin.command("ping")
            logger.info("Connected to MongoDB successfully")
            return client
        except (ConnectionFailure, ServerSelectionTimeoutError) as e:
            # Release the failed client; otherwise every retry leaves an
            # unclosed client behind.
            if client is not None:
                client.close()
            logger.warning(f"Connection failed (Attempt {attempt + 1}/{MAX_RETRIES}): {e}")
            if attempt < MAX_RETRIES - 1:
                delay = exponential_backoff(attempt)
                logger.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                logger.error("Maximum retry attempts reached. Exiting...")
                raise
        except Exception as e:
            if client is not None:
                client.close()
            logger.error(f"Unexpected error while connecting to MongoDB: {e}")
            raise
def _ascending_index(*fields, **options):
    """Describe one ascending index as a ``(keys, options)`` pair.

    ``fields`` are the indexed field names in key order; ``options`` are
    forwarded unchanged to ``create_index`` (for example ``unique=True``).
    """
    return [(field, ASCENDING) for field in fields], options


def _index_specs():
    """Return the index definitions per collection name, in creation order."""
    return {
        "products": [
            _ascending_index("id"),
            _ascending_index("title"),
            _ascending_index("category"),
            _ascending_index("subcategory"),
            _ascending_index("features"),
            _ascending_index("price_range"),
            _ascending_index("created_at"),
            _ascending_index("updated_at"),
        ],
        "listings": [
            _ascending_index("id", "product_id"),
            _ascending_index("product_id", "created_at"),
            _ascending_index("price", "updated_at"),
            _ascending_index("features", "title"),
            _ascending_index("id"),
            _ascending_index("title"),
            _ascending_index("price"),
            _ascending_index("features", name="features_index"),
        ],
        "analytics": [
            _ascending_index("id", "product_id"),
            _ascending_index("product_id", "created_at"),
            _ascending_index("id"),
            _ascending_index("product_id", unique=True),
            _ascending_index("created_at"),
            _ascending_index("updated_at"),
            _ascending_index("sales_performance.total_sales"),
            _ascending_index("sales_performance.revenue"),
            _ascending_index("sales_performance.average_price"),
            _ascending_index("customer_behavior.view_to_purchase_rate"),
            _ascending_index("customer_behavior.repeat_purchase_rate"),
            _ascending_index("customer_behavior.average_rating"),
            _ascending_index("marketing_metrics.click_through_rate"),
            _ascending_index("marketing_metrics.social_media_engagement"),
        ],
        "reviews": [
            _ascending_index("product_id", "rating"),
            _ascending_index("user_id", "product_id"),
            _ascending_index("id"),
            _ascending_index("product_id"),
            _ascending_index("user_id"),
            _ascending_index("rating"),
            _ascending_index("title"),
            _ascending_index("verified_purchase"),
            _ascending_index("created_at"),
            _ascending_index("updated_at"),
        ],
        "videos": [
            _ascending_index("title", "views"),
            _ascending_index("views", "rating"),
            _ascending_index("id"),
            _ascending_index("title"),
            _ascending_index("category"),
            _ascending_index("subcategory"),
            _ascending_index("duration"),
            _ascending_index("views"),
            # The only non-ascending index: a text index on the summary.
            ([("transcript_summary", "text")], {}),
            _ascending_index("price_range"),
            _ascending_index("created_at"),
            _ascending_index("updated_at"),
            _ascending_index("key_features"),
            _ascending_index("highlights"),
        ],
        "video_listings": [
            _ascending_index("video_id", "id"),
            _ascending_index("id"),
            _ascending_index("video_id", unique=True),
            _ascending_index("platform"),
            _ascending_index("title"),
            _ascending_index("views"),
            _ascending_index("rating"),
            _ascending_index("created_at"),
            _ascending_index("updated_at"),
            _ascending_index("product_links.price"),
        ],
        "video_analytics": [
            # The original created this (id, video_id) index twice in a row;
            # the redundant second call is dropped.
            _ascending_index("id", "video_id"),
            _ascending_index("engagement.views", "engagement.likes"),
            _ascending_index("performance.retention_rate", "performance.click_through_rate"),
            _ascending_index("id"),
            _ascending_index("video_id", unique=True),
            _ascending_index("created_at"),
            _ascending_index("updated_at"),
            _ascending_index("engagement.views"),
            _ascending_index("engagement.likes"),
            _ascending_index("engagement.comments"),
            _ascending_index("engagement.average_watch_time"),
            _ascending_index("audience.demographics"),
            _ascending_index("audience.top_regions"),
            _ascending_index("performance.retention_rate"),
            _ascending_index("performance.click_through_rate"),
        ],
    }


def _find_id_by_title(collection, title):
    """Return the stringified ``_id`` of a document with ``title``, or None.

    A miss is logged so that records skipped for lack of a parent document
    are visible in the output instead of disappearing silently.
    """
    match = collection.find_one({"title": title}, {"_id": 1})
    if not match:
        logger.warning(f"No document titled {title!r} found in '{collection.name}'; skipping dependent record")
        return None
    return str(match["_id"])


def _insert_models(collection, models, label):
    """Dump ``models`` to dicts and insert them, logging ``"<label> inserted"``.

    ``insert_many`` rejects an empty document list, so an empty batch is
    logged and skipped rather than aborting the whole setup.
    """
    documents = [model.model_dump() for model in models]
    if not documents:
        logger.warning(f"No {label} to insert; skipping")
        return
    collection.insert_many(documents)
    logger.info(f"{label} inserted")


def _build_product_listings(product_collection):
    """Build ProductListing models linked to already-inserted products by title."""
    listings = []
    for listing in product_listings:
        product_id = _find_id_by_title(product_collection, listing["title"])
        if product_id is None:
            continue
        listings.append(
            ProductListing(
                product_id=product_id,
                title=listing["title"],
                price=listing["price"],
                description=listing["description"],
                features=listing["features"],
            )
        )
    return listings


def _build_product_analytics(product_collection):
    """Build Analytics models linked to already-inserted products by title."""
    analytics = []
    for analytic in product_analytics:
        product_id = _find_id_by_title(product_collection, analytic["title"])
        if product_id is None:
            continue
        sales = analytic["sales_performance"]
        behavior = analytic["customer_behavior"]
        demographics = analytic["demographics"]
        marketing = analytic["marketing_metrics"]
        analytics.append(
            Analytics(
                product_id=product_id,
                sales_performance=SalesPerformance(
                    total_sales=sales["total_sales"],
                    revenue=sales["revenue"],
                    average_price=sales["average_price"],
                    growth_rate=sales["growth_rate"],
                ),
                customer_behavior=CustomerBehavior(
                    view_to_purchase_rate=behavior["view_to_purchase_rate"],
                    cart_abandonment_rate=behavior["cart_abandonment_rate"],
                    repeat_purchase_rate=behavior["repeat_purchase_rate"],
                    average_rating=behavior["average_rating"],
                ),
                demographics=Demographics(
                    age_groups=demographics["age_groups"],
                    top_locations=demographics["top_locations"],
                ),
                marketing_metrics=MarketingMetrics(
                    click_through_rate=marketing["click_through_rate"],
                    conversion_rate=marketing["conversion_rate"],
                    return_on_ad_spend=marketing["return_on_ad_spend"],
                    social_media_engagement=marketing["social_media_engagement"],
                ),
            )
        )
    return analytics


def _build_reviews(product_collection):
    """Build RecentReview models linked to products via ``product_title``."""
    reviews = []
    for review in product_reviews:
        product_id = _find_id_by_title(product_collection, review["product_title"])
        if product_id is None:
            continue
        reviews.append(
            RecentReview(
                product_id=product_id,
                user_id=review["user_id"],
                rating=review["rating"],
                title=review["title"],
                comment=review["comment"],
                verified_purchase=review["verified_purchase"],
            )
        )
    return reviews


def _build_video_listings(video_collection):
    """Build VideoListing models linked to already-inserted videos by title."""
    listings = []
    for video_listing in video_listings:
        video_id = _find_id_by_title(video_collection, video_listing["title"])
        if video_id is None:
            continue
        listings.append(
            VideoListing(
                video_id=video_id,
                platform=video_listing["platform"],
                title=video_listing["title"],
                views=video_listing["views"],
                rating=video_listing["rating"],
                # Timestamps and store links are fixed placeholder values,
                # identical for every sample listing.
                key_timestamps={
                    "00:00": "Introduction",
                    "02:00": "Main features",
                    "04:00": "Conclusion",
                },
                product_links=[
                    ProductLink(store="Amazon", price="$199"),
                    ProductLink(store="Best Buy", price="$205"),
                ],
            )
        )
    return listings


def _build_video_analytics(video_collection):
    """Build VideoAnalytics models linked to already-inserted videos by title."""
    analytics = []
    for video_analytic in video_analytics:
        video_id = _find_id_by_title(video_collection, video_analytic["title"])
        if video_id is None:
            continue
        analytics.append(
            VideoAnalytics(
                video_id=video_id,
                engagement=VideoEngagement(**video_analytic["engagement"]),
                audience=VideoAudience(**video_analytic["audience"]),
                performance=VideoPerformance(**video_analytic["performance"]),
            )
        )
    return analytics


def setup_product_database():
    """Setup product reference database with sample data.

    Connects to MongoDB, then works on the ``social_media_products``
    database: drops and recreates the indexes of the seven sample
    collections, deletes every document in each of them, and re-inserts the
    sample products, listings, analytics, reviews, videos, video listings
    and video analytics. Dependent records are linked to their product or
    video by looking the parent up by title after it has been inserted.

    This is destructive: existing documents in those collections are removed.

    Raises:
        Exception: Any failure is logged and re-raised. The client is
            closed in all cases.
    """
    client = None
    try:
        client = connect_to_mongodb()
        db = client.social_media_products

        specs = _index_specs()
        collections = {name: db[name] for name in specs}

        # Delete indexes on every collection before recreating any of them.
        for collection in collections.values():
            collection.drop_indexes()

        for name, index_list in specs.items():
            for keys, options in index_list:
                collections[name].create_index(keys, **options)
        logger.info("All indexes created successfully")

        product_collection = collections["products"]
        video_collection = collections["videos"]

        # Products must be inserted first: listings, analytics and reviews
        # resolve their product_id from the stored product documents.
        product_collection.delete_many({})
        _insert_models(product_collection, sample_products, "sample_products")

        collections["listings"].delete_many({})
        _insert_models(
            collections["listings"],
            _build_product_listings(product_collection),
            "sample_product_listings",
        )

        collections["analytics"].delete_many({})
        _insert_models(
            collections["analytics"],
            _build_product_analytics(product_collection),
            "sample_product_analytics",
        )

        collections["reviews"].delete_many({})
        _insert_models(
            collections["reviews"],
            _build_reviews(product_collection),
            "sample_reviews",
        )

        # Videos likewise precede the records that reference them.
        video_collection.delete_many({})
        _insert_models(video_collection, sample_videos, "sample_videos")

        collections["video_listings"].delete_many({})
        _insert_models(
            collections["video_listings"],
            _build_video_listings(video_collection),
            "sample_video_listings",
        )

        collections["video_analytics"].delete_many({})
        _insert_models(
            collections["video_analytics"],
            _build_video_analytics(video_collection),
            "sample_video_analytics",
        )
    except Exception as e:
        logger.error(f"Error setting up database: {e}")
        raise
    finally:
        # Compare with None instead of truth-testing the PyMongo object.
        if client is not None:
            client.close()
            logger.info("MongoDB connection closed")
# Run the database setup only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    setup_product_database()