Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/.name

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions .idea/gradle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
kotlin("jvm") version "1.9.20"
java
application
}

Expand All @@ -12,6 +13,9 @@ repositories {

dependencies {
testImplementation(kotlin("test"))
testImplementation("org.openjdk.jcstress:jcstress-core:0.16")
testAnnotationProcessor("org.openjdk.jcstress:jcstress-core:0.16")
implementation("org.knowm.xchart:xchart:3.8.7")
}

tasks.test {
Expand All @@ -24,4 +28,20 @@ kotlin {

application {
mainClass.set("MainKt")
}
}

// JCStress runner task: runs JCStress tests located on the test runtime classpath
// Use: ./gradlew jcstress [-PjcstressArgs="-v -m quick"]
tasks.register<JavaExec>("jcstress") {
group = "verification"
description = "Run JCStress stress tests"
mainClass.set("org.openjdk.jcstress.Main")
classpath = sourceSets.test.get().runtimeClasspath
dependsOn("testClasses")

val argsProp = project.findProperty("jcstressArgs") as String?
if (!argsProp.isNullOrBlank()) {
args = argsProp.split("\\s+".toRegex())
}
}

Binary file added charts/parallel_time_vs_threads.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added charts/size_vs_time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added jcstress-results-2025-12-23-11-14-02.bin.gz
Binary file not shown.
217 changes: 210 additions & 7 deletions src/main/java/org/itmo/Graph.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.itmo;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerArray;

class Graph {
private final int V;
Expand All @@ -23,9 +25,210 @@ void addEdge(int src, int dest) {
}
}

void parallelBFS(int startVertex) {
void parallelBFS(int startVertex, Integer threadsCount) {
int numThreads = (threadsCount == null) ? Runtime.getRuntime().availableProcessors() : threadsCount;
AtomicIntegerArray visited = new AtomicIntegerArray(V);
ExecutorService pool = Executors.newFixedThreadPool(numThreads);
ConcurrentLinkedQueue<Integer> frontier = new ConcurrentLinkedQueue<>();

for (int v = 0; v < V; v++) {
visited.set(v, 0);
}

frontier.add(startVertex);
visited.set(startVertex, 1);

try {
while (!frontier.isEmpty()) {
int tasks = Math.min(numThreads, frontier.size());
CountDownLatch latch = new CountDownLatch(tasks);
List<Integer>[] nextFrontiers = new ArrayList[tasks];

for (int t = 0; t < tasks; t++) {
final int taskId = t;
pool.submit(() -> {
List<Integer> nextLevel = new ArrayList<>();
try {
Integer node;
while ((node = frontier.poll()) != null) {
for (int neighbor : adjList[node]) {
if (visited.get(neighbor) == 0 && visited.compareAndSet(neighbor, 0, 1)) {
nextLevel.add(neighbor);
}
}
}
} finally {
nextFrontiers[taskId] = nextLevel;
latch.countDown();
}
});
}

try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

for (List<Integer> level : nextFrontiers) {
frontier.addAll(level);
}
}
} finally {
pool.shutdown();
}
}

void parallelBFSUnsafe(int startVertex, Integer threadsCount) {
int numThreads = (threadsCount == null) ? Runtime.getRuntime().availableProcessors() : threadsCount;
AtomicIntegerArray visited = new AtomicIntegerArray(V);
ExecutorService pool = Executors.newFixedThreadPool(numThreads);
ConcurrentLinkedQueue<Integer> frontier = new ConcurrentLinkedQueue<>();

for (int v = 0; v < V; v++) visited.set(v, 0);

frontier.add(startVertex);
visited.set(startVertex, 1);

try {
while (!frontier.isEmpty()) {
int tasks = Math.min(numThreads, frontier.size());
CountDownLatch latch = new CountDownLatch(tasks);
List<Integer>[] nextFrontiers = new ArrayList[tasks];

for (int t = 0; t < tasks; t++) {
final int taskId = t;
pool.submit(() -> {
List<Integer> nextLevel = new ArrayList<>();
try {
Integer node;
while ((node = frontier.poll()) != null) {
for (int neighbor : adjList[node]) {
if (visited.get(neighbor) == 0 && visited.compareAndSet(neighbor, 0, 1)) {
nextLevel.add(neighbor);
}
}
}
} finally {
nextFrontiers[taskId] = nextLevel;
latch.countDown();
}
});
}

try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

for (List<Integer> level : nextFrontiers) frontier.addAll(level);
}
} finally {
pool.shutdown();
}
}

// void parallelBFS(int startVertex) {
// int numThreads = Runtime.getRuntime().availableProcessors();
// ExecutorService executor = Executors.newFixedThreadPool(numThreads);
//
// AtomicBoolean[] visited = new AtomicBoolean[V];
// for (int i = 0; i < V; i++)
// visited[i] = new AtomicBoolean(false);
//
// LevelQueues levelQueues = new LevelQueues();
//
// visited[startVertex].set(true);
// levelQueues.current.add(startVertex);
//
// CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); // +1 — главный поток
// AtomicInteger remaining = new AtomicInteger();
//
// for (int t = 0; t < numThreads; t++) {
// executor.submit(() -> {
// while (true) {
// try {
// barrier.await();
// } catch (Exception e) {
// System.out.println("barrier.await broken");
// }
//
// if (levelQueues.current.isEmpty())
// barrier.await();
//
// Integer v;
// while ((v = levelQueues.current.poll()) != null) {
// for (int n : adjList[v]) {
// if (visited[n].compareAndSet(false, true)) {
// levelQueues.next.add(n);
// }
// }
// }
//
// if (remaining.decrementAndGet() == 0) {
// System.out.println("Level process ended");
// }
//
// try {
// barrier.await();
// } catch (Exception ignored) {
// }
// }
// });
// }
//
// try {
// while (!levelQueues.current.isEmpty()) {
// remaining.set(numThreads);
// barrier.await();
// barrier.await();
//
// levelQueues.current = levelQueues.next;
// levelQueues.next = new ConcurrentLinkedQueue<>();
// }
// } catch (Exception e) {
// System.out.println("main process barrier.await broken");
// }
//
// executor.shutdownNow();
// }
//
// void parallelBFS_2(int startVertex) {
// ExecutorService pool = Executors.newFixedThreadPool(
// Runtime.getRuntime().availableProcessors());
//
// AtomicBoolean[] visited = new AtomicBoolean[V];
// for (int i = 0; i < V; i++)
// visited[i] = new AtomicBoolean(false);
//
// ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// queue.add(startVertex);
// visited[startVertex].set(true);
//
// while (!queue.isEmpty()) {
// Integer v = queue.poll();
// if (v == null) continue;
//
// CompletableFuture.runAsync(() -> {
// for (int n : adjList[v]) {
// if (visited[n].compareAndSet(false, true)) {
// queue.add(n);
// }
// }
// }, pool);
// }
//
// pool.shutdown();
// }
//
// private class LevelQueues {
// public ConcurrentLinkedQueue<Integer> current = new ConcurrentLinkedQueue<>();
// public ConcurrentLinkedQueue<Integer> next = new ConcurrentLinkedQueue<>();
// }

//Generated by ChatGPT
void bfs(int startVertex) {
boolean[] visited = new boolean[V];
Expand All @@ -36,12 +239,12 @@ void bfs(int startVertex) {
queue.add(startVertex);

while (!queue.isEmpty()) {
startVertex = queue.poll();
startVertex = queue.poll(); // конкуренция за queue

for (int n : adjList[startVertex]) {
if (!visited[n]) {
if (!visited[n]) { // здесь гонка
visited[n] = true;
queue.add(n);
queue.add(n); // риск многократного добавления n
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/itmo/UnsafeCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.itmo;

public class UnsafeCounter {
private int counter = 0;

public void increment() {
counter++; // <-- гонка данных
}

public int get() {
return counter;
}
}
Loading