Skip to content

Commit a4fbed7

Browse files
committed
Improve DataFrameGroupBy.parallel_apply
1 parent 5857664 commit a4fbed7

File tree

3 files changed

+29
-21
lines changed

3 files changed

+29
-21
lines changed

pandarallel/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
__version__ = '0.1.3'
1+
__version__ = '0.1.4'
22

33
from ._pandarallel import pandarallel

pandarallel/_pandarallel.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import itertools
12
import pandas as _pd
23
import numpy as np
34
import pyarrow.plasma as _plasma
@@ -135,35 +136,42 @@ def closure(df, func, *args, **kwargs):
135136

136137
class _DataFrameGroupBy:
137138
@staticmethod
138-
def worker(plasma_store_name, object_id, keys, func, *args, **kwargs):
139+
def worker(plasma_store_name, object_id, groups_id, chunk,
140+
func, *args, **kwargs):
139141
client = _plasma.connect(plasma_store_name)
140142
df = client.get(object_id)
141-
return client.put(df.groupby(keys).apply(func, *args, **kwargs))
143+
groups = client.get(groups_id)[chunk]
144+
result = [
145+
func(df.iloc[indexes], *args, **kwargs)
146+
for _, indexes in groups
147+
]
148+
149+
return client.put(result)
142150

143151
@staticmethod
144152
def apply(plasma_store_name, nb_workers, plasma_client):
145153
@_parallel(nb_workers, plasma_client)
146154
def closure(df_grouped, func, *args, **kwargs):
147-
groups = list(df_grouped.groups.values())
148-
keys = df_grouped.keys
149-
slices = _chunk(len(groups), nb_workers)
150-
futures = []
155+
groups = list(df_grouped.groups.items())
156+
chunks = _chunk(len(groups), nb_workers)
157+
object_id = plasma_client.put(df_grouped.obj)
158+
groups_id = plasma_client.put(groups)
151159

152160
with _ProcessPoolExecutor(max_workers=nb_workers) as executor:
153-
for slice_ in slices:
154-
indexes = [index.to_numpy() for index in groups[slice_]]
155-
sub_df = df_grouped.obj.iloc[np.concatenate(indexes)]
156-
object_id = plasma_client.put(sub_df)
157-
future = executor.submit(_DataFrameGroupBy.worker,
158-
plasma_store_name, object_id,
159-
keys, func, *args, **kwargs)
160-
futures.append(future)
161-
162-
result = _pd.concat([
163-
plasma_client.get(future.result())
164-
for future in futures
165-
], copy=False)
161+
futures = [
162+
executor.submit(_DataFrameGroupBy.worker,
163+
plasma_store_name, object_id,
164+
groups_id, chunk, func, *args, **kwargs)
165+
for chunk in chunks
166+
]
166167

168+
result = _pd.DataFrame(list(itertools.chain.from_iterable([
169+
plasma_client.get(future.result())
170+
for future in futures
171+
])),
172+
index=_pd.Series(list(df_grouped.grouper),
173+
name=df_grouped.keys)
174+
).squeeze()
167175
return result
168176
return closure
169177

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
setup(
1010
name='pandarallel',
11-
version='0.1.3',
11+
version='0.1.4',
1212
packages=find_packages(),
1313
author='Manu NALEPA',
1414
author_email='nalepae@gmail.com',

0 commit comments

Comments
 (0)