diff --git a/build.gradle b/build.gradle index 568439c..8a3c45d 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ plugins { id 'java' + id "io.github.reyerizo.gradle.jcstress" version "0.9.0" } repositories { @@ -24,3 +25,8 @@ test { } failFast = true } + +jcstress { + verbose = true + timeMillis = "2000" +} diff --git a/images/stats.PNG b/images/stats.PNG new file mode 100644 index 0000000..de354c2 Binary files /dev/null and b/images/stats.PNG differ diff --git a/src/jcstress/java/org/itmo/ParallelBFSQueueTest.java b/src/jcstress/java/org/itmo/ParallelBFSQueueTest.java new file mode 100644 index 0000000..dcd260f --- /dev/null +++ b/src/jcstress/java/org/itmo/ParallelBFSQueueTest.java @@ -0,0 +1,86 @@ +package org.itmo; + +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.*; + +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.itmo.*; + +@JCStressTest +@Outcome(id = "true", expect = Expect.ACCEPTABLE, desc = "Each vertex is visited exactly once") +@Outcome(id = "false", expect = Expect.FORBIDDEN, desc = "Each vertex might be visited more than once") +@State +public class ParallelBFSQueueTest { + private static final int VERTICES = 20; + + private final Graph graph; + private int visitedCount = 0; + private Queue currentLevel = new ConcurrentLinkedQueue<>(); + private Queue nextLevel = new ConcurrentLinkedQueue<>(); + + public ParallelBFSQueueTest() { + this.graph = new Graph(VERTICES); + for (int i = 1; i < VERTICES; i++) { + this.graph.addEdge(0, i); + } + this.currentLevel.add(0); + } + + @Actor + public void actor1() { + Integer vertex; + while ((vertex = currentLevel.poll()) != null) { + graph.visited[vertex].getAndSet(true); + for (int newVertex : graph.getAdjList()[vertex]) { + nextLevel.add(newVertex); + } + } + } + + @Actor + public void actor2() { + Integer vertex; + while ((vertex = currentLevel.poll()) != null) { + graph.visited[vertex].getAndSet(true); + for (int newVertex : graph.getAdjList()[vertex]) { + nextLevel.add(newVertex); + } + } + } + + @Actor + public void actor3() { + Integer vertex; + while ((vertex = currentLevel.poll()) != null) { + graph.visited[vertex].getAndSet(true); + for (int newVertex : graph.getAdjList()[vertex]) { + nextLevel.add(newVertex); + } + } + } + + @Actor + public void actor4() { + Integer vertex; + while ((vertex = currentLevel.poll()) != null) { + graph.visited[vertex].getAndSet(true); + for (int newVertex : graph.getAdjList()[vertex]) { + nextLevel.add(newVertex); + } + } + } + + @Arbiter + public void arbiter(L_Result r) { + for (int i = 0; i < VERTICES; i++) { + if (this.graph.visited[i].get() == true) { + this.visitedCount++; + } + } + currentLevel = nextLevel; + r.r1 = visitedCount == 1; + } +} diff --git a/src/main/java/org/itmo/Graph.java b/src/main/java/org/itmo/Graph.java index 141a0b6..41a10e8 100644 --- a/src/main/java/org/itmo/Graph.java +++ b/src/main/java/org/itmo/Graph.java @@ -3,18 +3,103 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; -class Graph { +public class Graph { private final int V; private final ArrayList[] adjList; + AtomicBoolean[] visited; - Graph(int vertices) { + public final ConcurrentLinkedQueue> globalQueue; + public final ConcurrentLinkedQueue> workerQueues; + private final ExecutorService executor; + private final int cpus; + private volatile CountDownLatch latch; + private final Lock lock = new ReentrantLock(true); + private final Condition queueReady = this.lock.newCondition(); + // private final Condition workersDone = lock.newCondition(); + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final int INTERPROCESS_BATCH = 50; + + public ArrayList[] getAdjList() { + return this.adjList; + } + + public Graph(int vertices) { + this.V = vertices; + this.visited = new AtomicBoolean[V]; adjList = new ArrayList[vertices]; + for (int i = 0; i < V; i++) { + visited[i] = new AtomicBoolean(false); + } for (int i = 0; i < vertices; ++i) { adjList[i] = new ArrayList<>(); } + + // this.cpus = Runtime.getRuntime().availableProcessors(); + this.cpus = 6; + this.executor = new ThreadPoolExecutor( + cpus, cpus, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + this.globalQueue = new ConcurrentLinkedQueue<>(); + this.workerQueues = new ConcurrentLinkedQueue<>(); + // this.levelSemaphore = new Semaphore(cpus, true); + this.latch = new CountDownLatch(cpus); + } + + class BFSRunnable implements Runnable { + private final int workerId; + private final Graph graph; + + public BFSRunnable(Graph graph, int workerId) { + this.graph = graph; + this.workerId = workerId; + } + + @Override + public void run() { + graph.latch.countDown(); + // while for each level of graph + while (!graph.isFinished.get()) { + List vertices = null; + try { + graph.lock.lock(); + graph.queueReady.await(); + vertices = graph.globalQueue.poll(); + } catch (InterruptedException e) { + System.out.println("Thread " + this.workerId + " is interrupted"); + break; + } finally { + graph.lock.unlock(); + } + + List newVertices = new ArrayList<>(); + while (vertices != null) { + + for (Integer vertice : vertices) { + for (int n : graph.adjList[vertice]) { + if (!visited[n].getAndSet(true)) { + newVertices.add(n); + } + if (INTERPROCESS_BATCH == newVertices.size()) { + graph.workerQueues.add(new ArrayList<>(newVertices)); + newVertices.clear(); + } + } + } + if (!newVertices.isEmpty()) { + graph.workerQueues.add(newVertices); + } + vertices = graph.globalQueue.poll(); + } + graph.latch.countDown(); + } + } } void addEdge(int src, int dest) { @@ -24,6 +109,62 @@ void addEdge(int src, int dest) { } void parallelBFS(int startVertex) { + try { + lock.lock(); + for (int i = 0; i < this.cpus; i++) { + executor.execute(new BFSRunnable(this, i)); + } + visited[startVertex].set(true); + List initial = new ArrayList<>(); + for (int v : adjList[startVertex]) { + visited[v].getAndSet(true); + initial.add(v); + } + this.globalQueue.add(initial); + } finally { + lock.unlock(); + } + // wait for workers to start + try { + this.latch.await(); + } catch (InterruptedException e) { + System.out.println("NOT SYNCED"); + } finally { + this.latch = new CountDownLatch(this.cpus); + } + + while (!globalQueue.isEmpty()) { + lock.lock(); + try { + queueReady.signalAll(); + System.out.println("Global queue before latch: " + globalQueue.size()); + } finally { + lock.unlock(); + } + + try { + latch.await(); + lock.lock(); + globalQueue.addAll(workerQueues); + workerQueues.clear(); + System.out.println("Global queue after latch: " + globalQueue.size()); + latch = new CountDownLatch(cpus); + } catch (InterruptedException e) { + System.out.println("MAIN GOES WRONG!!!"); + } finally { + lock.unlock(); + } + } + isFinished.set(true); + + lock.lock(); + try { + queueReady.signalAll(); + } finally { + lock.unlock(); + } + + executor.shutdownNow(); } //Generated by ChatGPT