-
Notifications
You must be signed in to change notification settings - Fork 3
/
nfs.py
106 lines (95 loc) · 5.25 KB
/
nfs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
"""
this file implements the core functionality to hunt for any sensitive files on NFS shares.
"""
__author__ = "Lukas Reiter"
__license__ = "GPL v3.0"
__copyright__ = """Copyright 2018 Lukas Reiter
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
__version__ = 0.1
import os
import stat
import libnfs
import logging
import argparse
from datetime import datetime
from datetime import timezone
from database.model import Path
from database.model import File
from database.model import HunterType
from hunters.modules.core import BaseSensitiveFileHunter
# Module-level logger used by this hunter; registered under the fixed name 'nfs'.
logger = logging.getLogger('nfs')
class NfsSensitiveFileHunter(BaseSensitiveFileHunter):
    """
    This class implements the core functionality to hunt for files on NFS shares.

    It builds a libnfs connection URL from the parsed command line arguments, opens the connection and
    recursively walks the exported directory tree. Each regular file is either read and put on the file queue
    or, if it is too large, only evaluated by its path name.
    """

    def __init__(self, args: argparse.Namespace, **kwargs):
        """
        Initializes the hunter and opens the NFS connection.

        :param args: The parsed command line arguments. The attributes host, port, path and version are read here.
        :param kwargs: Additional keyword arguments that are passed on unchanged to the base class.
        """
        super().__init__(args, address=args.host, port=args.port, service_name=HunterType.nfs, **kwargs)
        self.path = args.path
        self.version = args.version
        # NOTE(review): self.service is not assigned in this class; presumably the base class creates it from
        # the address/port passed above - confirm against BaseSensitiveFileHunter.
        self.connection_string = "nfs://{}/{}?version={}&nfsport={}".format(self.service.host.address,
                                                                            self.path,
                                                                            self.version,
                                                                            self.service.port)
        self.client = libnfs.NFS(self.connection_string)

    @staticmethod
    def add_argparse_arguments(parser: argparse.ArgumentParser) -> None:
        """
        This method initializes command line arguments that are required by the current module.

        :param parser: The argument parser to which the required command line arguments shall be added.
        :return:
        """
        BaseSensitiveFileHunter.add_argparse_arguments(parser)
        parser.add_argument('--version', type=int, choices=[3, 4], default=3, help='NFS version to use')
        # The second fragment now ends with a blank; previously the help text rendered as "withsearch pattern".
        parser.add_argument('--domains', type=str, nargs="*", metavar="USERDOMAIN",
                            help='the name of the domain name of existing microsoft active directories. if specified, '
                                 'then the specified values become additional file content matching rules with '
                                 'search pattern: "USERDOMAIN[/\\]\\w+". the objective is the identification domain '
                                 'user names in files.')
        nfs_target_group = parser.add_argument_group('target information')
        nfs_target_group.add_argument('--host', type=str, metavar="HOST", help="the target NFS service's IP address")
        nfs_target_group.add_argument('--port', type=int, default=2049, metavar="PORT",
                                      help="the target NFS service's port")
        nfs_target_group.add_argument('--path', type=str, metavar="PATH", help="path to enumerate")

    def _enumerate(self, cwd: str = "") -> None:
        """
        This method enumerates all files on the given service.

        Directories are descended into recursively. For every other entry a Path object is created; if the
        size check of the base class passes, the file content is read and the Path is put on the file queue,
        otherwise (for non-empty files) a placeholder File is attached and only the path name is analyzed.

        :param cwd: The directory, relative to the connected export, whose entries shall be enumerated.
        :return:
        """
        for item in self.client.listdir(cwd):
            # Skip the self and parent entries; following them would recurse forever.
            if item in (".", ".."):
                continue
            full_path = os.path.join(cwd, item)
            stats = self.client.stat(full_path)
            file_size = stats['size']
            if stat.S_ISDIR(stats['mode']):
                self._enumerate(full_path)
                continue
            # NOTE(review): creation_time is filled from 'ctime'; on POSIX file systems this is usually the
            # inode change time rather than a creation time - confirm that this is intended.
            path = Path(service=self.service,
                        full_path=full_path,
                        access_time=datetime.fromtimestamp(stats['atime']['sec'], tz=timezone.utc),
                        modified_time=datetime.fromtimestamp(stats['mtime']['sec'], tz=timezone.utc),
                        creation_time=datetime.fromtimestamp(stats['ctime']['sec'], tz=timezone.utc))
            if self.is_file_size_below_threshold(path, file_size):
                # Close the remote file handle explicitly once the content has been read; otherwise one
                # handle per imported file stays open until it happens to be garbage collected.
                file_handle = self.client.open(full_path, mode='rb')
                try:
                    content = file_handle.read()
                finally:
                    file_handle.close()
                path.file = File(content=bytes(content))
                # Add file to queue
                self.file_queue.put(path)
            elif file_size > 0:
                # The file is too large to import: store a placeholder instead of the content, keep the real
                # size and evaluate the file by its path name only.
                path.file = File(content="[file ({}) not imported as file size ({}) "
                                         "is above threshold]".format(str(path), file_size).encode('utf-8'))
                path.file.size_bytes = file_size
                relevance = self._analyze_path_name(path)
                if self._args.debug and not relevance:
                    logger.debug("ignoring file (threshold: above, size: {}): {}".format(file_size,
                                                                                        str(path)))