11diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
2- index e6ac389..60561d2 100644
2+ index 62b8f57ba1..a108fb51eb 100644
33--- a/cpp/src/arrow/dataset/file_parquet.cc
44+++ b/cpp/src/arrow/dataset/file_parquet.cc
55@@ -26,16 +26,23 @@
6-
6+
77 #include "arrow/compute/cast.h"
88 #include "arrow/compute/exec.h"
99+ #include "arrow/dataset/dataset.h"
@@ -20,16 +20,16 @@ index e6ac389..60561d2 100644
2020 #include "arrow/util/checked_cast.h"
2121 #include "arrow/util/future.h"
2222 #include "arrow/util/iterator.h"
23- #include "arrow/util/logging.h"
23+ #include "arrow/util/logging_internal.h"
2424 #include "arrow/util/range.h"
2525+ #include "arrow/util/thread_pool.h"
2626 #include "arrow/util/tracing_internal.h"
2727 #include "parquet/arrow/reader.h"
2828 #include "parquet/arrow/schema.h"
29- @@ -558,6 +565,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
29+ @@ -564,6 +571,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
3030 });
3131 }
32-
32+
3333+ struct CastingGenerator {
3434+ CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> final_schema,
3535+ const std::unordered_set<std::string>& cols_to_skip,
@@ -95,7 +95,7 @@ index e6ac389..60561d2 100644
9595 struct SlicingGenerator {
9696 SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
9797 : state(std::make_shared<State>(source, batch_size)) {}
98- @@ -620,6 +689,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
98+ @@ -626,6 +695,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
9999 [this, options, parquet_fragment, pre_filtered,
100100 row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
101101 -> Result<RecordBatchGenerator> {
@@ -105,7 +105,7 @@ index e6ac389..60561d2 100644
105105 // Ensure that parquet_fragment has FileMetaData
106106 RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
107107 if (!pre_filtered) {
108- @@ -636,12 +708,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
108+ @@ -642,12 +714,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
109109 kParquetTypeName, options.get(), default_fragment_scan_options));
110110 int batch_readahead = options->batch_readahead;
111111 int64_t rows_to_readahead = batch_readahead * options->batch_size;
@@ -136,7 +136,7 @@ index e6ac389..60561d2 100644
136136 return sliced;
137137 }
138138diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
139- index 8fa45ac..7cd0b73 100644
139+ index a8c8c6bde6..93d0d1934e 100644
140140--- a/cpp/src/arrow/dataset/scanner.cc
141141+++ b/cpp/src/arrow/dataset/scanner.cc
142142@@ -360,8 +360,10 @@ class OneShotFragment : public Fragment {
@@ -151,15 +151,15 @@ index 8fa45ac..7cd0b73 100644
151151+ return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
152152 }
153153 std::string type_name() const override { return "one-shot"; }
154-
154+
155155@@ -387,7 +389,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
156156 [this](::arrow::internal::Executor* executor) {
157157 return ScanBatchesAsync(executor);
158158 },
159159- scan_options_->use_threads);
160160+ scan_options_->use_threads, this->async_cpu_executor());
161161 }
162-
162+
163163 Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164164@@ -395,7 +397,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
165165 [this](::arrow::internal::Executor* executor) {
@@ -168,51 +168,51 @@ index 8fa45ac..7cd0b73 100644
168168- scan_options_->use_threads);
169169+ scan_options_->use_threads, this->async_cpu_executor());
170170 }
171-
171+
172172 Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173173@@ -405,7 +407,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
174174 }
175-
175+
176176 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
177177- return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
178178+ return ScanBatchesUnorderedAsync(this->async_cpu_executor(),
179179 /*sequence_fragments=*/false);
180180 }
181-
181+
182182@@ -606,7 +608,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
183183 }
184-
184+
185185 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
186186- return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
187187+ return ScanBatchesAsync(this->async_cpu_executor());
188188 }
189-
189+
190190 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
191191@@ -783,7 +785,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
192192 }
193-
193+
194194 Future<int64_t> AsyncScanner::CountRowsAsync() {
195195- return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
196196+ return CountRowsAsync(this->async_cpu_executor());
197197 }
198-
198+
199199 Result<int64_t> AsyncScanner::CountRows() {
200200diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
201- index 5031057..c867ece 100644
201+ index 50310577f1..0b5162f25f 100644
202202--- a/cpp/src/arrow/dataset/scanner.h
203203+++ b/cpp/src/arrow/dataset/scanner.h
204- @@ -35,6 +35,7 @@
205- #include "arrow/type_fwd.h"
204+ @@ -36,6 +36,7 @@
206205 #include "arrow/util/async_generator_fwd.h"
207206 #include "arrow/util/iterator.h"
208- + #include "arrow/util/thread_pool.h"
209207 #include "arrow/util/type_fwd.h"
210-
208+ + #include "arrow/util/thread_pool.h"
209+
211210 namespace arrow {
211+
212212@@ -104,6 +105,11 @@ struct ARROW_DS_EXPORT ScanOptions {
213213 /// Note: The IOContext executor will be ignored if use_threads is set to false
214214 io::IOContext io_context;
215-
215+
216216+ /// ExecContext for any CPU tasks
217217+ ///
218218+ /// Note: The ExecContext executor will be ignored if use_threads is set to false
@@ -223,64 +223,13 @@ index 5031057..c867ece 100644
223223 /// Note: If true, this will use threads from both the cpu_executor and the
224224@@ -459,6 +465,11 @@ class ARROW_DS_EXPORT Scanner {
225225 TaggedRecordBatchIterator scan);
226-
226+
227227 const std::shared_ptr<ScanOptions> scan_options_;
228228+
229229+ ::arrow::internal::Executor* async_cpu_executor() const {
230230+ return scan_options_->exec_context.executor() ? scan_options_->exec_context.executor()
231231+ : ::arrow::internal::GetCpuThreadPool();
232232+ }
233233 };
234-
234+
235235 /// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
236- diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
237- index cd32781..a8935d7 100644
238- --- a/cpp/src/arrow/util/thread_pool.h
239- +++ b/cpp/src/arrow/util/thread_pool.h
240- @@ -20,6 +20,7 @@
241- #include <cstdint>
242- #include <memory>
243- #include <queue>
244- + #include <thread>
245- #include <type_traits>
246- #include <unordered_set>
247- #include <utility>
248- @@ -591,6 +592,21 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
249- }
250- }
251-
252- + template <typename T>
253- + Iterator<T> IterateSynchronously(
254- + FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
255- + Executor* executor) {
256- + if (use_threads) {
257- + auto maybe_gen = std::move(get_gen)(executor);
258- + if (!maybe_gen.ok()) {
259- + return MakeErrorIterator<T>(maybe_gen.status());
260- + }
261- + return MakeGeneratorIterator(*maybe_gen);
262- + } else {
263- + return SerialExecutor::IterateGenerator(std::move(get_gen));
264- + }
265- + }
266- +
267- /// \brief Potentially iterate an async generator serially (if use_threads is false)
268- /// \see IterateGenerator
269- ///
270- @@ -605,15 +621,7 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
271- template <typename T>
272- Iterator<T> IterateSynchronously(
273- FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
274- - if (use_threads) {
275- - auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
276- - if (!maybe_gen.ok()) {
277- - return MakeErrorIterator<T>(maybe_gen.status());
278- - }
279- - return MakeGeneratorIterator(*maybe_gen);
280- - } else {
281- - return SerialExecutor::IterateGenerator(std::move(get_gen));
282- - }
283- + return IterateSynchronously(std::move(get_gen), use_threads, GetCpuThreadPool());
284- }
285-
286- } // namespace internal
0 commit comments