-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpyknock-client
executable file
·102 lines (85 loc) · 3.05 KB
/
pyknock-client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python
import os
import sys
import socket
import hmac
import argparse
import time
from pyknock import *
def parse_args():
    """Parse the knock client's command line.

    Positional arguments are consumed in the order ``command``, ``address``,
    ``psk``.  The returned namespace carries ``command``, ``address``,
    ``port``, ``psk``, ``sign_address`` and ``source_address``.

    ``check_port`` and ``psk`` are converters that are not defined in this
    file (presumably provided by the ``pyknock`` star import -- confirm).

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    # One (names, options) pair per argument; declaration order is kept
    # because it fixes both the positional order and the --help layout.
    argument_specs = (
        (("command",),
         dict(help="command: open or close",
              choices=["open", "close"])),
        (("address",),
         dict(help="remote address",
              metavar="HOST")),
        (("-p", "--port"),
         dict(help="remote port",
              type=check_port,
              default=60120)),
        (("psk",),
         dict(help="pre-shared key used to authenticate ourselves "
                   "to knocked peer",
              type=psk,
              metavar="PSK")),
        (("-S", "--sign-address"),
         dict(help="sign specified address instead of "
                   "socket source address")),
        (("-s", "--source-address"),
         dict(help="use following source address to send packet")),
    )

    cli = argparse.ArgumentParser()
    for names, options in argument_specs:
        cli.add_argument(*names, **options)
    return cli.parse_args()
def panic(msg, code):
    """Report a fatal error on standard error and terminate the process.

    Args:
        msg: Text to print; a single newline is appended.
        code: Exit status passed to ``sys.exit``.

    Raises:
        SystemExit: Always, carrying ``code``.
    """
    # sys.stderr is a text-mode stream, which already translates "\n" into
    # the platform line ending.  Writing os.linesep there would emit
    # "\r\r\n" on Windows, so a plain "\n" is the portable terminator.
    sys.stderr.write(msg + "\n")
    sys.exit(code)
def main():
    """Send a signed open/close knock datagram to each resolved address.

    The target host is resolved for UDP, and one datagram is sent per
    resolved address (restricted to the source address family when
    ``--source-address`` is given).  Each datagram is::

        HMAC-SHA256(psk, msg) + msg
        msg = struct.pack('<Bdi', opcode, time.time(), family) + packed_ip

    where ``packed_ip``/``family`` describe either the ``--sign-address``
    value or, by default, the local address the connected socket reports.

    Exit statuses (via ``panic``):
        3 -- destination address could not be resolved
        4 -- malformed ``--source-address``
        5 -- malformed ``--sign-address``
        6 -- no resolved address matches the source address family

    Errors raised while binding, connecting or sending are not caught here
    and propagate to the caller.
    """
    # NOTE(review): struct and hashlib are not imported in this file; they
    # are presumably re-exported by "from pyknock import *" -- confirm.
    args = parse_args()

    try:
        dst_ai = socket.getaddrinfo(args.address,
                                    args.port,
                                    socket.AF_UNSPEC,
                                    socket.SOCK_DGRAM,
                                    socket.IPPROTO_UDP)
    except Exception as e:
        panic("Unable to resolve destination address %s: %s" %
              (repr(args.address), str(e)), 3)
    assert dst_ai, "Destionation address info must not be empty"

    src_af = socket.AF_UNSPEC
    if args.source_address:
        try:
            src_af = detect_af(args.source_address)
        except Exception:
            panic("Malformed source address", 4)

    # The signed address does not depend on the destination, so it is
    # packed once up front; a value inet_pton rejects is reported the same
    # way as one detect_af rejects.
    sign_af = socket.AF_UNSPEC
    sign_ip = None
    if args.sign_address:
        try:
            sign_af = detect_af(args.sign_address)
            sign_ip = socket.inet_pton(sign_af, args.sign_address)
        except Exception:
            panic("Malformed sign address", 5)

    opcode = CODE_OPEN if args.command == 'open' else CODE_CLOSE

    sent = 0
    for ai_entry in dst_ai:
        af, sockaddr = ai_entry[0], ai_entry[4]
        if src_af != socket.AF_UNSPEC and af != src_af:
            continue

        s = socket.socket(af, socket.SOCK_DGRAM)
        try:
            if args.source_address:
                s.bind((args.source_address, 0))
            # Connect with the full resolved sockaddr: for IPv6 it carries
            # flowinfo and scope id, which a rebuilt (host, port) pair
            # would drop (breaking link-local destinations).
            s.connect(sockaddr)

            if sign_ip is not None:
                msg_af, myip = sign_af, sign_ip
            else:
                # The reported IPv6 host may carry a "%scope" suffix on
                # some Python versions; inet_pton does not accept it.
                local_host = s.getsockname()[0].split('%', 1)[0]
                msg_af, myip = af, socket.inet_pton(af, local_host)

            msg = struct.pack('<Bdi', opcode, time.time(), msg_af) + myip
            digest = hmac.new(args.psk, msg, hashlib.sha256).digest()
            s.sendall(digest + msg)
        finally:
            # Release the descriptor even when bind/connect/send fails.
            s.close()
        sent += 1

    if not sent:
        # Every resolved address was filtered out; exiting silently with
        # status 0 would look like a successful knock.
        panic("No destination address matches source address family", 6)
# Run the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()