Reimplementation of local band-to-tridiagonal #938

Merged · 9 commits · Jul 21, 2023
191 changes: 100 additions & 91 deletions include/dlaf/eigensolver/band_to_tridiag/mc.h
@@ -11,6 +11,7 @@
#pragma once

#include <pika/execution.hpp>
#include <pika/semaphore.hpp>

#include <dlaf/blas/tile.h>
#include <dlaf/common/index2d.h>
@@ -475,7 +476,8 @@ class SweepWorker {
startSweepInternal(sweep, a);
}

void compactCopyToTile(matrix::Tile<T, Device::CPU>& tile_v, TileElementIndex index) const noexcept {
void compactCopyToTile(const matrix::Tile<T, Device::CPU>& tile_v,
TileElementIndex index) const noexcept {
tile_v(index) = tau();
common::internal::SingleThreadedBlasScope single;
blas::copy(sizeHHR() - 1, v() + 1, 1, tile_v.ptr(index) + 1, 1);
@@ -648,6 +650,7 @@ template <class CommSender, class PromiseSender>
template <Device D, class T>
TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
const SizeType b, Matrix<const T, D>& mat_a) noexcept {
// TODO: Rewrite
// Note on the algorithm and dependency tracking:
// The algorithm is composed of n-2 (real) or n-1 (complex) sweeps:
// The i-th sweep is initialized by init_sweep, which acts on the i-th column of the band matrix.
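As a quick reference for the sweep count mentioned in the note above, here is a minimal standalone sketch; nr_sweeps_sketch is an illustrative helper that merely mirrors the nrSweeps<T> helper used further down, not DLA-Future's implementation.

#include <complex>
#include <type_traits>

// Sketch only: size - 2 sweeps for real element types, size - 1 for complex ones.
template <class T>
constexpr long long nr_sweeps_sketch(long long size) {
  constexpr bool is_complex = std::is_same_v<T, std::complex<float>> ||
                              std::is_same_v<T, std::complex<double>>;
  return is_complex ? size - 1 : size - 2;
}

static_assert(nr_sweeps_sketch<double>(6) == 4);
static_assert(nr_sweeps_sketch<std::complex<double>>(6) == 5);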
@@ -674,29 +677,29 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
using util::ceilDiv;

using pika::resource::get_num_threads;
using SemaphorePtr = std::shared_ptr<pika::counting_semaphore<>>;
using TileVectorPtr = std::shared_ptr<std::vector<matrix::Tile<T, Device::CPU>>>;

namespace ex = pika::execution::experimental;

const auto policy_hp = dlaf::internal::Policy<Backend::MC>(pika::execution::thread_priority::high);

// note: A is square and has square blocksize
const SizeType size = mat_a.size().cols();
const SizeType nrtiles = mat_a.nrTiles().cols();
const SizeType n = mat_a.nrTiles().cols();
const SizeType nb = mat_a.blockSize().cols();

// Need a shared pointer to keep the allocation alive until all the tasks have executed.
auto a_ws = std::make_shared<BandBlock<T>>(size, b);

Matrix<BaseType<T>, Device::CPU> mat_trid({size, 2}, {nb, 2});
Matrix<T, Device::CPU> mat_v({size, size}, {nb, nb});
const auto& dist_v = mat_v.distribution();
// const auto& dist_v = mat_v.distribution();

if (size == 0) {
return {std::move(mat_trid), std::move(mat_v)};
}

const auto max_deps_size = nrtiles;
vector<pika::execution::experimental::any_sender<>> deps;
deps.reserve(max_deps_size);

auto copy_diag = [a_ws](SizeType j, auto source) {
return a_ws->template copyDiag<D>(j, std::move(source));
};
@@ -705,110 +708,116 @@ TridiagResult<T, Device::CPU> BandToTridiag<Backend::MC, D, T>::call_L(
return a_ws->template copyOffDiag<D>(j, std::move(source));
};

// At most size / (2b-1) sweeps can be executed in parallel.
// const auto max_workers =
// std::min(ceilDiv(size, 2 * b - 1), 2 * to_SizeType(get_num_threads("default")));

auto sem = std::make_shared<pika::counting_semaphore<>>(0);

ex::unique_any_sender<> prev_dep = ex::just();
// Copy the band matrix
for (SizeType k = 0; k < nrtiles; ++k) {
auto sf = copy_diag(k * nb, mat_a.read(GlobalTileIndex{k, k})) | ex::split();
if (k < nrtiles - 1) {
auto sf2 =
copy_offdiag(k * nb, ex::when_all(std::move(sf), mat_a.read(GlobalTileIndex{k + 1, k}))) |
ex::split();
deps.push_back(std::move(sf2));
for (SizeType k = 0; k < n; ++k) {
SizeType nr_release = nb / b;
ex::unique_any_sender<> dep = copy_diag(k * nb, mat_a.read(GlobalTileIndex{k, k}));
if (k < n - 1) {
dep = copy_offdiag(k * nb, ex::when_all(std::move(dep), mat_a.read(GlobalTileIndex{k + 1, k})));
}
else {
deps.push_back(sf);
// Add one to make sure to unlock the last step of the first sweep.
nr_release = ceilDiv(size - k * nb, b) + 1;
}
prev_dep = ex::when_all(ex::just(nr_release, sem), std::move(prev_dep), std::move(dep)) |
dlaf::internal::transform(policy_hp, [](SizeType nr, auto&& sem) { sem->release(nr); });
}
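To see how the release counts add up, the following self-contained snippet mirrors only the nr_release bookkeeping of the copy loop above, using arbitrary sample sizes (size = 10, nb = 4, b = 2); it performs no tile copies.

#include <cstdio>

int main() {
  const long long size = 10, nb = 4, b = 2;   // arbitrary sample sizes
  const long long n = (size + nb - 1) / nb;   // number of tiles, i.e. ceilDiv(size, nb)
  long long total = 0;
  for (long long k = 0; k < n; ++k) {
    long long nr_release = nb / b;
    if (k == n - 1) {
      // Last block: one extra permit unlocks the last step of the first sweep.
      nr_release = (size - k * nb + b - 1) / b + 1;  // ceilDiv(size - k * nb, b) + 1
    }
    std::printf("block %lld releases %lld permit(s)\n", k, nr_release);
    total += nr_release;
  }
  std::printf("total permits released: %lld\n", total);
}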

// At most size / (2b-1) sweeps can be executed in parallel; however, due to task combination this reduces
// to size / (2nb-1).
const auto max_workers =
std::min(ceilDiv(size, 2 * nb - 1), 2 * to_SizeType(get_num_threads("default")));

vector<Pipeline<SweepWorker<T>>> workers;
workers.reserve(max_workers);
for (SizeType i = 0; i < max_workers; ++i)
workers.emplace_back(SweepWorker<T>(size, b));

auto init_sweep = [a_ws](SizeType sweep, SweepWorker<T>& worker) { worker.startSweep(sweep, *a_ws); };
auto cont_sweep = [a_ws, b](SizeType nr_steps, SweepWorker<T>& worker,
matrix::Tile<T, Device::CPU>&& tile_v, TileElementIndex index) {
for (SizeType j = 0; j < nr_steps; ++j) {
worker.compactCopyToTile(tile_v, index + TileElementSize(j * b, 0));
ex::start_detached(std::move(prev_dep));

auto run_sweep = [a_ws, size, nb, b](SemaphorePtr&& sem, SemaphorePtr&& sem_next, SizeType sweep,
const TileVectorPtr& tiles_v) {
SweepWorker<T> worker(size, b);
const SizeType nr_steps = nrStepsForSweep(sweep, size, b);
worker.startSweep(sweep, *a_ws);
for (SizeType step = 0; step < nr_steps; ++step) {
SizeType j_el_tl = sweep % nb;
// i_el is the row element index with origin in the first row of the diagonal tile.
SizeType i_el = j_el_tl / b * b + step * b;
worker.compactCopyToTile((*tiles_v)[to_sizet(i_el / nb)], TileElementIndex(i_el % nb, j_el_tl));
sem->acquire();
worker.doStep(*a_ws);
sem_next->release(1);
}
// Make sure to unlock the last step of the next sweep
sem_next->release(1);
};
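The index arithmetic in run_sweep can be checked in isolation. The sketch below evaluates the same expressions for a range of (sweep, step) pairs with arbitrary sample sizes (nb = 4, b = 2) and prints which entry of tiles_v and which element each compacted reflector would land in (tiles_v holds the V tiles of the current block column, diagonal tile first).

#include <cstdio>

int main() {
  const long long nb = 4, b = 2;  // arbitrary sample sizes
  for (long long sweep = 0; sweep < 6; ++sweep) {
    for (long long step = 0; step < 3; ++step) {
      const long long j_el_tl = sweep % nb;               // column inside the tile
      const long long i_el = j_el_tl / b * b + step * b;  // row, origin at the diagonal tile
      std::printf("sweep %lld step %lld -> tiles_v[%lld], element (%lld, %lld)\n",
                  sweep, step, i_el / nb, i_el % nb, j_el_tl);
    }
  }
}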

auto policy_hp = dlaf::internal::Policy<Backend::MC>(pika::execution::thread_priority::high);
auto copy_tridiag = [policy_hp, a_ws, &mat_trid](SizeType sweep, auto&& dep) {
auto copy_tridiag_task = [a_ws](SizeType start, SizeType n_d, SizeType n_e, auto tile_t) {
auto inc = a_ws->ld() + 1;
if (isComplex_v<T>)
// skip imaginary part if Complex.
inc *= 2;

common::internal::SingleThreadedBlasScope single;
blas::copy(n_d, (BaseType<T>*) a_ws->ptr(0, start), inc, tile_t.ptr({0, 0}), 1);
blas::copy(n_e, (BaseType<T>*) a_ws->ptr(1, start), inc, tile_t.ptr({0, 1}), 1);
};
auto copy_tridiag_task = [a_ws](SizeType start, SizeType n_d, SizeType n_e,
const matrix::Tile<BaseType<T>, Device::CPU>& tile_t) {
auto inc = a_ws->ld() + 1;
if (isComplex_v<T>)
// skip imaginary part if Complex.
inc *= 2;

const auto size = mat_trid.size().rows();
const auto nb = mat_trid.blockSize().rows();
if (sweep % nb == nb - 1 || sweep == size - 1) {
const auto tile_index = sweep / nb;
const auto start = tile_index * nb;
dlaf::internal::whenAllLift(start, std::min(nb, size - start), std::min(nb, size - 1 - start),
mat_trid.readwrite(GlobalTileIndex{tile_index, 0}),
std::forward<decltype(dep)>(dep)) |
dlaf::internal::transformDetach(policy_hp, copy_tridiag_task);
}
else {
ex::start_detached(std::forward<decltype(dep)>(dep));
}
common::internal::SingleThreadedBlasScope single;
blas::copy(n_d, (BaseType<T>*) a_ws->ptr(0, start), inc, tile_t.ptr({0, 0}), 1);
blas::copy(n_e, (BaseType<T>*) a_ws->ptr(1, start), inc, tile_t.ptr({0, 1}), 1);
};
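The inc *= 2 above works because the complex band storage is viewed through its base type, so doubling the stride skips the imaginary parts. Below is a minimal standalone sketch of that idea; the contiguous column-major layout it uses is an assumption made for the sketch only and is not BandBlock's actual layout.

#include <complex>
#include <cstdio>
#include <vector>

int main() {
  const std::size_t ld = 3, n = 4;
  // Made-up layout: row 0 of column j holds the j-th diagonal entry.
  std::vector<std::complex<double>> band(ld * n);
  for (std::size_t j = 0; j < n; ++j)
    band[j * ld] = {double(j) + 1.0, -1.0};  // diagonal entries 1, 2, 3, 4

  // View the buffer as doubles and double the stride to skip the imaginary parts.
  const double* base = reinterpret_cast<const double*>(band.data());
  const std::size_t inc = 2 * ld;
  std::vector<double> d(n);
  for (std::size_t j = 0; j < n; ++j)
    d[j] = base[j * inc];  // real part of band[j * ld]

  for (double x : d)
    std::printf("%g ", x);  // prints: 1 2 3 4
  std::printf("\n");
}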

const SizeType steps_per_task = nb / b;
const SizeType sweeps = nrSweeps<T>(size);
for (SizeType sweep = 0, last_dep = deps.size() - 1; sweep < sweeps; ++sweep) {
auto& w_pipeline = workers[sweep % max_workers];

auto dep = dlaf::internal::whenAllLift(sweep, w_pipeline(), deps[0]) |
dlaf::internal::transform(policy_hp, init_sweep);
copy_tridiag(sweep, std::move(dep));

const SizeType steps = nrStepsForSweep(sweep, size, b);

SizeType last_dep_new = 0;
for (SizeType step = 0; step < steps;) {
// The first task might apply fewer steps to align with the boundaries of the HHR tile v.
SizeType nr_steps = steps_per_task - (step == 0 ? (sweep % nb) / b : 0);
// Last task only applies the remaining steps
nr_steps = std::min(nr_steps, steps - step);

auto dep_index = std::min(ceilDiv(step + nr_steps, nb / b), last_dep);
auto before_sweep = [](SemaphorePtr&& sem) { sem->acquire(); };

const GlobalElementIndex index_v((sweep / b + step) * b, sweep);

SizeType set_index = ceilDiv(step, nb / b);
auto before_sweep_copy_tridiag = [copy_tridiag_task,
nb](SemaphorePtr&& sem, SizeType sweep,
const matrix::Tile<BaseType<T>, Device::CPU>& tile_t) {
sem->acquire();
copy_tridiag_task(sweep - nb, nb, nb, tile_t);
};

deps[set_index] = dlaf::internal::whenAllLift(nr_steps, w_pipeline(),
mat_v.readwrite(dist_v.globalTileIndex(index_v)),
dist_v.tileElementIndex(index_v), deps[dep_index]) |
dlaf::internal::transform(policy_hp, cont_sweep) | ex::split();
const SizeType sweeps = nrSweeps<T>(size);
ex::any_sender<TileVectorPtr> tiles_v;

last_dep_new = set_index;
step += nr_steps;
for (SizeType sweep = 0; sweep < sweeps; ++sweep) {
auto sem_next = std::make_shared<pika::counting_semaphore<>>(0);
ex::unique_any_sender<> dep;
if (sweep == 0 || sweep % nb != 0) {
dep = ex::just(sem) | dlaf::internal::transform(policy_hp, before_sweep);
}

// Limit next sweep to only use valid senders from this sweep.
last_dep = last_dep_new;
else {
const auto tile_index = (sweep - 1) / nb;
dep = ex::when_all(ex::just(sem, sweep), mat_trid.readwrite(GlobalTileIndex{tile_index, 0})) |
dlaf::internal::transform(policy_hp, before_sweep_copy_tridiag);
}
const SizeType i = sweep / nb;
if (sweep % nb == 0) {
tiles_v = ex::when_all_vector(matrix::select(
mat_v, common::iterate_range2d(LocalTileIndex{i, i}, LocalTileSize{n - i, 1}))) |
ex::then([](auto&& vector) {
return std::make_shared<typename TileVectorPtr::element_type>(std::move(vector));
}) |
ex::split();
}
// TODO: does this need transformDetach with policy_hp, or does the priority come from the dependency?
ex::start_detached(ex::when_all(ex::just(sem, sem_next, sweep), tiles_v, std::move(dep)) |
ex::then(run_sweep));
sem = std::move(sem_next);
}
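The sweep-to-sweep handshake (acquire one permit from the previous sweep before each step, release one to the next sweep after it, plus one extra release at the end) can be reproduced with plain std::counting_semaphore and std::thread. This is only a sketch of the pattern with uniform sweep lengths, not the pika sender-based implementation above.

#include <cstdio>
#include <memory>
#include <semaphore>
#include <thread>
#include <vector>

int main() {
  const int sweeps = 3, steps = 4;
  auto sem = std::make_shared<std::counting_semaphore<>>(0);
  sem->release(steps + 1);  // stands in for the permits released by the copy loop

  std::vector<std::thread> workers;
  for (int sweep = 0; sweep < sweeps; ++sweep) {
    auto sem_next = std::make_shared<std::counting_semaphore<>>(0);
    workers.emplace_back([sem, sem_next, sweep] {
      for (int step = 0; step < steps; ++step) {
        sem->acquire();        // wait until the previous sweep is far enough ahead
        std::printf("sweep %d step %d\n", sweep, step);
        sem_next->release(1);  // let the next sweep take this step
      }
      sem_next->release(1);    // unlock the last step of the next sweep
    });
    sem = std::move(sem_next);
  }
  for (auto& t : workers)
    t.join();
}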

auto copy_tridiag = [policy_hp, a_ws, size, nb, &mat_trid, copy_tridiag_task](SizeType i, auto&& dep) {
const auto tile_index = (i - 1) / nb;
const auto start = tile_index * nb;
ex::when_all(ex::just(start, std::min(nb, size - start), std::min(nb, size - 1 - start)),
mat_trid.readwrite(GlobalTileIndex{tile_index, 0}), std::forward<decltype(dep)>(dep)) |
dlaf::internal::transformDetach(policy_hp, copy_tridiag_task);
};

auto dep = ex::just(sem) | dlaf::internal::transform(policy_hp, before_sweep) | ex::split();

// copy the last elements of the diagonals
if (!isComplex_v<T>) {
// only needed for real types as they don't perform sweep size-2
copy_tridiag(size - 2, deps[0]);
// As only size - 2 sweeps are performed for real types, make sure that all the elements are copied.
if (!isComplex_v<T> && (size - 2) % nb == 0) {
copy_tridiag(size - 2, dep);
}
if ((size - 1) % nb == 0) {
copy_tridiag(size - 1, dep);
}
copy_tridiag(size - 1, std::move(deps[0]));
copy_tridiag(size, std::move(dep));

return {std::move(mat_trid), std::move(mat_v)};
}
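Finally, a quick arithmetic check of the (start, n_d, n_e) bookkeeping in copy_tridiag, using arbitrary sample values (size = 10, nb = 4); it mirrors only the index expressions, not the tile copies.

#include <algorithm>
#include <cstdio>

int main() {
  const long long size = 10, nb = 4;  // arbitrary sample sizes
  const long long is[] = {size - 2, size - 1, size};
  for (long long i : is) {
    const long long tile_index = (i - 1) / nb;
    const long long start = tile_index * nb;
    const long long n_d = std::min(nb, size - start);      // diagonal entries in the tile
    const long long n_e = std::min(nb, size - 1 - start);  // off-diagonal entries in the tile
    std::printf("i = %lld -> tile %lld, start %lld, n_d %lld, n_e %lld\n",
                i, tile_index, start, n_d, n_e);
  }
}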