Skip to content

Commit

Permalink
Merge pull request #7 from andreashappe/pentest_task_tree
Browse files Browse the repository at this point in the history
Pentest task tree
  • Loading branch information
andreashappe authored Oct 24, 2024
2 parents 41a6a2c + 88b3977 commit 4f12e40
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 12 deletions.
31 changes: 19 additions & 12 deletions src/helper/ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,35 @@
from rich.panel import Panel
from rich.pretty import Pretty

def print_event(console: Console, event):
def get_panels_from_event(console: Console, event):
panels = []

if "messages" in event:
message = event["messages"][-1]
if isinstance(message, HumanMessage):
console.print(Panel(str(message.content), title="Punny Human says"))
panels.append(Panel(str(message.content), title="Input to the LLM"))
elif isinstance(message, ToolMessage):
console.print(Panel(str(message.content), title=f"Tool Reponse from {message.name}"))
panels.append(Panel(str(message.content), title=f"Tool Reponse from {message.name}"))
elif isinstance(message, AIMessage):
if message.content != '':
console.print(Panel(str(message.content), title="AI says"))
elif len(message.tool_calls) == 1:
tool = message.tool_calls[0]
console.print(Panel(Pretty(tool["args"]), title=f"Tool Call to {tool["name"]}"))
panels.append(Panel(str(message.content), title="Output from the LLM"))
elif len(message.tool_calls) >= 1:
for tool in message.tool_calls:
panels.append(Panel(Pretty(tool["args"]), title=f"Tool Call to {tool["name"]}"))
else:
print("WHAT do you want?")
console.log(message)
panels.append(Panel(Pretty(message), title='unknown message type'))
else:
print("WHAT message are you?")
console.log(message)
raise Exception("Unknown message type: " + str(message))
else:
print("WHAT ARE YOU??????")
console.log("no messages in event?")
console.log(event)
return panels

def print_event(console: Console, event):
panels = get_panels_from_event(console, event)

for panel in panels:
console.print(panel)

def print_event_stream(console: Console, events):
for event in events:
Expand Down
218 changes: 218 additions & 0 deletions src/pentest_task_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@

import operator

from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel

from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

from helper.common import get_or_fail
from helper.ui import print_event
from tools.ssh import get_ssh_connection_from_env, SshTestCredentialsTool, SshExecuteTool
from graphs.initial_version import create_chat_tool_agent_graph

from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
from pydantic import BaseModel, Field

from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, START, END

# setup configuration from environment variables
load_dotenv()
get_or_fail("OPENAI_API_KEY") # langgraph will use this env variable itself
conn = get_ssh_connection_from_env()
conn.connect()

# prepare console for debug output
console = Console()

# the shared graph data structure
class PlanExecute(TypedDict):
input: str # the initial user-given objective
plan: str # the current task plan
next_step: str # the next operation to be tested by the agent
past_steps: Annotated[List[Tuple], operator.add] # past steps of the agent, also including a summary
response: str # response from the agent to the user

# This is the common prefix used by both planner and replanner
# I used gelei's pentestGPT prompts from https://github.com/GreyDGL/PentestGPT/blob/main/pentestgpt/prompts/prompt_class_v2.py as a starting point. Mostly
# I removed tool-specific examples to not frame the LLM to move into a specific
# direction and tried to make it more generic.
COMMON_PREFIX = """You are given an objective by the user. You are required to strategize and create a tree-structured task plan that will allow to successfully solve the objective. Another worker will follow your task plan to complete the objective, and will report after each finished task back to you. You should use this feedback to update the task plan.
When creating the task plan you must follow the following requirements:
1. You need to maintain a task plan, which contains all potential tasks that should be investigated to solve the objective. The tasks should be in a tree structure because one task can be considered as a sub-task to another.
You can display the tasks in a layer structure, such as 1, 1.1, 1.1.1, etc. Initially, you should only generate the root tasks based on the initial information. In addition select the next task (as next_step) that should be executed by the tester.
"""

# The Planner Prompt
planner_prompt = ChatPromptTemplate.from_messages(
[
(
"system", COMMON_PREFIX + """This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps but make sure that each step has all the information needed - do not skip steps.""",
),
("placeholder", "{messages}"),
]
)

# The Replanner Prompt
replanner_prompt = ChatPromptTemplate.from_template(
COMMON_PREFIX + """2. Each time you receive results from the worker you should
2.1 Analyze the message and see identify useful key information
2.2 Decide to add a new task or update a task information according to the findings. Only add steps to the plan that still NEED to be done.
2.3 Decide to delete a task if necessary. Do this if the task is not relevant for reaching the objective anymore.
2.4 From all the tasks, identify those that can be performed next. Analyze those tasks and decide which one should be performed next based on their likelihood to a successful exploit. Name this task as 'next_step'.
Your objective was this:
{input}
Your original task-plan was this:
{plan}
You have currently done the follow tasks:
{past_steps}
If no more steps are needed to solve the objective, then respond with that. Otherwise, return a new task-plan and the next step to execute. If you were not able to complete the task, stop after 15 planning steps and give a summary to the user.
"""
)

# we are using structured responses so that we do not have to manually parse
# the output of the LLM

class Plan(BaseModel):
"""Plan to follow in future"""

steps: str = Field(
description="the hierarchical task plan"
)

next_step: str = Field(
description = "The next task to perform."
)

class Response(BaseModel):
"""Response to user."""
response: str

class Act(BaseModel):
"""Action to perform."""

action: Union[Response, Plan] = Field(
description="Action to perform. If you want to respond to user, use Response. "
"If you need to further use tools to get the answer, use Plan."
)

# The executor agent that actually performs tasks on the connected target

# configure a second LLM connection for the executor/agent_step
# this would allow us to use different LLMs for high-level planning
# and low-level execution. Also we could use different tools for the
# the different models, e.g., allow the high-level model to browse the web
# for newer attack ideas.
llm2 = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [SshExecuteTool(conn), SshTestCredentialsTool(conn)]
llm2_with_tools = llm2.bind_tools(tools)

# this will be called as a separate sub-graph/agent with it's own state
# and output
def execute_step(state: PlanExecute):

task = state["next_step"]

prompt = PromptTemplate.from_template("""
You are a low-privilege user {username} with password {password} on a linux system and your goal is to become the root user by executing commands on the remote system.
To achieve this, focus upon {task}
Do not repeat already tried escalation attacks. You should focus upon enumeration and privilege escalation. If you were able to become root, describe the used method as final message. Stop after 5 executions. If not successful until then, give a summary of gathered facts.
""").format(username=conn.username, password=conn.password,task=task)

# create our command executor/agent graph
graph_builder = create_chat_tool_agent_graph(llm2_with_tools, tools)
graph = graph_builder.compile()

events = graph.stream(
{"messages": [("user", prompt)]},
stream_mode='values'
)

agent_response = None
for event in events:
print_event(console, event)
agent_response = event

return {
"past_steps": [(task, agent_response["messages"][-1].content)],
}

# create the graph
def create_plan_and_execute_graph(llm, execute_step):

def should_end(state: PlanExecute):
if "response" in state and state["response"]:
return END
else:
return "agent"

def plan_step(state: PlanExecute):
planner = planner_prompt | llm.with_structured_output(Plan)
plan = planner.invoke({"messages": [("user", state["input"])]})
return {"plan": plan.steps, "next_step": plan.next_step}

def replan_step(state: PlanExecute):

console.print(Panel(state["plan"], title="Plan"))

replanner = replanner_prompt | llm.with_structured_output(Act)
output = replanner.invoke(state)
if isinstance(output.action, Response):
return {"response": output.action.response}
else:
console.print(Panel(output.action.next_step, title="Next Step"))
console.print(Panel(output.action.steps, title="New Plan"))
return {"plan": output.action.steps, "next_step": output.action.next_step}

workflow = StateGraph(PlanExecute)

# Add the nodes
workflow.add_node("planner", plan_step)
workflow.add_node("agent", execute_step)
workflow.add_node("replan", replan_step)

# set the start node
workflow.add_edge(START, "planner")

# configure links between nodes
workflow.add_edge("planner", "agent")
workflow.add_edge("agent", "replan")
workflow.add_conditional_edges("replan", should_end)

return workflow

# create the graph
llm = ChatOpenAI(model="gpt-4o", temperature=0)
workflow = create_plan_and_execute_graph(llm, execute_step)
app = workflow.compile()
print(app.get_graph(xray=True).draw_ascii())

# now start our agent and give it our initial prompt ('get root')
template = PromptTemplate.from_template("""
You are a low-privilege user {username} with password {password} on a linux system and your goal is to become the root user by executing commands on the remote system.
Do not repeat already tried escalation attacks. You should focus upon enumeration and privilege escalation. If you were able to become root, describe the used method as final message.
""").format(username=conn.username, password=conn.password)

# start everything
events = app.stream(
input = {"input": template },
config = {"recursion_limit": 50},
stream_mode = "values"
)

# output all occurring events
for event in events:
print_event(console, event)

0 comments on commit 4f12e40

Please sign in to comment.