Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
id 'java'
id "io.github.reyerizo.gradle.jcstress" version "0.9.0"
}

repositories {
Expand All @@ -24,3 +25,8 @@ test {
}
failFast = true
}

jcstress {
verbose = true
timeMillis = "2000"
}
Binary file added images/stats.PNG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
86 changes: 86 additions & 0 deletions src/jcstress/java/org/itmo/ParallelBFSQueueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.itmo;

import org.openjdk.jcstress.annotations.*;
import org.openjdk.jcstress.infra.results.*;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.itmo.*;

@JCStressTest
@Outcome(id = "true", expect = Expect.ACCEPTABLE, desc = "Each vertex is visited exactly once")
@Outcome(id = "false", expect = Expect.FORBIDDEN, desc = "Each vertex might be visited more than once")
@State
public class ParallelBFSQueueTest {
private static final int VERTICES = 20;

private final Graph graph;
private int visitedCount = 0;
private Queue<Integer> currentLevel = new ConcurrentLinkedQueue<>();
private Queue<Integer> nextLevel = new ConcurrentLinkedQueue<>();

public ParallelBFSQueueTest() {
this.graph = new Graph(VERTICES);
for (int i = 1; i < VERTICES; i++) {
this.graph.addEdge(0, i);
}
this.currentLevel.add(0);
}

@Actor
public void actor1() {
Integer vertex;
while ((vertex = currentLevel.poll()) != null) {
graph.visited[vertex].getAndSet(true);
for (int newVertex : graph.getAdjList()[vertex]) {
nextLevel.add(newVertex);
}
}
}

@Actor
public void actor2() {
Integer vertex;
while ((vertex = currentLevel.poll()) != null) {
graph.visited[vertex].getAndSet(true);
for (int newVertex : graph.getAdjList()[vertex]) {
nextLevel.add(newVertex);
}
}
}

@Actor
public void actor3() {
Integer vertex;
while ((vertex = currentLevel.poll()) != null) {
graph.visited[vertex].getAndSet(true);
for (int newVertex : graph.getAdjList()[vertex]) {
nextLevel.add(newVertex);
}
}
}

@Actor
public void actor4() {
Integer vertex;
while ((vertex = currentLevel.poll()) != null) {
graph.visited[vertex].getAndSet(true);
for (int newVertex : graph.getAdjList()[vertex]) {
nextLevel.add(newVertex);
}
}
}

@Arbiter
public void arbiter(L_Result r) {
for (int i = 0; i < VERTICES; i++) {
if (this.graph.visited[i].get() == true) {
this.visitedCount++;
}
}
currentLevel = nextLevel;
r.r1 = visitedCount == 1;
}
}
147 changes: 144 additions & 3 deletions src/main/java/org/itmo/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,103 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Graph {
public class Graph {
private final int V;
private final ArrayList<Integer>[] adjList;
AtomicBoolean[] visited;

Graph(int vertices) {
public final ConcurrentLinkedQueue<List<Integer>> globalQueue;
public final ConcurrentLinkedQueue<List<Integer>> workerQueues;
private final ExecutorService executor;
private final int cpus;
private volatile CountDownLatch latch;
private final Lock lock = new ReentrantLock(true);
private final Condition queueReady = this.lock.newCondition();
// private final Condition workersDone = lock.newCondition();
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final int INTERPROCESS_BATCH = 50;

public ArrayList<Integer>[] getAdjList() {
return this.adjList;
}

public Graph(int vertices) {

this.V = vertices;
this.visited = new AtomicBoolean[V];
adjList = new ArrayList[vertices];
for (int i = 0; i < V; i++) {
visited[i] = new AtomicBoolean(false);
}
for (int i = 0; i < vertices; ++i) {
adjList[i] = new ArrayList<>();
}

// this.cpus = Runtime.getRuntime().availableProcessors();
this.cpus = 6;
this.executor = new ThreadPoolExecutor(
cpus, cpus, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
this.globalQueue = new ConcurrentLinkedQueue<>();
this.workerQueues = new ConcurrentLinkedQueue<>();
// this.levelSemaphore = new Semaphore(cpus, true);
this.latch = new CountDownLatch(cpus);
}

class BFSRunnable implements Runnable {
private final int workerId;
private final Graph graph;

public BFSRunnable(Graph graph, int workerId) {
this.graph = graph;
this.workerId = workerId;
}

@Override
public void run() {
graph.latch.countDown();
// while for each level of graph
while (!graph.isFinished.get()) {
List<Integer> vertices = null;
try {
graph.lock.lock();
graph.queueReady.await();
vertices = graph.globalQueue.poll();
} catch (InterruptedException e) {
System.out.println("Thread " + this.workerId + " is interrupted");
break;
} finally {
graph.lock.unlock();
}

List<Integer> newVertices = new ArrayList<>();
while (vertices != null) {

for (Integer vertice : vertices) {
for (int n : graph.adjList[vertice]) {
if (!visited[n].getAndSet(true)) {
newVertices.add(n);
}
if (INTERPROCESS_BATCH == newVertices.size()) {
graph.workerQueues.add(new ArrayList<>(newVertices));
newVertices.clear();
}
}
}
if (!newVertices.isEmpty()) {
graph.workerQueues.add(newVertices);
}
vertices = graph.globalQueue.poll();
}
graph.latch.countDown();
}
}
}

void addEdge(int src, int dest) {
Expand All @@ -24,6 +109,62 @@ void addEdge(int src, int dest) {
}

void parallelBFS(int startVertex) {
try {
lock.lock();
for (int i = 0; i < this.cpus; i++) {
executor.execute(new BFSRunnable(this, i));
}
visited[startVertex].set(true);
List<Integer> initial = new ArrayList<>();
for (int v : adjList[startVertex]) {
visited[v].getAndSet(true);
initial.add(v);
}
this.globalQueue.add(initial);
} finally {
lock.unlock();
}
// wait for workers to start
try {
this.latch.await();
} catch (InterruptedException e) {
System.out.println("NOT SYNCED");
} finally {
this.latch = new CountDownLatch(this.cpus);
}

while (!globalQueue.isEmpty()) {
lock.lock();
try {
queueReady.signalAll();
System.out.println("Global queue before latch: " + globalQueue.size());
} finally {
lock.unlock();
}

try {
latch.await();
lock.lock();
globalQueue.addAll(workerQueues);
workerQueues.clear();
System.out.println("Global queue after latch: " + globalQueue.size());
latch = new CountDownLatch(cpus);
} catch (InterruptedException e) {
System.out.println("MAIN GOES WRONG!!!");
} finally {
lock.unlock();
}
}
isFinished.set(true);

lock.lock();
try {
queueReady.signalAll();
} finally {
lock.unlock();
}

executor.shutdownNow();
}

//Generated by ChatGPT
Expand Down