diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index dd84ea7..3205926 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -12,6 +12,7 @@ A clear and concise description of what the bug is. **To Reproduce** Steps to reproduce the behavior: + 1. Go to '...' 2. Click on '....' 3. Scroll down to '....' @@ -24,15 +25,17 @@ A clear and concise description of what you expected to happen. If applicable, add screenshots to help explain your problem. **Desktop (please complete the following information):** - - OS: [e.g. iOS] - - Browser [e.g. chrome, safari] - - Version [e.g. 22] + +- OS: [e.g. iOS] +- Browser [e.g. chrome, safari] +- Version [e.g. 22] **Smartphone (please complete the following information):** - - Device: [e.g. iPhone6] - - OS: [e.g. iOS8.1] - - Browser [e.g. stock browser, safari] - - Version [e.g. 22] + +- Device: [e.g. iPhone6] +- OS: [e.g. iOS8.1] +- Browser [e.g. stock browser, safari] +- Version [e.g. 22] **Additional context** Add any other context about the problem here. diff --git a/.github/workflows/gradle-test-main.yml b/.github/workflows/gradle-test-main.yml index 537f7ec..ba4a296 100644 --- a/.github/workflows/gradle-test-main.yml +++ b/.github/workflows/gradle-test-main.yml @@ -28,5 +28,6 @@ jobs: with: token: ${{ secrets.CODECOV_TOKEN }} slug: spring-templates/spring-concurrency-thread - fail_ci_if_error: true + flags: integration + fail_ci_if_error: true verbose: true diff --git a/.github/workflows/gradle-test.yml b/.github/workflows/gradle-test.yml index 3f90416..ab1531a 100644 --- a/.github/workflows/gradle-test.yml +++ b/.github/workflows/gradle-test.yml @@ -2,8 +2,9 @@ name: Run gradlew test on: pull_request: - branches-ignore: - - main + branches: + - develop + - feature/** jobs: build: @@ -36,6 +37,6 @@ jobs: with: token: ${{ secrets.CODECOV_TOKEN }} slug: spring-templates/spring-concurrency-thread - fail_ci_if_error: true - verbose: true - flags: unittests + fail_ci_if_error: true + verbose: true + flags: ${{ github.ref == 'refs/pull/develop' && 'integration' || 'unittests' }} diff --git a/.gitignore b/.gitignore index bd3712a..9cfa0cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ HELP.md -.gradle -build/ +.gradle/* +build/* !gradle/wrapper/gradle-wrapper.jar !**/src/main/**/build/ !**/src/test/**/build/ @@ -18,7 +18,7 @@ bin/ !**/src/test/**/bin/ ### IntelliJ IDEA ### -.idea +.idea/* *.iws *.iml *.ipr diff --git a/README.md b/README.md index 86633b3..9249f1d 100644 --- a/README.md +++ b/README.md @@ -1 +1,50 @@ -# spring-thread-concurrency \ No newline at end of file +[![codecov](https://codecov.io/gh/spring-templates/spring-concurrency-thread/graph/badge.svg?token=N3GEH8C5K7)](https://codecov.io/gh/spring-templates/spring-concurrency-thread) + +![Spring Boot](https://img.shields.io/badge/Spring%20Boot-6DB33F?logo=springboot&logoColor=white) +![Java](https://img.shields.io/badge/Java-ED8B00?logoColor=white) +![Gradle](https://img.shields.io/badge/Gradle-02303A?logo=gradle&logoColor=white) +![JUnit5](https://img.shields.io/badge/JUnit5-25A162?logo=junit5&logoColor=white) +![JaCoCo](https://img.shields.io/badge/JaCoCo-D22128?logo=jacoco&logoColor=white) +![Codecov](https://img.shields.io/badge/Codecov-F01F7A?logo=codecov&logoColor=white) +![GitHub Actions](https://img.shields.io/badge/GitHub%20Actions-2088FF?logo=githubactions&logoColor=white) + +# 관심사 + +- [멀티 스레드의 자원 공유 문제](https://github.com/spring-templates/spring-concurrency-thread/discussions/16) +- [멀티 쓰레드 자원 업데이트 문제](https://github.com/spring-templates/spring-concurrency-thread/discussions/17) + +# 정보 + +- [동시성 기본 조건과 관심사](https://github.com/spring-templates/spring-concurrency-thread/discussions/2) + +# [Counter-implementation Benchmark](https://www.notion.so/softsquared/f314375356b54381a8878cf2dabd381b) + +> - median of 25 iterations +> - nRequests: 2^21 - 1 + +| name | nThreads | time (ms) | memory (KB) | +|-------------------|----------|-----------|-------------| +| AtomicBatch | 4 | 12 | 480 | +| Atomic | 1 | 14 | 318 | +| AtomicBatch | 1 | 30 | 240 | +| Lock | 1 | 61 | 241 | +| Synchronized | 1 | 61 | 241 | +| Polling | 1 | 78 | 463 | +| CompletableFuture | 1 | 158 | 25710 | + +### AtomicBatch vs Atomic + +> - nThreads: AtomicBatch=4, Atomic=1 + +| name | nRequests | time (ms) | memory (KB) | +|-------------|-----------|-----------|-------------| +| AtomicBatch | 2^21 - 1 | 12 | 480 | +| AtomicBatch | 2^22 - 1 | 24 | 538 | +| AtomicBatch | 2^23 - 1 | 42 | 572 | +| AtomicBatch | 2^30 - 1 | 5695 | 511 | +| AtomicBatch | 2^31 - 1 | 11621 | 294 | +| Atomic | 2^21 - 1 | 14 | 318 | +| Atomic | 2^22 - 1 | 27 | 244 | +| Atomic | 2^23 - 1 | 55 | 344 | +| Atomic | 2^30 - 1 | 7178 | 103 | +| Atomic | 2^31 - 1 | 14377 | 266 | diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..2429953 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,10 @@ +coverage: + status: + project: + default: + target: 40% + threshold: 10% + patch: + default: + target: 30% + threshold: 10% diff --git a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java index 42b9717..839e4b9 100644 --- a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java +++ b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java @@ -1,13 +1,17 @@ package com.thread.concurrency; +import com.thread.concurrency.executor.CounterBenchmark; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class SpringThreadConcurrencyApplication { public static void main(String[] args) { - SpringApplication.run(SpringThreadConcurrencyApplication.class, args); + ConfigurableApplicationContext context = SpringApplication.run(SpringThreadConcurrencyApplication.class, args); + var performance = context.getBean(CounterBenchmark.class).benchmark(); + System.out.println("|----------------------|---------------|---------------|---------------|"); + System.out.println(performance); } - } diff --git a/src/main/java/com/thread/concurrency/counter/AtomicCounter.java b/src/main/java/com/thread/concurrency/counter/AtomicCounter.java new file mode 100644 index 0000000..0834634 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/AtomicCounter.java @@ -0,0 +1,25 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicInteger; + +@Component +public class AtomicCounter implements Counter { + private final AtomicInteger count = new AtomicInteger(100); + + @Override + public void add(int value) { + count.addAndGet(value); + } + + @Override + public int show() { + return count.get(); + } + + @Override + public void clear() { + count.set(0); + } +} diff --git a/src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java b/src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java new file mode 100644 index 0000000..2a60a1d --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java @@ -0,0 +1,39 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@Component +public class CompletableFutureCounter implements Counter { + private CompletableFuture counter; + + public CompletableFutureCounter() { + this.counter = new CompletableFuture<>(); + counter.complete(100); + } + + @Override + public void add(int value) { + synchronized (this) { + counter = counter.thenApply((c) -> c + value); + } + } + + @Override + public int show() { + try { + return counter.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void clear() { + synchronized (this) { + counter = CompletableFuture.completedFuture(0); + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/Counter.java b/src/main/java/com/thread/concurrency/counter/Counter.java new file mode 100644 index 0000000..3b057ab --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/Counter.java @@ -0,0 +1,9 @@ +package com.thread.concurrency.counter; + +public interface Counter { + void add(int value); + + int show(); + + void clear(); +} diff --git a/src/main/java/com/thread/concurrency/counter/LockCounter.java b/src/main/java/com/thread/concurrency/counter/LockCounter.java new file mode 100644 index 0000000..13607b3 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/LockCounter.java @@ -0,0 +1,36 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.locks.ReentrantLock; + +@Component +public class LockCounter implements Counter { + private static final ReentrantLock lock = new ReentrantLock(); + private static int count = 100; + + @Override + public void add(int value) { + lock.lock(); + try { + count += value; + } finally { + lock.unlock(); + } + } + + @Override + public int show() { + return count; + } + + @Override + public void clear() { + lock.lock(); + try { + count = 0; + } finally { + lock.unlock(); + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/PollingCounter.java b/src/main/java/com/thread/concurrency/counter/PollingCounter.java new file mode 100644 index 0000000..b3cc8fa --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/PollingCounter.java @@ -0,0 +1,42 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +@Component +public class PollingCounter implements Counter { + private static int count = 100; + private static volatile boolean lock = false; + + @Override + public void add(int value) { + while (true) { + if (!lock) { + synchronized (PollingCounter.class) { + lock = true; + count += value; + lock = false; + break; + } + } + } + } + + @Override + public int show() { + return count; + } + + @Override + public void clear() { + while (true) { + if (!lock) { + synchronized (PollingCounter.class) { + lock = true; + count = 0; + lock = false; + break; + } + } + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java b/src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java new file mode 100644 index 0000000..3fc0395 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java @@ -0,0 +1,24 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +@Component +public class SynchronizedCounter implements Counter { + + private int counter = 100; + + @Override + public synchronized void add(int value) { + counter += value; + } + + @Override + public synchronized int show() { + return counter; + } + + @Override + public synchronized void clear() { + counter = 0; + } +} diff --git a/src/main/java/com/thread/concurrency/counter/batch/AtomicBatchCounter.java b/src/main/java/com/thread/concurrency/counter/batch/AtomicBatchCounter.java new file mode 100644 index 0000000..98db876 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/batch/AtomicBatchCounter.java @@ -0,0 +1,44 @@ +package com.thread.concurrency.counter.batch; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +@Component +public class AtomicBatchCounter implements BatchCounter { + private final AtomicLong counter = new AtomicLong(); + private final ConcurrentMap batch = new ConcurrentHashMap<>(); + + @Override + public void add(int value) { + var threadId = Thread.currentThread().threadId(); + batch.computeIfAbsent(threadId, k -> new LongAdder()).add(value); + } + + @Override + public int show() { + return counter.intValue(); + } + + @Override + public void clear() { + counter.set(0); + batch.clear(); + } + + @Override + public void flush() { + var threadId = Thread.currentThread().threadId(); + flush(threadId); + } + + private void flush(long threadId) { + var value = batch.remove(threadId); + if (value != null) { + counter.addAndGet(value.longValue()); + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/batch/BatchCounter.java b/src/main/java/com/thread/concurrency/counter/batch/BatchCounter.java new file mode 100644 index 0000000..bfd3a77 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/batch/BatchCounter.java @@ -0,0 +1,7 @@ +package com.thread.concurrency.counter.batch; + +import com.thread.concurrency.counter.Counter; + +public interface BatchCounter extends Counter { + void flush(); +} diff --git a/src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java b/src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java new file mode 100644 index 0000000..be66936 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/batch/ConcurrentParameterizedBatchingCounter.java @@ -0,0 +1,52 @@ +package com.thread.concurrency.counter.batch; + +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +@Component +@Profile("dev") +public class ConcurrentParameterizedBatchingCounter implements BatchCounter { + private static final int BATCH_SIZE = 100; + private final AtomicLong counter = new AtomicLong(); + private final ConcurrentMap> batch = new ConcurrentHashMap<>(); + + @Override + public void add(int value) { + var threadId = Thread.currentThread().threadId(); + batch.computeIfAbsent(threadId, k -> new ArrayList<>()).add(value); + if (batch.get(threadId).size() >= BATCH_SIZE) { + flush(threadId); + } + } + + @Override + public int show() { + return counter.intValue(); + } + + @Override + public void clear() { + counter.set(0); + batch.clear(); + } + + private void flush(long threadId) { + var list = batch.getOrDefault(threadId, null); + if (list != null && !list.isEmpty()) { + counter.addAndGet(list.stream().mapToLong(Integer::longValue).sum()); + batch.remove(threadId); + } + } + + @Override + public void flush() { + var threadId = Thread.currentThread().threadId(); + flush(threadId); + } +} diff --git a/src/main/java/com/thread/concurrency/counter/queue/Consumer.java b/src/main/java/com/thread/concurrency/counter/queue/Consumer.java new file mode 100644 index 0000000..af19793 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queue/Consumer.java @@ -0,0 +1,12 @@ +package com.thread.concurrency.counter.queue; + +import org.springframework.context.annotation.Profile; + +import java.util.concurrent.TimeUnit; + +@Profile("dev") +public interface Consumer { + void consumeEvent(long timeout, TimeUnit unit) throws InterruptedException; + + Long show(); +} diff --git a/src/main/java/com/thread/concurrency/counter/queue/CounterConsumer.java b/src/main/java/com/thread/concurrency/counter/queue/CounterConsumer.java new file mode 100644 index 0000000..2fc1716 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queue/CounterConsumer.java @@ -0,0 +1,35 @@ +package com.thread.concurrency.counter.queue; + +import org.springframework.context.annotation.Profile; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@Profile("dev") +public class CounterConsumer implements Consumer { + private final BlockingQueue queue; + private final AtomicLong count = new AtomicLong(0); + + public CounterConsumer(BlockingQueue queue) { + this.queue = queue; + } + + public void consumeEvent(long timeout, TimeUnit unit) throws InterruptedException { + while (true) { + Long value = queue.poll(timeout, unit); + if (value == null) { + break; + } + count.addAndGet(value); + } + } + + public Long show() { + while (true) { + if (queue.isEmpty()) { + return count.get(); + } + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/queue/CounterProducer.java b/src/main/java/com/thread/concurrency/counter/queue/CounterProducer.java new file mode 100644 index 0000000..66c243a --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queue/CounterProducer.java @@ -0,0 +1,18 @@ +package com.thread.concurrency.counter.queue; + +import org.springframework.context.annotation.Profile; + +import java.util.concurrent.BlockingQueue; + +@Profile("dev") +public class CounterProducer implements Producer { + private final BlockingQueue queue; + + public CounterProducer(BlockingQueue queue) { + this.queue = queue; + } + + public void add(long value) throws InterruptedException { + queue.put(value); + } +} diff --git a/src/main/java/com/thread/concurrency/counter/queue/Producer.java b/src/main/java/com/thread/concurrency/counter/queue/Producer.java new file mode 100644 index 0000000..f36b4d6 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/queue/Producer.java @@ -0,0 +1,8 @@ +package com.thread.concurrency.counter.queue; + +import org.springframework.context.annotation.Profile; + +@Profile("dev") +public interface Producer { + void add(long value) throws InterruptedException; +} diff --git a/src/main/java/com/thread/concurrency/executor/CounterBenchmark.java b/src/main/java/com/thread/concurrency/executor/CounterBenchmark.java new file mode 100644 index 0000000..398d15e --- /dev/null +++ b/src/main/java/com/thread/concurrency/executor/CounterBenchmark.java @@ -0,0 +1,105 @@ +package com.thread.concurrency.executor; + +import com.thread.concurrency.counter.Counter; +import com.thread.concurrency.counter.batch.BatchCounter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + + +@Component +public class CounterBenchmark { + private final Counter counter; + private final String counterName; + private final int iterations, nThreads, totalRequests; + + @Autowired + public CounterBenchmark(CounterConfiguration.CounterConfig config) { + System.out.println(config); + this.counter = config.counter(); + this.counterName = counter.getClass().getSimpleName(); + this.iterations = config.iterations(); + this.nThreads = config.nThreads(); + this.totalRequests = config.totalRequests(); + } + + public Performance benchmark() { + System.out.printf("| %-20s | %13s | %13s | %13s |%n", "Name", "Time", "Threads", "Memory"); + System.out.println("|----------------------|---------------|---------------|---------------|"); + List performances = new ArrayList<>(); + for (int i = 0; i < iterations; i++) { + var performance = calculateEach(); + performances.add(performance); + System.out.println(performance); + } + return reduce(performances); + } + + private Performance calculateEach() { + List iterPerThread = range(); + Consumer task = (Integer nRequest) -> { + for (int i = 0; i < nRequest; i++) { + counter.add(1); + } + if (counter instanceof BatchCounter batchCounter) { + batchCounter.flush(); + } + }; + System.gc(); + long timeOnStart = System.currentTimeMillis(); + long memoryOnStart = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + doAdd(iterPerThread, task); + if (counter.show() != totalRequests) { + System.out.printf("Counter: %d, Total: %d%n", counter.show(), totalRequests); + } + + + long timeOnEnd = System.currentTimeMillis(); + long memoryOnEnd = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + + long timeElapsed = timeOnEnd - timeOnStart; + long memoryUsed = memoryOnEnd - memoryOnStart; + return new Performance(counterName, timeElapsed, iterPerThread.size(), memoryUsed); + } + + private void doAdd(List params, Consumer task) { + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + counter.clear(); + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + int nRequest = params.get(i); + futures.add(CompletableFuture.runAsync(() -> task.accept(nRequest), executor)); + } + futures.forEach(CompletableFuture::join); + } + } + + private List range() { + int baseValue = totalRequests / nThreads; + int remainder = totalRequests % nThreads; + + List result = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + var remainderValue = i < remainder ? 1 : 0; + result.add(baseValue + remainderValue); + } + assert result.stream().mapToInt(Integer::intValue).sum() == totalRequests; + return result; + } + + private Performance reduce(List performances) { + performances.sort((a, b) -> (int) (a.time() - b.time())); + long time = performances.get(performances.size() / 2).time(); + performances.sort((a, b) -> (int) (a.memory() - b.memory())); + long memory = performances.get(performances.size() / 2).memory(); + return new Performance(counterName, time, nThreads, memory); + } +} diff --git a/src/main/java/com/thread/concurrency/executor/CounterConfiguration.java b/src/main/java/com/thread/concurrency/executor/CounterConfiguration.java new file mode 100644 index 0000000..a739431 --- /dev/null +++ b/src/main/java/com/thread/concurrency/executor/CounterConfiguration.java @@ -0,0 +1,38 @@ +package com.thread.concurrency.executor; + +import com.thread.concurrency.counter.AtomicCounter; +import com.thread.concurrency.counter.Counter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class CounterConfiguration { + + @Bean + public Counter counter() { + return new AtomicCounter(); + } + + @Bean + public CounterConfig counterConfig(Counter counter) { + int iterations = 25; + int totalRequest = Integer.MAX_VALUE / 1024; + int nThreads = 1; + return new CounterConfig(counter, iterations, totalRequest, nThreads); + } + + public record CounterConfig(Counter counter, int iterations, int totalRequests, int nThreads) { + @Override + public String toString() { + // multiple-lines format + return """ + CounterConfig { + counter=%s, + iterations=%d, + totalRequests=%d, + nThreads=%d + }""".formatted(counter.getClass().getSimpleName(), iterations, totalRequests, nThreads).stripIndent(); + } + } +} diff --git a/src/main/java/com/thread/concurrency/executor/Performance.java b/src/main/java/com/thread/concurrency/executor/Performance.java new file mode 100644 index 0000000..b215102 --- /dev/null +++ b/src/main/java/com/thread/concurrency/executor/Performance.java @@ -0,0 +1,8 @@ +package com.thread.concurrency.executor; + +public record Performance(String name, long time, int threads, long memory) { + @Override + public String toString() { + return String.format("| %-20s | %10d ms | %5d threads | %10d KB |", name, time, threads, memory / 1024); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e92628c..4cc2930 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ spring.application.name=spring-thread-concurrency +spring.profiles.active=default diff --git a/src/test/java/com/thread/concurrency/counter/CounterTest.java b/src/test/java/com/thread/concurrency/counter/CounterTest.java new file mode 100644 index 0000000..d2c3732 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/CounterTest.java @@ -0,0 +1,54 @@ +package com.thread.concurrency.counter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Stream; + +public class CounterTest { + + public static Stream counterProvider() { + return Stream.of(new LockCounter(), new PollingCounter(), new SynchronizedCounter(), new AtomicCounter(), new CompletableFutureCounter()); + } + + @ParameterizedTest + @MethodSource("counterProvider") + public void stressTest(Counter counter) { + // given + int nThreads = 100; + int addPerThread = 1000; + int expectedValue = counter.show() + nThreads * addPerThread; + + // when + long start = System.currentTimeMillis(); + whenAdd(counter, nThreads, addPerThread); + long end = System.currentTimeMillis(); + + // then + Assertions.assertEquals(expectedValue, counter.show()); + System.out.println("Time elapsed: " + (end - start) + "ms"); + } + + private void whenAdd(Counter counter, int nThreads, int addPerThread) { + try (ExecutorService executor = Executors.newFixedThreadPool(nThreads)) { + for (int i = 0; i < nThreads; i++) { + executor.submit(() -> { + for (int j = 0; j < addPerThread; j++) { + counter.add(1); + } + }); + } + } + } + + @ParameterizedTest + @MethodSource("counterProvider") + public void clearTest(Counter counter) { + counter.add(1000); + counter.clear(); + Assertions.assertEquals(0, counter.show()); + } +} diff --git a/src/test/java/com/thread/concurrency/SpringThreadConcurrencyApplicationTests.java b/src/test/java/com/thread/concurrency/counter/SpringThreadConcurrencyApplicationTests.java similarity index 51% rename from src/test/java/com/thread/concurrency/SpringThreadConcurrencyApplicationTests.java rename to src/test/java/com/thread/concurrency/counter/SpringThreadConcurrencyApplicationTests.java index 675a607..75d7009 100644 --- a/src/test/java/com/thread/concurrency/SpringThreadConcurrencyApplicationTests.java +++ b/src/test/java/com/thread/concurrency/counter/SpringThreadConcurrencyApplicationTests.java @@ -1,15 +1,11 @@ -package com.thread.concurrency; +package com.thread.concurrency.counter; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @SpringBootTest class SpringThreadConcurrencyApplicationTests { - @Test void contextLoads() { - assertDoesNotThrow(() -> SpringThreadConcurrencyApplication.main(new String[]{})); } - } diff --git a/src/test/java/com/thread/concurrency/counter/batch/BatchCounterTest.java b/src/test/java/com/thread/concurrency/counter/batch/BatchCounterTest.java new file mode 100644 index 0000000..8482048 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/batch/BatchCounterTest.java @@ -0,0 +1,113 @@ +package com.thread.concurrency.counter.batch; + +import com.thread.concurrency.counter.Counter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +class BatchCounterTest { + + public static Stream batchCounterProvider() { + return Stream.of(new AtomicBatchCounter(), new ConcurrentParameterizedBatchingCounter()); + } + + @ParameterizedTest + @MethodSource("batchCounterProvider") + void clearTest(BatchCounter counter) { + counter.add(1000); + counter.clear(); + Assertions.assertEquals(0, counter.show()); + } + + @ParameterizedTest + @MethodSource("batchCounterProvider") + void singleThreading(BatchCounter counter) { + // given + var numbers = range(); + var partialSum = numbers.stream().reduce(0, Integer::sum); + Runnable task = () -> { + for (Integer number : numbers) { + counter.add(number); + } + counter.flush(); + }; + // when + task.run(); + // then + Assertions.assertEquals(partialSum, counter.show()); + } + + private static List range() { + return IntStream.range(0, 1000).boxed().collect(Collectors.toList()); + } + + @ParameterizedTest + @MethodSource("batchCounterProvider") + void conditionalMultiThreading(BatchCounter counter) { + // given + int numberOfThreads = 2; + int expected = Integer.MAX_VALUE / 1024; + List iterPerThread = range(numberOfThreads, expected); + Consumer task = (Integer number) -> { + for (int i = 0; i < number; i++) { + counter.add(1); + } + counter.flush(); + }; + // when + try (ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads)) { + for (int num : iterPerThread) { + executor.submit(() -> task.accept(num)); + } + } + // then + Assertions.assertEquals(expected, counter.show()); + } + + private static List range(int numberOfThreads, int expected) { + int baseValue = expected / numberOfThreads; + int remainder = expected % numberOfThreads; + + List result = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + if (i < remainder) { + result.add(baseValue + 1); + } else { + result.add(baseValue); + } + } + return result; + } + + @ParameterizedTest + @MethodSource("batchCounterProvider") + void conditionalAsyncVirtualMultiThreading(BatchCounter counter) { + // given + int numberOfThreads = 2; + int expected = Integer.MAX_VALUE / 1024; + List iterPerThread = range(numberOfThreads, expected); + Consumer task = (Integer number) -> { + for (int i = 0; i < number; i++) { + counter.add(1); + } + counter.flush(); + }; + // when + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + List> futures = iterPerThread.stream().map(number -> CompletableFuture.runAsync(() -> task.accept(number), executor)).toList(); + futures.forEach(CompletableFuture::join); + } + // then + Assertions.assertEquals(expected, counter.show()); + } +} diff --git a/src/test/java/com/thread/concurrency/counter/executor/CounterBenchmarkTest.java b/src/test/java/com/thread/concurrency/counter/executor/CounterBenchmarkTest.java new file mode 100644 index 0000000..231596c --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/executor/CounterBenchmarkTest.java @@ -0,0 +1,24 @@ +package com.thread.concurrency.counter.executor; + +import com.thread.concurrency.executor.CounterBenchmark; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +public class CounterBenchmarkTest { + + private final CounterBenchmark counterBenchmark; + + @Autowired + public CounterBenchmarkTest(CounterBenchmark counterBenchmark) { + this.counterBenchmark = counterBenchmark; + } + + @Test + void test() { + var performance = counterBenchmark.benchmark(); + Assertions.assertNotNull(performance); + } +} diff --git a/src/test/java/com/thread/concurrency/counter/queue/QueueCounterTest.java b/src/test/java/com/thread/concurrency/counter/queue/QueueCounterTest.java new file mode 100644 index 0000000..d55ab28 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/queue/QueueCounterTest.java @@ -0,0 +1,86 @@ +package com.thread.concurrency.counter.queue; + +import org.junit.jupiter.api.*; + +import java.time.Duration; +import java.util.concurrent.*; + +public class QueueCounterTest { + private static final int valueToAdd = 1; + private static final int nAddsPerThread = 100000; + private static final int producerNThreads = 9; + private static final int consumerNThreads = 9; + private Consumer consumer; + private Producer producer; + private ExecutorService consumerExecutor; + private ExecutorService producerExecutor; + + @BeforeEach + public void setup() { + int queueCapacity = 1000; + BlockingQueue queue = new LinkedBlockingQueue<>(queueCapacity); + consumer = new CounterConsumer(queue); + producer = new CounterProducer(queue); + producerExecutor = Executors.newFixedThreadPool(producerNThreads); + consumerExecutor = Executors.newFixedThreadPool(consumerNThreads); + } + + @AfterEach + public void cleanup() { + producerExecutor.shutdown(); + consumerExecutor.shutdown(); + } + + @Test + @SuppressWarnings("SpellCheckingInspection") + @DisplayName("멀티 프로듀서 싱글 컨슈머") + public void multiProducerSingleConsumer() { + Assertions.assertTimeout(Duration.ofSeconds(10), () -> runTest(1)); + } + + private void runTest(int consumerCount) throws InterruptedException { + Long initialCount = consumer.show(); + CountDownLatch producerLatch = new CountDownLatch(producerNThreads); + CountDownLatch consumerLatch = new CountDownLatch(consumerCount); + + createProducerThreads(producerLatch); + createConsumerThreads(consumerLatch, consumerCount); + + consumerLatch.await(); + producerLatch.await(); + + Long finalCount = consumer.show(); + Assertions.assertEquals(initialCount + nAddsPerThread * producerNThreads * valueToAdd, finalCount); + } + + private void createProducerThreads(CountDownLatch producerLatch) { + Callable task = () -> { + for (int j = 0; j < nAddsPerThread; j++) { + producer.add(valueToAdd); + } + producerLatch.countDown(); + return null; + }; + for (int i = 0; i < producerNThreads; i++) { + producerExecutor.submit(task); + } + } + + private void createConsumerThreads(CountDownLatch consumerLatch, int consumerCount) { + Callable task = () -> { + consumer.consumeEvent(1, TimeUnit.SECONDS); + consumerLatch.countDown(); + return null; + }; + for (int i = 0; i < consumerCount; i++) { + consumerExecutor.submit(task); + } + } + + @Test + @SuppressWarnings("SpellCheckingInspection") + @DisplayName("멀티 프로듀서 멀티 컨슈머") + public void multiProducerMultiConsumer() { + Assertions.assertTimeout(Duration.ofSeconds(10), () -> runTest(consumerNThreads)); + } +}