diff --git a/bootstrap/hot_reloading/engine.py b/bootstrap/hot_reloading/engine.py new file mode 100644 index 0000000..a2381af --- /dev/null +++ b/bootstrap/hot_reloading/engine.py @@ -0,0 +1,258 @@ +import asyncio +import importlib +import inspect +import sys +import traceback +from types import FrameType +from typing import ( + Callable, + Optional, + Tuple, +) + +from rich.text import Text +from textual.app import App +from textual.widgets import ( + RichLog, +) + +from bootstrap.hot_reloading.module import MatchboxModule + + +class HotReloadingEngine: + def __init__(self, ui: App): + self.ui = ui + + @classmethod + def get_class_frame(cls, func: Callable, exc_traceback) -> Optional[FrameType]: + """ + Find the frame of the last callable within the scope of the MatchboxModule in + the traceback. In this instance, the MatchboxModule is a class so we want to find + the frame of the method that either (1) threw the exception, or (2) called a + function that threw (or originated) the exception. + """ + last_frame = None + for frame, _ in traceback.walk_tb(exc_traceback): + print(frame.f_code.co_qualname) + if frame.f_code.co_qualname == func.__name__: + print( + f"Found module.underlying_fn ({func.__name__}) in traceback, continuing..." + ) + for name, val in inspect.getmembers(func): + if ( + name == frame.f_code.co_name + and "self" in inspect.getargs(frame.f_code).args + ): + print(f"Found method {val} in traceback, continuing...") + last_frame = frame + return last_frame + + @classmethod + def get_lambda_child_frame( + cls, func: Callable, exc_traceback + ) -> Tuple[Optional[FrameType], Optional[str]]: + """ + Find the frame of the last callable within the scope of the MatchboxModule in + the traceback. In this instance, the MatchboxModule is a lambda function so we want + to find the frame of the first function called by the lambda. + """ + lambda_args = inspect.getargs(func.__code__) + potential_matches = {} + print(f"Lambda args: {lambda_args}") + for frame, _ in traceback.walk_tb(exc_traceback): + print(frame.f_code.co_qualname) + assert lambda_args is not None + frame_args = inspect.getargvalues(frame) + for name, val in potential_matches.items(): + print( + f"Checking candidate {name}={val} to match against {frame.f_code.co_qualname}" + ) + if val == frame.f_code.co_qualname: + print(f"Matched {name}={val} to {frame.f_code.co_qualname}") + return frame, name + elif hasattr(val, frame.f_code.co_name): + print(f"Matched {frame.f_code.co_qualname} to member of {val}") + return frame, name + for name in lambda_args.args: + print(f"Lambda arg '{name}'") + if name in frame_args.args: + print(f"Frame has arg {name} with value {frame_args.locals[name]}") + # TODO: Find the argument which initiated the call that threw! + # Which is somewhere deeper in the stack, which + # frame.f_code.co_qualname must match one of the + # frame_args.args! + # NOTE: We know the next frame in the loop WILL match one of + # this frame's arguments, either in the qual_name directly or in + # the qual_name base (the class) + potential_matches[name] = frame_args.locals[name] + return None, None + + @classmethod + def get_function_frame(cls, func: Callable, exc_traceback) -> Optional[FrameType]: + print("============= get_function_frame() =========") + last_frame = None + for frame, _ in traceback.walk_tb(exc_traceback): + print(frame.f_code.co_qualname) + if frame.f_code.co_qualname == func.__name__: + print( + f"Found module.underlying_fn ({func.__name__}) in traceback, continuing..." + ) + for name, val in inspect.getmembers(func.__module__): + if name == frame.f_code.co_name: + print(f"Found function {val} in traceback, continuing...") + last_frame = frame + print("============================================") + return last_frame + + async def catch_and_hang(self, module: MatchboxModule, *args, **kwargs): + try: + self.ui.print_info(f"Calling MatchboxModule({module.underlying_fn}) with") + self.ui.print_pretty( + { + "args": args, + "kwargs": kwargs, + "partial.args": module.partial.args, + "partial.kwargs": module.partial.keywords, + } + ) + output = await asyncio.to_thread(module, *args, **kwargs) + self.ui.print_info("Output:") + self.ui.print_pretty(output) + return output + except Exception as exception: + # If the exception came from the wrapper itself, we should not catch it! + exc_type, exc_value, exc_traceback = sys.exc_info() + if exc_traceback.tb_next is None: + self.ui.print_err( + "[ERROR] Could not find the next frame in the call stack!" + ) + elif exc_traceback.tb_next.tb_frame.f_code.co_name == "catch_and_hang": + self.ui.print_err( + f"[ERROR] Caught exception in the Builder: {exception}", + ) + else: + self.ui.print_err( + f"Caught exception: {exception}", + ) + self.ui.query_one("#traceback", RichLog).write(traceback.format_exc()) + func = module.underlying_fn + # NOTE: This frame is for the given function, which is the root of the + # call tree (our MatchboxModule's underlying function). What we want is + # to go down to the function that threw, and reload that only if it + # wasn't called anywhere in the frozen module's call tree. + frame = None + if inspect.isclass(func): + frame = self.get_class_frame(func, exc_traceback) + elif inspect.isfunction(func) and func.__name__ == "": + frame, lambda_argname = self.get_lambda_child_frame( + func, exc_traceback + ) + module.throw_lambda_argname = lambda_argname + elif inspect.isfunction(func): + frame = self.get_function_frame(func, exc_traceback) + else: + raise NotImplementedError() + if not frame: + self.ui.print_err( + f"Could not find the frame of the original function {func} in the traceback." + ) + module.throw_frame = frame + self.ui.print_info("Exception thrown in:") + self.ui.print_pretty(frame) + module.to_reload = True + self.ui.print_info("Hanged.") + await self.ui.hang(threw=True) + + async def reload_module(self, module: MatchboxModule): + if module.throw_frame is None: + self.ui.exit(1) + raise RuntimeError( + f"Module {module} is set to reload but we don't have the frame that threw!" + ) + self.ui.log_tracer( + Text( + f"Reloading code from {module.throw_frame.f_code.co_filename}", + style="purple", + ) + ) + code_obj = module.throw_frame.f_code + print(code_obj.co_qualname, inspect.getmodule(code_obj)) + code_module = inspect.getmodule(code_obj) + if code_module is None: + self.ui.exit(1) + raise RuntimeError( + f"Could not find the module for the code object {code_obj}." + ) + rld_module = importlib.reload(code_module) + if code_obj.co_qualname.endswith("__init__"): + class_name = code_obj.co_qualname.split(".")[0] + self.ui.log_tracer( + Text( + f"-> Reloading class {class_name} from module {code_module}", + style="purple", + ) + ) + rld_callable = getattr(rld_module, class_name) + if rld_callable is not None: + self.ui.log_tracer( + Text( + f"-> Reloaded class {code_obj.co_qualname} from module {code_module.__name__}", + style="cyan", + ) + ) + print(inspect.getsource(rld_callable)) + module.reload(rld_callable) + return + + else: + if code_obj.co_qualname.find(".") != -1: + class_name, _ = code_obj.co_qualname.split(".") + self.ui.log_tracer( + Text( + f"-> Reloading class {class_name} from module {code_module}", + style="purple", + ) + ) + rld_class = getattr(rld_module, class_name) + rld_callable = None + # Now find the method in the reloaded class, and replace the + # with the reloaded one. + for name, val in inspect.getmembers(rld_class): + if inspect.isfunction(val) and val.__name__ == code_obj.co_name: + self.ui.print_info( + f" -> Reloading method '{name}'", + ) + rld_callable = val + if rld_callable is not None: + self.ui.log_tracer( + Text( + f"-> Reloaded class-level method {code_obj.co_qualname} from module {code_module.__name__}", + style="cyan", + ) + ) + if module.underlying_fn.__name__ == "": + assert module.throw_lambda_argname is not None + module.reload_surgically_in_lambda( + module.throw_lambda_argname, code_obj.co_name, rld_callable + ) + else: + module.reload_surgically(code_obj.co_name, rld_callable) + return + else: + print(code_module, code_obj, code_obj.co_name) + self.ui.log_tracer( + Text( + f"-> Reloading module-level function {code_obj.co_name} from module {code_module.__name__}", + style="purple", + ) + ) + func = getattr(rld_module, code_obj.co_name) + if func is not None: + self.ui.print_info( + f" -> Reloaded module level function {code_obj.co_name}", + ) + print(inspect.getsource(func)) + module.reload(func) + return + while True: + await asyncio.sleep(1) diff --git a/bootstrap/hot_reloading/module.py b/bootstrap/hot_reloading/module.py new file mode 100644 index 0000000..e18bf14 --- /dev/null +++ b/bootstrap/hot_reloading/module.py @@ -0,0 +1,84 @@ +import uuid +from functools import partial +from types import FrameType +from typing import Any, Callable, List, Optional + +from hydra_zen.typing import Partial + + +class MatchboxModule: + def __init__(self, name: str, fn: Callable | Partial, *args, **kwargs): + self._str_rep = name + self._uid = uuid.uuid4().hex + self.underlying_fn: Callable = fn.func if isinstance(fn, partial) else fn + self.partial = partial(fn, *args, **kwargs) + self.to_reload = False + self.result = None + self.is_frozen = False + self.throw_frame: Optional[FrameType] = None + self.throw_lambda_argname: Optional[str] = None + + def reload(self, new_func: Callable) -> None: + print(f"Replacing {self.underlying_fn} with {new_func}") + self.underlying_fn = new_func + self.partial = partial( + self.underlying_fn, *self.partial.args, **self.partial.keywords + ) + self.to_reload = False + + def reload_surgically(self, method_name: str, method: Callable) -> None: + print(f"Replacing {method_name} which was {self.underlying_fn} with {method}") + setattr(self.underlying_fn, method_name, method) + self.partial = partial( + self.underlying_fn, *self.partial.args, **self.partial.keywords + ) + self.to_reload = False + + def reload_surgically_in_lambda( + self, arg_name: str, method_name: str, method: Callable + ) -> None: + print( + f"Replacing {method_name} as argument {arg_name} in lambda's {self.partial.args} or {self.partial.keywords} with {method}" + ) + if arg_name not in self.partial.keywords.keys(): + raise KeyError( + "Could not find the argument to replace in the partial kwargs!" + ) + for k, v in self.partial.keywords.items(): + print(f"Updating lambda arg {v} and") + print(f"re-passing self reference via partial {partial(method, v)}") + setattr(v, method_name, partial(method, v)) + self.partial.keywords[k] = v # Need to update when using dict iterator + self.partial = partial( + self.underlying_fn, *self.partial.args, **self.partial.keywords + ) + self.to_reload = False + + def __call__(self, module_chain: List) -> Any: + """ + Args: + module_chain: List[MatchboxModule] + """ + + def _find_module_result(module_chain: List, uid: str) -> Any: + for module in module_chain: + if module.uid == uid: + return module.result + return None + + for i, arg in enumerate(self.partial.args): + if isinstance(arg, MatchboxModule): + self.partial.args[i] = _find_module_result(module_chain, arg.uid) + for key, value in self.partial.keywords.items(): + if isinstance(value, MatchboxModule): + self.partial.keywords[key] = _find_module_result( + module_chain, value.uid + ) + return self.partial() + + def __str__(self) -> str: + return self._str_rep + + @property + def uid(self) -> str: + return f"uid-{self._uid}" diff --git a/bootstrap/launch_experiment.py b/bootstrap/launch_experiment.py index eb8e505..bd4ab6d 100644 --- a/bootstrap/launch_experiment.py +++ b/bootstrap/launch_experiment.py @@ -25,7 +25,6 @@ from rich.syntax import Syntax from torch.utils.data import DataLoader, Dataset -from bootstrap import MatchboxModule from bootstrap.factories import ( make_dataloaders, make_datasets, @@ -35,6 +34,7 @@ make_training_loss, parallelize_model, ) +from bootstrap.hot_reloading.module import MatchboxModule from bootstrap.tui.builder_ui import BuilderUI from bootstrap.tui.training_ui import TrainingUI from conf import project as project_conf @@ -118,63 +118,44 @@ def launch_builder( model: Partial[torch.nn.Module], training_loss: Partial[torch.nn.Module], ): - exp_conf = hydra_zen.to_yaml( - dict( - run_conf=run, - dataset=dataset, - model=model, - optimizer=optimizer, - scheduler=scheduler, - training_loss=training_loss, - ) + _ = data_loader + _ = optimizer + _ = scheduler + _ = trainer + _ = tester + dataset_module = MatchboxModule( + "Dataset", + dataset, # TODO: Fix the code reloading, then revert to using the dataset factory + split="train", + seed=run.seed, + progress=None, + job_id=None, ) - - async def launch_with_async_gui(): - tui = BuilderUI() - task = asyncio.create_task(tui.run_async()) - await asyncio.sleep(0.5) # Wait for the app to start up - while not tui.is_running: - await asyncio.sleep(0.01) # Wait for the app to start up - dataset_module = MatchboxModule( - "Dataset", - dataset, # TODO: Fix the code reloading, then revert to using the dataset factory - split="train", - seed=run.seed, - progress=None, - job_id=None, - ) - model_module = MatchboxModule( - "Model", - model, - encoder_input_dim=hydra_zen.just(dataset).img_dim ** 2, # type: ignore - ) - await tui.chain_up( - [ - dataset_module, - MatchboxModule( - "Dataset test", - lambda dataset_obj: dataset_obj.test(), - dataset_obj=dataset_module, - ), - model_module, - MatchboxModule( - "Model forward", - # NOTE: For now we need to call .forward() explicitly, as Matchbox - # doesn't yet handle hot code reloading for model() due to Pytorch - # wrapping - lambda model, dataset: model.forward(dataset[0][0].unsqueeze(0)), - model=model_module, - dataset=dataset_module, - ), - MatchboxModule( - "Loss", make_training_loss, run.training_mode, training_loss - ), - ] - ) - tui.run_chain() - _ = await task - - asyncio.run(launch_with_async_gui()) + model_module = MatchboxModule( + "Model", + model, + encoder_input_dim=hydra_zen.just(dataset).img_dim ** 2, # type: ignore + ) + chain = [ + dataset_module, + MatchboxModule( + "Dataset test", + lambda dataset_obj: dataset_obj.test(), + dataset_obj=dataset_module, + ), + model_module, + MatchboxModule( + "Model forward", + # NOTE: For now we need to call .forward() explicitly, as Matchbox + # doesn't yet handle hot code reloading for model() due to Pytorch + # wrapping + lambda model, dataset: model.forward(dataset[0][0].unsqueeze(0)), + model=model_module, + dataset=dataset_module, + ), + MatchboxModule("Loss", make_training_loss, run.training_mode, training_loss), + ] + BuilderUI(chain).run() def launch_experiment( diff --git a/bootstrap/tui/builder_ui.py b/bootstrap/tui/builder_ui.py index ef9f5d3..347de4d 100644 --- a/bootstrap/tui/builder_ui.py +++ b/bootstrap/tui/builder_ui.py @@ -1,15 +1,9 @@ import asyncio -import importlib -import inspect -import sys -import traceback -from types import FrameType from typing import ( Any, Callable, Iterable, List, - Optional, Tuple, ) @@ -25,20 +19,13 @@ RichLog, ) -from bootstrap.tui.widgets.logger import Logger -from bootstrap.tui.widgets.tracer import Tracer - -if __name__ == "__main__": - import os - import sys - - sys.path.append( - os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) - ) -from bootstrap import MatchboxModule +from bootstrap.hot_reloading.engine import HotReloadingEngine +from bootstrap.hot_reloading.module import MatchboxModule from bootstrap.tui.widgets.checkbox_panel import CheckboxPanel from bootstrap.tui.widgets.editor import CodeEditor from bootstrap.tui.widgets.files_tree import FilesTree +from bootstrap.tui.widgets.logger import Logger +from bootstrap.tui.widgets.tracer import Tracer class BuilderUI(App): @@ -52,24 +39,29 @@ class BuilderUI(App): BINDINGS = [ ("q", "quit", "Quit"), ("d", "toggle_dark", "Toggle dark mode"), - ("r", "reload", "Reload hot code"), + ("r", "reload", "Hot reload"), ] - def __init__(self): + def __init__(self, chain: List[MatchboxModule]): super().__init__() - self._module_chain: List[MatchboxModule] = [] + self._module_chain: List[MatchboxModule] = chain self._runner_task = None + self._engine = HotReloadingEngine(self) + + async def on_mount(self): + await self._chain_up() + self.run_chain() - async def chain_up(self, modules_seq: List[MatchboxModule]) -> None: - """Add a module (callable to interactively implement and debug) to the - run-reload chain.""" + async def _chain_up(self) -> None: keys = [] - for module in modules_seq: + for module in self._module_chain: + if not isinstance(module, MatchboxModule): + self.exit(1) + raise TypeError(f"Expected MatchboxModule, got {type(module)}") await self.query_one(CheckboxPanel).add_checkbox(str(module), module.uid) if module.uid in keys: raise ValueError(f"Duplicate module '{module}' with uid {module.uid}") keys.append(module.uid) - self._module_chain = modules_seq async def _run_chain(self) -> None: self.log_tracer("Running the chain...") @@ -81,9 +73,11 @@ async def _run_chain(self) -> None: continue if module.to_reload: self.log_tracer(Text(f"Reloading module: {module}", style="yellow")) - await self._reload_module(module) + await self._engine.reload_module(module) self.log_tracer(Text(f"Running module: {module}", style="yellow")) - module.result = await self._catch_and_hang(module, self._module_chain) + module.result = await self._engine.catch_and_hang( + module, self._module_chain + ) self.log_tracer(Text(f"{module} ran sucessfully!", style="bold green")) self.print_info("Hanged.") self.query_one("#traceback", RichLog).clear() @@ -183,217 +177,8 @@ def print_err(self, msg: str | Exception) -> None: def print_warn(self, msg: str) -> None: self.log_tracer(Text("[!] " + msg, style="bold yellow")) - def prompt(self, msg: str) -> str: - # TODO: We need to use a popup callback - self.log_tracer(Text("[?] " + msg, style="italic pink")) - return "y" - def print_info(self, msg: str) -> None: self.log_tracer(Text(msg, style="bold blue")) def print_pretty(self, msg: Any) -> None: self.log_tracer(Pretty(msg)) - - @classmethod - def get_class_frame(cls, func: Callable, exc_traceback) -> Optional[FrameType]: - """ - Find the frame of the last callable within the scope of the MatchboxModule in - the traceback. In this instance, the MatchboxModule is a class so we want to find - the frame of the method that either (1) threw the exception, or (2) called a - function that threw (or originated) the exception. - """ - last_frame = None - for frame, _ in traceback.walk_tb(exc_traceback): - for name, val in inspect.getmembers(func): - if ( - name == frame.f_code.co_name - and "self" in inspect.getargs(frame.f_code).args - ): - print(f"Found method {val} in traceback, continuing...") - last_frame = frame - return last_frame - - @classmethod - def get_lambda_child_frame( - cls, func: Callable, exc_traceback - ) -> Tuple[Optional[FrameType], Optional[str]]: - """ - Find the frame of the last callable within the scope of the MatchboxModule in - the traceback. In this instance, the MatchboxModule is a lambda function so we want - to find the frame of the first function called by the lambda. - """ - lambda_args = inspect.getargs(func.__code__) - potential_matches = {} - for frame, _ in traceback.walk_tb(exc_traceback): - assert lambda_args is not None - frame_args = inspect.getargvalues(frame) - for name, val in potential_matches.items(): - if val == frame.f_code.co_qualname: - return frame, name - elif hasattr(val, frame.f_code.co_name): - return frame, name - for name in lambda_args.args: - if name in frame_args.args: - # NOTE: Now we need to find the argument which initiated the call - # that threw! Which is somewhere deeper in the stack, which - # frame.f_code.co_qualname must match one of the frame_args.args! - # NOTE: We know the next frame in the loop WILL match one of - # this frame's arguments, either in the qual_name directly or in - # the qual_name base (the class) - potential_matches[name] = frame_args.locals[name] - return None, None - - @classmethod - def get_function_frame(cls, func: Callable, exc_traceback) -> Optional[FrameType]: - raise NotImplementedError() - - async def _catch_and_hang(self, module: MatchboxModule, *args, **kwargs): - try: - self.print_info(f"Calling MatchboxModule({module.underlying_fn}) with") - self.print_pretty( - { - "args": args, - "kwargs": kwargs, - "partial.args": module.partial.args, - "partial.kwargs": module.partial.keywords, - } - ) - output = await asyncio.to_thread(module, *args, **kwargs) - self.print_info("Output:") - self.print_pretty(output) - return output - except Exception as exception: - # If the exception came from the wrapper itself, we should not catch it! - exc_type, exc_value, exc_traceback = sys.exc_info() - if exc_traceback.tb_next is None: - self.print_err( - "[ERROR] Could not find the next frame in the call stack!" - ) - elif exc_traceback.tb_next.tb_frame.f_code.co_name == "catch_and_hang": - self.print_err( - f"[ERROR] Caught exception in the Builder: {exception}", - ) - else: - self.print_err( - f"Caught exception: {exception}", - ) - self.query_one("#traceback", RichLog).write(traceback.format_exc()) - func = module.underlying_fn - # NOTE: This frame is for the given function, which is the root of the - # call tree (our MatchboxModule's underlying function). What we want is - # to go down to the function that threw, and reload that only if it - # wasn't called anywhere in the frozen module's call tree. - frame = None - if inspect.isclass(func): - frame = self.get_class_frame(func, exc_traceback) - elif inspect.isfunction(func) and func.__name__ == "": - frame, lambda_argname = self.get_lambda_child_frame( - func, exc_traceback - ) - module.throw_lambda_argname = lambda_argname - elif inspect.isfunction(func): - frame = self.get_function_frame(func, exc_traceback) - else: - raise NotImplementedError() - if not frame: - self.print_err( - f"Could not find the frame of the original function {func} in the traceback." - ) - module.throw_frame = frame - self.print_info("Exception thrown in:") - self.print_pretty(frame) - module.to_reload = True - self.print_info("Hanged.") - await self.hang(threw=True) - - async def _reload_module(self, module: MatchboxModule): - if module.throw_frame is None: - self.exit(1) - raise RuntimeError( - f"Module {module} is set to reload but we don't have the frame that threw!" - ) - self.log_tracer( - Text( - f"Reloading code from {module.throw_frame.f_code.co_filename}", - style="purple", - ) - ) - code_obj = module.throw_frame.f_code - code_module = inspect.getmodule(code_obj) - if code_module is None: - self.exit(1) - raise RuntimeError( - f"Could not find the module for the code object {code_obj}." - ) - rld_module = importlib.reload(code_module) - if code_obj.co_qualname.endswith("__init__"): - class_name = code_obj.co_qualname.split(".")[0] - self.log_tracer( - Text( - f"-> Reloading class {class_name} from module {code_module}", - style="purple", - ) - ) - rld_callable = getattr(rld_module, class_name) - if rld_callable is not None: - self.log_tracer( - Text( - f"-> Reloaded class {code_obj.co_qualname} from module {code_module.__name__}", - style="cyan", - ) - ) - module.reload(rld_callable) - return - - else: - if code_obj.co_qualname.find(".") != -1: - class_name, _ = code_obj.co_qualname.split(".") - self.log_tracer( - Text( - f"-> Reloading class {class_name} from module {code_module}", - style="purple", - ) - ) - rld_class = getattr(rld_module, class_name) - rld_callable = None - # Now find the method in the reloaded class, and replace the - # with the reloaded one. - for name, val in inspect.getmembers(rld_class): - if inspect.isfunction(val) and val.__name__ == code_obj.co_name: - self.print_info( - f" -> Reloading method '{name}'", - ) - rld_callable = val - if rld_callable is not None: - self.log_tracer( - Text( - f"-> Reloaded class-level method {code_obj.co_qualname} from module {code_module.__name__}", - style="cyan", - ) - ) - if module.underlying_fn.__name__ == "": - assert module.throw_lambda_argname is not None - module.reload_surgically_in_lambda( - module.throw_lambda_argname, code_obj.co_name, rld_callable - ) - else: - module.reload_surgically(code_obj.co_name, rld_callable) - return - else: - print(code_module, code_obj, code_obj.co_name) - self.log_tracer( - Text( - f"-> Reloading module-level function {code_obj.co_name} from module {code_module.__name__}", - style="purple", - ) - ) - func = getattr(rld_module, code_obj.co_name) - if func is not None: - self.print_info( - f" -> Reloaded module level function {code_obj.co_name}", - ) - print(inspect.getsource(func)) - module.reload(func) - return - while True: - await asyncio.sleep(1) diff --git a/dataset/example.py b/dataset/example.py index 439d4b8..dae90b4 100644 --- a/dataset/example.py +++ b/dataset/example.py @@ -86,7 +86,7 @@ def _load( progress.update(job_id, total=length) # raise Exception("This is an exception") # test() - test_recursive() + # test_recursive() for _ in range(length): if progress is not None: assert job_id is not None