Skip to content

Commit

Permalink
Keep pipeline of workers (implementation is more similar to coming distributed implementation)
Browse files Browse the repository at this point in the history
  • Loading branch information
rasolca committed Jul 18, 2023
1 parent f679c7a commit eac7a63
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions include/dlaf/eigensolver/band_to_tridiag/mc.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,13 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
};

// Maximum size / (2b-1) sweeps can be executed in parallel.
// const auto max_workers =
// std::min(ceilDiv(size, 2 * b - 1), 2 * to_SizeType(get_num_threads("default")));
const auto max_workers =
2 * std::min(ceilDiv(size, 2 * b - 1), 2 * to_SizeType(get_num_threads("default")));

vector<Pipeline<SweepWorker<T>>> workers;
workers.reserve(max_workers);
for (SizeType i = 0; i < max_workers; ++i)
workers.emplace_back(SweepWorker<T>(size, b));

auto sem = std::make_shared<pika::counting_semaphore<>>(0);

Expand All @@ -732,10 +737,8 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
ex::start_detached(std::move(prev_dep));

auto run_sweep = [a_ws, size, nb, b](SemaphorePtr&& sem, SemaphorePtr&& sem_next, SizeType sweep,
const TileVectorPtr& tiles_v) {
SweepWorker<T> worker(size, b);
SweepWorker<T>& worker, const TileVectorPtr& tiles_v) {
const SizeType nr_steps = nrStepsForSweep(sweep, size, b);
worker.start_sweep(sweep, *a_ws);
for (SizeType step = 0; step < nr_steps; ++step) {
SizeType j_el_tl = sweep % nb;
// ii_el is the row element index with origin in the first row of the diagonal tile.
Expand Down Expand Up @@ -765,31 +768,40 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
blas::copy(n_e, (BaseType<T>*) a_ws->ptr(1, start), inc, tile_t.ptr({0, 1}), 1);
};

auto before_sweep = [](SemaphorePtr&& sem) { sem->acquire(); };
auto init_sweep = [a_ws](SemaphorePtr&& sem, SizeType sweep, SweepWorker<T>& worker) {
sem->acquire();
worker.start_sweep(sweep, *a_ws);
return std::move(sem);
};

auto before_sweep_copy_tridiag = [copy_tridiag_task,
nb](SemaphorePtr&& sem, SizeType sweep,
const matrix::Tile<BaseType<T>, Device::CPU>& tile_t) {
auto init_sweep_copy_tridiag = [a_ws, copy_tridiag_task,
nb](SemaphorePtr&& sem, SizeType sweep, SweepWorker<T>& worker,
const matrix::Tile<BaseType<T>, Device::CPU>& tile_t) {
sem->acquire();
copy_tridiag_task(sweep - nb, nb, nb, tile_t);
worker.start_sweep(sweep, *a_ws);
copy_tridiag_task(sweep - (nb - 1), nb, nb, tile_t);
return std::move(sem);
};

const SizeType sweeps = nrSweeps<T>(size);
ex::any_sender<TileVectorPtr> tiles_v;

for (SizeType sweep = 0; sweep < sweeps; ++sweep) {
auto& w_pipeline = workers[sweep % max_workers];
auto sem_next = std::make_shared<pika::counting_semaphore<>>(0);
ex::unique_any_sender<> dep;
if (sweep == 0 || sweep % nb != 0) {
dep = ex::just(sem) | dlaf::internal::transform(policy_hp, before_sweep);
ex::unique_any_sender<SemaphorePtr> sem_sender;
if ((sweep + 1) % nb != 0) {
sem_sender = ex::ensure_started(ex::when_all(ex::just(std::move(sem), sweep), w_pipeline()) |
dlaf::internal::transform(policy_hp, init_sweep));
}
else {
const auto tile_index = (sweep - 1) / nb;
dep = ex::when_all(ex::just(sem, sweep), mat_trid.readwrite(GlobalTileIndex{tile_index, 0})) |
dlaf::internal::transform(policy_hp, before_sweep_copy_tridiag);
const auto tile_index = sweep / nb;
sem_sender = ex::ensure_started(ex::when_all(ex::just(std::move(sem), sweep), w_pipeline(),
mat_trid.readwrite(GlobalTileIndex{tile_index, 0})) |
dlaf::internal::transform(policy_hp, init_sweep_copy_tridiag));
}
const SizeType i = sweep / nb;
if (sweep % nb == 0) {
const SizeType i = sweep / nb;
tiles_v = ex::when_all_vector(matrix::select(
mat_v, common::iterate_range2d(LocalTileIndex{i, i}, LocalTileSize{n - i, 1}))) |
ex::then([](auto&& vector) {
Expand All @@ -798,7 +810,7 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
ex::split();
}

ex::when_all(ex::just(sem, sem_next, sweep), tiles_v, std::move(dep)) |
ex::when_all(std::move(sem_sender), ex::just(sem_next, sweep), w_pipeline(), tiles_v) |
dlaf::internal::transformDetach(policy_hp, run_sweep);
sem = std::move(sem_next);
}
Expand All @@ -811,14 +823,13 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
dlaf::internal::transformDetach(policy_hp, copy_tridiag_task);
};

auto dep = ex::just(sem) | dlaf::internal::transform(policy_hp, before_sweep) | ex::split();
auto dep = ex::just(sem) |
dlaf::internal::transform(policy_hp, [](SemaphorePtr&& sem) { sem->acquire(); }) |
ex::split();

// copy the last elements of the diagonals
// As for real types only size - 2 sweeps are performed, make sure that all the elements are copied.
if (!isComplex_v<T> && (size - 2) % nb == 0) {
copy_tridiag(size - 2, dep);
}
if ((size - 1) % nb == 0) {
if (!isComplex_v<T> && (size - 1) % nb == 0) {
copy_tridiag(size - 1, dep);
}
copy_tridiag(size, std::move(dep));
Expand Down

0 comments on commit eac7a63

Please sign in to comment.