diff --git a/pandarallel/__init__.py b/pandarallel/__init__.py index 07144bb..a43d05b 100644 --- a/pandarallel/__init__.py +++ b/pandarallel/__init__.py @@ -1,3 +1,3 @@ -__version__ = "1.5.5" +from .core import pandarallel -from .pandarallel import pandarallel +__version__ = "1.5.6" diff --git a/pandarallel/core.py b/pandarallel/core.py new file mode 100644 index 0000000..1706ab4 --- /dev/null +++ b/pandarallel/core.py @@ -0,0 +1,565 @@ +import multiprocessing +import pickle +from itertools import count +from multiprocessing.managers import SyncManager +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any, Callable, Dict, Iterator, Optional, Tuple, Type, cast + +import dill +import pandas as pd +from pandas.core.groupby import DataFrameGroupBy as PandaDataFrameGroupBy +from pandas.core.window.expanding import ExpandingGroupby as PandasExpandingGroupby +from pandas.core.window.rolling import RollingGroupby as PandasRollingGroupby + +from .data_types import ( + DataFrame, + DataFrameGroupBy, + DataType, + ExpandingGroupBy, + RollingGroupBy, + Series, + SeriesRolling, +) +from .progress_bars import ProgressBarsType, get_progress_bars, progress_wrapper +from .utils import WorkerStatus + +# Python 3.8 on MacOS by default uses "spawn" instead of "fork" as start method for new +# processes, which is incompatible with pandarallel. We force it to use "fork" method. +CONTEXT = multiprocessing.get_context("fork") + +# Root of Memory File System +MEMORY_FS_ROOT = "/dev/shm" + +# By default, Pandarallel use all available CPUs +NB_WORKERS = max(CONTEXT.cpu_count() // 2, 1) + +# Prefix and suffix for files used with Memory File System +PREFIX = "pandarallel" +PREFIX_INPUT = f"{PREFIX}_input_" +PREFIX_OUTPUT = f"{PREFIX}_output_" +SUFFIX = ".pickle" + +# The goal of this part is to let Pandarallel to serialize functions which are not defined +# at the top level of the module (like DataFrame.Apply.worker). This trick is inspired by +# this article: https://medium.com/@yasufumy/python-multiprocessing-c6d54107dd55 +# Warning: In this article, the trick is presented to be able to serialize lambda functions. +# Even if Pandarallel is able to serialize lambda functions, it is only thanks to `dill`. +_func = None + + +def worker_init(func: Callable) -> None: + global _func + _func = func + + +def global_worker(*args, **kwargs): + return _func(*args, **kwargs) + + +def wrap_work_function_for_file_system( + work_function: Callable[ + [Any, Callable, tuple, Dict[str, Any], Dict[str, Any]], Any + ], +) -> Callable[ + [ + Path, + Path, + ProgressBarsType, + int, + multiprocessing.Queue, + bytes, + tuple, + Dict[str, Any], + Dict[str, Any], + ], + None, +]: + def closure( + input_file_path: Path, + output_file_path: Path, + progress_bars_type: ProgressBarsType, + worker_index: int, + master_workers_queue: multiprocessing.Queue, + dilled_user_defined_function: bytes, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> None: + try: + # Load dataframe from input file + with input_file_path.open("rb") as file_descriptor: + data = pickle.load(file_descriptor) + + # Delete input file since we don't need it any more. It will free some RAM + # since the input file is stored into Shared Memory. 
+ input_file_path.unlink() + + data_size = len(data) + user_defined_function: Callable = dill.loads(dilled_user_defined_function) + + progress_wrapped_user_defined_function = progress_wrapper( + user_defined_function, master_workers_queue, worker_index, data_size + ) + + used_user_defined_function = ( + progress_wrapped_user_defined_function + if progress_bars_type + in ( + ProgressBarsType.InUserDefinedFunction, + ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns, + ) + else user_defined_function + ) + + result = work_function( + data, + used_user_defined_function, + user_defined_function_args, + user_defined_function_kwargs, + extra, + ) + + with output_file_path.open("wb") as file_descriptor: + pickle.dump(result, file_descriptor) + + master_workers_queue.put((worker_index, WorkerStatus.Success, None)) + + except: + master_workers_queue.put((worker_index, WorkerStatus.Error, None)) + raise + + return closure + + +def wrap_work_function_for_pipe( + work_function: Callable[ + [ + Any, + Callable, + tuple, + Dict[str, Any], + Dict[str, Any], + ], + Any, + ], +) -> Callable[ + [ + Any, + ProgressBarsType, + int, + multiprocessing.Queue, + bytes, + tuple, + Dict[str, Any], + Dict[str, Any], + ], + Any, +]: + def closure( + data: Any, + progress_bars_type: ProgressBarsType, + worker_index: int, + master_workers_queue: multiprocessing.Queue, + dilled_user_defined_function: bytes, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> Any: + try: + data_size = len(data) + user_defined_function: Callable = dill.loads(dilled_user_defined_function) + + progress_wrapped_user_defined_function = progress_wrapper( + user_defined_function, master_workers_queue, worker_index, data_size + ) + + used_user_defined_function = ( + progress_wrapped_user_defined_function + if progress_bars_type + in ( + ProgressBarsType.InUserDefinedFunction, + ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns, + ) + else user_defined_function + ) + + results = work_function( + data, + used_user_defined_function, + user_defined_function_args, + user_defined_function_kwargs, + extra, + ) + + master_workers_queue.put((worker_index, WorkerStatus.Success, None)) + + return results + + except: + master_workers_queue.put((worker_index, WorkerStatus.Error, None)) + raise + + return closure + + +def wrap_reduce_function_for_file_system( + reduce_function: Callable[[Iterator, Dict[str, Any]], Any] +) -> Callable[[Iterator[Path], Dict[str, Any]], Any]: + """This wrapper transforms a `reduce` function which takes as input: + - A list of pandas Dataframe + - An user defined function + and which returns a pandas Dataframe, into a `reduct` function which takes as input: + - A list of paths where pandas Dataframe are pickled + which returns a pandas Dataframe. 
+ """ + + def closure(output_file_paths: Iterator[Path], extra: Dict[str, Any]) -> Any: + def get_dataframe_and_delete_file(file_path: Path) -> Any: + with file_path.open("rb") as file_descriptor: + data = pickle.load(file_descriptor) + + file_path.unlink() + return data + + dfs = ( + get_dataframe_and_delete_file(output_file_path) + for output_file_path in output_file_paths + ) + + return reduce_function(dfs, extra) + + return closure + + +def parallelize_with_memory_file_system( + nb_requested_workers: int, + data_type: Type[DataType], + progress_bars_type: ProgressBarsType, +): + def closure( + data: Any, + user_defined_function: Callable, + *user_defined_function_args: tuple, + **user_defined_function_kwargs: Dict[str, Any], + ): + wrapped_work_function = wrap_work_function_for_file_system(data_type.work) + wrapped_reduce_function = wrap_reduce_function_for_file_system(data_type.reduce) + + chunks = list( + data_type.get_chunks( + nb_requested_workers, + data, + user_defined_function_kwargs=user_defined_function_kwargs, + ) + ) + + nb_workers = len(chunks) + + multiplicator_factor = ( + len(cast(pd.DataFrame, data).columns) + if progress_bars_type + == ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns + else 1 + ) + + progresses_length = [len(chunk_) * multiplicator_factor for chunk_ in chunks] + + work_extra = data_type.get_work_extra(data) + reduce_extra = data_type.get_reduce_extra(data) + + show_progress_bars = progress_bars_type != ProgressBarsType.No + + progress_bars = get_progress_bars(progresses_length, show_progress_bars) + progresses = [0] * nb_workers + workers_status = [WorkerStatus.Running] * nb_workers + + input_files = [ + NamedTemporaryFile( + prefix=PREFIX_INPUT, suffix=SUFFIX, dir=MEMORY_FS_ROOT, delete=False + ) + for _ in range(nb_workers) + ] + + output_files = [ + NamedTemporaryFile( + prefix=PREFIX_OUTPUT, suffix=SUFFIX, dir=MEMORY_FS_ROOT, delete=False + ) + for _ in range(nb_workers) + ] + + try: + for chunk, input_file in zip(chunks, input_files): + with Path(input_file.name).open("wb") as file_descriptor: + pickle.dump(chunk, file_descriptor) + + dilled_user_defined_function = dill.dumps(user_defined_function) + manager: SyncManager = CONTEXT.Manager() + master_workers_queue = manager.Queue() + + work_args_list = [ + ( + Path(input_file.name), + Path(output_file.name), + progress_bars_type, + worker_index, + master_workers_queue, + dilled_user_defined_function, + user_defined_function_args, + user_defined_function_kwargs, + { + **work_extra, + **{ + "master_workers_queue": master_workers_queue, + "show_progress_bars": show_progress_bars, + "worker_index": worker_index, + }, + }, + ) + for worker_index, ( + input_file, + output_file, + ) in enumerate(zip(input_files, output_files)) + ] + + pool = CONTEXT.Pool(nb_workers, worker_init, (wrapped_work_function,)) + pool.starmap_async(global_worker, work_args_list) + pool.close() + + generation = count() + + while any( + ( + worker_status == WorkerStatus.Running + for worker_status in workers_status + ) + ): + message: Tuple[int, WorkerStatus, Any] = master_workers_queue.get() + worker_index, worker_status, payload = message + workers_status[worker_index] = worker_status + + if worker_status == WorkerStatus.Success: + progresses[worker_index] = progresses_length[worker_index] + progress_bars.update(progresses) + elif worker_status == WorkerStatus.Running: + progress = cast(int, payload) + progresses[worker_index] = progress + + if next(generation) % nb_workers == 0: + progress_bars.update(progresses) 
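+                # A worker reported a failure: flag its progress bar and keep draining
+                # the queue; this loop exits once no worker is still in the Running state.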
+ elif worker_status == WorkerStatus.Error: + progress_bars.set_error(worker_index) + progress_bars.update(progresses) + + return wrapped_reduce_function( + (Path(output_file.name) for output_file in output_files), + reduce_extra, + ) + + finally: + for output_file in output_files: + # When pandarallel stop supporting Python 3.7 and older, replace this + # try/except clause by: + # Path(output_file.name).unlink(missing_ok=True) + try: + Path(output_file.name).unlink() + except FileNotFoundError: + # Do nothing, this is the nominal case. + pass + + return closure + + +def parallelize_with_pipe( + nb_requested_workers: int, + data_type: Type[DataType], + progress_bars_type: ProgressBarsType, +): + def closure( + data: Any, + user_defined_function: Callable, + *user_defined_function_args: tuple, + **user_defined_function_kwargs: Dict[str, Any], + ): + wrapped_work_function = wrap_work_function_for_pipe(data_type.work) + dilled_user_defined_function = dill.dumps(user_defined_function) + manager: SyncManager = CONTEXT.Manager() + master_workers_queue = manager.Queue() + + chunks = list( + data_type.get_chunks( + nb_requested_workers, + data, + user_defined_function_kwargs=user_defined_function_kwargs, + ) + ) + + nb_workers = len(chunks) + + multiplicator_factor = ( + len(cast(pd.DataFrame, data).columns) + if progress_bars_type + == ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns + else 1 + ) + + progresses_length = [len(chunk_) * multiplicator_factor for chunk_ in chunks] + + work_extra = data_type.get_work_extra(data) + reduce_extra = data_type.get_reduce_extra(data) + + show_progress_bars = progress_bars_type != ProgressBarsType.No + + progress_bars = get_progress_bars(progresses_length, show_progress_bars) + progresses = [0] * nb_workers + workers_status = [WorkerStatus.Running] * nb_workers + + work_args_list = [ + ( + chunk, + progress_bars_type, + worker_index, + master_workers_queue, + dilled_user_defined_function, + user_defined_function_args, + user_defined_function_kwargs, + { + **work_extra, + **{ + "master_workers_queue": master_workers_queue, + "show_progress_bars": show_progress_bars, + "worker_index": worker_index, + }, + }, + ) + for worker_index, chunk in enumerate(chunks) + ] + + pool = CONTEXT.Pool(nb_workers, worker_init, (wrapped_work_function,)) + results_promise = pool.starmap_async(global_worker, work_args_list) + pool.close() + + generation = count() + + while any( + (worker_status == WorkerStatus.Running for worker_status in workers_status) + ): + message: Tuple[int, WorkerStatus, Any] = master_workers_queue.get() + worker_index, worker_status, payload = message + workers_status[worker_index] = worker_status + + if worker_status == WorkerStatus.Success: + progresses[worker_index] = progresses_length[worker_index] + progress_bars.update(progresses) + elif worker_status == WorkerStatus.Running: + progress = cast(int, payload) + progresses[worker_index] = progress + + if next(generation) % nb_workers == 0: + progress_bars.update(progresses) + elif worker_status == WorkerStatus.Error: + progress_bars.set_error(worker_index) + + results = results_promise.get() + + return data_type.reduce(results, reduce_extra) + + return closure + + +class pandarallel: + @classmethod + def initialize( + cls, + shm_size_mb=None, + nb_workers=NB_WORKERS, + progress_bar=False, + verbose=2, + use_memory_fs: Optional[bool] = None, + ) -> None: + show_progress_bars = progress_bar + is_memory_fs_available = Path(MEMORY_FS_ROOT).exists() + + use_memory_fs = ( + use_memory_fs if 
use_memory_fs is not None else is_memory_fs_available + ) + + parallelize = ( + parallelize_with_memory_file_system + if use_memory_fs + else parallelize_with_pipe + ) + + if use_memory_fs and not is_memory_fs_available: + raise SystemError("Memory file system is not available") + + if verbose >= 2: + print(f"INFO: Pandarallel will run on {nb_workers} workers.") + + message = ( + ( + "INFO: Pandarallel will use Memory file system to transfer data " + "between the main process and workers." + ) + if use_memory_fs + else ( + "INFO: Pandarallel will use standard multiprocessing data transfer " + "(pipe) to transfer data between the main process and workers." + ) + ) + + print(message) + + progress_bars_in_user_defined_function = ( + ProgressBarsType.InUserDefinedFunction + if show_progress_bars + else ProgressBarsType.No + ) + + progress_bars_in_user_defined_function_multiply_by_number_of_columns = ( + ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns + if show_progress_bars + else ProgressBarsType.No + ) + + progress_bars_in_work_function = ( + ProgressBarsType.InWorkFunction + if show_progress_bars + else ProgressBarsType.No + ) + + # DataFrame + pd.DataFrame.parallel_apply = parallelize( + nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function + ) + pd.DataFrame.parallel_applymap = parallelize( + nb_workers, + DataFrame.ApplyMap, + progress_bars_in_user_defined_function_multiply_by_number_of_columns, + ) + + # DataFrame GroupBy + PandaDataFrameGroupBy.parallel_apply = parallelize( + nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function + ) + + # Expanding GroupBy + PandasExpandingGroupby.parallel_apply = parallelize( + nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function + ) + + # Rolling GroupBy + PandasRollingGroupby.parallel_apply = parallelize( + nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function + ) + + # Series + pd.Series.parallel_apply = parallelize( + nb_workers, Series.Apply, progress_bars_in_user_defined_function + ) + pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars) + + # Series Rolling + pd.core.window.Rolling.parallel_apply = parallelize( + nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function + ) diff --git a/pandarallel/data_types/__init__.py b/pandarallel/data_types/__init__.py index e69de29..3e47dba 100644 --- a/pandarallel/data_types/__init__.py +++ b/pandarallel/data_types/__init__.py @@ -0,0 +1,7 @@ +from .dataframe import DataFrame +from .dataframe_groupby import DataFrameGroupBy +from .expanding_groupby import ExpandingGroupBy +from .rolling_groupby import RollingGroupBy +from .generic import DataType +from .series import Series +from .series_rolling import SeriesRolling diff --git a/pandarallel/data_types/dataframe.py b/pandarallel/data_types/dataframe.py index 046119e..5e2e067 100644 --- a/pandarallel/data_types/dataframe.py +++ b/pandarallel/data_types/dataframe.py @@ -1,41 +1,69 @@ +from typing import Any, Callable, Dict, Iterable, Iterator + import pandas as pd -from pandarallel.utils.tools import chunk +from ..utils import chunk +from .generic import DataType -class DataFrame: - @staticmethod - def reduce(results, _): - return pd.concat(results, copy=False) - class Apply: +class DataFrame: + class Apply(DataType): @staticmethod - def get_chunks(nb_workers, df, *args, **kwargs): - axis = kwargs.get("axis", 0) - if axis == "index": - axis = 0 - elif axis == "columns": - axis = 1 + def get_chunks( + nb_workers: int, data: pd.DataFrame, 
**kwargs + ) -> Iterator[pd.DataFrame]: + user_defined_function_kwargs = kwargs["user_defined_function_kwargs"] + axis = user_defined_function_kwargs.get("axis", 0) + + if axis not in {0, 1, "index", "columns"}: + raise ValueError(f"No axis named {axis} for object type DataFrame") - opposite_axis = 1 - axis + axis_int = {0: 0, 1: 1, "index": 0, "columns": 1}[axis] + opposite_axis_int = 1 - axis_int - for chunk_ in chunk(df.shape[opposite_axis], nb_workers): - if axis == 1: - yield df.iloc[chunk_] - else: - yield df.iloc[:, chunk_] + for chunk_ in chunk(data.shape[opposite_axis_int], nb_workers): + yield data.iloc[chunk_] if axis_int == 1 else data.iloc[:, chunk_] + + @staticmethod + def work( + data: pd.DataFrame, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.DataFrame: + return data.apply( + user_defined_function, + *user_defined_function_args, + **user_defined_function_kwargs, + ) + + @staticmethod + def reduce( + datas: Iterable[pd.DataFrame], extra: Dict[str, Any] + ) -> pd.DataFrame: + return pd.concat(datas, copy=False) + class ApplyMap(DataType): @staticmethod - def worker( - df, _index, _meta_args, _progress_bar, _queue, func, *args, **kwargs - ): - return df.apply(func, *args, **kwargs) + def get_chunks( + nb_workers: int, data: pd.DataFrame, **kwargs + ) -> Iterator[pd.DataFrame]: + for chunk_ in chunk(data.shape[0], nb_workers): + yield data.iloc[chunk_] - class ApplyMap: @staticmethod - def get_chunks(nb_workers, df, *_): - for chunk_ in chunk(df.shape[0], nb_workers): - yield df.iloc[chunk_] + def work( + data: pd.DataFrame, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.DataFrame: + return data.applymap(user_defined_function) @staticmethod - def worker(df, _index, _meta_args, _progress_bar, _queue, func, *_): - return df.applymap(func) + def reduce( + datas: Iterable[pd.DataFrame], extra: Dict[str, Any] + ) -> pd.DataFrame: + return pd.concat(datas, copy=False) diff --git a/pandarallel/data_types/dataframe_groupby.py b/pandarallel/data_types/dataframe_groupby.py index 43d750d..f6d119f 100644 --- a/pandarallel/data_types/dataframe_groupby.py +++ b/pandarallel/data_types/dataframe_groupby.py @@ -1,49 +1,81 @@ import itertools +from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple, Union, cast + import pandas as pd -from pandarallel.utils.tools import chunk, df_indexed_like +from pandas.core.groupby.generic import DataFrameGroupBy as PandasDataFrameGroupBy + +from ..utils import chunk, df_indexed_like, get_pandas_version +from .generic import DataType class DataFrameGroupBy: - @staticmethod - def get_reduce_meta_args(df_grouped): - return df_grouped - - @staticmethod - def reduce(results, df_grouped): - results = itertools.chain.from_iterable(results) - keys, values, mutated = zip(*results) - mutated = any(mutated) - - # GH #150 - pd_version = tuple(map(int, pd.__version__.split('.')[:2])) - if pd_version < (1, 3): - args = keys, values - elif pd_version < (1, 4): - args = df_grouped._selected_obj, keys, values - else: - args = df_grouped._selected_obj, values - - return df_grouped._wrap_applied_output( - *args, not_indexed_same = df_grouped.mutated or mutated - ) - - @staticmethod - def get_chunks(nb_workers, df_grouped, *args, **kwargs): - chunks = chunk(df_grouped.ngroups, nb_workers) - iterator = iter(df_grouped) - - for chunk_ in chunks: - 
yield [next(iterator) for _ in range(chunk_.stop - chunk_.start)] - - @staticmethod - def worker( - tuples, _index, _meta_args, _progress_bar, _queue, func, *args, **kwargs - ): - keys, results, mutated = [], [], [] - for key, df in tuples: - res = func(df, *args, **kwargs) - results.append(res) - mutated.append(not df_indexed_like(res, df.axes)) - keys.append(key) - - return zip(keys, results, mutated) + class Apply(DataType): + @staticmethod + def get_chunks( + nb_workers: int, dataframe_groupby: PandasDataFrameGroupBy, **kwargs + ) -> Iterator[List[Tuple[int, pd.DataFrame]]]: + chunks = chunk(dataframe_groupby.ngroups, nb_workers) + iterator = iter(dataframe_groupby) + + for chunk_ in chunks: + yield [next(iterator) for _ in range(chunk_.stop - chunk_.start)] + + @staticmethod + def work( + data: List[Tuple[int, pd.DataFrame]], + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> List[Tuple[int, pd.DataFrame, bool]]: + def compute_result( + key: int, df: pd.DataFrame + ) -> Tuple[int, pd.DataFrame, bool]: + result = user_defined_function( + df, *user_defined_function_args, **user_defined_function_kwargs + ) + mutated = not df_indexed_like(result, df.axes) + return key, result, mutated + + return [compute_result(key, df) for key, df in data] + + @staticmethod + def get_reduce_extra(data: PandasDataFrameGroupBy) -> Dict[str, Any]: + return {"df_groupby": data} + + @staticmethod + def reduce( + datas: Iterable[List[Tuple[int, pd.DataFrame, bool]]], extra: Dict[str, Any] + ) -> pd.Series: + def get_args( + keys: List[int], + values: List[pd.DataFrame], + df_groupby: PandasDataFrameGroupBy, + ) -> Union[ + Tuple[List[int], List[pd.DataFrame]], + Tuple[pd.DataFrame, List[int], List[pd.DataFrame]], + Tuple[pd.DataFrame, List[pd.DataFrame]], + ]: + pandas_version = get_pandas_version() + + if pandas_version < (1, 3): + return keys, values + elif pandas_version < (1, 4): + return df_groupby._selected_obj, keys, values + else: + return df_groupby._selected_obj, values + + df_groupby: PandasDataFrameGroupBy = extra["df_groupby"] + + results = itertools.chain.from_iterable(datas) + keys, values, mutated = zip(*results) + + keys = cast(List[int], keys) + values = cast(List[pd.DataFrame], values) + mutated = cast(List[bool], mutated) + + args = get_args(keys, values, df_groupby) + + return df_groupby._wrap_applied_output( + *args, not_indexed_same=df_groupby.mutated or mutated + ) diff --git a/pandarallel/data_types/expanding_groupby.py b/pandarallel/data_types/expanding_groupby.py index 21e4734..dd83745 100644 --- a/pandarallel/data_types/expanding_groupby.py +++ b/pandarallel/data_types/expanding_groupby.py @@ -1,58 +1,98 @@ +import multiprocessing +from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple + import pandas as pd +from pandas.core.window.expanding import ExpandingGroupby as PandasExpandingGroupby -from pandarallel.utils.tools import chunk, PROGRESSION +from ..utils import WorkerStatus, chunk, get_pandas_version +from .generic import DataType class ExpandingGroupBy: - @staticmethod - def reduce(results, _): - return pd.concat(results, copy=False) - - @staticmethod - def get_chunks(nb_workers, expanding_groupby, *args, **kwargs): - pandas_version = tuple((int(item) for item in pd.__version__.split("."))) - - nb_items = ( - expanding_groupby._grouper.ngroups - if pandas_version >= (1, 3) - else len(expanding_groupby._groupby) - ) - - chunks = chunk(nb_items, nb_workers) - - 
iterator = ( - expanding_groupby._grouper.get_iterator(expanding_groupby.obj) - if pandas_version > (1, 3) - else iter(expanding_groupby._groupby) - ) - - for chunk_ in chunks: - yield [next(iterator) for _ in range(chunk_.stop - chunk_.start)] - - @staticmethod - def att2value(expanding): - attributes = { - attribute: getattr(expanding, attribute) - for attribute in expanding._attributes - } - - return attributes - - @staticmethod - def worker( - tuples, index, attribute2value, queue, progress_bar, func, *args, **kwargs - ): - # TODO: See if this pd.concat is avoidable - results = [] - - attribute2value.pop("_grouper", None) - - for iteration, (name, df) in enumerate(tuples): - item = df.expanding(**attribute2value).apply(func, *args, **kwargs) - item.index = pd.MultiIndex.from_product([[name], item.index]) - results.append(item) - - if progress_bar: - queue.put_nowait((PROGRESSION, (index, iteration))) - - return pd.concat(results) + class Apply(DataType): + @staticmethod + def get_chunks( + nb_workers: int, data: PandasExpandingGroupby, *args, **kwargs + ) -> Iterator[List[Tuple[int, pd.DataFrame]]]: + pandas_version = get_pandas_version() + + nb_items = ( + len(data._groupby) if pandas_version < (1, 3) else data._grouper.ngroups + ) + + chunks = chunk(nb_items, nb_workers) + + iterator = ( + iter(data._groupby) + if pandas_version <= (1, 3) + else data._grouper.get_iterator(data.obj) + ) + + for chunk_ in chunks: + yield [next(iterator) for _ in range(chunk_.stop - chunk_.start)] + + @staticmethod + def get_work_extra(data: PandasExpandingGroupby): + attributes = { + attribute: getattr(data, attribute) for attribute in data._attributes + } + + return {"attributes": attributes} + + @staticmethod + def work( + data: List[Tuple[int, pd.DataFrame]], + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> List[pd.DataFrame]: + show_progress_bars: bool = extra["show_progress_bars"] + master_workers_queue: multiprocessing.Queue = extra["master_workers_queue"] + worker_index: int = extra["worker_index"] + + def compute_result( + iteration: int, + attributes: Dict[str, Any], + index: int, + df: pd.DataFrame, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + ) -> pd.DataFrame: + item = df.expanding(**attributes).apply( + user_defined_function, + *user_defined_function_args, + **user_defined_function_kwargs + ) + + item.index = pd.MultiIndex.from_product([[index], item.index]) + + if show_progress_bars: + master_workers_queue.put_nowait( + (worker_index, WorkerStatus.Running, iteration) + ) + + return item + + attributes = extra["attributes"] + attributes.pop("_grouper", None) + + dfs = ( + compute_result( + iteration, + attributes, + index, + df, + user_defined_function, + user_defined_function_args, + user_defined_function_kwargs, + ) + for iteration, (index, df) in enumerate(data) + ) + + return pd.concat(dfs) + + @staticmethod + def reduce(datas: Iterable[pd.DataFrame], extra: Dict[str, Any]) -> pd.Series: + return pd.concat(datas, copy=False) diff --git a/pandarallel/data_types/generic.py b/pandarallel/data_types/generic.py new file mode 100644 index 0000000..3b50383 --- /dev/null +++ b/pandarallel/data_types/generic.py @@ -0,0 +1,33 @@ +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, Iterable, Iterator + + +class DataType(ABC): + @staticmethod + @abstractmethod + def get_chunks(nb_workers: int, 
data: Any, **kwargs) -> Iterator[Any]: + ... + + @staticmethod + def get_work_extra(data: Any) -> Dict[str, Any]: + return dict() + + @staticmethod + @abstractmethod + def work( + data: Any, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> Any: + ... + + @staticmethod + def get_reduce_extra(data: Any) -> Dict[str, Any]: + return dict() + + @staticmethod + @abstractmethod + def reduce(datas: Iterable[Any], extra: Dict[str, Any]) -> Any: + ... diff --git a/pandarallel/data_types/rolling_groupby.py b/pandarallel/data_types/rolling_groupby.py index cbb538f..ac2c485 100644 --- a/pandarallel/data_types/rolling_groupby.py +++ b/pandarallel/data_types/rolling_groupby.py @@ -1,67 +1,98 @@ -from datetime import timedelta +import multiprocessing +from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple import pandas as pd -from pandas.tseries.frequencies import to_offset +from pandas.core.window.rolling import RollingGroupby as PandasRollingGroupby -from pandarallel.utils.tools import chunk, PROGRESSION +from ..utils import WorkerStatus, chunk, get_pandas_version +from .generic import DataType class RollingGroupBy: - @staticmethod - def reduce(results, _): - return pd.concat(results, copy=False) - - @staticmethod - def get_chunks(nb_workers, rolling_groupby, *args, **kwargs): - pandas_version = tuple((int(item) for item in pd.__version__.split("."))) - - nb_items = ( - rolling_groupby._grouper.ngroups - if pandas_version >= (1, 3) - else len(rolling_groupby._groupby) - ) - - chunks = chunk(nb_items, nb_workers) - - iterator = ( - rolling_groupby._grouper.get_iterator(rolling_groupby.obj) - if pandas_version > (1, 3) - else iter(rolling_groupby._groupby) - ) - - for chunk_ in chunks: - yield [next(iterator) for _ in range(chunk_.stop - chunk_.start)] - - @staticmethod - def att2value(rolling): - attributes = { - attribute: getattr(rolling, attribute) for attribute in rolling._attributes - } - - # Fix window for win_type = freq, because then it was defined by the user in a format like '1D' and refers - # to a time window rolling - if "win_type" in attributes and attributes["win_type"] == "freq": - window = to_offset(timedelta(microseconds=int(attributes["window"] / 1000))) - attributes["window"] = window - attributes.pop("win_type") - - return attributes - - @staticmethod - def worker( - tuples, index, attribute2value, queue, progress_bar, func, *args, **kwargs - ): - # TODO: See if this pd.concat is avoidable - results = [] - - attribute2value.pop("_grouper", None) - - for iteration, (name, df) in enumerate(tuples): - item = df.rolling(**attribute2value).apply(func, *args, **kwargs) - item.index = pd.MultiIndex.from_product([[name], item.index]) - results.append(item) - - if progress_bar: - queue.put_nowait((PROGRESSION, (index, iteration))) - - return pd.concat(results) + class Apply(DataType): + @staticmethod + def get_chunks( + nb_workers: int, data: PandasRollingGroupby, *args, **kwargs + ) -> Iterator[List[Tuple[int, pd.DataFrame]]]: + pandas_version = get_pandas_version() + + nb_items = ( + len(data._groupby) if pandas_version < (1, 3) else data._grouper.ngroups + ) + + chunks = chunk(nb_items, nb_workers) + + iterator = ( + iter(data._groupby) + if pandas_version <= (1, 3) + else data._grouper.get_iterator(data.obj) + ) + + for chunk_ in chunks: + yield [next(iterator) for _ in range(chunk_.stop - chunk_.start)] + + @staticmethod + def get_work_extra(data: PandasRollingGroupby): 
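+            # Snapshot the RollingGroupby attributes so each worker can rebuild an
+            # equivalent rolling window via `df.rolling(**attributes)` on its own chunk.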
+ attributes = { + attribute: getattr(data, attribute) for attribute in data._attributes + } + + return {"attributes": attributes} + + @staticmethod + def work( + data: List[Tuple[int, pd.DataFrame]], + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> List[pd.DataFrame]: + show_progress_bars: bool = extra["show_progress_bars"] + master_workers_queue: multiprocessing.Queue = extra["master_workers_queue"] + worker_index: int = extra["worker_index"] + + def compute_result( + iteration: int, + attributes: Dict[str, Any], + index: int, + df: pd.DataFrame, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + ) -> pd.DataFrame: + item = df.rolling(**attributes).apply( + user_defined_function, + *user_defined_function_args, + **user_defined_function_kwargs + ) + + item.index = pd.MultiIndex.from_product([[index], item.index]) + + if show_progress_bars: + master_workers_queue.put_nowait( + (worker_index, WorkerStatus.Running, iteration) + ) + + return item + + attributes = extra["attributes"] + attributes.pop("_grouper", None) + + dfs = ( + compute_result( + iteration, + attributes, + index, + df, + user_defined_function, + user_defined_function_args, + user_defined_function_kwargs, + ) + for iteration, (index, df) in enumerate(data) + ) + + return pd.concat(dfs) + + @staticmethod + def reduce(datas: Iterable[pd.DataFrame], extra: Dict[str, Any]) -> pd.Series: + return pd.concat(datas, copy=False) diff --git a/pandarallel/data_types/series.py b/pandarallel/data_types/series.py index 671b2dd..9052293 100644 --- a/pandarallel/data_types/series.py +++ b/pandarallel/data_types/series.py @@ -1,27 +1,60 @@ +from typing import Any, Callable, Dict, Iterable, Iterator + import pandas as pd -from pandarallel.utils.tools import chunk + +from ..utils import chunk +from .generic import DataType class Series: - @staticmethod - def reduce(results, _): - return pd.concat(results, copy=False) + class Apply(DataType): + @staticmethod + def get_chunks( + nb_workers: int, data: pd.Series, **kwargs + ) -> Iterator[pd.Series]: + for chunk_ in chunk(data.size, nb_workers): + yield data[chunk_] + + @staticmethod + def work( + data: pd.Series, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.Series: + return data.apply( + user_defined_function, + *user_defined_function_args, + **user_defined_function_kwargs + ) + + @staticmethod + def reduce(datas: Iterable[pd.Series], extra: Dict[str, Any]) -> pd.Series: + return pd.concat(datas, copy=False) - @staticmethod - def get_chunks(nb_workers, series, *args, **kwargs): - for chunk_ in chunk(series.size, nb_workers): - yield series[chunk_] + class Map(DataType): + @staticmethod + def get_chunks( + nb_workers: int, data: pd.Series, **kwargs + ) -> Iterator[pd.Series]: + for chunk_ in chunk(data.size, nb_workers): + yield data[chunk_] - class Apply: @staticmethod - def worker( - series, _index, _meta_args, _progress_bar, _queue, func, *args, **kwargs - ): - return series.apply(func, *args, **kwargs) + def work( + data: pd.Series, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.Series: + return data.map( + user_defined_function, + *user_defined_function_args, + **user_defined_function_kwargs + ) - class Map: 
@staticmethod - def worker( - series, _index, _meta_args, _progress_bar, _queue, func, *_, **kwargs - ): - return series.map(func, **kwargs) + def reduce(datas: Iterable[pd.Series], extra: Dict[str, Any]) -> pd.Series: + return pd.concat(datas, copy=False) diff --git a/pandarallel/data_types/series_rolling.py b/pandarallel/data_types/series_rolling.py index eee026d..760cee1 100644 --- a/pandarallel/data_types/series_rolling.py +++ b/pandarallel/data_types/series_rolling.py @@ -1,29 +1,51 @@ +from typing import Any, Callable, Dict, Iterable, Iterator + import pandas as pd -from pandarallel.utils.tools import chunk +from pandas.core.window.rolling import Rolling + +from ..utils import chunk +from .generic import DataType class SeriesRolling: - @staticmethod - def reduce(results, _): - return pd.concat(results, copy=False) - - @staticmethod - def get_chunks(nb_workers, rolling, *args, **kwargs): - chunks = chunk(rolling.obj.size, nb_workers, rolling.window) - - for chunk_ in chunks: - yield rolling.obj[chunk_] - - @staticmethod - def att2value(rolling): - return { - attribute: getattr(rolling, attribute) for attribute in rolling._attributes - } - - @staticmethod - def worker( - series, index, attribue2value, _progress_bar, _queue, func, *args, **kwargs - ): - result = series.rolling(**attribue2value).apply(func, *args, **kwargs) - - return result if index == 0 else result[attribue2value["window"] :] + class Apply(DataType): + @staticmethod + def get_chunks( + nb_workers: int, rolling: Rolling, **kwargs + ) -> Iterator[pd.Series]: + chunks = chunk(rolling.obj.size, nb_workers, rolling.window) + + for chunk_ in chunks: + yield rolling.obj[chunk_] + + @staticmethod + def get_work_extra(data: Rolling) -> Dict[str, Any]: + return { + "attributes": { + attribute: getattr(data, attribute) + for attribute in data._attributes + } + } + + @staticmethod + def work( + data: pd.Series, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.Series: + attributes: Dict[str, Any] = extra["attributes"] + worker_index: int = extra["worker_index"] + + result = data.rolling(**attributes).apply( + user_defined_function, + *user_defined_function_args, + **user_defined_function_kwargs + ) + + return result if worker_index == 0 else result[attributes["window"] :] + + @staticmethod + def reduce(datas: Iterable[pd.Series], extra: Dict[str, Any]) -> pd.Series: + return pd.concat(datas, copy=False) diff --git a/pandarallel/pandarallel.py b/pandarallel/pandarallel.py deleted file mode 100644 index 4e50863..0000000 --- a/pandarallel/pandarallel.py +++ /dev/null @@ -1,607 +0,0 @@ -"""Main Pandarallel file""" - -import os -import pickle -from itertools import count -from multiprocessing import get_context -from tempfile import NamedTemporaryFile -from time import time - -from pandas import DataFrame, Series -from pandas.core.groupby import DataFrameGroupBy -from pandas.core.window import Rolling, RollingGroupby, ExpandingGroupby - -import dill -from pandarallel.data_types.dataframe import DataFrame as DF -from pandarallel.data_types.dataframe_groupby import DataFrameGroupBy as DFGB -from pandarallel.data_types.rolling_groupby import RollingGroupBy as RGB -from pandarallel.data_types.expanding_groupby import ExpandingGroupBy as EGB -from pandarallel.data_types.series import Series as S -from pandarallel.data_types.series_rolling import SeriesRolling as SR -from pandarallel.utils.inliner import inline -from 
pandarallel.utils.progress_bars import get_progress_bars, is_notebook_lab -from pandarallel.utils.tools import ERROR, INPUT_FILE_READ, PROGRESSION, VALUE - -# Python 3.8 on MacOS by default uses "spawn" instead of "fork" as start method for new -# processes, which is incompatible with pandarallel. We force it to use "fork" method. -context = get_context("fork") - -# By default, Pandarallel use all available CPUs -NB_WORKERS = context.cpu_count() - -# Prefix and suffix for files used with Memory File System -PREFIX = "pandarallel_" -PREFIX_INPUT = PREFIX + "input_" -PREFIX_OUTPUT = PREFIX + "output_" -SUFFIX = ".pickle" - -# Root of Memory File System -MEMORY_FS_ROOT = "/dev/shm" - -NO_PROGRESS, PROGRESS_IN_WORKER, PROGRESS_IN_FUNC, PROGRESS_IN_FUNC_MUL = list(range(4)) - - -class ProgressState: - last_put_iteration = None - next_put_iteration = None - last_put_time = None - - -# The goal of this part is to let Pandarallel to serialize functions which are not defined -# at the top level of the module (like DataFrame.Apply.worker). This trick is inspired by -# this article: https://medium.com/@yasufumy/python-multiprocessing-c6d54107dd55 -# Warning: In this article, the trick is presented to be able to serialize lambda functions. -# Even if Pandarallel is able to serialize lambda functions, it is only thanks to `dill`. -_func = None - - -def worker_init(func): - global _func - _func = func - - -def global_worker(x): - return _func(x) - - -def is_memory_fs_available(): - """Check if Memory File System is available""" - return os.path.exists(MEMORY_FS_ROOT) - - -def prepare_worker(use_memory_fs): - def closure(function): - def wrapper(worker_args): - """This function runs on WORKERS. - - If Memory File System is used: - 1. Load all pickled files (previously dumped by the MASTER) in the - Memory File System - 2. Undill the function to apply (for lambda functions) - 3. Tell to the MASTER the input file has been read (so the MASTER can remove it - from the memory - 4. Apply the function - 5. Pickle the result in the Memory File System (so the Master can read it) - 6. Tell the master task is finished - - If Memory File System is not used, steps are the same except 1. and 5. which are - skipped. - """ - if use_memory_fs: - ( - input_file_path, - output_file_path, - index, - meta_args, - queue, - progress_bar, - dilled_func, - args, - kwargs, - ) = worker_args - - try: - with open(input_file_path, "rb") as file: - data = pickle.load(file) - queue.put((INPUT_FILE_READ, index)) - - result = function( - data, - index, - meta_args, - queue, - progress_bar, - dill.loads(dilled_func), - *args, - **kwargs - ) - - with open(output_file_path, "wb") as file: - pickle.dump(result, file) - - queue.put((VALUE, index)) - - except Exception: - queue.put((ERROR, index)) - raise - else: - ( - data, - index, - meta_args, - queue, - progress_bar, - dilled_func, - args, - kwargs, - ) = worker_args - - try: - result = function( - data, - index, - meta_args, - queue, - progress_bar, - dill.loads(dilled_func), - *args, - **kwargs - ) - queue.put((VALUE, index)) - - return result - - except Exception: - queue.put((ERROR, index)) - raise - - return wrapper - - return closure - - -def create_temp_files(nb_files): - """Create temporary files in Memory File System.""" - return [ - NamedTemporaryFile(prefix=PREFIX_INPUT, suffix=SUFFIX, dir=MEMORY_FS_ROOT) - for _ in range(nb_files) - ] - - -def progress_pre_func(queue, index, counter, progression, state, time): - """Send progress to the MASTER about every 250 ms. 
- - The estimation system is implemented to avoid to call time() to often, - which is time consuming. - """ - iteration = next(counter) - - if iteration == state.next_put_iteration: - time_now = time() - queue.put_nowait((progression, (index, iteration))) - - delta_t = time_now - state.last_put_time - delta_i = iteration - state.last_put_iteration - - state.next_put_iteration += max(int((delta_i / delta_t) * 0.25), 1) - state.last_put_iteration = iteration - state.last_put_time = time_now - - -def progress_wrapper(progress_bar, queue, index, chunk_size): - """Wrap the function to apply in a function which monitor the part of work already done. - - inline is used instead of traditional wrapping system to avoid unnecessary function call - (and context switch) which is time consuming. - """ - counter = count() - state = ProgressState() - state.last_put_iteration = 0 - state.next_put_iteration = max(chunk_size // 100, 1) - state.last_put_time = time() - - def wrapper(func): - if progress_bar: - wrapped_func = inline( - progress_pre_func, - func, - dict( - queue=queue, - index=index, - counter=counter, - progression=PROGRESSION, - state=state, - time=time, - ), - ) - return wrapped_func - - return func - - return wrapper - - -def get_workers_args( - use_memory_fs, - nb_workers, - progress_bar, - chunks, - worker_meta_args, - queue, - func, - args, - kwargs, -): - """This function runs on the MASTER. - - If Memory File System is used: - 1. Create temporary files in Memory File System - 2. Dump chunked input files into Memory File System - (So they can be read by workers) - 3. Break input data into several chunks - 4. Wrap the function to apply to display progress bars - 5. Dill the function to apply (to handle lambda functions) - 6. Return the function to be sent to workers and path of files - in the Memory File System - - If Memory File System is not used, steps are the same except 1. and 2. which are - skipped. For step 6., paths are not returned. - """ - - def dump_and_get_lenght(chunk, input_file): - with open(input_file.name, "wb") as file: - pickle.dump(chunk, file) - - return len(chunk) - - if use_memory_fs: - input_files = create_temp_files(nb_workers) - - try: - chunk_lengths = [ - dump_and_get_lenght(chunk, input_file) - for chunk, input_file in zip(chunks, input_files) - ] - - nb_chunks = len(chunk_lengths) - output_files = create_temp_files(nb_chunks) - - except OSError: - link = "https://stackoverflow.com/questions/58804022/how-to-resize-dev-shm" - msg = " ".join( - ( - "It seems you use Memory File System and you don't have enough", - "available space in `dev/shm`. You can either call", - "pandarallel.initalize with `use_memory_fs=False`, or you can ", - "increase the size of `dev/shm` as described here:", - link, - ".", - " Please also remove all files beginning with 'pandarallel_' in the", - "`/dev/shm` directory. 
If you have troubles with your web browser,", - "these troubles should disappear after cleaning `/dev/shm`.", - ) - ) - raise OSError(msg) - - workers_args = [ - ( - input_file.name, - output_file.name, - index, - worker_meta_args, - queue, - progress_bar == PROGRESS_IN_WORKER, - dill.dumps( - progress_wrapper( - progress_bar >= PROGRESS_IN_FUNC, queue, index, chunk_length - )(func) - ), - args, - kwargs, - ) - for index, (input_file, output_file, chunk_length) in enumerate( - zip(input_files, output_files, chunk_lengths) - ) - ] - - return workers_args, chunk_lengths, input_files, output_files - - else: - workers_args, chunk_lengths = zip( - *[ - ( - ( - chunk, - index, - worker_meta_args, - queue, - progress_bar, - dill.dumps( - progress_wrapper( - progress_bar == PROGRESS_IN_FUNC, - queue, - index, - len(chunk), - )(func) - ), - args, - kwargs, - ), - len(chunk), - ) - for index, chunk in enumerate(chunks) - ] - ) - - return workers_args, chunk_lengths, [], [] - - -def get_workers_result( - use_memory_fs, - nb_workers, - show_progress_bar, - nb_columns, - queue, - chunk_lengths, - input_files, - output_files, - map_result, -): - """Wait for the workers' results while eventually display progress bars.""" - if show_progress_bar: - if show_progress_bar == PROGRESS_IN_FUNC_MUL: - chunk_lengths = [ - chunk_length * (nb_columns + 1) for chunk_length in chunk_lengths - ] - - progress_bars = get_progress_bars(chunk_lengths) - - progresses = [0] * nb_workers - - finished_workers = [False] * nb_workers - - generation = count() - - while not all(finished_workers): - message_type, message = queue.get() - - if message_type is INPUT_FILE_READ: - file_index = message - input_files[file_index].close() - - elif message_type is PROGRESSION: - worker_index, progression = message - progresses[worker_index] = progression - - if next(generation) % nb_workers == 0: - progress_bars.update(progresses) - - elif message_type is VALUE: - worker_index = message - finished_workers[worker_index] = VALUE - - if show_progress_bar: - progresses[worker_index] = chunk_lengths[worker_index] - progress_bars.update(progresses) - - elif message_type is ERROR: - worker_index = message - finished_workers[worker_index] = ERROR - - if show_progress_bar: - if is_notebook_lab(): - progress_bars.set_error(worker_index) - progress_bars.update(progresses) - - results = map_result.get() - - return ( - [pickle.load(output_files) for output_files in output_files] - if use_memory_fs - else results - ) - - -def parallelize( - nb_requested_workers, - use_memory_fs, - progress_bar, - get_chunks, - worker, - reduce, - get_worker_meta_args=lambda _: dict(), - get_reduce_meta_args=lambda _: dict(), -): - """Master function. - 1. Split data into chunks - 2. Send chunks to workers - 3. Wait for the workers' results (while displaying a progress bar if needed) - 4. Once results are available, combine them - 5. 
Return combined results to the user - """ - - def closure(data, func, *args, **kwargs): - chunks = get_chunks(nb_requested_workers, data, *args, **kwargs) - nb_columns = len(data.columns) if progress_bar == PROGRESS_IN_FUNC_MUL else None - worker_meta_args = get_worker_meta_args(data) - reduce_meta_args = get_reduce_meta_args(data) - manager = context.Manager() - queue = manager.Queue() - - workers_args, chunk_lengths, input_files, output_files = get_workers_args( - use_memory_fs, - nb_requested_workers, - progress_bar, - chunks, - worker_meta_args, - queue, - func, - args, - kwargs, - ) - - nb_workers = len(chunk_lengths) - - try: - pool = context.Pool( - nb_workers, worker_init, (prepare_worker(use_memory_fs)(worker),), - ) - - map_result = pool.map_async(global_worker, workers_args) - pool.close() - - results = get_workers_result( - use_memory_fs, - nb_workers, - progress_bar, - nb_columns, - queue, - chunk_lengths, - input_files, - output_files, - map_result, - ) - - return reduce(results, reduce_meta_args) - - finally: - if use_memory_fs: - for file in input_files + output_files: - file.close() - - return closure - - -class pandarallel: - @classmethod - def initialize( - cls, - shm_size_mb=None, - nb_workers=NB_WORKERS, - progress_bar=False, - verbose=2, - use_memory_fs=None, - ): - """ - Initialize Pandarallel shared memory. - - Parameters - ---------- - shm_size_mb: int, optional - Deprecated - - nb_workers: int, optional - Number of workers used for parallelisation - If not set, all available CPUs will be used. - - progress_bar: bool, optional - Display progress bars if set to `True` - - verbose: int, optional - The verbosity level - 0 - Don't display any logs - 1 - Display only warning logs - 2 - Display all logs - - use_memory_fs: bool, optional - If set to None and if memory file system is available, Pandaralllel will use - it to transfer data between the main process and workers. If memory file - system is not available, Pandarallel will default on multiprocessing data - transfer (pipe). - - If set to True, Pandarallel will use memory file system to transfer data - between the main process and workers and will raise a SystemError if memory - file system is not available. - - If set to False, Pandarallel will use multiprocessing data transfer - (pipe) to transfer data between the main process and workers. - - Using memory file system reduces data transfer time between the main - process and workers, especially for big data. - - Memory file system is considered as available only if the - directory `/dev/shm` exists and if the user has read and write - permission on it. - - Basically memory file system is only available on some Linux - distributions (including Ubuntu) - """ - - memory_fs_available = is_memory_fs_available() - use_memory_fs = use_memory_fs or use_memory_fs is None and memory_fs_available - - if shm_size_mb: - print( - "WARNING: `shm_size_mb` is a deprecated argument. " - "It will be removed in `pandarallel 2.0.0`." 
- ) - - if use_memory_fs and not memory_fs_available: - raise SystemError("Memory file system is not available") - - if verbose >= 2: - print("INFO: Pandarallel will run on", nb_workers, "workers.") - - if use_memory_fs: - print( - "INFO: Pandarallel will use Memory file system to transfer data", - "between the main process and workers.", - sep=" ", - ) - else: - print( - "INFO: Pandarallel will use standard multiprocessing data transfer", - "(pipe) to transfer data between the main", - "process and workers.", - sep=" ", - ) - - nbw = nb_workers - - progress_in_func = PROGRESS_IN_FUNC * progress_bar - progress_in_func_mul = PROGRESS_IN_FUNC_MUL * progress_bar - progress_in_worker = PROGRESS_IN_WORKER * progress_bar - - bargs_prog_func = (nbw, use_memory_fs, progress_in_func) - bargs_prog_func_mul = (nbw, use_memory_fs, progress_in_func_mul) - - bargs_prog_worker = (nbw, use_memory_fs, progress_in_worker) - - # DataFrame - args = bargs_prog_func + (DF.Apply.get_chunks, DF.Apply.worker, DF.reduce) - DataFrame.parallel_apply = parallelize(*args) - - args = bargs_prog_func_mul + ( - DF.ApplyMap.get_chunks, - DF.ApplyMap.worker, - DF.reduce, - ) - - DataFrame.parallel_applymap = parallelize(*args) - - # Series - args = bargs_prog_func + (S.get_chunks, S.Apply.worker, S.reduce) - Series.parallel_apply = parallelize(*args) - - args = bargs_prog_func + (S.get_chunks, S.Map.worker, S.reduce) - Series.parallel_map = parallelize(*args) - - # Series Rolling - args = bargs_prog_func + (SR.get_chunks, SR.worker, SR.reduce) - kwargs = dict(get_worker_meta_args=SR.att2value) - Rolling.parallel_apply = parallelize(*args, **kwargs) - - # DataFrame GroupBy - args = bargs_prog_func + (DFGB.get_chunks, DFGB.worker, DFGB.reduce) - kwargs = dict(get_reduce_meta_args=DFGB.get_reduce_meta_args) - DataFrameGroupBy.parallel_apply = parallelize(*args, **kwargs) - - # Rolling GroupBy - args = bargs_prog_worker + (RGB.get_chunks, RGB.worker, RGB.reduce) - kwargs = dict(get_worker_meta_args=RGB.att2value) - RollingGroupby.parallel_apply = parallelize(*args, **kwargs) - - # Expanding GroupBy - args = bargs_prog_worker + (EGB.get_chunks, EGB.worker, EGB.reduce) - kwargs = dict(get_worker_meta_args=EGB.att2value) - ExpandingGroupby.parallel_apply = parallelize(*args, **kwargs) diff --git a/pandarallel/progress_bars.py b/pandarallel/progress_bars.py new file mode 100644 index 0000000..ee65965 --- /dev/null +++ b/pandarallel/progress_bars.py @@ -0,0 +1,209 @@ +import multiprocessing +import os +import shutil +import sys +from abc import ABC, abstractmethod +from enum import Enum +from itertools import count +from time import time +from typing import Callable, List, Union + +from .utils import WorkerStatus + +MINIMUM_TERMINAL_WIDTH = 72 + + +class ProgressBarsType(int, Enum): + No = 0 + InUserDefinedFunction = 1 + InUserDefinedFunctionMultiplyByNumberOfColumns = 2 + InWorkFunction = 3 + + +class ProgressBars(ABC): + @abstractmethod + def __init__(self, maxs: List[int], show: bool) -> None: + ... + + @abstractmethod + def update(self, values: List[int]) -> None: + ... 
+ + def set_error(self, index: int) -> None: + pass + + +class ProgressState: + def __init__(self, chunk_size: int) -> None: + self.last_put_iteration = 0 + self.next_put_iteration = max(chunk_size // 100, 1) + self.last_put_time = time() + + +def is_notebook_lab() -> bool: + try: + shell = get_ipython().__class__.__name__ # type: ignore + + # ZMQInteractiveShell: Jupyter notebook/lab or qtconsole + # TerminalInteractiveShell: Terminal running IPython + return shell == "ZMQInteractiveShell" + except NameError: + # Probably standard Python interpreter + return False + + +class ProgressBarsConsole(ProgressBars): + def __init__(self, maxs: List[int], show: bool) -> None: + self.__bars = [[0, max] for max in maxs] + self.__width = self.__get_width() + + self.__lines = self.__update_lines() + + if show: + sys.stdout.write("\n".join(self.__lines)) + sys.stdout.flush() + + def __get_width(self) -> int: + try: + columns = shutil.get_terminal_size().columns + return max(MINIMUM_TERMINAL_WIDTH, columns - 1) + except AttributeError: + # Python 2 + pass + + try: + columns = int(os.popen("stty size", "r").read().split()[1]) + return max(MINIMUM_TERMINAL_WIDTH, columns - 1) + except: + return MINIMUM_TERMINAL_WIDTH + + def __remove_displayed_lines(self) -> None: + if len(self.__bars) >= 1: + sys.stdout.write("\b" * len(self.__lines[-1])) + + if len(self.__bars) >= 2: + sys.stdout.write("\033M" * (len(self.__lines) - 1)) + + self.__lines = [] + + def __update_line(self, done: int, total: int) -> str: + percent = done / total + bar = (":" * int(percent * 40)).ljust(40, " ") + percent = round(percent * 100, 2) + format = " {percent:6.2f}% {bar:s} | {done:8d} / {total:8d} |" + ret = format.format(percent=percent, bar=bar, done=done, total=total) + return ret[: self.__width].ljust(self.__width, " ") + + def __update_lines(self) -> List[str]: + return [self.__update_line(value, max) for value, max in self.__bars] + + def update(self, values: List[int]) -> None: + """Update a bar value. + Positional arguments: + values - The new values of each bar + """ + for index, value in enumerate(values): + self.__bars[index][0] = value + + self.__remove_displayed_lines() + self.__lines = self.__update_lines() + + sys.stdout.write("\n".join(self.__lines)) + sys.stdout.flush() + + +class ProgressBarsNotebookLab(ProgressBars): + def __init__(self, maxs: List[int], show: bool) -> None: + """Initialization. + Positional argument: + maxs - List containing the max value of each progress bar + """ + self.__show = show + + if not show: + return + + from IPython.display import display + from ipywidgets import HBox, IntProgress, Label, VBox + + self.__bars = [ + HBox( + [ + IntProgress(0, 0, max, description="{:.2f}%".format(0)), + Label("{} / {}".format(0, max)), + ] + ) + for max in maxs + ] + + display(VBox(self.__bars)) + + def update(self, values: List[int]) -> None: + """Update a bar value. 
+ Positional arguments: + values - The new values of each bar + """ + if not self.__show: + return + + for index, value in enumerate(values): + bar, label = self.__bars[index].children + + bar.value = value + bar.description = "{:.2f}%".format(value / bar.max * 100) + + if value >= bar.max: + bar.bar_style = "success" + + label.value = "{} / {}".format(value, bar.max) + + def set_error(self, index: int) -> None: + """Set a bar on error""" + if not self.__show: + return + + bar, _ = self.__bars[index].children + bar.bar_style = "danger" + + +def get_progress_bars( + maxs: List[int], show +) -> Union[ProgressBarsNotebookLab, ProgressBarsConsole]: + return ( + ProgressBarsNotebookLab(maxs, show) + if is_notebook_lab() + else ProgressBarsConsole(maxs, show) + ) + + +def progress_wrapper( + user_defined_function: Callable, + master_workers_queue: multiprocessing.Queue, + index: int, + chunk_size: int, +) -> Callable: + """Wrap the function to apply in a function which monitor the part of work already + done. + """ + counter = count() + state = ProgressState(chunk_size) + + def closure(*user_defined_function_args, **user_defined_functions_kwargs): + iteration = next(counter) + + if iteration == state.next_put_iteration: + time_now = time() + master_workers_queue.put_nowait((index, WorkerStatus.Running, iteration)) + + delta_t = time_now - state.last_put_time + delta_i = iteration - state.last_put_iteration + + state.next_put_iteration += max(int((delta_i / delta_t) * 0.25), 1) + state.last_put_iteration = iteration + state.last_put_time = time_now + + return user_defined_function( + *user_defined_function_args, **user_defined_functions_kwargs + ) + + return closure diff --git a/pandarallel/utils/tools.py b/pandarallel/utils.py similarity index 78% rename from pandarallel/utils/tools.py rename to pandarallel/utils.py index 15f43d1..c83e93e 100644 --- a/pandarallel/utils/tools.py +++ b/pandarallel/utils.py @@ -1,11 +1,12 @@ -import itertools as _itertools -from pandas import DataFrame, Index -from typing import List +import itertools +from enum import Enum +from typing import List, Tuple -INPUT_FILE_READ, PROGRESSION, VALUE, ERROR = list(range(4)) +import pandas as pd +from pandas import DataFrame, Index -def chunk(nb_item, nb_chunks, start_offset=0): +def chunk(nb_item: int, nb_chunks: int, start_offset=0) -> List[slice]: """ Return `nb_chunks` slices of approximatively `nb_item / nb_chunks` each. 
@@ -24,13 +25,11 @@ def chunk(nb_item, nb_chunks, start_offset=0): ------- A list of slices - Examples -------- - >>> chunks = _pandarallel._chunk(103, 4) + >>> chunks = chunk(103, 4) >>> chunks - [slice(0, 26, None), slice(26, 52, None), slice(52, 78, None), - slice(78, 103, None)] + [slice(0, 26, None), slice(26, 52, None), slice(52, 78, None), slice(78, 103, None)] """ if nb_item <= nb_chunks: return [slice(max(0, idx - start_offset), idx + 1) for idx in range(nb_item)] @@ -45,7 +44,7 @@ def chunk(nb_item, nb_chunks, start_offset=0): quotient + remainder for quotient, remainder in zip(quotients, remainders) ] - accumulated = list(_itertools.accumulate(nb_elems_per_chunk)) + accumulated = list(itertools.accumulate(nb_elems_per_chunk)) shifted_accumulated = accumulated.copy() shifted_accumulated.insert(0, 0) shifted_accumulated.pop() @@ -81,3 +80,14 @@ def df_indexed_like(df: DataFrame, axes: List[Index]) -> bool: return df.axes[0].equals(axes[0]) return False + + +def get_pandas_version() -> Tuple[int, int]: + major_str, minor_str, *_ = pd.__version__.split(".") + return int(major_str), int(minor_str) + + +class WorkerStatus(int, Enum): + Running = 0 + Success = 1 + Error = 2 diff --git a/pandarallel/utils/__init__.py b/pandarallel/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pandarallel/utils/inliner.py b/pandarallel/utils/inliner.py deleted file mode 100644 index 82a6400..0000000 --- a/pandarallel/utils/inliner.py +++ /dev/null @@ -1,561 +0,0 @@ -import dis -import re -import sys -from inspect import signature -from itertools import chain, tee -from types import CodeType, FunctionType -from typing import Any, Dict, Iterable, List, Tuple - - -class OpCode: - JUMP_ABSOLUTE = b"q" - JUMP_IF_FALSE_OR_POP = b"o" - JUMP_IF_TRUE_OR_POP = b"p" - LOAD_ATTR = b"j" - LOAD_CONST = b"d" - LOAD_FAST = b"|" - LOAD_GLOBAL = b"t" - LOAD_METHOD = b"\xa0" - POP_JUMP_IF_FALSE = b"r" - POP_JUMP_IF_TRUE = b"s" - RETURN_VALUE = b"S" - STORE_ATTR = b"_" - STORE_FAST = b"}" - - -def ensure_python_version(function): - """Raise SystemError if Python version not in 3.{5, 6, 7, 8}""" - - def wrapper(*args, **kwargs): - python_version = sys.version_info - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8, 9)): - raise SystemError("Python version should be 3.{5, 6, 7, 8, 9}") - - return function(*args, **kwargs) - - return wrapper - - -def remove_duplicates(tuple_: Tuple[Any, ...]) -> Tuple[Any, ...]: - """Remove duplicate in tuple `tuple_`. - - Example: tuple_ = (3, 1, 2, 2, 1, 4) - The returned tuple is: (3, 1, 2, 4) - """ - - return tuple(sorted(set(tuple_), key=tuple_.index)) - - -@ensure_python_version -def int2python_bytes(item: int) -> bytes: - """Convert an integer into Python bytes, depending of the Python version. - - Examples: - - With Python 3.5: - int2python_bytes(5) = b"\x05\x00" - int2python_bytes(255) = b"\xFF\x00" - int2python_bytes(257) = b"\x01\x01" - - With Python 3.{6, 7}: - int2python_bytes(5) = b"\x05" - int2python_bytes(255) = b"\xFF" - - If Python version is not 3.5, 3.6 or 3.7, a SystemError is raised. - For Python 3.5, if item not in [0, 65535], a OverflowError: is raised. - For Python 3.{6, 7}, if item not in [0, 255], a OverflowError: is raised. 
- """ - python_version = sys.version_info - - nb_bytes = 2 if python_version.minor == 5 else 1 - return int.to_bytes(item, nb_bytes, "little") - - -@ensure_python_version -def python_ints2int(items: List[int]) -> int: - """Convert Python integer (depending of Python version) to integer - - If Python 3.5: - python_ints2int([3, 2]) = 3 + 2 * 256 = 515 - python_ints2int([4, 0]) = 4 + 0 * 256 = 4 - - If Python 3.{6, 7}: - python_ints2int([3]) = 3 - python_ints2int([0]) = 0 - - If Python version is not 3.5, 3.6 or 3.7, a SystemError is raised. - If at least one element of items is not in [0, 255], a ValueError is raised. - - If Python version is 3.5 and items does not contain exactly 2 elements, a ValueError - is raised. - - If Python version is 3.{6, 7} and items does not contain exactly 1 element, a - ValueError is raised. - """ - if not all(0 <= item <= 255 for item in items): - raise ValueError("Each element of items shoud be in [0, 255]") - - python_version = sys.version_info - if python_version.minor == 5: - if len(items) != 2: - raise ValueError("items should contain exactly 2 elements") - return items[0] + (items[1] << 8) - else: - if len(items) != 1: - raise ValueError("items should contain exactly 1 elements") - return items[0] - - -@ensure_python_version -def get_instructions(func: FunctionType) -> Iterable[bytes]: - """Return a list of bytes where each item of a list correspond to an instruction. - - Exemple: - def function(x, y): - print(x, y) - - With Python 3.5, corresponding pretty bytecode is: - 1 0 LOAD_GLOBAL 0 (print) - 3 LOAD_FAST 0 (x) - 6 LOAD_FAST 1 (y) - 9 CALL_FUNCTION 2 (2 positional, 0 keyword pair) - 12 POP_TOP - 13 LOAD_CONST 0 (None) - 16 RETURN_VALUE - - Corresponding bytecode is: b't\x00\x00|\x00\x00|\x01\x00\x83\x02\x00\x01d\x00\x00S' - - tuple(get_instructions(function)) = (b't\x00\x00', b'|\x00\x00', - b'|\x01\x00', b'\x83\x02\x00', b'\x01', - b'd\x00\x00', b'S') - - With Python 3.6 & 3.7, corresponding bytecode is: - 1 0 LOAD_GLOBAL 0 (print) - 2 LOAD_FAST 0 (x) - 4 LOAD_FAST 1 (y) - 6 CALL_FUNCTION 2 - 8 POP_TOP - 10 LOAD_CONST 0 (None) - 12 RETURN_VALUE - - Corresponding bytecode is: b't\x00|\x00|\x01\x83\x02\x01\x00d\x00S\x00' - - tuple(get_instructions(function)) = (b't\x00', b'|\x00', b'|\x01', - b'\x83\x02', b'\x01\x00', b'd\x00', - b'S\x00') - - If Python version not in 3.{5, 6, 7}, a SystemError is raised. - """ - - def pairwise(iterable): - """s -> (s0,s1), (s1,s2), (s2, s3), ...""" - a, b = tee(iterable) - next(b, None) - return zip(a, b) - - func_co_code = func.__code__.co_code - len_bytecode = len(func_co_code) - - instructions_offsets = tuple(instr.offset for instr in dis.Bytecode(func)) + ( - len_bytecode, - ) - - return (func_co_code[start:stop] for start, stop in pairwise(instructions_offsets)) - - -@ensure_python_version -def has_no_return(func: FunctionType) -> bool: - """Return True if `func` returns nothing, else return False - - If Python version not in 3.{5, 6, 7}, a SystemError is raised. - """ - - code = func.__code__ - - co_code = code.co_code - co_consts = code.co_consts - - instructions = tuple(get_instructions(func)) - - return_offsets = tuple( - index - for index, instruction in enumerate(instructions) - if instruction[0] == int.from_bytes(OpCode.RETURN_VALUE, byteorder="little") - ) - - load_const_none = OpCode.LOAD_CONST + bytes((co_consts.index(None),)) - - return len(return_offsets) == 1 and instructions[-2][0:2] == load_const_none - - -def has_duplicates(tuple_: Tuple): - """Return True if `tuple_` contains duplicates items. 
- - Exemple: has_duplicates((1, 3, 2, 4)) == False - has_duplicates((1, 3, 2, 3)) == True - """ - - return len(set(tuple_)) != len(tuple_) - - -def get_transitions(olds: Tuple, news: Tuple) -> Dict[int, int]: - """Returns a dictionnary where a key represents a position of an item in olds and - a value represents the position of the same item in news. - - If an element of `olds` is not in `news`, then the corresponding value will be - `None`. - - Exemples: - olds = ("a", "c", "b", "d") - news_1 = ("f", "g", "c", "d", "b", "a") - news_2 = ("c", "d") - - get_transitions(olds, news_1) = {0: 5, 1: 2, 2: 4, 3: 3} - get_transitions(olds, news_2) = {1: 0, 3: 1} - - `olds` and `news` should not have any duplicates, else a ValueError is raised. - """ - if has_duplicates(olds): - raise ValueError("`olds` has duplicates") - - if has_duplicates(news): - raise ValueError("`news` has duplicates") - - return { - index_old: news.index(old) - for index_old, old in [(olds.index(old), old) for old in olds if old in news] - } - - -@ensure_python_version -def get_b_transitions( - transitions: Dict[int, int], byte_source: bytes, byte_dest: bytes -) -> Dict[bytes, bytes]: - return { - byte_source + int2python_bytes(key): byte_dest + int2python_bytes(value) - for key, value in transitions.items() - } - - -@ensure_python_version -def are_functions_equivalent(l_func, r_func): - """Return True if `l_func` and `r_func` are equivalent - - If Python version not in 3.{5, 6, 7}, a SystemError is raised. - """ - l_code, r_code = l_func.__code__, r_func.__code__ - - trans_co_consts = get_transitions(l_code.co_consts, r_code.co_consts) - trans_co_names = get_transitions(l_code.co_names, r_code.co_names) - trans_co_varnames = get_transitions(l_code.co_varnames, r_code.co_varnames) - - transitions = { - **get_b_transitions(trans_co_consts, OpCode.LOAD_CONST, OpCode.LOAD_CONST), - **get_b_transitions(trans_co_names, OpCode.LOAD_GLOBAL, OpCode.LOAD_GLOBAL), - **get_b_transitions(trans_co_names, OpCode.LOAD_METHOD, OpCode.LOAD_METHOD), - **get_b_transitions(trans_co_names, OpCode.LOAD_ATTR, OpCode.LOAD_ATTR), - **get_b_transitions(trans_co_names, OpCode.STORE_ATTR, OpCode.STORE_ATTR), - **get_b_transitions(trans_co_varnames, OpCode.LOAD_FAST, OpCode.LOAD_FAST), - **get_b_transitions(trans_co_varnames, OpCode.STORE_FAST, OpCode.STORE_FAST), - } - - l_instructions = get_instructions(l_func) - r_instructions = get_instructions(r_func) - - new_l_instructions = tuple( - transitions.get(instruction, instruction) for instruction in l_instructions - ) - - new_l_co_code = b"".join(new_l_instructions) - - co_code_cond = new_l_co_code == r_code.co_code - co_consts_cond = set(l_code.co_consts) == set(r_code.co_consts) - co_names_cond = set(l_code.co_names) == set(l_code.co_names) - co_varnames_cond = set(l_code.co_varnames) == set(l_code.co_varnames) - - return co_code_cond and co_consts_cond and co_names_cond and co_varnames_cond - - -@ensure_python_version -def shift_instruction(instruction: bytes, qty: int) -> bytes: - """Shift an instruction by qty. - - Examples: - With Python 3.5: - shift_instruction(b"d\x05\x00", 3) = b"d\x08\x00" - - With Python 3.{6, 7}: - shift_instruction(b"d\x05", 3) = b"d\x08" - - If Python version not in 3.{5, 6, 7}, a SystemError is raised. 
- """ - operation, *values = instruction - return bytes((operation,)) + int2python_bytes(python_ints2int(values) + qty) - - -@ensure_python_version -def shift_instructions(instructions: Tuple[bytes], qty: int) -> Tuple[bytes]: - """Shift JUMP_ABSOLUTE, JUMP_IF_FALSE_OR_POP, - JUMP_IF_TRUE_OR_POP, POP_JUMP_IF_FALSE & POP_JUMP_IF_TRUE instructions by qty - - If Python version not in 3.{5, 6, 7}, a SystemError is raised. - """ - return tuple( - shift_instruction(instruction, qty) - if bytes((instruction[0],)) - in ( - OpCode.JUMP_ABSOLUTE, - OpCode.JUMP_IF_FALSE_OR_POP, - OpCode.JUMP_IF_TRUE_OR_POP, - OpCode.POP_JUMP_IF_FALSE, - OpCode.POP_JUMP_IF_TRUE, - ) - else instruction - for instruction in instructions - ) - - -@ensure_python_version -def pin_arguments(func: FunctionType, arguments: dict): - """Transform `func` in a function with no arguments. - - Example: - - def func(a, b): - c = 4 - print(str(a) + str(c)) - - return b - - The function returned by pin_arguments(func, {"a": 10, "b": 11}) is equivalent to: - - def pinned_func(): - c = 4 - print(str(10) + str(c)) - - return 11 - - This function is in some ways equivalent to functools.partials but with a faster - runtime. - - `arguments` keys should be identical as `func` arguments names else a TypeError is - raised. - """ - - if signature(func).parameters.keys() != set(arguments): - raise TypeError("`arguments` and `func` arguments do not correspond") - - func_code = func.__code__ - func_co_consts = func_code.co_consts - func_co_varnames = func_code.co_varnames - - new_co_consts = remove_duplicates(func_co_consts + tuple(arguments.values())) - new_co_varnames = tuple(item for item in func_co_varnames if item not in arguments) - - trans_co_varnames2_co_consts = { - func_co_varnames.index(key): new_co_consts.index(value) - for key, value in arguments.items() - } - - trans_co_varnames = get_transitions(func_co_varnames, new_co_varnames) - - transitions = { - **get_b_transitions( - trans_co_varnames2_co_consts, OpCode.LOAD_FAST, OpCode.LOAD_CONST - ), - **get_b_transitions(trans_co_varnames, OpCode.LOAD_FAST, OpCode.LOAD_FAST), - **get_b_transitions(trans_co_varnames, OpCode.STORE_FAST, OpCode.STORE_FAST), - } - - func_instructions = get_instructions(func) - new_func_instructions = tuple( - transitions.get(instruction, instruction) for instruction in func_instructions - ) - - new_co_code = b"".join(new_func_instructions) - - new_func = FunctionType( - func.__code__, - func.__globals__, - func.__name__, - func.__defaults__, - func.__closure__, - ) - - nfcode = new_func.__code__ - - python_version = sys.version_info - - if python_version.minor < 8: - new_func.__code__ = CodeType( - 0, - 0, - len(new_co_varnames), - nfcode.co_stacksize, - nfcode.co_flags, - new_co_code, - new_co_consts, - nfcode.co_names, - new_co_varnames, - nfcode.co_filename, - nfcode.co_name, - nfcode.co_firstlineno, - nfcode.co_lnotab, - nfcode.co_freevars, - nfcode.co_cellvars, - ) - - return new_func - - new_func.__code__ = CodeType( - 0, - 0, - 0, - len(new_co_varnames), - nfcode.co_stacksize, - nfcode.co_flags, - new_co_code, - new_co_consts, - nfcode.co_names, - new_co_varnames, - nfcode.co_filename, - nfcode.co_name, - nfcode.co_firstlineno, - nfcode.co_lnotab, - nfcode.co_freevars, - nfcode.co_cellvars, - ) - - return new_func - - -@ensure_python_version -def inline(pre_func: FunctionType, func: FunctionType, pre_func_arguments: dict): - """Insert `prefunc` at the beginning of `func` and return the corresponding - function. 
- - `pre_func` should not have a return statement (else a ValueError is raised). - `pre_func_arguments` keys should be identical as `pre_func` arguments names else a - TypeError is raised. - - This approach takes less CPU instructions than the standard decorator approach. - - Example: - - def pre_func(b, c): - a = "hello" - print(a + " " + b + " " + c) - - def func(x, y): - z = x + 2 * y - return z ** 2 - - The returned function corresponds to: - - def inlined(x, y): - a = "hello" - print(a) - z = x + 2 * y - return z ** 2 - """ - - new_func = FunctionType( - func.__code__, - func.__globals__, - func.__name__, - func.__defaults__, - func.__closure__, - ) - - if not has_no_return(pre_func): - raise ValueError("`pre_func` returns something") - - pinned_pre_func = pin_arguments(pre_func, pre_func_arguments) - pinned_pre_func_code = pinned_pre_func.__code__ - pinned_pre_func_co_consts = pinned_pre_func_code.co_consts - pinned_pre_func_co_names = pinned_pre_func_code.co_names - pinned_pre_func_co_varnames = pinned_pre_func_code.co_varnames - pinned_pre_func_instructions = tuple(get_instructions(pinned_pre_func)) - pinned_pre_func_instructions_without_return = pinned_pre_func_instructions[:-2] - - func_code = func.__code__ - func_co_consts = func_code.co_consts - func_co_names = func_code.co_names - func_co_varnames = func_code.co_varnames - - func_instructions = tuple(get_instructions(func)) - shifted_func_instructions = shift_instructions( - func_instructions, len(b"".join(pinned_pre_func_instructions_without_return)) - ) - - new_co_consts = remove_duplicates(func_co_consts + pinned_pre_func_co_consts) - new_co_names = remove_duplicates(func_co_names + pinned_pre_func_co_names) - new_co_varnames = remove_duplicates(func_co_varnames + pinned_pre_func_co_varnames) - - trans_co_consts = get_transitions(pinned_pre_func_co_consts, new_co_consts) - trans_co_names = get_transitions(pinned_pre_func_co_names, new_co_names) - trans_co_varnames = get_transitions(pinned_pre_func_co_varnames, new_co_varnames) - - transitions = { - **get_b_transitions(trans_co_consts, OpCode.LOAD_CONST, OpCode.LOAD_CONST), - **get_b_transitions(trans_co_names, OpCode.LOAD_GLOBAL, OpCode.LOAD_GLOBAL), - **get_b_transitions(trans_co_names, OpCode.LOAD_METHOD, OpCode.LOAD_METHOD), - **get_b_transitions(trans_co_names, OpCode.LOAD_ATTR, OpCode.LOAD_ATTR), - **get_b_transitions(trans_co_names, OpCode.STORE_ATTR, OpCode.STORE_ATTR), - **get_b_transitions(trans_co_varnames, OpCode.LOAD_FAST, OpCode.LOAD_FAST), - **get_b_transitions(trans_co_varnames, OpCode.STORE_FAST, OpCode.STORE_FAST), - } - - new_pinned_pre_func_instructions = tuple( - transitions.get(instruction, instruction) - for instruction in pinned_pre_func_instructions_without_return - ) - - new_instructions = new_pinned_pre_func_instructions + shifted_func_instructions - new_co_code = b"".join(new_instructions) - - nfcode = new_func.__code__ - - python_version = sys.version_info - - if python_version.minor < 8: - new_func.__code__ = CodeType( - nfcode.co_argcount, - nfcode.co_kwonlyargcount, - len(new_co_varnames), - nfcode.co_stacksize, - nfcode.co_flags, - new_co_code, - new_co_consts, - new_co_names, - new_co_varnames, - nfcode.co_filename, - nfcode.co_name, - nfcode.co_firstlineno, - nfcode.co_lnotab, - nfcode.co_freevars, - nfcode.co_cellvars, - ) - - return new_func - - new_func.__code__ = CodeType( - nfcode.co_argcount, - nfcode.co_posonlyargcount, - nfcode.co_kwonlyargcount, - len(new_co_varnames), - nfcode.co_stacksize, - nfcode.co_flags, - new_co_code, - 
new_co_consts, - new_co_names, - new_co_varnames, - nfcode.co_filename, - nfcode.co_name, - nfcode.co_firstlineno, - nfcode.co_lnotab, - nfcode.co_freevars, - nfcode.co_cellvars, - ) - - return new_func diff --git a/pandarallel/utils/progress_bars.py b/pandarallel/utils/progress_bars.py deleted file mode 100644 index 9642a53..0000000 --- a/pandarallel/utils/progress_bars.py +++ /dev/null @@ -1,131 +0,0 @@ -import shutil -import sys - -MINIMUM_TERMINAL_WIDTH = 72 - - -def is_notebook_lab(): - try: - shell = get_ipython().__class__.__name__ - if shell == "ZMQInteractiveShell": - # Jupyter notebook/lab or qtconsole - return True - elif shell == "TerminalInteractiveShell": - # Terminal running IPython - return False - else: - # Other type (?) - return False - except NameError: - # Probably standard Python interpreter - return False - - -def get_progress_bars(maxs): - return ( - ProgressBarsNotebookLab(maxs) - if is_notebook_lab() - else ProgressBarsConsole(maxs) - ) - - -class ProgressBarsConsole: - def __init__(self, maxs): - self.__bars = [[0, max] for max in maxs] - self.__width = self.__get_width() - - self.__update_lines() - - sys.stdout.write("\n".join(self.__lines)) - sys.stdout.flush() - - def __get_width(self): - try: - columns = shutil.get_terminal_size().columns - return max(MINIMUM_TERMINAL_WIDTH, columns - 1) - except AttributeError: - # Python 2 - pass - - try: - columns = int(os.popen("stty size", "r").read().split()[1]) - return max(MINIMUM_TERMINAL_WIDTH, columns - 1) - except: - return MINIMUM_TERMINAL_WIDTH - - def __remove_displayed_lines(self): - if len(self.__bars) >= 1: - sys.stdout.write("\b" * len(self.__lines[-1])) - - if len(self.__bars) >= 2: - sys.stdout.write("\033M" * (len(self.__lines) - 1)) - - self.__lines = [] - - def __update_line(self, done, total): - percent = done / total - bar = (":" * int(percent * 40)).ljust(40, " ") - percent = round(percent * 100, 2) - format = " {percent:6.2f}% {bar:s} | {done:8d} / {total:8d} |" - ret = format.format(percent=percent, bar=bar, done=done, total=total) - return ret[: self.__width].ljust(self.__width, " ") - - def __update_lines(self): - self.__lines = [self.__update_line(value, max) for value, max in self.__bars] - - def update(self, values): - """Update a bar value. - Positional arguments: - values - The new values of each bar - """ - for index, value in enumerate(values): - self.__bars[index][0] = value - - self.__remove_displayed_lines() - self.__update_lines() - - sys.stdout.write("\n".join(self.__lines)) - sys.stdout.flush() - - -class ProgressBarsNotebookLab: - def __init__(self, maxs): - """Initialization. - Positional argument: - maxs - List containing the max value of each progress bar - """ - from IPython.display import display - from ipywidgets import HBox, VBox, IntProgress, Label - - self.__bars = [ - HBox( - [ - IntProgress(0, 0, max, description="{:.2f}%".format(0)), - Label("{} / {}".format(0, max)), - ] - ) - for max in maxs - ] - - display(VBox(self.__bars)) - - def update(self, values): - """Update a bar value. 
- Positional arguments: - values - The new values of each bar - """ - for index, value in enumerate(values): - bar, label = self.__bars[index].children - - bar.value = value - bar.description = "{:.2f}%".format(value / bar.max * 100) - - if value >= bar.max: - bar.bar_style = "success" - - label.value = "{} / {}".format(value, bar.max) - - def set_error(self, index): - """Set a bar on error""" - bar, _ = self.__bars[index].children - bar.bar_style = "danger" diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..b02bd98 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,34 @@ +[metadata] +name = pandarallel +description = An easy to use library to speed up computation (by parallelizing on multi CPUs) with pandas. +long_description = file: README.md +long_description_content_type = text/markdown +url = https://github.com/nalepae/pandarallel +author = Manu NALEPA +author_email = nalepae@gmail.com +license = BSD +license_file = LICENSE +version = 1.5.6 + +classifiers = + License :: OSI Approved :: BSD License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3 :: Only + Topic :: Scientific/Engineering + +[options] +packages = find: +install_requires = + dill >= 0.3.1 + pandas >= 1 + +[options.packages.find] +exclude = + tests* + +[options.extras_require] +dev = + numpy >= 1.22 + pytest >= 7 + pytest-cov >= 3 diff --git a/setup.py b/setup.py index d9c378d..6068493 100644 --- a/setup.py +++ b/setup.py @@ -1,17 +1,3 @@ -from setuptools import setup, find_packages +from setuptools import setup -install_requires = ["dill"] - -setup( - name="pandarallel", - version="1.5.5", - python_requires=">=3.5", - packages=find_packages(), - author="Manu NALEPA", - author_email="nalepae@gmail.com", - description="An easy to use library to speed up computation (by parallelizing on multi CPUs) with pandas.", - long_description="See https://github.com/nalepae/pandarallel/tree/v1.5.5 for complete user guide.", - url="https://github.com/nalepae/pandarallel", - install_requires=install_requires, - license="BSD", -) +setup() diff --git a/tests.sh b/tests.sh new file mode 100755 index 0000000..8013b4d --- /dev/null +++ b/tests.sh @@ -0,0 +1,43 @@ +set -e + +echo "Install Pandarallel" +echo "-------------------" +echo +pip install -e .[dev] --quiet + +echo "Pandas 1.0.0" +echo "------------" +echo +pip install pandas==1.0.0 --quiet +pytest + +echo "Pandas 1.1.0" +echo "------------" +echo +pip install pandas==1.1.0 --quiet +pytest + +echo "Pandas 1.2.0" +echo "------------" +echo +pip install pandas==1.2.0 --quiet +pytest + +# TODO: Fix tests for this version of Pandas +# echo "Pandas 1.3.0" +# echo "------------" +# echo +# pip install pandas==1.3.0 --quiet +# pytest + +echo "Pandas 1.4.0" +echo "------------" +echo +pip install pandas==1.4.0 --quiet +pytest + +echo "Pandas latest" +echo "------------" +echo +pip install pandas --quiet --upgrade +pytest \ No newline at end of file diff --git a/tests/test_inliner.py b/tests/test_inliner.py deleted file mode 100644 index ca6d6da..0000000 --- a/tests/test_inliner.py +++ /dev/null @@ -1,486 +0,0 @@ -import math -import sys - -import pytest - -from pandarallel.utils import inliner -from types import CodeType, FunctionType - - -def test_remove_duplicates(): - tuple_ = (3, 1, 2, 2, 1, 4) - expected_output = (3, 1, 2, 4) - - assert inliner.remove_duplicates(tuple_) == expected_output - - -def test_int2python_bytes(): - python_version = sys.version_info - if not (python_version.major == 3 and python_version.minor 
in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.int2python_bytes(4) - return - - with pytest.raises(OverflowError): - inliner.int2python_bytes(-1) - - if python_version.minor == 5: - with pytest.raises(OverflowError): - inliner.int2python_bytes(65536) - - assert inliner.int2python_bytes(5) == b"\x05\x00" - assert inliner.int2python_bytes(255) == b"\xFF\x00" - assert inliner.int2python_bytes(257) == b"\x01\x01" - - else: - with pytest.raises(OverflowError): - inliner.int2python_bytes(256) - - assert inliner.int2python_bytes(5) == b"\x05" - assert inliner.int2python_bytes(255) == b"\xFF" - - -def test_python_ints2int(): - python_version = sys.version_info - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.int2python_bytes(4) - return - - if python_version.minor == 5: - with pytest.raises(ValueError): - inliner.python_ints2int([1, 2, 3]) - - with pytest.raises(ValueError): - inliner.python_ints2int([-1, 2]) - - with pytest.raises(ValueError): - inliner.python_ints2int([1, 256]) - - assert inliner.python_ints2int([4, 1]) == 260 - - else: - with pytest.raises(ValueError): - inliner.python_ints2int([1, 2]) - - with pytest.raises(ValueError): - inliner.python_ints2int([-1]) - - with pytest.raises(ValueError): - inliner.python_ints2int([256]) - - assert inliner.python_ints2int([5]) == 5 - - -def test_get_instructions(): - def function(x, y): - print(x, y) - - python_version = sys.version_info - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.get_instructions(function) - return - - if python_version.minor == 5: - assert tuple(inliner.get_instructions(function)) == ( - b"t\x00\x00", - b"|\x00\x00", - b"|\x01\x00", - b"\x83\x02\x00", - b"\x01", - b"d\x00\x00", - b"S", - ) - else: - assert tuple(inliner.get_instructions(function)) == ( - b"t\x00", - b"|\x00", - b"|\x01", - b"\x83\x02", - b"\x01\x00", - b"d\x00", - b"S\x00", - ) - - -def test_has_no_return(): - def func_return_nothing(a, b): - if a > b: - print(a) - else: - print("Hello World!") - - def func_return_something(a, b): - print(a) - return b - - def func_several_returns(a, b): - if a > b: - print(a) - return - - python_version = sys.version_info - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.has_no_return(func_return_nothing) - return - - assert inliner.has_no_return(func_return_nothing) - assert not inliner.has_no_return(func_return_something) - assert not inliner.has_no_return(func_several_returns) - - -def test_has_duplicates(): - assert not inliner.has_duplicates([1, 3, 2, 4]) - assert inliner.has_duplicates([1, 3, 2, 3]) - - -def test_get_transitions(): - with pytest.raises(ValueError): - inliner.get_transitions((1, 2, 2), (1, 2, 3)) - - with pytest.raises(ValueError): - inliner.get_transitions((1, 2), (1, 2, 2)) - - olds = ("a", "c", "b", "d") - news_1 = ("f", "g", "c", "d", "b", "a") - news_2 = ("c", "d") - - assert inliner.get_transitions(olds, news_1) == {0: 5, 1: 2, 2: 4, 3: 3} - assert inliner.get_transitions(olds, news_2) == {1: 0, 3: 1} - - -def test_get_b_transitions(): - transitions = {1: 3, 2: 5, 3: 6} - byte_source = inliner.OpCode.LOAD_CONST - byte_dest = inliner.OpCode.STORE_FAST - - python_version = sys.version_info - - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.get_b_transitions(transitions, byte_source, 
byte_dest) - return - - bytes_transitions = inliner.get_b_transitions(transitions, byte_source, byte_dest) - - if python_version.minor == 5: - expected = { - (byte_source + b"\x01\x00"): (byte_dest + b"\x03\x00"), - (byte_source + b"\x02\x00"): (byte_dest + b"\x05\x00"), - (byte_source + b"\x03\x00"): (byte_dest + b"\x06\x00"), - } - else: - expected = { - (byte_source + b"\x01"): (byte_dest + b"\x03"), - (byte_source + b"\x02"): (byte_dest + b"\x05"), - (byte_source + b"\x03"): (byte_dest + b"\x06"), - } - - assert bytes_transitions == expected - - -def test_are_functions_equivalent(): - def a_func(x, y): - c = 3 - print(c + str(x + y)) - return x * math.sin(y) - - def another_func(x, y): - c = 4 - print(c + str(x + y)) - return x * math.sin(y) - - assert inliner.are_functions_equivalent(a_func, a_func) - assert not inliner.are_functions_equivalent(a_func, another_func) - - -def test_shift_instruction(): - transitions = {1: 3, 2: 5, 3: 6} - byte_source = inliner.OpCode.LOAD_CONST - byte_dest = inliner.OpCode.STORE_FAST - - python_version = sys.version_info - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.get_b_transitions(transitions, byte_source, byte_dest) - return - - if python_version.minor == 5: - assert inliner.shift_instruction(b"d\x05\x00", 3) == b"d\x08\x00" - else: - assert inliner.shift_instruction(b"d\x05", 3) == b"d\x08" - - -def test_shift_instructions(): - transitions = {1: 3, 2: 5, 3: 6} - byte_source = inliner.OpCode.LOAD_CONST - byte_dest = inliner.OpCode.STORE_FAST - - python_version = sys.version_info - - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.get_b_transitions(transitions, byte_source, byte_dest) - return - - if python_version.minor == 5: - instructions = ( - b"|\x00\x00", - b"|\x01\x00", - b"k\x04\x00", - # JUMP_POP_IF_FALSE - b"r\x0f\x00", - b"n\x00\x00", - b"|\x00\x00", - b"|\x01\x00", - b"k\x04\x00", - # JUMP_POP_IF_TRUE - b"s\x1b\x00", - b"d\x01\x00", - b"d\x01\x00", - b"\x04", - b"\x03", - b"k\x00\x00", - # JUMP_IF_FALSE_OR_POP - b"o2\x00", - b"d\x01\x00", - b"k\x02\x00" b"n\x02\x00", - b"\x02", - b"\x01", - b"\x01", - b"x\x14\x00", - b"t\x00\x00", - b"d\x02\x00", - b"\x83\x01\x00", - b"D", - b"]\x06\x00", - b"}\x02\x00", - # JUMP_ABSOLUTE - b"qB\x00", - b"W", - b"d\x00\x00", - b"S", - ) - - expected_shifted_instructions = ( - b"|\x00\x00", - b"|\x01\x00", - b"k\x04\x00", - # JUMP_POP_IF_FALSE - b"r\x12\x00", - b"n\x00\x00", - b"|\x00\x00", - b"|\x01\x00", - b"k\x04\x00", - # JUMP_POP_IF_TRUE - b"s\x1e\x00", - b"d\x01\x00", - b"d\x01\x00", - b"\x04", - b"\x03", - b"k\x00\x00", - # JUMP_IF_FALSE_OR_POP - b"o5\x00", - b"d\x01\x00", - b"k\x02\x00" b"n\x02\x00", - b"\x02", - b"\x01", - b"\x01", - b"x\x14\x00", - b"t\x00\x00", - b"d\x02\x00", - b"\x83\x01\x00", - b"D", - b"]\x06\x00", - b"}\x02\x00", - # JUMP_ABSOLUTE - b"qE\x00", - b"W", - b"d\x00\x00", - b"S", - ) - else: - instructions = ( - b"|\x00", - b"|\x01", - b"k\x04", - # JUMP_POP_IF_FALSE - b"r\x0f", - b"n\x00", - b"|\x00", - b"|\x01", - b"k\x04", - # JUMP_POP_IF_TRUE - b"s\x1b", - b"d\x01", - b"d\x01", - b"\x04", - b"\x03", - b"k\x00", - # JUMP_IF_FALSE_OR_POP - b"o2", - b"d\x01", - b"k\x02", - b"n\x02", - b"\x02", - b"\x01", - b"\x01", - b"x\x14", - b"t\x00", - b"d\x02", - b"\x83\x01", - b"D\x00", - b"]\x06", - b"}\x02", - # JUMP_ABSOLUTE - b"qB", - b"W\x00", - b"d\x00", - b"S\x00", - ) - - expected_shifted_instructions = ( - b"|\x00", - b"|\x01", - 
b"k\x04", - # JUMP_POP_IF_FALSE - b"r\x12", - b"n\x00", - b"|\x00", - b"|\x01", - b"k\x04", - # JUMP_POP_IF_TRUE - b"s\x1e", - b"d\x01", - b"d\x01", - b"\x04", - b"\x03", - b"k\x00", - # JUMP_IF_FALSE_OR_POP - b"o5", - b"d\x01", - b"k\x02", - b"n\x02", - b"\x02", - b"\x01", - b"\x01", - b"x\x14", - b"t\x00", - b"d\x02", - b"\x83\x01", - b"D\x00", - b"]\x06", - b"}\x02", - # JUMP_ABSOLUTE - b"qE", - b"W\x00", - b"d\x00", - b"S\x00", - ) - - assert inliner.shift_instructions(instructions, 3) == expected_shifted_instructions - - -def test_pin_arguments(): - transitions = {1: 3, 2: 5, 3: 6} - byte_source = inliner.OpCode.LOAD_CONST - byte_dest = inliner.OpCode.STORE_FAST - - python_version = sys.version_info - - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.get_b_transitions(transitions, byte_source, byte_dest) - return - - def func(a, b): - c = 4 - print(str(a) + str(c)) - - return b - - def expected_pinned_func(): - c = 4 - print(str(10) + str(c)) - - return 11 - - with pytest.raises(TypeError): - inliner.pin_arguments(func, dict(a=1)) - - with pytest.raises(TypeError): - inliner.pin_arguments(func, dict(a=1, b=2, c=3)) - - pinned_func = inliner.pin_arguments(func, dict(a=10, b=11)) - - assert inliner.are_functions_equivalent(pinned_func, expected_pinned_func) - - -def test_inline(): - transitions = {1: 3, 2: 5, 3: 6} - byte_source = inliner.OpCode.LOAD_CONST - byte_dest = inliner.OpCode.STORE_FAST - - python_version = sys.version_info - - if not (python_version.major == 3 and python_version.minor in (5, 6, 7, 8)): - with pytest.raises(SystemError): - inliner.get_b_transitions(transitions, byte_source, byte_dest) - return - - def pre_func(b, c): - a = "hello" - print(a + " " + b + " " + c) - - def func(x, y): - try: - if x > y: - z = x + 2 * math.sin(y) - return z ** 2 - elif x == y: - return 4 - else: - return 2 ** 3 - except ValueError: - foo = 0 - for i in range(4): - foo += i - return foo - except TypeError: - return 42 - else: - return 33 - finally: - print("finished") - - def target_inlined_func(x, y): - # Pinned pre_func - a = "hello" - print(a + " " + "pretty" + " " + "world!") - - # func - try: - if x > y: - z = x + 2 * math.sin(y) - return z ** 2 - elif x == y: - return 4 - else: - return 2 ** 3 - except ValueError: - foo = 0 - for i in range(4): - foo += i - return foo - except TypeError: - return 42 - else: - return 33 - finally: - print("finished") - - inlined_func = inliner.inline(pre_func, func, dict(b="pretty", c="world!")) - - assert inliner.are_functions_equivalent(inlined_func, target_inlined_func) diff --git a/tests/test_pandarallel.py b/tests/test_pandarallel.py index a97fc17..6cbdc8d 100644 --- a/tests/test_pandarallel.py +++ b/tests/test_pandarallel.py @@ -1,10 +1,8 @@ import math -from datetime import datetime import numpy as np import pandas as pd import pytest - from pandarallel import pandarallel @@ -127,15 +125,17 @@ def func(x): @pytest.fixture(params=("named", "anonymous")) def func_dataframe_groupby_expanding_apply(request): def func(x): - return (x.multiply(pd.Series(range(1, len(x)), dtype='float'))).sum() + return (x.multiply(pd.Series(range(1, len(x)), dtype="float"))).sum() return dict( named=func, - anonymous=lambda x: (x.multiply(pd.Series(range(1, len(x)), dtype='float'))).sum(), + anonymous=lambda x: ( + x.multiply(pd.Series(range(1, len(x)), dtype="float")) + ).sum(), )[request.param] -@pytest.fixture() +@pytest.fixture def pandarallel_init(progress_bar, use_memory_fs): 
pandarallel.initialize( progress_bar=progress_bar, use_memory_fs=use_memory_fs, nb_workers=2 @@ -173,6 +173,13 @@ def test_dataframe_apply_axis_1(pandarallel_init, func_dataframe_apply_axis_1, d assert res.equals(res_parallel) +def test_dataframe_apply_invalid_axis(pandarallel_init): + df = pd.DataFrame(dict(a=[1, 2, 3, 4])) + + with pytest.raises(ValueError): + df.parallel_apply(lambda x: x, axis="invalid") + + def test_dataframe_applymap(pandarallel_init, func_dataframe_applymap, df_size): df = pd.DataFrame( dict(a=np.random.randint(1, 8, df_size), b=np.random.rand(df_size)) @@ -265,24 +272,6 @@ def test_dataframe_groupby_rolling_apply( res.equals(res_parallel) -def test_datetime_rolling(pandarallel_init): - df = pd.DataFrame( - [ - {"datetime": datetime.now(), "A": 1, "B": 7}, - {"datetime": datetime.now(), "A": 2, "B": 4}, - ] - ) - - res = df.set_index("datetime").groupby("A").rolling("1D").B.apply(sum, raw=True) - res_parallel = ( - df.set_index("datetime") - .groupby("A") - .rolling("1D") - .B.parallel_apply(sum, raw=True) - ) - res.equals(res_parallel) - - def test_dataframe_groupby_expanding_apply( pandarallel_init, func_dataframe_groupby_expanding_apply, df_size ): @@ -300,4 +289,4 @@ def test_dataframe_groupby_expanding_apply( .b.expanding() .parallel_apply(func_dataframe_groupby_expanding_apply, raw=False) ) - res.equals(res_parallel) \ No newline at end of file + res.equals(res_parallel)
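Beyond the move to declarative packaging in setup.cfg and the pandas test matrix in tests.sh, the test changes above pin down the user-facing behaviour: initialize accepts nb_workers, progress_bar and use_memory_fs, and parallel_apply now rejects an invalid axis with a ValueError. The sketch below mirrors those tests; the DataFrame contents, nb_workers=2 and progress_bar=True are arbitrary example values, not requirements.

import pandas as pd

from pandarallel import pandarallel

pandarallel.initialize(nb_workers=2, progress_bar=True)

df = pd.DataFrame(dict(a=[1, 2, 3, 4], b=[10.0, 20.0, 30.0, 40.0]))

# Row-wise parallel apply, as exercised by test_dataframe_apply_axis_1.
res_parallel = df.parallel_apply(lambda row: row.a + row.b, axis=1)
assert res_parallel.equals(df.apply(lambda row: row.a + row.b, axis=1))

# As covered by the new test_dataframe_apply_invalid_axis, an unknown axis
# value raises instead of being silently accepted.
try:
    df.parallel_apply(lambda row: row, axis="invalid")
except ValueError:
    print("invalid axis rejected")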