diff --git a/src/ansible_navigator/data/image_introspect.py b/src/ansible_navigator/data/image_introspect.py index 631c04bf8..895abc3bf 100644 --- a/src/ansible_navigator/data/image_introspect.py +++ b/src/ansible_navigator/data/image_introspect.py @@ -2,11 +2,11 @@ from __future__ import annotations import json -import multiprocessing import os import re import subprocess import sys +import threading from queue import Queue from types import SimpleNamespace @@ -17,8 +17,6 @@ JSONTypes = Union[bool, int, str, dict, list] -PROCESSES = (multiprocessing.cpu_count() - 1) or 1 - class Command(SimpleNamespace): """Abstraction for a details about a shell command.""" @@ -51,7 +49,7 @@ def run_command(command: Command) -> None: command.errors = [str(exc.stderr)] -def worker(pending_queue: multiprocessing.Queue, completed_queue: multiprocessing.Queue) -> None: +def worker(pending_queue: Queue, completed_queue: Queue) -> None: """Run a command from pending, parse, and place in completed. :param pending_queue: A queue with plugins to process @@ -80,28 +78,8 @@ def __init__(self): self._completed_queue: Queue | None = None self._pending_queue: Queue | None = None - @staticmethod - def run_single_process(command_classes: Any): - """Run commands with a single process. - - :param command_classes: All command classes to be run - :returns: The results from running all commands - """ - all_commands = tuple( - command for command_class in command_classes for command in command_class.commands - ) - results = [] - for command in all_commands: - run_command(command) - try: - command.parse(command) - except Exception as exc: # noqa: BLE001 - command.errors = command.errors + [str(exc)] - results.append(command) - return results - - def run_multi_process(self, command_classes): - """Run commands with multiple processes. + def run_multi_thread(self, command_classes): + """Run commands with multiple threads. Workers are started to read from pending queue. Exit when the number of results is equal to the number @@ -111,9 +89,9 @@ def run_multi_process(self, command_classes): :returns: The results from running all commands """ if self._completed_queue is None: - self._completed_queue = multiprocessing.Manager().Queue() + self._completed_queue = Queue() if self._pending_queue is None: - self._pending_queue = multiprocessing.Manager().Queue() + self._pending_queue = Queue() results = {} all_commands = tuple( command for command_class in command_classes for command in command_class.commands @@ -129,10 +107,10 @@ def start_workers(self, jobs): :param jobs: The jobs to be run """ - worker_count = min(len(jobs), PROCESSES) + worker_count = len(jobs) processes = [] for _proc in range(worker_count): - proc = multiprocessing.Process( + proc = threading.Thread( target=worker, args=(self._pending_queue, self._completed_queue), ) @@ -409,7 +387,7 @@ def main(serialize: bool = True) -> dict[str, JSONTypes] | None: PythonPackages(), SystemPackages(), ] - results = command_runner.run_multi_process(commands) + results = command_runner.run_multi_thread(commands) for result in results: result_as_dict = vars(result) result_as_dict.pop("parse")