Skip to content

Commit

Permalink
Parallelize workload
Browse files Browse the repository at this point in the history
  • Loading branch information
sethmlarson committed Mar 29, 2022
1 parent a665e58 commit c7172a3
Showing 1 changed file with 157 additions and 142 deletions.
299 changes: 157 additions & 142 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import tempfile
import time
from contextlib import closing
from typing import NamedTuple
from concurrent.futures import ProcessPoolExecutor

import urllib3
from packaging.version import InvalidVersion, Version
Expand Down Expand Up @@ -70,13 +72,16 @@
);
"""
)
db.execute("""
db.execute(
"""
CREATE TABLE IF NOT EXISTS maintainers (
name STRING,
package_name STRING
);
""")
"""
)
db.commit()
pool = ProcessPoolExecutor()


def get_all_package_names():
Expand Down Expand Up @@ -195,171 +200,181 @@ def get_maintainers_from_pypi(package: str):
return set()
elif resp.status != 200:
continue
return set(re.findall(r"<a href=\"/user/([^/]+)/\" aria-label=", resp.data.decode("utf-8")))
return set(
re.findall(
r"<a href=\"/user/([^/]+)/\" aria-label=", resp.data.decode("utf-8")
)
)
return set()


def update_data_from_pypi():
for package in tqdm(packages, unit="packages"):
resp = http.request("GET", f"https://pypi.org/pypi/{package}/json")
def update_data_for_package(package: str) -> None:
resp = http.request("GET", f"https://pypi.org/pypi/{package}/json")

if resp.status != 200:
return
try:
resp = json.loads(resp.data.decode("utf-8"))
except Exception:
return
try:
version = Version(resp["info"]["version"])
except InvalidVersion: # The latest release has an invalid version, skip
return
latest_version = max(to_versions(resp["releases"].keys()))

# Favor pre-releases over non-pre-releases
if version < latest_version:
version = latest_version
new_resp = http.request(
"GET", f"https://pypi.org/pypi/{package}/{latest_version}/json"
)
if new_resp.status == 200:
resp = json.loads(new_resp.data.decode("utf-8"))

if resp.status != 200:
continue
# Get the exact string for the version that we found
for strv in resp["releases"]:
try:
resp = json.loads(resp.data.decode("utf-8"))
except Exception:
if Version(strv) == version:
str_version = strv
break
except InvalidVersion:
continue
else:
raise ValueError("???")

# Check to see if we already have this version or not
with closing(db.cursor()) as cur:
cur.execute(
"SELECT name FROM packages WHERE name = ? AND version = ?;",
(package, str_version),
)
if cur.fetchone():
return

maintainers = get_maintainers_from_pypi(package)

requires_python = resp["info"]["requires_python"] or ""
urequires_dist = [
normalize_requires_dist(x) for x in resp["info"]["requires_dist"] or []
]
urequires_dist = sorted(urequires_dist, key=requires_dist_sort_key)

requires_dist = {"specifiers": [], "dists": []}
requires_extras = {}
yanked = []

releases = resp["releases"][str_version]
uploaded_at = None if not releases else min(x["upload_time"] for x in releases)
wheel_filenames = [
x["filename"] for x in releases if x["filename"].endswith(".whl")
]
has_binary_wheel = False

for filename in wheel_filenames:
try:
version = Version(resp["info"]["version"])
except InvalidVersion: # The latest release has an invalid version, skip
whl = parse_wheel_filename(filename)
except InvalidFilenameError:
continue
latest_version = max(to_versions(resp["releases"].keys()))

# Favor pre-releases over non-pre-releases
if version < latest_version:
version = latest_version
new_resp = http.request(
"GET", f"https://pypi.org/pypi/{package}/{latest_version}/json"
)
if new_resp.status == 200:
resp = json.loads(new_resp.data.decode("utf-8"))

# Get the exact string for the version that we found
for strv in resp["releases"]:
try:
if Version(strv) == version:
str_version = strv
break
except InvalidVersion:
continue
else:
raise ValueError("???")
python_tags, abi_tags, platform_tags = (
whl.python_tags,
whl.abi_tags,
whl.platform_tags,
)

# Check to see if we already have this version or not
with closing(db.cursor()) as cur:
cur.execute(
"SELECT name FROM packages WHERE name = ? AND version = ?;",
(package, str_version),
)
if cur.fetchone():
continue

# If we don't have 'requires_dist' information install
# locally and investigate the installed package
if False and resp["info"]["requires_dist"] is None:
new_resp = get_metadata_by_install(package, resp)
if new_resp is not None:
resp = new_resp

requires_python = resp["info"]["requires_python"] or ""
urequires_dist = [
normalize_requires_dist(x) for x in resp["info"]["requires_dist"] or []
]
urequires_dist = sorted(urequires_dist, key=requires_dist_sort_key)

requires_dist = {"specifiers": [], "dists": []}
requires_extras = {}
yanked = []

releases = resp["releases"][str_version]
uploaded_at = None if not releases else min(x["upload_time"] for x in releases)
wheel_filenames = [
x["filename"] for x in releases if x["filename"].endswith(".whl")
]
has_binary_wheel = False

for filename in wheel_filenames:
try:
whl = parse_wheel_filename(filename)
except InvalidFilenameError:
continue
python_tags, abi_tags, platform_tags = (
whl.python_tags,
whl.abi_tags,
whl.platform_tags,
for wheel_data in itertools.product(python_tags, abi_tags, platform_tags):
py, abi, plat = wheel_data
db.execute(
"""
INSERT INTO wheels (
name, version, filename, python, abi, platform
) VALUES (?, ?, ?, ?, ?, ?);
""",
(package, str_version, filename, py, abi, plat),
)

for wheel_data in itertools.product(python_tags, abi_tags, platform_tags):
py, abi, plat = wheel_data
db.execute(
"""
INSERT INTO wheels (
name, version, filename, python, abi, platform
) VALUES (?, ?, ?, ?, ?, ?);
""",
(package, str_version, filename, py, abi, plat),
)
if abi_tags == ["none"] and platform_tags == ["any"]:
continue

if abi_tags == ["none"] and platform_tags == ["any"]:
continue
has_binary_wheel = True

has_binary_wheel = True
db.execute(
"""
INSERT OR IGNORE INTO packages (
name, version, requires_python, has_binary_wheel, uploaded_at
) VALUES (?, ?, ?, ?, ?);
""",
(package, str_version, requires_python, has_binary_wheel, uploaded_at),
)

for maintainer in maintainers:
db.execute(
"""
INSERT OR IGNORE INTO packages (
name, version, requires_python, has_binary_wheel, uploaded_at
) VALUES (?, ?, ?, ?, ?);
INSERT OR IGNORE INTO maintainers (name, package_name) VALUES (?, ?);
""",
(package, str_version, requires_python, has_binary_wheel, uploaded_at),
(maintainer, package),
)
db.commit()

for maintainer in get_maintainers_from_pypi(package):
db.execute("""
INSERT OR IGNORE INTO maintainers (name, package_name) VALUES (?, ?);
""", (maintainer, package))

for req in urequires_dist:
extras = get_extras(req)
req_no_specifiers = dist_from_requires_dist(req)
specifier = specifier_from_requires_dist(req).replace(
req_no_specifiers + " ", "", 1
)
if extras:
for extra in extras:
db.execute(
"""
INSERT OR IGNORE INTO deps (
name,
version,
dep_name,
dep_specifier,
extra
) VALUES (?, ?, ?, ?, ?);
""",
(package, str_version, req_no_specifiers, specifier, extra),
)
else:

for req in urequires_dist:
extras = get_extras(req)
req_no_specifiers = dist_from_requires_dist(req)
specifier = specifier_from_requires_dist(req).replace(
req_no_specifiers + " ", "", 1
)
if extras:
for extra in extras:
db.execute(
"""
INSERT OR IGNORE INTO deps (
name,
version,
dep_name,
dep_specifier
) VALUES (?, ?, ?, ?);
name,
version,
dep_name,
dep_specifier,
extra
) VALUES (?, ?, ?, ?, ?);
""",
(package, str_version, req_no_specifiers, specifier),
(package, str_version, req_no_specifiers, specifier, extra),
)
else:
db.execute(
"""
INSERT OR IGNORE INTO deps (
name,
version,
dep_name,
dep_specifier
) VALUES (?, ?, ?, ?);
""",
(package, str_version, req_no_specifiers, specifier),
)

requires_dist["dists"] = sorted(set(requires_dist["dists"]))
for extra, extra_info in list(requires_extras.items()):
requires_extras[extra]["dists"] = sorted(set(extra_info["dists"]))
requires_dist["dists"] = sorted(set(requires_dist["dists"]))
for extra, extra_info in list(requires_extras.items()):
requires_extras[extra]["dists"] = sorted(set(extra_info["dists"]))

for relv, downloads in resp["releases"].items():
for download in downloads:
if download["yanked"]:
yanked.append(relv)
break
for relv, downloads in resp["releases"].items():
for download in downloads:
if download["yanked"]:
yanked.append(relv)
break

yanked = sorted_versions(set(yanked))
if yanked:
db.execute(
"UPDATE packages SET yanked=1 WHERE name=? AND version=?;",
(package, str_version),
)
yanked = sorted_versions(set(yanked))
if yanked:
db.execute(
"UPDATE packages SET yanked=1 WHERE name=? AND version=?;",
(package, str_version),
)

db.commit()

return package


def update_data_from_pypi():
results = pool.map(update_data_for_package, packages)
for _ in tqdm(results, total=len(packages), unit="packages"):
pass

db.commit()

update_data_from_pypi()
if __name__ == "__main__":
update_data_from_pypi()

0 comments on commit c7172a3

Please sign in to comment.