Commit 7d7767b

Write to file without including "filename" column (NVIDIA#317)

* keep_filename_column param
* update pytests
* run black

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
1 parent: dc87963

File tree: 3 files changed, +62 -6 lines


nemo_curator/datasets/doc_dataset.py (+7)
@@ -35,6 +35,9 @@ def __len__(self):
     def persist(self):
         return DocumentDataset(self.df.persist())
 
+    def head(self, n=5):
+        return self.df.head(n)
+
     @classmethod
     def read_json(
         cls,
@@ -107,6 +110,7 @@ def to_json(
         self,
         output_file_dir,
         write_to_filename=False,
+        keep_filename_column=False,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -116,13 +120,15 @@ def to_json(
             df=self.df,
             output_file_dir=output_file_dir,
             write_to_filename=write_to_filename,
+            keep_filename_column=keep_filename_column,
             output_type="jsonl",
         )
 
     def to_parquet(
         self,
         output_file_dir,
         write_to_filename=False,
+        keep_filename_column=False,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -132,6 +138,7 @@ def to_parquet(
             df=self.df,
             output_file_dir=output_file_dir,
             write_to_filename=write_to_filename,
+            keep_filename_column=keep_filename_column,
             output_type="parquet",
         )

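With this change, DocumentDataset.to_json and DocumentDataset.to_parquet accept a keep_filename_column flag and forward it to write_to_disk, so the bookkeeping "filename" column no longer has to appear in the serialized output. A minimal usage sketch, not part of the commit; it assumes the dataset was loaded with read_json's add_filename option so that a "filename" column exists:

from nemo_curator.datasets import DocumentDataset

# Assumption: add_filename=True populates the "filename" column at read time.
dataset = DocumentDataset.read_json("input_dir/", add_filename=True)

# write_to_filename=True routes each row back to a file named after its
# "filename" value; keep_filename_column=False (the default) now drops the
# column itself from the written records.
dataset.to_json(
    output_file_dir="output_dir/",
    write_to_filename=True,
    keep_filename_column=False,
)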
nemo_curator/utils/distributed_utils.py (+25 -2)
@@ -494,13 +494,19 @@ def process_all_batches(
     )
 
 
-def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
+def single_partition_write_with_filename(
+    df,
+    output_file_dir,
+    keep_filename_column=False,
+    output_type="jsonl",
+):
     """
     This function processes a DataFrame and writes it to disk
 
     Args:
         df: A DataFrame.
         output_file_dir: The output file path.
+        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
         output_type="jsonl": The type of output file to write.
     Returns:
         If the DataFrame is non-empty, return a Series containing a single element, True.
@@ -526,12 +532,18 @@ def single_partition_write_with_filename
     filenames = df.filename.unique()
     filenames = list(filenames.values_host) if is_cudf_type(df) else list(filenames)
     num_files = len(filenames)
+
     for filename in filenames:
         out_df = df[df.filename == filename] if num_files > 1 else df
+        if not keep_filename_column:
+            out_df = out_df.drop("filename", axis=1)
+
         filename = Path(filename).stem
         output_file_path = os.path.join(output_file_dir, filename)
+
         if output_type == "jsonl":
             output_file_path = output_file_path + ".jsonl"
+
             if isinstance(df, pd.DataFrame):
                 out_df.to_json(
                     output_file_path,
@@ -550,16 +562,24 @@ def single_partition_write_with_filename
                     lines=True,
                     force_ascii=False,
                 )
+
         elif output_type == "parquet":
             output_file_path = output_file_path + ".parquet"
             out_df.to_parquet(output_file_path)
+
         else:
             raise ValueError(f"Unknown output type: {output_type}")
 
     return success_ser
 
 
-def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
+def write_to_disk(
+    df,
+    output_file_dir,
+    write_to_filename=False,
+    keep_filename_column=False,
+    output_type="jsonl",
+):
     """
     This function writes a Dask DataFrame to the specified file path.
     If write_to_filename is True, then it expects the
@@ -569,6 +589,7 @@ def write_to_disk
         df: A Dask DataFrame.
         output_file_dir: The output file path.
         write_to_filename: Whether to write the filename using the "filename" column.
+        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
         output_type="jsonl": The type of output file to write.
 
     """
@@ -589,11 +610,13 @@ def write_to_disk
         output = df.map_partitions(
             single_partition_write_with_filename,
             output_file_dir,
+            keep_filename_column=keep_filename_column,
             output_type=output_type,
             meta=output_meta,
             enforce_metadata=False,
         )
         output = output.compute()
+
     else:
         if output_type == "jsonl":
             if is_cudf_type(df):

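The core of the change is in the per-partition writer: it splits the partition by distinct "filename" values, writes one output file per source file, and now drops the column first unless keep_filename_column=True. A pandas-only sketch of that logic with an assumed function name (the real function also handles cuDF, parquet output, and returns a success Series):

import os
from pathlib import Path

import pandas as pd

def write_partition_sketch(df, output_file_dir, keep_filename_column=False):
    # One output file per distinct source filename in this partition.
    for filename in df["filename"].unique():
        out_df = df[df["filename"] == filename]
        if not keep_filename_column:
            # New in this commit: strip the bookkeeping column before writing.
            out_df = out_df.drop("filename", axis=1)
        out_path = os.path.join(output_file_dir, Path(filename).stem + ".jsonl")
        out_df.to_json(out_path, orient="records", lines=True, force_ascii=False)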
tests/test_io.py (+30 -4)
@@ -149,33 +149,59 @@ def test_meta_str(self, jsonl_dataset):
 
 
 class TestWriteWithFilename:
+    @pytest.mark.parametrize("keep_filename_column", [True, False])
     @pytest.mark.parametrize("file_ext", ["jsonl", "parquet"])
-    def test_multifile_single_partition(self, tmp_path, file_ext):
+    def test_multifile_single_partition(
+        self,
+        tmp_path,
+        keep_filename_column,
+        file_ext,
+    ):
         df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})
 
         single_partition_write_with_filename(
-            df=df, output_file_dir=tmp_path, output_type=file_ext
+            df=df,
+            output_file_dir=tmp_path,
+            keep_filename_column=keep_filename_column,
+            output_type=file_ext,
         )
         assert os.path.exists(tmp_path / f"file0.{file_ext}")
         assert os.path.exists(tmp_path / f"file1.{file_ext}")
 
+        if not keep_filename_column:
+            df = df.drop("filename", axis=1)
+
         df1 = read_single_partition(
             files=[tmp_path / f"file0.{file_ext}"], backend="pandas", filetype=file_ext
         )
         assert_eq(df1, df.iloc[0:1], check_index=False)
+
         df2 = read_single_partition(
             files=[tmp_path / f"file1.{file_ext}"], backend="pandas", filetype=file_ext
         )
         assert_eq(df2, df.iloc[1:3], check_index=False)
 
+    @pytest.mark.parametrize("keep_filename_column", [True, False])
     @pytest.mark.parametrize("file_ext", ["jsonl", "parquet"])
-    def test_singlefile_single_partition(self, tmp_path, file_ext):
+    def test_singlefile_single_partition(
+        self,
+        tmp_path,
+        keep_filename_column,
+        file_ext,
+    ):
         df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file2", "file2", "file2"]})
+
         single_partition_write_with_filename(
-            df=df, output_file_dir=tmp_path, output_type=file_ext
+            df=df,
+            output_file_dir=tmp_path,
+            keep_filename_column=keep_filename_column,
+            output_type=file_ext,
         )
         assert len(os.listdir(tmp_path)) == 1
         assert os.path.exists(tmp_path / f"file2.{file_ext}")
+
+        if not keep_filename_column:
+            df = df.drop("filename", axis=1)
         got = read_single_partition(
             files=[tmp_path / f"file2.{file_ext}"], backend="pandas", filetype=file_ext
         )

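Both tests are now parametrized over keep_filename_column and mirror the writer's behavior by dropping the column from the expected DataFrame before comparing against what is read back. A hand-run version of the same round trip, for illustration only (it assumes an existing out/ directory and that read_single_partition is importable from distributed_utils, as the test file suggests):

import pandas as pd

from nemo_curator.utils.distributed_utils import (
    read_single_partition,
    single_partition_write_with_filename,
)

df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})
single_partition_write_with_filename(
    df=df,
    output_file_dir="out/",
    keep_filename_column=False,
    output_type="jsonl",
)

# out/file0.jsonl and out/file1.jsonl contain only column "a";
# with keep_filename_column=True they would retain "filename" too.
got = read_single_partition(files=["out/file0.jsonl"], backend="pandas", filetype="jsonl")
print(got)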