Skip to content

Commit

Permalink
Added error_behaviour and set_error_value parameters to the progress_…
Browse files Browse the repository at this point in the history
…map, progress_imap and progress_input methods.
  • Loading branch information
padu committed Jul 29, 2022
1 parent e4ef0e2 commit 955cddf
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 26 deletions.
98 changes: 97 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,107 @@ time took 1.010124 s.
As you can see, instead of 5 seconds of execution, the function was interrupted after 1 second of timeout.
If `raise_exception=True`, a **TimeoutError** exception will be raised.

## New in version 0.3.0
1. The `error_behavior` keyword argument has been added to the **progress_map**, **progress_imap** and **progress_imapu** methods.
It must be one of the values "raise", "ignore" or "coerce":
- "raise" - re-raise the exception that was thrown in the process pool.
- "ignore" - ignore exceptions that occur; nothing is added to the result for the failed task.
- "coerce" - handle the exception and put the value of the `set_error_value` parameter into the result in its place. If `set_error_value` is None (the default), the raised exception (or its error message) is added to the result instead.
2. The `set_error_value` keyword argument has been added to the **progress_map**, **progress_imap** and **progress_imapu** methods.

Example of usage

```python
import time
import resource as rs
from parallelbar import progress_imap


def memory_limit(limit):
soft, hard = rs.getrlimit(rs.RLIMIT_AS)
rs.setrlimit(rs.RLIMIT_AS, (limit, hard))


def my_awesome_foo(n):
if n == 0:
s = 'a' * 10000000
elif n == 20:
time.sleep(100)
else:
time.sleep(1)
return n


if __name__ == '__main__':
tasks = range(30)
start = time.monotonic()
result = progress_imap(my_awesome_foo, tasks,
process_timeout=1.5,
initializer=memory_limit,
initargs=(100,),
n_cpu=4,
error_behavior='coerce',
set_error_value=None,
)
print(f'time took: {time.monotonic() - start:.1f}')
print(result)
```
![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/test-new.gif)
```
time took: 8.2
[MemoryError(), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, TimeoutError('function "my_awesome_foo" took longer than 1.5 s.'), 21, 22, 23, 24, 25, 26, 27, 28, 29]
```
Put NaN into the result of the pool operation instead of the raised exceptions:
```python
if __name__ == '__main__':
tasks = range(30)
start = time.monotonic()
result = progress_imap(my_awesome_foo, tasks,
process_timeout=1.5,
initializer=memory_limit,
initargs=(100,),
n_cpu=4,
error_behavior='coerce',
set_error_value=float('nan'),
)
print(f'time took: {time.monotonic() - start:.1f}')
print(result)
```
![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/test-new.gif)
```
time took: 8.0
[nan, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, nan, 21, 22, 23, 24, 25, 26, 27, 28, 29]
```
Let's ignore the exceptions:
```python
if __name__ == '__main__':
tasks = range(30)
start = time.monotonic()
result = progress_imap(my_awesome_foo, tasks,
process_timeout=1.5,
initializer=memory_limit,
initargs=(100,),
n_cpu=4,
error_behavior='ignore',
set_error_value=None,
)
print(f'time took: {time.monotonic() - start:.1f}')
print(result)
```
![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/test-new.gif)
```
time took: 8.0
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 28, 29]
```




## Problems of the naive approach
Why can't I do something simpler? Let's take the standard **imap** method and run through it in a loop with **tqdm** and take the results from the processes:

```python
from multiprocessing import Pool
from tqdm.auto import tqdm
Expand Down
Binary file modified gifs/error_bar_1.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified gifs/error_bar_2.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
61 changes: 39 additions & 22 deletions parallelbar/parallelbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,23 @@ def _update_error_bar(bar_dict, bar_parameters):
bar_dict['bar'].update()


def _error_behavior(error_handling, msgs, result, set_error_value, q):
    """Apply the selected error policy to a task that failed in the pool.

    This helper must be called from inside an ``except`` block: the "raise"
    policy uses a bare ``raise`` to re-raise the exception that is currently
    being handled, and a bare ``raise`` with no active exception fails with
    ``RuntimeError``.

    Parameters
    ----------
    error_handling : str
        One of "raise", "ignore" or "coerce".
    msgs : object
        The exception (or error message) describing the failure. It is used
        as the result value for "coerce" when ``set_error_value`` is None.
    result : list
        The list of collected results; mutated in place for "coerce".
    set_error_value : object
        Value appended to ``result`` for "coerce". None means "use ``msgs``".
    q : queue-like
        Object with a ``put`` method. ``None`` is put on it before this
        function raises (presumably the stop sentinel for the progress-bar
        thread -- confirm against the queue consumer).

    Raises
    ------
    ValueError
        If ``error_handling`` is not one of the supported values.
    Exception
        For "raise", whatever exception is currently being handled.
    """
    if error_handling == 'ignore':
        # Drop the failed task: nothing is appended to the result.
        return
    if error_handling == 'coerce':
        result.append(msgs if set_error_value is None else set_error_value)
        return
    # Both remaining paths leave by raising, so put None on the queue first.
    # Previously only the "raise" path did this; the invalid-value path now
    # does it as well so both aborting paths behave the same way.
    q.put(None)
    if error_handling == 'raise':
        # Re-raise the exception being handled by the caller's except block.
        raise
    raise ValueError(
        'Invalid error_handling value specified. Must be one of the values: "raise", "ignore", "coerce"')


def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_size, core_progress,
context, total, bar_step, disable, process_timeout,
context, total, bar_step, disable, process_timeout, error_behavior, set_error_value,
):
q = mp.Manager().Queue()
len_tasks = get_len(tasks, total)
Expand All @@ -113,11 +128,11 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz
chunk_size += 1
if core_progress:
bar_size = _bar_size(chunk_size, len_tasks, n_cpu)
proc = Thread(target=_core_process_status, args=(bar_size, bar_step, disable, q))
thread = Thread(target=_core_process_status, args=(bar_size, bar_step, disable, q), daemon=True)
else:
bar_size = len_tasks
proc = Thread(target=_process_status, args=(bar_size, bar_step, disable, q))
proc.start()
thread = Thread(target=_process_status, args=(bar_size, bar_step, disable, q), daemon=True)
thread.start()
target = partial(_process, func, q)
bar_parameters = dict(total=len_tasks, disable=disable, position=1, desc='ERROR', colour='red')
error_bar = {}
Expand All @@ -134,13 +149,15 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz
break
except TimeoutError:
_update_error_bar(error_bar, bar_parameters)
result.append(f"function {func.__name__} took longer than {process_timeout} s.")
except ProcessExpired as error:
_error_behavior(error_behavior,
f"function \"{func.__name__}\" took longer than {process_timeout} s.", result,
set_error_value, q)
except ProcessExpired as e:
_update_error_bar(error_bar, bar_parameters)
result.append(f" {error}. Exit code: {error.exitcode}")
_error_behavior(error_behavior, f" {e}. Exit code: {e.exitcode}", result, set_error_value, q)
except Exception as e:
_update_error_bar(error_bar, bar_parameters)
result.append(e)
_error_behavior(error_behavior, e, result, set_error_value, q)
else:
with mp.get_context(context).Pool(n_cpu, initializer=initializer, initargs=initargs) as p:
result = list()
Expand All @@ -153,48 +170,48 @@ def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_siz
break
except Exception as e:
_update_error_bar(error_bar, bar_parameters)
result.append(e)
_error_behavior(error_behavior, e, result, set_error_value, q)
if error_bar:
error_bar['bar'].close()
q.put(None)
proc.join()
thread.join()
return result


def progress_map(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=None, core_progress=False,
context=None, total=None, bar_step=1,
disable=False, process_timeout=None):
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='coerce',
set_error_value=None,
):
result = _do_parallel(func, 'map', tasks, initializer, initargs, n_cpu, chunk_size, core_progress, context, total,
bar_step, disable,
process_timeout)
bar_step, disable, process_timeout, error_behavior, set_error_value)
return result


def progress_imap(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=1, core_progress=False,
context=None, total=None,
bar_step=1, disable=False, process_timeout=None):
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='coerce',
set_error_value=None,
):
if process_timeout and chunk_size != 1:
raise ValueError('the process_timeout can only be used if chunk_size=1')
if isinstance(tasks, abc.Iterator) and not total:
raise ValueError('If the tasks are an iterator, the total parameter must be specified')
if process_timeout:
func = partial(_wrapped_func, func, process_timeout, True)
result = _do_parallel(func, 'imap', tasks, initializer, initargs, n_cpu, chunk_size, core_progress, context, total,
bar_step, disable,
None)
bar_step, disable, None, error_behavior, set_error_value)
return result


def progress_imapu(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=1, core_progress=False,
context=None, total=None,
bar_step=1, disable=False, process_timeout=None):
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='coerce',
set_error_value=None,
):
if process_timeout and chunk_size != 1:
raise ValueError('the process_timeout can only be used if chunk_size=1')
if isinstance(tasks, abc.Iterator) and not total:
raise ValueError('If the tasks are an iterator, the total parameter must be specified')
if process_timeout:
func = partial(_wrapped_func, func, process_timeout, True)
result = _do_parallel(func, 'imap_unordered', tasks, initializer, initargs, n_cpu, chunk_size, core_progress,
context, total, bar_step,
disable, None)
context, total, bar_step, disable, None, error_behavior, set_error_value)
return result
4 changes: 2 additions & 2 deletions parallelbar/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ def actual_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
timer = threading.Timer(s, stop_function)
timer.start()
try:
timer.start()
result = func(*args, **kwargs)
except KeyboardInterrupt:
msg = f'function {func.__name__} took longer than {s} s.'
msg = f'function \"{func.__name__}\" took longer than {s} s.'
if raise_exception:
raise TimeoutError(msg)
result = msg
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='parallelbar',
version='0.2.15',
version='0.3.0',
packages=find_packages(),
author='Dubovik Pavel',
author_email='geometryk@gmail.com',
Expand Down

0 comments on commit 955cddf

Please sign in to comment.