Merge pull request #11 from tirimatangi/thread-pool
Thread pool added
tirimatangi authored Mar 1, 2022
2 parents 9014b12 + ce923f6 commit 233ee8e
Showing 3 changed files with 453 additions and 25 deletions.
74 changes: 68 additions & 6 deletions README.md
@@ -18,7 +18,7 @@ You can also use rudimentary stop tokens for letting a parallel task tell the ot
## Three basic ways to use the library


#### 1. Run any number of functions in parallel and get the results as a tuple
### 1. Run any number of functions in parallel and get the results as a tuple

In this example, one of the three parallel functions may throw.

@@ -43,11 +43,16 @@ The output will be `i = 100, d = 3.16228, s = "10"`.
There are more examples on how to use `Lazy::runParallel` in [example-1.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-1.cc).
For an example on how to use stop tokens to communicate between the functions, see example 1.2 in [example-1.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-1.cc).
#### 2. Vector in, vector out
### 2. Vector in, vector out using either disposable threads or a threadpool
#### 2.1. Disposable threads
You can run a function or a set of continuations in parallel for each element of the input vector and get the results as a vector.
By default, the maximum number of parallel threads in the thread pool is the number of cores in your machine.
You can set the thread pool size manually with template argument, for example `auto vecOutput = Lazy::runForAll<128>(vecInput, ...);`
By default, the maximum number of parallel threads is the number of cores in your machine.
You can set the number of threads manually with a template argument, for example `auto vecOutput = Lazy::runForAll<128>(vecInput, ...);`
The threads are disposable in the sense that they die when the function returns.
Examples 2.1 to 2.6 in [example-2.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-2.cc) show how to use
disposable threads with function `Lazy::runForAll`.
Here is an example of running a sequence of 3 continuations where the input is a `std::vector<int>` and the output is a `std::vector<double>`.
@@ -178,8 +183,65 @@ For other methods provided by `Lazy::StopToken`, see `class StopToken` in the be

For many more examples on how to use `Lazy::runForAll`, see [example-2.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-2.cc).
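
To make the term "disposable threads" from section 2.1 concrete, here is a minimal sketch written with plain `std::thread`. It is not how `Lazy::runForAll` is implemented; the helper name `mapWithDisposableThreads` and its parameters are assumptions made up for this illustration. It only shows the general idea: threads are created for one call, each handles a share of the input vector, and all of them are joined (and thus gone) when the function returns.

```c++
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper, not part of Lazy: applies f to every element of `in`
// using short-lived threads which are joined before the function returns.
// Note: an exception escaping f would terminate the program in this sketch.
template <class F>
std::vector<double> mapWithDisposableThreads(const std::vector<int>& in, F f,
                                             std::size_t maxThreads = 0)
{
    if (maxThreads == 0)  // hardware_concurrency() may report 0, so fall back to 1
        maxThreads = std::max(1u, std::thread::hardware_concurrency());

    std::vector<double> out(in.size());
    const std::size_t nThreads = std::min(maxThreads, in.size());

    std::vector<std::thread> workers;
    workers.reserve(nThreads);
    for (std::size_t t = 0; t < nThreads; ++t) {
        // Thread t handles indices t, t + nThreads, t + 2*nThreads, ...
        // so no two threads ever write to the same element of `out`.
        workers.emplace_back([&in, &out, &f, t, nThreads] {
            for (std::size_t i = t; i < in.size(); i += nThreads)
                out[i] = f(in[i]);
        });
    }
    for (auto& w : workers)
        w.join();  // The threads end here; nothing is kept for the next call
    return out;
}
```

Calling `mapWithDisposableThreads(vecInput, func, 2)` would use at most two threads for that single call. The cost of creating and joining threads is paid on every call, which is the overhead a permanent thread pool tries to avoid.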

#### 2.2. Threadpool

If you repeatedly call the same function with different input and output vectors, it may be faster to
launch a permanent threadpool instead of using disposable threads as described above in section 2.1.
However, disposable threads are lock-free whereas the threadpool must use a mutex and condition
variables to manage the states of the threads. Hence, you should measure which method gives
better performance in your use case.
Example 2-9 in [example-2.cc](https://github.com/tirimatangi/Lazy/blob/main/examples/example-2.cc)
is an example of such a measurement.
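
The synchronization cost mentioned above can be illustrated with a generic worker pool. The sketch below is not the implementation of `Lazy::ThreadPool`; the class `TinyPool` and all of its members are hypothetical names chosen for this illustration. It shows the usual pattern in which permanent workers sleep on a condition variable and every hand-over of work takes a mutex.

```c++
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool, unrelated to the internals of Lazy::ThreadPool.
// Note: an exception escaping a job would terminate the program in this sketch.
class TinyPool {
public:
    explicit TinyPool(std::size_t nThreads)
    {
        assert(nThreads > 0);  // run() would wait forever without workers
        for (std::size_t t = 0; t < nThreads; ++t)
            workers_.emplace_back([this] { workerLoop(); });
    }

    ~TinyPool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wakeWorkers_.notify_all();
        for (auto& w : workers_)
            w.join();
    }

    // Sets out[i] = f(in[i]) for i in [0, n) and blocks until all jobs are done.
    void run(const std::function<double(double)>& f,
             const double* in, double* out, std::size_t n)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_ += n;
            for (std::size_t i = 0; i < n; ++i)
                jobs_.push([&f, in, out, i] { out[i] = f(in[i]); });
        }
        wakeWorkers_.notify_all();
        std::unique_lock<std::mutex> lock(mutex_);
        allDone_.wait(lock, [this] { return pending_ == 0; });
    }

private:
    void workerLoop()
    {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wakeWorkers_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty())
                    return;  // stopping_ is set and nothing is left to do
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // Run outside the lock so that jobs execute in parallel
            std::lock_guard<std::mutex> lock(mutex_);
            if (--pending_ == 0)
                allDone_.notify_all();
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeWorkers_, allDone_;
    std::queue<std::function<void()>> jobs_;
    std::size_t pending_ = 0;
    bool stopping_ = false;
    std::vector<std::thread> workers_;  // Declared last: started after the rest exists
};
```

Every job in this sketch locks the mutex twice (once to fetch the job, once to report completion). Whether that overhead outweighs the cost of creating fresh threads depends on how much work each job does, which is why measuring is recommended.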

Example 2-7 gives an example of a use case where the return type of the function is a vector
so a call to the threadpool maps a vector of integers into a vector of vectors of integers.

Example 2-8 demonstrates how to run a void function in the threadpool and how to deal with
exceptions and stop tokens for cancelling the run prematurely.
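
The pattern behind Example 2-8 (one job fails, signals the others to give up, and the exception reaches the caller) can be sketched with standard C++ only. The code below does not use `Lazy::StopToken` or `Lazy::ThreadPool`; a `std::atomic<bool>` stands in for the stop token, and the names `runAllOrRethrow` and `rejectBadNumber` are hypothetical.

```c++
#include <atomic>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical helper, not the Lazy API: runs job(stop, x) on its own thread
// for every x in `in`, then rethrows the first exception any job has thrown.
template <class F>
void runAllOrRethrow(const std::vector<int>& in, F job)
{
    std::atomic<bool> stop{false};  // Stand-in for a stop token
    std::exception_ptr firstError;  // First exception thrown by any job
    std::mutex errorMutex;          // Protects firstError

    std::vector<std::thread> workers;
    workers.reserve(in.size());
    for (int x : in) {
        workers.emplace_back([&, x] {
            try {
                job(stop, x);
            } catch (...) {
                // An exception must not escape the thread, so store it instead.
                std::lock_guard<std::mutex> lock(errorMutex);
                if (!firstError)
                    firstError = std::current_exception();
            }
        });
    }
    for (auto& w : workers)
        w.join();
    if (firstError)
        std::rethrow_exception(firstError);  // Deliver the failure to the caller
}

// Example job: refuses to process 43 and asks the other jobs to give up.
inline void rejectBadNumber(std::atomic<bool>& stop, int x)
{
    if (stop.load())
        return;            // Another job has failed already, so bail out
    if (x == 43) {
        stop.store(true);  // Let the others know to give up
        throw std::runtime_error("Bad number");
    }
    // Normal work for x would go here.
}
```

A caller would wrap `runAllOrRethrow(vecIn, rejectBadNumber)` in a `try`/`catch` block. The cancellation is cooperative: a job which has already passed its `stop` check keeps running to the end.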

Here is a brief example on how to use the threadpool. In this case,
the same input vector is used for each `multiplier` value.

```c++
{
    int multiplier = 1;
    auto func = [&multiplier](double x) {
        return multiplier * std::exp(x);
    };

    std::vector<double> vecIn(256), vecOut(256);
    // Fill in the input vector
    for (int i = 0; i < 256; ++i)
        vecIn[i] = std::cos((3.141592654 / 256) * i);

    // Start the threadpool
    auto thrPool = Lazy::ThreadPool(func);

    // Fill in the output vector several times using different parameters
    while (multiplier <= 1000000) {
        // Set vecOut[i] = func(vecIn[i]) for each i in parallel.
        thrPool.run(vecIn.data(), vecOut.data(), vecIn.size());
        // Do something with vecOut...
        std::cout << "multiplier = " << multiplier
                  << ", vecOut[first] = " << vecOut[0]
                  << ", vecOut[last] = " << vecOut[255] << "\n";
        multiplier *= 10;
    }
}
```
The output will be
```
multiplier = 1, vecOut[first] = 2.71828, vecOut[last] = 0.367907
multiplier = 10, vecOut[first] = 27.1828, vecOut[last] = 3.67907
multiplier = 100, vecOut[first] = 271.828, vecOut[last] = 36.7907
multiplier = 1000, vecOut[first] = 2718.28, vecOut[last] = 367.907
multiplier = 10000, vecOut[first] = 27182.8, vecOut[last] = 3679.07
multiplier = 100000, vecOut[first] = 271828, vecOut[last] = 36790.7
multiplier = 1000000, vecOut[first] = 2.71828e+06, vecOut[last] = 367907
```
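
The printed values can be checked by hand: the first element is `multiplier * exp(cos(0)) = multiplier * e`, and the last one is `multiplier * exp(cos(255π/256))`, which is slightly above `multiplier / e`. A sequential reference such as the sketch below (the function name `referenceOutput` is hypothetical and not part of the library) computes the same numbers without any threads, which is handy for verifying a parallel run.

```c++
#include <cmath>
#include <vector>

// Sequential reference for the snippet above:
// out[i] = multiplier * exp(cos(pi * i / 256)) for i = 0..255.
std::vector<double> referenceOutput(int multiplier)
{
    std::vector<double> out(256);
    for (int i = 0; i < 256; ++i)
        out[i] = multiplier * std::exp(std::cos((3.141592654 / 256) * i));
    return out;
}
```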
#### 3. Use futures and continuations in manual mode
### 3. Use futures and continuations in manual mode
You can define the futures and the continuations they will run manually.
The first function in the chain of continuations can have any number of input parameters which are passed to function `Lazy::future<ReturnType>(...)`.
@@ -262,5 +324,5 @@ The easiest way to compile all examples is to do
If you don't want to use cmake, the examples can be compiled manually one by one. For instance, <br>
`g++ examples/example-1.cc -std=c++17 -I include/ -O3 -pthread -o example-1`

The examples have been tested with g++ 10.3.0 and clang++ 12.0.0 but any compiler which complies with c++17 standard should do.
The examples have been tested with g++ 11.2.0 and clang++ 13.0.0, but any compiler that complies with the C++17 standard should do.
The compiler can be switched from gcc to clang by building the examples with `cmake examples -DCMAKE_CXX_COMPILER=clang++`.
160 changes: 160 additions & 0 deletions examples/example-2.cc
@@ -10,6 +10,8 @@

#include <Lazy/Lazy.h>

using namespace std::chrono_literals;

template <class... Args>
void atomic_print(Args&&... args)
{
@@ -280,4 +282,162 @@ int main()
assert(vecFrac2[i] == vecFractionOut[i]);
}
}

    // Example 2-7: Threadpool example: Use a function whose return type is a vector so
    // the output is a vector of vectors.
    std::cout << "\n*** Example 2-7 *** (Threadpool) Parallel calls to a function which\n";
    std::cout << " inputs a number and returns a vector for each call.\n";
    std::cout << " The vector contains factors of the input\n";
    std::cout << " (i.e. numbers which divide the input).\n";
    {
        using std::vector;
        // First a helper for printing out input vectors and output vectors of vectors.
        auto printFactors = [](const auto& in, const auto& out)
        {
            for (int i = 0; i < in.size(); ++i) {
                std::cout << in[i] << " : { ";
                for (int j = 0; j < out[i].size(); ++j)
                    std::cout << out[i][j] << ' ';
                std::cout << "}\n";
            }
        };

        // findFactors returns a list of factors of x.
        // E.g. factors(60) = { 2 3 4 5 6 10 12 15 20 30 }
        auto findFactors = [](int x) -> vector<int> {
            vector<int> vecSmall, vecLarge;
            x = (x >= 0) ? x : -x;
            int y = 2, yy = y*y;
            while (yy < x) {
                if (x % y == 0) {
                    vecSmall.push_back(y);
                    vecLarge.push_back(x / y);
                }
                yy += (2*y + 1); // (y+1)^2 = y*y + 2*y + 1
                ++y;
            }
            if (yy == x)
                vecSmall.push_back(y);
            for (int i = vecLarge.size()-1; i >= 0; --i)
                vecSmall.push_back(vecLarge[i]);
            return vecSmall;
        };

        // Start a threadpool for finding factors
        auto thrPool = Lazy::ThreadPool(findFactors);

        std::cout << "Doing even hundreds...\n";
        vector vecEvenHundreds {200, 400, 600, 800, 813}; // Vector of inputs
        vector<vector<int>> vecFactorsEven; // Vector of output vectors.
        vecFactorsEven.resize(vecEvenHundreds.size());
        thrPool.run(vecEvenHundreds.data(), vecFactorsEven.data(), vecEvenHundreds.size());
        printFactors(vecEvenHundreds, vecFactorsEven);

        std::cout << "Doing odd hundreds...\n";
        vector vecOddHundreds {100, 300, 500, 700, 900, 911}; // Vector of inputs
        vector<vector<int>> vecFactorsOdd; // Vector of output vectors.
        vecFactorsOdd.resize(vecOddHundreds.size());
        thrPool.run(vecOddHundreds.data(), vecFactorsOdd.data(), vecOddHundreds.size());
        printFactors(vecOddHundreds, vecFactorsOdd);
    }

    // Example 2-8: (Threadpool) Use a function whose return type is void so there is no output vector.
    // Also demonstrate stop tokens to cancel other jobs when one has failed.
    // Also demonstrate dealing with exceptions arising from a job.
    std::cout << "\n*** Example 2-8 *** (Threadpool) Demonstrate a void function with stop tokens and exceptions.\n";
    {
        const int badNumber = 43;
        auto func = [badNumber](Lazy::StopToken* token, int i) {
            if (*token) {
                atomic_print("Token is set so bailing out, i = ", i);
                return;
            }

            if (i == badNumber) {
                atomic_print("Bad number! Setting token and throwing. i = ", i, ", thread id = ", std::this_thread::get_id());
                token->setValue(1); // Set token to let others know to give up
                throw std::runtime_error("[[Bad Number]]");
            }

            atomic_print("All is good. Doing some work for 100ms. i = ", i);
            std::this_thread::sleep_for(100ms);
        };

        // Start the thread pool and put the threads to idle.
        auto thrPool = Lazy::ThreadPool(func);

        try {
            // Allocate input vectors
            std::vector<int> vecIn {0,1,2,3, badNumber, 5,6,7,8,9};

            // Run the function with input. There is no output data because the return type is void.
            thrPool.run(vecIn.data(), nullptr, vecIn.size());
        }
        catch (const std::exception& e) {
            std::cout << "Exception '" << e.what() << "' caught successfully.\n";
        }
    }

    // Example 2-9: Start a thread pool and put the threads in idle to wait for work.
    // Compare performance with a varying number of parallel threads.
    // Also compare the performance of the thread pool to
    // runForAll() which uses disposable threads (i.e. the thread dies
    // when the function returns.)
    std::cout << "\n*** Example 2-9 *** (Threadpool) Compare threadpool to using disposable threads.\n";
    std::cout << " (This may take a while.)\n";
    {
        auto func = [](int i) -> double { // The threads will run this function which maps an integer to a double.
            double d = 0;
            constexpr int iRounds = 8 * 1024;
            for (int jj = 0; jj < iRounds; ++jj)
                d += std::sin(i * (-jj % 1000) * (0.001 * 2 * 3.141592654));
            return d;
        };

        // iCores is the number of threads in the pool. It is halved on each round.
        int iCores = 2 * std::thread::hardware_concurrency();
        while (iCores > std::thread::hardware_concurrency() / 8) {
            std::cout << "Using " << iCores << " cores... ";
            std::cout.flush();

            // Start the thread pool and put the threads to idle.
            auto thrPool = Lazy::ThreadPool(func, iCores);

            // Allocate input and output vectors
            std::vector<int> vecIn {-1,1,-2,2,-3,3,-4,4,-5,5,-6,6,-7,7,-8,8};
            std::vector<double> vecOut(vecIn.size());
            std::vector<double> vecRef(vecIn.size()); // The correct answer
            for (size_t i = 0; i < vecIn.size(); ++i)
                vecRef[i] = func(vecIn[i]);

            auto start = std::chrono::system_clock::now();
            // Reuse the threadpool 8 * 1024 times
            for (int jj = 0; jj < 1024 * 8; ++jj) {
                thrPool.run(vecIn.data(), vecOut.data(), vecOut.size());
                for (size_t i = 0; i < vecIn.size(); ++i) // Verify the result
                    if (vecOut[i] != vecRef[i])
                        std::cerr << "ERROR at index " << i << std::endl;
            } // for jj
            auto end = std::chrono::system_clock::now();
            std::chrono::duration<double> diff = end-start;
            std::cout << "ThreadPool: Time = " << diff.count() << std::endl;

            if (iCores == std::thread::hardware_concurrency()) { // Single shot tester
                std::cout << "Compare to " << iCores << " disposable threads... ";
                std::cout.flush();
                auto start = std::chrono::system_clock::now();
                // Call runForAll 8 * 1024 times. Each call launches new disposable threads.
                for (int jj = 0; jj < 1024 * 8; ++jj) {
                    vecOut = Lazy::runForAll(vecIn, func);
                    for (size_t i = 0; i < vecIn.size(); ++i) // Verify the result
                        if (vecOut[i] != vecRef[i])
                            std::cerr << "ERROR at index " << i << std::endl;
                } // for jj
                auto end = std::chrono::system_clock::now();
                std::chrono::duration<double> diff = end-start;
                std::cout << " Time = " << diff.count() << std::endl;
            }
            iCores /= 2;
        }
    }

}