diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 0000000..ec463ac --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +leet \ No newline at end of file diff --git a/.idea/gradle.xml b/.idea/gradle.xml new file mode 100644 index 0000000..f9163b4 --- /dev/null +++ b/.idea/gradle.xml @@ -0,0 +1,15 @@ + + + + + + \ No newline at end of file diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml new file mode 100644 index 0000000..e805548 --- /dev/null +++ b/.idea/kotlinc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..f79e5d5 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 3341beb..f7bf52b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,6 @@ plugins { kotlin("jvm") version "1.9.20" + java application } @@ -12,6 +13,9 @@ repositories { dependencies { testImplementation(kotlin("test")) + testImplementation("org.openjdk.jcstress:jcstress-core:0.16") + testAnnotationProcessor("org.openjdk.jcstress:jcstress-core:0.16") + implementation("org.knowm.xchart:xchart:3.8.7") } tasks.test { @@ -24,4 +28,20 @@ kotlin { application { mainClass.set("MainKt") -} \ No newline at end of file +} + +// JCStress runner task: runs JCStress tests located on the test runtime classpath +// Use: ./gradlew jcstress [-PjcstressArgs="-v -m quick"] +tasks.register("jcstress") { + group = "verification" + description = "Run JCStress stress tests" + mainClass.set("org.openjdk.jcstress.Main") + classpath = sourceSets.test.get().runtimeClasspath + dependsOn("testClasses") + + val argsProp = project.findProperty("jcstressArgs") as String? + if (!argsProp.isNullOrBlank()) { + args = argsProp.split("\\s+".toRegex()) + } +} + diff --git a/charts/parallel_time_vs_threads.png b/charts/parallel_time_vs_threads.png new file mode 100644 index 0000000..c8988b8 Binary files /dev/null and b/charts/parallel_time_vs_threads.png differ diff --git a/charts/size_vs_time.png b/charts/size_vs_time.png new file mode 100644 index 0000000..c60287c Binary files /dev/null and b/charts/size_vs_time.png differ diff --git a/jcstress-results-2025-12-23-11-14-02.bin.gz b/jcstress-results-2025-12-23-11-14-02.bin.gz new file mode 100644 index 0000000..5105e8b Binary files /dev/null and b/jcstress-results-2025-12-23-11-14-02.bin.gz differ diff --git a/src/main/java/org/itmo/Graph.java b/src/main/java/org/itmo/Graph.java index 141a0b6..33af77c 100644 --- a/src/main/java/org/itmo/Graph.java +++ b/src/main/java/org/itmo/Graph.java @@ -1,9 +1,11 @@ package org.itmo; import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicIntegerArray; class Graph { private final int V; @@ -23,9 +25,210 @@ void addEdge(int src, int dest) { } } - void parallelBFS(int startVertex) { + void parallelBFS(int startVertex, Integer threadsCount) { + int numThreads = (threadsCount == null) ? Runtime.getRuntime().availableProcessors() : threadsCount; + AtomicIntegerArray visited = new AtomicIntegerArray(V); + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ConcurrentLinkedQueue frontier = new ConcurrentLinkedQueue<>(); + + for (int v = 0; v < V; v++) { + visited.set(v, 0); + } + + frontier.add(startVertex); + visited.set(startVertex, 1); + + try { + while (!frontier.isEmpty()) { + int tasks = Math.min(numThreads, frontier.size()); + CountDownLatch latch = new CountDownLatch(tasks); + List[] nextFrontiers = new ArrayList[tasks]; + + for (int t = 0; t < tasks; t++) { + final int taskId = t; + pool.submit(() -> { + List nextLevel = new ArrayList<>(); + try { + Integer node; + while ((node = frontier.poll()) != null) { + for (int neighbor : adjList[node]) { + if (visited.get(neighbor) == 0 && visited.compareAndSet(neighbor, 0, 1)) { + nextLevel.add(neighbor); + } + } + } + } finally { + nextFrontiers[taskId] = nextLevel; + latch.countDown(); + } + }); + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + + for (List level : nextFrontiers) { + frontier.addAll(level); + } + } + } finally { + pool.shutdown(); + } } + void parallelBFSUnsafe(int startVertex, Integer threadsCount) { + int numThreads = (threadsCount == null) ? Runtime.getRuntime().availableProcessors() : threadsCount; + AtomicIntegerArray visited = new AtomicIntegerArray(V); + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ConcurrentLinkedQueue frontier = new ConcurrentLinkedQueue<>(); + + for (int v = 0; v < V; v++) visited.set(v, 0); + + frontier.add(startVertex); + visited.set(startVertex, 1); + + try { + while (!frontier.isEmpty()) { + int tasks = Math.min(numThreads, frontier.size()); + CountDownLatch latch = new CountDownLatch(tasks); + List[] nextFrontiers = new ArrayList[tasks]; + + for (int t = 0; t < tasks; t++) { + final int taskId = t; + pool.submit(() -> { + List nextLevel = new ArrayList<>(); + try { + Integer node; + while ((node = frontier.poll()) != null) { + for (int neighbor : adjList[node]) { + if (visited.get(neighbor) == 0 && visited.compareAndSet(neighbor, 0, 1)) { + nextLevel.add(neighbor); + } + } + } + } finally { + nextFrontiers[taskId] = nextLevel; + latch.countDown(); + } + }); + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + + for (List level : nextFrontiers) frontier.addAll(level); + } + } finally { + pool.shutdown(); + } + } + +// void parallelBFS(int startVertex) { +// int numThreads = Runtime.getRuntime().availableProcessors(); +// ExecutorService executor = Executors.newFixedThreadPool(numThreads); +// +// AtomicBoolean[] visited = new AtomicBoolean[V]; +// for (int i = 0; i < V; i++) +// visited[i] = new AtomicBoolean(false); +// +// LevelQueues levelQueues = new LevelQueues(); +// +// visited[startVertex].set(true); +// levelQueues.current.add(startVertex); +// +// CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); // +1 — главный поток +// AtomicInteger remaining = new AtomicInteger(); +// +// for (int t = 0; t < numThreads; t++) { +// executor.submit(() -> { +// while (true) { +// try { +// barrier.await(); +// } catch (Exception e) { +// System.out.println("barrier.await broken"); +// } +// +// if (levelQueues.current.isEmpty()) +// barrier.await(); +// +// Integer v; +// while ((v = levelQueues.current.poll()) != null) { +// for (int n : adjList[v]) { +// if (visited[n].compareAndSet(false, true)) { +// levelQueues.next.add(n); +// } +// } +// } +// +// if (remaining.decrementAndGet() == 0) { +// System.out.println("Level process ended"); +// } +// +// try { +// barrier.await(); +// } catch (Exception ignored) { +// } +// } +// }); +// } +// +// try { +// while (!levelQueues.current.isEmpty()) { +// remaining.set(numThreads); +// barrier.await(); +// barrier.await(); +// +// levelQueues.current = levelQueues.next; +// levelQueues.next = new ConcurrentLinkedQueue<>(); +// } +// } catch (Exception e) { +// System.out.println("main process barrier.await broken"); +// } +// +// executor.shutdownNow(); +// } +// +// void parallelBFS_2(int startVertex) { +// ExecutorService pool = Executors.newFixedThreadPool( +// Runtime.getRuntime().availableProcessors()); +// +// AtomicBoolean[] visited = new AtomicBoolean[V]; +// for (int i = 0; i < V; i++) +// visited[i] = new AtomicBoolean(false); +// +// ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); +// queue.add(startVertex); +// visited[startVertex].set(true); +// +// while (!queue.isEmpty()) { +// Integer v = queue.poll(); +// if (v == null) continue; +// +// CompletableFuture.runAsync(() -> { +// for (int n : adjList[v]) { +// if (visited[n].compareAndSet(false, true)) { +// queue.add(n); +// } +// } +// }, pool); +// } +// +// pool.shutdown(); +// } +// +// private class LevelQueues { +// public ConcurrentLinkedQueue current = new ConcurrentLinkedQueue<>(); +// public ConcurrentLinkedQueue next = new ConcurrentLinkedQueue<>(); +// } + //Generated by ChatGPT void bfs(int startVertex) { boolean[] visited = new boolean[V]; @@ -36,12 +239,12 @@ void bfs(int startVertex) { queue.add(startVertex); while (!queue.isEmpty()) { - startVertex = queue.poll(); + startVertex = queue.poll(); // конкуренция за queue for (int n : adjList[startVertex]) { - if (!visited[n]) { + if (!visited[n]) { // здесь гонка visited[n] = true; - queue.add(n); + queue.add(n); // риск многократного добавления n } } } diff --git a/src/main/java/org/itmo/UnsafeCounter.java b/src/main/java/org/itmo/UnsafeCounter.java new file mode 100644 index 0000000..1041a21 --- /dev/null +++ b/src/main/java/org/itmo/UnsafeCounter.java @@ -0,0 +1,13 @@ +package org.itmo; + +public class UnsafeCounter { + private int counter = 0; + + public void increment() { + counter++; // <-- гонка данных + } + + public int get() { + return counter; + } +} diff --git a/src/test/java/org/itmo/BFSTest.java b/src/test/java/org/itmo/BFSTest.java index 7bf9098..6e63fda 100644 --- a/src/test/java/org/itmo/BFSTest.java +++ b/src/test/java/org/itmo/BFSTest.java @@ -1,11 +1,19 @@ package org.itmo; import org.junit.jupiter.api.Test; +import org.knowm.xchart.BitmapEncoder; +import org.knowm.xchart.XYChart; +import org.knowm.xchart.XYChartBuilder; +import org.knowm.xchart.XYSeries; +import org.knowm.xchart.style.markers.SeriesMarkers; +import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.nio.Buffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Random; import java.util.function.BiFunction; import java.util.stream.IntStream; @@ -16,6 +24,12 @@ public class BFSTest { public void bfsTest() throws IOException { int[] sizes = new int[]{10, 100, 1000, 10_000, 10_000, 50_000, 100_000, 1_000_000, 2_000_000}; int[] connections = new int[]{50, 500, 5000, 50_000, 100_000, 1_000_000, 1_000_000, 10_000_000, 10_000_000}; + + new File("charts").mkdirs(); + + List sizesList = new ArrayList<>(); + List serialTimes = new ArrayList<>(); + List parallelTimes = new ArrayList<>(); Random r = new Random(42); try (FileWriter fw = new FileWriter("tmp/results.txt")) { for (int i = 0; i < sizes.length; i++) { @@ -24,7 +38,10 @@ public void bfsTest() throws IOException { Graph g = new RandomGraphGenerator().generateGraph(r, sizes[i], connections[i]); System.out.println("Generation completed!\nStarting bfs"); long serialTime = executeSerialBfsAndGetTime(g); - long parallelTime = executeParallelBfsAndGetTime(g); + long parallelTime = executeParallelBfsAndGetTime(g, null); + sizesList.add(sizes[i]); + serialTimes.add(serialTime); + parallelTimes.add(parallelTime); fw.append("Times for " + sizes[i] + " vertices and " + connections[i] + " connections: "); fw.append("\nSerial: " + serialTime); fw.append("\nParallel: " + parallelTime); @@ -32,6 +49,25 @@ public void bfsTest() throws IOException { } fw.flush(); } + int threadsTestIndex = Math.max(6, sizes.length - 1); + int baseSize = sizes[threadsTestIndex]; + int baseConn = connections[threadsTestIndex]; + + int[] threadCounts = new int[]{1, 2, 4, 8, 16, 32, 64}; + List threadsX = new ArrayList<>(); + List parallelByThreads = new ArrayList<>(); + + Graph gThreads = new RandomGraphGenerator().generateGraph(r, baseSize, baseConn); + + for (int tc : threadCounts) { + long t = executeParallelBfsAndGetTime(gThreads, tc); + threadsX.add(tc); + parallelByThreads.add(t); + System.out.println("Threads=" + tc + " -> parallel time " + t + " ms"); + } + + saveSizeChart(sizesList, serialTimes, parallelTimes, "charts/size_vs_time.png"); + saveThreadsChart(threadsX, parallelByThreads, baseSize, baseConn, "charts/parallel_time_vs_threads.png"); } @@ -42,11 +78,50 @@ private long executeSerialBfsAndGetTime(Graph g) { return endTime - startTime; } - private long executeParallelBfsAndGetTime(Graph g) { + private long executeParallelBfsAndGetTime(Graph g, Integer threads) { long startTime = System.currentTimeMillis(); - g.parallelBFS(0); + g.parallelBFS(0, threads); long endTime = System.currentTimeMillis(); return endTime - startTime; } + private void saveSizeChart(List sizes, List serialTimes, List parallelTimes, String outPath) throws IOException { + + double[] x = sizes.stream().mapToDouble(Integer::doubleValue).toArray(); + double[] ySerial = serialTimes.stream().mapToDouble(Long::doubleValue).toArray(); + double[] yParallel = parallelTimes.stream().mapToDouble(Long::doubleValue).toArray(); + + XYChart chart = new XYChartBuilder() + .width(900).height(600) + .title("Время выполнения vs размер входных данных") + .xAxisTitle("Количество вершин") + .yAxisTitle("Время (мс)") + .build(); + + XYSeries s1 = chart.addSeries("Serial", x, ySerial); + s1.setMarker(SeriesMarkers.CIRCLE); + + XYSeries s2 = chart.addSeries("Parallel", x, yParallel); + s2.setMarker(SeriesMarkers.DIAMOND); + + BitmapEncoder.saveBitmap(chart, outPath, BitmapEncoder.BitmapFormat.PNG); + } + + private void saveThreadsChart(List threads, List times, + int baseSize, int baseConn, String outPath) throws IOException { + double[] xThreads = threads.stream().mapToDouble(Integer::doubleValue).toArray(); + double[] yTimes = times.stream().mapToDouble(Long::doubleValue).toArray(); + + XYChart chart = new XYChartBuilder() + .width(900).height(600) + .title("Параллельный BFS: время vs число потоков (size=" + baseSize + ", edges≈" + baseConn + ")") + .xAxisTitle("Число потоков") + .yAxisTitle("Время (мс)") + .build(); + + XYSeries s = chart.addSeries("Parallel", xThreads, yTimes); + s.setMarker(SeriesMarkers.SQUARE); + + BitmapEncoder.saveBitmap(chart, outPath, BitmapEncoder.BitmapFormat.PNG); + } } diff --git a/src/test/java/org/itmo/RandomGraphGenerator.java b/src/test/java/org/itmo/RandomGraphGenerator.java index fdb888c..1a57226 100644 --- a/src/test/java/org/itmo/RandomGraphGenerator.java +++ b/src/test/java/org/itmo/RandomGraphGenerator.java @@ -1,7 +1,9 @@ package org.itmo; import java.util.Arrays; +import java.util.HashSet; import java.util.Random; +import java.util.Set; import java.util.SplittableRandom; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; @@ -11,22 +13,27 @@ public class RandomGraphGenerator { private long pack(int u, int v) { return (((long) u) << 32) | (v & 0xffffffffL); } + private int unpackU(long key) { return (int) (key >>> 32); } + private int unpackV(long key) { return (int) (key & 0xffffffffL); } Graph generateGraph(Random r, int size, int numEdges) { + if (size < 1) throw new IllegalArgumentException("size must be >= 1"); if (numEdges < size - 1) throw new IllegalArgumentException("We need min size-1 edges"); long maxDirected = (long) size * (size - 1); if (numEdges > maxDirected) throw new IllegalArgumentException("Too many edges for directed graph without self-loops"); - int[] perm = java.util.stream.IntStream.range(0, size).toArray(); - for (int i = size - 1; i > 1; i--) { - int j = 1 + r.nextInt(i); - int tmp = perm[i]; perm[i] = perm[j]; perm[j] = tmp; + int[] perm = IntStream.range(0, size).toArray(); + for (int i = size - 1; i > 0; i--) { + int j = r.nextInt(i + 1); + int tmp = perm[i]; + perm[i] = perm[j]; + perm[j] = tmp; } final int chainCount = size - 1; @@ -74,7 +81,7 @@ Graph generateGraph(Random r, int size, int numEdges) { while (unique < numEdges) { int missing = numEdges - unique; - int extra = Math.max(missing / 2, 10_000); // небольшой запас + int extra = Math.max(missing / 2, 10_000); int add = missing + extra; long[] more = new long[unique + add]; @@ -109,6 +116,31 @@ Graph generateGraph(Random r, int size, int numEdges) { keys = more; } + Set chainSet = new HashSet<>(chainCount * 2); + for (int i = 1; i < size; i++) { + chainSet.add(pack(perm[i - 1], perm[i])); + } + + int p = 0; + for (int i = 0; i < unique && p < chainCount; i++) { + long e = keys[i]; + if (chainSet.remove(e)) { + // swap keys[p] и keys[i] + long tmp = keys[p]; + keys[p] = keys[i]; + keys[i] = tmp; + p++; + } + } + + SplittableRandom shuf = base.split(); + for (int i = p; i < numEdges; i++) { + int j = i + shuf.nextInt(unique - i); + long tmp = keys[i]; + keys[i] = keys[j]; + keys[j] = tmp; + } + Graph g = new Graph(size); for (int i = 0; i < numEdges; i++) { long key = keys[i]; @@ -118,5 +150,4 @@ Graph generateGraph(Random r, int size, int numEdges) { } return g; } - -} +} \ No newline at end of file diff --git a/src/test/java/org/itmo/UnsafeCounterTest.java b/src/test/java/org/itmo/UnsafeCounterTest.java new file mode 100644 index 0000000..a831605 --- /dev/null +++ b/src/test/java/org/itmo/UnsafeCounterTest.java @@ -0,0 +1,27 @@ +package org.itmo; + +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.I_Result; + +@JCStressTest +@Outcome(id = "5", expect = Expect.ACCEPTABLE, desc = "Все 5 инкрементов выполнены корректно") +@Outcome(id = "1", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@Outcome(id = "2", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@Outcome(id = "3", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@Outcome(id = "4", expect = Expect.ACCEPTABLE_INTERESTING, desc = "Гонка данных: часть инкрементов потерялась") +@State +public class UnsafeCounterTest { + + private UnsafeCounter counter = new UnsafeCounter(); + + @Actor public void actor1() { counter.increment(); } + @Actor public void actor2() { counter.increment(); } + @Actor public void actor3() { counter.increment(); } + @Actor public void actor4() { counter.increment(); } + @Actor public void actor5() { counter.increment(); } + + @Arbiter + public void arbiter(I_Result r) { + r.r1 = counter.get(); + } +} diff --git a/tmp/results.txt b/tmp/results.txt index 027e7f9..865fafc 100644 --- a/tmp/results.txt +++ b/tmp/results.txt @@ -1,32 +1,36 @@ Times for 10 vertices and 50 connections: Serial: 0 -Parallel: 0 +Parallel: 4 -------- Times for 100 vertices and 500 connections: Serial: 0 -Parallel: 0 +Parallel: 2 -------- Times for 1000 vertices and 5000 connections: Serial: 1 -Parallel: 0 +Parallel: 4 -------- Times for 10000 vertices and 50000 connections: -Serial: 3 -Parallel: 0 +Serial: 5 +Parallel: 13 -------- Times for 10000 vertices and 100000 connections: Serial: 2 -Parallel: 0 +Parallel: 22 -------- Times for 50000 vertices and 1000000 connections: -Serial: 30 -Parallel: 0 +Serial: 17 +Parallel: 17 -------- Times for 100000 vertices and 1000000 connections: -Serial: 18 -Parallel: 0 +Serial: 34 +Parallel: 27 -------- Times for 1000000 vertices and 10000000 connections: -Serial: 307 -Parallel: 0 +Serial: 416 +Parallel: 947 +-------- +Times for 2000000 vertices and 10000000 connections: +Serial: 2196 +Parallel: 1787 --------