Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.14 on 2025-09-19 16:14

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('api_methods', '0015_rename_postprocesseddata_deprecatedpostprocesseddata_and_more'),
]

operations = [
migrations.AlterField(
model_name='usersharepermission',
name='permission_level',
field=models.CharField(choices=[('viewer', 'Viewer'), ('contributor', 'Contributor'), ('co-owner', 'Co-Owner')], default='viewer', max_length=20),
),
]
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0015_rename_postprocesseddata_deprecatedpostprocesseddata_and_more
0016_alter_usersharepermission_permission_level
233 changes: 203 additions & 30 deletions gateway/sds_gateway/api_methods/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from django.db import models
from django.db.models import ProtectedError
from django.db.models import QuerySet
from django.db.models.signals import post_delete
from django.db.models.signals import post_save
from django.db.models.signals import pre_delete
from django.dispatch import receiver
Expand Down Expand Up @@ -88,6 +89,15 @@ class DatasetStatus(StrEnum):
FINAL = "final"


class PermissionLevel(StrEnum):
"""The access level of a user."""

OWNER = "owner"
CO_OWNER = "co-owner"
CONTRIBUTOR = "contributor"
VIEWER = "viewer"


def default_expiration_date() -> datetime.datetime:
"""Returns the default expiration date for a file."""
# 2 years from now
Expand Down Expand Up @@ -655,6 +665,28 @@ def from_db(cls, db, field_names, values):
setattr(instance, field, json.loads(getattr(instance, field)))
return instance

def update_authors_field(self):
"""Update the authors field based on current permissions."""
authors_data = UserSharePermission.get_dataset_authors(self.uuid)
author_names = [author["name"] for author in authors_data]

# Update the authors field
self.authors = author_names
self.save(update_fields=["authors"])

def get_authors_display(self):
"""Get the authors as a list for display purposes."""
if not self.authors:
return []

if isinstance(self.authors, str):
try:
return json.loads(self.authors)
except (json.JSONDecodeError, TypeError):
return [self.authors]

return self.authors


class TemporaryZipFile(BaseModel):
"""
Expand Down Expand Up @@ -754,10 +786,11 @@ class UserSharePermission(BaseModel):
(ItemType.CAPTURE, "Capture"),
]

class PermissionType(models.TextChoices):
"""Enumeration of permission types."""

VIEWER = "viewer", "Viewer"
PERMISSION_CHOICES = [
(PermissionLevel.VIEWER, "Viewer"),
(PermissionLevel.CONTRIBUTOR, "Contributor"),
(PermissionLevel.CO_OWNER, "Co-Owner"),
]

# The user who owns the item being shared
owner = models.ForeignKey(
Expand All @@ -768,8 +801,8 @@ class PermissionType(models.TextChoices):

permission_level = models.CharField(
max_length=20,
choices=PermissionType.choices,
default=PermissionType.VIEWER,
choices=PERMISSION_CHOICES,
default=PermissionLevel.VIEWER,
)

# The user who is being granted access
Expand Down Expand Up @@ -864,6 +897,117 @@ def get_shared_users_for_item(cls, item_uuid, item_type):
is_enabled=True,
).select_related("shared_with")

@classmethod
def get_user_permission_level(cls, user, item_uuid, item_type):
"""
Get the permission level for a user on a specific item.

Args:
user: The user to check permissions for
item_uuid: UUID of the item
item_type: Type of item (e.g., "dataset", "capture")

Returns:
str: Permission level ("owner", "co-owner", "contributor", "viewer",
or None if no access)
"""
# Check if user is the owner
item_models = {
ItemType.DATASET: Dataset,
ItemType.CAPTURE: Capture,
}

if item_type in item_models:
model_class = item_models[item_type]
if model_class.objects.filter(
uuid=item_uuid, owner=user, is_deleted=False
).exists():
return PermissionLevel.OWNER

# Check shared permissions
permission = cls.objects.filter(
item_uuid=item_uuid,
item_type=item_type,
shared_with=user,
is_deleted=False,
is_enabled=True,
).first()

if permission:
return permission.permission_level

return None

@classmethod
def user_can_view(cls, user, item_uuid, item_type):
"""Check if user can view the item."""
return cls.get_user_permission_level(user, item_uuid, item_type) is not None

@classmethod
def user_can_add_assets(cls, user, item_uuid, item_type):
"""Check if user can add assets to the item."""
permission_level = cls.get_user_permission_level(user, item_uuid, item_type)
return permission_level in [
PermissionLevel.OWNER,
PermissionLevel.CO_OWNER,
PermissionLevel.CONTRIBUTOR,
]

@classmethod
def user_can_remove_assets(cls, user, item_uuid, item_type):
"""Check if user can remove assets from the item."""
permission_level = cls.get_user_permission_level(user, item_uuid, item_type)
return permission_level in [
PermissionLevel.OWNER,
PermissionLevel.CO_OWNER,
]

@classmethod
def user_can_edit_dataset(cls, user, item_uuid, item_type):
"""Check if user can edit dataset metadata (name, description)."""
if item_type != ItemType.DATASET:
return False
permission_level = cls.get_user_permission_level(user, item_uuid, item_type)
return permission_level in [
PermissionLevel.OWNER,
PermissionLevel.CO_OWNER,
]

@classmethod
def user_can_remove_others_assets(cls, user, item_uuid, item_type):
"""Check if user can remove assets owned by other users."""
permission_level = cls.get_user_permission_level(user, item_uuid, item_type)
return permission_level in [
PermissionLevel.OWNER,
PermissionLevel.CO_OWNER,
]

@classmethod
def get_dataset_authors(cls, dataset_uuid):
"""
Get all authors for a dataset including owner and contributors.

Returns:
list: List of dictionaries with author information
"""
dataset = Dataset.objects.filter(uuid=dataset_uuid, is_deleted=False).first()
if not dataset:
return []

authors = []

# Add the owner
if dataset.owner:
authors.append(
{
"name": dataset.owner.name or dataset.owner.email,
"email": dataset.owner.email,
"role": PermissionLevel.OWNER,
}
)

return authors


class DEPRECATEDPostProcessedData(BaseModel):
"""
Expand Down Expand Up @@ -1184,30 +1328,7 @@ def user_has_access_to_item(user, item_uuid, item_type):
Returns:
bool: True if user has access, False otherwise
"""
# Map item types to their corresponding models
item_models = {
ItemType.DATASET: Dataset,
ItemType.CAPTURE: Capture,
# Easy to add new item types here
# ItemType.FILE: File,
}

# Check if user is the owner
if item_type in item_models:
model_class = item_models[item_type]
if model_class.objects.filter(
uuid=item_uuid, owner=user, is_deleted=False
).exists():
return True

# Check if user has been shared the item
return UserSharePermission.objects.filter(
item_uuid=item_uuid,
item_type=item_type,
shared_with=user,
is_deleted=False,
is_enabled=True,
).exists()
return UserSharePermission.user_can_view(user, item_uuid, item_type)


def get_shared_users_for_item(item_uuid, item_type):
Expand Down Expand Up @@ -1274,3 +1395,55 @@ def handle_dataset_soft_delete(sender, instance: Dataset, **kwargs) -> None:

for permission in share_permissions:
permission.soft_delete()


@receiver(post_save, sender=ShareGroup)
def handle_sharegroup_soft_delete(sender, instance: ShareGroup, **kwargs) -> None:
"""
Handle soft deletion of share groups by updating related share permissions.
"""
if instance.is_deleted:
# Find all UserSharePermission records that include this group
share_permissions = UserSharePermission.objects.filter(
share_groups=instance,
is_deleted=False,
)

for permission in share_permissions:
# Remove the group from the permission
permission.share_groups.remove(instance)
# Update the enabled status based on remaining groups
permission.update_enabled_status()
permission.save()


@receiver(post_save, sender=UserSharePermission)
def handle_usersharepermission_change(
sender, instance: UserSharePermission, **kwargs
) -> None:
"""
Handle changes to UserSharePermission by updating dataset authors field.
"""
if instance.item_type == ItemType.DATASET and instance.is_enabled:
# Update the authors field for the dataset
dataset = Dataset.objects.filter(
uuid=instance.item_uuid, is_deleted=False
).first()
if dataset:
dataset.update_authors_field()


@receiver(post_delete, sender=UserSharePermission)
def handle_usersharepermission_delete(
sender, instance: UserSharePermission, **kwargs
) -> None:
"""
Handle deletion of UserSharePermission by updating dataset authors field.
"""
if instance.item_type == ItemType.DATASET:
# Update the authors field for the dataset
dataset = Dataset.objects.filter(
uuid=instance.item_uuid, is_deleted=False
).first()
if dataset:
dataset.update_authors_field()
Loading