openai.api_key = os.getenv("OPENAI_API_KEY")
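# Requires the OPENAI_API_KEY environment variable to be set before running.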


def download_page(url, user_agent):
    print(Fore.BLUE + "Downloading webpage content...")
    headers = {'User-Agent': user_agent}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    lines = [line.strip() for line in soup.get_text(
        separator='\n').split('\n') if line.strip()]
    noemptylines = "\n".join(lines)

    # Return the parsed soup alongside the cleaned text: re-parsing the
    # stripped text later would discard all markup, including the <img> tags
    # that download_images() needs.
    return soup, noemptylines
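# Note: raise_for_status() aborts on any non-2xx response, so a blocked or
# missing page surfaces as an exception rather than as empty IOC output.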


def download_images(soup, base_url):
    print(Fore.BLUE + "Downloading images...")
    images = soup.find_all('img')
    # This list will store the paths of downloaded images.
    downloaded_image_paths = []
    for img in images:
        src = img.get('src')
        if not src:
            continue  # Skip if img['src'] is missing (urljoin would otherwise fall back to base_url)
        img_url = urljoin(base_url, src)
        img_response = requests.get(img_url)
        img_name = os.path.basename(img_url)
        img_path = os.path.join(os.getcwd(), img_name)
        with open(img_path, 'wb') as f:
            f.write(img_response.content)
        downloaded_image_paths.append(img_path)  # Add the path to the list
    return downloaded_image_paths  # Return the list of downloaded image paths
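# Note: images are saved by basename into the current working directory, so
# two images that share a filename will silently overwrite one another.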


def ocr_image(image_path):
    print(Fore.BLUE + f"Performing OCR on {image_path}...")
    return pytesseract.image_to_string(Image.open(image_path))
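# Note: pytesseract is only a thin wrapper; the Tesseract OCR engine itself
# must be installed and on PATH for image_to_string() to work.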


def parse_openai_response_to_iocs(text):
    """Parses the OpenAI text response into a list of IOC dictionaries."""
    iocs = []
    lines = [line for line in text.split('\n') if line.strip()]
    for line in lines:
        parts = line.split(" : ")
        if len(parts) == 3:
            iocs.append({
                "Indicator Type": parts[0].strip(),
                "Indicator": parts[1].strip(),
                "Context": parts[2].strip(),
            })
        # else:
        #     print(Fore.RED + "Response format incorrect, resubmitting request...")
        #     return None
    return iocs
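# Example (illustrative values): the response line
#   "IPv4 : 203.0.113.10 : C2 callback address"
# parses to
#   {"Indicator Type": "IPv4", "Indicator": "203.0.113.10",
#    "Context": "C2 callback address"}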


def extract_iocs_with_openai(content, context_identifier, retry_count=0, max_retries=1):
    if retry_count > max_retries:
        print(Fore.RED + "Maximum retry limit reached. Moving on without additional retries.")
        return []

    system_prompt = "Return only as requested, without comments or code blocks, only as plain text. If nothing is found return: 'No IOCs found in the provided text'"
    prompt = (
        f"Extract all IOCs (IP addresses, domain names, email addresses, email subjects, file names, user-agent strings, URLs, usernames, passwords, SHA and MD5 hashes, and so on) from the following text and format each as 'indicator type : indicator : context'. Context should be information that surrounds the IOC, such as the filename associated with the hash, the type of infrastructure that exists on an IPv4 address, and so on. {content}")
    # System-level instructions belong in a "system" role message.
    messages = [{"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}]

    try:
        response = openai.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=messages,
            temperature=0.3,
            max_tokens=4096,
            top_p=1.0,
            frequency_penalty=0.0,
            presence_penalty=0.0
        )
        iocs_text = response.choices[0].message.content
        print(iocs_text)
        iocs = parse_openai_response_to_iocs(iocs_text)
        if iocs is None:
            print(Fore.YELLOW + f"Retry {retry_count+1}/{max_retries} due to format mismatch...")
            return extract_iocs_with_openai(content, context_identifier, retry_count+1, max_retries)
        return iocs
    except Exception as e:
        print(Fore.RED + f"An error occurred while querying OpenAI: {e}")
        return []
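# Caveat: the page text is sent in a single request, so a very long page can
# exceed the model's context window; the except clause above then logs the
# error and returns an empty list.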


def cleanup_files(file_paths):
    for path in file_paths:
        try:
            os.remove(path)
            print(Fore.GREEN + f"Successfully removed {path}.")
        except Exception as e:
            print(Fore.RED + f"Error removing file {path}: {e}")


def main(url, output_file, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", retry_limit=1):
    print(Fore.GREEN + "Starting IOC extraction process...")
    # download_page() hands back the parsed soup together with the cleaned
    # text; the soup still contains the <img> tags download_images() looks for.
    soup, text_content = download_page(url, user_agent)
    downloaded_images = download_images(soup, url)

    text_iocs = extract_iocs_with_openai(text_content, "text_content", 0, retry_limit)

    all_iocs = text_iocs[:]
    for img_path in downloaded_images:
        img_text = ocr_image(img_path)
        img_iocs = extract_iocs_with_openai(img_text, img_path, 0, retry_limit)
        all_iocs.extend(img_iocs)

    cleanup_files(downloaded_images)

    with open(output_file, 'w') as f:
        json.dump(all_iocs, f, indent=4)
    print(Fore.GREEN + "IOC extraction process completed.")
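# The output file is a JSON array of objects with "Indicator Type",
# "Indicator", and "Context" keys, one per extracted IOC.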


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Extract IOCs from a webpage.')
    parser.add_argument('--url', required=True, help='The URL of the webpage to analyze')
    parser.add_argument('--output', required=True, help='The JSON file to output')
    parser.add_argument('--retry-limit', type=int, default=3, help='The maximum number of retries for OpenAI requests')
    args = parser.parse_args()

    # Forward the parsed retry limit so --retry-limit actually takes effect.
    main(args.url, args.output, retry_limit=args.retry_limit)
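# Example invocation (assuming this file is saved as ioc_extractor.py):
#   python ioc_extractor.py --url https://example.com/report --output iocs.json --retry-limit 3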