# pydantic_googleaddon/web_tool.py
import requests
from bs4 import BeautifulSoup
from typing import Dict, Any, List
from urllib.parse import urlparse, quote_plus


class WebTool:
    """Scrapes Google search results and fetches page text using requests and BeautifulSoup."""

    DEBUG = True  # Class variable for debug mode

    @classmethod
    def log_debug(cls, message):
        if cls.DEBUG:
            print(f"DEBUG: {message}")

    def __init__(self, num_results: int = 10, max_tokens: int = 4096):
        self.num_results = num_results
        self.max_tokens = max_tokens
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Referer': 'https://www.google.com/',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        self.log_debug("WebTool initialized")
    def search(self, query: str) -> List[Dict[str, Any]]:
        self.log_debug(f"Performing web search for: {query}")
        search_results = self._perform_web_search(query)
        filtered_results = self._filter_search_results(search_results)
        deduplicated_results = self._remove_duplicates(filtered_results)
        self.log_debug(f"Found {len(deduplicated_results)} unique results")
        return deduplicated_results[:self.num_results]

    def _perform_web_search(self, query: str) -> List[Dict[str, Any]]:
        encoded_query = quote_plus(query)
        search_url = f"https://www.google.com/search?q={encoded_query}&num={self.num_results * 2}"
        self.log_debug(f"Search URL: {search_url}")
        try:
            self.log_debug("Sending GET request to Google")
            response = requests.get(search_url, headers=self.headers, timeout=10)
            self.log_debug(f"Response status code: {response.status_code}")
            response.raise_for_status()
            self.log_debug("Parsing HTML with BeautifulSoup")
            soup = BeautifulSoup(response.text, 'html.parser')
            self.log_debug("Searching for result divs")
            search_results = []
            for g in soup.find_all('div', class_='g'):
                self.log_debug("Processing a search result div")
                anchor = g.find('a')
                title = g.find('h3').text if g.find('h3') else 'No title'
                url = anchor.get('href', 'No URL') if anchor else 'No URL'
                description = ''
                description_div = g.find('div', class_=['VwiC3b', 'yXK7lf'])
                if description_div:
                    description = description_div.get_text(strip=True)
                else:
                    description = g.get_text(strip=True)
                self.log_debug(f"Found result: Title: {title[:30]}..., URL: {url[:30]}...")
                search_results.append({
                    'title': title,
                    'description': description,
                    'url': url
                })
            self.log_debug(f"Successfully retrieved {len(search_results)} search results for query: {query}")
            return search_results
        except requests.RequestException as e:
            self.log_debug(f"Error performing search: {str(e)}")
            raise  # Re-raise the exception after logging

    def _filter_search_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        filtered = [
            result for result in results
            if result['description'] and result['title'] != 'No title' and result['url'].startswith('https://')
        ]
        self.log_debug(f"Filtered to {len(filtered)} results")
        return filtered

    def _remove_duplicates(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        seen_urls = set()
        unique_results = []
        for result in results:
            if result['url'] not in seen_urls:
                seen_urls.add(result['url'])
                unique_results.append(result)
        self.log_debug(f"Removed duplicates, left with {len(unique_results)} results")
        return unique_results
    def get_web_content(self, url: str) -> str:
        self.log_debug(f"Fetching content from: {url}")
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            for script in soup(["script", "style"]):
                script.decompose()
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            # Split each line on double spaces so multi-word phrases stay together
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            content = text[:self.max_tokens]  # Note: truncates by characters, not tokens
            self.log_debug(f"Retrieved {len(content)} characters of content")
            return content
        except requests.RequestException as e:
            self.log_debug(f"Error retrieving content from {url}: {str(e)}")
            raise  # Re-raise the exception after logging
    def is_url(self, text: str) -> bool:
        try:
            result = urlparse(text)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    def _clean_url(self, url: str) -> str:
        url = url.rstrip(')')  # Remove trailing parenthesis if present
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url  # Add https:// if missing
        return url
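

# Illustrative usage sketch (not part of the original module): shows how the
# WebTool API above could be driven end to end. It assumes Google returns
# parseable HTML for this scraped request; the CSS class selectors used in
# _perform_web_search may break if Google changes its markup. The query string
# below is a hypothetical example.
if __name__ == "__main__":
    tool = WebTool(num_results=5, max_tokens=2000)
    results = tool.search("pydantic validation examples")
    for result in results:
        print(result['title'], result['url'])
        if tool.is_url(result['url']):
            # Fetch and print the first part of the page's cleaned text
            snippet = tool.get_web_content(tool._clean_url(result['url']))
            print(snippet[:200])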