From 27319f711c727fb380b3e9518c7c4d8576a4c8a7 Mon Sep 17 00:00:00 2001 From: anna-grim Date: Fri, 12 Jan 2024 00:48:36 +0000 Subject: [PATCH] refactor : asynchronous initial graph build --- src/deep_neurographs/geometry_utils.py | 8 +-- src/deep_neurographs/graph_utils.py | 13 +++-- src/deep_neurographs/intake.py | 72 ++++++++++++++++++-------- src/deep_neurographs/neurograph.py | 28 +++++----- src/deep_neurographs/swc_utils.py | 11 ++-- src/deep_neurographs/utils.py | 6 +-- 6 files changed, 83 insertions(+), 55 deletions(-) diff --git a/src/deep_neurographs/geometry_utils.py b/src/deep_neurographs/geometry_utils.py index 7bcb57b..ad929e7 100644 --- a/src/deep_neurographs/geometry_utils.py +++ b/src/deep_neurographs/geometry_utils.py @@ -9,7 +9,9 @@ # Directional Vectors -def get_directional(neurograph, i, proposal_tangent, window=5, n_svd_points=10): +def get_directional( + neurograph, i, proposal_tangent, window=5, n_svd_points=10 +): directionals = [] d = n_svd_points for branch in neurograph.get_branches(i): @@ -18,7 +20,7 @@ def get_directional(neurograph, i, proposal_tangent, window=5, n_svd_points=10): elif branch.shape[0] <= d: xyz = deepcopy(branch) else: - xyz = deepcopy(branch[d : window + d, :]) + xyz = deepcopy(branch[d: window + d, :]) directionals.append(compute_tangent(xyz)) # Determine best @@ -94,7 +96,7 @@ def get_profile(img, xyz_arr, window=[5, 5, 5]): def fill_path(img, path, val=-1): for xyz in path: x, y, z = tuple(np.floor(xyz).astype(int)) - img[x - 1 : x + 2, y - 1 : y + 2, z - 1 : z + 2] = val + img[x - 1: x + 2, y - 1: y + 2, z - 1: z + 2] = val return img diff --git a/src/deep_neurographs/graph_utils.py b/src/deep_neurographs/graph_utils.py index 79db58e..d82c084 100644 --- a/src/deep_neurographs/graph_utils.py +++ b/src/deep_neurographs/graph_utils.py @@ -21,16 +21,15 @@ """ -from copy import deepcopy from random import sample import networkx as nx import numpy as np -from deep_neurographs import geometry_utils, swc_utils, utils +from deep_neurographs import geometry_utils, swc_utils -def get_irreducibles(swc_dict, prune=True, depth=16, smooth=True): +def get_irreducibles(swc_dict, swc_id=None, prune=True, depth=16, smooth=True): """ Gets irreducible components of the graph stored in "swc_dict". The irreducible components consist of the leaf and junction nodes along with @@ -40,6 +39,9 @@ def get_irreducibles(swc_dict, prune=True, depth=16, smooth=True): ---------- swc_dict : dict Contents of an swc file. + swc_id : str, optional + Filename of swc which is used to run this routine with + multiprocessing. The default is None. prune : bool, optional Indication of whether to prune short branches. The default is True. depth : int, optional @@ -85,7 +87,10 @@ def get_irreducibles(swc_dict, prune=True, depth=16, smooth=True): nbs = append_value(nbs, root, j) nbs = append_value(nbs, j, root) root = None - return {"leafs": leafs, "junctions": junctions, "edges": edges} + + # Output + irreducibles = {"leafs": leafs, "junctions": junctions, "edges": edges} + return swc_id, irreducibles def get_irreducible_nodes(graph): diff --git a/src/deep_neurographs/intake.py b/src/deep_neurographs/intake.py index 997a18f..989c33e 100644 --- a/src/deep_neurographs/intake.py +++ b/src/deep_neurographs/intake.py @@ -8,9 +8,12 @@ """ -import concurrent.futures import os -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ( + ProcessPoolExecutor, + ThreadPoolExecutor, + as_completed, +) from io import BytesIO from time import time from zipfile import ZipFile @@ -65,7 +68,7 @@ def build_neurograph_from_local( prune=prune, prune_depth=prune_depth, smooth=smooth, - ) + ) print(f"build_neurograph(): {time() - t0} seconds") # Generate proposals @@ -149,8 +152,7 @@ def build_neurograph_from_gcs_zips( ) if search_radius > 0: neurograph.generate_proposals( - search_radius, - n_proposals_per_leaf=n_proposals_per_leaf, + search_radius, n_proposals_per_leaf=n_proposals_per_leaf ) return neurograph @@ -215,7 +217,6 @@ def download_zip(bucket, zip_path, min_size=0): def count_files_in_zips(bucket, zip_paths): - t0 = time() file_cnt = 0 for zip_path in zip_paths: zip_blob = bucket.blob(zip_path) @@ -244,7 +245,7 @@ def list_gcs_filenames(bucket, cloud_path, extension): # -- Build neurograph --- -def build_neurograph( +def build_neurograph_old( swc_dicts, bbox=None, img_path=None, @@ -254,22 +255,53 @@ def build_neurograph( ): # Extract irreducibles t0 = time() - n_components = len(swc_dicts) - irreducibles = [None] * n_components - for i in range(n_components): - irreducibles[i] = gutils.get_irreducibles( - swc_dicts[i], prune=prune, depth=prune_depth, smooth=smooth + irreducibles = dict() + for key in swc_dicts.keys(): + irreducibles[key] = gutils.get_irreducibles( + swc_dicts[key], prune=prune, depth=prune_depth, smooth=smooth ) print(f" --> get_irreducibles(): {time() - t0} seconds") # Build neurograph t0 = time() - neurograph = NeuroGraph( - bbox=bbox, - img_path=img_path, - ) - for i in range(n_components): - neurograph.add_immutables(swc_dicts[i], irreducibles[i]) + neurograph = NeuroGraph(bbox=bbox, img_path=img_path) + for key in swc_dicts.keys(): + neurograph.add_immutables(swc_dicts[key], irreducibles[key]) + print(f" --> add_irreducibles(): {time() - t0} seconds") + return neurograph + + +def build_neurograph( + swc_dicts, + bbox=None, + img_path=None, + prune=PRUNE, + prune_depth=PRUNE_DEPTH, + smooth=SMOOTH, +): + # Extract irreducibles + irreducibles = dict() + with ProcessPoolExecutor() as executor: + # Assign Processes + processes = [None] * len(swc_dicts) + for i, key in enumerate(swc_dicts.keys()): + processes[i] = executor.submit( + gutils.get_irreducibles, + swc_dicts[key], + key, + prune, + prune_depth, + smooth, + ) + for process in as_completed(processes): + process_id, result = process.result() + irreducibles[process_id] = result + + # Build neurograph + t0 = time() + neurograph = NeuroGraph(bbox=bbox, img_path=img_path) + for key in swc_dicts.keys(): + neurograph.add_immutables(irreducibles[key], swc_dicts[key], key) print(f" --> add_irreducibles(): {time() - t0} seconds") return neurograph @@ -283,8 +315,6 @@ def get_paths(swc_dir): def get_start_ids(swc_dicts): - # runtime: ~ 1 minute - t0 = time() node_ids = [] cnt = 0 for swc_dict in swc_dicts: @@ -308,6 +338,6 @@ def report_runtimes( print( f"Runtime for Zips {files_processed}: {round(chunk_runtime, 4)} seconds" ) - print(f"Zip Processing Rate: {file_rate} seconds") + print(f"Zip Processing Rate: {rate} seconds") print(f"Approximate Total Runtime: {round(eta, 4)} minutes") print("") diff --git a/src/deep_neurographs/neurograph.py b/src/deep_neurographs/neurograph.py index e648c6c..afbc893 100644 --- a/src/deep_neurographs/neurograph.py +++ b/src/deep_neurographs/neurograph.py @@ -9,7 +9,6 @@ """ from copy import deepcopy -from time import time import networkx as nx import numpy as np @@ -18,7 +17,7 @@ from deep_neurographs import geometry_utils from deep_neurographs import graph_utils as gutils -from deep_neurographs import swc_utils, utils +from deep_neurographs import utils from deep_neurographs.densegraph import DenseGraph from deep_neurographs.geometry_utils import dist as get_dist @@ -34,11 +33,7 @@ class NeuroGraph(nx.Graph): """ def __init__( - self, - bbox=None, - swc_dir=None, - img_path=None, - label_mask=None, + self, bbox=None, swc_dir=None, img_path=None, label_mask=None ): super(NeuroGraph, self).__init__() # Initialize paths @@ -85,7 +80,7 @@ def init_densegraph(self): self.densegraph = DenseGraph(self.swc_paths) # --- Add nodes or edges --- - def add_immutables(self, swc_dict, irreducibles): + def add_immutables(self, irreducibles, swc_dict, swc_id): # Add nodes node_id = dict() leafs = irreducibles["leafs"] @@ -96,11 +91,10 @@ def add_immutables(self, swc_dict, irreducibles): node_id[i], xyz=np.array(swc_dict["xyz"][i]), radius=swc_dict["radius"][i], - swc_id=swc_dict["swc_id"], + swc_id=swc_id, ) # Add edges - t0 = time() edges = irreducibles["edges"] for i, j in edges.keys(): # Get edge @@ -111,11 +105,7 @@ def add_immutables(self, swc_dict, irreducibles): # Add edge self.immutable_edges.add(frozenset(edge)) self.add_edge( - node_id[i], - node_id[j], - xyz=xyz, - radius=radii, - swc_id=swc_dict["swc_id"] + node_id[i], node_id[j], xyz=xyz, radius=radii, swc_id=swc_id ) xyz_to_edge = dict((tuple(xyz), edge) for xyz in xyz) check_xyz = set(xyz_to_edge.keys()) @@ -133,7 +123,13 @@ def add_immutables(self, swc_dict, irreducibles): self.junctions.add(node_id[j]) # --- Proposal Generation --- - def generate_proposals(self, search_radius, n_proposals_per_leaf=3, optimize=False, optimization_depth=10): + def generate_proposals( + self, + search_radius, + n_proposals_per_leaf=3, + optimize=False, + optimization_depth=10, + ): """ Generates edges for the graph. diff --git a/src/deep_neurographs/swc_utils.py b/src/deep_neurographs/swc_utils.py index a97df60..a234808 100644 --- a/src/deep_neurographs/swc_utils.py +++ b/src/deep_neurographs/swc_utils.py @@ -9,10 +9,6 @@ """ -import os -from copy import deepcopy as cp -from itertools import repeat - import networkx as nx import numpy as np @@ -23,11 +19,10 @@ # -- io utils -- def process_local_paths(paths, min_size, bbox=None): - swc_dicts = [] + swc_dicts = dict() for path in paths: - swc_dict_i = parse_local_swc(path, bbox=bbox) - swc_dict_i["swc_id"] = utils.get_swc_id(path) - swc_dicts.append(swc_dict_i) + swc_id = utils.get_swc_id(path) + swc_dicts[swc_id] = parse_local_swc(path, bbox=bbox) return swc_dicts diff --git a/src/deep_neurographs/utils.py b/src/deep_neurographs/utils.py index 0cd9b0c..17b8602 100644 --- a/src/deep_neurographs/utils.py +++ b/src/deep_neurographs/utils.py @@ -255,21 +255,21 @@ def open_tensorstore(path, driver): def read_img_chunk(img, xyz, shape): start, end = get_start_end(xyz, shape) return img[ - start[2] : end[2], start[1] : end[1], start[0] : end[0] + start[2]: end[2], start[1]: end[1], start[0]: end[0] ].transpose(2, 1, 0) def get_chunk(arr, xyz, shape): start, end = get_start_end(xyz, shape) return deepcopy( - arr[start[0] : end[0], start[1] : end[1], start[2] : end[2]] + arr[start[0]: end[0], start[1]: end[1], start[2]: end[2]] ) def read_tensorstore(ts_arr, xyz, shape): start, end = get_start_end(xyz, shape) return ( - ts_arr[start[0] : end[0], start[1] : end[1], start[2] : end[2]] + ts_arr[start[0]: end[0], start[1]: end[1], start[2]: end[2]] .read() .result() )