From 88b3977937ace1194b009a4ee6d69b6e4b708c8f Mon Sep 17 00:00:00 2001 From: Andreas Happe Date: Thu, 17 Oct 2024 16:07:19 +0200 Subject: [PATCH] add initial version of a 'pentest task tree' - based upon pentestGPT - removed most of the tool metions so that the generated task tree should be generic and usable for other problems too - this is still a work-in-progress --- src/pentest_task_tree.py | 218 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 src/pentest_task_tree.py diff --git a/src/pentest_task_tree.py b/src/pentest_task_tree.py new file mode 100644 index 0000000..671e90b --- /dev/null +++ b/src/pentest_task_tree.py @@ -0,0 +1,218 @@ + +import operator + +from dotenv import load_dotenv +from rich.console import Console +from rich.panel import Panel + +from langchain_core.prompts import PromptTemplate +from langchain_openai import ChatOpenAI + +from helper.common import get_or_fail +from helper.ui import print_event +from tools.ssh import get_ssh_connection_from_env, SshTestCredentialsTool, SshExecuteTool +from graphs.initial_version import create_chat_tool_agent_graph + +from typing import Annotated, List, Tuple, Union +from typing_extensions import TypedDict +from pydantic import BaseModel, Field + +from langchain_core.prompts import ChatPromptTemplate +from langgraph.graph import StateGraph, START, END + +# setup configuration from environment variables +load_dotenv() +get_or_fail("OPENAI_API_KEY") # langgraph will use this env variable itself +conn = get_ssh_connection_from_env() +conn.connect() + +# prepare console for debug output +console = Console() + +# the shared graph data structure +class PlanExecute(TypedDict): + input: str # the initial user-given objective + plan: str # the current task plan + next_step: str # the next operation to be tested by the agent + past_steps: Annotated[List[Tuple], operator.add] # past steps of the agent, also including a summary + response: str # response from the agent to the user + +# This is the common prefix used by both planner and replanner +# I used gelei's pentestGPT prompts from https://github.com/GreyDGL/PentestGPT/blob/main/pentestgpt/prompts/prompt_class_v2.py as a starting point. Mostly +# I removed tool-specific examples to not frame the LLM to move into a specific +# direction and tried to make it more generic. +COMMON_PREFIX = """You are given an objective by the user. You are required to strategize and create a tree-structured task plan that will allow to successfully solve the objective. Another worker will follow your task plan to complete the objective, and will report after each finished task back to you. You should use this feedback to update the task plan. + +When creating the task plan you must follow the following requirements: + +1. You need to maintain a task plan, which contains all potential tasks that should be investigated to solve the objective. The tasks should be in a tree structure because one task can be considered as a sub-task to another. +You can display the tasks in a layer structure, such as 1, 1.1, 1.1.1, etc. Initially, you should only generate the root tasks based on the initial information. In addition select the next task (as next_step) that should be executed by the tester. +""" + +# The Planner Prompt +planner_prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", COMMON_PREFIX + """This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps but make sure that each step has all the information needed - do not skip steps.""", + ), + ("placeholder", "{messages}"), + ] +) + +# The Replanner Prompt +replanner_prompt = ChatPromptTemplate.from_template( + COMMON_PREFIX + """2. Each time you receive results from the worker you should +2.1 Analyze the message and see identify useful key information +2.2 Decide to add a new task or update a task information according to the findings. Only add steps to the plan that still NEED to be done. +2.3 Decide to delete a task if necessary. Do this if the task is not relevant for reaching the objective anymore. +2.4 From all the tasks, identify those that can be performed next. Analyze those tasks and decide which one should be performed next based on their likelihood to a successful exploit. Name this task as 'next_step'. + +Your objective was this: +{input} + +Your original task-plan was this: +{plan} + +You have currently done the follow tasks: +{past_steps} + +If no more steps are needed to solve the objective, then respond with that. Otherwise, return a new task-plan and the next step to execute. If you were not able to complete the task, stop after 15 planning steps and give a summary to the user. +""" +) + +# we are using structured responses so that we do not have to manually parse +# the output of the LLM + +class Plan(BaseModel): + """Plan to follow in future""" + + steps: str = Field( + description="the hierarchical task plan" + ) + + next_step: str = Field( + description = "The next task to perform." + ) + +class Response(BaseModel): + """Response to user.""" + response: str + +class Act(BaseModel): + """Action to perform.""" + + action: Union[Response, Plan] = Field( + description="Action to perform. If you want to respond to user, use Response. " + "If you need to further use tools to get the answer, use Plan." + ) + +# The executor agent that actually performs tasks on the connected target + +# configure a second LLM connection for the executor/agent_step +# this would allow us to use different LLMs for high-level planning +# and low-level execution. Also we could use different tools for the +# the different models, e.g., allow the high-level model to browse the web +# for newer attack ideas. +llm2 = ChatOpenAI(model="gpt-4o", temperature=0) +tools = [SshExecuteTool(conn), SshTestCredentialsTool(conn)] +llm2_with_tools = llm2.bind_tools(tools) + +# this will be called as a separate sub-graph/agent with it's own state +# and output +def execute_step(state: PlanExecute): + + task = state["next_step"] + + prompt = PromptTemplate.from_template(""" + You are a low-privilege user {username} with password {password} on a linux system and your goal is to become the root user by executing commands on the remote system. + + To achieve this, focus upon {task} + + Do not repeat already tried escalation attacks. You should focus upon enumeration and privilege escalation. If you were able to become root, describe the used method as final message. Stop after 5 executions. If not successful until then, give a summary of gathered facts. + """).format(username=conn.username, password=conn.password,task=task) + + # create our command executor/agent graph + graph_builder = create_chat_tool_agent_graph(llm2_with_tools, tools) + graph = graph_builder.compile() + + events = graph.stream( + {"messages": [("user", prompt)]}, + stream_mode='values' + ) + + agent_response = None + for event in events: + print_event(console, event) + agent_response = event + + return { + "past_steps": [(task, agent_response["messages"][-1].content)], + } + +# create the graph +def create_plan_and_execute_graph(llm, execute_step): + + def should_end(state: PlanExecute): + if "response" in state and state["response"]: + return END + else: + return "agent" + + def plan_step(state: PlanExecute): + planner = planner_prompt | llm.with_structured_output(Plan) + plan = planner.invoke({"messages": [("user", state["input"])]}) + return {"plan": plan.steps, "next_step": plan.next_step} + + def replan_step(state: PlanExecute): + + console.print(Panel(state["plan"], title="Plan")) + + replanner = replanner_prompt | llm.with_structured_output(Act) + output = replanner.invoke(state) + if isinstance(output.action, Response): + return {"response": output.action.response} + else: + console.print(Panel(output.action.next_step, title="Next Step")) + console.print(Panel(output.action.steps, title="New Plan")) + return {"plan": output.action.steps, "next_step": output.action.next_step} + + workflow = StateGraph(PlanExecute) + + # Add the nodes + workflow.add_node("planner", plan_step) + workflow.add_node("agent", execute_step) + workflow.add_node("replan", replan_step) + + # set the start node + workflow.add_edge(START, "planner") + + # configure links between nodes + workflow.add_edge("planner", "agent") + workflow.add_edge("agent", "replan") + workflow.add_conditional_edges("replan", should_end) + + return workflow + +# create the graph +llm = ChatOpenAI(model="gpt-4o", temperature=0) +workflow = create_plan_and_execute_graph(llm, execute_step) +app = workflow.compile() +print(app.get_graph(xray=True).draw_ascii()) + +# now start our agent and give it our initial prompt ('get root') +template = PromptTemplate.from_template(""" +You are a low-privilege user {username} with password {password} on a linux system and your goal is to become the root user by executing commands on the remote system. + +Do not repeat already tried escalation attacks. You should focus upon enumeration and privilege escalation. If you were able to become root, describe the used method as final message. +""").format(username=conn.username, password=conn.password) + +# start everything +events = app.stream( + input = {"input": template }, + config = {"recursion_limit": 50}, + stream_mode = "values" +) + +# output all occurring events +for event in events: + print_event(console, event) \ No newline at end of file