11diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
2- index 1f8b6cc488..6ac6642fdb 100644
2+ index e6ac389..60561d2 100644
33--- a/cpp/src/arrow/dataset/file_parquet.cc
44+++ b/cpp/src/arrow/dataset/file_parquet.cc
55@@ -26,16 +26,23 @@
6-
6+
77 #include "arrow/compute/cast.h"
88 #include "arrow/compute/exec.h"
99+ #include "arrow/dataset/dataset.h"
@@ -26,10 +26,10 @@ index 1f8b6cc488..6ac6642fdb 100644
2626 #include "arrow/util/tracing_internal.h"
2727 #include "parquet/arrow/reader.h"
2828 #include "parquet/arrow/schema.h"
29- @@ -555 ,6 +562 ,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
29+ @@ -558 ,6 +565 ,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
3030 });
3131 }
32-
32+
3333+ struct CastingGenerator {
3434+ CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> final_schema,
3535+ const std::unordered_set<std::string>& cols_to_skip,
@@ -95,7 +95,7 @@ index 1f8b6cc488..6ac6642fdb 100644
9595 struct SlicingGenerator {
9696 SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
9797 : state(std::make_shared<State>(source, batch_size)) {}
98- @@ -617 ,6 +686 ,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
98+ @@ -620 ,6 +689 ,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
9999 [this, options, parquet_fragment, pre_filtered,
100100 row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
101101 -> Result<RecordBatchGenerator> {
@@ -105,7 +105,7 @@ index 1f8b6cc488..6ac6642fdb 100644
105105 // Ensure that parquet_fragment has FileMetaData
106106 RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
107107 if (!pre_filtered) {
108- @@ -633 ,12 +705 ,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
108+ @@ -636 ,12 +708 ,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
109109 kParquetTypeName, options.get(), default_fragment_scan_options));
110110 int batch_readahead = options->batch_readahead;
111111 int64_t rows_to_readahead = batch_readahead * options->batch_size;
@@ -136,10 +136,10 @@ index 1f8b6cc488..6ac6642fdb 100644
136136 return sliced;
137137 }
138138diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
139- index a856a792a2..5c10dfc6ac 100644
139+ index 8fa45ac..7cd0b73 100644
140140--- a/cpp/src/arrow/dataset/scanner.cc
141141+++ b/cpp/src/arrow/dataset/scanner.cc
142- @@ -355 ,8 +355 ,10 @@ class OneShotFragment : public Fragment {
142+ @@ -360 ,8 +360 ,10 @@ class OneShotFragment : public Fragment {
143143 ARROW_ASSIGN_OR_RAISE(
144144 auto background_gen,
145145 MakeBackgroundGenerator(std::move(batch_it_), options->io_context.executor()));
@@ -151,60 +151,68 @@ index a856a792a2..5c10dfc6ac 100644
151151+ return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
152152 }
153153 std::string type_name() const override { return "one-shot"; }
154-
155- @@ -382 ,7 +384 ,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
154+
155+ @@ -387 ,7 +389 ,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
156156 [this](::arrow::internal::Executor* executor) {
157157 return ScanBatchesAsync(executor);
158158 },
159159- scan_options_->use_threads);
160160+ scan_options_->use_threads, this->async_cpu_executor());
161161 }
162-
162+
163163 Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164- @@ -390 ,7 +392 ,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164+ @@ -395 ,7 +397 ,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
165165 [this](::arrow::internal::Executor* executor) {
166166 return ScanBatchesUnorderedAsync(executor);
167167 },
168168- scan_options_->use_threads);
169169+ scan_options_->use_threads, this->async_cpu_executor());
170170 }
171-
171+
172172 Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173- @@ -400 ,7 +402 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173+ @@ -405 ,7 +407 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
174174 }
175-
175+
176176 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
177177- return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
178178+ return ScanBatchesUnorderedAsync(this->async_cpu_executor(),
179179 /*sequence_fragments=*/false);
180180 }
181-
182- @@ -601 ,7 +603 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
181+
182+ @@ -606 ,7 +608 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
183183 }
184-
184+
185185 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
186186- return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
187187+ return ScanBatchesAsync(this->async_cpu_executor());
188188 }
189-
189+
190190 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
191- @@ -778 ,7 +780 ,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
191+ @@ -783 ,7 +785 ,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
192192 }
193-
193+
194194 Future<int64_t> AsyncScanner::CountRowsAsync() {
195195- return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
196196+ return CountRowsAsync(this->async_cpu_executor());
197197 }
198-
198+
199199 Result<int64_t> AsyncScanner::CountRows() {
200200diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
201- index d2de267897..1c605c1bf2 100644
201+ index 5031057..c867ece 100644
202202--- a/cpp/src/arrow/dataset/scanner.h
203203+++ b/cpp/src/arrow/dataset/scanner.h
204- @@ -107,6 +107,11 @@ struct ARROW_DS_EXPORT ScanOptions {
204+ @@ -35,6 +35,7 @@
205+ #include "arrow/type_fwd.h"
206+ #include "arrow/util/async_generator_fwd.h"
207+ #include "arrow/util/iterator.h"
208+ + #include "arrow/util/thread_pool.h"
209+ #include "arrow/util/type_fwd.h"
210+
211+ namespace arrow {
212+ @@ -104,6 +105,11 @@ struct ARROW_DS_EXPORT ScanOptions {
205213 /// Note: The IOContext executor will be ignored if use_threads is set to false
206214 io::IOContext io_context;
207-
215+
208216+ /// ExecContext for any CPU tasks
209217+ ///
210218+ /// Note: The ExecContext executor will be ignored if use_threads is set to false
@@ -213,20 +221,20 @@ index d2de267897..1c605c1bf2 100644
213221 /// If true the scanner will scan in parallel
214222 ///
215223 /// Note: If true, this will use threads from both the cpu_executor and the
216- @@ -442 ,6 +447 ,11 @@ class ARROW_DS_EXPORT Scanner {
224+ @@ -459 ,6 +465 ,11 @@ class ARROW_DS_EXPORT Scanner {
217225 TaggedRecordBatchIterator scan);
218-
226+
219227 const std::shared_ptr<ScanOptions> scan_options_;
220228+
221229+ ::arrow::internal::Executor* async_cpu_executor() const {
222230+ return scan_options_->exec_context.executor() ? scan_options_->exec_context.executor()
223231+ : ::arrow::internal::GetCpuThreadPool();
224232+ }
225233 };
226-
234+
227235 /// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
228236diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
229- index 44b1e227b0..218edc60ca 100644
237+ index cd32781..a8935d7 100644
230238--- a/cpp/src/arrow/util/thread_pool.h
231239+++ b/cpp/src/arrow/util/thread_pool.h
232240@@ -20,6 +20,7 @@
@@ -240,7 +248,7 @@ index 44b1e227b0..218edc60ca 100644
240248@@ -591,6 +592,21 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
241249 }
242250 }
243-
251+
244252+ template <typename T>
245253+ Iterator<T> IterateSynchronously(
246254+ FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
@@ -274,5 +282,5 @@ index 44b1e227b0..218edc60ca 100644
274282- }
275283+ return IterateSynchronously(std::move(get_gen), use_threads, GetCpuThreadPool());
276284 }
277-
285+
278286 } // namespace internal
0 commit comments