From b0222f82598a504ae8b498914eeda66155556ec5 Mon Sep 17 00:00:00 2001 From: ohchansol Date: Tue, 26 Mar 2024 18:03:16 +0900 Subject: [PATCH 01/11] =?UTF-8?q?=E2=9C=A8=20Feat=20:=20=EC=83=88=EB=A1=9C?= =?UTF-8?q?=EC=9A=B4=20=EA=B8=B0=EB=8A=A5=20=E2=9C=85=20Test=20:=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 비동기 서비스 클래스 생성 2. 생성한 메서드에 대한 기본 테스트 작성 1. 여러 입출력 상황에 맞게 비동기 메서드 호출과 비동기 객체 병합 방법 강구 --- .gitignore | 6 +-- .../async/service/AsyncService.java | 21 +++++++++++ .../thread/concurrency/AsyncServiceTest.java | 37 +++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/thread/concurrency/async/service/AsyncService.java create mode 100644 src/test/java/com/thread/concurrency/AsyncServiceTest.java diff --git a/.gitignore b/.gitignore index bd3712a..9cfa0cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ HELP.md -.gradle -build/ +.gradle/* +build/* !gradle/wrapper/gradle-wrapper.jar !**/src/main/**/build/ !**/src/test/**/build/ @@ -18,7 +18,7 @@ bin/ !**/src/test/**/bin/ ### IntelliJ IDEA ### -.idea +.idea/* *.iws *.iml *.ipr diff --git a/src/main/java/com/thread/concurrency/async/service/AsyncService.java b/src/main/java/com/thread/concurrency/async/service/AsyncService.java new file mode 100644 index 0000000..422c911 --- /dev/null +++ b/src/main/java/com/thread/concurrency/async/service/AsyncService.java @@ -0,0 +1,21 @@ +package com.thread.concurrency.async.service; + +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; + +@Service +public class AsyncService { + @Async + public CompletableFuture voidParamStringReturn(){ + System.out.println("비동기적으로 실행 - "+ + Thread.currentThread().getName()); + try{ + Thread.sleep(1000); + return CompletableFuture.completedFuture("hello world"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/com/thread/concurrency/AsyncServiceTest.java b/src/test/java/com/thread/concurrency/AsyncServiceTest.java new file mode 100644 index 0000000..65a7033 --- /dev/null +++ b/src/test/java/com/thread/concurrency/AsyncServiceTest.java @@ -0,0 +1,37 @@ +package com.thread.concurrency; + +import com.thread.concurrency.async.service.AsyncService; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + + +public class AsyncServiceTest { + private AsyncService asyncService; + @BeforeEach + void initUseCase() { // spring container를 사용하지 않고 순수 클래스를 사용. + asyncService = new AsyncService(); + } + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출") + public void testGetString() throws ExecutionException, InterruptedException { + CompletableFuture helloWorld = asyncService.voidParamStringReturn(); + Assertions.assertEquals("hello world",helloWorld.get()); + } + +// @Test +// @DisplayName("입력은 void 출력은 String인 비동기 함수 복수 호출 그리고 결과 조합") +// public void testMultiGetString() throws ExecutionException, InterruptedException { +// List> completableFutures = new ArrayList<>(); +// +// for (int j = 0; j <= 23; j++) { +// completableFutures.add(asyncService.voidParamStringReturn())); +// } +// CompletableFuture.allOf(price1,price2,price3).join(); +// Assertions.assertEquals("hello world !!hello world !!", combin); +// } +} From fd930569f6ff7449ee39d63cd716334146344a7c Mon Sep 17 00:00:00 2001 From: ohchansol Date: Wed, 27 Mar 2024 09:46:14 +0900 Subject: [PATCH 02/11] =?UTF-8?q?=E2=9C=85=20Test=20:=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 메서드가 비동기 메서드인지 확인하는 테스트 작성 2. @Async를 붙인 메서드가 비동기로 동자가하지 않고 하나의 쓰레드에서 동작한다. --- .../async/service/AsyncService.java | 8 +-- .../thread/concurrency/AsyncServiceTest.java | 66 +++++++++++++------ 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/thread/concurrency/async/service/AsyncService.java b/src/main/java/com/thread/concurrency/async/service/AsyncService.java index 422c911..3ed96b5 100644 --- a/src/main/java/com/thread/concurrency/async/service/AsyncService.java +++ b/src/main/java/com/thread/concurrency/async/service/AsyncService.java @@ -8,11 +8,11 @@ @Service public class AsyncService { @Async - public CompletableFuture voidParamStringReturn(){ - System.out.println("비동기적으로 실행 - "+ - Thread.currentThread().getName()); + public CompletableFuture voidParamStringReturn(long waitTime){ +// System.out.println("비동기적으로 실행 - "+ +// Thread.currentThread().getName()); try{ - Thread.sleep(1000); + Thread.sleep(waitTime); return CompletableFuture.completedFuture("hello world"); } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/src/test/java/com/thread/concurrency/AsyncServiceTest.java b/src/test/java/com/thread/concurrency/AsyncServiceTest.java index 65a7033..087e4e7 100644 --- a/src/test/java/com/thread/concurrency/AsyncServiceTest.java +++ b/src/test/java/com/thread/concurrency/AsyncServiceTest.java @@ -1,37 +1,61 @@ package com.thread.concurrency; import com.thread.concurrency.async.service.AsyncService; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; - +@SpringBootTest public class AsyncServiceTest { + @Autowired private AsyncService asyncService; - @BeforeEach - void initUseCase() { // spring container를 사용하지 않고 순수 클래스를 사용. - asyncService = new AsyncService(); - } @Test @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출") public void testGetString() throws ExecutionException, InterruptedException { - CompletableFuture helloWorld = asyncService.voidParamStringReturn(); + CompletableFuture helloWorld = asyncService.voidParamStringReturn(1000); Assertions.assertEquals("hello world",helloWorld.get()); } + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출 타임아웃 발생.") + public void testGetStringTimeOutIsThisAsync() { + // voidParamStringReturn가 비동기 메서드인지 의문이 생김. + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + try { + return asyncService.voidParamStringReturn(4000).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + long timeOutValue = 1; + TimeUnit timeUnit = TimeUnit.SECONDS; + // 1초가 지난 후 타임아웃 발생 + Assertions.assertThrows(ExecutionException.class, () -> completableFuture.orTimeout(timeOutValue,timeUnit).get()); + } -// @Test -// @DisplayName("입력은 void 출력은 String인 비동기 함수 복수 호출 그리고 결과 조합") -// public void testMultiGetString() throws ExecutionException, InterruptedException { -// List> completableFutures = new ArrayList<>(); -// -// for (int j = 0; j <= 23; j++) { -// completableFutures.add(asyncService.voidParamStringReturn())); -// } -// CompletableFuture.allOf(price1,price2,price3).join(); -// Assertions.assertEquals("hello world !!hello world !!", combin); -// } + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 복수 호출 그리고 결과 조합") + public void testMultiGetString() { + List> futures = new ArrayList<>(); + for (int i = 1; i <= 1000; i++) { // 동기라면 10초가 걸리고 비동기라면 0.01초가 걸릴 것이다. + futures.add(asyncService.voidParamStringReturn(10)); + } + CompletableFuture> aggregate = CompletableFuture.completedFuture(new ArrayList<>()); + for (CompletableFuture future : futures) { + aggregate = aggregate.thenCompose(list -> { + list.add(String.valueOf(future)); + return CompletableFuture.completedFuture(list); + }); + } + final List results = aggregate.join(); + for (int i = 0; i < 1000; i++) { + System.out.println("Finished " + results.get(i)); + } + + } } From 3c8ff758277d6f9ff18fd849e9b3c95e2c6e14d4 Mon Sep 17 00:00:00 2001 From: ohchansol Date: Thu, 28 Mar 2024 00:12:26 +0900 Subject: [PATCH 03/11] =?UTF-8?q?=E2=9C=A8=20Feat=20:=20=EC=83=88=EB=A1=9C?= =?UTF-8?q?=EC=9A=B4=20=EA=B8=B0=EB=8A=A5=20=E2=9C=85=20Test=20:=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 다중 스레드 설정을 위해 taskExecutor 메서드 빈으로 저장 2. 비동기 메서드 다중 호출 시 병렬적으로 동작하는지 testGetMultiString에서 검증 3. 비동기 메서드의 지연시간이 길어질 경우 타임아웃 발생하도록 하는 부분 검증 --- .../SpringThreadConcurrencyApplication.java | 17 +++++- .../async/controller/AsyncController.java | 12 +++++ .../async/service/AsyncService.java | 15 +++--- .../thread/concurrency/AsyncServiceTest.java | 53 ++++++++----------- 4 files changed, 56 insertions(+), 41 deletions(-) create mode 100644 src/main/java/com/thread/concurrency/async/controller/AsyncController.java diff --git a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java index 42b9717..099710c 100644 --- a/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java +++ b/src/main/java/com/thread/concurrency/SpringThreadConcurrencyApplication.java @@ -2,12 +2,27 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; @SpringBootApplication +@EnableAsync public class SpringThreadConcurrencyApplication { public static void main(String[] args) { SpringApplication.run(SpringThreadConcurrencyApplication.class, args); } - + @Bean + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Spring에서 사용하는 스레드를 제어한느 설정 + executor.setCorePoolSize(50); // thread-pool에 살아있는 thread의 최소 개수 + executor.setMaxPoolSize(50); // thread-pool에서 사용할 수 있는 최대 개수 + executor.setQueueCapacity(500); //thread-pool에 최대 queue 크기 + executor.setThreadNamePrefix("AsyncApp-"); + executor.initialize(); + return executor; + } } diff --git a/src/main/java/com/thread/concurrency/async/controller/AsyncController.java b/src/main/java/com/thread/concurrency/async/controller/AsyncController.java new file mode 100644 index 0000000..64037ed --- /dev/null +++ b/src/main/java/com/thread/concurrency/async/controller/AsyncController.java @@ -0,0 +1,12 @@ +package com.thread.concurrency.async.controller; + +import com.thread.concurrency.async.service.AsyncService; +import org.springframework.stereotype.Controller; +@Controller +public class AsyncController { + private final AsyncService asyncService; + + public AsyncController(AsyncService asyncService) { + this.asyncService = asyncService; + } +} diff --git a/src/main/java/com/thread/concurrency/async/service/AsyncService.java b/src/main/java/com/thread/concurrency/async/service/AsyncService.java index 3ed96b5..9cb2a00 100644 --- a/src/main/java/com/thread/concurrency/async/service/AsyncService.java +++ b/src/main/java/com/thread/concurrency/async/service/AsyncService.java @@ -5,17 +5,14 @@ import java.util.concurrent.CompletableFuture; + @Service public class AsyncService { @Async - public CompletableFuture voidParamStringReturn(long waitTime){ -// System.out.println("비동기적으로 실행 - "+ -// Thread.currentThread().getName()); - try{ - Thread.sleep(waitTime); - return CompletableFuture.completedFuture("hello world"); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + public CompletableFuture voidParamStringReturn (long waitTime, String message) throws InterruptedException{ + System.out.println("비동기적으로 실행 - "+ + Thread.currentThread().getName()); + Thread.sleep(waitTime); + return CompletableFuture.completedFuture(message); } } diff --git a/src/test/java/com/thread/concurrency/AsyncServiceTest.java b/src/test/java/com/thread/concurrency/AsyncServiceTest.java index 087e4e7..90b0d2d 100644 --- a/src/test/java/com/thread/concurrency/AsyncServiceTest.java +++ b/src/test/java/com/thread/concurrency/AsyncServiceTest.java @@ -2,6 +2,8 @@ import com.thread.concurrency.async.service.AsyncService; import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -13,49 +15,38 @@ @SpringBootTest public class AsyncServiceTest { + private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class); @Autowired private AsyncService asyncService; + @Test @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출") public void testGetString() throws ExecutionException, InterruptedException { - CompletableFuture helloWorld = asyncService.voidParamStringReturn(1000); - Assertions.assertEquals("hello world",helloWorld.get()); + CompletableFuture helloWorld = asyncService.voidParamStringReturn(1000, "기본 메세지"); + Assertions.assertEquals("기본 메세지",helloWorld.get()); } + + @Test + @DisplayName("입력은 void 출력은 String인 비동기 함수 다중 호출") + public void testGetMultiString() throws InterruptedException { + List> hellos = new ArrayList<>(); + for(int i=0; i<100; i++){ + hellos.add(asyncService.voidParamStringReturn(1000,i+"번째 메세지")); + } + // 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기 + List results = hellos.stream().map(CompletableFuture::join) + .toList(); + results.forEach(logger::info); + } + @Test @DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출 타임아웃 발생.") - public void testGetStringTimeOutIsThisAsync() { + public void testGetStringTimeOutIsThisAsync() throws InterruptedException { // voidParamStringReturn가 비동기 메서드인지 의문이 생김. - CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { - try { - return asyncService.voidParamStringReturn(4000).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); + CompletableFuture completableFuture = asyncService.voidParamStringReturn(4000, "타임아웃 발생 안 함!"); long timeOutValue = 1; TimeUnit timeUnit = TimeUnit.SECONDS; // 1초가 지난 후 타임아웃 발생 Assertions.assertThrows(ExecutionException.class, () -> completableFuture.orTimeout(timeOutValue,timeUnit).get()); } - - @Test - @DisplayName("입력은 void 출력은 String인 비동기 함수 복수 호출 그리고 결과 조합") - public void testMultiGetString() { - List> futures = new ArrayList<>(); - for (int i = 1; i <= 1000; i++) { // 동기라면 10초가 걸리고 비동기라면 0.01초가 걸릴 것이다. - futures.add(asyncService.voidParamStringReturn(10)); - } - CompletableFuture> aggregate = CompletableFuture.completedFuture(new ArrayList<>()); - for (CompletableFuture future : futures) { - aggregate = aggregate.thenCompose(list -> { - list.add(String.valueOf(future)); - return CompletableFuture.completedFuture(list); - }); - } - final List results = aggregate.join(); - for (int i = 0; i < 1000; i++) { - System.out.println("Finished " + results.get(i)); - } - - } } From c734c167888058b7b6275052cce642b79896531b Mon Sep 17 00:00:00 2001 From: ohchansol Date: Thu, 28 Mar 2024 12:32:08 +0900 Subject: [PATCH 04/11] =?UTF-8?q?=E2=9C=A8=20Feat=20:=20=EC=83=88=EB=A1=9C?= =?UTF-8?q?=EC=9A=B4=20=EA=B8=B0=EB=8A=A5=20=E2=9C=85=20Test=20:=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 비동기 호출을 10회 하는 비동기 메서드를 10회 호출하는 calculateRunTime 작성 2. 비동기 호출이 100회 일 때 50개는 쓰레드가 실행하고 나머지 50개는 블록킹 큐에서 대기한다. --- .../async/controller/AsyncController.java | 23 +++++++++++++ .../concurrency/AsyncControllerTest.java | 32 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 src/test/java/com/thread/concurrency/AsyncControllerTest.java diff --git a/src/main/java/com/thread/concurrency/async/controller/AsyncController.java b/src/main/java/com/thread/concurrency/async/controller/AsyncController.java index 64037ed..6be4cdf 100644 --- a/src/main/java/com/thread/concurrency/async/controller/AsyncController.java +++ b/src/main/java/com/thread/concurrency/async/controller/AsyncController.java @@ -1,7 +1,15 @@ package com.thread.concurrency.async.controller; import com.thread.concurrency.async.service.AsyncService; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Controller; + +import java.time.Duration; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + @Controller public class AsyncController { private final AsyncService asyncService; @@ -9,4 +17,19 @@ public class AsyncController { public AsyncController(AsyncService asyncService) { this.asyncService = asyncService; } + + @Async + public CompletableFuture calculateRunTime(int cnt, int waitTime) throws InterruptedException { + LocalTime lt1 = LocalTime.now(); + List> hellos = new ArrayList<>(); + for(int i=0; i results = hellos.stream().map(CompletableFuture::join) + .toList(); + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).toMillis(); + return CompletableFuture.completedFuture(dif+"가 걸렸습니다."); + } } diff --git a/src/test/java/com/thread/concurrency/AsyncControllerTest.java b/src/test/java/com/thread/concurrency/AsyncControllerTest.java new file mode 100644 index 0000000..59d0f06 --- /dev/null +++ b/src/test/java/com/thread/concurrency/AsyncControllerTest.java @@ -0,0 +1,32 @@ +package com.thread.concurrency; + +import com.thread.concurrency.async.controller.AsyncController; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +@SpringBootTest +public class AsyncControllerTest { + private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class); + + @Autowired + private AsyncController asyncController; + + @Test + public void invokeMultiAsyncMethod() throws InterruptedException { + List> hellos = new ArrayList<>(); + for(int i=0; i<10; i++){ + hellos.add(asyncController.calculateRunTime(10, 1000)); + } + // 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기 + List results = hellos.stream().map(CompletableFuture::join) + .toList(); + results.forEach(logger::info); + } +} From f78aeaca360dd79080ff9f7ba8fd0db7f42d29f4 Mon Sep 17 00:00:00 2001 From: ohchansol Date: Sat, 30 Mar 2024 16:01:54 +0900 Subject: [PATCH 05/11] =?UTF-8?q?=E2=9C=A8=20Feat=20:=20=EC=83=88=EB=A1=9C?= =?UTF-8?q?=EC=9A=B4=20=EA=B8=B0=EB=8A=A5=20=E2=9C=85=20Test=20:=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. BasicCounter 구현. 멀티 쓰레드로 카운터 동시 업데이트 불가 테스트 2. SynchronizedCounter 구현. 멀티 쓰레드로 카운터 동시 업데이트 가능 테스트. Synchronized만 사용했을 때는 0.5s 소요 3. Completable 사용하면 실행시간이 대폭 감소 --- build.gradle.kts | 1 + .../concurrency/counter/BasicCounter.java | 18 +++++ .../thread/concurrency/counter/Counter.java | 6 ++ .../counter/SynchronizedCounter.java | 21 +++++ .../concurrency/counter/BasicCounterTest.java | 65 ++++++++++++++++ .../counter/SynchronizedCounterTest.java | 76 +++++++++++++++++++ .../com/thread/concurrency/package-info.java | 1 + 7 files changed, 188 insertions(+) create mode 100644 src/main/java/com/thread/concurrency/counter/BasicCounter.java create mode 100644 src/main/java/com/thread/concurrency/counter/Counter.java create mode 100644 src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java create mode 100644 src/test/java/com/thread/concurrency/counter/BasicCounterTest.java create mode 100644 src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java create mode 100644 src/test/java/com/thread/concurrency/package-info.java diff --git a/build.gradle.kts b/build.gradle.kts index dfe6a2a..85502fe 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,6 +18,7 @@ repositories { dependencies { implementation("org.springframework.boot:spring-boot-starter") + implementation("net.jcip:jcip-annotations:1.0") testImplementation("org.springframework.boot:spring-boot-starter-test") testRuntimeOnly("org.junit.platform:junit-platform-launcher") testRuntimeOnly("org.reactivestreams:reactive-streams") diff --git a/src/main/java/com/thread/concurrency/counter/BasicCounter.java b/src/main/java/com/thread/concurrency/counter/BasicCounter.java new file mode 100644 index 0000000..ae72fd6 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/BasicCounter.java @@ -0,0 +1,18 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +@Component +public class BasicCounter implements Counter{ + private static int count = 100; + + @Override + public void add(int value) { + count += value; + } + + @Override + public int show() { + return count; + } +} diff --git a/src/main/java/com/thread/concurrency/counter/Counter.java b/src/main/java/com/thread/concurrency/counter/Counter.java new file mode 100644 index 0000000..4575c3d --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/Counter.java @@ -0,0 +1,6 @@ +package com.thread.concurrency.counter; + +public interface Counter { + void add(int value); + int show(); +} diff --git a/src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java b/src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java new file mode 100644 index 0000000..be6863c --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/SynchronizedCounter.java @@ -0,0 +1,21 @@ +package com.thread.concurrency.counter; + +import net.jcip.annotations.GuardedBy; +import org.springframework.stereotype.Component; + +@Component +public class SynchronizedCounter implements Counter{ + + @GuardedBy("this") + private int counter = 100; + + @Override + public synchronized void add(int value) { + counter += value; + } + + @Override + public synchronized int show() { + return counter; + } +} diff --git a/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java b/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java new file mode 100644 index 0000000..7a8a2e3 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java @@ -0,0 +1,65 @@ +package com.thread.concurrency.counter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +@SpringBootTest +public class BasicCounterTest { + + private final BasicCounter basicCounter; + private final int counteNumber = 1; + private final int totalCount = 100; + + @Autowired + public BasicCounterTest(BasicCounter basicCounter) { + this.basicCounter = basicCounter; + } + + @Test + @DisplayName("스레드 안전하지 않는 카운터로 동시에 여러 더하기 수행하기. 실패 예상") + public void 여러_더하기_수행(){ + int initalCount = basicCounter.show(); + + for(int i=0; i { + basicCounter.add(counteNumber); + }); + } + int finalCount = basicCounter.show(); + Assertions.assertNotEquals(initalCount+totalCount*counteNumber, finalCount); + } + + @Test + @DisplayName("synchronized로 스레드 안전한 카운터로 동시에 여러 더하기 수행하기. 활동성 문제 예상") + public void 여러_더하기_수행_CompletableFuture() { + int initalCount = basicCounter.show(); + List> tasks = new ArrayList<>(); + for(int i=0; i basicCounter.add(counteNumber))); + } + + CompletableFuture> aggregate = CompletableFuture.completedFuture(new ArrayList<>()); + for (CompletableFuture future : tasks) { + aggregate = aggregate.thenCompose(list -> { + try { + list.add(future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(list); + }); + } + aggregate.join(); // 전체 비동기 결과 집계 + int finalCount = basicCounter.show(); + Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); + } +} diff --git a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java new file mode 100644 index 0000000..340112d --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java @@ -0,0 +1,76 @@ +package com.thread.concurrency.counter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.*; + +@SpringBootTest +public class SynchronizedCounterTest { + + private final SynchronizedCounter counter; + private final int counteNumber = 1; + private final int totalCount = 100; + + @Autowired + public SynchronizedCounterTest(SynchronizedCounter counter) { + this.counter = counter; + } + + /** + * 실행 완료까지 0.5s 정도 소요 + * @throws InterruptedException + */ + @Test + @DisplayName("synchronized로 스레드 안전한 카운터로 동시에 여러 더하기 수행하기. 활동성 문제 예상") + public void 여러_더하기_수행_Executor() throws InterruptedException { + int initalCount = counter.show(); + int numberOfThreads = totalCount; + ExecutorService service = Executors.newFixedThreadPool(10); + CountDownLatch latch = new CountDownLatch(numberOfThreads); + for (int i = 0; i < numberOfThreads; i++) { + service.submit(() -> { + counter.add(counteNumber); + latch.countDown(); + }); + } + latch.await(); + int finalCount = counter.show(); + + Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); + } + + /** + * 실행 완료까지 0.002s 소요 + */ + @Test + @DisplayName("synchronized로 스레드 안전한 카운터로 동시에 여러 더하기 수행하기. 활동성 문제 예상") + public void 여러_더하기_수행_CompletableFuture() { + int initalCount = counter.show(); + List> tasks = new ArrayList<>(); + for(int i=0; i counter.add(counteNumber))); + } + + CompletableFuture> aggregate = CompletableFuture.completedFuture(new ArrayList<>()); + for (CompletableFuture future : tasks) { + aggregate = aggregate.thenCompose(list -> { + try { + list.add(future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(list); + }); + } + aggregate.join(); // 전체 비동기 결과 집계 + int finalCount = counter.show(); + Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); + } +} diff --git a/src/test/java/com/thread/concurrency/package-info.java b/src/test/java/com/thread/concurrency/package-info.java new file mode 100644 index 0000000..ae29573 --- /dev/null +++ b/src/test/java/com/thread/concurrency/package-info.java @@ -0,0 +1 @@ +package com.thread.concurrency; From b96f482e8005c3abff80acb8906e4936e4e1cb79 Mon Sep 17 00:00:00 2001 From: ohchansol Date: Sat, 30 Mar 2024 16:30:32 +0900 Subject: [PATCH 06/11] =?UTF-8?q?=F0=9F=A4=96=20Refactor=20:=20=EC=BD=94?= =?UTF-8?q?=EB=93=9C=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. SynchronizedCounter 테스트의 실질적 실행시간을 로깅하도록 리팩토링 --- .../concurrency/counter/BasicCounterTest.java | 30 ---------------- .../counter/SynchronizedCounterTest.java | 36 ++++++++++++------- 2 files changed, 23 insertions(+), 43 deletions(-) diff --git a/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java b/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java index 7a8a2e3..ffc7e7f 100644 --- a/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/BasicCounterTest.java @@ -1,16 +1,11 @@ package com.thread.concurrency.counter; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; - -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; @SpringBootTest public class BasicCounterTest { @@ -37,29 +32,4 @@ public BasicCounterTest(BasicCounter basicCounter) { int finalCount = basicCounter.show(); Assertions.assertNotEquals(initalCount+totalCount*counteNumber, finalCount); } - - @Test - @DisplayName("synchronized로 스레드 안전한 카운터로 동시에 여러 더하기 수행하기. 활동성 문제 예상") - public void 여러_더하기_수행_CompletableFuture() { - int initalCount = basicCounter.show(); - List> tasks = new ArrayList<>(); - for(int i=0; i basicCounter.add(counteNumber))); - } - - CompletableFuture> aggregate = CompletableFuture.completedFuture(new ArrayList<>()); - for (CompletableFuture future : tasks) { - aggregate = aggregate.thenCompose(list -> { - try { - list.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - return CompletableFuture.completedFuture(list); - }); - } - aggregate.join(); // 전체 비동기 결과 집계 - int finalCount = basicCounter.show(); - Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); - } } diff --git a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java index 340112d..8d55ba7 100644 --- a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java @@ -1,11 +1,16 @@ package com.thread.concurrency.counter; +import com.thread.concurrency.AsyncServiceTest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.time.Duration; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -14,27 +19,24 @@ @SpringBootTest public class SynchronizedCounterTest { - private final SynchronizedCounter counter; private final int counteNumber = 1; private final int totalCount = 100; - - @Autowired - public SynchronizedCounterTest(SynchronizedCounter counter) { - this.counter = counter; - } + private static final Logger logger = LoggerFactory.getLogger(SynchronizedCounterTest.class); /** - * 실행 완료까지 0.5s 정도 소요 + * 실행 완료까지 871ms 정도 소요 * @throws InterruptedException */ @Test @DisplayName("synchronized로 스레드 안전한 카운터로 동시에 여러 더하기 수행하기. 활동성 문제 예상") public void 여러_더하기_수행_Executor() throws InterruptedException { + SynchronizedCounter counter = new SynchronizedCounter(); + LocalTime lt1 = LocalTime.now(); int initalCount = counter.show(); - int numberOfThreads = totalCount; - ExecutorService service = Executors.newFixedThreadPool(10); - CountDownLatch latch = new CountDownLatch(numberOfThreads); - for (int i = 0; i < numberOfThreads; i++) { + int numberOfThreads = 15; + ExecutorService service = Executors.newFixedThreadPool(15); + CountDownLatch latch = new CountDownLatch(100); + for (int i = 0; i < totalCount; i++) { service.submit(() -> { counter.add(counteNumber); latch.countDown(); @@ -42,16 +44,20 @@ public SynchronizedCounterTest(SynchronizedCounter counter) { } latch.await(); int finalCount = counter.show(); - + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).getNano(); + logger.info("여러_더하기_수행_Executor 테스트가 걸린 시간 : "+dif/1000+"ms"); Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); } /** - * 실행 완료까지 0.002s 소요 + * 실행 완료까지 1061ms 소요 */ @Test @DisplayName("synchronized로 스레드 안전한 카운터로 동시에 여러 더하기 수행하기. 활동성 문제 예상") public void 여러_더하기_수행_CompletableFuture() { + SynchronizedCounter counter = new SynchronizedCounter(); + LocalTime lt1 = LocalTime.now(); int initalCount = counter.show(); List> tasks = new ArrayList<>(); for(int i=0; i Date: Sat, 30 Mar 2024 17:52:12 +0900 Subject: [PATCH 07/11] =?UTF-8?q?=E2=9C=A8=20Feat=20:=20=EC=83=88=EB=A1=9C?= =?UTF-8?q?=EC=9A=B4=20=EA=B8=B0=EB=8A=A5=20=E2=9C=85=20Test=20:=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. CompletableFuture를 이용한 카운터 작성 2. 카운터에 int가 아닌 CompletableFuture를 저장 3. 작업의 완성본을 저장하지 않고 작업 진행 중인 Future 객체를 저장하는 점에서 추후 캐싱 시나리오에 도움될 것으로 보임 --- .../counter/CompletableFutureCounter.java | 30 +++++++++++++ .../counter/CompletableFutureCounterTest.java | 43 +++++++++++++++++++ .../counter/SynchronizedCounterTest.java | 4 +- 3 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java create mode 100644 src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java diff --git a/src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java b/src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java new file mode 100644 index 0000000..0a32d60 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/CompletableFutureCounter.java @@ -0,0 +1,30 @@ +package com.thread.concurrency.counter; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +public class CompletableFutureCounter implements Counter{ + + private CompletableFuture counter = new CompletableFuture<>(); + + @Override + public void add(int value) { + // 연산이 진행 중이라면 기다렸다가 thenApply + // 카운트에 값 저장 + counter = counter.thenApply((c) -> c + value); + } + + @Override + public int show() { + try { + // 카운트에 대한 연산이 실행 중이라면 기다렸다가 가져오기 + return counter.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java b/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java new file mode 100644 index 0000000..236bfd9 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java @@ -0,0 +1,43 @@ +package com.thread.concurrency.counter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; + +import java.time.Duration; +import java.time.LocalTime; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@SpringBootTest +public class CompletableFutureCounterTest { + private final int counteNumber = 1; + private final int totalCount = 10000; + private static final Logger logger = LoggerFactory.getLogger(CompletableFutureCounterTest.class); + + @Test + public void 여러_더하기_수행_Compltable() throws InterruptedException { + SynchronizedCounter counter = new SynchronizedCounter(); + LocalTime lt1 = LocalTime.now(); + + int initalCount = counter.show(); + ExecutorService service = Executors.newFixedThreadPool(15); + CountDownLatch latch = new CountDownLatch(totalCount); + for (int i = 0; i < totalCount; i++) { + service.submit(() -> { + counter.add(counteNumber); + latch.countDown(); + }); + } + latch.await(); + int finalCount = counter.show(); + + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).getNano(); + logger.info("여러_더하기_수행_Compltable 테스트가 걸린 시간 : "+dif/1000+"ms"); + Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); + } +} diff --git a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java index 8d55ba7..0c24bec 100644 --- a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java @@ -20,7 +20,7 @@ public class SynchronizedCounterTest { private final int counteNumber = 1; - private final int totalCount = 100; + private final int totalCount = 10000; private static final Logger logger = LoggerFactory.getLogger(SynchronizedCounterTest.class); /** @@ -35,7 +35,7 @@ public class SynchronizedCounterTest { int initalCount = counter.show(); int numberOfThreads = 15; ExecutorService service = Executors.newFixedThreadPool(15); - CountDownLatch latch = new CountDownLatch(100); + CountDownLatch latch = new CountDownLatch(totalCount); for (int i = 0; i < totalCount; i++) { service.submit(() -> { counter.add(counteNumber); From 83c6f93bbe816d26ac159c8064c4d50cac885f5e Mon Sep 17 00:00:00 2001 From: ohchansol Date: Tue, 2 Apr 2024 15:55:55 +0900 Subject: [PATCH 08/11] =?UTF-8?q?=E2=9C=A8=20Feat=20:=20=EC=83=88=EB=A1=9C?= =?UTF-8?q?=EC=9A=B4=20=EA=B8=B0=EB=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 컨슈머는 브로커에게서 이벤트를 가져와 처리한다. 2. 프로듀서는 브로커에게 이벤트를 저장한다. 3. 브로커는 큐를 관리하고 큐에 이벤트 삽입과 제거를 담당한다. --- .../producerCustomer/CounterBroker.java | 28 +++++++++++++++++++ .../producerCustomer/CounterCustomer.java | 16 +++++++++++ .../producerCustomer/CounterProducer.java | 13 +++++++++ .../concurrency/counter/QueueCounter.java | 18 ++++++++++++ 4 files changed, 75 insertions(+) create mode 100644 src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java create mode 100644 src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java create mode 100644 src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java create mode 100644 src/test/java/com/thread/concurrency/counter/QueueCounter.java diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java new file mode 100644 index 0000000..e759333 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java @@ -0,0 +1,28 @@ +package com.thread.concurrency.counter.producerCustomer; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; + +public class CounterBroker { + private BlockingQueue> queue = new LinkedBlockingQueue<>(); + private Integer count; + public void addEvent(int value){ + try{ + queue.put((c) -> c + value); // 이 이벤트를 컨슈머가 처리할 당시 count와 value를 더한 값을 출력한다. + } + catch(InterruptedException e){ + Thread.currentThread().interrupt(); + } + } + + public void consumEvent(){ + try{ + // "value를 더한다"라는 이벤트는 현재 스레드만 가질 수 있다. + count = queue.take().apply(count); + } + catch(InterruptedException e){ + Thread.currentThread().interrupt(); + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java new file mode 100644 index 0000000..b539b2c --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java @@ -0,0 +1,16 @@ +package com.thread.concurrency.counter.producerCustomer; + + +public class CounterCustomer{ + private CounterBroker counterBroker; + + public CounterCustomer(CounterBroker counterBroker) { + this.counterBroker = counterBroker; + } + + public void consumEvent(){ + while(true){ + counterBroker.consumEvent(); + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java new file mode 100644 index 0000000..ac94c74 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java @@ -0,0 +1,13 @@ +package com.thread.concurrency.counter.producerCustomer; + +public class CounterProducer { + private CounterBroker counterBroker; + + public CounterProducer(CounterBroker counterBroker) { + this.counterBroker = counterBroker; + } + + public void add(int value){ + counterBroker.addEvent(value); + } +} diff --git a/src/test/java/com/thread/concurrency/counter/QueueCounter.java b/src/test/java/com/thread/concurrency/counter/QueueCounter.java new file mode 100644 index 0000000..a45cea7 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/QueueCounter.java @@ -0,0 +1,18 @@ +package com.thread.concurrency.counter; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueueCounter { + private final int counteNumber = 1; + private final int totalCount = 10000; + private static final Logger logger = LoggerFactory.getLogger(SynchronizedCounterTest.class); + + @Test + @DisplayName("producer consumer 패턴을 이용해서 더하기 이벤트 발생 스레드와 더하기 이벤트 처리 스레드를 분리하자") + public void 프로듀서_컨슈며_더하기_멀티_프로듀서_단일_컨슈머(){ + + } +} From 0b332226916f7cb13869fee3696f3dcc31c07cac Mon Sep 17 00:00:00 2001 From: ohchansol Date: Tue, 2 Apr 2024 17:20:39 +0900 Subject: [PATCH 09/11] =?UTF-8?q?=E2=9C=85=20Test=20:=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 프로듀서-컨슈머 패턴을 이용한 카운터 구현을 위해 테스트를 작성 2. 시간 측정 결과 기존 CompletableFutureCounter에 2배 시간이 걸리는 걸로 나왔다. To Do CompletableFutureCounterTest와 QueueCounterTest에 쓰이는 쓰레드 수를 같게해서 다시 시간을 측정해보기 --- .../producerCustomer/CounterBroker.java | 22 ++++++-- .../producerCustomer/CounterCustomer.java | 12 ++-- .../producerCustomer/CounterProducer.java | 2 +- .../concurrency/counter/QueueCounter.java | 18 ------ .../concurrency/counter/QueueCounterTest.java | 56 +++++++++++++++++++ 5 files changed, 83 insertions(+), 27 deletions(-) delete mode 100644 src/test/java/com/thread/concurrency/counter/QueueCounter.java create mode 100644 src/test/java/com/thread/concurrency/counter/QueueCounterTest.java diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java index e759333..c6a1e8c 100644 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java @@ -2,14 +2,21 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.IntUnaryOperator; +import java.util.function.UnaryOperator; public class CounterBroker { - private BlockingQueue> queue = new LinkedBlockingQueue<>(); - private Integer count; + // 100개의 이벤트를 저장할 수 있다. + private BlockingQueue> queue = new LinkedBlockingQueue<>(100); + private AtomicInteger count = new AtomicInteger(100); // AtomicInteger로 변경 public void addEvent(int value){ try{ - queue.put((c) -> c + value); // 이 이벤트를 컨슈머가 처리할 당시 count와 value를 더한 값을 출력한다. + // 이 이벤트를 컨슈머가 처리할 당시 count와 value를 더한 값을 출력한다. + // 만약 4초 동안 프로듀서가 요소를 넣지 못하면 timeout이 발생한다. + queue.offer((c) -> c + value, 4, TimeUnit.SECONDS); } catch(InterruptedException e){ Thread.currentThread().interrupt(); @@ -19,10 +26,17 @@ public void addEvent(int value){ public void consumEvent(){ try{ // "value를 더한다"라는 이벤트는 현재 스레드만 가질 수 있다. - count = queue.take().apply(count); + // AtomicInteger의 updateAndGet 메서드를 사용하여 원자적으로 값을 업데이트 + Function event = queue.take(); + IntUnaryOperator operator = event::apply; + count.updateAndGet(operator); } catch(InterruptedException e){ Thread.currentThread().interrupt(); } } + + public int show(){ + return count.get(); // AtomicInteger의 get 메서드를 사용하여 값을 가져옴 + } } diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java index b539b2c..ae14d8b 100644 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java @@ -1,15 +1,19 @@ package com.thread.concurrency.counter.producerCustomer; -public class CounterCustomer{ - private CounterBroker counterBroker; +public class CounterCustomer { + private final CounterBroker counterBroker; + private volatile boolean running; public CounterCustomer(CounterBroker counterBroker) { this.counterBroker = counterBroker; + this.running = true; + } + public void stop() { + running = false; // 스레드 종료를 위해 running 플래그를 false로 설정 } - public void consumEvent(){ - while(true){ + while(running){ counterBroker.consumEvent(); } } diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java index ac94c74..c35110d 100644 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java @@ -1,7 +1,7 @@ package com.thread.concurrency.counter.producerCustomer; public class CounterProducer { - private CounterBroker counterBroker; + private final CounterBroker counterBroker; public CounterProducer(CounterBroker counterBroker) { this.counterBroker = counterBroker; diff --git a/src/test/java/com/thread/concurrency/counter/QueueCounter.java b/src/test/java/com/thread/concurrency/counter/QueueCounter.java deleted file mode 100644 index a45cea7..0000000 --- a/src/test/java/com/thread/concurrency/counter/QueueCounter.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.thread.concurrency.counter; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QueueCounter { - private final int counteNumber = 1; - private final int totalCount = 10000; - private static final Logger logger = LoggerFactory.getLogger(SynchronizedCounterTest.class); - - @Test - @DisplayName("producer consumer 패턴을 이용해서 더하기 이벤트 발생 스레드와 더하기 이벤트 처리 스레드를 분리하자") - public void 프로듀서_컨슈며_더하기_멀티_프로듀서_단일_컨슈머(){ - - } -} diff --git a/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java b/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java new file mode 100644 index 0000000..595bac8 --- /dev/null +++ b/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java @@ -0,0 +1,56 @@ +package com.thread.concurrency.counter; + +import com.thread.concurrency.counter.producerCustomer.CounterBroker; +import com.thread.concurrency.counter.producerCustomer.CounterCustomer; +import com.thread.concurrency.counter.producerCustomer.CounterProducer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class QueueCounterTest { + private final int counteNumber = 1; + private final int totalCount = 10000; + private static final Logger logger = LoggerFactory.getLogger(SynchronizedCounterTest.class); + @Test + @DisplayName("producer consumer 패턴을 이용해서 더하기 이벤트 발생 스레드와 더하기 이벤트 처리 스레드를 분리하자") + public void 프로듀서_컨슈며_더하기_멀티_프로듀서_멀티_컨슈머() throws InterruptedException { + CounterBroker counterBroker = new CounterBroker(); + CounterCustomer customer = new CounterCustomer(counterBroker); + CounterProducer producer = new CounterProducer(counterBroker); + LocalTime lt1 = LocalTime.now(); + int initalCount = counterBroker.show(); + ExecutorService service = Executors.newFixedThreadPool(15); + CountDownLatch latch = new CountDownLatch(totalCount); + + // CounterCustomer 스레드 생성 및 비동기로 처리 시작 + List> futureList = new ArrayList<>(); + for(int i=0; i<3; i++){ + futureList.add(CompletableFuture.runAsync(customer::consumEvent)); + } + // 프로듀서 스레드 생성 + for (int i = 0; i < totalCount; i++) { + service.submit(() -> { + producer.add(counteNumber); + latch.countDown(); + }); + } + latch.await(); + + int finalCount = counterBroker.show(); + LocalTime lt2 = LocalTime.now(); + long dif = Duration.between(lt1, lt2).getNano(); + logger.info("프로듀서_컨슈며_더하기_멀티_프로듀서_단일_컨슈머 테스트가 걸린 시간 : " + dif / 1000 + "ms"); + Assertions.assertEquals(initalCount + totalCount*counteNumber, finalCount); + } +} From 06cbe1f570de27d0e5ac5725863b38b4ee0b8b50 Mon Sep 17 00:00:00 2001 From: ohchansol Date: Wed, 3 Apr 2024 08:56:30 +0900 Subject: [PATCH 10/11] =?UTF-8?q?=F0=9F=A4=96=20Refactor=20:=20=EC=BD=94?= =?UTF-8?q?=EB=93=9C=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 브로커를 테스트에서 제거 2. 단일한 큐를 컨슈머와 프로듀서가 공유 3. 컨슈머가 카운터를 업데이트하는 역할을 가짐 To Do 컨슈머의 consumEvent를 실행하는 스레드와 show를 호출하는 메인 스레드 간 싱크가 맞지 않는다. 모든 consumEvent가 끝나고 show를 호출할 수 있는 방법이 필요하다. --- .../producerCustomer/CounterBroker.java | 50 +++++++------------ .../producerCustomer/CounterCustomer.java | 32 ++++++++---- .../producerCustomer/CounterProducer.java | 13 +++-- .../concurrency/counter/QueueCounterTest.java | 39 ++++++++------- 4 files changed, 70 insertions(+), 64 deletions(-) diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java index c6a1e8c..da1368a 100644 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java @@ -3,40 +3,26 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.function.IntUnaryOperator; -import java.util.function.UnaryOperator; public class CounterBroker { - // 100개의 이벤트를 저장할 수 있다. - private BlockingQueue> queue = new LinkedBlockingQueue<>(100); - private AtomicInteger count = new AtomicInteger(100); // AtomicInteger로 변경 - public void addEvent(int value){ - try{ - // 이 이벤트를 컨슈머가 처리할 당시 count와 value를 더한 값을 출력한다. - // 만약 4초 동안 프로듀서가 요소를 넣지 못하면 timeout이 발생한다. - queue.offer((c) -> c + value, 4, TimeUnit.SECONDS); - } - catch(InterruptedException e){ - Thread.currentThread().interrupt(); - } - } + // 최대 1000개의 이벤트를 저장할 수 있다. + private final BlockingQueue> queue = new LinkedBlockingQueue<>(1000); - public void consumEvent(){ - try{ - // "value를 더한다"라는 이벤트는 현재 스레드만 가질 수 있다. - // AtomicInteger의 updateAndGet 메서드를 사용하여 원자적으로 값을 업데이트 - Function event = queue.take(); - IntUnaryOperator operator = event::apply; - count.updateAndGet(operator); - } - catch(InterruptedException e){ - Thread.currentThread().interrupt(); - } - } - - public int show(){ - return count.get(); // AtomicInteger의 get 메서드를 사용하여 값을 가져옴 - } +// public void addEvent(int value){ +// try{ +// // 이 이벤트를 컨슈머가 처리할 당시 count와 value를 더한 값을 출력한다. +// queue.put((c) -> c + value); +// } +// catch(InterruptedException e){ +// Thread.currentThread().interrupt(); +// } +// } +// public Function take(){ +// try { +// return queue.take(); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } } diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java index ae14d8b..a5e8d9a 100644 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java @@ -1,20 +1,32 @@ package com.thread.concurrency.counter.producerCustomer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.IntUnaryOperator; public class CounterCustomer { - private final CounterBroker counterBroker; - private volatile boolean running; + private final BlockingQueue> queue; + private AtomicInteger count = new AtomicInteger(100); // 스레드 안전성은 synchronized에게 맞기기 때문에 int로 변경. - public CounterCustomer(CounterBroker counterBroker) { - this.counterBroker = counterBroker; - this.running = true; + public CounterCustomer(BlockingQueue> queue) { + this.queue = queue; } - public void stop() { - running = false; // 스레드 종료를 위해 running 플래그를 false로 설정 + + public void consumEvent() throws InterruptedException { + while(!queue.isEmpty()){ + Function event = queue.take(); + IntUnaryOperator operator = event::apply; + synchronized (this){ + System.out.println(count.updateAndGet(operator)); + } + } } - public void consumEvent(){ - while(running){ - counterBroker.consumEvent(); + public int show(){ // 큐가 비어지는 마지막 순간에 if문이 true가 되어 count를 출력해버린다... + while(true){ + if(queue.isEmpty()){ + return count.get(); + } } } } diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java index c35110d..e068f5b 100644 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterProducer.java @@ -1,13 +1,16 @@ package com.thread.concurrency.counter.producerCustomer; +import java.util.concurrent.BlockingQueue; +import java.util.function.Function; + public class CounterProducer { - private final CounterBroker counterBroker; + private final BlockingQueue> queue; - public CounterProducer(CounterBroker counterBroker) { - this.counterBroker = counterBroker; + public CounterProducer(BlockingQueue> queue) { + this.queue = queue; } - public void add(int value){ - counterBroker.addEvent(value); + public void add(int value) throws InterruptedException { + queue.put((c) -> c + value); } } diff --git a/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java b/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java index 595bac8..3ff8a3f 100644 --- a/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java @@ -11,12 +11,8 @@ import java.time.Duration; import java.time.LocalTime; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.function.Function; public class QueueCounterTest { private final int counteNumber = 1; @@ -25,29 +21,38 @@ public class QueueCounterTest { @Test @DisplayName("producer consumer 패턴을 이용해서 더하기 이벤트 발생 스레드와 더하기 이벤트 처리 스레드를 분리하자") public void 프로듀서_컨슈며_더하기_멀티_프로듀서_멀티_컨슈머() throws InterruptedException { - CounterBroker counterBroker = new CounterBroker(); - CounterCustomer customer = new CounterCustomer(counterBroker); - CounterProducer producer = new CounterProducer(counterBroker); + BlockingQueue> queue = new LinkedBlockingQueue<>(1000); + CounterCustomer customer = new CounterCustomer(queue); + CounterProducer producer = new CounterProducer(queue); LocalTime lt1 = LocalTime.now(); - int initalCount = counterBroker.show(); + int initalCount = customer.show(); ExecutorService service = Executors.newFixedThreadPool(15); CountDownLatch latch = new CountDownLatch(totalCount); - // CounterCustomer 스레드 생성 및 비동기로 처리 시작 - List> futureList = new ArrayList<>(); - for(int i=0; i<3; i++){ - futureList.add(CompletableFuture.runAsync(customer::consumEvent)); - } // 프로듀서 스레드 생성 for (int i = 0; i < totalCount; i++) { service.submit(() -> { - producer.add(counteNumber); + try { + producer.add(counteNumber); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } latch.countDown(); }); } + // CounterCustomer 스레드 생성 및 비동기로 처리 시작 + for(int i=0; i<3; i++){ + CompletableFuture.runAsync(()->{ + try{ + customer.consumEvent(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } latch.await(); - int finalCount = counterBroker.show(); + int finalCount = customer.show(); LocalTime lt2 = LocalTime.now(); long dif = Duration.between(lt1, lt2).getNano(); logger.info("프로듀서_컨슈며_더하기_멀티_프로듀서_단일_컨슈머 테스트가 걸린 시간 : " + dif / 1000 + "ms"); From a752e80080b362ec53a2c845c80792e800339d69 Mon Sep 17 00:00:00 2001 From: ohchansol Date: Wed, 3 Apr 2024 12:46:08 +0900 Subject: [PATCH 11/11] =?UTF-8?q?=F0=9F=90=9B=20Fix=20:=20=EC=98=A4?= =?UTF-8?q?=EB=A5=98=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 아직 프로듀서-컨슈머 문제해결 못함 2. consumer가 아직 이벤트를 처리 중임을 다른 스레드가 알 수 있거나 이벤트를 전부 끝냈다는 정보를 다른 스레드가 알 수 있게할 필요가 있다. --- .../producerCustomer/CounterBroker.java | 28 ---------------- .../producerCustomer/CounterConsumer.java | 33 +++++++++++++++++++ .../producerCustomer/CounterCustomer.java | 32 ------------------ .../counter/CompletableFutureCounterTest.java | 2 +- .../concurrency/counter/QueueCounterTest.java | 15 ++++----- .../counter/SynchronizedCounterTest.java | 4 +-- 6 files changed, 42 insertions(+), 72 deletions(-) delete mode 100644 src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java create mode 100644 src/main/java/com/thread/concurrency/counter/producerCustomer/CounterConsumer.java delete mode 100644 src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java deleted file mode 100644 index da1368a..0000000 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterBroker.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.thread.concurrency.counter.producerCustomer; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -public class CounterBroker { - // 최대 1000개의 이벤트를 저장할 수 있다. - private final BlockingQueue> queue = new LinkedBlockingQueue<>(1000); - -// public void addEvent(int value){ -// try{ -// // 이 이벤트를 컨슈머가 처리할 당시 count와 value를 더한 값을 출력한다. -// queue.put((c) -> c + value); -// } -// catch(InterruptedException e){ -// Thread.currentThread().interrupt(); -// } -// } -// public Function take(){ -// try { -// return queue.take(); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// } -} diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterConsumer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterConsumer.java new file mode 100644 index 0000000..c80d515 --- /dev/null +++ b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterConsumer.java @@ -0,0 +1,33 @@ +package com.thread.concurrency.counter.producerCustomer; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.IntUnaryOperator; + +public class CounterConsumer { + private final BlockingQueue> queue; + private final AtomicInteger count = new AtomicInteger(100); // 스레드 안전성은 synchronized에게 맞기기 때문에 int로 변경. + + public CounterConsumer(BlockingQueue> queue) { + this.queue = queue; + } + + public void consumeEvent() throws InterruptedException { + while (!queue.isEmpty()) { + System.out.println("현재 큐 사이즈 : "+queue.size()); + Function event = queue.take(); + IntUnaryOperator operator = event::apply; + System.out.println("결과 카운트 : "+count.updateAndGet(operator)); + } + } + public int show(){ // 큐가 비어지는 마지막 순간에 if문이 true가 되어 count를 출력해버린다... + while(true){ + if(queue.isEmpty()){ + int ret = count.get(); + System.out.println("정답은 ? : "+ret); + return ret; + } + } + } +} diff --git a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java b/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java deleted file mode 100644 index a5e8d9a..0000000 --- a/src/main/java/com/thread/concurrency/counter/producerCustomer/CounterCustomer.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.thread.concurrency.counter.producerCustomer; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.function.IntUnaryOperator; - -public class CounterCustomer { - private final BlockingQueue> queue; - private AtomicInteger count = new AtomicInteger(100); // 스레드 안전성은 synchronized에게 맞기기 때문에 int로 변경. - - public CounterCustomer(BlockingQueue> queue) { - this.queue = queue; - } - - public void consumEvent() throws InterruptedException { - while(!queue.isEmpty()){ - Function event = queue.take(); - IntUnaryOperator operator = event::apply; - synchronized (this){ - System.out.println(count.updateAndGet(operator)); - } - } - } - public int show(){ // 큐가 비어지는 마지막 순간에 if문이 true가 되어 count를 출력해버린다... - while(true){ - if(queue.isEmpty()){ - return count.get(); - } - } - } -} diff --git a/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java b/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java index 236bfd9..eac7fef 100644 --- a/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/CompletableFutureCounterTest.java @@ -37,7 +37,7 @@ public class CompletableFutureCounterTest { LocalTime lt2 = LocalTime.now(); long dif = Duration.between(lt1, lt2).getNano(); - logger.info("여러_더하기_수행_Compltable 테스트가 걸린 시간 : "+dif/1000+"ms"); + logger.info("여러_더하기_수행_Compltable 테스트가 걸린 시간 : "+dif/1000000+"ms"); Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); } } diff --git a/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java b/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java index 3ff8a3f..4903c7b 100644 --- a/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/QueueCounterTest.java @@ -1,7 +1,6 @@ package com.thread.concurrency.counter; -import com.thread.concurrency.counter.producerCustomer.CounterBroker; -import com.thread.concurrency.counter.producerCustomer.CounterCustomer; +import com.thread.concurrency.counter.producerCustomer.CounterConsumer; import com.thread.concurrency.counter.producerCustomer.CounterProducer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -22,10 +21,10 @@ public class QueueCounterTest { @DisplayName("producer consumer 패턴을 이용해서 더하기 이벤트 발생 스레드와 더하기 이벤트 처리 스레드를 분리하자") public void 프로듀서_컨슈며_더하기_멀티_프로듀서_멀티_컨슈머() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(1000); - CounterCustomer customer = new CounterCustomer(queue); + CounterConsumer consumer = new CounterConsumer(queue); CounterProducer producer = new CounterProducer(queue); LocalTime lt1 = LocalTime.now(); - int initalCount = customer.show(); + int initalCount = consumer.show(); ExecutorService service = Executors.newFixedThreadPool(15); CountDownLatch latch = new CountDownLatch(totalCount); @@ -44,18 +43,16 @@ public class QueueCounterTest { for(int i=0; i<3; i++){ CompletableFuture.runAsync(()->{ try{ - customer.consumEvent(); + consumer.consumeEvent(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); } - latch.await(); - - int finalCount = customer.show(); + int finalCount = consumer.show(); LocalTime lt2 = LocalTime.now(); long dif = Duration.between(lt1, lt2).getNano(); - logger.info("프로듀서_컨슈며_더하기_멀티_프로듀서_단일_컨슈머 테스트가 걸린 시간 : " + dif / 1000 + "ms"); + logger.info("프로듀서_컨슈며_더하기_멀티_프로듀서_단일_컨슈머 테스트가 걸린 시간 : " + dif / 1000000 + "ms"); Assertions.assertEquals(initalCount + totalCount*counteNumber, finalCount); } } diff --git a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java index 0c24bec..2cab7da 100644 --- a/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java +++ b/src/test/java/com/thread/concurrency/counter/SynchronizedCounterTest.java @@ -46,7 +46,7 @@ public class SynchronizedCounterTest { int finalCount = counter.show(); LocalTime lt2 = LocalTime.now(); long dif = Duration.between(lt1, lt2).getNano(); - logger.info("여러_더하기_수행_Executor 테스트가 걸린 시간 : "+dif/1000+"ms"); + logger.info("여러_더하기_수행_Executor 테스트가 걸린 시간 : "+dif/1000000+"ms"); Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); } @@ -80,7 +80,7 @@ public class SynchronizedCounterTest { LocalTime lt2 = LocalTime.now(); long dif = Duration.between(lt1, lt2).getNano(); - logger.info("여러_더하기_수행_CompletableFuture 테스트가 걸린 시간 : "+dif/1000+"ms"); + logger.info("여러_더하기_수행_CompletableFuture 테스트가 걸린 시간 : "+dif/1000000+"ms"); Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount); } }