-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
executable file
·146 lines (120 loc) · 5.21 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
import sys
import csv
import time
import logging
import requests
import argparse
from googlesearch import search
# Disable warnings from the requests library about SSL certificates.
# NOTE(review): disable_warnings() is called without a category argument, so
# it may silence other urllib3 warnings as well - confirm this is intended.
requests.packages.urllib3.disable_warnings()
def configure_logging(debug):
    """Turn on timestamped DEBUG-level logging when debugging is requested.

    Args:
        debug (bool): When truthy, the root logger is configured at DEBUG
            level with a "time - level - message" line format. When falsy,
            nothing is configured and the logging defaults stay in effect.
    """
    if not debug:
        return
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.DEBUG)
def parse_arguments():
    """Parse the command-line arguments of the script.

    Returns:
        argparse.Namespace: Parsed arguments with the attributes
            ``input_file`` (str), ``output_file`` (str), ``wait`` (int,
            seconds, never negative), ``debug`` (bool) and
            ``search_keywords`` (list of str, at least one entry).

    Raises:
        SystemExit: Raised by argparse when a required option is missing or
            a value is invalid, including a negative ``--wait``.
    """
    def non_negative_int(value):
        """Convert *value* to an int and reject anything below zero."""
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid int value: {value!r}") from None
        # The wait time is handed to time.sleep(), which raises ValueError
        # for negative numbers; fail early with a clear usage error instead.
        if number < 0:
            raise argparse.ArgumentTypeError(
                f"must be zero or greater, got {number}")
        return number

    parser = argparse.ArgumentParser(
        description="Fetch and update company data from Google search.")
    parser.add_argument("-i", "--input-file", required=True,
                        help="File name containing the accounts")
    parser.add_argument("-o", "--output-file", required=True,
                        help="File name to save updated data in")
    parser.add_argument("-w", "--wait", type=non_negative_int, default=5,
                        help="Time (seconds) to wait between each Google Search. Default is 5.")
    parser.add_argument("-d", "--debug", default=False,
                        action="store_true", help="Print debug log messages")
    parser.add_argument("-s", "--search-keywords", required=True, nargs="+",
                        help="One or more keywords to research for each account")
    return parser.parse_args()
def google_search(token, keyword, wait_time):
    """Search Google for "<token> <keyword>" and hand back the top hit.

    Args:
        token (str): The company name to search for.
        keyword (str): The keyword appended to the company name in the query.
        wait_time (int): Seconds to pause after the search attempt, to
            prevent bot detection.

    Returns:
        The first item produced by the search, or None when the search
        produces nothing or raises an exception.
    """
    try:
        hits = search(
            f"{token} {keyword}", lang="de", advanced=True, num_results=1)
        # Only the first hit is of interest; None signals "no hit at all".
        return next(iter(hits), None)
    except Exception as error:
        logging.error(
            "Failed to search Google. Check your internet connection or try again later.")
        logging.debug(str(error))
        return None
    finally:
        # Executed on every path (hit, no hit, failure) before returning, so
        # consecutive searches are always spaced out by wait_time seconds.
        time.sleep(wait_time)
def count_csv_rows(file_path):
    """Count the records in a CSV file.

    The file is parsed with the csv module rather than counted line by line,
    so a quoted field containing line breaks counts as one record. Blank
    lines are not counted; csv.DictReader skips them too, which keeps this
    total in line with the number of rows such a reader iterates over.

    Args:
        file_path (str): Path to the CSV file.

    Returns:
        int: Number of non-empty records in the file.

    Raises:
        SystemExit: With exit status 1 when the file cannot be opened or
            parsed; the error is logged first.
    """
    try:
        with open(file_path, 'r', newline='') as file:
            # csv.reader yields an empty list for a blank line.
            return sum(1 for record in csv.reader(file) if record)
    except Exception as e:
        # Script-level boundary: any failure to read the source is fatal.
        logging.error(f"Failed to open source file. Error: {e}")
        sys.exit(1)
def iterate_csv_file(input_file, output_file, keywords, wait_time):
    """Research every account of the input CSV and write the findings to the output CSV.

    The first column of each input row is read as the account name (the
    input is read without a header row). For every keyword one Google search
    is run per account. The output file starts with a header row and gets
    one row per account: ``name`` followed by, for each keyword, a column
    holding the result description and a ``<keyword>_src`` column holding
    the result URL. Column names are the lower-cased keywords with spaces
    replaced by underscores.

    Args:
        input_file (str): Path to the input CSV file.
        output_file (str): Path to the output CSV file; it is overwritten.
        keywords (list[str]): Keywords to research for each account.
        wait_time (int): Time to wait between requests to prevent bot detection.
    """
    row_count = count_csv_rows(input_file)

    columns = [keyword.lower().replace(' ', '_') for keyword in keywords]
    fieldnames = ['name']
    for column in columns:
        fieldnames.extend((column, f"{column}_src"))

    with open(input_file, 'r', newline='') as file, \
            open(output_file, 'w', newline='') as outfile:
        print(f"Will research the following keywords per each account: {', '.join(keywords)}")
        print("----------")
        reader = csv.DictReader(file, fieldnames=fieldnames)
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        for index, account in enumerate(reader, start=1):
            name = account['name']
            print(
                f"Progress [{index}/{row_count}] - Fetching and processing data for {name}")
            row_data = {'name': name}
            for column, keyword in zip(columns, keywords):
                search_result = google_search(name, keyword, wait_time)
                if search_result is None:
                    # google_search returns None when the search fails or
                    # finds nothing; keep the account and leave the cells
                    # empty instead of aborting the whole run.
                    description, url = '', ''
                else:
                    description = search_result.description
                    url = search_result.url
                row_data[column] = description
                row_data[f"{column}_src"] = url
            # Without any keyword there is nothing researched to record.
            if keywords:
                writer.writerow(row_data)
                # Flush per account so finished rows survive an aborted run.
                outfile.flush()
            print("----------")
def main():
    """Run the script: parse the arguments, set up logging and process the input file.

    Raises:
        SystemExit: With status 130 when the run is interrupted with
            CTRL + C and with status 1 when any other error occurs, so that
            a calling shell or script can detect an unsuccessful run.
    """
    try:
        args = parse_arguments()
        configure_logging(args.debug)
        print("Script started - Press CTRL + C to abort")
        print(
            f"Reading input file {args.input_file} and saving results to {args.output_file}")
        print(
            f"Waiting for {args.wait} seconds between each search request to avoid bot detection")
        iterate_csv_file(args.input_file, args.output_file,
                         args.search_keywords, args.wait)
    except KeyboardInterrupt:
        print("Script execution aborted")
        # 128 + SIGINT(2): the conventional status for an interrupted process.
        sys.exit(130)
    except Exception as e:
        print(f"Script execution failed. {e}")
        # A bare sys.exit() would report success (status 0) to the caller.
        sys.exit(1)
    print("Script execution finished")
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()