Skip to content

Commit 942bfa9

Browse files
IsaacWarrenscott-routledge2
authored andcommitted
Fix missing threadpool changes in patch
1 parent c77898b commit 942bfa9

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

recipe/patches/0004-Bodo-Changes.patch

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,3 +233,54 @@ index 50310577f1..0b5162f25f 100644
233233
};
234234

235235
/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
236+
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
237+
index cd32781aed..a8935d751a 100644
238+
--- a/cpp/src/arrow/util/thread_pool.h
239+
+++ b/cpp/src/arrow/util/thread_pool.h
240+
@@ -20,6 +20,7 @@
241+
#include <cstdint>
242+
#include <memory>
243+
#include <queue>
244+
+#include <thread>
245+
#include <type_traits>
246+
#include <unordered_set>
247+
#include <utility>
248+
@@ -591,6 +592,21 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
249+
}
250+
}
251+
252+
+template <typename T>
253+
+Iterator<T> IterateSynchronously(
254+
+ FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
255+
+ Executor* executor) {
256+
+ if (use_threads) {
257+
+ auto maybe_gen = std::move(get_gen)(executor);
258+
+ if (!maybe_gen.ok()) {
259+
+ return MakeErrorIterator<T>(maybe_gen.status());
260+
+ }
261+
+ return MakeGeneratorIterator(*maybe_gen);
262+
+ } else {
263+
+ return SerialExecutor::IterateGenerator(std::move(get_gen));
264+
+ }
265+
+}
266+
+
267+
/// \brief Potentially iterate an async generator serially (if use_threads is false)
268+
/// \see IterateGenerator
269+
///
270+
@@ -605,15 +621,7 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
271+
template <typename T>
272+
Iterator<T> IterateSynchronously(
273+
FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
274+
- if (use_threads) {
275+
- auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
276+
- if (!maybe_gen.ok()) {
277+
- return MakeErrorIterator<T>(maybe_gen.status());
278+
- }
279+
- return MakeGeneratorIterator(*maybe_gen);
280+
- } else {
281+
- return SerialExecutor::IterateGenerator(std::move(get_gen));
282+
- }
283+
+ return IterateSynchronously(std::move(get_gen), use_threads, GetCpuThreadPool());
284+
}
285+
286+
} // namespace internal

0 commit comments

Comments
 (0)