Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .github/.keep
Empty file.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/AwTYhPar)
# Лабораторная работа № 1: определение достижимости параллелизма и реализация параллельных алгоритмов.

Шаги выполнения:
1) Выберите один из алгоритмов обхода графа (BFS или DFS).

После тщательного анализа своих возможностей был выбран алгоритм обхода графа в ширину (BFS).

2) Разберитесь с выбранным алгоритмом и выделите основные этапы его выполнения. Идентифицируйте зависимости между этапами и выберите те, которые можно эффективно распараллелить (для этого постройте граф зависимостей (можно в голове))

На каждом шаге поровну распределяем между потоками вершины из очереди и в каждом потоке обрабатываем каждую назначенную вершину (добавляем непосещенных соседей в следующую очередь)

3) Напишите программу на выбранном вами языке программирования (java, c++), реализующую выбранный алгоритм с учётом параллельных возможностей.

Допустим, написал.

4) С помощью инструментов (ThreadSanitizer && Helgrind для С++, JCStress тесты для Java) проанализировать программу на предмет отсутствия ошибок синхронизации данных. Если ошибок не нашлось, то внести их и найти.

В invalidParallelBFS используется не atomic boolean для прохода по вершинам. incorrectStressTest должен падать на этом.

5) Эксперименты и анализ результатов:\
Проведите эксперименты, измеряя производительность параллельной реализации алгоритма на различных объемах входных данных. Сравните результаты с последовательной версией и опишите полученные выводы.
* Постройте график зависимости времени выполнения параллельной версии алгоритма от выделенных ресурсов.
* Постройте график зависимости времени выполнения параллельной и последовательной версий алгоритма в зависимости от объема входных данных.\
\
**Загрузить графики в отдельную директорию в репозитории** \
**Для построения графиков можно воспользоваться чем угодно**

plot_bfs_performance.py + tmp dir
23 changes: 22 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
kotlin("jvm") version "1.9.20"
java
application
}

Expand All @@ -12,10 +13,14 @@ repositories {

dependencies {
testImplementation(kotlin("test"))
testImplementation("org.openjdk.jcstress:jcstress-core:0.16")
testAnnotationProcessor("org.openjdk.jcstress:jcstress-core:0.16")
}

tasks.test {
useJUnitPlatform()
minHeapSize = "4g"
maxHeapSize = "8g"
}

kotlin {
Expand All @@ -24,4 +29,20 @@ kotlin {

application {
mainClass.set("MainKt")
}
}

// JCStress runner task: runs JCStress tests located on the test runtime classpath
// Use: ./gradlew jcstress [-PjcstressArgs="-v -m quick"]
tasks.register<JavaExec>("jcstress") {
group = "verification"
description = "Run JCStress stress tests"
// JCStress ships its own harness entry point; it discovers the stress tests
// on the supplied classpath at runtime.
mainClass.set("org.openjdk.jcstress.Main")
classpath = sourceSets.test.get().runtimeClasspath
// Test classes must be compiled before the harness can load them.
dependsOn("testClasses")

// Optional pass-through of extra harness flags, e.g. -PjcstressArgs="-v -m quick".
// Flags are split on runs of whitespace.
val argsProp = project.findProperty("jcstressArgs") as String?
if (!argsProp.isNullOrBlank()) {
args = argsProp.split("\\s+".toRegex())
}
}

127 changes: 127 additions & 0 deletions plot_bfs_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

def plot_by_thread(df, filename):
    """Render a line plot of BFS run time versus worker-thread count.

    Expects ``df`` to carry ``threads`` and ``time_ms`` columns. The figure is
    written next to the source CSV, with ``.csv`` swapped for ``_plot.png``.
    """
    plt.figure(figsize=(10, 6))
    plt.plot(df['threads'], df['time_ms'], marker='o', linewidth=2, markersize=8)
    plt.xlabel('Number of Threads', fontsize=12)
    plt.ylabel('Time (ms)', fontsize=12)
    plt.title('Parallel BFS Performance by Thread Count', fontsize=14, fontweight='bold')
    plt.grid(True, alpha=0.3)
    plt.xticks(df['threads'])

    # Label each data point with its timing so exact values are readable.
    for thread_count, elapsed_ms in zip(df['threads'], df['time_ms']):
        plt.annotate(
            f"{elapsed_ms:.1f}",
            (thread_count, elapsed_ms),
            textcoords="offset points",
            xytext=(0, 10),
            ha='center',
            fontsize=9,
        )

    plt.tight_layout()
    output_file = filename.replace('.csv', '_plot.png')
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    print(f" Saved: {output_file}")
    plt.close()

def plot_by_data_size(df, filename):
    """Render a two-panel plot: serial-vs-parallel timing and the speedup ratio.

    Expects ``df`` to carry ``vertices``, ``serial_time_ms`` and
    ``parallel_time_ms`` columns. The figure is written next to the source CSV,
    with ``.csv`` swapped for ``_plot.png``.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

    # Left panel: absolute run times of both implementations, log-log axes.
    ax1.plot(df['vertices'], df['serial_time_ms'], marker='o', linewidth=2,
             markersize=8, label='Serial BFS', color='#2E86AB')
    ax1.plot(df['vertices'], df['parallel_time_ms'], marker='s', linewidth=2,
             markersize=8, label='Parallel BFS', color='#A23B72')
    ax1.set_xlabel('Number of Vertices', fontsize=12)
    ax1.set_ylabel('Time (ms)', fontsize=12)
    ax1.set_title('BFS Performance: Serial vs Parallel', fontsize=13, fontweight='bold')
    ax1.set_xscale('log')
    ax1.set_yscale('log')
    ax1.legend(fontsize=11)
    ax1.grid(True, alpha=0.3, which='both')

    # Right panel: speedup = serial / parallel, with a reference line at 1x.
    speedup = df['serial_time_ms'] / df['parallel_time_ms']
    ax2.plot(df['vertices'], speedup, marker='D', linewidth=2,
             markersize=8, color='#F18F01')
    ax2.axhline(y=1, color='r', linestyle='--', alpha=0.5, label='No speedup')
    ax2.set_xlabel('Number of Vertices', fontsize=12)
    ax2.set_ylabel('Speedup (Serial / Parallel)', fontsize=12)
    ax2.set_title('Parallel BFS Speedup', fontsize=13, fontweight='bold')
    ax2.set_xscale('log')
    ax2.legend(fontsize=11)
    ax2.grid(True, alpha=0.3, which='both')

    # Label each positive speedup point with its ratio.
    for vertex_count, ratio in zip(df['vertices'], speedup):
        if ratio > 0:
            ax2.annotate(
                f"{ratio:.2f}x",
                (vertex_count, ratio),
                textcoords="offset points",
                xytext=(0, 10),
                ha='center',
                fontsize=9,
            )

    plt.tight_layout()
    output_file = filename.replace('.csv', '_plot.png')
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    print(f" Saved: {output_file}")
    plt.close()

def main():
    """Locate BFS benchmark CSVs under ./tmp and render a plot for each one."""
    tmp_dir = 'tmp'

    if not os.path.exists(tmp_dir):
        print(f"Error: Directory '{tmp_dir}' not found!")
        return

    # Match both the misspelled ("perfomance") and correct file-name variants;
    # a set keeps any file that matches both patterns from appearing twice.
    csv_files = set()
    for stem in ('*bfs_perfomance*.csv', '*bfs_performance*.csv'):
        csv_files.update(glob.glob(os.path.join(tmp_dir, stem)))

    if not csv_files:
        print(f"No BFS performance CSV files found in '{tmp_dir}' directory!")
        return

    print(f"Found {len(csv_files)} CSV file(s) to plot:\n")

    for csv_file in sorted(csv_files):
        print(f"Processing: {csv_file}")
        try:
            df = pd.read_csv(csv_file)
            columns = set(df.columns)

            # Dispatch on the column schema to pick the right plot.
            if {'threads', 'time_ms'} <= columns:
                print(" Type: Performance by thread count")
                plot_by_thread(df, csv_file)
            elif {'vertices', 'serial_time_ms', 'parallel_time_ms'} <= columns:
                print(" Type: Performance by data size")
                plot_by_data_size(df, csv_file)
            else:
                print(f" Warning: Unknown CSV format. Columns: {list(df.columns)}")
                print(" Skipping...")
        except Exception as e:
            # Best-effort: one bad file should not stop the remaining plots.
            print(f" Error processing {csv_file}: {e}")

        print()

    print("All plots generated successfully!")

128 changes: 125 additions & 3 deletions src/main/java/org/itmo/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

class Graph {
private final int V;
Expand All @@ -23,12 +23,130 @@ void addEdge(int src, int dest) {
}
}

void parallelBFS(int startVertex) {
// Generated by Egor Sviridenko's brain
// Convenience overload: runs the parallel BFS with one worker per available
// processor. Returns the number of vertices discovered from startVertex
// (the start vertex itself is marked visited but not counted — see the
// counter logic in the two-argument overload).
int parallelBFS(int startVertex) {
return parallelBFS(startVertex, Runtime.getRuntime().availableProcessors());
}

/**
 * Level-synchronous parallel BFS from {@code startVertex} using a fixed pool
 * of {@code threads} workers.
 *
 * Each frontier is split into contiguous slices whose sizes differ by at most
 * one, one slice per worker. Workers claim unvisited neighbours with an
 * atomic compare-and-set on a per-vertex flag, so each vertex is discovered
 * and counted exactly once.
 *
 * @param startVertex vertex the traversal starts from
 * @param threads     number of worker threads to use
 * @return number of vertices discovered from the start vertex (the start
 *         vertex itself is marked visited but not counted)
 */
int parallelBFS(int startVertex, int threads) {
    AtomicInteger visitedCounter = new AtomicInteger();
    AtomicIntegerArray visited = new AtomicIntegerArray(V);
    visited.compareAndSet(startVertex, 0, 1);

    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
        // Read-only while concurrent operations are in process.
        List<Integer> currentQueue = new ArrayList<>();
        currentQueue.add(startVertex);

        while (!currentQueue.isEmpty()) {
            List<Integer> nextQueue = Collections.synchronizedList(new ArrayList<>());
            List<Future<?>> futures = new ArrayList<>();

            // example for threads=5, currentQueue.size()=13: 3 3 3 2 2
            int maxThreadInterval = (currentQueue.size() - 1) / threads + 1;
            int maxThreadIntervalCount = currentQueue.size() % threads == 0 ? threads : currentQueue.size() % threads;
            int minThreadIntervalBeginIndex = maxThreadInterval * maxThreadIntervalCount;

            int start = 0;

            while (start < currentQueue.size()) {
                int currentInterval = start < minThreadIntervalBeginIndex
                        ? maxThreadInterval
                        : maxThreadInterval - 1;

                List<Integer> currentSublist = currentQueue.subList(start, start + currentInterval);
                futures.add(executorService.submit(() -> currentSublist.forEach(vertex ->
                        adjList[vertex].forEach(nextVertex -> {
                            // CAS guarantees exactly one thread claims each vertex.
                            if (visited.compareAndSet(nextVertex, 0, 1)) {
                                nextQueue.add(nextVertex);
                                visitedCounter.incrementAndGet();
                            }
                        })
                )));

                start += currentInterval;
            }

            // Level barrier: wait for every slice before building the next frontier.
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });

            currentQueue.clear();
            currentQueue.addAll(nextQueue);
        }
    } finally {
        // Fix: previously the pool leaked if a worker's exception propagated
        // out of the loop above; always release the executor's threads.
        executorService.shutdown();
    }

    return visitedCounter.get();
}

// Generated by Egor Sviridenko's brain
// NOTE(review): INTENTIONALLY broken variant kept for step 4 of the lab —
// `visited` is a plain boolean[] with no synchronization, so the
// check-then-act in the workers races: two threads can both see
// visited[nextVertex] == false and enqueue/count the same vertex twice.
// The JCStress test (incorrectStressTest) is expected to catch this.
// Do not "fix" this method; use parallelBFS for the correct version.
int invalidParallelBFS(int startVertex) {
AtomicInteger visitedCounter = new AtomicInteger();
boolean[] visited = new boolean[V];
visited[startVertex] = true;

int threads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(threads);

// Read-only while concurrent operations are in process
List<Integer> currentQueue = new ArrayList<>();
currentQueue.add(startVertex);

while (!currentQueue.isEmpty()) {
List<Integer> nextQueue = Collections.synchronizedList(new ArrayList<>());
List<Future<?>> futures = new ArrayList<>();

// example for threads=5, currentQueue.size()=13: 3 3 3 2 2
int maxThreadInterval = (currentQueue.size() - 1) / threads + 1;
int maxThreadIntervalCount = currentQueue.size() % threads == 0 ? threads : currentQueue.size() % threads;
int minThreadIntervalBeginIndex = maxThreadInterval * maxThreadIntervalCount;

int start = 0;

while (start < currentQueue.size()) {
int currentInterval = start < minThreadIntervalBeginIndex
? maxThreadInterval
: maxThreadInterval - 1;

List<Integer> currentSublist = currentQueue.subList(start, start + currentInterval);
futures.add(executorService.submit(() -> currentSublist.forEach(vertex ->
adjList[vertex].forEach(nextVertex -> {
if (!visited[nextVertex]) { // data race: unsynchronized check-then-act on boolean[]
visited[nextVertex] = true;
nextQueue.add(nextVertex);
visitedCounter.incrementAndGet();
}
})
)));

start += currentInterval;
}

// Level barrier: wait for all slices before building the next frontier.
futures.forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

currentQueue.clear();
currentQueue.addAll(nextQueue);
}

executorService.shutdown();
return visitedCounter.get();
}

//Generated by ChatGPT
void bfs(int startVertex) {
int bfs(int startVertex) {
boolean[] visited = new boolean[V];
int visitedCounter = 0;

LinkedList<Integer> queue = new LinkedList<>();

Expand All @@ -42,9 +160,13 @@ void bfs(int startVertex) {
if (!visited[n]) {
visited[n] = true;
queue.add(n);
visitedCounter++;
}
}

}

return visitedCounter;
}

}
13 changes: 13 additions & 0 deletions src/main/java/org/itmo/UnsafeCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.itmo;

/**
 * Deliberately thread-unsafe counter: {@link #increment()} performs an
 * unsynchronized read-modify-write on a plain field, so concurrent
 * increments can be lost. Kept as a minimal data-race specimen for the
 * lab's synchronization-analysis tooling.
 */
public class UnsafeCounter {
    // Plain int — no volatile, no locking; the race is the point.
    private int value = 0;

    /** Non-atomic increment (read, add, write back). */
    public void increment() {
        value++; // <-- data race
    }

    /** Current value; may be stale when read concurrently with increments. */
    public int get() {
        return value;
    }
}
Loading