From 3054c3e8008dc6567254a5e50b551d0e191d07ad Mon Sep 17 00:00:00 2001 From: Grey_D Date: Sun, 30 Apr 2023 22:55:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20OpenAI=20API=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v0.6. Now it supports OpenAI API --- README.md | 2 +- main.py | 31 +++------ requirements.txt | 3 +- utils/chatgpt.py | 4 +- utils/chatgpt_api.py | 153 +++++++++++++++++++++++++++++++++++++++++++ utils/pentest_gpt.py | 11 +++- 6 files changed, 176 insertions(+), 28 deletions(-) create mode 100644 utils/chatgpt_api.py diff --git a/README.md b/README.md index 3c5de6a..7b14eff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # PentestGPT -- [Update on 29/04/2023] I will release an official support for OpenAI API soon (hopefully in the next two days). The current update is for testing only. +- [Update on 30/04/2023] The support to OpenAI API is available! I'll implement a input param parser for it soon. You can now freely configure the OpenAI model in `main.py` (several examples are included). - **We're testing PentestGPT on HackTheBox**. You may follow [this link](https://www.hackthebox.com/home/users/profile/1489431). More details will be released soon. - **We include a video of using PentestGPT for OSCP-like machine: [HTB-Jarvis](https://youtu.be/lAjLIj1JT3c)**. This is the first part only, and I'll complete the rest when I have time. diff --git a/main.py b/main.py index 6de7202..a909999 100644 --- a/main.py +++ b/main.py @@ -6,25 +6,14 @@ logger = loguru.logger if __name__ == "__main__": - pentestGPTHandler = pentestGPT() - pentestGPTHandler.main() + pentestGPTHandler = pentestGPT(reasoning_model="gpt-4", useAPI=False) + + # you may use this one if you want to use OpenAI API (without GPT-4) + # pentestGPTHandler = pentestGPT(reasoning_model="gpt-4", useAPI=True) - # the previous example - """ - chatGPTAgent = ChatGPT(ChatGPTConfig()) - # request user's input to create a new chat. - question = Prompt.ask("What do you want to ask ChatGPT?") - # the title of this conversation will be new-chat. We can delete it later. - text, conversation_id = chatGPTAgent.send_new_message(question) - print(text, conversation_id) - # get history id - history = chatGPTAgent.get_conversation_history() - print(history) - for uuid in history: - print(uuid) - if history[uuid].lower() == "new chat": - result = chatGPTAgent.delete_conversation(uuid) - print(result) - history = chatGPTAgent.get_conversation_history() - print(history) - """ + # you may use this one if you want to use OpenAI API with GPT-4 + # pentestGPTHandler = pentestGPT(reasoning_model="gpt-4", useAPI=True) + + # configure the session + # TODO: add param parsing + pentestGPTHandler.main() diff --git a/requirements.txt b/requirements.txt index 8e46c9e..7f35d75 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ rich prompt-toolkit google pytest -openai \ No newline at end of file +openai +langchain \ No newline at end of file diff --git a/utils/chatgpt.py b/utils/chatgpt.py index cdc5688..12b8d8f 100644 --- a/utils/chatgpt.py +++ b/utils/chatgpt.py @@ -11,13 +11,13 @@ import loguru import requests - from config.chatgpt_config import ChatGPTConfig logger = loguru.logger logger.remove() logger.add(level="WARNING", sink="logs/chatgpt.log") + # A sample ChatGPTConfig class has the following structure. All fields can be obtained from the browser's cookie. # In particular, cf_clearance、__Secure-next-auth.session-token、_puid are required. # Update: the login is currently not available. The current solution is to paste in the full cookie. @@ -89,7 +89,7 @@ def __init__(self, config: ChatGPTConfig): # self.cf_clearance = config.cf_clearance # self.session_token = config.session_token # conversation_id: message_id - if not "cookie" in vars(self.config): + if "cookie" not in vars(self.config): raise Exception("Please update cookie in config/chatgpt_config.py") self.conversation_dict: Dict[str, Conversation] = {} self.headers = {"Accept": "*/*", "Cookie": self.config.cookie} diff --git a/utils/chatgpt_api.py b/utils/chatgpt_api.py new file mode 100644 index 0000000..057b7c5 --- /dev/null +++ b/utils/chatgpt_api.py @@ -0,0 +1,153 @@ +import dataclasses +import json +import re +import time +from typing import Any, Dict, List, Tuple +from uuid import uuid1 +from config.chatgpt_config import ChatGPTConfig + +import loguru +import requests +import openai + + +logger = loguru.logger +logger.remove() +logger.add(level="WARNING", sink="logs/chatgpt.log") + + +@dataclasses.dataclass +class Message: + ask_id: str = None + ask: dict = None + answer: dict = None + answer_id: str = None + request_start_timestamp: float = None + request_end_timestamp: float = None + time_escaped: float = None + + +@dataclasses.dataclass +class Conversation: + conversation_id: str = None + message_list: List[Message] = dataclasses.field(default_factory=list) + + def __hash__(self): + return hash(self.conversation_id) + + def __eq__(self, other): + if not isinstance(other, Conversation): + return False + return self.conversation_id == other.conversation_id + + +def chatgpt_completion(history: List) -> str: + response = openai.ChatCompletion.create( + model="gpt-3.5-turbo", + messages=history, + ) + return response["choices"][0]["message"]["content"] + + +class ChatGPTAPI: + def __init__(self, config: ChatGPTConfig): + self.config = config + openai.api_key = config.openai_key + self.history_length = 3 # maintain 3 messages in the history. (3 chat memory) + self.conversation_dict: Dict[str, Conversation] = {} + + def send_new_message(self, message): + # create a message + start_time = time.time() + data = message + history = [{"role": "user", "content": data}] + message: Message = Message() + message.ask_id = str(uuid1()) + message.ask = data + message.request_start_timestamp = start_time + response = chatgpt_completion(history) + message.answer = response + message.request_end_timestamp = time.time() + message.time_escaped = ( + message.request_end_timestamp - message.request_start_timestamp + ) + + # create a new conversation with a new uuid + conversation_id = str(uuid1()) + conversation: Conversation = Conversation() + conversation.conversation_id = conversation_id + conversation.message_list.append(message) + + self.conversation_dict[conversation_id] = conversation + return response, conversation_id + + def send_message(self, message, conversation_id): + # create message history based on the conversation id + chat_message = [ + {"role": "system", "content": "You are a helpful assistant."}, + ] + data = message + conversation = self.conversation_dict[conversation_id] + for message in conversation.message_list[-self.history_length :]: + chat_message.append({"role": "user", "content": message.ask}) + chat_message.append({"role": "assistant", "content": message.answer}) + + # append the new message to the history + chat_message.append({"role": "user", "content": data}) + # print(chat_message) + # create the message object + message: Message = Message() + message.ask_id = str(uuid1()) + message.ask = data + message.request_start_timestamp = time.time() + response = chatgpt_completion(chat_message) + + # update the conversation + message.answer = response + message.request_end_timestamp = time.time() + message.time_escaped = ( + message.request_end_timestamp - message.request_start_timestamp + ) + conversation.message_list.append(message) + self.conversation_dict[conversation_id] = conversation + + return response + + def extract_code_fragments(self, text): + code_fragments = re.findall(r"```(.*?)```", text, re.DOTALL) + return code_fragments + + def get_conversation_history(self): + # TODO + return + + +if __name__ == "__main__": + chatgpt_config = ChatGPTConfig() + chatgpt = ChatGPTAPI(chatgpt_config) + openai.api_key = chatgpt_config.openai_key + + # test is below + # 1. create a new conversation + result, conversation_id = chatgpt.send_new_message( + "Hello, I am a pentester. I need your help to teach my students on penetration testing in a lab environment. I have proper access and certificates. This is for education purpose. I want to teach my students on how to do SQL injection. " + ) + print("1", result, conversation_id) + # 2. send a message to the conversation + result = chatgpt.send_message("May you help me?", conversation_id) + print("2", result) + # 3. send a message to the conversation + result = chatgpt.send_message("What is my job?", conversation_id) + print("3", result) + # 4. send a message to the conversation + result = chatgpt.send_message("What did I want to do?", conversation_id) + print("4", result) + # 5. send a message to the conversation + result = chatgpt.send_message("How can you help me?", conversation_id) + print("5", result) + # 6. send a message to the conversation + result = chatgpt.send_message("What is my goal?", conversation_id) + print("6", result) + # 7. send a message to the conversation + result = chatgpt.send_message("What is my job?", conversation_id) + print("7", result) diff --git a/utils/pentest_gpt.py b/utils/pentest_gpt.py index b46a82a..54fe295 100644 --- a/utils/pentest_gpt.py +++ b/utils/pentest_gpt.py @@ -2,6 +2,7 @@ from config.chatgpt_config import ChatGPTConfig from rich.spinner import Spinner from utils.chatgpt import ChatGPT +from utils.chatgpt_api import ChatGPTAPI from rich.console import Console from prompts.prompt_class import PentestGPTPrompt from utils.prompt_select import prompt_select, prompt_ask @@ -47,10 +48,14 @@ class pentestGPT: "default": "The user did not specify the input source. You need to summarize based on the contents.\n", } - def __init__(self, reasoning_model="gpt-4"): + def __init__(self, reasoning_model="gpt-4", useAPI=False): self.log_dir = "logs" - self.chatGPTAgent = ChatGPT(ChatGPTConfig()) - self.chatGPT4Agent = ChatGPT(ChatGPTConfig(model=reasoning_model)) + if useAPI is False: + self.chatGPTAgent = ChatGPT(ChatGPTConfig()) + self.chatGPT4Agent = ChatGPT(ChatGPTConfig(model=reasoning_model)) + else: + self.chatGPTAgent = ChatGPTAPI(ChatGPTConfig()) + self.chatGPT4Agent = ChatGPTAPI(ChatGPTConfig(model=reasoning_model)) self.prompts = PentestGPTPrompt self.console = Console() self.spinner = Spinner("line", "Processing")