This repository has been archived by the owner on Nov 9, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
sudsssltransport.py
144 lines (127 loc) · 5.38 KB
/
sudsssltransport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding: utf-8 -*-
from suds import client, BytesIO
from suds.transport.http import Transport, Reply, TransportError
import requests
from requests import HTTPError, RequestException, ConnectionError
from requests.exceptions import SSLError
class SudsClientStrictSSL(client.Client):
    """
    suds client that talks HTTP through the strict-SSL requests transport.

    All keyword arguments are first given to
    _StrictSSLHTTPTransportAuthenticated.  The options that transport keeps
    in ``suds_kwargs`` (everything it does not consume itself), together
    with the transport instance, are then forwarded to
    ``client.Client.__init__``.
    """

    def __init__(self, url, **kwargs):
        # The transport is used as a context manager: once the base client
        # has been initialised and the block is left, the transport's
        # __exit__ closes its requests session.
        with _StrictSSLHTTPTransportAuthenticated(**kwargs) as transport:
            # This is the transport's own dict, extended in place, so the
            # transport ends up holding a reference to itself in it.
            suds_options = transport.suds_kwargs
            suds_options['transport'] = transport
            client.Client.__init__(self, url, **suds_options)
class _StrictSSLHTTPTransportAuthenticated(Transport):
    """
    suds does not verify the host when connecting over https.
    This is due to a deficiency in urllib2.
    This class is used by SudsClientStrictSSL to override the standard suds
    transport with one based on requests and ensures that when connecting
    over SSL, the handshake is properly verified.

    Keyword arguments consumed here (everything else is kept in
    ``self.suds_kwargs`` and passed on to ``Transport.__init__``):

    verify_ssl       -- passed to requests as ``verify`` (default True).
                        While it is truthy, plain ``http://`` URLs are
                        either rewritten or rejected, see rewrite_to_https.
    client_cert      -- passed to requests as ``cert`` (default None).
    username         -- first element of the credentials tuple used by send().
    password         -- second element of the credentials tuple used by send().
    rewrite_to_https -- if truthy, ``http://`` URLs are rewritten to
                        ``https://`` instead of being rejected
                        (default False).
    proxy            -- passed to requests as ``proxies`` (default {}).
    """

    # Options handled by this transport itself; they are stripped from the
    # keyword arguments before the remainder is handed to suds.
    _OWN_OPTIONS = (
        'username', 'password', 'client_cert',
        'verify_ssl', 'proxy', 'rewrite_to_https',
    )

    def __init__(self, **kwargs):
        """
        Store the transport options, initialise the suds base transport with
        the remaining keyword arguments and create a requests session.
        """
        # from requests.request: '(optional) if ``True``, the SSL cert will
        # be verified. A CA_BUNDLE path can also be provided.'
        self.verify_ssl = kwargs.get('verify_ssl', True)
        # from requests.request: '(optional) if String, path to ssl client
        # cert file (.pem). If Tuple, ('cert', 'key') pair.'
        self.client_cert = kwargs.get('client_cert', None)
        self.username = kwargs.get('username', None)
        self.password = kwargs.get('password', None)
        # This option rewrites all paths given by a WSDL to HTTPS;
        # useful only when you know the WSDL shouldn't be handing you HTTP.
        self.rewrite_to_https = kwargs.get('rewrite_to_https', False)
        self.proxy = kwargs.get('proxy', {})
        self.suds_kwargs = {
            key: value for key, value in kwargs.items()
            if key not in self._OWN_OPTIONS
        }
        # NOTE(review): confirm that the suds Transport base class in use
        # accepts keyword arguments; if it does not, any extra suds option
        # passed by the caller makes this call raise TypeError.
        Transport.__init__(self, **self.suds_kwargs)
        self.session = requests.Session()

    def __enter__(self):
        """Return the transport itself so it can be used in a with-block."""
        return self

    def __exit__(self, type_, value, traceback):
        """Close the requests session when the with-block is left."""
        self.session.close()

    def open(self, request):
        """
        Perform an HTTP GET to the URL in the given suds.transport.Request
        and return a file-like object (BytesIO) of the response body.

        If verify_ssl is truthy and the URL starts with ``http://``, the URL
        is rewritten to ``https://`` when rewrite_to_https is set (the
        request object is modified in place); otherwise the request is
        refused.

        Raises TransportError (with httpcode None) for a refused plain-HTTP
        URL and for any requests failure, including 4xx/5xx responses.
        """
        try:
            if self.verify_ssl and request.url.startswith('http://'):
                if self.rewrite_to_https:
                    # 7 == len('http://')
                    request.url = 'https://' + request.url[7:]
                else:
                    raise SSLError("can't verify SSL cert over plain HTTP")
            # NOTE(review): unlike send(), no credentials are passed here;
            # confirm that fetching documents unauthenticated is intended.
            response = self.session.get(
                url=request.url,
                data=request.message,
                headers=request.headers,
                proxies=self.proxy,
                verify=self.verify_ssl,
                cert=self.getclientcertificate(),
            )
            response.raise_for_status()
        except RequestException as e:
            # SSLError, HTTPError and ConnectionError are all subclasses of
            # RequestException, so one handler covers every case.
            raise TransportError(str(e), None)
        return BytesIO(response.content)

    def send(self, request):
        """
        Converts the suds request into a requests call and HTTP POSTs the
        message to the url given in the @request.

        If verify_ssl is truthy and the URL starts with ``http://``, the URL
        is rewritten to ``https://`` when rewrite_to_https is set (the
        request object is modified in place); otherwise the request is
        refused.

        Returns a suds Reply built from the response status code, headers
        and content.

        Raises TransportError:
          - with httpcode None for a refused plain-HTTP URL, for SSL errors
            and for failures that carry no HTTP response (connection
            errors, timeouts, ...);
          - with the HTTP status code and a BytesIO of the response body
            when the server answered with a 4xx/5xx status.
        """
        try:
            if self.verify_ssl and request.url.startswith('http://'):
                if self.rewrite_to_https:
                    # 7 == len('http://')
                    request.url = 'https://' + request.url[7:]
                else:
                    raise SSLError(
                        "can't verify SSL cert over plain HTTP link"
                    )
            # NOTE(review): when no username/password were configured the
            # credentials are (None, None); confirm how requests treats
            # that tuple.
            response = self.session.post(
                url=request.url,
                headers=request.headers,
                data=request.message,
                proxies=self.proxy,
                verify=self.verify_ssl,
                auth=self.getcredentials(),
                cert=self.getclientcertificate(),
            )
            # Raises HTTPError for every 4xx/5xx status (401 included), so
            # no separate status check is needed afterwards.
            response.raise_for_status()
        except SSLError as e:
            raise TransportError(str(e), None)
        except RequestException as e:
            failed = getattr(e, 'response', None)
            # Compare with None explicitly: a requests Response is falsy
            # for error statuses, and connection-level failures have no
            # response at all.
            if failed is None:
                raise TransportError(str(e), None)
            if failed.status_code in (202, 204):
                return None
            raise TransportError(
                str(e),
                failed.status_code,
                BytesIO(failed.content)
            )
        return Reply(response.status_code, response.headers, response.content)

    def getcredentials(self):
        """
        Get a tuple of @username @password used for HTTP basic authorisation.
        This method could be overridden to get credentials from an external
        service.
        """
        return (self.username, self.password)

    def getclientcertificate(self):
        """
        Stub method that simply returns @self.client_cert.
        This method could be overridden to e.g. search /
        create a pem from a list of files making up a chain.
        """
        return self.client_cert