@@ -165,23 +165,26 @@ std::size_t estimate_size_per_subchunk(std::size_t chunk_size)
 }
 
 /**
- * @brief Return the upper bound on the batch size for the JSON reader.
+ * @brief Return the batch size for the JSON reader.
  *
- * The datasources passed to the JSON reader are split into batches demarcated by byte range
- * offsets and read iteratively. The batch size is capped at INT_MAX bytes, which is the
- * default value returned by the function. This value can be overridden at runtime using the
- * environment variable LIBCUDF_JSON_BATCH_SIZE
+ * The datasources passed to the JSON reader are read iteratively in batches demarcated by byte
+ * range offsets. The tokenizer requires the JSON buffer read in each batch to be of size at most
+ * INT_MAX bytes.
+ * Since the byte range corresponding to a given batch can cause the last JSON line
+ * in the batch to be incomplete, the batch size returned by this function allows for an additional
+ * `max_subchunks_prealloced` subchunks to be allocated beyond the byte range offsets. Since the
+ * size of the subchunk depends on the size of the byte range, the batch size is variable and cannot
+ * be directly controlled by the user. As a workaround, the environment variable
+ * LIBCUDF_JSON_BATCH_SIZE can be used to set a fixed batch size at runtime.
  *
  * @return size in bytes
  */
-std::size_t get_batch_size_upper_bound()
+std::size_t get_batch_size(std::size_t chunk_size)
 {
-  auto const batch_size_str = std::getenv("LIBCUDF_JSON_BATCH_SIZE");
-  int64_t const batch_size  = batch_size_str != nullptr ? std::atol(batch_size_str) : 0L;
-  auto const batch_limit    = static_cast<int64_t>(std::numeric_limits<int32_t>::max());
-  auto const batch_size_upper_bound = static_cast<std::size_t>(
-    (batch_size > 0 && batch_size < batch_limit) ? batch_size : batch_limit);
-  return batch_size_upper_bound;
+  auto const size_per_subchunk = estimate_size_per_subchunk(chunk_size);
+  auto const batch_limit       = static_cast<std::size_t>(std::numeric_limits<int32_t>::max()) -
+                                 (max_subchunks_prealloced * size_per_subchunk);
+  return std::min(batch_limit, getenv_or<std::size_t>("LIBCUDF_JSON_BATCH_SIZE", batch_limit));
 }
 
 /**
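Note (not part of the diff): a minimal usage sketch of the runtime override described in the new doc comment above. It assumes a POSIX environment for setenv and the public cudf::io::read_json entry point; the wrapper name read_with_fixed_batch and the 256 MiB value are made up for illustration.

// Sketch: pin a fixed JSON reader batch size for this process before calling the reader.
#include <cstdlib>
#include <cudf/io/json.hpp>

cudf::io::table_with_metadata read_with_fixed_batch(cudf::io::json_reader_options const& opts)
{
  // 256 MiB batches instead of the variable, INT_MAX-derived default.
  setenv("LIBCUDF_JSON_BATCH_SIZE", "268435456", /*overwrite=*/1);
  return cudf::io::read_json(opts);
}

Since get_batch_size consults the variable on every call, the same effect can be had by exporting LIBCUDF_JSON_BATCH_SIZE once in the launching shell.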
@@ -295,6 +298,10 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
     }
   }
 
+  auto const batch_limit = static_cast<size_t>(std::numeric_limits<int32_t>::max());
+  CUDF_EXPECTS(static_cast<size_t>(next_delim_pos - first_delim_pos - shift_for_nonzero_offset) <
+                 batch_limit,
+               "The size of the JSON buffer returned by every batch cannot exceed INT_MAX bytes");
   return datasource::owning_buffer<rmm::device_buffer>(
     std::move(buffer),
     reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
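Note (not part of the diff): the quantity guarded by the new CUDF_EXPECTS is simply the length of the buffer slice handed back to the tokenizer, measured between the record delimiters found around the requested byte range. The positions below are made up, purely to show what the check computes.

// Sketch: the arithmetic behind the new guard, with hypothetical delimiter positions.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>

int main()
{
  std::int64_t const first_delim_pos          = 128;   // hypothetical: first delimiter in the buffer
  std::int64_t const next_delim_pos           = 4096;  // hypothetical: delimiter just past the range
  std::int64_t const shift_for_nonzero_offset = 1;     // hypothetical adjustment for offset > 0

  auto const returned_size =
    static_cast<std::size_t>(next_delim_pos - first_delim_pos - shift_for_nonzero_offset);
  auto const batch_limit = static_cast<std::size_t>(std::numeric_limits<std::int32_t>::max());

  std::cout << "returned slice: " << returned_size << " bytes (limit " << batch_limit << ")\n";
  return returned_size < batch_limit ? 0 : 1;  // mirrors the condition the new check enforces
}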
@@ -365,17 +372,11 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> source
     reader_opts.is_enabled_lines() || total_source_size < std::numeric_limits<int32_t>::max(),
     "Parsing Regular JSON inputs of size greater than INT_MAX bytes is not supported");
 
-  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
-  std::size_t chunk_size   = reader_opts.get_byte_range_size();
-  chunk_size               = !chunk_size ? total_source_size - chunk_offset
-                                         : std::min(chunk_size, total_source_size - chunk_offset);
-
-  std::size_t const size_per_subchunk      = estimate_size_per_subchunk(chunk_size);
-  std::size_t const batch_size_upper_bound = get_batch_size_upper_bound();
-  std::size_t const batch_size =
-    batch_size_upper_bound < (max_subchunks_prealloced * size_per_subchunk)
-      ? batch_size_upper_bound
-      : batch_size_upper_bound - (max_subchunks_prealloced * size_per_subchunk);
+  std::size_t chunk_offset = reader_opts.get_byte_range_offset();
+  std::size_t chunk_size   = reader_opts.get_byte_range_size();
+  chunk_size               = !chunk_size ? total_source_size - chunk_offset
+                                         : std::min(chunk_size, total_source_size - chunk_offset);
+  std::size_t const batch_size = get_batch_size(chunk_size);
 
   /*
    * Identify the position (zero-indexed) of starting source file from which to begin
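Note (not part of the diff): a self-contained illustration of the byte-range fallback kept by the simplified block above, using made-up sizes. The batch_size value stands in for whatever get_batch_size(chunk_size) would return, and the ceiling division only approximates the batch count, since the actual batch boundaries are adjusted to record delimiters.

// Sketch: an unset byte range size means "read from the offset to the end of the sources".
#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
  std::size_t const total_source_size = std::size_t{10} << 30;  // 10 GiB input (made up)
  std::size_t const chunk_offset      = 0;                      // byte range offset not set
  std::size_t chunk_size              = 0;                      // byte range size not set

  // Same fallback as in read_json_impl.
  chunk_size = !chunk_size ? total_source_size - chunk_offset
                           : std::min(chunk_size, total_source_size - chunk_offset);

  // Hypothetical value standing in for get_batch_size(chunk_size).
  std::size_t const batch_size  = std::size_t{1} << 31;                        // ~2 GiB
  std::size_t const num_batches = (chunk_size + batch_size - 1) / batch_size;  // roughly

  std::cout << "chunk_size=" << chunk_size << " bytes, ~" << num_batches << " batches\n";
}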
@@ -490,11 +491,19 @@ table_with_metadata read_json_impl(host_span<std::unique_ptr<datasource>> source
   // Dispatch individual batches to read_batch and push the resulting table into
   // partial_tables array. Note that the reader options need to be updated for each
   // batch to adjust byte range offset and byte range size.
-  for (std::size_t i = 1; i < batch_offsets.size() - 1; i++) {
-    batched_reader_opts.set_byte_range_offset(batch_offsets[i]);
-    batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]);
-    partial_tables.emplace_back(
-      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref()));
+  for (std::size_t batch_offset_pos = 1; batch_offset_pos < batch_offsets.size() - 1;
+       batch_offset_pos++) {
+    batched_reader_opts.set_byte_range_offset(batch_offsets[batch_offset_pos]);
+    batched_reader_opts.set_byte_range_size(batch_offsets[batch_offset_pos + 1] -
+                                            batch_offsets[batch_offset_pos]);
+    auto partial_table =
+      read_batch(sources, batched_reader_opts, stream, cudf::get_current_device_resource_ref());
+    if (partial_table.tbl->num_columns() == 0 && partial_table.tbl->num_rows() == 0) {
+      CUDF_EXPECTS(batch_offset_pos == batch_offsets.size() - 2,
+                   "Only the partial table generated by the last batch can be empty");
+      break;
+    }
+    partial_tables.emplace_back(std::move(partial_table));
   }
 
   auto expects_schema_equality =