diff --git a/examples/settings.yml b/examples/settings.yml index 0e9cf04e..d3608efd 100644 --- a/examples/settings.yml +++ b/examples/settings.yml @@ -6,20 +6,6 @@ # You can run Action from each repo, acting on that repo's settings.yml, or # from a central repo, using a single settings.yml to control many repos. -# For users, it is the login id -# For teams, it is the slug id -# permission can be 'push','pull','triage','admin','maintain', or any custom role you have defined -# for either users or teams, set exists to false to remove their permissions -collaborators: - # - name: andrewthetechie - # type: user - # permission: admin - # exists: false - # - name: / - # type: team - # permission: admin - # exists: false - # Which method you choose is up to you. See README.md for more info and example # Workflows to implement these strategies. settings: diff --git a/repo_manager/gh/branch_protections.py b/repo_manager/gh/branch_protections.py index 5f6b35c1..9a81ab3b 100644 --- a/repo_manager/gh/branch_protections.py +++ b/repo_manager/gh/branch_protections.py @@ -1,8 +1,6 @@ from copy import deepcopy from typing import Any -from actions_toolkit import core as actions_toolkit - from github.Consts import mediaTypeRequireMultipleApprovingReviews from github.GithubException import GithubException from github.GithubObject import NotSet @@ -311,12 +309,7 @@ def check_repo_branch_protections( diff_protections[config_bp.name] = ["Branch is not protected"] continue - try: - this_protection = repo_bp.get_protection() - except Exception as exc: - actions_toolkit.info(f"Repo {repo.full_name} does not currently have any branch protections defined?") - actions_toolkit.info(f"error: {exc}") - continue + this_protection = repo_bp.get_protection() if config_bp.protection.pr_options is not None: diffs.append( diff_option( diff --git a/repo_manager/gh/collaborators.py b/repo_manager/gh/collaborators.py deleted file mode 100644 index ae924db8..00000000 --- a/repo_manager/gh/collaborators.py +++ /dev/null @@ -1,199 +0,0 @@ -from typing import Any -from actions_toolkit import core as actions_toolkit - -from github.Repository import Repository - -from repo_manager.utils import get_organization -from repo_manager.schemas.collaborator import Collaborator - - -def diff_option(key: str, expected: Any, repo_value: Any) -> str | None: - if expected is not None: - if expected != repo_value: - return f"{key} -- Expected: {expected} Found: {repo_value}" - return None - - -def check_collaborators( - repo: Repository, collaborators: list[Collaborator] -) -> tuple[bool, dict[str, list[str] | dict[str, Any]]]: - """Checks a repo's environments vs our expected settings - - Args: - repo (Repository): [description] - environments (List[Environment]): [description] - - Returns: - Tuple[bool, Optional[List[str]]]: [description] - """ - - diff = {} - - expected_collab_usernames = { - collaborator.name - for collaborator in filter( - lambda collaborator: collaborator.type == "User" and collaborator.exists, - collaborators, - ) - } - expected_collab_teamnames = { - collaborator.name - for collaborator in filter( - lambda collaborator: collaborator.type == "Team" and collaborator.exists, - collaborators, - ) - } - repo_collab_users = repo.get_collaborators() - repo_collab_teams = repo.get_teams() # __get_teams__(repo) - repo_collab_usernames = {collaborator.login for collaborator in repo_collab_users} - repo_collab_teamnames = {collaborator.slug for collaborator in repo_collab_teams} - - missing_users = list(expected_collab_usernames - repo_collab_usernames) - missing_teams = list(expected_collab_teamnames - repo_collab_teamnames) - if len(missing_users) + len(missing_teams) > 0: - diff["missing"] = {} - if len(missing_users) > 0: - diff["missing"]["Users"] = missing_users - if len(missing_teams) > 0: - diff["missing"]["Teams"] = missing_teams - - extra_users = list( - repo_collab_usernames.intersection( - collaborator.name - for collaborator in filter( - lambda collaborator: collaborator.type == "User" and not collaborator.exists, - collaborators, - ) - ) - ) - extra_teams = list( - repo_collab_teamnames.intersection( - collaborator.name - for collaborator in filter( - lambda collaborator: collaborator.type == "Team" and not collaborator.exists, - collaborators, - ) - ) - ) - if len(extra_users) + len(extra_teams) > 0: - diff["extra"] = {} - if len(extra_users) > 0: - diff["extra"]["Users"] = extra_users - if len(extra_teams) > 0: - diff["extra"]["Teams"] = extra_teams - - collaborators_to_check_values_on = {} - collaborators_to_check_values_on["Users"] = list(expected_collab_usernames.intersection(repo_collab_usernames)) - collaborators_to_check_values_on["Teams"] = list(expected_collab_teamnames.intersection(repo_collab_teamnames)) - config_collaborator_dict = {collaborator.name: collaborator for collaborator in collaborators} - repo_collab_dict = {"Users": {}, "Teams": {}} - repo_collab_dict["Users"] = {collaborator.login: collaborator for collaborator in repo_collab_users} - repo_collab_dict["Teams"] = {collaborator.slug: collaborator for collaborator in repo_collab_teams} - perm_diffs = {"Users": {}, "Teams": {}} - for collaborator_type in collaborators_to_check_values_on.keys(): - for collaborator_name in collaborators_to_check_values_on[collaborator_type]: - if collaborator_type == "Users": - repo_value = getattr( - repo_collab_dict[collaborator_type][collaborator_name].permissions, - config_collaborator_dict[collaborator_name].permission, - None, - ) - else: - repo_value = ( - getattr( - repo_collab_dict[collaborator_type][collaborator_name], - "permission", - None, - ) - == config_collaborator_dict[collaborator_name].permission - ) - if repo_value is not True: - perm_diffs[collaborator_type][collaborator_name] = diff_option( - config_collaborator_dict[collaborator_name].permission, - True, - repo_value, - ) - - if len(perm_diffs["Users"]) == 0: - perm_diffs.pop("Users") - - if len(perm_diffs["Teams"]) == 0: - perm_diffs.pop("Teams") - - if len(perm_diffs) > 0: - diff["diff"] = perm_diffs - - if len(diff) > 0: - return False, diff - - return True, None - - -def update_collaborators(repo: Repository, collaborators: list[Collaborator], diffs: dict[str, Any]) -> set[str]: - """Updates a repo's environments to match the expected settings - - Args: - repo (Repository): [description] - environments (List[environment]): [description] - diffs (Dictionary[string, Any]): List of all the summarized differences by environment name - - Returns: - set[str]: [description] - """ - errors = [] - users_dict = { - collaborator.name: collaborator - for collaborator in filter( - lambda collaborator: collaborator.type == "User" and collaborator.name, - collaborators, - ) - } - teams_dict = { - collaborator.name: collaborator - for collaborator in filter( - lambda collaborator: collaborator.type == "Team" and collaborator.name, - collaborators, - ) - } - - def switch(collaborator: Collaborator, diff_type: str) -> None: - if diff_type == "missing": - if collaborator.type == "User": - repo.add_to_collaborators(collaborator.name, collaborator.permission) - elif collaborator.type == "Team": - get_organization().get_team_by_slug(collaborator.name).update_team_repository( - repo, collaborator.permission - ) - actions_toolkit.info(f"Added collaborator {collaborator.name} with permission {collaborator.permission}.") - elif diff_type == "extra": - if collaborator.type == "User": - repo.remove_from_collaborators(collaborator.name) - elif collaborator.type == "Team": - get_organization().get_team_by_slug(collaborator.name).remove_from_repos(repo) - else: - raise Exception(f"Modifying collaborators of type {collaborator.type} not currently supported") - actions_toolkit.info(f"Removed collaborator {collaborator.name}.") - elif diff_type == "diff": - if collaborator.type == "User": - repo.add_to_collaborators(collaborator.name, collaborator.permission) - elif collaborator.type == "Team": - get_organization().get_team_by_slug(collaborator.name).update_team_repository( - repo, collaborator.permission - ) - else: - raise Exception(f"Modifying collaborators of type {collaborator.type} not currently supported") - actions_toolkit.info(f"Updated collaborator {collaborator.name} with permission {collaborator.permission}.") - else: - errors.append(f"Collaborator {collaborator} not found in expected collaborators") - - for diff_type in diffs.keys(): - for collaborator_type in diffs[diff_type]: - for collaborator in diffs[diff_type][collaborator_type]: - if collaborator_type == "Users": - switch(users_dict[collaborator], diff_type) - elif collaborator_type == "Teams": - switch(teams_dict[collaborator], diff_type) - else: - raise Exception(f"Modifying collaborators of type {collaborator_type} not currently supported") - - return errors diff --git a/repo_manager/gh/settings.py b/repo_manager/gh/settings.py index 8c062618..202d8443 100644 --- a/repo_manager/gh/settings.py +++ b/repo_manager/gh/settings.py @@ -1,7 +1,5 @@ from typing import Any -from actions_toolkit import core as actions_toolkit - from github.Repository import Repository from repo_manager.schemas.settings import Settings @@ -68,21 +66,14 @@ def get_repo_value(setting_name: str, repo: Repository) -> Any | None: for setting_name in settings.dict().keys(): repo_value = get_repo_value(setting_name, repo) settings_value = getattr(settings, setting_name) - if setting_name in [ - "allow_squash_merge", - "allow_merge_commit", - "allow_rebase_merge", - "delete_branch_on_merge", - "enable_automated_security_fixes", - "enable_vulnerability_alerts", - ]: - if repo._requester.oauth_scopes is None: - continue - elif repo_value is None: - actions_toolkit.info(f"Unable to access {setting_name} with OAUTH of {repo._requester.oauth_scopes}") + # These don't seem to update if changed; may need to explore a different API call + if (setting_name == "enable_automated_security_fixes") | (setting_name == "enable_vulnerability_alerts"): + continue # We don't want to flag description being different if the YAML is None - if settings_value is None: + if (setting_name == "description") & (not settings_value): continue + elif (setting_name == "topics") & (settings_value is None): + settings_value = [] if repo_value != settings_value: drift.append(f"{setting_name} -- Expected: '{settings_value}' Found: '{repo_value}'") checked &= False if (settings_value is not None) else True diff --git a/repo_manager/main.py b/repo_manager/main.py index 9e39fa1b..ca18e8ce 100644 --- a/repo_manager/main.py +++ b/repo_manager/main.py @@ -14,8 +14,6 @@ from repo_manager.gh.secrets import check_repo_secrets from repo_manager.gh.secrets import create_secret from repo_manager.gh.secrets import delete_secret -from repo_manager.gh.collaborators import check_collaborators -from repo_manager.gh.collaborators import update_collaborators from repo_manager.gh.settings import check_repo_settings from repo_manager.gh.settings import update_settings from repo_manager.schemas import load_config @@ -59,7 +57,6 @@ def main(): # noqa: C901 "branch_protections", config.branch_protections, ), - check_collaborators: ("collaborators", config.collaborators), }.items(): check_name, to_check = to_check if to_check is not None: @@ -68,13 +65,11 @@ def main(): # noqa: C901 if this_diffs is not None: diffs[check_name] = this_diffs - actions_toolkit.debug(json_diff := json.dumps(diffs)) + actions_toolkit.debug(json_diff := json.dumps({})) actions_toolkit.set_output("diff", json_diff) if inputs["action"] == "check": if not check_result: - actions_toolkit.info(inputs["repo_object"].full_name) - actions_toolkit.info(json.dumps(diffs)) actions_toolkit.set_output("result", "Check failed, diff detected") actions_toolkit.set_failed("Diff detected") actions_toolkit.set_output("result", "Check passed") @@ -82,27 +77,6 @@ def main(): # noqa: C901 if inputs["action"] == "apply": errors = [] - for update, to_update in { - # TODO: Implement these functions to reduce length and complexity of code - # update_settings: ("settings", config.settings), - # update_secrets: ("secrets", config.secrets), - # check_repo_labels: ("labels", config.labels), - # check_repo_branch_protections: ( - # "branch_protections", - # config.branch_protections, - # ), - update_collaborators: ("collaborators", config.collaborators, diffs.get("collaborators", None)), - }.items(): - update_name, to_update, categorical_diffs = to_update - if categorical_diffs is not None: - try: - application_errors = update(inputs["repo_object"], to_update, categorical_diffs) - if len(application_errors) > 0: - errors.append(application_errors) - else: - actions_toolkit.info(f"Synced {update_name}") - except Exception as exc: - errors.append({"type": f"{update_name}-update", "error": f"{exc}"}) # Because we cannot diff secrets, just apply it every time if config.secrets is not None: diff --git a/repo_manager/schemas/__init__.py b/repo_manager/schemas/__init__.py index cbc38215..5c08f4d5 100644 --- a/repo_manager/schemas/__init__.py +++ b/repo_manager/schemas/__init__.py @@ -1,23 +1,26 @@ import yaml -from pydantic import BaseModel, Field # pylint: disable=E0611 +from pydantic import BaseModel # pylint: disable=E0611 from .branch_protection import BranchProtection from .file import FileConfig from .label import Label from .secret import Secret from .settings import Settings -from .collaborator import Collaborator +from pydantic import Field +from copy import copy + + +def empty_list(): + this_list = list() + return copy(this_list) class RepoManagerConfig(BaseModel): settings: Settings | None - branch_protections: list[BranchProtection] | None = Field( - None, description="Branch protections in the repo to manage" - ) - secrets: list[Secret] | None = Field(None, description="Secrets in the repo to manage") - labels: list[Label] | None = Field(None, description="Labels in the repo to manage") - files: list[FileConfig] | None = Field(None, description="Files in the repo to manage") - collaborators: list[Collaborator] | None = Field(None, description="Collaborators in the repo to manage") + branch_protections: list[BranchProtection] = Field(default_factory=empty_list) + secrets: list[Secret] = Field(default_factory=empty_list) + labels: list[Label] = Field(default_factory=empty_list) + files: list[FileConfig] = Field(default_factory=empty_list) @property def secrets_dict(self): @@ -35,14 +38,6 @@ def branch_protections_dict(self): else {} ) - @property - def collaborators_dict(self): - return ( - {collaborator.name: collaborator for collaborator in self.collaborators} - if self.collaborators is not None - else {} - ) - def load_config(filename: str) -> RepoManagerConfig: """Loads a yaml file into a RepoManagerconfig""" diff --git a/repo_manager/schemas/collaborator.py b/repo_manager/schemas/collaborator.py deleted file mode 100644 index bb8365bf..00000000 --- a/repo_manager/schemas/collaborator.py +++ /dev/null @@ -1,48 +0,0 @@ -from typing import Self - -from github import Github - -from repo_manager.utils import get_client, get_repo - -from pydantic import BaseModel # pylint: disable=E0611 -from pydantic import Field, field_validator, model_validator - - -class Collaborator(BaseModel): - type: str = Field("team", description="Type of reviewer, can be `user` or `team`") - name: str = Field("user", description="Name of the reviewer, either a user or team name") - permission: str = Field( - "pull", - description="Permission level of the reviewer, can be `pull` `triage`, " - + "`push`, `maintain`, `admin`, or custom roles defined in the repo/org", - ) - exists: bool = Field( - True, - description="Whether the collaborator should exist in the repo; " - + "mark as false to remove the collaborator from the repo", - ) - id: int = Field(0, description="ID of the reviewer, either a user or team ID") - repositories_url: str = Field(None, description="URL to modify team permissions, only applicable for teams") - - @model_validator(mode="after") - def initialize_id(self) -> Self: - client: Github = get_client() - if self.type == "User": - self.id = int(client.get_user(self.name).id) - elif self.type == "Team": - if self.name.count("/") == 1: - org, team = self.name.split("/") - else: - org = get_repo().owner.login - team = self.name - github_object = client.get_organization(org).get_team_by_slug(team) - self.repositories_url = github_object.repositories_url - self.id = github_object.id - return self - - @field_validator("type") - def validate_type(cls, v) -> str: - v = v.lower().capitalize() - if v not in {"User", "Team"}: - raise ValueError("Reviewer Type must be user or team.") - return v diff --git a/repo_manager/utils/__init__.py b/repo_manager/utils/__init__.py index 6849444f..ad8e00df 100644 --- a/repo_manager/utils/__init__.py +++ b/repo_manager/utils/__init__.py @@ -3,8 +3,6 @@ from actions_toolkit import core as actions_toolkit -from github import Github, Repository, Organization - # Needed to handle extracting certain attributes/fields from nested objects and lists from itertools import repeat @@ -15,14 +13,13 @@ VALID_ACTIONS = {"validate": None, "check": None, "apply": None} -def __get_inputs__() -> dict: +def get_inputs() -> dict[str, Any]: """Get inputs from our workflow, valudate them, and return as a dict Reads inputs from the dict INPUTS. This dict is generated from the actions.yml file. Non required inputs that are not set are returned as None Returns: Dict[str, Any]: [description] """ - global parsed_inputs parsed_inputs = dict() for input_name, input_config in INPUTS.items(): this_input_value = actions_toolkit.get_input( @@ -40,64 +37,6 @@ def __get_inputs__() -> dict: parsed_inputs[input_name] = input_config.get("default", None) if parsed_inputs[input_name] is None: actions_toolkit.set_failed(f"Error getting inputs. {input_name} is missing a default") - return parsed_inputs - - -def __get_api_url__() -> str: - global parsed_inputs - parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs - global api_url - if parsed_inputs["github_server_url"] == "https://github.com": - api_url = "https://api.github.com" - else: - api_url = parsed_inputs["github_server_url"] + "/api/v3" - actions_toolkit.debug(f"api_url: {api_url}") - return api_url - - -def get_client() -> Github: - global parsed_inputs - parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs - global api_url - api_url = __get_api_url__() if "api_url" not in globals() else api_url - try: - client = get_github_client(parsed_inputs["token"], api_url=api_url) - except Exception as exc: # this should be tighter - actions_toolkit.set_failed(f"Error while retrieving GitHub REST API Client from {api_url}. {exc}") - return client - - -def get_repo() -> Repository: - global parsed_inputs - parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs - client = get_client() - try: - repo = client.get_repo(parsed_inputs["repo"]) - except Exception as exc: # this should be tighter - actions_toolkit.set_failed(f"Error while retrieving {parsed_inputs['repo']} from Github. {exc}") - return repo - - -def get_organization() -> Organization: - global parsed_inputs - parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs - client = get_client() - try: - org = client.get_organization(parsed_inputs["repo"].split("/")[0]) - except Exception as exc: # this should be tighter - actions_toolkit.set_failed(f"Error while retrieving {parsed_inputs['repo'].split('/')[0]} from Github. {exc}") - return org - - -def get_inputs() -> dict[str, Any]: - """Get inputs from our workflow, valudate them, and return as a dict - Reads inputs from the dict INPUTS. This dict is generated from the actions.yml file. - Non required inputs that are not set are returned as None - Returns: - Dict[str, Any]: [description] - """ - global parsed_inputs - parsed_inputs = __get_inputs__() if "parsed_inputs" not in globals() else parsed_inputs return validate_inputs(parsed_inputs) @@ -144,8 +83,19 @@ def validate_inputs(parsed_inputs: dict[str, Any]) -> dict[str, Any]: + "INPUT_GITHUB_SERVER_URL or GITHUB_SERVER_URL in the env" ) actions_toolkit.debug(f"github_server_url: {parsed_inputs['github_server_url']}") + if parsed_inputs["github_server_url"] == "https://github.com": + api_url = "https://api.github.com" + else: + api_url = parsed_inputs["github_server_url"] + "/api/v3" + + actions_toolkit.debug(f"api_url: {api_url}") + + try: + repo = get_github_client(parsed_inputs["token"], api_url=api_url).get_repo(parsed_inputs["repo"]) + except Exception as exc: # this should be tighter + actions_toolkit.set_failed(f"Error while retriving {parsed_inputs['repo']} from Github. {exc}") - parsed_inputs["repo_object"] = get_repo() + parsed_inputs["repo_object"] = repo return parsed_inputs