Skip to content
58 changes: 58 additions & 0 deletions django_auth_adfs/backend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime, timedelta

import jwt
from django.contrib.auth import get_user_model
Expand Down Expand Up @@ -424,6 +425,63 @@ def authenticate(self, request=None, authorization_code=None, **kwargs):
return user


class AdfsAuthCodeRefreshBackend(AdfsBaseBackend):
"""
Authentication backend that supports storing and refreshing ADFS tokens in the session.
Use this backend in conjunction with AdfsRefreshMiddleware.
"""
def authenticate(self, request=None, authorization_code=None, **kwargs):
# If there's no token or code, we pass control to the next authentication backend
if authorization_code is None or authorization_code == '':
logger.debug("Authentication backend was called but no authorization code was received")
return

# If there's no request object, we pass control to the next authentication backend
if request is None:
logger.debug("Authentication backend was called without request")
return

# If loaded data is too old, reload it again
provider_config.load_config()

adfs_response = self.exchange_auth_code(authorization_code, request)
access_token = adfs_response["access_token"]
user = self.process_access_token(access_token, adfs_response)
self._store_adfs_tokens_in_session(request, adfs_response)
return user

def ensure_valid_access_token(self, request):
now = datetime.now() + settings.REFRESH_THRESHOLD
expiry = datetime.fromisoformat(request.session["_adfs_token_expiry"])
if now > expiry:
adfs_refresh_response = self._refresh_access_token(
request.session["_adfs_refresh_token"]
)
self._store_adfs_tokens_in_session(request, adfs_refresh_response)

def _refresh_access_token(self, refresh_token):
provider_config.load_config()
response = provider_config.session.post(
provider_config.token_endpoint,
data=f'client_id={settings.CLIENT_ID}&client_secret={settings.CLIENT_SECRET}&grant_type=refresh_token' +
f'&refresh_token={refresh_token}'
)
response.raise_for_status()
adfs_response = response.json()
return adfs_response

def _store_adfs_tokens_in_session(self, request, adfs_response):
assert "refresh_token" in adfs_response, (
"AdfsAuthCodeRefreshBackend requires a refresh token to function correctly. "
"Make sure your ADFS server is configured to return a refresh token."
)
request.session["_adfs_access_token"] = adfs_response["access_token"]
expiry = datetime.now() + timedelta(seconds=int(adfs_response["expires_in"]))
request.session["_adfs_token_expiry"] = expiry.isoformat()
request.session["_adfs_refresh_token"] = adfs_response["refresh_token"]
request.session.save()


class AdfsAccessTokenBackend(AdfsBaseBackend):
"""
Authentication backend to allow authenticating users against a
Expand Down
1 change: 1 addition & 0 deletions django_auth_adfs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
self.USERNAME_CLAIM = "winaccountname"
self.GUEST_USERNAME_CLAIM = None
self.JWT_LEEWAY = 0
self.REFRESH_THRESHOLD = timedelta(minutes=5)
self.CUSTOM_FAILED_RESPONSE_VIEW = lambda request, error_message, status: render(
request, 'django_auth_adfs/login_failed.html', {'error_message': error_message}, status=status
)
Expand Down
47 changes: 47 additions & 0 deletions django_auth_adfs/middleware.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
"""
Based on https://djangosnippets.org/snippets/1179/
"""
import logging
from re import compile
from requests import HTTPError

from django.conf import settings as django_settings
from django.contrib import auth
from django.contrib.auth.views import redirect_to_login
from django.contrib.auth import logout
from django.core.exceptions import (PermissionDenied)

from django.urls import reverse

from django_auth_adfs.backend import AdfsAuthCodeRefreshBackend
from django_auth_adfs.exceptions import MFARequired
from django_auth_adfs.config import settings

Expand All @@ -19,6 +26,8 @@
if hasattr(settings, 'LOGIN_EXEMPT_URLS'):
LOGIN_EXEMPT_URLS += [compile(expr) for expr in settings.LOGIN_EXEMPT_URLS]

logger = logging.getLogger("django_auth_adfs")


class LoginRequiredMiddleware:
"""
Expand Down Expand Up @@ -49,3 +58,41 @@ def __call__(self, request):
return redirect_to_login('django_auth_adfs:login-force-mfa')

return self.get_response(request)


class AdfsRefreshMiddleware:
"""
Middleware that refreshes the access token for the user if it is close to
expiring. This is done by checking the session for the '_adfs_token_expiry'
key and comparing it with the current time plus a threshold defined in
settings.REFRESH_THRESHOLD.
"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
if hasattr(django_settings, "SESSION_ENGINE"):
assert (
django_settings.SESSION_ENGINE
!= "django.contrib.sessions.backends.signed_cookies"
), (
"You are trying to use ADFS Refresh middleware with signed cookie-based sessions. "
"For security reasons, we do not recommend this configuration. "
"Please change SESSION_ENGINE to a different backend, such as 'django.contrib.sessions.backends.db' "
)

try:
backend_str = request.session[auth.BACKEND_SESSION_KEY]
except KeyError:
pass
else:
backend = auth.load_backend(backend_str)
if isinstance(backend, AdfsAuthCodeRefreshBackend):
try:
backend.ensure_valid_access_token(request)
except (PermissionDenied, HTTPError) as error:
logger.debug("Error refreshing access token: %s", error)
logout(request)

return self.get_response(request)
Loading
Loading