-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfalcon_jwt.py
122 lines (101 loc) · 5.04 KB
/
falcon_jwt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from datetime import datetime, timedelta
import traceback
import json
import logging
import falcon
import jwt
from passlib.hash import sha256_crypt
# Token options applied when a caller supplies none of its own: the JWT is
# carried in a cookie named "auth_token".
DEFAULT_TOKEN_OPTS = {
    "name": "auth_token",
    "location": "cookie",
}
class LoginResource(object):
    """Falcon resource that checks an email/password pair and issues a JWT.

    Args:
        get_user: Callable taking the submitted email and returning a mapping
            with a ``"password"`` entry (verified with passlib's
            ``sha256_crypt``), or a falsy value when the user is unknown.
        secret: Key used to sign tokens (HS256).
        token_expiration_seconds: Lifetime of each issued token, in seconds.
        **token_opts: Token delivery options. ``location`` selects ``"cookie"``
            (the default) or ``"header"``; ``name`` is the cookie name or the
            JSON key the token is returned under. In cookie mode every other
            option is forwarded to ``resp.set_cookie``. When no options are
            given, ``DEFAULT_TOKEN_OPTS`` is used.
    """

    def __init__(self, get_user, secret, token_expiration_seconds, **token_opts):
        self.get_user = get_user
        self.secret = secret
        self.token_expiration_seconds = token_expiration_seconds
        # Copy, so this instance never aliases (or mutates) the module-level
        # defaults shared with every other resource and middleware.
        self.token_opts = dict(token_opts or DEFAULT_TOKEN_OPTS)
        logging.debug(token_opts)

    def on_post(self, req, resp):
        """Authenticate the posted credentials and attach a fresh token.

        The request body must be a JSON object with ``email`` and
        ``password`` fields.

        Raises:
            falcon.HTTPBadRequest: The body is not valid JSON or lacks one of
                the required fields.
            falcon.HTTPUnauthorized: The user is unknown or the password does
                not match.
        """
        logging.debug("Reached on_post() in Login")
        try:
            # Read the stream exactly once: a second read() returns nothing.
            raw_body = req.stream.read()
            if isinstance(raw_body, bytes):
                raw_body = raw_body.decode()
            data = json.loads(raw_body)
        except Exception:
            # Keep the stack trace in the server log; do not send it to the client.
            logging.warning("Could not parse login request body", exc_info=True)
            raise falcon.HTTPBadRequest(
                "I don't understand", "The request body must be valid JSON.")

        if not isinstance(data, dict) or "email" not in data or "password" not in data:
            raise falcon.HTTPBadRequest(
                "I don't understand",
                'The request body must be a JSON object with "email" and "password" fields.')

        email = data["email"]
        password = data["password"]
        user = self.get_user(email)
        if not (user and sha256_crypt.verify(password, user["password"])):
            raise falcon.HTTPUnauthorized('Who Do You Think You Are?',
                                          'Bad email/password combination, please try again',
                                          ['Hello="World!"'])
        logging.debug("Valid user, jwt'ing!")
        self.add_new_jwtoken(resp, email)

    def add_new_jwtoken(self, resp, user_identifier=None):
        """Sign a new token for ``user_identifier`` and add it to ``resp``.

        Typically called from the login handler once the back end has accepted
        the username/password. With ``location == "cookie"`` the token is set
        as a cookie; with ``location == "header"`` it is returned in a JSON
        response body keyed by the configured name.

        Raises:
            ValueError: ``user_identifier`` is empty.
            falcon.HTTPInternalServerError: ``location`` is neither
                ``"cookie"`` nor ``"header"``.
        """
        if not user_identifier:
            raise ValueError('Empty user_identifer passed to set JWT')
        logging.debug(
            "Creating new JWT, user_identifier is: {}".format(user_identifier))
        claims = {
            'user_identifier': user_identifier,
            'exp': datetime.utcnow() + timedelta(seconds=self.token_expiration_seconds),
        }
        token = jwt.encode(claims, self.secret, algorithm='HS256')
        if isinstance(token, bytes):
            # Older PyJWT releases return bytes, newer ones return str.
            token = token.decode("utf-8")
        logging.debug("Setting TOKEN!")

        location = self.token_opts.get('location', 'cookie')  # default to cookie
        if location == 'cookie':
            # Build the cookie arguments per call: 'location' is our own routing
            # key, not a set_cookie() argument, and the token must not be stored
            # on state shared between requests.
            cookie_opts = {key: value for key, value in self.token_opts.items()
                           if key != 'location'}
            cookie_opts['value'] = token
            resp.set_cookie(**cookie_opts)
        elif location == 'header':
            # NOTE(review): newer Falcon releases name this attribute `text` --
            # confirm against the Falcon version this project pins.
            resp.body = json.dumps({self.token_opts['name']: token})
        else:
            raise falcon.HTTPInternalServerError('Unrecognized jwt token location specifier')
class AuthMiddleware(object):
    """Falcon middleware that rejects requests lacking a valid JWT.

    Args:
        secret: Key the tokens were signed with (HS256).
        **token_opts: Token delivery options. ``location`` selects where the
            token is read from (``"cookie"``, the default, or ``"header"``);
            ``name`` is the cookie or header name. When no options are given,
            ``DEFAULT_TOKEN_OPTS`` is used.
    """

    def __init__(self, secret, **token_opts):
        self.secret = secret
        # Copy, so this instance never aliases the module-level defaults.
        self.token_opts = dict(token_opts or DEFAULT_TOKEN_OPTS)

    def process_resource(self, req, resp, resource, params):  # pylint: disable=unused-argument
        """Require a valid token on every resource except the login resource.

        Raises:
            falcon.HTTPUnauthorized: No token was supplied, the configured
                location is unrecognized, or the token failed validation.
        """
        logging.debug("Processing request in AuthMiddleware: ")
        if isinstance(resource, LoginResource):
            logging.debug("LOGIN, DON'T NEED TOKEN")
            return

        challenges = ['Hello="World"']  # I think this is very irrelevant
        location = self.token_opts.get('location', 'cookie')
        token_name = self.token_opts.get("name")
        if location == 'cookie':
            token = req.cookies.get(token_name)
        elif location == 'header':
            # Not required=True: a missing header must fall through to the 401
            # below, matching the cookie path, rather than raise a 400 here.
            token = req.get_header(token_name)
        else:
            # Unrecognized token location
            token = None

        if token is None:
            description = ('Please provide an auth token '
                           'as part of the request.')
            raise falcon.HTTPUnauthorized('Auth token required',
                                          description,
                                          challenges,
                                          href='http://docs.example.com/auth')

        if not self._token_is_valid(token):
            description = ('The provided auth token is not valid. '
                           'Please request a new token and try again.')
            raise falcon.HTTPUnauthorized('Authentication required',
                                          description,
                                          challenges,
                                          href='http://docs.example.com/auth')

    def _token_is_valid(self, token):
        """Return True when ``token`` decodes with our secret and has not expired."""
        try:
            jwt.decode(token, self.secret, algorithms=['HS256'],
                       options={'verify_exp': True})
        except jwt.InvalidTokenError as err:
            # InvalidTokenError also covers expired tokens; DecodeError alone
            # does not, so an expired token used to escape as an unhandled error.
            logging.debug("Token validation failed Error :{}".format(str(err)))
            return False
        return True
def get_auth_objects(get_user, secret, token_expiration_seconds, token_opts=DEFAULT_TOKEN_OPTS): # pylint: disable=dangerous-default-value
    """Build a matching login resource and auth middleware pair.

    Both objects are constructed with the same ``secret`` and the same
    ``token_opts`` (unpacked as keyword arguments).

    Returns:
        A ``(LoginResource, AuthMiddleware)`` tuple.
    """
    login_resource = LoginResource(
        get_user, secret, token_expiration_seconds, **token_opts)
    auth_middleware = AuthMiddleware(secret, **token_opts)
    return login_resource, auth_middleware