Skip to content

Commit

Permalink
Refactor autopatch: use a AppToPatch class that regroups actions, put…
Browse files Browse the repository at this point in the history
… loops outside the action functions
  • Loading branch information
Salamandar committed Sep 12, 2024
1 parent fc9d4a6 commit cb564df
Showing 1 changed file with 147 additions and 153 deletions.
300 changes: 147 additions & 153 deletions tools/autopatches/autopatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,37 @@
import os
import subprocess
import sys
import logging
import time
from typing import Optional, TypeVar, Iterable, Generator
from pathlib import Path

import requests
import tqdm
import toml

from git import Repo, Head, Actor

# add apps/tools to sys.path
sys.path.insert(0, str(Path(__file__).parent.parent))

from app_caches import AppDir
from appslib.utils import ( # noqa: E402 pylint: disable=import-error,wrong-import-position
get_catalog,
)
import appslib.get_apps_repo as get_apps_repo

TOOLS_DIR = Path(__file__).resolve().parent.parent

my_env = os.environ.copy()
my_env["GIT_TERMINAL_PROMPT"] = "0"
os.makedirs(".apps_cache", exist_ok=True)

login = (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
token = (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
LOGIN = (TOOLS_DIR / ".github_login").open("r", encoding="utf-8").read().strip()
TOKEN = (TOOLS_DIR / ".github_token").open("r", encoding="utf-8").read().strip()
github_api = "https://api.github.com"

PATCHES_PATH = Path(__file__).resolve().parent / "patches"


def apps(min_level=4):
for app, infos in get_catalog().items():
Expand All @@ -36,162 +44,128 @@ def apps(min_level=4):
yield infos


def app_cache_folder(app):
return os.path.join(".apps_cache", app)

class AppToPatch:
def __init__(self, id: str, path: Path, info: dict) -> None:
self.id = id
self.path = path
self.info = info
self.patch: Optional[str] = None
self._repo: Optional[Repo] = None

@property
def repo(self) -> Repo:
if self._repo is None:
self._repo = Repo(self.path)
return self._repo

def cache(self) -> None:
appdir = AppDir(self.id, self.path)
appdir.ensure(self.info["url"], self.info.get("branch", "master"), False, False)

def reset(self) -> None:
if self.get_diff():
logging.warning("%s had local changes, they were stashed.", self.id)
self.repo.git.stash("save")
self.repo.git.checkout("testing")

def apply(self, patch: str) -> None:
current_branch = self.repo.active_branch
self.repo.head.reset(f"{current_branch}", index=True, working_tree=True)
subprocess.call([PATCHES_PATH / patch / "patch"], cwd=self.path)

def get_diff(self) -> str:
return " ".join(str(d) for d in self.repo.index.diff(None, create_patch=True))

def diff(self) -> None:
diff = self.get_diff()
if not diff:
return
print(80 * "=")
print(f"Changes in : {self.id}")
print(80 * "=")
print()
print(diff)
print("\n\n\n")

def on_github(self) -> bool:
return "github.com/yunohost-apps" in self.info["url"].lower()

def fork_if_needed(self, session: requests.Session) -> None:
repo_name = self.info["url"].split("/")[-1]
r = session.get(github_api + f"/repos/{LOGIN}/{repo_name}")
if r.status_code == 200:
return

fork_repo_name = self.info["url"].split("github.com/")[-1]
r = session.post(github_api + f"/repos/{fork_repo_name}/forks")
r.raise_for_status()
time.sleep(2) # to avoid rate limiting lol

def commit(self, patch: str) -> None:
pr_title = (PATCHES_PATH / patch / "pr_title.md").open().read().strip()
title = f"[autopatch] {pr_title}"
self.repo.git.add(all=True)
self.repo.index.commit(title, author=Actor("Yunohost-Bot", None))

def push(self, patch: str, session: requests.Session) -> None:
if not self.get_diff():
return

if not self.on_github():
return

self.fork_if_needed(session)

base_branch = self.repo.active_branch
if patch in self.repo.heads:
self.repo.delete_head(patch)
head_branch = self.repo.create_head(patch)
head_branch.checkout()

self.commit(patch)

if "fork" in self.repo.remotes:
self.repo.delete_remote(self.repo.remote("fork"))
reponame = self.info["url"].rsplit("/", 1)[-1]
self.repo.create_remote(
"fork", url=f"https://{LOGIN}:{TOKEN}@github.com/{LOGIN}/{reponame}"
)

def git(cmd, in_folder=None):
if not isinstance(cmd, list):
cmd = cmd.split()
if in_folder:
cmd = ["-C", in_folder] + cmd
cmd = ["git"] + cmd
return subprocess.check_output(cmd, env=my_env).strip().decode("utf-8")
self.repo.remote(name="fork").push(progress=None, force=True)

self.create_pull_request(patch, head_branch, base_branch, session)

# Progress bar helper, stolen from https://stackoverflow.com/a/34482761
def progressbar(it, prefix="", size=60, file=sys.stdout):
it = list(it)
count = len(it)
def create_pull_request(
self, patch: str, head: Head, base: Head, session: requests.Session
) -> None:
pr_title = (PATCHES_PATH / patch / "pr_title.md").open().read().strip()
pr_body = (PATCHES_PATH / patch / "pr_body.md").open().read().strip()
PR = {
"title": f"[autopatch] {pr_title}",
"body": f"This is an automatic PR\n\n{pr_body}",
"head": f"{LOGIN}:{head.name}",
"base": base.name,
"maintainer_can_modify": True,
}

def show(j, name=""):
name += " "
x = int(size * j / count)
file.write(
"%s[%s%s] %i/%i %s\r" % (prefix, "#" * x, "." * (size - x), j, count, name)
)
file.flush()

show(0)
for i, item in enumerate(it):
yield item
show(i + 1, item["id"])
file.write("\n")
file.flush()


def build_cache():
for app in progressbar(apps(), "Git cloning: ", 40):
folder = os.path.join(".apps_cache", app["id"])
reponame = app["url"].rsplit("/", 1)[-1]
git(f"clone --quiet --depth 1 --single-branch {app['url']} {folder}")
git(
f"remote add fork https://{login}:{token}@github.com/{login}/{reponame}",
in_folder=folder,
)
fork_repo_name = self.info["url"].split("github.com/")[-1]
repo_name = self.info["url"].split("/")[-1]
r = session.post(github_api + f"/repos/{fork_repo_name}/pulls", json.dumps(PR))
r.raise_for_status()
print(json.loads(r.text)["html_url"])
time.sleep(4) # to avoid rate limiting lol


def apply(patch):
patch_path = os.path.abspath(os.path.join("patches", patch, "patch.sh"))

for app in progressbar(apps(), "Apply to: ", 40):
folder = os.path.join(".apps_cache", app["id"])
current_branch = git(f"symbolic-ref --short HEAD", in_folder=folder)
git(f"reset --hard origin/{current_branch}", in_folder=folder)
os.system(f"cd {folder} && bash {patch_path}")


def diff():
for app in apps():
folder = os.path.join(".apps_cache", app["id"])
if bool(
subprocess.check_output(f"cd {folder} && git diff", shell=True)
.strip()
.decode("utf-8")
):
print("\n\n\n")
print("=================================")
print("Changes in : " + app["id"])
print("=================================")
print("\n")
os.system(f"cd {folder} && git --no-pager diff")


def push(patch):
title = (
"[autopatch] "
+ open(os.path.join("patches", patch, "pr_title.md")).read().strip()
)
IterType = TypeVar("IterType")

def diff_not_empty(app):
folder = os.path.join(".apps_cache", app["id"])
return bool(
subprocess.check_output(f"cd {folder} && git diff", shell=True)
.strip()
.decode("utf-8")
)

def app_is_on_github(app):
return "github.com" in app["url"]
def progressbar(elements: list[IterType]) -> Generator[IterType, None, None]:
return tqdm.tqdm(elements, total=len(elements), ascii=" ·#")

apps_to_push = [
app for app in apps() if diff_not_empty(app) and app_is_on_github(app)
]

with requests.Session() as s:
s.headers.update({"Authorization": f"token {token}"})
for app in progressbar(apps_to_push, "Forking: ", 40):
app["repo"] = app["url"][len("https://github.com/") :].strip("/")
fork_if_needed(app["repo"], s)
time.sleep(2) # to avoid rate limiting lol

for app in progressbar(apps_to_push, "Pushing: ", 40):
app["repo"] = app["url"][len("https://github.com/") :].strip("/")
app_repo_name = app["url"].rsplit("/", 1)[-1]
folder = os.path.join(".apps_cache", app["id"])
current_branch = git(f"symbolic-ref --short HEAD", in_folder=folder)
git(f"reset origin/{current_branch}", in_folder=folder)
git(
["commit", "-a", "-m", title, "--author='Yunohost-Bot <>'"],
in_folder=folder,
)
try:
git(f"remote remove fork", in_folder=folder)
except Exception:
pass
git(
f"remote add fork https://{login}:{token}@github.com/{login}/{app_repo_name}",
in_folder=folder,
)
git(f"push fork {current_branch}:{patch} --quiet --force", in_folder=folder)
create_pull_request(app["repo"], patch, current_branch, s)
time.sleep(4) # to avoid rate limiting lol


def fork_if_needed(repo, s):
repo_name = repo.split("/")[-1]
r = s.get(github_api + f"/repos/{login}/{repo_name}")

if r.status_code == 200:
return

r = s.post(github_api + f"/repos/{repo}/forks")

if r.status_code != 200:
print(r.text)


def create_pull_request(repo, patch, base_branch, s):
PR = {
"title": "[autopatch] "
+ open(os.path.join("patches", patch, "pr_title.md")).read().strip(),
"body": "This is an automatic PR\n\n"
+ open(os.path.join("patches", patch, "pr_body.md")).read().strip(),
"head": login + ":" + patch,
"base": base_branch,
"maintainer_can_modify": True,
}

r = s.post(github_api + f"/repos/{repo}/pulls", json.dumps(PR))

if r.status_code != 200:
print(r.text)
else:
json.loads(r.text)["html_url"]


def main():
def main() -> None:
parser = argparse.ArgumentParser()
get_apps_repo.add_args(parser)
parser.add_argument(
"the_patch", type=str, nargs="?", help="The name of the patch to apply"
)
Expand All @@ -212,24 +186,44 @@ def main():
)
args = parser.parse_args()

get_apps_repo.from_args(args)
cache_path = get_apps_repo.cache_path(args)
cache_path.mkdir(exist_ok=True, parents=True)

if not (args.cache or args.apply or args.diff or args.push):
parser.error("We required --cache, --apply, --diff or --push.")

apps_to_patch: list[AppToPatch] = [
AppToPatch(info["id"], cache_path / info["id"], info) for info in apps()
]

if args.cache:
build_cache()
print("Caching apps...")
for app in progressbar(apps_to_patch):
app.cache()

if args.apply:
if not args.the_patch:
parser.error("--apply requires the patch name to be passed")
apply(args.the_patch)
print(f"Applying patch '{args.the_patch}' to apps...")
for app in progressbar(apps_to_patch):
app.reset()
app.apply(args.the_patch)

if args.diff:
diff()
print("Printing diff of apps...")
for app in progressbar(apps_to_patch):
app.diff()

if args.push:
if not args.the_patch:
parser.error("--push requires the patch name to be passed")
push(args.the_patch)
print("Pushing apps...")
with requests.Session() as session:
session.headers.update({"Authorization": f"token {TOKEN}"})
for app in progressbar(apps_to_patch):
app.push(args.the_patch, session)


main()
if __name__ == "__main__":
main()

0 comments on commit cb564df

Please sign in to comment.