diff --git a/parallelbar/parallelbar.py b/parallelbar/parallelbar.py index 5cb6ed6..7e8fabc 100644 --- a/parallelbar/parallelbar.py +++ b/parallelbar/parallelbar.py @@ -2,7 +2,6 @@ from functools import partial from collections import abc import multiprocessing as mp -from threading import Thread from pebble import ProcessPool from pebble import ProcessExpired @@ -44,35 +43,35 @@ def close(self): self.disp(bar_style='warning') -def _process(func, pipe, task): +def _process(func, q, task): result = func(task) - pipe.send([os.getpid()]) + q.put(os.getpid()) return result -def _core_process_status(bar_size, bar_step, disable, pipe): +def _core_process_status(bar_size, bar_step, disable, q): pid_dict = dict() i = 0 while True: - result = pipe.recv() + result = q.get() if not result: for val in pid_dict.values(): val.close() break try: - pid_dict[result[0]].update() + pid_dict[result].update() except KeyError: i += 1 position = len(pid_dict) - pid_dict[result[0]] = ProgressBar(step=bar_step, total=bar_size, position=position, desc=f'Core {i}', - disable=disable) - pid_dict[result[0]].update() + pid_dict[result] = ProgressBar(step=bar_step, total=bar_size, position=position, desc=f'Core {i}', + disable=disable) + pid_dict[result].update() -def _process_status(bar_size, bar_step, disable, pipe): +def _process_status(bar_size, bar_step, disable, q): bar = ProgressBar(step=bar_step, total=bar_size, disable=disable, desc='DONE') while True: - result = pipe.recv() + result = q.get() if not result: bar.close() break @@ -103,7 +102,7 @@ def _update_error_bar(bar_dict, bar_parameters): def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_size, core_progress, context, total, bar_step, disable, process_timeout, ): - parent, child = mp.Pipe() + q = mp.Manager().Queue() len_tasks = get_len(tasks, total) if not n_cpu: n_cpu = mp.cpu_count() @@ -113,12 +112,12 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz chunk_size += 1 if core_progress: bar_size = _bar_size(chunk_size, len_tasks, n_cpu) - thread = Thread(target=_core_process_status, args=(bar_size, bar_step, disable, parent)) + proc = mp.Process(target=_core_process_status, args=(bar_size, bar_step, disable, q)) else: bar_size = len_tasks - thread = Thread(target=_process_status, args=(bar_size, bar_step, disable, parent)) - thread.start() - target = partial(_process, func, child) + proc = mp.Process(target=_process_status, args=(bar_size, bar_step, disable, q)) + proc.start() + target = partial(_process, func, q) bar_parameters = dict(total=len_tasks, disable=disable, position=1, desc='ERROR', colour='red') error_bar = {} result = list() @@ -156,8 +155,8 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz result.append(e) if error_bar: error_bar['bar'].close() - child.send(None) - thread.join() + q.put(None) + proc.join() return result diff --git a/requirements.txt b/requirements.txt index 9ccbfc9..9898939 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -colorama==0.4.4 +colorama==0.4.5 Pebble==4.6.3 -tqdm==4.62.0 +tqdm==4.64.0 diff --git a/setup.py b/setup.py index 71bf91e..74a1947 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='parallelbar', - version='0.2.12', + version='0.2.13', packages=find_packages(), author='Dubovik Pavel', author_email='geometryk@gmail.com',