diff --git a/README.md b/README.md
index 57d0fef..59d7863 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,26 @@
 # Parallelbar
-[![PyPI version fury.io](https://badge.fury.io/py/parallelbar.svg)](https://pypi.python.org/pypi/pandarallel/)
-[![PyPI license](https://img.shields.io/pypi/l/parallelbar.svg)](https://pypi.python.org/pypi/pandarallel/)
-[![PyPI download month](https://img.shields.io/pypi/dm/parallelbar.svg)](https://pypi.python.org/pypi/pandarallel/)
-
-**Parallelbar** displays the progress of tasks in the process pool for methods such as **map**, **imap** and **imap_unordered**. Parallelbar is based on the [tqdm](https://github.com/tqdm/tqdm) module and the standard python [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library.
+[![PyPI version fury.io](https://badge.fury.io/py/parallelbar.svg)](https://pypi.python.org/pypi/parallelbar/)
+[![PyPI license](https://img.shields.io/pypi/l/parallelbar.svg)](https://pypi.python.org/pypi/parallelbar/)
+[![PyPI download month](https://img.shields.io/pypi/dm/parallelbar.svg)](https://pypi.python.org/pypi/parallelbar/)
+
+## Table of contents
+* [Installation](#installation)
+* [Usage](#usage)
+* [Exception handling](#exception-handling)
+* [Changelog](#changelog)
+  * [New in version 1.2](#new-in-version-12)
+  * [New in version 1.1](#new-in-version-11)
+  * [New in version 1.0](#new-in-version-10)
+  * [New in version 0.3.0](#new-in-version-030)
+* [Problems of the naive approach](#problems-of-the-naive-approach)
+* [License](#license)
+
+**Parallelbar** displays the progress of tasks in the process pool for [**Pool**](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class methods such as `map`, `starmap` (since version 1.2), `imap` and `imap_unordered`. Parallelbar is based on the [tqdm](https://github.com/tqdm/tqdm) module and the standard Python [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library.
 Also, it is possible to handle exceptions that occur within a separate process, as well as set a timeout for the execution of a task by a process.
+
 ## Installation
 
     pip install parallelbar
@@ -15,7 +28,7 @@ Also, it is possible to handle exceptions that occur within a separate process,
     pip install --user git+https://github.com/dubovikmaster/parallelbar.git
 
-
+
 ## Usage
@@ -24,11 +37,11 @@ from parallelbar import progress_imap, progress_map, progress_imapu
 from parallelbar.tools import cpu_bench, fibonacci
 ```
 
-Let's create a list of 100 numbers and test **progress_map** with default parameters on a toy function **cpu_bench**:
+Let's create a range of 10,000 numbers and test `progress_map` with default parameters on a toy function `cpu_bench`:
 
 ```python
-tasks = [1_000_000 + i for i in range(100)]
+tasks = range(10000)
 ```
 
 ```python
 %%time
@@ -51,19 +64,6 @@ Core progress:
 ![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/core_progress.gif)
 
-Great! We got an acceleration of 6 times! We were also able to observe the process
-What about the progress on the cores of your cpu?
-
-
-
-```python
-if __name__=='__main__':
-    tasks = [5_000_00 + i for i in range(100)]
-    progress_map(cpu_bench, tasks, n_cpu=4, chunk_size=1, core_progress=True)
-```
-
-![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/multiple_bar_4.gif)
-
 You can also easily use **progress_imap** and **progress_imapu** analogs of the *imap* and *imap_unordered* methods of the **Pool()** class
@@ -76,6 +76,7 @@ if __name__=='__main__':
 ![](https://raw.githubusercontent.com/dubovikmaster/parallelbar/main/gifs/one_bar_imap.gif)
 
+
 ## Exception handling
 
 You can handle exceptions and set timeouts for the execution of tasks by the process. Consider the following toy example:
@@ -126,15 +127,22 @@ print(res)
 ```
 Exception handling has also been added to methods **progress_imap** and **progress_imapu**.
-
+
 ## Changelog
+
+### New in version 1.2
+
+ - Added `progress_starmap` function. An extension of the [`starmap`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap) method of the `Pool` class.
+ - Improved documentation.
+
+
 ### New in version 1.1
 1. The `bar_step` keyword argument is no longer used and will be removed in a future version
 2. Added `need_serialize` boolean keyword argument to the `progress_map/imap/imapu` function (default `False`). Requires [dill](https://pypi.org/project/dill/) to be installed. If `True`
 the target function is serialized using `dill` library. Thus, as a target function, you can now use lambda functions, class methods and other callable objects that `pickle` cannot serialize
 3. Added dynamic optimization of the progress bar refresh rate. This can significantly improve the performance of the `progress_map/imap/imapu` functions for very long iterables and small execution time of one task by the objective function.
-
+
 ### New in version 1.0
 1. The "ignore" value of the `error_behavior` key parameter is no longer supported.
 2. Default value of key parameter `error_behavior` changed to "raise".
@@ -143,7 +151,7 @@
  - "threads" - use thread pool
  - "processes" - use processes pool (default)
-
+
 ### New in version 0.3.0
 1. The `error_behavior` keyword argument has been added to the **progress_map**, **progress_imap** and **progress_imapu** methods.
 Must be one of the values: "raise", "ignore", "coerce".
@@ -240,7 +248,7 @@ time took: 8.0
 16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 28, 29]
 ```
-
+
 ## Problems of the naive approach
 Why can't I do something simpler?
 Let's take the standard **imap** method and run through it in a loop with **tqdm** and take the results from the processes:
 ```python
@@ -291,6 +299,7 @@ if __name__=='__main__':
 The progress_imap function takes care of collecting the result and closing the process pool for you.
 In fact, the naive approach described above will work for the standard imap_unordered method.
 But it does not guarantee the order of the returned result. This is often critically important.
+
 ## License
 
 MIT license
diff --git a/parallelbar/__init__.py b/parallelbar/__init__.py
index bb49bf9..7f9e4ca 100644
--- a/parallelbar/__init__.py
+++ b/parallelbar/__init__.py
@@ -1,3 +1,3 @@
-from .parallelbar import progress_map, progress_imap, progress_imapu
+from .parallelbar import progress_map, progress_imap, progress_imapu, progress_starmap
 
-__all__ = ['progress_map', 'progress_imap', 'progress_imapu']
+__all__ = ['progress_map', 'progress_imap', 'progress_imapu', 'progress_starmap']
diff --git a/parallelbar/parallelbar.py b/parallelbar/parallelbar.py
index d570f0b..ef9d2f3 100644
--- a/parallelbar/parallelbar.py
+++ b/parallelbar/parallelbar.py
@@ -3,8 +3,11 @@ import multiprocessing as mp
 from threading import Thread
 from tqdm.auto import tqdm
 
-from .tools import get_len
-from .tools import stopit_after_timeout
+from .tools import (
+    get_len,
+    func_args_unpack,
+    stopit_after_timeout
+)
 
 import time
 from itertools import count
 import warnings
@@ -187,6 +190,51 @@ def progress_map(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_s
                  context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
                  set_error_value=None, executor='processes', need_serialize=False
                  ):
+    """
+    An extension of the map method of the multiprocessing.Pool class that allows you to display the progress of tasks,
+    handle exceptions, and set a timeout for the function to execute.
+
+    Parameters:
+    ----------
+    func: callable
+        A function that will be applied element by element to the tasks
+        iterable.
+    tasks: Iterable
+        The func function will be applied to the elements of tasks.
+    initializer: callable or None, default None
+    initargs: tuple
+    n_cpu: int or None, default None
+        Number of workers; if None, n_cpu = multiprocessing.cpu_count().
+    chunk_size: int or None, default None
+    context: str or None, default None
+        Can be 'fork', 'spawn' or 'forkserver'.
+    total: int or None, default None
+        The number of elements in tasks. Must be specified if tasks is an iterator.
+    bar_step: int, default 1
+    disable: bool, default False
+        If True, don't show the progress bar.
+    process_timeout: float or None, default None
+        If not None, a TimeoutError exception will be raised if the function execution time exceeds
+        the specified value in seconds.
+    error_behavior: str, default 'raise'
+        Can be 'raise' or 'coerce'.
+        - If 'raise', the exception that occurs when calling the func function will be raised.
+        - If 'coerce', the exception that occurs when calling the func function will be handled and
+          the result of the function execution will be the value set in set_error_value.
+    set_error_value: Any, default None
+        The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
+        If None, the exception traceback will be returned.
+    executor: str, default 'processes'
+        Can be 'processes' or 'threads'.
+        - If 'processes', use a process pool.
+        - If 'threads', use a thread pool.
+    need_serialize: bool, default False
+        If True, the function will be serialized with the dill library.
+
+    Returns
+    -------
+    result: list
+
+    """
     _validate_args(error_behavior, tasks, total, bar_step, executor)
     func = _func_prepare(func, process_timeout, need_serialize)
     result = _do_parallel(func, 'map', tasks, initializer, initargs, n_cpu, chunk_size, context, total,
@@ -194,10 +242,112 @@ def progress_map(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_s
     return result
 
 
+def progress_starmap(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=None,
+                     context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
+                     set_error_value=None, executor='processes', need_serialize=False
+                     ):
+    """
+    An extension of the starmap method of the multiprocessing.Pool class that allows you to display
+    the progress of tasks, handle exceptions, and set a timeout for the function to execute.
+
+    Parameters:
+    ----------
+    func: callable
+        A function to apply to the elements of tasks; each element of tasks is unpacked
+        as the positional arguments of func (as in starmap).
+    tasks: Iterable
+        The func function will be applied to the elements of tasks.
+    initializer: callable or None, default None
+    initargs: tuple
+    n_cpu: int or None, default None
+        Number of workers; if None, n_cpu = multiprocessing.cpu_count().
+    chunk_size: int or None, default None
+    context: str or None, default None
+        Can be 'fork', 'spawn' or 'forkserver'.
+    total: int or None, default None
+        The number of elements in tasks. Must be specified if tasks is an iterator.
+    bar_step: int, default 1
+    disable: bool, default False
+        If True, don't show the progress bar.
+    process_timeout: float or None, default None
+        If not None, a TimeoutError exception will be raised if the function execution time exceeds
+        the specified value in seconds.
+    error_behavior: str, default 'raise'
+        Can be 'raise' or 'coerce'.
+        - If 'raise', the exception that occurs when calling the func function will be raised.
+        - If 'coerce', the exception that occurs when calling the func function will be handled and
+          the result of the function execution will be the value set in set_error_value.
+    set_error_value: Any, default None
+        The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
+        If None, the exception traceback will be returned.
+    executor: str, default 'processes'
+        Can be 'processes' or 'threads'.
+        - If 'processes', use a process pool.
+        - If 'threads', use a thread pool.
+    need_serialize: bool, default False
+        If True, the function will be serialized with the dill library.
+
+    Returns
+    -------
+    result: list
+
+    """
+    _validate_args(error_behavior, tasks, total, bar_step, executor)
+    func = partial(func_args_unpack, func)
+    func = _func_prepare(func, process_timeout, need_serialize)
+    result = _do_parallel(func, 'map', tasks, initializer, initargs, n_cpu, chunk_size, context, total,
+                          bar_step, disable, error_behavior, set_error_value, executor, need_serialize)
+    return result
+
+
 def progress_imap(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk_size=1,
                   context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
                   set_error_value=None, executor='processes', need_serialize=False
                   ):
+    """
+    An extension of the imap method of the multiprocessing.Pool class that allows you to display the progress of tasks,
+    handle exceptions, and set a timeout for the function to execute.
+
+    Parameters:
+    ----------
+    func: callable
+        A function that will be applied element by element to the tasks
+        iterable.
+    tasks: Iterable
+        The func function will be applied to the elements of tasks.
+    initializer: callable or None, default None
+    initargs: tuple
+    n_cpu: int or None, default None
+        Number of workers; if None, n_cpu = multiprocessing.cpu_count().
+    chunk_size: int or None, default None
+    context: str or None, default None
+        Can be 'fork', 'spawn' or 'forkserver'.
+    total: int or None, default None
+        The number of elements in tasks. Must be specified if tasks is an iterator.
+    bar_step: int, default 1
+    disable: bool, default False
+        If True, don't show the progress bar.
+    process_timeout: float or None, default None
+        If not None, a TimeoutError exception will be raised if the function execution time exceeds
+        the specified value in seconds.
+    error_behavior: str, default 'raise'
+        Can be 'raise' or 'coerce'.
+        - If 'raise', the exception that occurs when calling the func function will be raised.
+        - If 'coerce', the exception that occurs when calling the func function will be handled and
+          the result of the function execution will be the value set in set_error_value.
+    set_error_value: Any, default None
+        The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
+        If None, the exception traceback will be returned.
+    executor: str, default 'processes'
+        Can be 'processes' or 'threads'.
+        - If 'processes', use a process pool.
+        - If 'threads', use a thread pool.
+    need_serialize: bool, default False
+        If True, the function will be serialized with the dill library.
+
+    Returns
+    -------
+    result: list
+
+    """
     _validate_args(error_behavior, tasks, total, bar_step, executor)
     func = _func_prepare(func, process_timeout, need_serialize)
     result = _do_parallel(func, 'imap', tasks, initializer, initargs, n_cpu, chunk_size, context, total,
@@ -209,6 +359,52 @@ def progress_imapu(func, tasks, initializer=None, initargs=(), n_cpu=None, chunk
                   context=None, total=None, bar_step=1, disable=False, process_timeout=None, error_behavior='raise',
                   set_error_value=None, executor='processes', need_serialize=False
                   ):
+    """
+    An extension of the imap_unordered method of the multiprocessing.Pool class that allows you to display
+    the progress of tasks, handle exceptions, and set a timeout for the function to execute.
+
+    Parameters:
+    ----------
+    func: callable
+        A function that will be applied element by element to the tasks
+        iterable.
+    tasks: Iterable
+        The func function will be applied to the elements of tasks.
+    initializer: callable or None, default None
+    initargs: tuple
+    n_cpu: int or None, default None
+        Number of workers; if None, n_cpu = multiprocessing.cpu_count().
+    chunk_size: int or None, default None
+    context: str or None, default None
+        Can be 'fork', 'spawn' or 'forkserver'.
+    total: int or None, default None
+        The number of elements in tasks. Must be specified if tasks is an iterator.
+    bar_step: int, default 1
+    disable: bool, default False
+        If True, don't show the progress bar.
+    process_timeout: float or None, default None
+        If not None, a TimeoutError exception will be raised if the function execution time exceeds
+        the specified value in seconds.
+    error_behavior: str, default 'raise'
+        Can be 'raise' or 'coerce'.
+        - If 'raise', the exception that occurs when calling the func function will be raised.
+        - If 'coerce', the exception that occurs when calling the func function will be handled and
+          the result of the function execution will be the value set in set_error_value.
+    set_error_value: Any, default None
+        The value to be returned in case of exception handling. Only matters if error_behavior='coerce'.
+        If None, the exception traceback will be returned.
+    executor: str, default 'processes'
+        Can be 'processes' or 'threads'.
+        - If 'processes', use a process pool.
+        - If 'threads', use a thread pool.
+    need_serialize: bool, default False
+        If True, the function will be serialized with the dill library.
+
+    Returns
+    -------
+    result: list
+
+    """
     _validate_args(error_behavior, tasks, total, bar_step, executor)
     func = _func_prepare(func, process_timeout, need_serialize)
     result = _do_parallel(func, 'imap_unordered', tasks, initializer, initargs, n_cpu, chunk_size,
diff --git a/setup.py b/setup.py
index 0de35cb..28ca2ba 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@ setup(
     name='parallelbar',
-    version='1.1.3',
+    version='1.2.0',
     packages=find_packages(),
     author='Dubovik Pavel',
     author_email='geometryk@gmail.com',
@@ -28,5 +28,8 @@
         'tqdm',
         'colorama',
     ],
+    extras_require={
+        "dill": ['dill'],
+    },
     platforms='any'
 )
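A note on the new `progress_starmap`: it reuses the existing `'map'` execution path by wrapping the target function with `partial(func_args_unpack, func)`, so each task tuple gets unpacked into positional arguments. The diff imports `func_args_unpack` from `parallelbar.tools` but does not show its body, so the helper below is an assumed stand-in; this is a minimal stdlib-only sketch of the same trick, not parallelbar's actual code:

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def func_args_unpack(func, args):
    # Assumed stand-in for parallelbar.tools.func_args_unpack: unpack one
    # task tuple into positional arguments, so a map-style pool method can
    # serve starmap-style tasks.
    return func(*args)


def power(base, exp):
    return base ** exp


if __name__ == '__main__':
    tasks = [(2, 10), (3, 3), (5, 2)]
    # progress_starmap does the equivalent of this wrap before delegating
    # to its ordinary map machinery.
    wrapped = partial(func_args_unpack, power)
    with ThreadPool(2) as pool:
        print(pool.map(wrapped, tasks))  # [1024, 27, 25]
```

A thread pool is used here only so the sketch runs anywhere without pickling concerns; parallelbar itself defaults to a process pool (`executor='processes'`).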