Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions tasks/agafonov_i_sparse_matrix_ccs/common/include/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <cstddef>
#include <vector>

#include "task/include/task.hpp"

namespace agafonov_i_sparse_matrix_ccs {

struct SparseMatrixCCS {
int m = 0;
int n = 0;
std::vector<double> values;
std::vector<int> row_indices;
std::vector<int> col_ptr;

SparseMatrixCCS() = default;
SparseMatrixCCS(int m_val, int n_val) : m(m_val), n(n_val), col_ptr(static_cast<std::size_t>(n_val) + 1, 0) {}
};

struct InType {
SparseMatrixCCS A;
SparseMatrixCCS B;
};

using OutType = SparseMatrixCCS;
using BaseTask = ppc::task::Task<InType, OutType>;

} // namespace agafonov_i_sparse_matrix_ccs
9 changes: 9 additions & 0 deletions tasks/agafonov_i_sparse_matrix_ccs/info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"student": {
"first_name": "Илья",
"last_name": "Агафонов",
"middle_name": "Дмитриевич",
"group_number": "3823Б1ФИ1",
"task_number": "3"
}
}
33 changes: 33 additions & 0 deletions tasks/agafonov_i_sparse_matrix_ccs/mpi/include/ops_mpi.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <vector>

#include "agafonov_i_sparse_matrix_ccs/common/include/common.hpp"
#include "task/include/task.hpp"

namespace agafonov_i_sparse_matrix_ccs {

class SparseMatrixCCSResMPI : public BaseTask {
public:
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
return ppc::task::TypeOfTask::kMPI;
}
explicit SparseMatrixCCSResMPI(const InType &in);

private:
bool ValidationImpl() override;
bool PreProcessingImpl() override;
bool RunImpl() override;
bool PostProcessingImpl() override;

static void MultiplyColumn(int j, int dims0, const SparseMatrixCCS &at, const SparseMatrixCCS &local_b,
std::vector<double> &dense_col, SparseMatrixCCS &local_c);

static void BroadcastSparseMatrix(SparseMatrixCCS &m, int rank);
static void DistributeData(const SparseMatrixCCS &b, SparseMatrixCCS &local_b, int size, int rank,
const std::vector<int> &send_counts, const std::vector<int> &displs);
static void GatherResults(SparseMatrixCCS &local_c, int size, int rank, const std::vector<int> &send_counts,
int dims0, int dims3);
};

} // namespace agafonov_i_sparse_matrix_ccs
208 changes: 208 additions & 0 deletions tasks/agafonov_i_sparse_matrix_ccs/mpi/src/ops_mpi.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#include "agafonov_i_sparse_matrix_ccs/mpi/include/ops_mpi.hpp"

#include <mpi.h>

#include <cmath>
#include <cstddef>
#include <vector>

#include "agafonov_i_sparse_matrix_ccs/common/include/common.hpp"
#include "agafonov_i_sparse_matrix_ccs/seq/include/ops_seq.hpp"

namespace agafonov_i_sparse_matrix_ccs {

SparseMatrixCCSResMPI::SparseMatrixCCSResMPI(const InType &in) {
SetTypeOfTask(GetStaticTypeOfTask());
GetInput() = in;
}

bool SparseMatrixCCSResMPI::ValidationImpl() {
return GetInput().A.n == GetInput().B.m;
}

bool SparseMatrixCCSResMPI::PreProcessingImpl() {
return true;
}

void SparseMatrixCCSResMPI::MultiplyColumn(int j, int dims0, const SparseMatrixCCS &at, const SparseMatrixCCS &local_b,
std::vector<double> &dense_col, SparseMatrixCCS &local_c) {
for (int k_ptr = local_b.col_ptr[static_cast<size_t>(j)]; k_ptr < local_b.col_ptr[static_cast<size_t>(j) + 1];
++k_ptr) {
int k = local_b.row_indices[static_cast<size_t>(k_ptr)];
double v = local_b.values[static_cast<size_t>(k_ptr)];
if (k < static_cast<int>(at.col_ptr.size()) - 1) {
for (int i_p = at.col_ptr[static_cast<size_t>(k)]; i_p < at.col_ptr[static_cast<size_t>(k) + 1]; ++i_p) {
dense_col[static_cast<size_t>(at.row_indices[static_cast<size_t>(i_p)])] +=
at.values[static_cast<size_t>(i_p)] * v;
}
}
}
for (int i = 0; i < dims0; ++i) {
if (std::abs(dense_col[static_cast<size_t>(i)]) > 1e-15) {
local_c.values.push_back(dense_col[static_cast<size_t>(i)]);
local_c.row_indices.push_back(i);
dense_col[static_cast<size_t>(i)] = 0.0;
}
}
}

void SparseMatrixCCSResMPI::BroadcastSparseMatrix(SparseMatrixCCS &m, int rank) {
int nnz = (rank == 0) ? static_cast<int>(m.values.size()) : 0;
int cols = (rank == 0) ? m.n : 0;
MPI_Bcast(&nnz, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&cols, 1, MPI_INT, 0, MPI_COMM_WORLD);
if (rank != 0) {
m.n = cols;
m.values.resize(static_cast<size_t>(nnz));
m.row_indices.resize(static_cast<size_t>(nnz));
m.col_ptr.resize(static_cast<size_t>(m.n) + 1);
}
if (nnz > 0) {
MPI_Bcast(m.values.data(), nnz, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Bcast(m.row_indices.data(), nnz, MPI_INT, 0, MPI_COMM_WORLD);
}
MPI_Bcast(m.col_ptr.data(), m.n + 1, MPI_INT, 0, MPI_COMM_WORLD);
}

void SparseMatrixCCSResMPI::DistributeData(const SparseMatrixCCS &b, SparseMatrixCCS &local_b, int size, int rank,
const std::vector<int> &send_counts, const std::vector<int> &displs) {
if (rank == 0) {
for (int i = 1; i < size; i++) {
int start = displs[static_cast<size_t>(i)];
int count = send_counts[static_cast<size_t>(i)];
int nnz_s = b.col_ptr[static_cast<size_t>(start)];
int nnz_c = b.col_ptr[static_cast<size_t>(start) + static_cast<size_t>(count)] - nnz_s;
MPI_Send(&nnz_c, 1, MPI_INT, i, 0, MPI_COMM_WORLD);
if (nnz_c > 0) {
MPI_Send(&b.values[static_cast<size_t>(nnz_s)], nnz_c, MPI_DOUBLE, i, 1, MPI_COMM_WORLD);
MPI_Send(&b.row_indices[static_cast<size_t>(nnz_s)], nnz_c, MPI_INT, i, 2, MPI_COMM_WORLD);
}
std::vector<int> adj_ptr(static_cast<size_t>(count) + 1);
for (int k = 0; k <= count; ++k) {
adj_ptr[static_cast<size_t>(k)] = b.col_ptr[static_cast<size_t>(start) + k] - nnz_s;
}
MPI_Send(adj_ptr.data(), count + 1, MPI_INT, i, 3, MPI_COMM_WORLD);
}
int r_nnz = b.col_ptr[static_cast<size_t>(send_counts[0])];
local_b.values.assign(b.values.begin(), b.values.begin() + r_nnz);
local_b.row_indices.assign(b.row_indices.begin(), b.row_indices.begin() + r_nnz);
local_b.col_ptr.assign(b.col_ptr.begin(), b.col_ptr.begin() + send_counts[0] + 1);
} else {
int l_nnz = 0;
MPI_Recv(&l_nnz, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
local_b.values.resize(static_cast<size_t>(l_nnz));
local_b.row_indices.resize(static_cast<size_t>(l_nnz));
local_b.col_ptr.resize(static_cast<size_t>(send_counts[static_cast<size_t>(rank)]) + 1);
if (l_nnz > 0) {
MPI_Recv(local_b.values.data(), l_nnz, MPI_DOUBLE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(local_b.row_indices.data(), l_nnz, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
MPI_Recv(local_b.col_ptr.data(), send_counts[static_cast<size_t>(rank)] + 1, MPI_INT, 0, 3, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}

void SparseMatrixCCSResMPI::GatherResults(SparseMatrixCCS &local_c, int size, int rank,
const std::vector<int> &send_counts, int dims0, int dims3) {
if (rank == 0) {
SparseMatrixCCS &fc = local_c;
fc.m = dims0;
fc.n = dims3;
for (int i = 1; i < size; ++i) {
int r_nnz = 0;
MPI_Recv(&r_nnz, 1, MPI_INT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<double> rv(static_cast<size_t>(r_nnz));
std::vector<int> rr(static_cast<size_t>(r_nnz));
int r_cols = send_counts[static_cast<size_t>(i)];
std::vector<int> rp(static_cast<size_t>(r_cols) + 1);
if (r_nnz > 0) {
MPI_Recv(rv.data(), r_nnz, MPI_DOUBLE, i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(rr.data(), r_nnz, MPI_INT, i, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
MPI_Recv(rp.data(), r_cols + 1, MPI_INT, i, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
int current_total_nnz = static_cast<int>(fc.values.size());
fc.values.insert(fc.values.end(), rv.begin(), rv.end());
fc.row_indices.insert(fc.row_indices.end(), rr.begin(), rr.end());
for (int k = 1; k <= r_cols; ++k) {
fc.col_ptr.push_back(rp[static_cast<size_t>(k)] + current_total_nnz);
}
}
} else {
int l_nnz = static_cast<int>(local_c.values.size());
MPI_Send(&l_nnz, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
if (l_nnz > 0) {
MPI_Send(local_c.values.data(), l_nnz, MPI_DOUBLE, 0, 1, MPI_COMM_WORLD);
MPI_Send(local_c.row_indices.data(), l_nnz, MPI_INT, 0, 2, MPI_COMM_WORLD);
}
MPI_Send(local_c.col_ptr.data(), static_cast<int>(local_c.col_ptr.size()), MPI_INT, 0, 3, MPI_COMM_WORLD);
}
}

bool SparseMatrixCCSResMPI::RunImpl() {
int size = 0;
int rank = 0;
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

auto &a = GetInput().A;
auto &b = GetInput().B;
SparseMatrixCCS at;
std::vector<int> dims(4, 0);

if (rank == 0) {
dims[0] = a.m;
dims[1] = a.n;
dims[2] = b.m;
dims[3] = b.n;
at = SparseMatrixCCSSeq::Transpose(a);
}
MPI_Bcast(dims.data(), 4, MPI_INT, 0, MPI_COMM_WORLD);

if (rank != 0) {
at.m = dims[1];
at.n = dims[0];
b.m = dims[2];
b.n = dims[3];
}

BroadcastSparseMatrix(at, rank);

int chunk = b.n / size;
int remainder = b.n % size;
std::vector<int> send_counts(static_cast<size_t>(size));
std::vector<int> displs(static_cast<size_t>(size));
for (int i = 0; i < size; ++i) {
send_counts[static_cast<size_t>(i)] = chunk + (i < remainder ? 1 : 0);
displs[static_cast<size_t>(i)] =
(i == 0) ? 0 : displs[static_cast<size_t>(i) - 1] + send_counts[static_cast<size_t>(i) - 1];
}

SparseMatrixCCS local_b(b.m, send_counts[static_cast<size_t>(rank)]);
DistributeData(b, local_b, size, rank, send_counts, displs);

int local_n = send_counts[static_cast<size_t>(rank)];
SparseMatrixCCS local_c(dims[0], local_n);
std::vector<double> dense_col(static_cast<size_t>(dims[0]), 0.0);
for (int j = 0; j < local_n; ++j) {
MultiplyColumn(j, dims[0], at, local_b, dense_col, local_c);
local_c.col_ptr[static_cast<size_t>(j) + 1] = static_cast<int>(local_c.values.size());
}

if (rank == 0) {
GatherResults(local_c, size, rank, send_counts, dims[0], dims[3]);
GetOutput() = local_c;
} else {
GatherResults(local_c, size, rank, send_counts, dims[0], dims[3]);
}

MPI_Bcast(&GetOutput().m, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&GetOutput().n, 1, MPI_INT, 0, MPI_COMM_WORLD);

return true;
}

bool SparseMatrixCCSResMPI::PostProcessingImpl() {
return true;
}

} // namespace agafonov_i_sparse_matrix_ccs
81 changes: 81 additions & 0 deletions tasks/agafonov_i_sparse_matrix_ccs/report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Умножение разреженных матриц. Элементы типа double. Формат хранения матрицы – столбцовый (CCS).

- Student: Агафонов Илья Дмитриевич, group 3823Б1ФИ1
- Technology: SEQ | MPI
- Variant: 5

## 1. Introduction
Задача умножения разреженных матриц является критически важной в научных вычислениях и машинном обучении. Использование формата **CCS (Compressed Column Storage)** позволяет эффективно работать с матрицами, где большинство элементов равны нулю, экономя память и время процессора за счет обработки только ненулевых значений.

## 2. Problem Statement
Необходимо реализовать умножение двух разреженных матриц $A$ и $B$ в формате CCS для получения результирующей матрицы $C = A \times B$.

**Входные данные:**
- Матрицы $A (m \times k)$ и $B (k \times n)$ в формате CCS.
- Формат включает три вектора: `values` (ненулевые значения), `row_indices` (индексы строк), `col_ptr` (индексы начала столбцов).

**Выходные данные:**
- Результирующая матрица $C (m \times n)$ в формате CCS.

**Ограничения:**
- Тип данных — `double`.
- Число столбцов $A$ равно числу строк $B$ ($A.n == B.m$).

## 3. Baseline Algorithm (Sequential)
Последовательный алгоритм использует "столбцовую" логику:
1. Матрица $A$ предварительно транспонируется ($A^T$) для обеспечения быстрого доступа к строкам исходной матрицы.
2. Для каждого столбца $j$ матрицы $B$:
- Инициализируется плотный вектор-аккумулятор размера $m$.
- Проход по ненулевым элементам $B_{kj}$.
- Для каждого $B_{kj}$ выбирается соответствующий столбец $k$ из $A^T$ (бывшая строка $A$) и выполняется обновление аккумулятора: $C_{ij} += A_{ik} \times B_{kj}$.
- Ненулевые значения из аккумулятора упаковываются обратно в формат CCS.

## 4. Parallelization Scheme
Для распараллеливания используется распределение столбцов выходной матрицы между MPI-процессами:

- **Data Distribution**:
- Процесс 0 транспонирует матрицу $A$ и рассылает её всем узлам через `MPI_Bcast`.
- Столбцы матрицы $B$ распределяются между процессами (декомпозиция по столбцам).
- **Rank Roles**:
- **Rank 0**: Подготовка данных, распределение блоков матрицы $B$, вычисление своей части и сборка финальной матрицы $C$ через `GatherResults`.
- **Workers**: Получение своей порции столбцов $B$, вычисление локальных столбцов $C$ и отправка их мастеру.
- **Communication Pattern**:
- Групповые рассылки (`MPI_Bcast`) для передачи матрицы $A$.
- Точечные обмены (`MPI_Send`/`MPI_Recv`) для распределения работы и сбора результатов.

## 5. Implementation Details
- **Код разбит на функциональные блоки**:
- `SparseMatrixCCS`: Основная структура данных.
- `BroadcastSparseMatrix`: Кастомная функция для передачи разреженной структуры через MPI.
- `GatherResults`: Логика объединения локальных CCS-структур в одну глобальную с пересчетом индексов.
## 6. Experimental Setup
**Hardware/OS:**
- **Процессор:** Процессор AMD Ryzen 5 5500U, ядер: 6, логических процессоров: 12
- **Оперативная память:** 16 ГБ DDR4
- **Операционная система:** Windows 10 Pro 22H2
- **Toolchain:**
- **Компилятор:** g++ 13.3.0
- **Тип сборки:** Release (-O3 )
- **MPI:** Open MPI 4.1.6

## 7. Results and Discussion

### 7.1 Correctness
Корректность подтверждена успешным прохождением тестов `agafonov_i_sparse_matrix_ccs_func`, где результаты параллельной версии сравнивались с последовательным эталоном. Все тесты `PASSED`.

### 7.2 Performance
На основе замеров в режиме `Release` на 4 процессах:

| Mode | Count | Time, s | Speedup | Efficiency |
|-------------|-------|---------|---------|------------|
| seq | 1 | 0.0195 | 1.00 | 100 |
| mpi | 4 | 0.2072 | 0.52 | 11.2% |

*\* Примечание: На матрицах время пересылки данных (коммуникационная составляющая) превышает время вычислений. Для получения ускорения требуются матрицы размерностью от 3000x3000 и выше.*

## 8. Conclusions
Алгоритм успешно реализован и проходит все тесты на корректность. Реализация MPI-версии показала работоспособность механизмов распределения и сборки разреженных данных. Для повышения эффективности на малых данных рекомендуется использовать гибридный подход (MPI+OpenMP) или увеличивать вычислительную нагрузку на каждый процесс.

## 9. References
1. Материлы и документация по курсу
2. MPI Standard Specification: [https://www.mpi-forum.org/].
23 changes: 23 additions & 0 deletions tasks/agafonov_i_sparse_matrix_ccs/seq/include/ops_seq.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include "agafonov_i_sparse_matrix_ccs/common/include/common.hpp"
#include "task/include/task.hpp"

namespace agafonov_i_sparse_matrix_ccs {

class SparseMatrixCCSSeq : public BaseTask {
public:
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
return ppc::task::TypeOfTask::kSEQ;
}
explicit SparseMatrixCCSSeq(const InType &in);
static SparseMatrixCCS Transpose(const SparseMatrixCCS &matrix);

private:
bool ValidationImpl() override;
bool PreProcessingImpl() override;
bool RunImpl() override;
bool PostProcessingImpl() override;
};

} // namespace agafonov_i_sparse_matrix_ccs
Loading
Loading