-
Couldn't load subscription status.
- Fork 11
Extend permissions framework to support model-level permissions #1010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
933fbd8
ab39902
45ad448
3326955
fc6c137
d8692b0
f8cad51
ebdd2e9
5a99c21
18332ef
4ce2ea4
b07d01f
f3f6fe7
cb40ad8
814ecba
331cc83
38f55e4
53a6434
bec698c
9f78153
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -180,7 +180,38 @@ def _get_object_perms(self, user): | |||||
| object_perms = [perm for perm in all_perms if perm.endswith(f"_{model_name}")] | ||||||
| return object_perms | ||||||
|
|
||||||
| def check_model_level_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool: | ||||||
| model = self._meta.model_name | ||||||
| app_label = "main" # Assume all model level permissions are in 'main' app | ||||||
|
|
||||||
| crud_map = { | ||||||
| "create": f"{app_label}.create_{model}", | ||||||
| "update": f"{app_label}.update_{model}", | ||||||
| "partial_update": f"{app_label}.update_{model}", | ||||||
| "destroy": f"{app_label}.delete_{model}", | ||||||
| "retrieve": f"{app_label}.view_{model}", | ||||||
| } | ||||||
|
|
||||||
| perm = crud_map.get(action, f"{app_label}.{action}_{model}") | ||||||
| if action == "retrieve": | ||||||
| return True # allow view permission for all users | ||||||
| return user.has_perm(perm) | ||||||
|
|
||||||
| def check_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool: | ||||||
| """ | ||||||
| Entry point for all permission checks. | ||||||
| Decides whether to perform model-level or object-level permission check. | ||||||
| """ | ||||||
| # Get related project accessor | ||||||
| accessor = self.get_project_accessor() | ||||||
| if accessor is None or accessor == "projects": | ||||||
| # If there is no project relation, use model-level permission | ||||||
| return self.check_model_level_permission(user, action) | ||||||
|
|
||||||
| # If the object is linked to a project then use object-level permission | ||||||
| return self.check_object_level_permission(user, action) | ||||||
|
|
||||||
| def check_object_level_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool: | ||||||
| """ | ||||||
| Check if the user has permission to perform the action | ||||||
| on this instance. | ||||||
|
|
@@ -189,14 +220,6 @@ def check_permission(self, user: AbstractUser | AnonymousUser, action: str) -> b | |||||
| """ | ||||||
| from ami.users.roles import BasicMember | ||||||
|
|
||||||
| project = self.get_project() if hasattr(self, "get_project") else None | ||||||
| if not project: | ||||||
| return False | ||||||
| if action == "retrieve": | ||||||
| if project.draft: | ||||||
| # Allow view permission for members and owners of draft projects | ||||||
| return BasicMember.has_role(user, project) or user == project.owner or user.is_superuser | ||||||
| return True | ||||||
| model = self._meta.model_name | ||||||
| crud_map = { | ||||||
| "create": f"create_{model}", | ||||||
|
|
@@ -205,14 +228,24 @@ def check_permission(self, user: AbstractUser | AnonymousUser, action: str) -> b | |||||
| "destroy": f"delete_{model}", | ||||||
| } | ||||||
|
|
||||||
| project = self.get_project() if hasattr(self, "get_project") else None | ||||||
| if not project: | ||||||
| # No specific project instance found; fallback to model-level | ||||||
| return self.check_model_level_permission(user, action) | ||||||
| if action == "retrieve": | ||||||
| if project.draft: | ||||||
| # Allow view permission for members and owners of draft projects | ||||||
| return BasicMember.has_role(user, project) or user == project.owner or user.is_superuser | ||||||
| return True | ||||||
|
|
||||||
| if action in crud_map: | ||||||
| return user.has_perm(crud_map[action], project) | ||||||
|
|
||||||
| # Delegate to model-specific logic | ||||||
| return self.check_custom_permission(user, action) | ||||||
| return self.check_custom_object_level_permission(user, action) | ||||||
|
|
||||||
| def check_custom_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool: | ||||||
| """Check custom permissions for the user on this instance. | ||||||
| def check_custom_object_level_permission(self, user: AbstractUser | AnonymousUser, action: str) -> bool: | ||||||
| """Check custom object level permissions for the user on this instance. | ||||||
| This is used for actions that are not standard CRUD operations. | ||||||
| """ | ||||||
| assert self._meta.model_name is not None, "Model must have a model_name defined in Meta class." | ||||||
|
|
@@ -222,41 +255,91 @@ def check_custom_permission(self, user: AbstractUser | AnonymousUser, action: st | |||||
|
|
||||||
| return user.has_perm(permission_codename, project) | ||||||
|
|
||||||
| def get_user_object_permissions(self, user) -> list[str]: | ||||||
| def get_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]: | ||||||
| """ | ||||||
| Returns a list of object-level permissions the user has on this instance. | ||||||
| This is used by frontend to determine what actions the user can perform. | ||||||
| Entry point for retrieving user permissions on this instance. | ||||||
| Decides whether to return model-level or object-level permissions. | ||||||
| """ | ||||||
| accessor = self.get_project_accessor() | ||||||
|
|
||||||
| if accessor is None or accessor == "projects": | ||||||
| # M2M or no project relation, use model-level permissions | ||||||
| return self.get_model_level_permissions(user) | ||||||
|
|
||||||
| # Otherwise, get object-level permissions | ||||||
| return self.get_object_level_permissions(user) | ||||||
|
|
||||||
| def get_model_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]: | ||||||
| """ | ||||||
| Retrieve model-level permissions for the given user. | ||||||
| Returns a list of allowed actions such as ["create", "update", "delete"]. | ||||||
| """ | ||||||
| # Return all permissions for superusers | ||||||
| if user.is_superuser: | ||||||
| allowed_custom_actions = self.get_custom_user_permissions(user) | ||||||
| return ["update", "delete"] + allowed_custom_actions | ||||||
| # Superusers get all possible actions | ||||||
| return ["update", "delete", "view"] | ||||||
|
|
||||||
| model = self._meta.model_name | ||||||
| app_label = "main" # self._meta.app_label | ||||||
|
||||||
| crud_map = { | ||||||
| "update": f"{app_label}.update_{model}", | ||||||
| "delete": f"{app_label}.delete_{model}", | ||||||
| "view": f"{app_label}.view_{model}", | ||||||
| } | ||||||
|
|
||||||
| allowed_actions = [action for action, perm in crud_map.items() if user.has_perm(perm)] | ||||||
| return allowed_actions | ||||||
|
|
||||||
| def get_object_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]: | ||||||
| """ | ||||||
| Retrieve object-level permissions (including custom ones) for this instance. | ||||||
| """ | ||||||
|
|
||||||
| if user.is_superuser: | ||||||
| return ["update", "delete"] + self.get_custom_object_level_permissions(user) | ||||||
|
|
||||||
| project = self.get_project() | ||||||
| if not project: | ||||||
| # Fallback to model-level permissions if no related project found | ||||||
| return self.get_model_level_permissions(user) | ||||||
|
|
||||||
| object_perms = self._get_object_perms(user) | ||||||
| # Check for update and delete permissions | ||||||
| allowed_actions = set() | ||||||
| for perm in object_perms: | ||||||
| action = perm.split("_", 1)[0] | ||||||
| if action in {"update", "delete"}: | ||||||
| allowed_actions.add(action) | ||||||
|
|
||||||
| allowed_custom_actions = self.get_custom_user_permissions(user) | ||||||
| allowed_actions.update(set(allowed_custom_actions)) | ||||||
| return list(allowed_actions) | ||||||
|
|
||||||
| def get_custom_user_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]: | ||||||
| allowed_actions = { | ||||||
| perm.split("_", 1)[0] for perm in object_perms if perm.split("_", 1)[0] in {"update", "delete"} | ||||||
| } | ||||||
|
|
||||||
| custom_actions = self.get_custom_object_level_permissions(user) | ||||||
| return list(allowed_actions.union(custom_actions)) | ||||||
|
|
||||||
| def get_custom_object_level_permissions(self, user: AbstractUser | AnonymousUser) -> list[str]: | ||||||
| """ | ||||||
| Returns a list of custom permissions (not standard CRUD actions) that the user has on this instance. | ||||||
| Retrieve custom (non-CRUD) permissions for this instance. | ||||||
| """ | ||||||
| object_perms = self._get_object_perms(user) | ||||||
| custom_perms = set() | ||||||
| # Extract custom permissions that are not standard CRUD actions | ||||||
| for perm in object_perms: | ||||||
| action = perm.split("_", 1)[0] | ||||||
| # Make sure to exclude standard CRUD actions | ||||||
| if action not in ["view", "create", "update", "delete"]: | ||||||
| custom_perms.add(action) | ||||||
| custom_perms = { | ||||||
| perm.split("_", 1)[0] | ||||||
| for perm in object_perms | ||||||
| if perm.split("_", 1)[0] not in ["view", "create", "update", "delete"] | ||||||
| } | ||||||
| return list(custom_perms) | ||||||
|
|
||||||
| @classmethod | ||||||
| def get_collection_level_permissions(cls, user: AbstractUser | AnonymousUser, project) -> list[str]: | ||||||
| """ | ||||||
| Retrieve collection-level permissions for the given user. | ||||||
| """ | ||||||
| app_label = "main" | ||||||
|
||||||
| app_label = "main" | |
| app_label = cls._meta.app_label |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,31 @@ def to_representation(self, instance): | |
| instance_data = self.get_permissions(instance=instance, instance_data=instance_data) | ||
| return instance_data | ||
|
|
||
| def get_instance_for_permission_check(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is helpful, thanks. |
||
| """ | ||
| Returns an unsaved model instance built from validated_data, | ||
| excluding ManyToMany fields and any non-model fields (like 'project'). | ||
| Safe to use for permission checking before saving. | ||
| """ | ||
| validated_data = getattr(self, "validated_data", {}) | ||
| if not validated_data: | ||
| raise ValueError("Serializer must be validated before calling this method.") | ||
|
|
||
| model_cls = self.Meta.model | ||
| model_field_names = {f.name for f in model_cls._meta.get_fields()} | ||
| m2m_fields = {f.name for f in model_cls._meta.many_to_many} | ||
|
|
||
| safe_data = {} | ||
| for key, value in validated_data.items(): | ||
| # skip many-to-many and non-model fields | ||
| if key in m2m_fields: | ||
| continue | ||
| if key not in model_field_names: | ||
| continue | ||
| safe_data[key] = value | ||
|
|
||
| return model_cls(**safe_data) | ||
|
|
||
|
|
||
| class MinimalNestedModelSerializer(DefaultSerializer): | ||
| """ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hardcoding
app_labelto 'main' makes this method inflexible for models in other apps. Useself._meta.app_labelinstead to automatically determine the correct app label.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this suggestion, since models in other apps use the BaseModel, the constructed strings will be incorrect. For example:
"create": f"{app_label}.create_{model}"will be
"create": f"main.create_job"instead of"create": f"jobs.create_job"