Skip to content

Commit

Permalink
API Key Creation & Management
Browse files Browse the repository at this point in the history
Added functionality for superusers and users to create and manage API keys, with Knox integration for secure key hashing.
  • Loading branch information
NEZRI Ygal authored and NEZRI Ygal committed Jul 22, 2024
1 parent 8be3b00 commit b230c6c
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 115 deletions.
222 changes: 133 additions & 89 deletions Watcher/Watcher/accounts/admin.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
from django.contrib import admin

# Import for Log Entries Snippet
from django.contrib.admin.models import LogEntry, ADDITION, CHANGE, DELETION
from django.utils.html import escape
from django.urls import reverse, NoReverseMatch
from django.contrib.auth.models import User
from django.utils.safestring import mark_safe
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from .models import APIKey
from .api import generate_api_key
from django.contrib import messages
from django import forms
from django.utils import timezone
from datetime import timedelta
from knox.models import AuthToken
from django.db.models.signals import post_delete
from django.dispatch import receiver

"""
Log Entries Snippet
Expand Down Expand Up @@ -82,6 +82,7 @@ class LogEntryAdmin(admin.ModelAdmin):
UserFilter,
ActionFilter,
'content_type',
# 'user',
]

search_fields = [
Expand Down Expand Up @@ -131,82 +132,130 @@ def action_description(self, obj):

action_description.short_description = 'Action'


admin.site.register(LogEntry, LogEntryAdmin)


class UserAdmin(BaseUserAdmin):
actions = ['generate_api_key']

def generate_api_key(self, request, queryset):
for user in queryset:
raw_key, hashed_key = generate_api_key(user)
if raw_key:
self.message_user(request, f"API Key generated for {user.username}: {raw_key[:10]}...")
else:
self.message_user(request, f"Failed to generate API Key for {user.username}", level='ERROR')

generate_api_key.short_description = "Generate API Key"

admin.site.unregister(User)
admin.site.register(User, UserAdmin)


class ReadOnlyTextInput(forms.TextInput):
def render(self, name, value, attrs=None, renderer=None):
if value:
truncated_value = value[:5] + '*' * 59
return f'{truncated_value}'
return super().render(name, value, attrs, renderer)


class APIKeyForm(forms.ModelForm):
EXPIRATION_CHOICES = (
(1, '1 day'), (7, '7 days'), (30, '30 days'), (60, '60 days'), (90, '90 days'), (365, '1 year'), (730, '2 years'),
(1, '1 day'),
(7, '7 days'),
(30, '30 days'),
(60, '60 days'),
(90, '90 days'),
(365, '1 year'),
(730, '2 years'),
)
expiration = forms.ChoiceField(choices=EXPIRATION_CHOICES, label='Expiration', required=True)
user = forms.ModelChoiceField(queryset=User.objects.all(), label='User', required=True)


class Meta:
fields = ['user', 'expiration']
model = APIKey
fields = ['user', 'key', 'expiration', 'expiry_at']

def __init__(self, *args, **kwargs):
self.request = kwargs.pop('request', None)
super().__init__(*args, **kwargs)

if not self.instance or not self.instance.pk:
self.fields['expiration'].initial = 30

else:
instance = kwargs.get('instance')
if instance and instance.pk:
if 'key' in self.fields:
self.fields['key'].widget.attrs['readonly'] = True
self.fields['key'].widget = ReadOnlyTextInput()
if 'user' in self.fields:
self.fields['user'].widget = forms.HiddenInput()
self.fields['user'].widget.attrs['readonly'] = True
if 'expiry_at' in self.fields:
self.fields['expiry_at'].widget.attrs['readonly'] = True
if 'expiration' in self.fields:
self.fields['expiration'].widget = forms.HiddenInput()

if self.request and not self.request.user.is_superuser:
self.fields['user'].queryset = User.objects.filter(id=self.request.user.id)
self.fields['user'].initial = self.request.user
if not self.request.user.is_superuser:
self.fields['expiration'].widget.attrs['disabled'] = True
else:
self.fields['expiration'].widget.attrs['readonly'] = True
else:
self.fields['user'].queryset = User.objects.all()
if 'key' in self.fields:
self.fields['key'].widget = forms.HiddenInput()
if 'expiry_at' in self.fields:
self.fields['expiry_at'].widget = forms.HiddenInput()

if self.request and not self.request.user.is_superuser:
self.fields['user'].queryset = User.objects.filter(id=self.request.user.id)
self.fields['user'].initial = self.request.user
else:
self.fields['user'].queryset = User.objects.all()

def clean_key(self):
instance = getattr(self, 'instance', None)
if instance and instance.pk:
return instance.key
return self.cleaned_data.get('key', '')

def clean_expiration(self):
expiration = self.cleaned_data.get('expiration')
if expiration:
try:
expiration = int(expiration)
if expiration not in [choice[0] for choice in self.EXPIRATION_CHOICES]:
raise forms.ValidationError('Invalid expiration value.')
except ValueError:
raise forms.ValidationError('Invalid expiration value.')
return expiration

def save(self, commit=True):
instance = super().save(commit=False)
expiration_days = int(self.cleaned_data['expiration'])
instance.get_expiry = timezone.now() + timezone.timedelta(days=expiration_days)
expiration = self.cleaned_data.get('expiration')

if expiration:
instance.expiry_at = timezone.now() + timedelta(days=int(expiration))

if commit:
instance.save()

return instance


class APIKeyAdmin(admin.ModelAdmin):
list_display = ('get_user', 'get_digest', 'get_created', 'get_expiry')
list_display = ('user', 'shortened_key', 'created_at', 'expiry_at_display')
form = APIKeyForm
readonly_fields = ('key_details',)

def get_user(self, obj):
return obj.auth_token.user if obj.auth_token else None

def get_digest(self, obj):
return obj.auth_token.digest if obj.auth_token else None

def get_created(self, obj):
return obj.auth_token.created.strftime("%b %d, %Y, %-I:%M %p").replace('AM', 'a.m.').replace('PM', 'p.m.') if obj.auth_token else None

def get_expiry(self, obj):
return obj.auth_token.expiry.strftime("%b %d, %Y, %-I:%M %p").replace('AM', 'a.m.').replace('PM', 'p.m.') if obj.auth_token else None

get_user.short_description = 'User'
get_digest.short_description = 'Digest'
get_created.short_description = 'Created'
get_expiry.short_description = 'Expiry'
def get_queryset(self, request):
if request.user.is_superuser:
return APIKey.objects.all()
else:
return APIKey.objects.filter(user=request.user)

def has_add_permission(self, request):
return True

def get_queryset(self, request):
qs = super().get_queryset(request)
if not request.user.is_superuser:
qs = qs.filter(auth_token__user=request.user)
return qs

def get_form(self, request, obj=None, **kwargs):
kwargs['form'] = self.form
form = super().get_form(request, obj, **kwargs)

if 'key' in form.base_fields:
form.base_fields['key'].widget = ReadOnlyTextInput()

class CustomAPIKeyForm(form):
def __init__(self, *args, **kwargs):
kwargs['request'] = request
Expand All @@ -215,12 +264,20 @@ def __init__(self, *args, **kwargs):
return CustomAPIKeyForm

def save_model(self, request, obj, form, change):
if not obj.pk:
user = form.cleaned_data['user']
expiration = form.cleaned_data['expiration']
raw_key, auth_token = generate_api_key(user, int(expiration))
obj.auth_token = auth_token
obj.save()
if not obj.key:
user = request.user
expiration_days = int(form.cleaned_data.get('expiration', 30))
raw_key, hashed_key = generate_api_key(user, expiration_days)
obj.key = hashed_key
obj.expiry_at = timezone.now() + timedelta(days=expiration_days)
hash_parts = hashed_key.split('$')
obj.key_details = (
f"algorithm: pbkdf2_sha256 \n "
f"iterations: {hash_parts[1]}\n "
f"salt: {hash_parts[2][:8]}{'*' * (len(hash_parts[2]) - 8)}\n "
f"hash: {hash_parts[3][:8]}{'*' * (len(hash_parts[3]) - 8)}\n\n"
f"Raw API keys are not stored, so there is no way to see this user’s API key."
)
copy_button = f'''
<button id="copyButton" onclick="copyToClipboard('{raw_key}')" style="border: none; background: none; cursor: pointer;">
<img src="https://img.icons8.com/material-outlined/24/000000/clipboard.png" alt="Copy" style="vertical-align: middle;"/>
Expand Down Expand Up @@ -256,55 +313,42 @@ def save_model(self, request, obj, form, change):
}}
</style>
'''
messages.success(request, mark_safe(f"The API Key was added successfully: {raw_key}. {copy_button} Make sure to copy this personal token now. You won't be able to see it again!"), extra_tags='safe', fail_silently=True)
messages.success(request, mark_safe(f"The API Key for {user.username} was added successfully: {raw_key}. {copy_button} Make sure to copy this personal token now. You won't be able to see it again!"), extra_tags='safe', fail_silently=True)
super().save_model(request, obj, form, change)

def shortened_key(self, obj):
if obj.key:
hash_parts = obj.key.split('$')
if len(hash_parts) >= 4:
hash_start = hash_parts[3][:5] + '*' * (len(hash_parts[3]) - 5)
return hash_start
else:
return 'Invalid Key'
else:
super().save_model(request, obj, form, change)
return ''

shortened_key.short_description = 'Api-Key'

def expiry_at_display(self, obj):
return obj.expiry_at.strftime("%b %d, %Y, %-I:%M %p").replace('AM', 'a.m.').replace('PM', 'p.m.') if obj.expiry_at else '-'

def get_readonly_fields(self, request, obj=None):
readonly_fields = []
if obj:
readonly_fields.extend(['get_user', 'get_digest', 'get_created', 'get_expiry', 'key_details'])
readonly_fields.extend(['user', 'expiry_at', 'key_details'])
return readonly_fields

def get_exclude(self, request, obj=None):
if not obj:
return ['key', 'get_expiry']
return ['key', 'expiry_at']
else:
return ['key']

def has_view_permission(self, request, obj=None):
if obj and not request.user.is_superuser:
return obj.auth_token.user == request.user
return super().has_view_permission(request, obj)

def key_details(self, obj):
if obj.auth_token:
return mark_safe(
f"Algorithm: SHA3_512<br>"
f"Raw API keys are not stored, so there is no way to see this user’s API key."
)
return None

key_details.short_description = 'Key Details'

admin.site.register(APIKey, APIKeyAdmin)

if request.user.is_superuser:
return True
if obj is None:
return True
return obj.user == request.user

@receiver(post_delete, sender=APIKey)
def delete_authtoken_when_apikey_deleted(sender, instance, **kwargs):
try:
if instance.auth_token:
instance.auth_token.delete()
except AuthToken.DoesNotExist:
pass


class AuthTokenAdmin(admin.ModelAdmin):
list_display = ('user', 'digest', 'created', 'expiry')
readonly_fields = ('user', 'digest', 'created', 'expiry')

def has_add_permission(self, request):
return False

admin.site.unregister(AuthToken)
admin.site.register(AuthToken, AuthTokenAdmin)
admin.site.register(APIKey, APIKeyAdmin)
24 changes: 6 additions & 18 deletions Watcher/Watcher/accounts/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
from knox.models import AuthToken
from .serializers import UserSerializer, LoginSerializer, UserPasswordChangeSerializer
from django.utils import timezone
from django.contrib.auth.models import User
from hashlib import sha256
from django.contrib.auth.hashers import make_password, check_password


# Login API
Expand All @@ -16,10 +13,9 @@ def post(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = serializer.validated_data
raw_key, _ = generate_api_key(user)
return Response({
"user": UserSerializer(user, context=self.get_serializer_context()).data,
"token": raw_key
"token": AuthToken.objects.create(user)[1]
})


Expand All @@ -42,17 +38,9 @@ class PasswordChangeViewSet(viewsets.ModelViewSet):
serializer_class = UserPasswordChangeSerializer


# Generate Api Key
def generate_api_key(user, expiration_days=30):
expiry = timezone.timedelta(days=expiration_days)
token_instance, raw_key = AuthToken.objects.create(user, expiry=expiry)
# Generate API Key
def generate_api_key(user, expiration):
expiry = timezone.timedelta(days=expiration)
token_instance, raw_key = AuthToken.objects.create(user=user, expiry=expiry)

# Generate hash using pbkdf2_sha256
hashed_key = make_password(raw_key, salt=None, hasher='pbkdf2_sha256')

if raw_key:
print(f"API Key generated for user {user.username}: {raw_key}")
return raw_key, hashed_key
else:
print(f"Failed to generate API Key for user {user.username}")
return None, None
return raw_key, token_instance
15 changes: 7 additions & 8 deletions Watcher/Watcher/accounts/models.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from django.db import models
from django_auth_ldap.backend import populate_user
from django.contrib.auth.models import User
from knox.models import AuthToken


class APIKey(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
key = models.CharField(max_length=100, unique=True)
created_at = models.DateTimeField(auto_now_add=True)
expiration = models.IntegerField(default=30)
expiry_at = models.DateTimeField(null=True, blank=True)
key_details = models.TextField(null=True, blank=True) # Ajout de ce champ
"""
Manages creation, modification, and deletion of user API keys.
"""
auth_token = models.OneToOneField(AuthToken, on_delete=models.CASCADE, null=True, blank=True)

def __str__(self):
return f"API Key for {self.user.username}"
return f"API Key for {self.auth_token.user.username}"

class Meta:
verbose_name = "API Key"
Expand All @@ -23,5 +23,4 @@ def make_inactive(sender, user, **kwargs):
if not User.objects.filter(username=user.username):
user.is_active = False


populate_user.connect(make_inactive)

0 comments on commit b230c6c

Please sign in to comment.