Added progress_starmap function; Added docstring to the progress_map/starmap/imap/imapu functions;
padu committed Dec 21, 2022
1 parent 8dfbc21 commit 98ff99e
Showing 4 changed files with 238 additions and 30 deletions.
59 changes: 34 additions & 25 deletions README.md
@@ -1,21 +1,34 @@

# Parallelbar

[![PyPI version fury.io](https://badge.fury.io/py/parallelbar.svg)](https://pypi.python.org/pypi/pandarallel/)
[![PyPI license](https://img.shields.io/pypi/l/parallelbar.svg)](https://pypi.python.org/pypi/pandarallel/)
[![PyPI download month](https://img.shields.io/pypi/dm/parallelbar.svg)](https://pypi.python.org/pypi/pandarallel/)

**Parallelbar** displays the progress of tasks in the process pool for methods such as **map**, **imap** and **imap_unordered**. Parallelbar is based on the [tqdm](https://github.com/tqdm/tqdm) module and the standard python [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library.
[![PyPI version fury.io](https://badge.fury.io/py/parallelbar.svg)](https://pypi.python.org/pypi/parallelbar/)
[![PyPI license](https://img.shields.io/pypi/l/parallelbar.svg)](https://pypi.python.org/pypi/parallelbar/)
[![PyPI download month](https://img.shields.io/pypi/dm/parallelbar.svg)](https://pypi.python.org/pypi/parallelbar/)

## Table of contents
* [Installation](#Installation)
* [Usage](#Usage)
* [Exception handling](#exception-handling)
* [Changelog](#Changelog)
* [New in version 1.2](#new-in-version-1.2)
* [New in version 1.1](#new-in-version-1.1)
* [New in version 1.0](#new-in-version-1.0)
* [New in version 0.3](#new-in-version-0.3)
* [Problems of the naive approach](#naive-approach)
* [License](#license)

**Parallelbar** displays the progress of tasks in the process pool for [**Pool**](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class methods such as `map`, `starmap` (since version 1.2), `imap` and `imap_unordered`. Parallelbar is based on the [tqdm](https://github.com/tqdm/tqdm) module and the standard Python [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library.
It can also handle exceptions that occur within a worker process, and a timeout can be set for the execution of a task by a process.

<a name="Installation"></a>
## Installation

pip install parallelbar
or
pip install --user git+https://github.com/dubovikmaster/parallelbar.git



<a name="Usage"></a>
## Usage


@@ -24,11 +37,11 @@ from parallelbar import progress_imap, progress_map, progress_imapu
from parallelbar.tools import cpu_bench, fibonacci
```

Let's create a list of 100 numbers and test **progress_map** with default parameters on a toy function **cpu_bench**:
Let's create a list of 100 numbers and test `progress_map` with default parameters on a toy function `cpu_bench`:


```python
tasks = [1_000_000 + i for i in range(100)]
tasks = range(10000)
```
```python
%%time
@@ -51,19 +64,6 @@ Core progress:

![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/core_progress.gif)

Great! We got a 6x speedup, and we were able to watch the overall progress.
What about the progress on each core of your CPU?



```python
if __name__=='__main__':
tasks = [5_000_00 + i for i in range(100)]
progress_map(cpu_bench, tasks, n_cpu=4, chunk_size=1, core_progress=True)
```

![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/multiple_bar_4.gif)

You can just as easily use **progress_imap** and **progress_imapu**, analogs of the *imap* and *imap_unordered* methods of the **Pool** class:


@@ -76,6 +76,7 @@ if __name__=='__main__':

![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/one_bar_imap.gif)

<a name="exception-handling"></a>
## Exception handling
You can handle exceptions and set timeouts for the execution of tasks by the process.
Consider the following toy example:
@@ -126,15 +127,22 @@ print(res)
```

Exception handling has also been added to methods **progress_imap** and **progress_imapu**.
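The `'coerce'` behavior can be pictured serially. The sketch below is a hypothetical, single-process illustration of the semantics (`apply_coerce` and `may_fail` are illustrative names, not part of parallelbar); with parallelbar you would simply call `progress_map(may_fail, range(5), error_behavior='coerce', set_error_value=-1)`:

```python
import traceback

def may_fail(x):
    if x == 3:
        raise ValueError('bad input')
    return x * 2

def apply_coerce(func, tasks, set_error_value=None):
    # What error_behavior='coerce' means: exceptions are caught and
    # replaced by set_error_value (or by the traceback when it is None).
    out = []
    for t in tasks:
        try:
            out.append(func(t))
        except Exception:
            out.append(set_error_value if set_error_value is not None
                       else traceback.format_exc())
    return out

print(apply_coerce(may_fail, range(5), set_error_value=-1))  # [0, 2, 4, -1, 8]
```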

<a name="Changelog"></a>
## Changelog
<a name="new-in-version-1.2"></a>
### New in version 1.2

- Added `progress_starmap` function. An extension of the [`starmap`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap) method of the `Pool` class.
- Improved documentation.
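Like `Pool.starmap`, the new `progress_starmap` unpacks each element of `tasks` as positional arguments. The standard-library sketch below shows the calling convention being parallelized (`power` is an illustrative function); with parallelbar installed, `progress_starmap(power, tasks)` would do the same with a progress bar:

```python
from multiprocessing import Pool

def power(base, exp):
    return base ** exp

if __name__ == '__main__':
    tasks = [(2, 3), (3, 2), (10, 2)]  # each tuple is unpacked: power(2, 3), ...
    with Pool(2) as pool:
        results = pool.starmap(power, tasks)
    print(results)  # [8, 9, 100]
```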

<a name="new-in-version-1.1"></a>
### New in version 1.1
1. The `bar_step` keyword argument is no longer used and will be removed in a future version.
2. Added the `need_serialize` boolean keyword argument to the `progress_map/imap/imapu` functions (default `False`). Requires [dill](https://pypi.org/project/dill/) to be installed. If `True`,
the target function is serialized with the `dill` library, so you can now use lambda functions, class methods and other callable objects that `pickle` cannot serialize as the target function.
3. Added dynamic optimization of the progress bar refresh rate. This can significantly improve the performance of the `progress_map/imap/imapu` functions for very long iterables with a short per-task execution time.
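The reason `need_serialize` exists: worker processes receive the target function via `pickle`, which cannot serialize lambdas. A quick demonstration of the failure (`square` is an illustrative name):

```python
import pickle

square = lambda x: x * x

# pickle serializes functions by reference and cannot handle a lambda...
try:
    pickle.dumps(square)
except Exception as e:
    print(type(e).__name__)  # a pickling error

# ...which is why need_serialize=True routes the target function through
# dill before it is shipped to the workers (requires `pip install dill`):
# progress_map(square, range(10), need_serialize=True)
```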


<a name="new-in-version-1.0"></a>
### New in version 1.0
1. The "ignore" value of the `error_behavior` key parameter is no longer supported.
2. Default value of key parameter `error_behavior` changed to "raise".
@@ -143,7 +151,7 @@ the target function is serialized using `dill` library. Thus, as a target functi
- "threads" - use thread pool
- "processes" - use processes pool (default)


<a name="new-in-version-0.3"></a>
### New in version 0.3.0
1. The `error_behavior` keyword argument has been added to the **progress_map**, **progress_imap** and **progress_imapu** methods.
Must be one of the values: "raise", "ignore", "coerce".
@@ -240,7 +248,7 @@ time took: 8.0
16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 28, 29]
```


<a name="naive-approach"></a>
## Problems of the naive approach
Why not do something simpler? Let's take the standard **imap** method, loop over it with **tqdm** and collect the results from the processes:
```python
@@ -291,6 +299,7 @@ if __name__=='__main__':
The progress_imap function takes care of collecting the result and closing the process pool for you.
In fact, the naive approach described above will work for the standard imap_unordered method. But it does not guarantee the order of the returned result. This is often critically important.
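The ordering point can be checked with the standard library alone (`work` is an illustrative function with a randomized delay):

```python
import random
import time
from multiprocessing import Pool

def work(x):
    time.sleep(random.random() / 50)  # randomized delay per task
    return x

if __name__ == '__main__':
    with Pool(4) as p:
        ordered = list(p.imap(work, range(8)))
        unordered = list(p.imap_unordered(work, range(8)))
    print(ordered)    # always [0, 1, ..., 7] — imap preserves input order
    print(unordered)  # arrival order; may differ from run to run
```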

<a name="license"></a>
## License

MIT license
4 changes: 2 additions & 2 deletions parallelbar/__init__.py
@@ -1,3 +1,3 @@
from .parallelbar import progress_map, progress_imap, progress_imapu
from .parallelbar import progress_map, progress_imap, progress_imapu, progress_starmap

__all__ = ['progress_map', 'progress_imap', 'progress_imapu']
__all__ = ['progress_map', 'progress_imap', 'progress_imapu', 'progress_starmap']
200 changes: 198 additions & 2 deletions parallelbar/parallelbar.py
@@ -3,8 +3,11 @@
import multiprocessing as mp
from threading import Thread
from tqdm.auto import tqdm
from .tools import get_len
from .tools import stopit_after_timeout
from .tools import (
get_len,
func_args_unpack,
stopit_after_timeout
)
import time
from itertools import count
import warnings
@@ -187,17 +190,164 @@ def progress_map(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_s
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
set_error_value=None, executor='processes', need_serialize=False
):
"""
An extension of the map method of the multiprocessing.Poll class that allows you to display the progress of tasks,
handle exceptions, and set a timeout for the function to execute.
Parameters:
----------
func: сallable
A function that will be applied element by element to the tasks
iterable.
tasks: Iterable
The func function will be applied to the task elements.
initializer: сallable or None, default None
initargs: tuple
n_cpu: int, or None, default None
number of workers, if None n_cpu = multiprocessing.cpu_count().
chunk_size: int or None, default None
context: str or None, default None
Can be 'fork', 'spawn' or 'forkserver'.
total: int or None, default None
The number of elements in tasks. Must be specified if task is iterator.
bar_step: int, default 1.
disable: bool, default False
if True don't show progress bar.
process_timeout: float or None, default None
If not None, a TimeoutError exception will be raised if the function execution time exceeds
the specified value in seconds.
error_behavior: str, default 'raise'
Can be 'raise' or 'coerce'
- If 'raise', then the exception that occurs when calling the func function will be raised.
- If 'coerce', then the exception that occurs when calling the func function will be processed and
the result of the function execution will be the value set in set_error_value.
set_error_value: Any, default None
The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
if None, the exception traceback will be returned.
executor: str, default 'processes'
Can be 'processes' or 'threads'
- if 'processes', uses processes pool
- if 'threads', use threads pool
need_serialize: bool, default False
If True function will be serialized with dill library.
Returns
-------
result: list
"""
_validate_args(error_behavior, tasks, total, bar_step, executor)
func = _func_prepare(func, process_timeout, need_serialize)
result = _do_parallel(func, 'map', tasks, initializer, initargs, n_cpu, chunk_size, context, total,
bar_step, disable, error_behavior, set_error_value, executor, need_serialize)
return result


def progress_starmap(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=None,
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
set_error_value=None, executor='processes', need_serialize=False
):
"""
An extension of the starmap method of the multiprocessing.Poll class that allows you to display
the progress of tasks, handle exceptions, and set a timeout for the function to execute.
Parameters:
----------
func: сallable
A function that will be applied element by element to the tasks iterable.
tasks: Iterable
The func function will be applied to the task elements.
initializer: сallable or None, default None
initargs: tuple
n_cpu: int, or None, default None
number of workers, if None n_cpu = multiprocessing.cpu_count().
chunk_size: int or None, default None
context: str or None, default None
Can be 'fork', 'spawn' or 'forkserver'.
total: int or None, default None
The number of elements in tasks. Must be specified if task is iterator.
bar_step: int, default 1.
disable: bool, default False
if True don't show progress bar.
process_timeout: float or None, default None
If not None, a TimeoutError exception will be raised if the function execution time exceeds
the specified value in seconds.
error_behavior: str, default 'raise'
Can be 'raise' or 'coerce'
- If 'raise', then the exception that occurs when calling the func function will be raised.
- If 'coerce', then the exception that occurs when calling the func function will be processed and
the result of the function execution will be the value set in set_error_value.
set_error_value: Any, default None
The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
if None, the exception traceback will be returned.
executor: str, default 'processes'
Can be 'processes' or 'threads'
- if 'processes', uses processes pool
- if 'threads', use threads pool
need_serialize: bool, default False
If True function will be serialized with dill library.
Returns
-------
result: list
"""
_validate_args(error_behavior, tasks, total, bar_step, executor)
func = partial(func_args_unpack, func)
func = _func_prepare(func, process_timeout, need_serialize)
result = _do_parallel(func, 'map', tasks, initializer, initargs, n_cpu, chunk_size, context, total,
bar_step, disable, error_behavior, set_error_value, executor, need_serialize)
return result
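Note how `progress_starmap` reuses the ordinary `'map'` machinery by wrapping the target in `partial(func_args_unpack, func)`. The real `func_args_unpack` lives in `parallelbar.tools`; a plausible sketch consistent with how it is used here is a one-liner that turns an argument tuple into a positional call:

```python
from functools import partial

def func_args_unpack(func, args):
    # Assumed sketch of parallelbar.tools.func_args_unpack:
    # unpack a tuple of arguments into a positional call.
    return func(*args)

def add(a, b):
    return a + b

# partial(func_args_unpack, add) yields a single-argument callable,
# which is exactly what a plain 'map' dispatch expects.
starmap_ready = partial(func_args_unpack, add)
print(starmap_ready((2, 3)))  # 5
```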


def progress_imap(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=1,
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
set_error_value=None, executor='processes', need_serialize=False
):
"""
An extension of the imap method of the multiprocessing.Poll class that allows you to display the progress of tasks,
handle exceptions, and set a timeout for the function to execute.
Parameters:
----------
func: сallable
A function that will be applied element by element to the tasks
iterable.
tasks: Iterable
The func function will be applied to the task elements.
initializer: сallable or None, default None
initargs: tuple
n_cpu: int, or None, default None
number of workers, if None n_cpu = multiprocessing.cpu_count().
chunk_size: int or None, default None
context: str or None, default None
Can be 'fork', 'spawn' or 'forkserver'.
total: int or None, default None
The number of elements in tasks. Must be specified if task is iterator.
bar_step: int, default 1.
disable: bool, default False
if True don't show progress bar.
process_timeout: float or None, default None
If not None, a TimeoutError exception will be raised if the function execution time exceeds
the specified value in seconds.
error_behavior: str, default 'raise'
Can be 'raise' or 'coerce'
- If 'raise', then the exception that occurs when calling the func function will be raised.
- If 'coerce', then the exception that occurs when calling the func function will be processed and
the result of the function execution will be the value set in set_error_value.
set_error_value: Any, default None
The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
if None, the exception traceback will be returned.
executor: str, default 'processes'
Can be 'processes' or 'threads'
- if 'processes', uses processes pool
- if 'threads', use threads pool
need_serialize: bool, default False
If True function will be serialized with dill library.
Returns
-------
result: list
"""

_validate_args(error_behavior, tasks, total, bar_step, executor)
func = _func_prepare(func, process_timeout, need_serialize)
result = _do_parallel(func, 'imap', tasks, initializer, initargs, n_cpu, chunk_size, context, total,
@@ -209,6 +359,52 @@ def progress_imapu(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk
context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
set_error_value=None, executor='processes', need_serialize=False
):
"""
An extension of the imap_unordered method of the multiprocessing.Poll class that allows you to display the progress of tasks,
handle exceptions, and set a timeout for the function to execute.
Parameters:
----------
func: сallable
A function that will be applied element by element to the tasks
iterable.
tasks: Iterable
The func function will be applied to the task elements.
initializer: сallable or None, default None
initargs: tuple
n_cpu: int, or None, default None
number of workers, if None n_cpu = multiprocessing.cpu_count().
chunk_size: int or None, default None
context: str or None, default None
Can be 'fork', 'spawn' or 'forkserver'.
total: int or None, default None
The number of elements in tasks. Must be specified if task is iterator.
bar_step: int, default 1.
disable: bool, default False
if True don't show progress bar.
process_timeout: float or None, default None
If not None, a TimeoutError exception will be raised if the function execution time exceeds
the specified value in seconds.
error_behavior: str, default 'raise'
Can be 'raise' or 'coerce'
- If 'raise', then the exception that occurs when calling the func function will be raised.
- If 'coerce', then the exception that occurs when calling the func function will be processed and
the result of the function execution will be the value set in set_error_value.
set_error_value: Any, default None
The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
if None, the exception traceback will be returned.
executor: str, default 'processes'
Can be 'processes' or 'threads'
- if 'processes', uses processes pool
- if 'threads', use threads pool
need_serialize: bool, default False
If True function will be serialized with dill library.
Returns
-------
result: list
"""

_validate_args(error_behavior, tasks, total, bar_step, executor)
func = _func_prepare(func, process_timeout, need_serialize)
result = _do_parallel(func, 'imap_unordered', tasks, initializer, initargs, n_cpu, chunk_size,
5 changes: 4 additions & 1 deletion setup.py
@@ -6,7 +6,7 @@

setup(
name='parallelbar',
version='1.1.3',
version='1.2.0',
packages=find_packages(),
author='Dubovik Pavel',
author_email='geometryk@gmail.com',
@@ -28,5 +28,8 @@
'tqdm',
'colorama',
],
extras_require={
"dill": ['dill'],
},
platforms='any'
)
