forked from CVL-dev/cvl-ssh-utils
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpasswordAuth.py
202 lines (182 loc) · 8.97 KB
/
passwordAuth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import wx
class passwordAuth(object):
    """Install an SSH public key on a remote host using password login.

    copyID() logs in with paramiko using a password (prompting through a
    wx dialog when authentication fails) and appends the public key to the
    remote authorized-keys file.  deleteRemoteKey() removes that key again
    and testAuth() checks, via the external ssh binary, whether a
    non-password login succeeds.
    """

    def __init__(self, displayStrings, parent, progressDialog, keydistObject, authorizedKeysFile=None, onFirstLogin=None, *args, **kwargs):
        """Store collaborators and defaults.

        displayStrings     -- object providing passwdPrompt and
                              onFirstLoginFailure texts.
        parent             -- parent window for the password dialog.
        progressDialog     -- optional dialog hidden while the password
                              prompt is shown (may be None).
        keydistObject      -- object providing keyModel.sshpaths.sshBinary,
                              startupinfo, creationflags and cancel().
        authorizedKeysFile -- remote path of the authorized-keys file;
                              defaults to "~/.ssh/authorized_keys".
        onFirstLogin       -- optional remote command run right after the
                              password login succeeds.
        Extra positional/keyword arguments are accepted and ignored.
        """
        self.displayStrings = displayStrings
        self.pubkey = None
        self.username = None
        self.host = None
        self.parent = parent
        self.progressDialog = progressDialog
        self.authorizedKeysFile = authorizedKeysFile
        self.keydistObject = keydistObject
        self.onFirstLogin = onFirstLogin
        if self.authorizedKeysFile is None:
            self.authorizedKeysFile = "~/.ssh/authorized_keys"

    def getPass(self, queue):
        """Prompt for a password and put the answer on ``queue``.

        Puts the entered string when the dialog is confirmed, otherwise
        None.  copyID() schedules this with wx.CallAfter and blocks on
        ``queue.get()``, so a value is always delivered, even if showing
        the dialog fails, to avoid leaving that caller waiting forever.
        """
        dlg = wx.PasswordEntryDialog(self.parent, self.displayStrings.passwdPrompt)
        password = None
        try:
            if self.progressDialog is not None:
                self.progressDialog.Hide()
            try:
                retval = dlg.ShowModal()
            finally:
                # Bring the progress dialog back however the prompt ended.
                if self.progressDialog is not None:
                    self.progressDialog.Show()
            if retval == wx.ID_OK:
                password = dlg.GetValue()
        finally:
            queue.put(password)
            dlg.Destroy()

    @staticmethod
    def _remoteStderr(sshClient, command):
        """Run ``command`` through ``sshClient`` and return its stderr lines."""
        (stdin, stdout, stderr) = sshClient.exec_command(command)
        return stderr.readlines()

    def copyID(self, keyModel, username=None, host=None):
        """Log in with a password and append the public key remotely.

        keyModel -- object whose getPubKey() returns the public key text.
        username -- login name; if None the previously stored one is used.
        host     -- remote host; if None the previously stored one is used.

        Raises Exception when no username is known, when the password
        prompt is cancelled, when the onFirstLogin command writes to
        stderr, when the remote home directory is group- or
        world-writeable, or when appending the key reports an error.
        Connection errors other than authentication failures propagate
        unchanged.  The SSH connection is closed on every exit path.
        """
        from logger.Logger import logger
        logger.debug("in password auth, copyID")
        if username is not None:
            self.username = username
        if host is not None:
            self.host = host
        if self.username is None:
            raise Exception("I don't know what username you are trying to log in with")
        self.keyModel = keyModel
        self.pubkey = self.keyModel.getPubKey()
        import paramiko as ssh
        try:
            import Queue as queueModule  # Python 2
        except ImportError:
            import queue as queueModule  # Python 3

        logger.debug("in password auth, copyID, creating ssh client")
        sshClient = ssh.SSHClient()
        try:
            sshClient.set_missing_host_key_policy(ssh.AutoAddPolicy())

            # The first attempt uses an empty password; every authentication
            # failure asks the user for a new one until login succeeds or
            # the prompt is cancelled.
            passwd = ""
            while True:
                try:
                    sshClient.connect(hostname=self.host, timeout=10, username=self.username, password=passwd, allow_agent=False, look_for_keys=False)
                    break
                except ssh.AuthenticationException:
                    logger.debug("in password auth, copyID, Authentication Exception")
                    answers = queueModule.Queue()
                    wx.CallAfter(self.getPass, answers)
                    passwd = answers.get()
                    if passwd is None:
                        raise Exception("Login Canceled")
            logger.debug("in password auth, copyID, connected")

            if self.onFirstLogin is not None:
                err = self._remoteStderr(sshClient, self.onFirstLogin)
                if err:
                    logger.debug("copy id saw the error message %s" % err)
                    raise Exception(self.displayStrings.onFirstLoginFailure)

            # SSH keys won't work if the user's home directory is writeable by other users.
            writeableDirectoryErrorMessage = "" + \
                "Your home directory is writeable by users other than yourself. " + \
                "As a result, you won't be able to authenticate with SSH keys, so you can't use the Launcher. " + \
                "Please correct the permissions on your home directory, e.g.\n\n" + \
                "chmod 700 ~"
            # First pattern matches the group-write bit, second the
            # other-write bit of the `ls -ld ~` mode string.
            writeableChecks = (
                'ls -ld ~ | grep -q "^d....w" && echo HOME_DIRECTORY_WRITEABLE_BY_OTHER_USERS',
                'ls -ld ~ | grep -q "^d.......w" && echo HOME_DIRECTORY_WRITEABLE_BY_OTHER_USERS',
            )
            for check in writeableChecks:
                (stdin, stdout, stderr) = sshClient.exec_command(check)
                # Any output means grep matched, i.e. the bit is set.
                if stdout.readlines():
                    raise Exception(writeableDirectoryErrorMessage)

            # Best-effort preparation: failures are deliberately not fatal
            # (the final append below is the step that is checked), but
            # they are logged rather than silently dropped.
            preparation = (
                "/bin/mkdir -p ~/.ssh",
                "/bin/chmod 700 ~/.ssh",
                "/bin/touch %s" % (self.authorizedKeysFile),
                "/bin/chmod 600 %s" % (self.authorizedKeysFile),
            )
            for command in preparation:
                err = self._remoteStderr(sshClient, command)
                if err:
                    logger.debug("in password auth, copyID, ignoring stderr of %r: %s" % (command, err))

            # NOTE(review): the key text is interpolated into a double-quoted
            # shell string; it comes from the local key model, not from the
            # remote side.
            err = self._remoteStderr(sshClient, "/bin/echo \"%s\" >> %s" % (self.pubkey.strip(), self.authorizedKeysFile))
            if err:
                logger.debug("in password auth, copyID, appending the key failed: %s" % err)
                raise Exception('The program was unable to write a file in your home directory. This might be because you have exceeded your disk quota. You should log in manually and clean up some files if this is the case')
        finally:
            sshClient.close()

    def deleteRemoteKey(self):
        """Remove the previously installed public key from the remote file.

        Does nothing when no public key has been recorded.  Connects with
        allow_agent=True and no password, then deletes matching lines from
        the authorized-keys file with sed.  This is best effort: failures
        are logged at debug level and not raised.
        """
        if self.pubkey is None:
            return
        from logger.Logger import logger
        import traceback
        # The second space-separated field of the key line is used as the
        # match text; fall back to the whole string if there is no such field.
        try:
            key = self.pubkey.split(' ')[1]
        except Exception:
            key = self.pubkey
        import paramiko as ssh
        sshClient = ssh.SSHClient()
        try:
            sshClient.set_missing_host_key_policy(ssh.AutoAddPolicy())
            sshClient.connect(hostname=self.host, timeout=10, username=self.username, password=None, allow_agent=True, look_for_keys=False)
            cmd = "sed \'\\#{key}# D\' -i {authorizedKeysFile}"
            command = cmd.format(key=key, authorizedKeysFile=self.authorizedKeysFile)
            err = self._remoteStderr(sshClient, command)
            if err:
                raise Exception("unable to delete remote key")
            # Only report success once stderr has been seen to be empty.
            logger.debug("deleted remote key")
        except Exception:
            logger.debug("unable to delete remote key")
            logger.debug(traceback.format_exc())
        finally:
            sshClient.close()

    def testAuth(self, keyModel, username=None, host=None, timeout=10):
        """Check whether a non-password SSH login to the host succeeds.

        Runs the ssh binary from keydistObject.keyModel.sshpaths with
        password, challenge-response and keyboard-interactive
        authentication disabled and IdentityFile pointing at a path that
        no longer exists (presumably so that only agent-held keys are
        tried -- confirm), then inspects the combined stdout/stderr.

        keyModel -- accepted for interface compatibility; not read here.
        username -- login name; if None the previously stored one is used.
        host     -- remote host; if None the previously stored one is used.
        timeout  -- value passed as ssh ConnectTimeout.

        Returns True when the remote echo marker is seen, False when it is
        not (including hostname resolution failures), and None after
        calling keydistObject.cancel() when the output reports an agent
        problem.  Raises Exception when no username is known; any other
        exception is logged and re-raised.
        """
        if username is not None:
            self.username = username
        if host is not None:
            self.host = host
        if self.username is None:
            raise Exception("I don't know what username you are tring to login with")
        from logger.Logger import logger
        import subprocess
        import sys
        import tempfile
        import traceback
        logger.debug("in passwordAuth.testAuth")
        # Create and immediately remove a temp file to obtain a unique path
        # that does not exist.
        with tempfile.NamedTemporaryFile(delete=True) as fd:
            path = fd.name
        auth = False
        try:
            ssh_cmd = ['{sshbinary}', '-o', 'ConnectTimeout=%s' % timeout, '-o', 'IdentityFile="{nonexistantpath}"', '-o', 'PasswordAuthentication=no', '-o', 'ChallengeResponseAuthentication=no', '-o', 'KbdInteractiveAuthentication=no', '-o', 'PubkeyAuthentication=yes', '-o', 'StrictHostKeyChecking=no', '-l', '{login}', '{host}', 'echo', '"success_testauth"']
            substitutions = dict(
                sshbinary=self.keydistObject.keyModel.sshpaths.sshBinary,
                login=self.username,
                host=self.host,
                nonexistantpath=path,
            )
            cmd = [part.format(**substitutions) for part in ssh_cmd]
            logger.debug('testAuthThread: attempting: %s' % cmd)
            popenOptions = dict(
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                startupinfo=self.keydistObject.startupinfo,
                creationflags=self.keydistObject.creationflags,
            )
            if sys.platform.startswith("win"):
                proc = subprocess.Popen(" ".join(cmd), shell=True, **popenOptions)
            else:
                proc = subprocess.Popen(cmd, shell=False, **popenOptions)
            # communicate() waits for the process to exit; stderr is merged
            # into stdout above, so the second value is always None.
            stdout, stderr = proc.communicate()
            logger.debug("passwordAuth.testAuth: stdout of ssh command: " + str(stdout))
            logger.debug("passwordAuth.testAuth: stderr of ssh command: " + str(stderr))
            if 'Could not resolve hostname' in stdout:
                logger.debug('Network error.')
                auth = False
            elif 'success_testauth' in stdout:
                logger.debug("passwordAuth.testAuth: got success_testauth in stdout :)")
                auth = True
            elif 'Agent admitted' in stdout:
                logger.debug("passwordAuth.testAuth: the ssh agent has an error. Try rebooting the computer")
                self.keydistObject.cancel("Sorry, there is a problem with the SSH agent.\nThis sort of thing usually occurs if you delete your key and create a new one.\nThe easiest solution is to reboot your computer and try again.")
                return None
            else:
                logger.debug("passwordAuth.testAuth: did not see success_testauth in stdout, posting EVT_KEYDIST_AUTHFAIL event")
                auth = False
        except Exception as e:
            logger.debug("passwordAuth.testAuth raised an exception %s" % e)
            logger.debug(traceback.format_exc())
            # Bare raise keeps the original traceback.
            raise
        return auth