forked from Py-Contributors/awesomeScripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
amazon_scraper.py
251 lines (211 loc) · 7.64 KB
/
amazon_scraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# Developed and maintained by https://github.com/sarthak1905
from bs4 import BeautifulSoup
from requests_html import HTMLSession
# These will be used to check for URL validity and exceptions if not valid
from django.core.validators import URLValidator
from django.core.exceptions import ValidationError
import os
import time
import smtplib
import ssl
from pyisemail import is_email
# CustomEmailValidationError is used to pass a
# message to the user if the email address isn't valid
class CustomEmailValidationError(ValueError):
    """Raised when the email address typed by the user fails validation.

    The human-readable explanation is kept on ``arg`` so that the handler
    can print it back to the user.
    """

    def __init__(self, arg):
        # Message describing what was wrong with the address
        self.arg = arg
class Scraper:
    """Price watcher "C3PO" for a single Amazon product page.

    The scraper downloads the product page, extracts the title and price,
    and emails ``u_email`` once the price is at or below ``budget``.
    """

    # True only while the page held in ``self.soup`` has not yet been
    # consumed by run(); lets the first run() reuse the download made by
    # __init__ instead of fetching the page twice in a row.
    _page_is_fresh = False

    # Initializes the scraper C3PO
    def __init__(self, url, budget, u_email):
        """Store the watch parameters and download the page once.

        url     -- address of the product page to watch
        budget  -- highest price the user is willing to pay
        u_email -- address that receives the alert
        """
        # Attributes about product
        self.url = url
        self.budget = budget
        # Setting user email
        self.u_email = u_email
        # Attributes about scraping
        self.session = HTMLSession()
        self.parser = 'lxml'
        self._fetch_page()
        self._page_is_fresh = True

    # Downloads and parses the product page
    def _fetch_page(self):
        """Refresh ``self.webpage`` and ``self.soup`` from ``self.url``."""
        self.webpage = self.session.get(self.url).content
        self.soup = BeautifulSoup(self.webpage, self.parser)

    # Prints the object
    def __str__(self):
        """Return the parsed page as pretty-printed markup."""
        return self.soup.prettify()

    # Stores the title of the product
    def get_title(self):
        """Store and return the product title, cut at the first '('.

        Prints an error and exits the script when the page has no
        ``productTitle`` element.
        """
        title_tag = self.soup.find('span', id='productTitle')
        if title_tag is None:
            print("\n")
            print("ERROR - We weren't able to find the name of the product")
            print("\n")
            print("Exiting the script")
            exit()
        # Everything from the first '(' onwards is dropped
        self.product_title = title_tag.text.strip().partition('(')[0]
        return self.product_title

    # Extracts the whole-number part of a price string
    @staticmethod
    def _parse_price(price_raw):
        """Return the integer part of the first number in ``price_raw``.

        Leading currency symbols and spaces are skipped, commas inside the
        number are treated as thousands separators, and parsing stops at
        the first other character (for example the decimal point).

        Raises ValueError when the text contains no digits.
        """
        digits = []
        for char in price_raw:
            if char in '0123456789':
                digits.append(char)
            elif digits and char != ',':
                # The number has ended: decimal point or trailing text
                break
        if not digits:
            raise ValueError(f'No price found in {price_raw!r}')
        return int(''.join(digits))

    # Stores the price of the product after filtering the string and
    # converting it to an integer
    def get_price(self):
        """Store and return the product price as an integer.

        Prints an error and exits the script when the page has no
        ``priceblock_ourprice`` element, mirroring get_title().
        """
        price_tag = self.soup.find('span', id='priceblock_ourprice')
        if price_tag is None:
            print("\n")
            print("ERROR - We weren't able to find the price of the product")
            print("\n")
            print("Exiting the script")
            exit()
        self.product_price = self._parse_price(price_tag.text)
        return self.product_price

    # Prints product title
    def print_title(self):
        """Print the title stored by get_title()."""
        print(self.product_title)

    # Prints product price
    def print_price(self):
        """Print the price stored by get_price()."""
        print(self.product_price)

    # Checks if the price of the product is below the budget
    def is_below_budget(self):
        """Return True when the stored price is at or below the budget."""
        return self.product_price <= self.budget

    # Runs the scraper
    def run(self):
        """Check the current price once and send the alert if it qualifies.

        Returns True when an email was sent, False otherwise.
        """
        # Callers poll run() in a loop, so every call after the first must
        # download the page again; otherwise the same stale price would be
        # compared forever.
        if self._page_is_fresh:
            self._page_is_fresh = False
        else:
            self._fetch_page()
        self.get_title()
        self.get_price()
        self.alert = self.is_below_budget()
        self.status = False
        if self.alert:
            self.status = self.send_email()
        return self.status

    # Sends an email when the condition is satisfied. Under testing!
    def send_email(self):
        """Email the product link to ``self.u_email`` and return True.

        The sender address and app password are read from the
        DEVELOPER_MAIL and DEVELOPER_PASS environment variables.

        Raises RuntimeError when either variable is unset; smtplib errors
        propagate to the caller.
        """
        from email.message import EmailMessage

        # Attributes for email sending
        port = 587
        smtp_server = 'smtp.gmail.com'
        self.email = os.environ.get('DEVELOPER_MAIL')
        self.app_pw = os.environ.get('DEVELOPER_PASS')
        if not self.email or not self.app_pw:
            raise RuntimeError(
                'Set the DEVELOPER_MAIL and DEVELOPER_PASS environment '
                'variables before sending email alerts')

        # Message details
        # Header values must stay on one line, so collapse any whitespace
        # runs (including newlines) inside the scraped title.
        title = ' '.join(self.get_title().split())
        subject = f'The price of {title} is within your budget!'
        body = (
            'Hey there!\n\n'
            'The price is now within your budget. '
            'Here is the link, buy it now!\n'
            + str(self.url)
            + '\n\nRegards\nYour friendly neighbourhood programmer'
        )
        # EmailMessage encodes non-ASCII titles correctly; a plain str
        # handed to sendmail() is ASCII-encoded and would fail on them.
        message = EmailMessage()
        message['Subject'] = subject
        message['From'] = self.email
        message['To'] = str(self.u_email).strip()
        message.set_content(body)

        # Establishing server
        context = ssl.create_default_context()
        # The with-block closes the connection even if a step below fails
        with smtplib.SMTP(smtp_server, port) as server:
            self.server = server
            # Mail sending
            server.ehlo()
            server.starttls(context=context)
            server.ehlo()
            server.login(self.email, self.app_pw)
            server.send_message(message)
        print("Email sent successfully!")
        return True
def main():
    """Collect the user's settings, then poll the price until an alert is sent.

    Prompts for the product URL, budget, email address and polling
    frequency, builds a Scraper, and calls ``run()`` repeatedly with a
    sleep in between. The loop ends as soon as ``run()`` reports that the
    alert email went out.
    """
    url = get_url()
    budget = get_target_cost()
    u_email = get_user_email()
    time_choice = get_frequency()

    # Menu choice -> delay between checks in seconds; anything other than
    # 1 or 2 falls back to the six-hour interval.
    one_hour = 60 * 60
    delay_by_choice = {1: one_hour, 2: 3 * one_hour}
    time_delay = delay_by_choice.get(time_choice, 6 * one_hour)

    # Adjacent literals are concatenated as-is, so each fragment carries
    # its own trailing space where the sentence continues.
    msg = (
        "Great! Now just sit back and relax. "
        "Minimize this program and be sure "
        "that it is running.\nAdditionally, ensure that there "
        "is a stable internet connection "
        "during the time this program runs.\nIf the price of the "
        "product falls within your budget, "
        "you will receive an email regarding the same and this "
        "program will auto-close.\nThank you for using "
        "C3PO scraper! Beep-bop bop-beep.")
    print(msg)

    c3po = Scraper(url, budget, u_email)
    while True:
        if c3po.run():
            break
        time.sleep(time_delay)
# get_user_email validates that an email address was
# entered. It checks that the host exists and that the host
# is a mail server by checking if there is an MX record.
# Loops once on invalid input
def get_user_email(first=True):
    """Prompt for an email address and return it once it validates.

    The address is stripped of surrounding whitespace and checked with
    ``is_email`` (DNS check enabled). On the first invalid entry the
    reason is printed and the user is asked once more; on the second
    invalid entry an error is printed and the script exits.

    first -- True for the initial prompt, False for the single retry
    """
    # Validate and return the same stripped value
    email = str(input("Enter your email:")).strip()
    try:
        validation = is_email(email, check_dns=True, diagnose=True)
        diagnosis = str(validation.diagnosis_type)
        if diagnosis == 'VALID':
            return email
        explanations = {
            'NODOMAIN': "The address you entered didn't include a server",
            'NOLOCALPART': "Your email address didn't include a username",
        }
        raise CustomEmailValidationError(
            explanations.get(diagnosis, "Please Try again"))
    except CustomEmailValidationError as error:
        if first:
            print(error.arg)
            # Hand the retry's result back to the caller; without the
            # return a successful second attempt would yield None.
            return get_user_email(first=False)
        print("ERROR: You didn't enter a valid email address")
        exit()
# get_frequency validates the user input for how
# often the user wants the URL to be checked
def get_frequency(first=True):
    """Prompt for the polling frequency and return the menu choice (1-3).

    On the first invalid entry (not a number, or not one of the listed
    options) the user is asked once more; on the second invalid entry an
    error is printed and the script exits.

    first -- True for the initial prompt, False for the single retry
    """
    inp_str = ("How frequently would you like to check the price?"
               "\n1.Every hour\n2.Every 3 hours\n3.Every 6 hours"
               "\nEnter your choice:")
    try:
        frequency = int(input(inp_str))
        # Only the three listed options are valid; 0 is not one of them
        if not 1 <= frequency <= 3:
            raise ValueError
        return frequency
    except ValueError:
        if first:
            print("Please only select one of the provided options")
            # Return the retry's answer so it reaches the caller
            return get_frequency(first=False)
        print("Error: You didn't select a valid option")
        exit()
# get_target_cost validates price input from user
# Loops once on invalid input
def get_target_cost(first=True):
    """Prompt for the budget and return it as an integer.

    On the first non-numeric entry the user is asked once more; on the
    second one an error is printed and the script exits.

    first -- True for the initial prompt, False for the single retry
    """
    try:
        return int(input("Enter your budget price:"))
    except ValueError:
        if first:
            print("Please enter only numbers; "
                  "not currency symbols.")
            # Return the retry's answer so it reaches the caller
            return get_target_cost(first=False)
        print("ERROR: Your target price wasn't valid")
        exit()
# get_url will get the Amazon URL from the user to
# scrape for a cost
def get_url(first=True):
    """Ask for the product link until it passes URL validation.

    One invalid entry is tolerated when ``first`` is True: a hint is
    printed and the prompt is shown again. Any further invalid entry
    prints an error and exits the script.
    """
    retry_available = first
    while True:
        link = input("Paste the link of the Amazon product:")
        try:
            URLValidator()(link)
        except ValidationError:
            if retry_available is not True:
                print("ERROR: You didn't enter a valid URL")
                exit()
            else:
                print("Please enter a valid URL; Remember to include http/https")
                # The single retry has now been used up
                retry_available = False
        else:
            return link
# Start the interactive scraper only when this file is executed directly
if __name__ == "__main__":
    main()