Skip to content

Commit

Permalink
Fixed issue #4; updated README.
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Dubovik committed Nov 14, 2023
1 parent d4b13b6 commit 773ff6b
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 60 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [Usage](#Usage)
* [Exception handling](#exception-handling)
* [Changelog](#Changelog)
* [New in version 2.4](#new-in-version-2.4)
* [New in version 2.3](#new-in-version-2.3)
* [New in version 1.3](#new-in-version-1.3)
* [New in version 1.2](#new-in-version-1.2)
Expand Down Expand Up @@ -135,6 +136,12 @@ Exception handling has also been added to methods **progress_imap** and **progre
<a name="Changelog"></a>
## Changelog

<a name="new-in-version-2.4"></a>
### New in version 2.4
- fixed [issue](https://github.com/dubovikmaster/parallelbar/issues/4)
- For **Windows OS**, when using the `add_progress` decorator, the function being decorated no longer needs the `worker_queue` keyword argument.


<a name="new-in-version-2.3"></a>
### New in version 2.3
- added `wrappers` module, which contains decorators:
Expand Down
7 changes: 0 additions & 7 deletions parallelbar/_worker_queue.py

This file was deleted.

70 changes: 26 additions & 44 deletions parallelbar/parallelbar.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import platform
import time
from functools import partial
from collections import abc
Expand All @@ -15,9 +14,8 @@
from .wrappers import (
ProgressStatus,
stopit_after_timeout,
add_progress
init_worker,
)
from ._worker_queue import _WORKER_QUEUE

try:
import dill
Expand Down Expand Up @@ -74,38 +72,33 @@ def _deserialize(func, task):
return dill.loads(func)(task)


class _LocalFunctions:
# From https://stackoverflow.com/questions/72766345/attributeerror-cant-pickle-local-object-in-multiprocessing
@classmethod
def add_functions(cls, *args):
for function in args:
setattr(cls, function.__name__, function)
function.__qualname__ = cls.__qualname__ + '.' + function.__name__


def _func_wrapped(func, error_handling, set_error_value, worker_queue, task, cnt=count(1), state=ProgressStatus()):
def _func_wrapped(func, error_handling, set_error_value, worker_queue, disable, task, cnt=count(1),
state=ProgressStatus()):
try:
result = func(task)
except BaseException as e:
except Exception as e:
if error_handling == 'raise':
worker_queue.put((None, -1))
if not disable:
worker_queue.put((None, -1))
raise
else:
worker_queue.put((1, task))
if not disable:
worker_queue.put((1, task))
if set_error_value is None:
return e
return set_error_value
else:
updated = next(cnt)
time_now = time.perf_counter()
delta_t = time_now - state.last_update_t
if updated == state.next_update or delta_t > .25:
delta_i = updated - state.last_update_val
if not disable:
updated = next(cnt)
time_now = time.perf_counter()
delta_t = time_now - state.last_update_t
if updated == state.next_update or delta_t > .25:
delta_i = updated - state.last_update_val

state.next_update += max(int((delta_i / delta_t) * .25), 1)
state.last_update_val = updated
state.last_update_t = time_now
worker_queue.put_nowait((0, delta_i))
state.next_update += max(int((delta_i / delta_t) * .25), 1)
state.last_update_val = updated
state.last_update_t = time_now
worker_queue.put_nowait((0, delta_i))

return result

Expand All @@ -115,24 +108,13 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz
return_failed_tasks,
):
raised_exception = False
queue = mp.Manager().Queue() if _WORKER_QUEUE is None else _WORKER_QUEUE
queue = mp.Manager().Queue()
error_queue = mp.Manager().Queue() if return_failed_tasks else None
len_tasks = get_len(tasks, total)
if need_serialize:
func = partial(_deserialize, func)
new_func = partial(func)
if not used_decorators:
if platform.system() not in ['Darwin', 'Windows']:
@add_progress(error_handling=error_behavior, set_error_value=set_error_value)
def new_func(task):
return func(task)

_LocalFunctions.add_functions(new_func)
else:
new_func = partial(_func_wrapped, func, error_behavior, set_error_value, queue)
else:
if platform.system() in ['Darwin', 'Windows']:
new_func = partial(new_func, worker_queue=queue)
func = partial(_func_wrapped, func, error_behavior, set_error_value, queue, disable)
bar_size = len_tasks
thread = Thread(target=_process_status,
args=(bar_size, queue),
Expand All @@ -143,17 +125,19 @@ def new_func(task):
if not disable:
thread.start()
if executor == 'threads':
exc_pool = mp.pool.ThreadPool(n_cpu, initializer=initializer, initargs=initargs)
exc_pool = mp.pool.ThreadPool(n_cpu, initializer=init_worker,
initargs=(queue, initializer, initargs))
else:
exc_pool = mp.get_context(context).Pool(maxtasksperchild=maxtasksperchild,
processes=n_cpu, initializer=initializer, initargs=initargs)
processes=n_cpu, initializer=init_worker,
initargs=(queue, initializer, initargs))
with exc_pool as p:
if pool_type == 'map':
result = p.map(new_func, tasks, chunksize=chunk_size)
result = p.map(func, tasks, chunksize=chunk_size)
else:
result = list()
method = getattr(p, pool_type)
iter_result = method(new_func, tasks, chunksize=chunk_size)
iter_result = method(func, tasks, chunksize=chunk_size)
while 1:
try:
result.append(next(iter_result))
Expand All @@ -169,8 +153,6 @@ def new_func(task):
else:
queue.put((None, None))
# clear queue
while queue.qsize():
queue.get()
if raised_exception:
raise raised_exception
# get error_queue if exists
Expand Down
19 changes: 11 additions & 8 deletions parallelbar/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import time
from itertools import count

from ._worker_queue import _WORKER_QUEUE

try:
import dill
except ImportError:
Expand Down Expand Up @@ -61,26 +59,31 @@ def _make_args(*args, **kwargs):
return args


def init_worker(worker_queue, init_progress, init_progress_args):
global _WORKER_QUEUE
_WORKER_QUEUE = worker_queue
if init_progress is not None:
init_progress(*init_progress_args)


def add_progress(error_handling='raise', set_error_value=None, timeout=None):
state = ProgressStatus()
cnt = count(1)

def actual_decorator(func):
@wraps(func)
def wrapper(*args, worker_queue=None, **kwargs):
if worker_queue is None:
worker_queue = _WORKER_QUEUE
def wrapper(*args, **kwargs):
try:
if timeout is None:
result = func(*args, **kwargs)
else:
result = stopit_after_timeout(timeout)(func)(*args, **kwargs)
except Exception as e:
if error_handling == 'raise':
worker_queue.put((None, -1))
_WORKER_QUEUE.put((None, -1))
raise
else:
worker_queue.put((1, _make_args(*args, **kwargs)))
_WORKER_QUEUE.put((1, _make_args(*args, **kwargs)))
if set_error_value is None:
return e
return set_error_value
Expand All @@ -94,7 +97,7 @@ def wrapper(*args, worker_queue=None, **kwargs):
state.next_update += max(int((delta_i / delta_t) * .25), 1)
state.last_update_val = updated
state.last_update_t = time_now
worker_queue.put_nowait((0, delta_i))
_WORKER_QUEUE.put_nowait((0, delta_i))
return result

return wrapper
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='parallelbar',
version='2.3.1',
version='2.4',
packages=find_packages(),
author='Dubovik Pavel',
author_email='geometryk@gmail.com',
Expand Down

0 comments on commit 773ff6b

Please sign in to comment.