From ce923f6068eb6c24c05d38370b5a0670439bde08 Mon Sep 17 00:00:00 2001
From: Keijo Mottonen
Date: Tue, 1 Mar 2022 19:58:03 +0200
Subject: [PATCH] Threadpool added

---
 README.md             |  74 +++++++++++--
 examples/example-2.cc | 160 +++++++++++++++++++++++++++
 include/Lazy/Lazy.h   | 244 ++++++++++++++++++++++++++++++++++++++----
 3 files changed, 453 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index 4b3ce8f..9498344 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ You can also use rudimentary stop tokens for letting a parallel task tell the ot

 ## Three basic ways to use the library

-#### 1. Run any number of functions in parallel and get the results as a tuple
+### 1. Run any number of functions in parallel and get the results as a tuple

 In this example, one of the three parallel functions may throw.

@@ -43,11 +43,16 @@ The output will be `i = 100, d = 3.16228, s = "10"`.
 There are more examples on how to use `Lazy::runParallel` in [example-1.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-1.cc).
 For an example on how to use stop tokens to communicate between the functions, see example 1.2 in [example-1.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-1.cc).

-#### 2. Vector in, vector out
+### 2. Vector in, vector out using either disposable threads or a threadpool
+
+#### 2.1. Disposable threads

 You can run a function or a set of continuations in parallel for each element of the input vector and get the results as a vector.
-By default, the maximum number of parallel threads in the thread pool is the number of cores in your machine.
-You can set the thread pool size manually with template argument, for example `auto vecOutput = Lazy::runForAll<128>(vecInput, ...);`
+By default, the maximum number of parallel threads is the number of cores in your machine.
+You can set the number of threads manually with a template argument, for example `auto vecOutput = Lazy::runForAll<128>(vecInput, ...);`
+The threads are disposable in the sense that they die when the function returns.
+Examples 2.1 to 2.6 in [example-2.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-2.cc) show how to use
+disposable threads with function `Lazy::runForAll`.

 Here is an example on running a sequence of 3 continuations where the inputs are in std::vector and the output is an std::vector.

@@ -178,8 +183,65 @@ For other methods provided by `Lazy::StopToken`, see `class StopToken` in the be

 For many more examples on how to use `Lazy::runForAll`, see [example-2.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-2.cc).

+#### 2.2. Threadpool
+
+If you repeatedly call the same function with different input and output vectors, it may be faster to
+launch a permanent threadpool instead of using disposable threads as described above in section 2.1.
+However, disposable threads are lock-free whereas the threadpool must use a mutex and condition
+variables to manage the states of the threads. Hence, you should measure which method gives
+better performance in your use case.
+Example 2-9 in [example-2.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-2.cc)
+is an example of such a measurement.
+
+Example 2-7 shows a use case where the return type of the function is a vector,
+so a call to the threadpool maps a vector of integers into a vector of vectors of integers.
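+
+As a rough sketch of that use case (the divisor function and the vectors below are only illustrative
+and are not part of the library or of example-2.cc), such a call could look like this:
+
+```c++
+    // A hypothetical function which maps an integer to the vector of its divisors.
+    auto findDivisors = [](int x) {
+        std::vector<int> divisors;
+        for (int d = 2; d < x; ++d)
+            if (x % d == 0)
+                divisors.push_back(d);
+        return divisors;
+    };
+
+    // Start the threadpool (one thread per core by default).
+    auto thrPool = Lazy::ThreadPool(findDivisors);
+
+    std::vector<int> vecIn {12, 30, 49};
+    std::vector<std::vector<int>> vecOut(vecIn.size());
+    // After the call, vecOut[i] holds the divisors of vecIn[i].
+    thrPool.run(vecIn.data(), vecOut.data(), vecIn.size());
+```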
+
+Example 2-8 demonstrates how to run a void function in the threadpool and how to deal with
+exceptions and stop tokens for cancelling the run prematurely.
+
+Here is a brief example of how to use the threadpool. In this case,
+the same input vector is used for each `multiplier` value.
+
+```c++
+    {
+        int multiplier = 1;
+        auto func = [&multiplier](double x) {
+            return multiplier * std::exp(x);
+        };
+
+        std::vector<double> vecIn(256), vecOut(256);
+        // Fill in the input vector
+        for (int i = 0; i < 256; ++i)
+            vecIn[i] = std::cos((3.141592654 / 256) * i);
+
+        // Start the threadpool
+        auto thrPool = Lazy::ThreadPool(func);
+
+        // Fill in the output vector several times using different parameters
+        while (multiplier <= 1000000) {
+            // Set vecOut[i] = func(vecIn[i]) for each i in parallel.
+            thrPool.run(vecIn.data(), vecOut.data(), vecIn.size());
+            // Do something with vecOut...
+            std::cout << "multiplier = " << multiplier
+                      << ", vecOut[first] = " << vecOut[0]
+                      << ", vecOut[last] = " << vecOut[255] << "\n";
+            multiplier *= 10;
+        }
+    }
+```
+
+The output will be
+```
+multiplier = 1, vecOut[first] = 2.71828, vecOut[last] = 0.367907
+multiplier = 10, vecOut[first] = 27.1828, vecOut[last] = 3.67907
+multiplier = 100, vecOut[first] = 271.828, vecOut[last] = 36.7907
+multiplier = 1000, vecOut[first] = 2718.28, vecOut[last] = 367.907
+multiplier = 10000, vecOut[first] = 27182.8, vecOut[last] = 3679.07
+multiplier = 100000, vecOut[first] = 271828, vecOut[last] = 36790.7
+multiplier = 1000000, vecOut[first] = 2.71828e+06, vecOut[last] = 367907
+```

-#### 3. Use futures and continuations in manual mode
+### 3. Use futures and continuations in manual mode

You can define the futures and the continuations they will run manually. The first function in the chain of continuations can have any number of input parameters which are passed to function `Lazy::future(...)`.
@@ -262,5 +324,5 @@ The easiest way to compile all examples is to do
If you don't want to use cmake, the examples can be compiled manually one by one. For instance,
`g++ examples/example-1.cc -std=c++17 -I include/ -O3 -pthread -o example-1`

-The examples have been tested with g++ 10.3.0 and clang++ 12.0.0 but any compiler which complies with c++17 standard should do.
+The examples have been tested with g++ 11.2.0 and clang++ 13.0.0 but any compiler which complies with c++17 standard should do.

The compiler can be switched from gcc to clang by building the examples with `cmake examples -DCMAKE_CXX_COMPILER=clang++`.
diff --git a/examples/example-2.cc b/examples/example-2.cc
index d84f945..d2920cc 100644
--- a/examples/example-2.cc
+++ b/examples/example-2.cc
@@ -10,6 +10,8 @@
 #include

+using namespace std::chrono_literals;
+
 template <class... Args>
 void atomic_print(Args&&... args)
 {
@@ -280,4 +282,162 @@ int main()
             assert(vecFrac2[i] == vecFractionOut[i]);
         }
     }
+
+    // Example 2-7: Threadpool example: Use a function whose return type is a vector so
+    //              the output is a vector of vectors.
+    std::cout << "\n*** Example 2-7 *** (Threadpool) Parallel calls to a function which\n";
+    std::cout << "                    inputs a number and returns a vector for each call.\n";
+    std::cout << "                    The vector contains factors of the input\n";
+    std::cout << "                    (i.e. numbers which divide the input).\n";
+    {
+        using std::vector;
+        // First a helper for printing out input vectors and output vectors of vectors.
+        auto printFactors = [](const auto& in, const auto& out)
+        {
+            for (int i = 0; i < in.size(); ++i) {
+                std::cout << in[i] << " : { ";
+                for (int j = 0; j < out[i].size(); ++j)
+                    std::cout << out[i][j] << ' ';
+                std::cout << "}\n";
+            }
+        };
+
+        // findFactors returns a list of factors of x.
+        // E.g. findFactors(60) = { 2 3 4 5 6 10 12 15 20 30 }
+        auto findFactors = [](int x) -> vector<int> {
+            vector<int> vecSmall, vecLarge;
+            x = (x >= 0) ? x : -x;
+            int y = 2, yy = y*y;
+            while (yy < x) {
+                if (x % y == 0) {
+                    vecSmall.push_back(y);
+                    vecLarge.push_back(x / y);
+                }
+                yy += (2*y + 1); // (y+1)^2 = y*y + 2*y + 1
+                ++y;
+            }
+            if (yy == x)
+                vecSmall.push_back(y);
+            for (int i = vecLarge.size()-1; i >= 0; --i)
+                vecSmall.push_back(vecLarge[i]);
+            return vecSmall;
+        };
+
+        // Start a threadpool for finding factors
+        auto thrPool = Lazy::ThreadPool(findFactors);
+
+        std::cout << "Doing even hundreds...\n";
+        vector<int> vecEvenHundreds {200, 400, 600, 800, 813}; // Vector of inputs
+        vector<vector<int>> vecFactorsEven;                    // Vector of output vectors.
+        vecFactorsEven.resize(vecEvenHundreds.size());
+        thrPool.run(vecEvenHundreds.data(), vecFactorsEven.data(), vecEvenHundreds.size());
+        printFactors(vecEvenHundreds, vecFactorsEven);
+
+        std::cout << "Doing odd hundreds...\n";
+        vector<int> vecOddHundreds {100, 300, 500, 700, 900, 911}; // Vector of inputs
+        vector<vector<int>> vecFactorsOdd;                         // Vector of output vectors.
+        vecFactorsOdd.resize(vecOddHundreds.size());
+        thrPool.run(vecOddHundreds.data(), vecFactorsOdd.data(), vecOddHundreds.size());
+        printFactors(vecOddHundreds, vecFactorsOdd);
+    }
+
+    // Example 2-8: (Threadpool) Use a function whose return type is void so there is no output vector.
+    //              Also demonstrate stop tokens to cancel other jobs when one has failed.
+    //              Also demonstrate dealing with exceptions arising from a job.
+    std::cout << "\n*** Example 2-8 *** (Threadpool) Demonstrate a void function with stop tokens and exceptions.\n";
+    {
+        const int badNumber = 43;
+        auto func = [badNumber](Lazy::StopToken* token, int i) {
+            if (*token) {
+                atomic_print("Token is set so bailing out, i = ", i);
+                return;
+            }
+
+            if (i == badNumber) {
+                atomic_print("Bad number! Setting token and throwing. i = ", i, ", thread id = ", std::this_thread::get_id());
+                token->setValue(1); // Set token to let others know to give up
+                throw std::runtime_error("[[Bad Number]]");
+            }
+
+            atomic_print("All is good. Doing some work for 100ms. i = ", i);
+            std::this_thread::sleep_for(100ms);
+        };
+
+        // Start the thread pool and put the threads to idle.
+        auto thrPool = Lazy::ThreadPool(func);
+
+        try {
+            // Allocate input vectors
+            std::vector<int> vecIn {0,1,2,3, badNumber, 5,6,7,8,9};
+
+            // Run the function with the input. There is no output data because the return type is void.
+            thrPool.run(vecIn.data(), nullptr, vecIn.size());
+        }
+        catch (const std::exception& e) {
+            std::cout << "Exception '" << e.what() << "' caught successfully.\n";
+        }
+    }
+
+    // Example 2-9: Start a thread pool and put the threads in idle to wait for work.
+    //              Compare performance with a varying number of parallel threads.
+    //              Also compare the performance of the thread pool to
+    //              runForAll() which uses disposable threads (i.e. the thread dies
+    //              when the function returns.)
+    std::cout << "\n*** Example 2-9 *** (Threadpool) Compare threadpool to using disposable threads.\n";
+    std::cout << "                    (This may take a while.)\n";
+    {
+        auto func = [](int i) -> double { // The threads will run this function which maps an integer to a double.
+            double d = 0;
+            constexpr int iRounds = 8 * 1024;
+            for (int jj = 0; jj < iRounds; ++jj)
+                d += std::sin(i * (-jj % 1000) * (0.001 * 2 * 3.141592654));
+            return d;
+        };
+
+        int iCores = 2 * std::thread::hardware_concurrency();
+        while (iCores > std::thread::hardware_concurrency() / 8) {
+            std::cout << "Using " << iCores << " cores... ";
+            std::cout.flush();
+
+            // Start the thread pool and put the threads to idle.
+            auto thrPool = Lazy::ThreadPool(func, iCores);
+
+            // Allocate input and output vectors
+            std::vector<int> vecIn {-1,1,-2,2,-3,3,-4,4,-5,5,-6,6,-7,7,-8,8};
+            std::vector<double> vecOut(vecIn.size());
+            std::vector<double> vecRef(vecIn.size()); // The correct answer
+            for (size_t i = 0; i < vecIn.size(); ++i)
+                vecRef[i] = func(vecIn[i]);
+
+            auto start = std::chrono::system_clock::now();
+            // Reuse the threadpool 8 * 1024 times
+            for (int jj = 0; jj < 1024 * 8; ++jj) {
+                thrPool.run(vecIn.data(), vecOut.data(), vecOut.size());
+                for (size_t i = 0; i < vecIn.size(); ++i) // Verify the result
+                    if (vecOut[i] != vecRef[i])
+                        std::cerr << "ERROR at index " << i << std::endl;
+            } // for jj
+            auto end = std::chrono::system_clock::now();
+            std::chrono::duration<double> diff = end-start;
+            std::cout << "ThreadPool: Time = " << diff.count() << std::endl;
+
+            if (iCores == std::thread::hardware_concurrency()) { // Single shot tester
+                std::cout << "Compare to " << iCores << " disposable threads... ";
+                std::cout.flush();
+                auto start = std::chrono::system_clock::now();
+                // Run 8 * 1024 times using disposable threads
+                for (int jj = 0; jj < 1024 * 8; ++jj) {
+                    vecOut = Lazy::runForAll(vecIn, func);
+                    for (size_t i = 0; i < vecIn.size(); ++i) // Verify the result
+                        if (vecOut[i] != vecRef[i])
+                            std::cerr << "ERROR at index " << i << std::endl;
+                } // for jj
+                auto end = std::chrono::system_clock::now();
+                std::chrono::duration<double> diff = end-start;
+                std::cout << " Time = " << diff.count() << std::endl;
+            }
+            iCores /= 2;
+        }
+    }
 }
diff --git a/include/Lazy/Lazy.h b/include/Lazy/Lazy.h
index 24e461b..9ea7a5a 100644
--- a/include/Lazy/Lazy.h
+++ b/include/Lazy/Lazy.h
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include

 // A library for running functions in parallel
 // using future-like objects with continuation
@@ -24,6 +25,16 @@
 namespace Lazy {

+template <class... Args>
+void atomic_print(Args&&... args)
+{
+    std::stringstream ss;
+    (ss << ... << args) << '\n';
+    std::cout << ss.str();
+}
+
+using std::size_t;
+
 // An std::stop_token - like class.
 // All it does is to wrap an atomic integer.
 class StopToken
@@ -63,18 +74,18 @@ class StopToken
     std::atomic<int> _value{0};
 };

-// An std::vector - like object which behaves as if it was filled with std::iota,
+// An std::vector - like object which behaves as if it was filled with std::iota,
 // meaning that vec[i] = i for i = 0...size()-1.
 // Also iterators vec.begin() and vec.end() work so range-based for-loops work.
 class Sequence
 {
 public:
-    using value_type = const std::size_t;
-    using size_type = std::size_t;
+    using value_type = const size_t;
+    using size_type = size_t;

-    Sequence(std::size_t sz = 0) : N(sz) {}
+    Sequence(size_t sz = 0) : N(sz) {}

-    value_type operator[](std::size_t i) const noexcept
+    value_type operator[](size_t i) const noexcept
     {
         return i;
     }
@@ -97,13 +108,13 @@ class Sequence
     class Iterator {
     public:
         typedef std::bidirectional_iterator_tag iterator_category;
-        typedef const std::size_t value_type;
+        typedef const size_t value_type;
         typedef std::ptrdiff_t difference_type;
         typedef value_type* pointer;
         typedef value_type& reference;

         Iterator() : _n(0), _maxN(0) {}
-        Iterator(std::size_t n, std::size_t mx) : _n(n), _maxN(mx) {}
+        Iterator(size_t n, size_t mx) : _n(n), _maxN(mx) {}

         reference operator*() const
         {
@@ -149,8 +160,8 @@ class Sequence
         }

     private:
-        std::size_t _n = 0;
-        std::size_t _maxN = 0;
+        size_t _n = 0;
+        size_t _maxN = 0;
     }; // class Iterator

     Iterator begin() const
@@ -164,7 +175,7 @@ class Sequence
     }

 private:
-    std::size_t N;
+    size_t N;
 }; // class Sequence

 struct Empty
@@ -172,6 +183,201 @@ struct Empty
     void operator()() const noexcept {}
 };

+// Solves the return type, argument types and noexcept status of callable F.
+template <class F>
+struct TypeSolver
+{
+    template< class R, class... Args >
+    static R resultOfFunction(std::function<R(Args...)>);
+
+    template< class R, class... Args >
+    static std::tuple<Args...> argumentsAsTupleOf(std::function<R(Args...)>);
+
+    template< class R, class... Args >
+    static std::is_nothrow_invocable<F, Args...> isNoExcept(std::function<R(Args...)>);
+
+    constexpr TypeSolver(F) {}; // Dummy constructor for class template argument deduction.
+    using ResultType = decltype(resultOfFunction(std::function{std::declval<F>()}));
+    using ArgumentTypes = decltype(argumentsAsTupleOf(std::function{std::declval<F>()}));
+    using NoExceptType = decltype(isNoExcept(std::function{std::declval<F>()}));
+};
+
+// Result type of callable F.
+template <class F>
+using ResultType = typename TypeSolver<F>::ResultType;
+
+// Arguments of callable F as a tuple.
+template <class F>
+using ArgumentTypes = typename TypeSolver<F>::ArgumentTypes;
+
+// I'th argument type of callable F.
+template <size_t I, class F>
+using ArgumentType = std::tuple_element_t<I, ArgumentTypes<F>>;
+
+// std::true_type if callable F is noexcept, otherwise std::false_type.
+template <class F>
+using NoExceptType = typename TypeSolver<F>::NoExceptType;
+
+// Threadpool for starting threads and leaving them idle waiting for work.
+template <class Func>
+class ThreadPool
+{
+    Func _func; // Worker function which the worker threads will call.
+
+    std::atomic_size_t _numTasksStarted = 0; // Number of tasks in progress
+    std::atomic_size_t _exceptionCount = 0;  // Number of exceptions thrown
+    std::size_t _numAllTasks = 0;   // Total number of tasks to be done (i.e. the length of the input vector)
+    bool _go = false;               // True if the run() method has given tasks to the threads.
+    bool _destructorCalled = false; // ThreadPool is being destructed.
+    int _numThreadsRunning = 0;     // Number of active threads (i.e. not waiting in a condition variable)
+    int _threadCount = 0;           // Number of threads in the pool.
+
+    std::condition_variable _cvStart; // cv where the worker threads are waiting for work
+    std::condition_variable _cvEnd;   // cv where the main thread is waiting for the work to finish.
+    std::mutex _mtx;
+    StopToken _stopToken; // A pointer to this token may be passed to the work functions.
+
+    std::exception_ptr _pException;    // Pointer to a possible exception thrown by a worker function.
+    std::vector<std::thread> _threads; // Worker threads
+
+    // Number of input arguments in the function
+    static constexpr size_t _numArgs = std::tuple_size<ArgumentTypes<Func>>{};
+
+    // True if the first argument of the user's function is a stop token.
+    static constexpr bool _bUseStopToken = (_numArgs == 2) && std::is_same_v<ArgumentType<0, Func>, StopToken*>;
+
+    using InputType = ArgumentType<_bUseStopToken, Func>;
+    using OutputType = ResultType<Func>;
+
+    // Pointers to the beginnings of the input and output data arrays
+    const InputType* _pInput = nullptr;
+    OutputType* _pOutput = nullptr;
+
+    // True if the function does not return a value.
+    static constexpr bool _bVoid = std::is_same_v<OutputType, void>;
+
+public:
+    ThreadPool(Func func, int numThreads = 0) : _func(func)
+    {
+        static_assert(_numArgs == 1 || _bUseStopToken,
+            "The function must either take one input argument or a stop_token pointer and an input argument");
+        _threadCount = (numThreads <= 0) ? std::thread::hardware_concurrency() : numThreads;
+        _threads.reserve(_threadCount);
+
+        // Worker to run in each thread of the thread pool.
+        auto worker = [this] {
+            while(true)
+            {
+                {
+                    std::unique_lock lk(_mtx);
+                    _cvStart.wait(lk, [this]{return _go || _destructorCalled;});
+                    if (_go)
+                        ++_numThreadsRunning; // This many threads are not waiting in _cvStart
+                    if (_destructorCalled)
+                        return;
+                }
+
+                // Make local copies to avoid indirect access in the loop
+                const auto numAllTasks = _numAllTasks;
+                auto pInput = _pInput;
+                auto pOutput = _pOutput;
+
+                do {
+                    // The input will be read and the output stored at this index.
+                    auto index = _numTasksStarted.fetch_add(1);
+                    if (index < numAllTasks) {
+                        try {
+                            if constexpr (_bUseStopToken) {
+                                if constexpr (_bVoid)
+                                    _func(&_stopToken, pInput[index]);
+                                else
+                                    pOutput[index] = _func(&_stopToken, pInput[index]);
+                            } else {
+                                if constexpr (_bVoid)
+                                    _func(pInput[index]);
+                                else
+                                    pOutput[index] = _func(pInput[index]);
+                            }
+                        }
+                        catch (...)
+                        {
+                            if (_exceptionCount++ == 0) // Only one exception will be stored
+                                _pException = std::current_exception();
+                        } // catch
+                    } // if
+                } while (_numTasksStarted.load() < numAllTasks);
+
+                // Note: None of the worker threads will come to this point
+                // until all the work has been done (i.e. _numTasksStarted >= numAllTasks)
+                bool bWasLast;
+                {
+                    const std::lock_guard lock(_mtx);
+                    _go = false;
+                    // True if this is the last thread that finished the do - while loop above.
+                    bWasLast = (--_numThreadsRunning == 0);
+                }
+
+                if (bWasLast)
+                    _cvEnd.notify_one();
+            }
+        }; // worker
+
+        // Start the threads.
+        for (int i = 0; i < _threadCount; ++i)
+            _threads.push_back(std::thread(worker));
+    }
+
+    ~ThreadPool()
+    {
+        {
+            std::lock_guard lk(_mtx);
+            _destructorCalled = true;
+        }
+        // Notify the workers that we are dying.
+        _cvStart.notify_all();
+
+        for(std::thread& thr : _threads)
+            if (thr.joinable())
+                thr.join();
+    }
+
+    // Give the workers numElements items of work. The inputs are in the array pointed to by pIn
+    // and the output goes to the array pointed to by pOut.
+    // The call returns when all numElements tasks have finished.
+    void run(const InputType* pIn, OutputType* pOut, size_t numElements)
+    {
+        if (numElements == 0)
+            return;
+
+        if constexpr (_bUseStopToken)
+            _stopToken.setValue(0);
+
+        {
+            const std::lock_guard lock(_mtx);
+            _numTasksStarted = 0;
+            _exceptionCount = 0;
+            _numAllTasks = numElements;
+            _numThreadsRunning = 0;
+            _pInput = pIn;
+            _pOutput = pOut;
+            _pException = nullptr;
+            _go = true; // Set the running state from idle to active.
+        }
+
+        // Let the workers know that there are _numAllTasks items of work to do.
+        _cvStart.notify_all();
+        { // Wait until all threads are back in the idle state.
+            std::unique_lock lk(_mtx);
+            _cvEnd.wait(lk, [this]{ return _go == false && _numThreadsRunning==0; });
+            _numAllTasks = 0;
+        }
+
+        // Deal with a possible exception.
+        if (_pException)
+            std::rethrow_exception(_pException);
+    }
+}; // class ThreadPool
+
+
 // A promise-like object which is always allocated from the stack.
 // If holds an other promise and the function which is applied
 // at set_value() before storing the result to the upper level promise.
@@ -179,7 +385,7 @@ template
 class Promise
 {
 public:
-    Promise(Prom p, Func f) : _prom(p), _fun(f){};
+    Promise(Prom p, Func f) : _prom(p), _fun(f) {};

     // Overload for _fun with parameters
     template
@@ -516,7 +722,7 @@ auto future()

 // Calls get() on the given futures and returns the values as a tuple.
 // The set of futures and their states are also tuples.
-template <class Futures, class States, std::size_t... I>
+template <class Futures, class States, size_t... I>
 auto getResults(Futures&& futs, States&& states, std::index_sequence<I...>)
 {
     return std::make_tuple(std::get<I>(futs).get(std::get<I>(states))...);
@@ -532,7 +738,7 @@ auto runFutures(Futs&&... fs)
 }

 // Helper function for sorting out the index sequence for the tuple of futures.
-template <class FutTuple, std::size_t... I>
+template <class FutTuple, size_t... I>
 auto runParallelAsTuple(FutTuple&& futs, std::index_sequence<I...>)
 {
     static_assert(std::tuple_size_v == sizeof...(I), "Index sequence doesn't match.");
@@ -651,7 +857,7 @@ auto runForAll(const Vec& vecX, Func&& func)
                 pException = std::current_exception();
             }
         } // if
-    } while (szNumStartedTasks.load() <= vecX.size());
+    } while (szNumStartedTasks.load() < vecX.size());
     }; // worker

     if constexpr (MaxThreads > 0) { // Threadpool is an array of threads living in stack.
@@ -664,7 +870,7 @@ auto runForAll(const Vec& vecX, Func&& func)
             thr.join();
     } else { // Threadpool is a vector of threads living in heap.
-        auto uNumThreads = std::min(std::size_t(std::thread::hardware_concurrency()), vecX.size());
+        auto uNumThreads = std::min(size_t(std::thread::hardware_concurrency()), vecX.size());
         std::vector<std::thread> vecThreadPool(uNumThreads);
         for (auto& thr : vecThreadPool)
             thr = std::thread(worker);
@@ -695,10 +901,10 @@ auto runForAll(const Vec& vecX, Func&& func)
 }

 // Helper for array overload of runForAll(...)
-template <class Arr, class Func, std::size_t... I>
+template <class Arr, class Func, size_t... I>
 auto runForAllInArray(const Arr& arrX, Func&& func, std::index_sequence<I...>)
 {
-    constexpr std::size_t N = sizeof...(I);
+    constexpr size_t N = sizeof...(I);
     using U = typename Arr::value_type; // input type

     // The functions take stop token as the first parameter
@@ -730,7 +936,7 @@ auto runForAllInArray(const Arr& arrX, Func&& func, std::index_sequence)
 // Executes "y = func(x)" for each x in array arrX in a separate thread.
 // There will be as many parallel threads as there are elements in the array.
 // Returns an array of y's.
-template <class T, std::size_t N, class Func>
+template <class T, size_t N, class Func>
 auto runForAll(const std::array<T, N>& arrX, Func&& func)
 {
     return runForAllInArray(arrX, std::forward<Func>(func), std::make_index_sequence<N>{});
@@ -778,7 +984,7 @@ auto runForAll(const std::vector& x, F1&& f1, F2&& f2, Funcs&&... funcs)

 // Note: MaxThreads template parameter is ignored.
 // There are always as many threads as there are elements in the array.
-template
+template
 auto runForAll(const std::array& arrX, F1&& f1, F2&& f2, Funcs&&... funcs)
 {
     auto nestedFuncs = [&f1, &f2, &funcs...](auto t) { return nested(t, f1, f2, funcs...); };