-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathkexdhgroup14sha1.py
155 lines (113 loc) · 4.2 KB
/
kexdhgroup14sha1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Copyright (c) 2014-2015 Sam Maloney.
# License: LGPL
import llog
import asyncio
import logging
from hashlib import sha1
import dhgroup14
from sshexception import SshException
import sshtype
import packet as mnp
log = logging.getLogger(__name__)
class KexDhGroup14Sha1(object):
    """Key exchange handler for the "diffie-hellman-group14-sha1" method.

    One instance drives a single exchange over ``protocol``. The same class
    serves both roles; ``protocol.server_mode`` selects which side of the
    exchange ``run()`` performs.
    """

    name = "diffie-hellman-group14-sha1"

    def __init__(self, protocol):
        """Store the owning protocol object and create a new DhGroup14."""
        self.protocol = protocol
        self.dh = dhgroup14.DhGroup14()

    @asyncio.coroutine
    def run(self):
        """Perform the key exchange and send the NEWKEYS message.

        Returns True once NEWKEYS has been written. Returns False when
        read_packet() yields a falsy packet, or (client side) when the
        server key verification result is falsy. SshException raised by the
        parse helpers propagates to the caller.
        """
        proto = self.protocol
        group = self.dh
        is_server = proto.server_mode

        # NOTE(review): with DEBUG enabled this logs x here and K in the
        # parse helpers; these look like the DH private exponent and the
        # shared secret -- confirm that is acceptable for deployments.
        group.generate_x()
        if log.isEnabledFor(logging.DEBUG):
            log.debug("x=[{}]".format(group.x))

        group.generate_e()
        if log.isEnabledFor(logging.DEBUG):
            log.debug("e=[{}]".format(group.e))

        if is_server:
            # Server side: wait for the client's KEXDH_INIT, then answer it.
            packet = yield from proto.read_packet()
            if not packet:
                return False

            init_msg = mnp.SshKexdhInitMessage(packet)
            if log.isEnabledFor(logging.DEBUG):
                log.debug("Client sent e=[{}].".format(init_msg.e))

            self._parse_kexdh_init(init_msg)
        else:
            # Client side: send our public value, then process the reply.
            init_msg = mnp.SshKexdhInitMessage()
            init_msg.e = group.e
            init_msg.encode()
            proto.write_packet(init_msg)

            packet = yield from proto.read_packet()
            if not packet:
                return False

            reply = mnp.SshKexdhReplyMessage(packet)
            accepted = yield from self._parse_kexdh_reply(reply)
            if not accepted:
                # verify_server_key() gave a falsy result; presumably the
                # server signature failed or its host key was not accepted.
                return False

        # Both roles finish by announcing the switch to the new keys.
        newkeys = mnp.SshNewKeysMessage()
        newkeys.encode()
        proto.write_packet(newkeys)

        # Signal that the key exchange completed.
        return True

    @asyncio.coroutine
    def _parse_kexdh_reply(self, m):
        """Client side: handle the server's KEXDH_REPLY message ``m``.

        Validates the server's public value, computes K and the exchange
        hash H, stores both on the protocol via set_K_H(), and returns the
        result of protocol.verify_server_key(host_key, signature).

        Raises SshException if ``m.f`` is outside [1, P - 1].
        """
        proto = self.protocol
        group = self.dh

        host_key = m.host_key
        server_f = m.f
        group.f = server_f

        if not (1 <= server_f <= group.P - 1):
            raise SshException('Server kex "f" is out of range')

        K = group.calculate_k()
        if log.isEnabledFor(logging.DEBUG):
            log.debug("K=[{}].".format(K))

        # H = (V_C || V_S || I_C || I_S || K_S || e || f || K).
        # We are the client, so the "C" fields are the local ones.
        transcript = b"".join((
            sshtype.encodeString(proto.local_banner),
            sshtype.encodeString(proto.remote_banner),
            sshtype.encodeBinary(proto.local_kex_init_message),
            sshtype.encodeBinary(proto.remote_kex_init_message),
            sshtype.encodeBinary(host_key),
            sshtype.encodeMpint(group.e),
            sshtype.encodeMpint(server_f),
            sshtype.encodeMpint(K),
        ))
        H = sha1(transcript).digest()

        proto.set_K_H(K, H)

        log.info("Verifying signature...")

        return (yield from proto.verify_server_key(host_key, m.signature))

    def _parse_kexdh_init(self, m):
        """Server side: handle the client's KEXDH_INIT ``m`` and reply.

        Validates the client's public value, computes K and the exchange
        hash H, stores both on the protocol via set_K_H(), signs H with the
        server key and writes a KEXDH_REPLY packet.

        Raises SshException if ``m.e`` is outside [1, P - 1].
        """
        proto = self.protocol
        group = self.dh

        # The peer's public value is stored in dh.f in both roles; presumably
        # calculate_k() reads it from there -- verify against dhgroup14.
        client_e = m.e
        group.f = client_e

        if not (1 <= client_e <= group.P - 1):
            raise SshException("Client kex 'e' is out of range")

        K = group.calculate_k()
        if log.isEnabledFor(logging.DEBUG):
            log.debug("K=[{}].".format(K))

        host_key_blob = proto.server_key.asbytes()

        # H = (V_C || V_S || I_C || I_S || K_S || e || f || K).
        # We are the server, so the "C" fields are the remote ones and our
        # own public value (held in dh.e) goes in the "f" slot.
        transcript = b"".join((
            sshtype.encodeString(proto.remote_banner),
            sshtype.encodeString(proto.local_banner),
            sshtype.encodeBinary(proto.remote_kex_init_message),
            sshtype.encodeBinary(proto.local_kex_init_message),
            sshtype.encodeBinary(host_key_blob),
            sshtype.encodeMpint(client_e),
            sshtype.encodeMpint(group.e),
            sshtype.encodeMpint(K),
        ))
        H = sha1(transcript).digest()

        proto.set_K_H(K, H)

        # Sign the exchange hash with the server key.
        signature = proto.server_key.sign_ssh_data(H)

        # Build and send the KEXDH_REPLY.
        reply = mnp.SshKexdhReplyMessage()
        reply.host_key = host_key_blob
        reply.f = group.e
        reply.signature = signature
        reply.encode()

        proto.write_packet(reply)