-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth.py
205 lines (173 loc) · 6.86 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import json
from flask import request, _request_ctx_stack
from functools import wraps
from jose import jwt
from urllib.request import urlopen
from config import auth0_config
#----------------------------------------------------------------------------#
# Auth0 Config
#----------------------------------------------------------------------------#
AUTH0_DOMAIN = auth0_config['AUTH0_DOMAIN']
ALGORITHMS = auth0_config['ALGORITHMS']
API_AUDIENCE = auth0_config['API_AUDIENCE']
#----------------------------------------------------------------------------#
# AuthError Exception
#----------------------------------------------------------------------------#
class AuthError(Exception):
'''A standardized way to communicate auth failure modes'''
def __init__(self, error, status_code):
self.error = error
self.status_code = status_code
#----------------------------------------------------------------------------#
# Auth Wrapper Methods
#----------------------------------------------------------------------------#
def get_token_auth_header():
"""Obtains the Access Token from the Authorization Header
*Input: None
*Output:
<string> token (part of the header)
Conditions for Output:
- Authorization header is available
- header must not be malformed (i.e. Bearer XXXXX)
"""
auth = request.headers.get('Authorization', None)
# Raise error if no "Authorization" is part of header
if not auth:
raise AuthError({
'code': 'authorization_header_missing',
'description': 'Authorization header is expected.'
}, 401)
parts = auth.split()
# Raise error if no "Bearer" is part "Authorization"
if parts[0].lower() != 'bearer':
raise AuthError({
'code': 'invalid_header',
'description': 'Authorization header must start with "Bearer".'
}, 401)
# Raise error if "Authorization" only contains 1 part (it should have 2)
elif len(parts) == 1:
raise AuthError({
'code': 'invalid_header',
'description': 'Token not found.'
}, 401)
# Raise error if "Authorization" contains more than 2 parts (it should only have 2)
elif len(parts) > 2:
raise AuthError({
'code': 'invalid_header',
'description': 'Authorization header must be bearer token.'
}, 401)
# When everyhting is fine, get the token which is the second part of the Authorization Header & return it
return parts[1]
def check_permissions(permission, payload):
''' Check if permission is part of payload
*Input
<string> permission (i.e. 'post:example')
<string> payload (decoded jwt payload)
*Output:
<bool> True if all conditions have been met
Conditions for Output:
- permissions are included in the payload
- requested permission string is in the payload permissions array
'''
if 'permissions' not in payload:
raise AuthError({
'code': 'invalid_claims',
'description': 'Permissions not included in JWT.'
}, 400)
if permission not in payload['permissions']:
raise AuthError({
'code': 'unauthorized',
'description': 'Permission not found.'
}, 403)
return True
def verify_decode_jwt(token):
''' Decodes JWT Token or raises appropiate Error Messages
*Input
<string> token (a json web token)
*Output
<string> decoded payload
Conditions for output to be returned:
- Auth0 token with key id (key id = kid)
- verify the token using Auth0 /.well-known/jwks.json
- decode the payload from the token with Auth Config on top of auth.py
- claims need to fit
'''
# Verify token
jsonurl = urlopen(f'https://{AUTH0_DOMAIN}/.well-known/jwks.json')
jwks = json.loads(jsonurl.read())
unverified_header = jwt.get_unverified_header(token)
# Check if Key id is in unverified header
if 'kid' not in unverified_header:
raise AuthError({
'code': 'invalid_header',
'description': 'Authorization malformed.'
}, 401)
rsa_key = {} # initialize empty private rsa key as dict
for key in jwks['keys']:
if key['kid'] == unverified_header['kid']:
rsa_key = {
'kty': key['kty'],
'kid': key['kid'],
'use': key['use'],
'n': key['n'],
'e': key['e']
}
if rsa_key:
try:
# Use Auth Config (top of file) to decode JWT and return payload if succesful
payload = jwt.decode(
token,
rsa_key,
algorithms=ALGORITHMS,
audience=API_AUDIENCE,
issuer='https://' + AUTH0_DOMAIN + '/'
)
return payload
# Raise Error if token is not valide anymore.
except jwt.ExpiredSignatureError:
raise AuthError({
'code': 'token_expired',
'description': 'Token expired.'
}, 401)
# Raise Error if token is claiming wrong audience.
except jwt.JWTClaimsError:
raise AuthError({
'code': 'invalid_claims',
'description': 'Incorrect claims. Please, check the audience and issuer.'
}, 401)
# In all other Error cases, give generic error message
except Exception:
raise AuthError({
'code': 'invalid_header',
'description': 'Unable to parse authentication token.'
}, 400)
# If no payload has been returned yet, raise error.
raise AuthError({
'code': 'invalid_header',
'description': 'Unable to find the appropriate key.'
}, 400)
# TODO DONE implement @requires_auth(permission) decorator method
def requires_auth(permission=''):
''' Authentification Wrapper to decorate Endpoints with
*Input:
<string> permission (i.e. 'post:drink')
uses the get_token_auth_header method to get the token
uses the verify_decode_jwt method to decode the jwt
uses the check_permissions method validate claims and check the requested permission
return the decorator which passes the decoded payload to the decorated method
'''
def requires_auth_decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
token = get_token_auth_header()
try:
payload = verify_decode_jwt(token)
except:
raise AuthError({
'code': 'unauthorized',
'description': 'Permissions not found'
}, 401)
check_permissions(permission, payload)
return f(payload, *args, **kwargs)
return wrapper
return requires_auth_decorator