From 4bc5a6871f541cf55ba3b246c674112f7d4bf5c8 Mon Sep 17 00:00:00 2001 From: hitonanode <32937551+hitonanode@users.noreply.github.com> Date: Wed, 25 Sep 2024 00:21:34 +0900 Subject: [PATCH 1/2] add ParallelRunner --- multithread/parallel_runner.hpp | 152 ++++++++++++++++++++++++++++++++ multithread/parallel_runner.md | 36 ++++++++ 2 files changed, 188 insertions(+) create mode 100644 multithread/parallel_runner.hpp create mode 100644 multithread/parallel_runner.md diff --git a/multithread/parallel_runner.hpp b/multithread/parallel_runner.hpp new file mode 100644 index 00000000..0e667fe3 --- /dev/null +++ b/multithread/parallel_runner.hpp @@ -0,0 +1,152 @@ +#ifndef PARALLEL_RUNNER_HPP +#define PARALLEL_RUNNER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +template +concept ISolver = requires(const T ct, T t) { + t = {}; + { t.solve() } -> std::same_as; + { t.read_stdin() } -> std::same_as; + { ct.dump_stdout(std::declval()) } -> std::same_as; +}; + +template class ParallelRunner { +public: + int num_threads_; + std::vector instances; + std::vector> rets; + + std::mutex mtx; + + ParallelRunner(int num_threads = std::thread::hardware_concurrency()) + : num_threads_(num_threads > 0 ? num_threads : 1) { + std::cerr << "num_threads: " << num_threads_ << std::endl; + } + + void read_all(int num_testcases) { + instances.clear(); + instances.reserve(num_testcases); + for (int i = 0; i < num_testcases; ++i) { + instances.emplace_back(Solver{}); + instances.back().read_stdin(); + } + } + + void run_sequential() { + rets.assign(instances.size(), std::nullopt); + + for (int index = 0; index < (int)instances.size(); ++index) { + run_single_(index); + if (rets.at(index).has_value()) { + mhc_stdout_(instances.at(index), rets.at(index).value(), index); + } + } + } + + void run_parallel(int num_skip = 0) { + rets.assign(instances.size(), std::nullopt); + + const int num_inputs = instances.size(); + std::vector> futures; + + std::atomic index(num_skip < 0 ? 0 : num_skip); + std::vector is_finished(num_inputs, false); + int num_written = 0; + + for (int i = 0; i < num_threads_; ++i) { + futures.push_back(std::async(std::launch::async, [&]() { + while (true) { + const int current_index = index.fetch_add(1); + if (current_index >= num_inputs) break; + + run_single_(current_index); + + { + std::unique_lock lock(mtx); + is_finished.at(current_index) = true; + while (num_written < num_inputs and is_finished.at(num_written)) { + if (rets.at(num_written).has_value()) { + mhc_stdout_(instances.at(num_written), + rets.at(num_written).value(), num_written); + } + ++num_written; + } + } + } + })); + } + + for (auto &f : futures) f.get(); + } + + void run_single_(int current_index) { + { + std::unique_lock lock(mtx); + std::cerr << "[#" << current_index + 1 << "] start" << std::endl; + } + + auto start = std::chrono::steady_clock::now(); + + try { + rets.at(current_index) = instances.at(current_index).solve(); + } catch (const std::exception &e) { + std::unique_lock lock(mtx); + std::cerr << "Error in Case #" << current_index + 1 << ": " << e.what() << std::endl; + return; + } catch (...) { + std::unique_lock lock(mtx); + std::cerr << "Unknown error in Case #" << current_index + 1 << std::endl; + return; + } + + auto end = std::chrono::steady_clock::now(); + auto msec = std::chrono::duration_cast(end - start).count(); + + { + std::unique_lock lock(mtx); + std::cerr << "[#" << current_index + 1 << "] end, " << msec << " ms" << std::endl; + } + } + + static void mhc_stdout_(const Solver &result, const Solver::Ret &sol, int index) { + std::cout << "Case #" << index + 1 << ": "; + result.dump_stdout(sol); + std::cout << std::flush; + } +}; + +#endif // PARALLEL_RUNNER_HPP + +/* Usage: +struct Solver { + using Ret = int; + + void read_stdin() { + // read input using std::cin + } + + Ret solve() { + // solve the problem + } + + void dump_stdout(const Ret &ret) const { + // output the result using std::cout + // std::cout << ret << std::endl; + } +}; + +int T; +cin >> T; + +ParallelRunner pm; +pm.read_all(T); +pm.run_parallel(); +*/ diff --git a/multithread/parallel_runner.md b/multithread/parallel_runner.md new file mode 100644 index 00000000..2f85d518 --- /dev/null +++ b/multithread/parallel_runner.md @@ -0,0 +1,36 @@ +--- +title: Parallel runner (複数テストケースのマルチスレッド並列実行) +documentation_of: ./parallel_runner.hpp +--- + +ダウンロードしたテストケースに対してプログラムを手元で実行して結果を提出する形式の競技において,複数テストケースの並列処理を行うためのコード. + +## 使用例 + +``` cpp +struct Solver { + using Ret = int; + + void read_stdin() { + // read input using std::cin + } + + Ret solve() { + // solve the problem + } + + void dump_stdout(const Ret &ret) const { + // output the result using std::cout + // std::cout << ret << std::endl; + } +}; + +int main() { + int T; + cin >> T; + + ParallelRunner pm; + pm.read_all(T); + pm.run_parallel(); +} +``` From 864dc6b64436ed42e99f32bfd64b1524228926d9 Mon Sep 17 00:00:00 2001 From: hitonanode <32937551+hitonanode@users.noreply.github.com> Date: Wed, 25 Sep 2024 00:24:03 +0900 Subject: [PATCH 2/2] erase old multithread_example.cpp --- multithread/multithread_example.cpp | 65 ----------------------------- 1 file changed, 65 deletions(-) delete mode 100644 multithread/multithread_example.cpp diff --git a/multithread/multithread_example.cpp b/multithread/multithread_example.cpp deleted file mode 100644 index 4a705242..00000000 --- a/multithread/multithread_example.cpp +++ /dev/null @@ -1,65 +0,0 @@ -// マルチスレッドでローカル実行高速化 コンパイル時のオプションに -pthread が必要 -#include -using namespace std; -using pint = pair; -#define FOR(i, begin, end) for (int i = (begin), i##_end_ = (end); i < i##_end_; i++) -#define IFOR(i, begin, end) for (int i = (end) - 1, i##_begin_ = (begin); i >= i##_begin_; i--) -#define REP(i, n) FOR(i, 0, n) -#define IREP(i, n) IFOR(i, 0, n) -#define dbg(x) cerr << #x << " = " << (x) << " (L" << __LINE__ << ") " << __FILE__ << endl; - -struct SingleCaseInput { - int N, M; - vector query; - SingleCaseInput(int n, int m, const vector &q) : N(n), M(m), query(q) {} -}; -vector testcase; - -vector ret; - -/* 各テストケースの完了状況を保持 */ -mutex mtx; -vector done; - -void solve(int tc) { /* tc個目のテストケースを処理する関数 */ } - -void run() { - /* 未完了で最も番号が若いテストケースを処理 */ - int i = 0; - while (true) { - mtx.lock(); - while (i < done.size() and done[i]) i++; - if (i < done.size()) done[i] = 1; - mtx.unlock(); - if (i >= done.size()) break; - solve(i); - } -} - -int main() { - auto START = chrono::system_clock::now(); - std::cin.tie(nullptr), std::ios::sync_with_stdio(false); - - int TC; - cin >> TC; - REP(tc, TC) { - int N, M; - cin >> N >> M; - vector q(M); - REP(i, M) cin >> q[i].first >> q[i].second; - testcase.emplace_back(N, M, q); - } - ret.resize(TC); - done.assign(TC, 0); - - size_t n_thread = - thread::hardware_concurrency(); // スレッド数を自動取得. 8や16と手で設定してもよい - if (n_thread == 0) n_thread = 1; - - vector threads(n_thread); - REP(i, n_thread) { threads[i] = thread(run); } - REP(i, n_thread) { threads[i].join(); } - REP(tc, TC) { cout << "Case #" << tc + 1 << ": " << ret[tc] << endl; } - dbg(chrono::duration_cast(std::chrono::system_clock::now() - START) - .count()); -}