Skip to content

Commit

Permalink
Pipe() replaced by multiprocessing.Manager().Queue() and
Browse files Browse the repository at this point in the history
Procces replaced by Thread in _do_parallel function; updated requirements.txt;
  • Loading branch information
padu committed Jul 24, 2022
1 parent 015cf2e commit a7360e5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 21 deletions.
35 changes: 17 additions & 18 deletions parallelbar/parallelbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from functools import partial
from collections import abc
import multiprocessing as mp
from threading import Thread

from pebble import ProcessPool
from pebble import ProcessExpired
Expand Down Expand Up @@ -44,35 +43,35 @@ def close(self):
self.disp(bar_style='warning')


def _process(func, pipe, task):
def _process(func, q, task):
result = func(task)
pipe.send([os.getpid()])
q.put(os.getpid())
return result


def _core_process_status(bar_size, bar_step, disable, pipe):
def _core_process_status(bar_size, bar_step, disable, q):
pid_dict = dict()
i = 0
while True:
result = pipe.recv()
result = q.get()
if not result:
for val in pid_dict.values():
val.close()
break
try:
pid_dict[result[0]].update()
pid_dict[result].update()
except KeyError:
i += 1
position = len(pid_dict)
pid_dict[result[0]] = ProgressBar(step=bar_step, total=bar_size, position=position, desc=f'Core {i}',
disable=disable)
pid_dict[result[0]].update()
pid_dict[result] = ProgressBar(step=bar_step, total=bar_size, position=position, desc=f'Core {i}',
disable=disable)
pid_dict[result].update()


def _process_status(bar_size, bar_step, disable, pipe):
def _process_status(bar_size, bar_step, disable, q):
bar = ProgressBar(step=bar_step, total=bar_size, disable=disable, desc='DONE')
while True:
result = pipe.recv()
result = q.get()
if not result:
bar.close()
break
Expand Down Expand Up @@ -103,7 +102,7 @@ def _update_error_bar(bar_dict, bar_parameters):
def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_size, core_progress,
context, total, bar_step, disable, process_timeout,
):
parent, child = mp.Pipe()
q = mp.Manager().Queue()
len_tasks = get_len(tasks, total)
if not n_cpu:
n_cpu = mp.cpu_count()
Expand All @@ -113,12 +112,12 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz
chunk_size += 1
if core_progress:
bar_size = _bar_size(chunk_size, len_tasks, n_cpu)
thread = Thread(target=_core_process_status, args=(bar_size, bar_step, disable, parent))
proc = mp.Process(target=_core_process_status, args=(bar_size, bar_step, disable, q))
else:
bar_size = len_tasks
thread = Thread(target=_process_status, args=(bar_size, bar_step, disable, parent))
thread.start()
target = partial(_process, func, child)
proc = mp.Process(target=_process_status, args=(bar_size, bar_step, disable, q))
proc.start()
target = partial(_process, func, q)
bar_parameters = dict(total=len_tasks, disable=disable, position=1, desc='ERROR', colour='red')
error_bar = {}
result = list()
Expand Down Expand Up @@ -156,8 +155,8 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz
result.append(e)
if error_bar:
error_bar['bar'].close()
child.send(None)
thread.join()
q.put(None)
proc.join()
return result


Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
colorama==0.4.4
colorama==0.4.5
Pebble==4.6.3
tqdm==4.62.0
tqdm==4.64.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='parallelbar',
version='0.2.12',
version='0.2.13',
packages=find_packages(),
author='Dubovik Pavel',
author_email='geometryk@gmail.com',
Expand Down

0 comments on commit a7360e5

Please sign in to comment.