-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfacts_extractor.py
executable file
·148 lines (119 loc) · 4.17 KB
/
facts_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
import argparse
from pathlib import Path
import dill
from pddl.formatter import problem_to_string
from modules.connectors import ListenConnector, RemoteConnector, SSHConnector
from modules.extractor import FactsExtractor
from modules.logger import Logger
from modules.encoder import Encoder
EXTRACTOR_PICKLE_PATH = Path("extractor_data.pkl")
OUTPUT_PROBLEMS_DIRECTORY = Path("generated_problems")
logger = Logger(__name__)
def main():
parser = argparse.ArgumentParser(description="Run phases on an IP address")
parser.add_argument(
"-p",
type=int,
help="Port to connect or listen on (depending on -r, -l or SSH)",
required=True,
)
parser.add_argument(
"-t",
type=str,
help="Target to connect to (to be used with -r or SSH)",
required=False,
)
parser.add_argument(
"-d", type=str, help="Reference PDDL domain file", required=True
)
parser.add_argument(
"-n",
type=str,
help="Label name for the results: pickled facts and problems",
required=False,
)
parser.add_argument(
"-fc",
action="store_true",
help="Assume CVE are not patched",
required=False,
)
connection_group = parser.add_mutually_exclusive_group(required=True)
connection_group.add_argument(
"-l",
action="store_true",
help="Bind to a port - listen for reverse shell connections instead of connecting to host",
)
connection_group.add_argument(
"-r",
action="store_true",
help="Connect back to host's exposed shell",
)
connection_group.add_argument(
"-s",
action="store_true",
help="Connect to the host via SSH",
)
ssh_group = connection_group.add_argument_group("SSH", "SSH related arguments")
ssh_group.add_argument("-u", type=str, help="User for SSH connection")
ssh_group.add_argument("-k", type=str, help="Private key for SSH connection")
args = parser.parse_args()
host = args.t
port = args.p
domain = Path(args.d)
label = args.n
# append label to stems
if label:
global EXTRACTOR_PICKLE_PATH
EXTRACTOR_PICKLE_PATH = EXTRACTOR_PICKLE_PATH.with_stem(
f"{EXTRACTOR_PICKLE_PATH.stem}_{label}"
)
global OUTPUT_PROBLEMS_DIRECTORY
OUTPUT_PROBLEMS_DIRECTORY = OUTPUT_PROBLEMS_DIRECTORY.with_stem(
f"{OUTPUT_PROBLEMS_DIRECTORY.stem}_{label}"
)
if not domain.exists():
logger.error(f"Domain file {domain} does not exist")
exit(-1)
if port not in range(1, 65536):
logger.error(f"Invalid port: {port}")
exit(-1)
if args.r and not host:
logger.error("Please set -t when using -r")
exit(-1)
container_pickle = Path(EXTRACTOR_PICKLE_PATH)
if container_pickle.exists():
logger.info("Loading pickled facts...")
with open(container_pickle, "rb") as f:
facts_container = dill.load(f)
else:
if args.u and args.k:
connector = SSHConnector(host, args.u, Path(args.k))
elif args.l:
connector = ListenConnector(port)
else:
connector = RemoteConnector(host, port)
connector.initialize()
fe = FactsExtractor(connector)
if args.fc:
fe.extract(args.fc)
else:
fe.extract()
facts_container = fe.container
logger.info(f"Pickling facts to {EXTRACTOR_PICKLE_PATH}")
with open(EXTRACTOR_PICKLE_PATH, "wb") as f:
dill.dump(facts_container, f)
encoder = Encoder(facts_container)
problems = encoder.generate_problems(domain)
if not OUTPUT_PROBLEMS_DIRECTORY.exists():
logger.info(f"Creating output directory {OUTPUT_PROBLEMS_DIRECTORY}...")
OUTPUT_PROBLEMS_DIRECTORY.mkdir()
for name, problem in problems.items():
problem_filename = f"{name}.pddl"
problem_file = OUTPUT_PROBLEMS_DIRECTORY / problem_filename
with open(problem_file, "w") as f:
f.write(problem_to_string(problem))
logger.info(f"Written problem {problem_file}")
if __name__ == "__main__":
main()