Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: choose worker recipes by depth level #950

Merged
merged 3 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions bioconda_utils/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
import os
import logging
import itertools
import time

from typing import List, Optional
from bioconda_utils.skiplist import Skiplist
from bioconda_utils.build_failure import BuildFailureRecord
from bioconda_utils.githandler import GitHandler

import conda
from conda.exports import UnsatisfiableError
from conda_build.exceptions import DependencyNeedsBuildingError
import networkx as nx
Expand Down Expand Up @@ -230,20 +228,38 @@ def remove_cycles(dag, name2recipes, failed, skip_dependent):
return dag.subgraph(name for name in dag if name not in nodes_in_cycles)


def get_subdags(dag, n_workers, worker_offset):
def get_subdags(dag, n_workers, worker_offset, subdag_depth):
if n_workers > 1 and worker_offset >= n_workers:
raise ValueError(
"n-workers is less than the worker-offset given! "
"Either decrease --n-workers or decrease --worker-offset!")

# Get connected subdags and sort by nodes
# If subdag_depth is None, each root node and all children (not previously assigned) are assigned to the same worker.
# This may fail when attempting to build child nodes with parents assigned to other workers.
# If subdag_depth is set, only nodes at exactly that depth will be built (e.g., 0: only root nodes,
# 1: only nodes whose parents are all root nodes, etc.). They are assigned evenly across workers.
if n_workers > 1:
root_nodes = sorted([k for (k, v) in dag.in_degree() if v == 0])
nodes = set()
found = set()
children = []

if subdag_depth is not None:
working_dag = nx.DiGraph(dag)
# Only build the current "root" nodes that remain after removing all nodes at shallower depths
for i in range(0, subdag_depth + 1):
print("{} recipes at depth {}".format(len(root_nodes), i))
if len(root_nodes) == 0:
break
if i < subdag_depth:
working_dag.remove_nodes_from(root_nodes)
root_nodes = sorted([k for (k, v) in working_dag.in_degree() if v == 0])

for idx, root_node in enumerate(root_nodes):
# Flatten the nested list
children = itertools.chain(*nx.dfs_successors(dag, root_node).values())
if subdag_depth is None:
# Flatten the nested list
children = itertools.chain(*nx.dfs_successors(dag, root_node).values())
# This is the only obvious way of ensuring that all nodes are included
# in exactly 1 subgraph
found.add(root_node)
Expand All @@ -256,6 +272,7 @@ def get_subdags(dag, n_workers, worker_offset):
else:
for child in children:
found.add(child)

subdags = dag.subgraph(list(nodes))
logger.info("Building and testing sub-DAGs %i in each group of %i, which is %i packages", worker_offset, n_workers, len(subdags.nodes()))
else:
Expand Down Expand Up @@ -305,6 +322,7 @@ def build_recipes(recipe_folder: str, config_path: str, recipes: List[str],
skiplist_leafs: bool = False,
live_logs: bool = True,
exclude: List[str] = None,
subdag_depth: int = None
):
"""
Build one or many bioconda packages.
Expand Down Expand Up @@ -336,6 +354,7 @@ def build_recipes(recipe_folder: str, config_path: str, recipes: List[str],
live_logs: If True, enable live logging during the build process
exclude: list of recipes to exclude. Typically used for
temporary exclusion; otherwise consider adding recipe to skiplist.
subdag_depth: Number of levels of root nodes to skip; only recipes at exactly this depth
are built. (Optional, only takes effect when n_workers > 1)
"""
if not recipes:
logger.info("Nothing to be done.")
Expand Down Expand Up @@ -375,7 +394,7 @@ def build_recipes(recipe_folder: str, config_path: str, recipes: List[str],

skip_dependent = defaultdict(list)
dag = remove_cycles(dag, name2recipes, failed, skip_dependent)
subdag = get_subdags(dag, n_workers, worker_offset)
subdag = get_subdags(dag, n_workers, worker_offset, subdag_depth)
if not subdag:
logger.info("Nothing to be done.")
return True
Expand Down
5 changes: 4 additions & 1 deletion bioconda_utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def do_lint(recipe_folder, config, packages="*", cache=None, list_checks=False,
@arg("--skiplist-leafs", action="store_true", help="Skiplist leaf recipes (i.e. ones that are not depended on by any other recipes) that fail to build.")
@arg('--disable-live-logs', action='store_true', help="Disable live logging during the build process")
@arg('--exclude', nargs='+', help='Packages to exclude during this run')
@arg('--subdag-depth', type=int, help="Number of levels of root nodes to skip. (Optional, and only if using n_workers)")
@enable_logging()
def build(recipe_folder, config, packages="*", git_range=None, testonly=False,
force=False, docker=None, mulled_test=False, build_script_template=None,
Expand All @@ -447,7 +448,8 @@ def build(recipe_folder, config, packages="*", git_range=None, testonly=False,
record_build_failures=False,
skiplist_leafs=False,
disable_live_logs=False,
exclude=None):
exclude=None,
subdag_depth=None):
cfg = utils.load_config(config)
setup = cfg.get('setup', None)
if setup:
Expand Down Expand Up @@ -510,6 +512,7 @@ def build(recipe_folder, config, packages="*", git_range=None, testonly=False,
skiplist_leafs=skiplist_leafs,
live_logs=(not disable_live_logs),
exclude=exclude,
subdag_depth=subdag_depth
)
exit(0 if success else 1)

Expand Down