-
Notifications
You must be signed in to change notification settings - Fork 0
/
ftp-size-checker.py
121 lines (106 loc) · 3.87 KB
/
ftp-size-checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import sys
import signal
import argparse
from ftplib import FTP, error_perm
from io import StringIO
from contextlib import contextmanager
from humanize import naturalsize
from socket import gaierror, timeout
class TimeoutException(Exception):
    """Signal that a time-limited operation ran past its allowed duration."""
class FtpSizeChecker(object):
    """Compute the total size of the files found under an FTP directory.

    The checker logs in to the server, requests a recursive ``LIST -Rt``
    listing and sums the size column of every entry that is not a
    directory, a symbolic link or a ``.``-prefixed line.

    Attributes:
        host: FTP hostname or IP address.
        username: Login name; ``'anonymous'`` triggers an anonymous login.
        password: Login password (may be ``None``).
        directory: Directory to change into before listing (optional).
        timeout: Maximum number of seconds allowed for the listing.
        error: Human-readable error message of the last failed ``run()``,
            or ``None`` when no error occurred.
    """

    # Socket timeout (seconds) used for the FTP connection itself.
    CONNECT_TIMEOUT = 10

    # First characters of listing lines that must not be counted:
    # symbolic links, directories and "./path:" section headers.
    _SKIPPED_PREFIXES = ('l', 'd', '.')

    def __init__(
        self,
        host: str,
        timeout: int,
        username: str,
        password: str = None,
        directory: str = None,
    ) -> None:
        self.host = host
        self.username = username
        self.password = password
        self.directory = directory
        self.timeout = timeout
        self.error = None

    @contextmanager
    def time_limit(self):
        """Abort the wrapped block with ``TimeoutException`` after ``self.timeout`` seconds.

        Relies on ``SIGALRM``. The handler that was installed before
        entering the block is put back on exit so the time limit does not
        leak into unrelated code.
        """
        def signal_handler(signum, frame):
            raise TimeoutException("Timed out!")

        previous_handler = signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(self.timeout)
        try:
            yield
        finally:
            # Cancel the pending alarm before restoring the old handler.
            signal.alarm(0)
            # signal.signal() returns None when the previous handler was not
            # installed from Python; such a handler cannot be re-installed.
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

    @staticmethod
    def _total_listing_bytes(lines):
        """Return the sum of the size column (5th field) of countable listing lines."""
        total = 0
        for line in lines:
            if not line or line[0] in FtpSizeChecker._SKIPPED_PREFIXES:
                continue
            try:
                total += int(line.split()[4])
            except (IndexError, ValueError):
                # Not a file entry with a numeric size (e.g. "total 12",
                # a "subdir:" header or a device node) - ignore it.
                continue
        return total

    def run(self):
        """Fetch the recursive listing and return its total size.

        Returns:
            The total size formatted by ``naturalsize``, or ``None`` when
            the operation failed; in that case ``self.error`` holds the
            reason.
        """
        # Function-scope import: the module only imports FTP and error_perm.
        from ftplib import all_errors

        self.error = None
        if not self.host:
            self.error = "[ERROR] No FTP host given"
            return None

        ftp = None
        listing = []
        try:
            ftp = FTP(self.host, timeout=self.CONNECT_TIMEOUT)
            if self.username == 'anonymous':
                ftp.login()
            else:
                ftp.login(user=self.username, passwd=self.password)
            if self.directory:
                ftp.cwd(self.directory)
            with self.time_limit():
                # Collect the lines through a callback instead of hijacking
                # sys.stdout, so nothing global needs to be restored.
                ftp.dir('-Rt', listing.append)
        except TimeoutException:
            self.error = f"[ERROR] Max timeout of {self.timeout} seconds reached!"
            return None
        except all_errors as e:
            self.error = f"[ERROR] {e}"
            return None
        finally:
            # close() drops the sockets without sending QUIT, so it cannot
            # block on a connection that was interrupted mid-transfer.
            if ftp is not None:
                ftp.close()

        return naturalsize(self._total_listing_bytes(listing))
if __name__ == '__main__':
    # Create the command line arguments.
    parser = argparse.ArgumentParser(description='FTP directory total file size checker.')
    # The host is mandatory: without it no connection can be attempted, so
    # let argparse report the usage error instead of failing later on.
    parser.add_argument('--host', required=True, help='FTP hostname or IP address')
    parser.add_argument('--username', '-u', default='anonymous', help='FTP Username')
    parser.add_argument('--password', '-p', help='FTP Password')
    parser.add_argument('--directory', '-d', help='FTP Directory')
    parser.add_argument(
        '--timeout',
        '-t',
        default=60,
        type=int,
        help='Max timeout for fetching the FTP directory list',
    )
    args = parser.parse_args()

    # Initialize the FtpSizeChecker object from the parsed arguments.
    ftp_size = FtpSizeChecker(
        host=args.host,
        username=args.username,
        password=args.password,
        directory=args.directory,
        timeout=args.timeout,
    )

    # Build a context summary that echoes the input data back to the user.
    context = '\n'.join([
        f"[INFO] FTP Host: {args.host}",
        f"[INFO] FTP Username: {args.username}" if args.username else "[INFO] FTP Username: anonymous",
        f"[INFO] FTP Directory: {args.directory}" if args.directory else "[INFO] FTP Directory: '/'",
        f"[INFO] FTP Timeout: {args.timeout} secs" if args.timeout else "[INFO] FTP Timeout: 60 secs",
    ])
    print(context)

    # Trigger the FTP connection and keep the formatted result.
    total_bytes = ftp_size.run()

    # Display the result; signal failure to the calling shell through a
    # non-zero exit status.
    if total_bytes:
        print(f"[SUCCESS] Total File Size in Directory: {total_bytes}")
    else:
        print(ftp_size.error)
        sys.exit(1)