-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.py
288 lines (266 loc) · 11.6 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
from flask import request, url_for, Flask, jsonify, flash
import parameters
from parameters import keycloak_server, keycloak_realm, client_id, client_secret, ssl_mode, admin_id, admin_pwd, save_stats
import os
import os.path
import requests
app = Flask(__name__)
import logging
import json
import sys
import ssl
import io
import csv
from flask import make_response
import time
def __init__():
    """Set up logging for the service.

    Configures the root logger at DEBUG level and binds the module-level
    ``logger`` global to a logger named ``kc-pep.<module name>``.
    """
    global logger
    logging.basicConfig(level=logging.DEBUG)
    logger_name = "kc-pep." + __name__
    logger = logging.getLogger(logger_name)


# Run once at import time so that `logger` exists before any handler is used.
__init__()
@app.errorhandler(Exception)
def unhandle_request_error(error):
    """Turn any exception that escapes a view into a JSON error response.

    :param error: the exception raised while the request was being handled
    :return: Flask response whose JSON body is ``{"message": str(error)}``
        and whose HTTP status is 500
    """
    # exc_info attaches the traceback to the log record; the message alone
    # does not show where the failure happened.
    logger.error("An unhandle exception occured:{}".format(error), exc_info=error)
    # NOTE(review): the handler is registered for Exception, so framework
    # errors such as an unknown URL are presumably reported as 500 too -
    # confirm that this is intended.
    response = jsonify(dict(message=str(error)))
    response.status_code = 500
    return response
def _setup_error(message, status_code=400):
    """Build the JSON error response returned when a setup precondition fails."""
    error_response = jsonify(dict(status_code=status_code, message=message, data=[]))
    error_response.status_code = status_code
    return error_response


@app.route('/setup', methods=['POST'])
def setup():
    """ API to bulk-create test users in Keycloak and export them as CSV.

    Expects a JSON body with the keys ``entries`` (number of users to
    create), ``user_prefix``, ``role_name`` and ``start_index``. Users are
    created through ``add_user`` for the indices ``start_index`` ..
    ``start_index + entries - 1`` and each created user is given the role
    ``role_name``.

    :return: on success a ``text/csv`` attachment (``export.csv``) with the
        header ``user_name,pwd,device_id,scope`` and one row per created
        user (device and scope are the fixed values ``deviceB`` / ``access``);
        a JSON body with HTTP status 400 when the admin login or the role
        lookup fails
    """
    # Read parameters
    input_json_body = request.json
    entries = input_json_body['entries']
    user_prefix = input_json_body['user_prefix']
    role_name = input_json_body['role_name']
    start_index = input_json_body['start_index']

    # Get admin access token
    admin_AT = get_admin_access_token()
    if not admin_AT:
        return _setup_error("Unable to login admin user.")

    # Get role ID
    role_id = get_role_id(admin_AT, role_name)
    if not role_id:
        return _setup_error("Unable to obtain role_id.")

    # Create users and assign the role
    csvList = [["user_name", "pwd", "device_id", "scope"]]
    first_index = int(start_index)
    # entries is coerced like start_index so a numeric string is accepted too.
    for index in range(first_index, first_index + int(entries)):
        userName, pwd = add_user(admin_AT, index, user_prefix, role_id)
        if userName == "" or pwd == "":
            # User creation failed; add_user has already logged the reply.
            continue
        user_id = get_user_id(admin_AT, userName)
        logger.info("user_id => {0}".format(user_id))
        if user_id:
            assign_role_to_user(admin_AT, user_id, role_id, role_name)
        else:
            # Without an id the role-mapping URL would be malformed.
            logger.error("Unable to obtain user_id for {0}; role not assigned.".format(userName))
        csvList.append([userName, pwd, "deviceB", "access"])

    # Response
    si = io.StringIO()
    csv.writer(si).writerows(csvList)
    output = make_response(si.getvalue())
    output.headers["Content-Disposition"] = "attachment; filename=export.csv"
    output.headers["Content-type"] = "text/csv"
    return output
def get_admin_access_token():
    """Log in to Keycloak as the configured admin user (password grant).

    :return: the admin access token, or ``""`` when Keycloak does not answer
        with status 200 or the reply carries no ``access_token``
    :raises Exception: any error raised while building or sending the
        request, or while decoding the reply, is logged and re-raised
    """
    try:
        kc_token_URL = keycloak_server + "realms/" + keycloak_realm + "/protocol/openid-connect/token"
        payload = {
            "grant_type": "password",
            "client_id": client_id,
            "client_secret": client_secret,
            "username": admin_id,
            "password": admin_pwd,
        }
        r = requests.post(kc_token_URL, data=payload)
        logger.info(r.status_code)
        if r.status_code != 200:
            # Return "" (the value the caller checks for) rather than failing
            # later with a KeyError on the missing token.
            logger.error("Admin login failed. \n status_code => {0} \n response_message => {1}".format(r.status_code, r.text))
            return ""
        access_token = r.json().get('access_token') or ""
    except Exception as e:
        logger.error(e)
        raise
    if access_token:
        # The token value is a bearer credential and is deliberately not logged.
        logger.info("Admin access token obtained.")
    else:
        logger.error("Admin login reply did not contain an access_token.")
    return access_token
def get_role_id(access_token,role_name):
    """Fetch the id of a realm role from the Keycloak admin API.

    :param access_token: bearer token sent in the Authorization header
    :param role_name: name of the role, appended to the ``/roles/`` URL
    :return: the value of ``id`` in the JSON reply, or ``""`` when the reply
        status is not 200 or the reply has no ``id`` key
    """
    realm_admin_url = keycloak_server + "admin/realms/" + keycloak_realm
    role_api_url = realm_admin_url + "/roles/" + role_name
    auth_header = {'Authorization': 'Bearer ' + access_token}
    reply = requests.get(role_api_url, headers=auth_header)
    logger.info("Get role id response. \n status_code => {0} \n response_message => {1}".format(reply.status_code,reply.text))
    if reply.status_code != 200:
        return ""
    role = reply.json()
    if "id" not in role.keys():
        return ""
    return role["id"]
def add_user(access_token,entry,user_prefix,role_id):
    """Create one enabled user in the Keycloak realm.

    The username and the password are both ``user_prefix + str(entry)``; the
    e-mail is that same value followed by ``@test.com``.

    :param access_token: bearer token sent in the Authorization header
    :param entry: index appended to ``user_prefix``
    :param user_prefix: prefix of the username; also used as first name
    :param role_id: not used by this function
    :return: ``(username, password)`` when Keycloak answers 201, otherwise
        ``("", "")``
    :raises Exception: any error from the request is logged and re-raised
    """
    suffix = str(entry)
    username = user_prefix + suffix
    password = user_prefix + suffix
    mail_address = user_prefix + suffix + "@test.com"
    try:
        credential = {"value": password, "type": "password"}
        new_user_data = {
            "email": mail_address,
            "username": username,
            "enabled": True,
            "firstName": user_prefix,
            "lastName": suffix,
            "realmRoles": ["user_default"],
            "credentials": [credential],
        }
        user_uri = keycloak_server + "admin/realms/" + keycloak_realm + "/users"
        auth_header = {'Authorization': 'Bearer ' + access_token}
        reply = requests.post(user_uri, json=new_user_data, headers=auth_header)
        logger.info("Status Code => {0} \n Text => {1}".format(reply.status_code,reply.text))
        if reply.status_code != 201:
            return "", ""
        return username, password
    except Exception as error:
        logger.error(error)
        raise error
def get_user_id(access_token,user_name):
    """Look up the Keycloak id of the user whose username is ``user_name``.

    :param access_token: bearer token sent in the Authorization header
    :param user_name: username to look up
    :return: the ``id`` of the user whose username equals ``user_name``
        (compared case-insensitively), or ``""`` when the reply status is not
        200 or no such user is in the reply
    """
    users_api_url = keycloak_server + "admin/realms/" + keycloak_realm + "/users"
    headers = {'Authorization': 'Bearer ' + access_token}
    # Passing the query through `params` lets requests URL-encode the name.
    # `exact` asks Keycloak for the complete username; per the Admin REST API
    # the `username` filter is otherwise a "contained in" search, so looking
    # up "user1" could also return "user10".
    query = {"username": user_name, "exact": "true"}
    r = requests.get(users_api_url, headers=headers, params=query)
    logger.info("Get user id response. \n status_code => {0} \n response_message => {1}".format(r.status_code,r.text))
    if r.status_code != 200:
        return ""
    wanted = user_name.lower()
    # Filter locally as well, in case the server ignores `exact`.
    for user in r.json():
        if str(user.get("username", "")).lower() == wanted:
            return user["id"]
    return ""
def assign_role_to_user(access_token,user_id,role_id,role_name): # assign role to user
    """Add a realm role mapping to a user through the Keycloak admin API.

    The outcome is only logged: status 204 is reported as success, any other
    status as an error. Nothing is returned.

    :param access_token: bearer token sent in the Authorization header
    :param user_id: id of the user, placed in the request URL
    :param role_id: id of the role to assign
    :param role_name: name of the role to assign
    :raises Exception: any error from the request is logged and re-raised
    """
    try:
        realm_admin_url = keycloak_server + "admin/realms/" + keycloak_realm
        mapping_url = realm_admin_url + "/users/" + user_id + "/role-mappings/realm"
        auth_header = {'Authorization': 'Bearer ' + access_token}
        role_mapping = [{"id": role_id, "name": role_name}]
        reply = requests.post(mapping_url, json=role_mapping, headers=auth_header)
        logger.info("Assign role response: \n status_code => {0} \n response_message => {1}".format(reply.status_code,reply.text))
        if reply.status_code != 204:
            logger.error("Role assignment unsuccessful")
            return
        logger.info("Role assignment successful.")
    except Exception as e:
        logger.error(e)
        raise e
def record_stat(start_time):
    """Append one timing row to ``results.csv`` when statistics are enabled.

    The row is written as ``start_time,end_time,elapsed`` preceded by a
    newline, where ``end_time`` is taken when this function runs. Nothing is
    written unless ``save_stats`` equals ``True``.

    :param start_time: timestamp (as returned by ``time.time()``) at which
        the measured request started
    """
    # Comparison kept as `== True`: save_stats comes from the parameters
    # module and its type is not visible here.
    if save_stats == True:
        end_time = time.time()
        elapsed = end_time - start_time
        row = "\n" + ",".join(str(value) for value in (start_time, end_time, elapsed))
        # `with` closes the file even if the write fails.
        with open("results.csv", "a") as f:
            f.write(row)
@app.route('/access', methods=['POST'])
def access():
    """ API to check whether a user may access a device with a given scope.

    Expects a JSON body with the keys ``user_id``, ``pwd``, ``device_id`` and
    ``scope``. The user is first logged in to Keycloak with the password
    grant; the resulting access token is then used to request an RPT
    (uma-ticket grant) for the permission ``<device_id>#<scope>``.

    :return: JSON body ``{"status_code", "message", "data"}``. ``status_code``
        is 200 when the RPT is issued, 403 when it is refused, the Keycloak
        status for any other reply, and 422 when an exception occurs while
        processing the request. The outcome is carried in the body; no HTTP
        status is set on the response here.
    """
    start_time = time.time()
    response = dict(status_code="", message="", data=[])
    try:
        logger.debug("User access request received.")
        # Fetch input data
        input_json_body = request.json
        user_id = input_json_body['user_id']
        user_pwd = input_json_body['pwd']
        device_id = input_json_body['device_id']
        access_scope = input_json_body['scope']
        # Login user to fetch access token
        kc_token_URL = keycloak_server + "realms/" + keycloak_realm + "/protocol/openid-connect/token"
        grant_type = "password"
        payload = {"grant_type": grant_type, "client_id": client_id, "client_secret": client_secret, "username": user_id, "password": user_pwd}
        r_authenticate = requests.post(kc_token_URL, data=payload)  # The payload format, as per Keycloak API, is x-www-form-urlencoded
        if r_authenticate.status_code == 200:
            # Obtain RPT
            access_token = r_authenticate.json()['access_token']
            # Token values are bearer credentials and are deliberately not logged.
            logger.info("Access token obtained for user => {0}".format(user_id))
            permission = device_id + "#" + access_scope
            rpt_payload = {"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket", "audience": client_id, "permission": permission}
            headers = {"Authorization": "Bearer " + access_token}
            r_authorise = requests.post(kc_token_URL, headers=headers, data=rpt_payload)  # The payload format, as per Keycloak API, is x-www-form-urlencoded
            if r_authorise.status_code == 200:
                # The decision rests on the status code alone; the RPT body is
                # not read, so a log statement cannot turn a grant into a 422.
                logger.info("Authorise. RPT issued for permission => {0}".format(permission))
                response["message"] = "Access granted"
                response["status_code"] = 200
            elif r_authorise.status_code == 403:
                logger.info("Not Authorise")
                response["message"] = "Not Authorise"
                response["status_code"] = 403
            else:
                logger.info("Bad request => {0}".format(r_authorise.text))
                response["message"] = "Bad request"
                response["status_code"] = r_authorise.status_code
                response["data"].append('Keycloak returned error: {}'.format(r_authorise.text))
        else:
            logger.info("Bad request => {0}".format(r_authenticate.text))
            response["message"] = "Bad request"
            response["status_code"] = r_authenticate.status_code
            response["data"].append('Keycloak returned error: {}'.format(r_authenticate.text))
        record_stat(start_time)
        return jsonify(response)
    except Exception as error:
        logger.error(error)
        response["message"] = "Invalid request: {}".format(error)
        response["status_code"] = 422
        record_stat(start_time)
        return jsonify(response)
@app.route('/test', methods=['POST'])
def test():
    """ Post test API

    Expects a JSON body with the keys ``user_id``, ``pwd``, ``device_id`` and
    ``scope``, logs the request and answers without contacting Keycloak.

    :return: JSON body ``{"status_code", "message", "data"}`` with
        ``status_code`` 200 and message ``Success``, or ``status_code`` 422
        when the body cannot be read (for example a missing key)
    """
    response = dict(status_code="", message="", data=[])
    try:
        # Fetch input data
        input_json_body = request.json
        user_id = input_json_body['user_id']
        # Read only to validate that the key is present; the value itself is
        # masked in the log line below.
        input_json_body['pwd']
        device_id = input_json_body['device_id']
        access_scope = input_json_body['scope']
        logger.info("Test post request received \n\tUserID => {0} \n\tPwd => {1} \n\tDeviceID => {2} \n\tScope => {3}"
                    .format(user_id, "********", device_id, access_scope))
        response["message"] = "Success"
        response["status_code"] = 200
        return jsonify(response)
    except Exception as error:
        logger.error(error)
        response["message"] = "Invalid request: {}".format(error)
        response["status_code"] = 422
        # No record_stat() here: this endpoint never takes a start time, and
        # the former call referenced an undefined name, raising NameError
        # instead of returning the 422 body.
        return jsonify(response)
@app.route('/status', methods=['GET'])
def status():
    """ API function reporting that the service is up.

    :return: JSON body with ``status_code`` 200, a fixed message and an empty
        ``data`` list
    """
    payload = {
        "status_code": 200,
        "message": "The pep service is running.",
        "data": [],
    }
    return jsonify(payload)
if __name__ == "__main__":
    # Script entry point: initialise logging, reset the stats file when
    # statistics are enabled, then start the Flask server on port 5000.
    #   ssl_mode 1 -> TLS with the server certificate
    #   ssl_mode 2 -> TLS plus mandatory client certificates checked against certs/ca.crt
    #   otherwise  -> plain HTTP
    __init__()
    logger.info("kc-pep started with: \nOperational ssl_mode is => {0} \nSave stats => {1}".format(ssl_mode,save_stats))
    if save_stats == True:
        # Start every run with a fresh file that holds only the header row.
        with open("results.csv", "w") as f:
            f.write("start_time,end_time,proc_time")
    mode = int(ssl_mode)
    # NOTE(review): debug=True while listening on 0.0.0.0 exposes the Werkzeug
    # debugger to the network; confirm this is never used outside a lab setup.
    if mode in (1, 2):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        if not (os.path.exists('certs/server.crt') and os.path.exists('certs/server.key')):
            logger.error("Certificates files are missing")
            # Non-zero status so that callers can detect the failed start.
            sys.exit(1)
        if mode == 2:
            if not os.path.exists('certs/ca.crt'):
                logger.error("CA certificate file is missing")
                sys.exit(1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations("certs/ca.crt")
        context.load_cert_chain("certs/server.crt", "certs/server.key")
        app.run(host="0.0.0.0",debug=True, port=5000, ssl_context=context)
    else:
        app.run(host="0.0.0.0",debug=True, port=5000)