-
Notifications
You must be signed in to change notification settings - Fork 0
/
verify copy.py
255 lines (212 loc) · 8.19 KB
/
verify copy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import sys
import os
import glob
import requests
import pandas as pd
import re
from concurrent.futures import ThreadPoolExecutor
import time
import asyncio
import aiohttp
import json
import random
# GMass API endpoint
GMASS_API_URL = "https://verify.gmass.co/verify"
# NOTE(review): `i` is interpolated into get_response()'s error message but is
# never incremented anywhere — presumably a leftover request counter.
i = 1
# Your GMass API Key
# SECURITY NOTE(review): a live-looking API key is hard-coded in source; it
# should be rotated and loaded from an environment variable or config instead.
API_KEY = "04470b2e-2aa8-4e43-9a46-27cee7a48422"
GMASS_API_KEY = API_KEY
# Input and output directories
# Usage: python "verify copy.py" <input_folder> <output_folder>
# (raises IndexError at import time if the two arguments are missing)
INPUT_FOLDER = sys.argv[1]
OUTPUT_FOLDER = sys.argv[2]
# Function to clean emails by removing junk data
def clean_email(email):
    """Normalise ``email["Email"]`` in place to the first address-like substring.

    Args:
        email: a record dict with an "Email" key (as produced by read_file).

    Returns:
        The same dict. "Email" is replaced by the first substring matching a
        loose address pattern, or None when no address is found or the value
        is not a string (e.g. NaN from an empty spreadsheet cell).
    """
    value = email["Email"]
    if isinstance(value, str):
        matches = re.findall(r"[\w\.-]+@[\w\.-]+", value)
        # Bug fix: the original indexed matches[0] before checking that any
        # match existed, raising IndexError on strings with no address.
        email["Email"] = matches[0] if matches else None
    else:
        # Bug fix: non-string values (e.g. float NaN) were previously left in
        # place; NaN is truthy, so they leaked past the later
        # `if email["Email"]` filter. Normalise them to None here.
        email["Email"] = None
    return email
# Function to filter emails by removing verified emails
def filter_emails(emails_to_verify, verified_emails):
    """Return the records whose address has not already been verified.

    Args:
        emails_to_verify: list of record dicts with an "Email" key.
        verified_emails: list of previously verified record dicts.

    Returns:
        The subset of emails_to_verify whose "Email" does not appear in
        verified_emails, preserving input order.
    """
    # Performance fix: the original compared every candidate against every
    # verified record (O(n*m)); a set lookup makes this O(n+m).
    already_verified = {record["Email"] for record in verified_emails}
    return [
        record
        for record in emails_to_verify
        if record["Email"] not in already_verified
    ]
# make 2 fake emails
def make_fake_emails(original_email):
    """Build two randomised probe addresses on the same domain.

    Each probe keeps the local part of ``original_email`` and appends a
    random 4-digit number before the "@"; the caller uses them to detect
    catch-all domains (fakes that also verify as valid).
    """
    parts = original_email.split("@")
    local_part, domain = parts[0], parts[1]
    return [
        f"{local_part}{random.randint(1000, 9999)}@{domain}"
        for _ in range(2)
    ]
async def get_response(email):
    """Query the GMass verification endpoint for one address.

    Returns the decoded JSON response as a dict, or None when the request
    or the JSON decode fails (the error is printed, not raised).
    """
    request_url = f"{GMASS_API_URL}?email={email}&key={GMASS_API_KEY}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(request_url) as response:
                body = await response.text()
        return json.loads(body)
    except Exception as e:
        # NOTE(review): `i` is a module-level counter that is never updated.
        print(f"Error {i} verifying {email}: {str(e)}")
        return None
# Function to verify a single email using GMass API
async def verify_email(email):
    """Verify one address and probe its domain for catch-all behaviour.

    Issues three concurrent GMass lookups: the real address plus two
    randomised fakes on the same domain. If the fakes also come back valid,
    the domain accepts anything and "Catch All" is set.

    Returns:
        A flat dict (Email / Verification Status / Response / Filter /
        Service / Catch All) suitable as a DataFrame row. On a failed
        lookup the status is "ERROR" so the record is retried later.
    """
    # Bug fix: the original reused `email` as the loop variable when building
    # the tasks, so the "Email" field of the result reported the LAST FAKE
    # address instead of the real one.
    probes = [email] + make_fake_emails(email)
    tasks = [asyncio.ensure_future(get_response(probe)) for probe in probes]
    results = await asyncio.gather(*tasks)
    if results[0] is None:
        # Lookup of the real address failed outright; emit an error row so
        # remove_unverifed_emails() schedules it for a retry on the next run.
        return {
            "Email": email,
            "Verification Status": "ERROR",
            "Response": "",
            "Filter": "",
            "Service": "",
            "Catch All": False,
        }
    catch_all = False
    if results[0]["Success"] == True:
        if results[0]["Valid"] == True:
            verification_status = "Valid"
            # Robustness fix: the fake-address lookups can return None on
            # error; guard before indexing into them.
            if (
                results[1] is not None
                and results[2] is not None
                and results[1]["Success"] == True
                and results[2]["Success"] == True
                and results[1]["Valid"] == True
                and results[2]["Valid"] == True
            ):
                catch_all = True
        else:
            verification_status = "Invalid"
    else:
        verification_status = "Could not verify"
    result_str_lower = str(results[0]).lower()
    # Bug fix: the original searched for "Barracuda"/"Cloudfilter"
    # (capitalised) inside the already-lowercased response, so these labels
    # could never match. Also renamed the list so it no longer shadows the
    # builtin `filter`.
    filters = [
        label
        for label in ("Barracuda", "Cloudfilter")
        if label.lower() in result_str_lower
    ]
    services = [name for name in ("google", "outlook") if name in result_str_lower]
    return {
        "Email": email,
        "Verification Status": verification_status,
        "Response": results[0],
        "Filter": ",".join(filters),
        "Service": ",".join(services),
        "Catch All": catch_all,
    }
# delete unverifed emails in verified emails
def remove_unverifed_emails(emails):
    """Keep only records with a definitive status ("Valid" or "Invalid").

    Records with "ERROR" or "Could not verify" are dropped so they get
    re-verified on the next run.

    Bug fix: the original combined the two equality checks with `and`; a
    status can never equal both strings at once, so the function always
    returned [] and every previously verified email was re-verified.
    """
    return [
        email
        for email in emails
        if email["Verification Status"] in ("Valid", "Invalid")
    ]
# Function to verify emails concurrently via asyncio
async def verify_emails_parallel(input_filename, output_filename):
    """Verify every address in ``input_filename``, appending to the output CSV.

    Previously verified addresses (read back from ``output_filename``) are
    skipped; records without a definitive status are retried. Work proceeds
    in chunks of 100 concurrent lookups, checkpointing to disk and pausing
    an hour every 15 chunks to respect the API rate limit.
    """
    emails_to_verify = read_file(input_filename)
    # Reload prior results so already-verified addresses are skipped.
    verified_emails = []
    if os.path.exists(output_filename):
        verified_emails = read_file(output_filename)
        verified_emails = remove_unverifed_emails(verified_emails)
    # Clean the emails, drop records with no usable address, remove the
    # already-verified ones, then de-duplicate down to bare address strings.
    cleaned_emails = [clean_email(email) for email in emails_to_verify]
    cleaned_emails = [email for email in cleaned_emails if email["Email"]]
    filtered_emails = filter_emails(cleaned_emails, verified_emails)
    filtered_emails = list(
        set(filtered_email["Email"] for filtered_email in filtered_emails)
    )
    print(f"{len(filtered_emails)} emails verifing...")
    chunk_size = 100
    chunks = [
        filtered_emails[i : i + chunk_size]
        for i in range(0, len(filtered_emails), chunk_size)
    ]
    print(len(chunks))
    for index, chunk in enumerate(chunks):
        tasks = [asyncio.ensure_future(verify_email(email)) for email in chunk]
        results = await asyncio.gather(*tasks)
        print(chunk_size * (index + 1))
        verified_emails += results
        if index % 15 == 14:
            # Checkpoint progress to disk before the long rate-limit pause.
            pd.DataFrame(verified_emails).to_csv(output_filename, index=False)
            # Bug fixes: announce BEFORE sleeping (the original printed the
            # message only after the hour had elapsed) and use asyncio.sleep
            # so the blocking time.sleep no longer stalls the event loop.
            print("API LIMIT, sleeping...")
            await asyncio.sleep(3600)
    pd.DataFrame(verified_emails).to_csv(output_filename, index=False)
# Function to process new CSV and XLSX files
async def process_new_files():
    """Poll INPUT_FOLDER forever, verifying one CSV/XLSX file per scan.

    After a file is verified its results land in
    OUTPUT_FOLDER/verified_emails.csv and the source file is moved to
    OUTPUT_FOLDER/verified/, replacing any identically-named file there.
    Only the first match is handled per iteration; the loop then re-globs
    so newly arrived files are picked up.
    """
    supported_file_extensions = (".csv", ".xlsx")
    while True:
        files_to_process = []
        for ext in supported_file_extensions:
            files_to_process.extend(glob.glob(os.path.join(INPUT_FOLDER, f"*{ext}")))
        print("Files: ", files_to_process)
        if not files_to_process:
            # Bug fix: the original busy-looped at 100% CPU when the input
            # folder was empty (its sleep was commented out); pause between
            # scans instead.
            await asyncio.sleep(60)
            continue
        file_to_process = files_to_process[0]
        # All runs append into a single shared output file.
        output_file = os.path.join(OUTPUT_FOLDER, "verified_emails.csv")
        await verify_emails_parallel(file_to_process, output_file)
        # Archive the processed source file, overwriting any previous copy.
        archived = os.path.join(
            OUTPUT_FOLDER, "verified", os.path.basename(file_to_process)
        )
        if os.path.exists(archived):
            os.remove(archived)
        os.rename(file_to_process, archived)
def read_file(file_name):
    """Load a CSV or XLSX file and return its rows as a list of dicts.

    Retries every 5 seconds on read errors (e.g. the file is still being
    copied into the watched folder). Unsupported extensions return [].
    """
    while True:
        try:
            if file_name.endswith(".csv"):
                input_data = pd.read_csv(file_name, encoding="latin-1")
            elif file_name.endswith(".xlsx"):
                input_data = pd.read_excel(file_name)
            else:
                print(f"Unsupported file format: {file_name}")
                return []
            # read_csv/read_excel already return a DataFrame; the original's
            # extra pd.DataFrame(...) wrap was redundant.
            return input_data.to_dict(orient="records")
        except Exception as e:
            # Bug fix: the original bare `except:` silently swallowed
            # EVERYTHING, including KeyboardInterrupt/SystemExit, making the
            # retry loop unkillable and hiding the failure reason.
            print(f"Error reading {file_name}: {e}, retrying in 5s...")
            time.sleep(5)
async def main():
    """Entry point: run the folder-watching verification loop."""
    print("verifying...")
    await process_new_files()
    # NOTE: unreachable in practice — process_new_files() loops forever.
    print("verified...")


if __name__ == "__main__":
    # Idiom fix: guard the entry call so importing this module (e.g. from a
    # test) does not immediately start the infinite polling loop.
    asyncio.run(main())