3searchPrices.py
import json
import openai
import requests
import re
import os
from utils import *
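
# NOTE (assumption): the helpers used below -- search_companies_on_google,
# load_from_json_file, save_to_json_file, correct_url, extract_content and
# get_wayback_url -- are expected to come from the local utils module imported
# above; their exact signatures are not shown in this file.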

# Configuration and Initialization
INDUSTRY_KEYWORD = os.getenv('INDUSTRY_KEYWORD')
WHOISJSONAPI = os.getenv('WHOISJSONAPI')
COMPAREPRICES = os.getenv('COMPAREPRICES')
SERP_PRICES_EXT = os.getenv('SERP_PRICES_EXT') or exit("SERP_PRICES_EXT is not defined. Please define it in the .env file if you want to use this script.")
DATA_FOLDER = f"data/{INDUSTRY_KEYWORD}"
BASE_GPTV = os.environ.get('BASE_GPTV', 'gpt-3.5-turbo-0125')
OPENAI_API_KEY = os.environ.get('MY_OPENAI_KEY', os.environ.get('OPENAI_API_KEY_DEFAULT'))
if not OPENAI_API_KEY or not OPENAI_API_KEY.startswith('sk-'):
    OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY_DEFAULT')
openai.api_key = OPENAI_API_KEY  # Set the OpenAI API key
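
# Example .env entries this script reads (values below are illustrative only):
#   INDUSTRY_KEYWORD=<your industry keyword>
#   SERP_PRICES_EXT=<pricing-page search term, e.g. "pricing">
#   MY_OPENAI_KEY=sk-...            (falls back to OPENAI_API_KEY_DEFAULT)
#   BASE_GPTV=gpt-3.5-turbo-0125    (optional; this is the default)
#   WHOISJSONAPI / COMPAREPRICES    (read here but not used in this script)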


def find_link_to_plans(serp_content, domain_data):
    """Use GPT to find the link to the plans page from SERP content."""
    print(serp_content)
    try:
        response = openai.ChatCompletion.create(
            model=BASE_GPTV,  # Update this to the model you're using
            response_format={"type": "json_object"},
            temperature=0,
            messages=[
                {"role": "system", "content": "Find the link to the page with " + SERP_PRICES_EXT + " for " + INDUSTRY_KEYWORD + ". Return JSON with a 'url' field, or return 'Not found' if it is not found and there is no chance that another page (such as an FAQ) contains the required information."},
                {"role": "user", "content": serp_content}
            ]
        )
        if response['choices'][0]['message']['content']:
            ch = response['choices'][0]['message']['content']
            urls = json.loads(ch)
        else:
            urls = "Not found"
        return urls
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'Not found'


def process_domain_data(domain, domain_data):
    """Process the data for a single domain."""
    query = f"site:{domain} {SERP_PRICES_EXT}"
    organic_results = search_companies_on_google(query, 10)
    serp_content = "\n\n".join([
        f"{result.get('position', 'N/A')}. link: {result.get('link', '')}, "
        f"text: {result.get('title', '')}, "
        f"snippet: {result.get('snippet', '')}"
        for result in organic_results
    ])
    # Determine the plans URL based on the number of search results
    if organic_results:
        urls = find_link_to_plans(serp_content, domain_data)
        return urls
    else:
        return 'Not found'


def main():
    print("Starting the 3searchPrices.py script...")

    # Load data
    data = load_from_json_file("1companies.json", DATA_FOLDER)
    # Keep only domains with nature == 'single project' that have not been crawled yet
    data = {k: v for k, v in data.items() if v.get('nature') == 'single project' and 'priceAndPlansCrawled' not in v}
    print(len(data), "domains to process.")

    for domain, domain_data in data.items():
        print(f"\n\nProcessing prices {domain}...")
        plans_url = process_domain_data(domain, domain_data)

        # If a dict with a 'url' field was returned, normalize the URL and crawl it
        if isinstance(plans_url, dict) and 'url' in plans_url:
            plans_url = correct_url(plans_url['url'])
            domain_data["priceAndPlansCrawled"] = plans_url
            if 'Not found' not in plans_url:
                print(f"Crawling {plans_url}...")
                summary = extract_content(plans_url)
                if len(summary['text_content']) < 600:
                    print("try to extract from web archive")
                    url1 = get_wayback_url(domain)
                    if url1 is not None:
                        summary = extract_content(url1)
                details = load_from_json_file(f"{domain}.json", DATA_FOLDER)
                details["priceAndPlans"] = summary['text_content']
                save_to_json_file(details, f"{domain}.json", DATA_FOLDER)
        else:
            domain_data["priceAndPlansCrawled"] = plans_url

        # Persist progress after each domain
        save_to_json_file(data, "1companies.json", DATA_FOLDER)

    print("Processing complete. Next step: 5")


if __name__ == "__main__":
    main()
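
# Typical invocation (a sketch, assuming the .env variables shown above are set
# and utils.py with the helpers listed near the imports is on the path):
#   python 3searchPrices.py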