Skip to content

Commit

Permalink
feat: choose worker recipes by depth level
Browse files Browse the repository at this point in the history
  • Loading branch information
aliciaaevans committed Dec 27, 2023
1 parent 8255afd commit 7c4db37
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
33 changes: 26 additions & 7 deletions bioconda_utils/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
import os
import logging
import itertools
import time

from typing import List, Optional
from bioconda_utils.skiplist import Skiplist
from bioconda_utils.build_failure import BuildFailureRecord
from bioconda_utils.githandler import GitHandler

import conda
from conda.exports import UnsatisfiableError
from conda_build.exceptions import DependencyNeedsBuildingError
import networkx as nx
Expand Down Expand Up @@ -230,20 +228,38 @@ def remove_cycles(dag, name2recipes, failed, skip_dependent):
return dag.subgraph(name for name in dag if name not in nodes_in_cycles)


def get_subdags(dag, n_workers, worker_offset):
def get_subdags(dag, n_workers, worker_offset, subdag_depth = 0):
if n_workers > 1 and worker_offset >= n_workers:
raise ValueError(
"n-workers is less than the worker-offset given! "
"Either decrease --n-workers or decrease --worker-offset!")

# Get connected subdags and sort by nodes
# If subdag_depth is None, each root node and all children (not previously assigned) are assigned to the same worker.
# This may fail when attempting to build child nodes with parents assigned to other workers.
# If subdag_depth is set, only nodes of a certain depth will be built (i.e., 0: only root nodes,
# 1: only nodes with parents that are root nodes, etc.). They are assigned evenly across workers.
if n_workers > 1:
root_nodes = sorted([k for (k, v) in dag.in_degree() if v == 0])
nodes = set()
found = set()
children = []

if subdag_depth is not None:
working_dag = nx.DiGraph(dag)
# Only build the current "root" nodes after removing
for i in range(0, subdag_depth + 1):
print("{} recipes at depth {}".format(len(root_nodes), i))
if len(root_nodes) == 0:
break
if i < subdag_depth:
working_dag.remove_nodes_from(root_nodes)
root_nodes = sorted([k for (k, v) in working_dag.in_degree() if v == 0])

for idx, root_node in enumerate(root_nodes):
# Flatten the nested list
children = itertools.chain(*nx.dfs_successors(dag, root_node).values())
if subdag_depth is None:
# Flatten the nested list
children = itertools.chain(*nx.dfs_successors(dag, root_node).values())
# This is the only obvious way of ensuring that all nodes are included
# in exactly 1 subgraph
found.add(root_node)
Expand All @@ -256,6 +272,7 @@ def get_subdags(dag, n_workers, worker_offset):
else:
for child in children:
found.add(child)

subdags = dag.subgraph(list(nodes))
logger.info("Building and testing sub-DAGs %i in each group of %i, which is %i packages", worker_offset, n_workers, len(subdags.nodes()))
else:
Expand Down Expand Up @@ -300,7 +317,8 @@ def build_recipes(recipe_folder: str, config_path: str, recipes: List[str],
mulled_conda_image: str = pkg_test.MULLED_CONDA_IMAGE,
record_build_failures: bool = False,
skiplist_leafs: bool = False,
live_logs: bool = True):
live_logs: bool = True,
subdag_depth: int = None):
"""
Build one or many bioconda packages.
Expand Down Expand Up @@ -329,6 +347,7 @@ def build_recipes(recipe_folder: str, config_path: str, recipes: List[str],
keep_old_work: Do not remove anything from environment, even after successful build and test.
skiplist_leafs: If True, blacklist leaf packages that fail to build
live_logs: If True, enable live logging during the build process
subdag_depth: Number of levels of nodes to skip. (Optional, only if using n_workers)
"""
if not recipes:
logger.info("Nothing to be done.")
Expand Down Expand Up @@ -364,7 +383,7 @@ def build_recipes(recipe_folder: str, config_path: str, recipes: List[str],

skip_dependent = defaultdict(list)
dag = remove_cycles(dag, name2recipes, failed, skip_dependent)
subdag = get_subdags(dag, n_workers, worker_offset)
subdag = get_subdags(dag, n_workers, worker_offset, subdag_depth)
if not subdag:
logger.info("Nothing to be done.")
return True
Expand Down
7 changes: 5 additions & 2 deletions bioconda_utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ def do_lint(recipe_folder, config, packages="*", cache=None, list_checks=False,
@arg("--record-build-failures", action="store_true", help="Record build failures in build_failure.yaml next to the recipe.")
@arg("--skiplist-leafs", action="store_true", help="Skiplist leaf recipes (i.e. ones that are not depended on by any other recipes) that fail to build.")
@arg('--disable-live-logs', action='store_true', help="Disable live logging during the build process")
@arg('--subdag-depth', type=int, help="Number of levels of root nodes to skip. (Optional, and only if using n_workers)")
@enable_logging()
def build(recipe_folder, config, packages="*", git_range=None, testonly=False,
force=False, docker=None, mulled_test=False, build_script_template=None,
Expand All @@ -445,7 +446,8 @@ def build(recipe_folder, config, packages="*", git_range=None, testonly=False,
docker_base_image=None,
record_build_failures=False,
skiplist_leafs=False,
disable_live_logs=False):
disable_live_logs=False,
subdag_depth=None):
cfg = utils.load_config(config)
setup = cfg.get('setup', None)
if setup:
Expand Down Expand Up @@ -506,7 +508,8 @@ def build(recipe_folder, config, packages="*", git_range=None, testonly=False,
mulled_conda_image=mulled_conda_image,
record_build_failures=record_build_failures,
skiplist_leafs=skiplist_leafs,
live_logs=(not disable_live_logs))
live_logs=(not disable_live_logs),
subdag_depth=subdag_depth)
exit(0 if success else 1)


Expand Down

0 comments on commit 7c4db37

Please sign in to comment.