diff --git a/examples/settings.yml b/examples/settings.yml index d3608efd..0e9cf04e 100644 --- a/examples/settings.yml +++ b/examples/settings.yml @@ -6,6 +6,20 @@ # You can run Action from each repo, acting on that repo's settings.yml, or # from a central repo, using a single settings.yml to control many repos. +# For users, it is the login id +# For teams, it is the slug id +# permission can be 'push','pull','triage','admin','maintain', or any custom role you have defined +# for either users or teams, set exists to false to remove their permissions +collaborators: + # - name: andrewthetechie + # type: user + # permission: admin + # exists: false + # - name: / + # type: team + # permission: admin + # exists: false + # Which method you choose is up to you. See README.md for more info and example # Workflows to implement these strategies. settings: diff --git a/repo_manager/gh/branch_protections.py b/repo_manager/gh/branch_protections.py index 9a81ab3b..5f6b35c1 100644 --- a/repo_manager/gh/branch_protections.py +++ b/repo_manager/gh/branch_protections.py @@ -1,6 +1,8 @@ from copy import deepcopy from typing import Any +from actions_toolkit import core as actions_toolkit + from github.Consts import mediaTypeRequireMultipleApprovingReviews from github.GithubException import GithubException from github.GithubObject import NotSet @@ -309,7 +311,12 @@ def check_repo_branch_protections( diff_protections[config_bp.name] = ["Branch is not protected"] continue - this_protection = repo_bp.get_protection() + try: + this_protection = repo_bp.get_protection() + except Exception as exc: + actions_toolkit.info(f"Repo {repo.full_name} does not currently have any branch protections defined?") + actions_toolkit.info(f"error: {exc}") + continue if config_bp.protection.pr_options is not None: diffs.append( diff_option( diff --git a/repo_manager/gh/collaborators.py b/repo_manager/gh/collaborators.py new file mode 100644 index 00000000..ae924db8 --- /dev/null +++ b/repo_manager/gh/collaborators.py @@ -0,0 +1,199 @@ +from typing import Any +from actions_toolkit import core as actions_toolkit + +from github.Repository import Repository + +from repo_manager.utils import get_organization +from repo_manager.schemas.collaborator import Collaborator + + +def diff_option(key: str, expected: Any, repo_value: Any) -> str | None: + if expected is not None: + if expected != repo_value: + return f"{key} -- Expected: {expected} Found: {repo_value}" + return None + + +def check_collaborators( + repo: Repository, collaborators: list[Collaborator] +) -> tuple[bool, dict[str, list[str] | dict[str, Any]]]: + """Checks a repo's environments vs our expected settings + + Args: + repo (Repository): [description] + environments (List[Environment]): [description] + + Returns: + Tuple[bool, Optional[List[str]]]: [description] + """ + + diff = {} + + expected_collab_usernames = { + collaborator.name + for collaborator in filter( + lambda collaborator: collaborator.type == "User" and collaborator.exists, + collaborators, + ) + } + expected_collab_teamnames = { + collaborator.name + for collaborator in filter( + lambda collaborator: collaborator.type == "Team" and collaborator.exists, + collaborators, + ) + } + repo_collab_users = repo.get_collaborators() + repo_collab_teams = repo.get_teams() # __get_teams__(repo) + repo_collab_usernames = {collaborator.login for collaborator in repo_collab_users} + repo_collab_teamnames = {collaborator.slug for collaborator in repo_collab_teams} + + missing_users = list(expected_collab_usernames - repo_collab_usernames) + missing_teams = list(expected_collab_teamnames - repo_collab_teamnames) + if len(missing_users) + len(missing_teams) > 0: + diff["missing"] = {} + if len(missing_users) > 0: + diff["missing"]["Users"] = missing_users + if len(missing_teams) > 0: + diff["missing"]["Teams"] = missing_teams + + extra_users = list( + repo_collab_usernames.intersection( + collaborator.name + for collaborator in filter( + lambda collaborator: collaborator.type == "User" and not collaborator.exists, + collaborators, + ) + ) + ) + extra_teams = list( + repo_collab_teamnames.intersection( + collaborator.name + for collaborator in filter( + lambda collaborator: collaborator.type == "Team" and not collaborator.exists, + collaborators, + ) + ) + ) + if len(extra_users) + len(extra_teams) > 0: + diff["extra"] = {} + if len(extra_users) > 0: + diff["extra"]["Users"] = extra_users + if len(extra_teams) > 0: + diff["extra"]["Teams"] = extra_teams + + collaborators_to_check_values_on = {} + collaborators_to_check_values_on["Users"] = list(expected_collab_usernames.intersection(repo_collab_usernames)) + collaborators_to_check_values_on["Teams"] = list(expected_collab_teamnames.intersection(repo_collab_teamnames)) + config_collaborator_dict = {collaborator.name: collaborator for collaborator in collaborators} + repo_collab_dict = {"Users": {}, "Teams": {}} + repo_collab_dict["Users"] = {collaborator.login: collaborator for collaborator in repo_collab_users} + repo_collab_dict["Teams"] = {collaborator.slug: collaborator for collaborator in repo_collab_teams} + perm_diffs = {"Users": {}, "Teams": {}} + for collaborator_type in collaborators_to_check_values_on.keys(): + for collaborator_name in collaborators_to_check_values_on[collaborator_type]: + if collaborator_type == "Users": + repo_value = getattr( + repo_collab_dict[collaborator_type][collaborator_name].permissions, + config_collaborator_dict[collaborator_name].permission, + None, + ) + else: + repo_value = ( + getattr( + repo_collab_dict[collaborator_type][collaborator_name], + "permission", + None, + ) + == config_collaborator_dict[collaborator_name].permission + ) + if repo_value is not True: + perm_diffs[collaborator_type][collaborator_name] = diff_option( + config_collaborator_dict[collaborator_name].permission, + True, + repo_value, + ) + + if len(perm_diffs["Users"]) == 0: + perm_diffs.pop("Users") + + if len(perm_diffs["Teams"]) == 0: + perm_diffs.pop("Teams") + + if len(perm_diffs) > 0: + diff["diff"] = perm_diffs + + if len(diff) > 0: + return False, diff + + return True, None + + +def update_collaborators(repo: Repository, collaborators: list[Collaborator], diffs: dict[str, Any]) -> set[str]: + """Updates a repo's environments to match the expected settings + + Args: + repo (Repository): [description] + environments (List[environment]): [description] + diffs (Dictionary[string, Any]): List of all the summarized differences by environment name + + Returns: + set[str]: [description] + """ + errors = [] + users_dict = { + collaborator.name: collaborator + for collaborator in filter( + lambda collaborator: collaborator.type == "User" and collaborator.name, + collaborators, + ) + } + teams_dict = { + collaborator.name: collaborator + for collaborator in filter( + lambda collaborator: collaborator.type == "Team" and collaborator.name, + collaborators, + ) + } + + def switch(collaborator: Collaborator, diff_type: str) -> None: + if diff_type == "missing": + if collaborator.type == "User": + repo.add_to_collaborators(collaborator.name, collaborator.permission) + elif collaborator.type == "Team": + get_organization().get_team_by_slug(collaborator.name).update_team_repository( + repo, collaborator.permission + ) + actions_toolkit.info(f"Added collaborator {collaborator.name} with permission {collaborator.permission}.") + elif diff_type == "extra": + if collaborator.type == "User": + repo.remove_from_collaborators(collaborator.name) + elif collaborator.type == "Team": + get_organization().get_team_by_slug(collaborator.name).remove_from_repos(repo) + else: + raise Exception(f"Modifying collaborators of type {collaborator.type} not currently supported") + actions_toolkit.info(f"Removed collaborator {collaborator.name}.") + elif diff_type == "diff": + if collaborator.type == "User": + repo.add_to_collaborators(collaborator.name, collaborator.permission) + elif collaborator.type == "Team": + get_organization().get_team_by_slug(collaborator.name).update_team_repository( + repo, collaborator.permission + ) + else: + raise Exception(f"Modifying collaborators of type {collaborator.type} not currently supported") + actions_toolkit.info(f"Updated collaborator {collaborator.name} with permission {collaborator.permission}.") + else: + errors.append(f"Collaborator {collaborator} not found in expected collaborators") + + for diff_type in diffs.keys(): + for collaborator_type in diffs[diff_type]: + for collaborator in diffs[diff_type][collaborator_type]: + if collaborator_type == "Users": + switch(users_dict[collaborator], diff_type) + elif collaborator_type == "Teams": + switch(teams_dict[collaborator], diff_type) + else: + raise Exception(f"Modifying collaborators of type {collaborator_type} not currently supported") + + return errors diff --git a/repo_manager/gh/settings.py b/repo_manager/gh/settings.py index 202d8443..8c062618 100644 --- a/repo_manager/gh/settings.py +++ b/repo_manager/gh/settings.py @@ -1,5 +1,7 @@ from typing import Any +from actions_toolkit import core as actions_toolkit + from github.Repository import Repository from repo_manager.schemas.settings import Settings @@ -66,14 +68,21 @@ def get_repo_value(setting_name: str, repo: Repository) -> Any | None: for setting_name in settings.dict().keys(): repo_value = get_repo_value(setting_name, repo) settings_value = getattr(settings, setting_name) - # These don't seem to update if changed; may need to explore a different API call - if (setting_name == "enable_automated_security_fixes") | (setting_name == "enable_vulnerability_alerts"): - continue + if setting_name in [ + "allow_squash_merge", + "allow_merge_commit", + "allow_rebase_merge", + "delete_branch_on_merge", + "enable_automated_security_fixes", + "enable_vulnerability_alerts", + ]: + if repo._requester.oauth_scopes is None: + continue + elif repo_value is None: + actions_toolkit.info(f"Unable to access {setting_name} with OAUTH of {repo._requester.oauth_scopes}") # We don't want to flag description being different if the YAML is None - if (setting_name == "description") & (not settings_value): + if settings_value is None: continue - elif (setting_name == "topics") & (settings_value is None): - settings_value = [] if repo_value != settings_value: drift.append(f"{setting_name} -- Expected: '{settings_value}' Found: '{repo_value}'") checked &= False if (settings_value is not None) else True diff --git a/repo_manager/main.py b/repo_manager/main.py index ca18e8ce..9e39fa1b 100644 --- a/repo_manager/main.py +++ b/repo_manager/main.py @@ -14,6 +14,8 @@ from repo_manager.gh.secrets import check_repo_secrets from repo_manager.gh.secrets import create_secret from repo_manager.gh.secrets import delete_secret +from repo_manager.gh.collaborators import check_collaborators +from repo_manager.gh.collaborators import update_collaborators from repo_manager.gh.settings import check_repo_settings from repo_manager.gh.settings import update_settings from repo_manager.schemas import load_config @@ -57,6 +59,7 @@ def main(): # noqa: C901 "branch_protections", config.branch_protections, ), + check_collaborators: ("collaborators", config.collaborators), }.items(): check_name, to_check = to_check if to_check is not None: @@ -65,11 +68,13 @@ def main(): # noqa: C901 if this_diffs is not None: diffs[check_name] = this_diffs - actions_toolkit.debug(json_diff := json.dumps({})) + actions_toolkit.debug(json_diff := json.dumps(diffs)) actions_toolkit.set_output("diff", json_diff) if inputs["action"] == "check": if not check_result: + actions_toolkit.info(inputs["repo_object"].full_name) + actions_toolkit.info(json.dumps(diffs)) actions_toolkit.set_output("result", "Check failed, diff detected") actions_toolkit.set_failed("Diff detected") actions_toolkit.set_output("result", "Check passed") @@ -77,6 +82,27 @@ def main(): # noqa: C901 if inputs["action"] == "apply": errors = [] + for update, to_update in { + # TODO: Implement these functions to reduce length and complexity of code + # update_settings: ("settings", config.settings), + # update_secrets: ("secrets", config.secrets), + # check_repo_labels: ("labels", config.labels), + # check_repo_branch_protections: ( + # "branch_protections", + # config.branch_protections, + # ), + update_collaborators: ("collaborators", config.collaborators, diffs.get("collaborators", None)), + }.items(): + update_name, to_update, categorical_diffs = to_update + if categorical_diffs is not None: + try: + application_errors = update(inputs["repo_object"], to_update, categorical_diffs) + if len(application_errors) > 0: + errors.append(application_errors) + else: + actions_toolkit.info(f"Synced {update_name}") + except Exception as exc: + errors.append({"type": f"{update_name}-update", "error": f"{exc}"}) # Because we cannot diff secrets, just apply it every time if config.secrets is not None: diff --git a/repo_manager/schemas/__init__.py b/repo_manager/schemas/__init__.py index 5c08f4d5..cbc38215 100644 --- a/repo_manager/schemas/__init__.py +++ b/repo_manager/schemas/__init__.py @@ -1,26 +1,23 @@ import yaml -from pydantic import BaseModel # pylint: disable=E0611 +from pydantic import BaseModel, Field # pylint: disable=E0611 from .branch_protection import BranchProtection from .file import FileConfig from .label import Label from .secret import Secret from .settings import Settings -from pydantic import Field -from copy import copy - - -def empty_list(): - this_list = list() - return copy(this_list) +from .collaborator import Collaborator class RepoManagerConfig(BaseModel): settings: Settings | None - branch_protections: list[BranchProtection] = Field(default_factory=empty_list) - secrets: list[Secret] = Field(default_factory=empty_list) - labels: list[Label] = Field(default_factory=empty_list) - files: list[FileConfig] = Field(default_factory=empty_list) + branch_protections: list[BranchProtection] | None = Field( + None, description="Branch protections in the repo to manage" + ) + secrets: list[Secret] | None = Field(None, description="Secrets in the repo to manage") + labels: list[Label] | None = Field(None, description="Labels in the repo to manage") + files: list[FileConfig] | None = Field(None, description="Files in the repo to manage") + collaborators: list[Collaborator] | None = Field(None, description="Collaborators in the repo to manage") @property def secrets_dict(self): @@ -38,6 +35,14 @@ def branch_protections_dict(self): else {} ) + @property + def collaborators_dict(self): + return ( + {collaborator.name: collaborator for collaborator in self.collaborators} + if self.collaborators is not None + else {} + ) + def load_config(filename: str) -> RepoManagerConfig: """Loads a yaml file into a RepoManagerconfig""" diff --git a/repo_manager/schemas/collaborator.py b/repo_manager/schemas/collaborator.py new file mode 100644 index 00000000..bb8365bf --- /dev/null +++ b/repo_manager/schemas/collaborator.py @@ -0,0 +1,48 @@ +from typing import Self + +from github import Github + +from repo_manager.utils import get_client, get_repo + +from pydantic import BaseModel # pylint: disable=E0611 +from pydantic import Field, field_validator, model_validator + + +class Collaborator(BaseModel): + type: str = Field("team", description="Type of reviewer, can be `user` or `team`") + name: str = Field("user", description="Name of the reviewer, either a user or team name") + permission: str = Field( + "pull", + description="Permission level of the reviewer, can be `pull` `triage`, " + + "`push`, `maintain`, `admin`, or custom roles defined in the repo/org", + ) + exists: bool = Field( + True, + description="Whether the collaborator should exist in the repo; " + + "mark as false to remove the collaborator from the repo", + ) + id: int = Field(0, description="ID of the reviewer, either a user or team ID") + repositories_url: str = Field(None, description="URL to modify team permissions, only applicable for teams") + + @model_validator(mode="after") + def initialize_id(self) -> Self: + client: Github = get_client() + if self.type == "User": + self.id = int(client.get_user(self.name).id) + elif self.type == "Team": + if self.name.count("/") == 1: + org, team = self.name.split("/") + else: + org = get_repo().owner.login + team = self.name + github_object = client.get_organization(org).get_team_by_slug(team) + self.repositories_url = github_object.repositories_url + self.id = github_object.id + return self + + @field_validator("type") + def validate_type(cls, v) -> str: + v = v.lower().capitalize() + if v not in {"User", "Team"}: + raise ValueError("Reviewer Type must be user or team.") + return v diff --git a/repo_manager/utils/__init__.py b/repo_manager/utils/__init__.py index ad8e00df..6849444f 100644 --- a/repo_manager/utils/__init__.py +++ b/repo_manager/utils/__init__.py @@ -3,6 +3,8 @@ from actions_toolkit import core as actions_toolkit +from github import Github, Repository, Organization + # Needed to handle extracting certain attributes/fields from nested objects and lists from itertools import repeat @@ -13,13 +15,14 @@ VALID_ACTIONS = {"validate": None, "check": None, "apply": None} -def get_inputs() -> dict[str, Any]: +def __get_inputs__() -> dict: """Get inputs from our workflow, valudate them, and return as a dict Reads inputs from the dict INPUTS. This dict is generated from the actions.yml file. Non required inputs that are not set are returned as None Returns: Dict[str, Any]: [description] """ + global parsed_inputs parsed_inputs = dict() for input_name, input_config in INPUTS.items(): this_input_value = actions_toolkit.get_input( @@ -37,6 +40,64 @@ def get_inputs() -> dict[str, Any]: parsed_inputs[input_name] = input_config.get("default", None) if parsed_inputs[input_name] is None: actions_toolkit.set_failed(f"Error getting inputs. {input_name} is missing a default") + return parsed_inputs + + +def __get_api_url__() -> str: + global parsed_inputs + parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs + global api_url + if parsed_inputs["github_server_url"] == "https://github.com": + api_url = "https://api.github.com" + else: + api_url = parsed_inputs["github_server_url"] + "/api/v3" + actions_toolkit.debug(f"api_url: {api_url}") + return api_url + + +def get_client() -> Github: + global parsed_inputs + parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs + global api_url + api_url = __get_api_url__() if "api_url" not in globals() else api_url + try: + client = get_github_client(parsed_inputs["token"], api_url=api_url) + except Exception as exc: # this should be tighter + actions_toolkit.set_failed(f"Error while retrieving GitHub REST API Client from {api_url}. {exc}") + return client + + +def get_repo() -> Repository: + global parsed_inputs + parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs + client = get_client() + try: + repo = client.get_repo(parsed_inputs["repo"]) + except Exception as exc: # this should be tighter + actions_toolkit.set_failed(f"Error while retrieving {parsed_inputs['repo']} from Github. {exc}") + return repo + + +def get_organization() -> Organization: + global parsed_inputs + parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs + client = get_client() + try: + org = client.get_organization(parsed_inputs["repo"].split("/")[0]) + except Exception as exc: # this should be tighter + actions_toolkit.set_failed(f"Error while retrieving {parsed_inputs['repo'].split('/')[0]} from Github. {exc}") + return org + + +def get_inputs() -> dict[str, Any]: + """Get inputs from our workflow, valudate them, and return as a dict + Reads inputs from the dict INPUTS. This dict is generated from the actions.yml file. + Non required inputs that are not set are returned as None + Returns: + Dict[str, Any]: [description] + """ + global parsed_inputs + parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs return validate_inputs(parsed_inputs) @@ -83,19 +144,8 @@ def validate_inputs(parsed_inputs: dict[str, Any]) -> dict[str, Any]: + "INPUT_GITHUB_SERVER_URL or GITHUB_SERVER_URL in the env" ) actions_toolkit.debug(f"github_server_url: {parsed_inputs['github_server_url']}") - if parsed_inputs["github_server_url"] == "https://github.com": - api_url = "https://api.github.com" - else: - api_url = parsed_inputs["github_server_url"] + "/api/v3" - - actions_toolkit.debug(f"api_url: {api_url}") - - try: - repo = get_github_client(parsed_inputs["token"], api_url=api_url).get_repo(parsed_inputs["repo"]) - except Exception as exc: # this should be tighter - actions_toolkit.set_failed(f"Error while retriving {parsed_inputs['repo']} from Github. {exc}") - parsed_inputs["repo_object"] = repo + parsed_inputs["repo_object"] = get_repo() return parsed_inputs