
Commit ecd4f4b

rjzamora, Maghoumi, and ayushdg authored
Align extract_partitioning_index logic with upstream shuffling (NVIDIA#60)
* update extract_partitioning_index with compat code
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* [Tutorials] Add a tutorial for PEFT data curation (NVIDIA#45)
  This PR adds a new tutorial to demonstrate data curation for PEFT use cases.
  Signed-off-by: Mehran Maghoumi <Maghoumi@users.noreply.github.com>
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* move compat code to _compat file
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* Only import PII constants during Curator import (NVIDIA#61)
  * Move PII constants to a separate file that does not import presidio/spacy and other GPU dependencies
    Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com>
  * Add comment around import, move constant import to global scope
    Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com>
* add unit test
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* add pytest.mark.gpu
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* move extract_partitioning_index import for now
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* test both cudf and pandas
  Signed-off-by: rjzamora <rzamora217@gmail.com>
* spelling
  Signed-off-by: rjzamora <rzamora217@gmail.com>

---------

Signed-off-by: rjzamora <rzamora217@gmail.com>
Signed-off-by: Mehran Maghoumi <Maghoumi@users.noreply.github.com>
Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com>
Co-authored-by: Mehran Maghoumi <Maghoumi@users.noreply.github.com>
Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>
1 parent 38d8ce7 commit ecd4f4b

File tree

3 files changed: +98 −2 lines changed

nemo_curator/_compat.py (+1)

@@ -20,3 +20,4 @@
 # TODO: remove when dask min version gets bumped
 DASK_SHUFFLE_METHOD_ARG = _dask_version > parseVersion("2024.1.0")
 DASK_P2P_ERROR = _dask_version < parseVersion("2023.10.0")
+DASK_SHUFFLE_CAST_DTYPE = _dask_version > parseVersion("2023.12.0")
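The new flag follows the same version-gating pattern as the existing DASK_SHUFFLE_METHOD_ARG and DASK_P2P_ERROR flags. A minimal sketch of how such a flag is defined and consumed is below; the import and `_dask_version` lines sit outside the hunk above, so they are assumptions rather than part of this commit:

import dask
from packaging.version import parse as parseVersion  # assumed alias; not shown in the hunk

# Parse the installed dask version once (assumed to happen earlier in _compat.py).
_dask_version = parseVersion(dask.__version__)

# True on dask releases newer than 2023.12.0, where the code in merge_utils.py
# expects `partitioning_index` to accept a `cast_dtype=` keyword.
DASK_SHUFFLE_CAST_DTYPE = _dask_version > parseVersion("2023.12.0")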

nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py (+23 −2)

@@ -16,13 +16,14 @@
 from operator import getitem

 import numpy as np
+import pandas as pd
 from dask.base import tokenize
 from dask.dataframe.core import new_dd_object
 from dask.dataframe.shuffle import partitioning_index
 from dask.highlevelgraph import HighLevelGraph
 from dask.utils import M

-from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import rearange_by_column_direct
+from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE


 def _split_part(part, nsplits):

@@ -129,6 +130,21 @@ def extract_partitioning_index(
     # a partition-wise merge between `left_df` and `right_df`.
     # We call this `global_partitioning_index`:

+    if DASK_SHUFFLE_CAST_DTYPE:
+        # Need to use the same type-casting logic as `shuffle`
+        dtypes = {}
+        if not isinstance(merge_on, list):
+            merge_on = [merge_on]
+        for col, dtype in left_df[merge_on].dtypes.items():
+            if pd.api.types.is_numeric_dtype(dtype):
+                dtypes[col] = np.float64
+        if not dtypes:
+            dtypes = None
+        cast_dtype = {"cast_dtype": dtypes}
+    else:
+        # `cast_dtype` argument doesn't exist yet
+        cast_dtype = {}
+
     num_bucket_files = bk_mapping.file_id.max() + 1
     global_partitioning_index = left_df[merge_on].map_partitions(
         partitioning_index,

@@ -137,6 +153,7 @@ def extract_partitioning_index(
         enforce_metadata=False,
         transform_divisions=False,
         align_dataframes=False,
+        **cast_dtype,
     )

     if total_bucket_partitions < num_bucket_files:

@@ -157,7 +174,7 @@ def extract_partitioning_index(
     # want to send the rows of `left_df` to the partition
     # indices encoded in `global_partitioning_index`. Instead, we
     # need to take a modulus with `parts_per_bucket_batch` to
-    # define a `"_partitoins"` column.
+    # define a `"_partitions"` column.
     left_df["_partitions"] = global_partitioning_index % parts_per_bucket_batch

     return left_df, global_partitioning_index

@@ -195,6 +212,10 @@ def merge_left_to_shuffled_right(
     subset_bucket_df,
     merge_on,
 ):
+    from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import (
+        rearange_by_column_direct,
+    )
+
     # We are merging an unshuffled batch of "left" partitions
     # with a shuffled batch of "right" partitions. To minimize
     # data movement, we can manaully rerrange the "left" batch
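The heart of the change is the keyword dictionary that mirrors dask's shuffle type-casting: numeric merge-key columns are hashed as float64 so that `extract_partitioning_index` assigns rows to the same partitions as an upstream shuffle would. Below is a standalone sketch of that logic, factored into a hypothetical `_shuffle_cast_kwargs` helper for illustration (the helper name and the example frame are not part of the commit):

import numpy as np
import pandas as pd

def _shuffle_cast_kwargs(df, merge_on):
    # Hypothetical helper restating the hunk above: cast numeric merge-key
    # columns to float64 so partition hashes match dask's shuffle behavior.
    if not isinstance(merge_on, list):
        merge_on = [merge_on]
    dtypes = {
        col: np.float64
        for col, dtype in df[merge_on].dtypes.items()
        if pd.api.types.is_numeric_dtype(dtype)
    }
    # `cast_dtype=None` means "no casting"; on dask versions without the
    # keyword, an empty kwargs dict would be used instead.
    return {"cast_dtype": dtypes or None}

# Example: an int32 key column is cast for hashing, a string column is not.
frame = pd.DataFrame({"id": np.arange(4, dtype="int32"), "text": list("abcd")})
print(_shuffle_cast_kwargs(frame, "id"))      # {'cast_dtype': {'id': <class 'numpy.float64'>}}
print(_shuffle_cast_kwargs(frame, ["text"]))  # {'cast_dtype': None}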

tests/test_fuzzy_dedup.py (+74)

@@ -16,14 +16,17 @@
 from itertools import combinations
 from typing import Iterable

+import dask.dataframe as dd
 import numpy as np
 import pytest
 import yaml
+from dask import config
 from dask.dataframe.utils import assert_eq
 from distributed import Client

 from nemo_curator import LSH, FuzzyDuplicates, FuzzyDuplicatesConfig, MinHash
 from nemo_curator.datasets import DocumentDataset
+from nemo_curator.utils.fuzzy_dedup_utils.merge_utils import extract_partitioning_index
 from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from

 cudf = gpu_only_import("cudf")

@@ -367,3 +370,74 @@ def test_from_yaml(self, tmpdir):
         config = FuzzyDuplicatesConfig.from_yaml(tmpdir / "config.yaml")
         for param in yaml_params:
             assert getattr(config, param) == yaml_params[param]
+
+
+@pytest.mark.parametrize(
+    "backend",
+    [
+        "pandas",
+        pytest.param(
+            "cudf",
+            marks=pytest.mark.gpu,
+        ),
+    ],
+)
+def test_extract_partitioning_index(backend):
+
+    def add_partition_info(df, partition_info=None):
+        if partition_info is None:
+            df["file_id"] = -1
+        else:
+            df["file_id"] = partition_info["number"]
+        return df
+
+    with config.set({"dataframe.backend": backend}):
+
+        # Create a random `unshuffled` DataFrame with a
+        # "part_id" column to be used as the shuffle index
+        npartitions_left = 7
+        unshuffled = dd.from_dict(
+            {"part_id": np.random.randint(25, size=1000, dtype="int32")},
+            npartitions=npartitions_left,
+        )
+
+        # Create a `bk_mapping` DataFrame that defines
+        # the "correct" mapping beween "part_id" and
+        # the destination partition ("file_id")
+        npartitions_right = 5
+        bk_mapping = (
+            dd.from_dict(
+                {"part_id": np.arange(25, dtype="int32")},
+                npartitions=npartitions_right,
+            )
+            .shuffle("part_id")
+            .map_partitions(add_partition_info)
+            .compute()
+        )
+
+        # Use `extract_partitioning_index` to calculate
+        # the partitioning index and assign it as a new
+        # "_partitions" column
+        result, _ = extract_partitioning_index(
+            unshuffled,
+            "part_id",
+            bk_mapping,
+            npartitions_right,
+            npartitions_right,
+        )
+
+        # Rename the "_partitions" column, shuffle by "part_id",
+        # and then assign a "file_id" column to reflect the final
+        # partition of each row
+        check = (
+            result.rename(columns={"_partitions": "expected_file_id"})
+            .shuffle(
+                "part_id",
+                npartitions=npartitions_right,
+            )
+            .map_partitions(add_partition_info)
+            .compute()
+        )
+
+        # Check that the real and expected partitions match
+        assert (check["file_id"] == check["expected_file_id"]).all()
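The test exists because hashing a numeric key as int32 and hashing it after a float64 cast can place the same value on different partitions, which is the kind of mismatch the `cast_dtype` plumbing above is meant to avoid. A rough illustration, assuming `partitioning_index` is importable from `dask.dataframe.shuffle` as it is in merge_utils.py:

import numpy as np
import pandas as pd
from dask.dataframe.shuffle import partitioning_index

keys = pd.DataFrame({"part_id": np.arange(25, dtype="int32")})
npartitions = 5

# Partition assignment from the raw int32 keys vs. the float64-cast keys.
raw_parts = partitioning_index(keys, npartitions)
cast_parts = partitioning_index(keys.astype(np.float64), npartitions)

# On dask versions where shuffle casts numeric keys before hashing, the two
# assignments may disagree for some keys, so extract_partitioning_index must
# apply the same cast to stay aligned with the shuffled "right" partitions.
print((raw_parts == cast_parts).all())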
