Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TRACK-175: Fix grabbing permissions from KC. #2415

Merged
merged 6 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions epictrack-api/src/api/schemas/response/user_group_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,30 @@ class UserGroupResponseSchema(Schema):
display_name = fields.Method("get_display_name")

def get_level(self, instance):
"""Get the full name"""
return int(instance["attributes"]["level"][0])
"""
Retrieve the level attribute from the given instance.

Args:
instance (dict): A dictionary representing the instance, which is expected to have an "attributes" key.

Returns:
int: The level value extracted from the instance's attributes. Defaults to 0 if not found.
"""
return int(instance.get("attributes", {}).get("level", [0])[0] or 0)

def get_display_name(self, instance):
"""Get the display name of the group"""
return instance["attributes"]["display_name"][0]
"""
Retrieve the display name of the group from the given instance.

Args:
instance (dict): A dictionary representing the group instance,
which should contain an "attributes" key.

Returns:
str: The display name of the group. If the display name is not
found, an empty string is returned.
"""
return instance.get("attributes", {}).get("display_name", [""])[0] or ""

def get_path(self, instance):
"""Format the path of the group from keycloak"""
Expand Down
14 changes: 14 additions & 0 deletions epictrack-api/src/api/services/keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ def get_groups(brief_representation: bool = False):
response = KeycloakService._request_keycloak(f'groups?briefRepresentation={brief_representation}')
return response.json()

@staticmethod
def get_sub_groups(group_id):
"""
Return the subgroups of a given group.

Args:
group_id (str): The ID of the group for which to retrieve subgroups.

Returns:
list: A list of subgroups for the given group.
"""
response = KeycloakService._request_keycloak(f"groups/{group_id}/children")
return response.json()

@staticmethod
def get_users():
"""Get users"""
Expand Down
68 changes: 56 additions & 12 deletions epictrack-api/src/api/services/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""User service"""
from flask import current_app
from api.exceptions import BusinessError, PermissionDeniedError
from api.utils import TokenInfo

Expand Down Expand Up @@ -39,23 +40,51 @@ def get_all_users(cls):

@classmethod
def get_groups(cls):
"""Get groups that has "level" attribute set up"""
"""
Retrieve groups that have the "level" attribute set up.

This method fetches all groups from the Keycloak service and filters them
to include only those groups that have sub-groups. It logs the groups and
sub-groups at various stages for debugging purposes.

Returns:
list: A list of filtered groups that have sub-groups.
"""
# Fetch all groups from the Keycloak service
groups = KeycloakService.get_groups()
current_app.logger.debug(f"Groups: {groups}")
filtered_groups = []

for group in groups:
if group.get("subGroups"):
filtered_groups = filtered_groups + [
sub_group
for sub_group in group.get("subGroups")
if "level" in sub_group["attributes"]
]
elif "level" in group["attributes"]:
filtered_groups.append(group)
current_app.logger.info(f"group: {group}")

# Check if the group has sub-groups by looking at the "subGroupCount" attribute
if group.get("subGroupCount", 0) > 0:

# Fetch the sub-groups for the current group
sub_groups = KeycloakService.get_sub_groups(group["id"])
current_app.logger.debug(f"sub_groups: {sub_groups}")
filtered_groups.extend(sub_groups)

current_app.logger.debug(f"filtered_groups: {filtered_groups}")
return filtered_groups

@classmethod
def update_user_group(cls, user_id, user_group_request):
"""Update the group of a user"""
"""
Updates the user's group based on the provided user group request.

Args:
cls: The class instance.
user_id (str): The ID of the user to update.
user_group_request (dict): A dictionary containing the group update request details.
Expected keys:
- "group_id_to_update" (str): The ID of the group to update.
Raises:
PermissionDeniedError: If the requester does not have permission to update the group.
Returns:
dict: The result of the group update operation from KeycloakService.
"""
token_groups = TokenInfo.get_user_data()["groups"]
groups = cls.get_groups()
requesters_group = next(
Expand Down Expand Up @@ -118,5 +147,20 @@ def _delete_from_all_epictrack_subgroups(user_id):

@classmethod
def _get_level(cls, group):
"""Gets the level from the group"""
return group["attributes"]["level"][0]
"""
Retrieves the level from the given group.

Args:
group (dict): A dictionary representing the group, which should contain
an "attributes" key with a nested "level" key.

Returns:
int: The level extracted from the group. If the level is not found or
cannot be converted to an integer, returns 0.
"""
level_str = group["attributes"].get("level", [0])[0]
try:
return int(level_str)
except (KeyError, IndexError, TypeError) as e:
current_app.logger.error(f"Error getting level from group: {e}. Returning 0.")
return 0
Loading