Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@Async 메서드 사용 예시 작성 #25

Closed
wants to merge 14 commits into from
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
HELP.md
.gradle
build/
.gradle/*
build/*
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
Expand All @@ -18,7 +18,7 @@ bin/
!**/src/test/**/bin/

### IntelliJ IDEA ###
.idea
.idea/*
*.iws
*.iml
*.ipr
Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ repositories {

dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("net.jcip:jcip-annotations:1.0")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testRuntimeOnly("org.reactivestreams:reactive-streams")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,27 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@SpringBootApplication
@EnableAsync
public class SpringThreadConcurrencyApplication {

public static void main(String[] args) {
SpringApplication.run(SpringThreadConcurrencyApplication.class, args);
}

@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Spring에서 사용하는 스레드를 제어한느 설정
executor.setCorePoolSize(50); // thread-pool에 살아있는 thread의 최소 개수
executor.setMaxPoolSize(50); // thread-pool에서 사용할 수 있는 최대 개수
executor.setQueueCapacity(500); //thread-pool에 최대 queue 크기
executor.setThreadNamePrefix("AsyncApp-");
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.thread.concurrency.async.controller;

import com.thread.concurrency.async.service.AsyncService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Controller;

import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Controller
public class AsyncController {
private final AsyncService asyncService;

public AsyncController(AsyncService asyncService) {
this.asyncService = asyncService;
}

@Async
public CompletableFuture<String> calculateRunTime(int cnt, int waitTime) throws InterruptedException {
LocalTime lt1 = LocalTime.now();
List<CompletableFuture<String>> hellos = new ArrayList<>();
for(int i=0; i<cnt; i++){
hellos.add(asyncService.voidParamStringReturn(waitTime,i+"번째 메세지"));
}
// 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기
List<String> results = hellos.stream().map(CompletableFuture::join)
.toList();
LocalTime lt2 = LocalTime.now();
long dif = Duration.between(lt1, lt2).toMillis();
return CompletableFuture.completedFuture(dif+"가 걸렸습니다.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.thread.concurrency.async.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;


@Service
public class AsyncService {
@Async
public CompletableFuture<String> voidParamStringReturn (long waitTime, String message) throws InterruptedException{
System.out.println("비동기적으로 실행 - "+
Thread.currentThread().getName());
Thread.sleep(waitTime);
return CompletableFuture.completedFuture(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.thread.concurrency.counter;

import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class CompletableFutureCounter implements Counter{

private CompletableFuture<Integer> counter = new CompletableFuture<>();

@Override
public void add(int value) {
// 연산이 진행 중이라면 기다렸다가 thenApply
// 카운트에 값 저장
counter = counter.thenApply((c) -> c + value);
}

@Override
public int show() {
try {
// 카운트에 대한 연산이 실행 중이라면 기다렸다가 가져오기
return counter.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.thread.concurrency.counter;

import net.jcip.annotations.GuardedBy;
import org.springframework.stereotype.Component;

@Component
public class SynchronizedCounter implements Counter{

@GuardedBy("this")
private int counter = 100;

@Override
public synchronized void add(int value) {
counter += value;
}

@Override
public synchronized int show() {
return counter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.thread.concurrency.counter.producerCustomer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

public class CounterConsumer {
private final BlockingQueue<Function<Integer, Integer>> queue;
private final AtomicInteger count = new AtomicInteger(100); // 스레드 안전성은 synchronized에게 맞기기 때문에 int로 변경.

public CounterConsumer(BlockingQueue<Function<Integer, Integer>> queue) {
this.queue = queue;
}

public void consumeEvent() throws InterruptedException {
while (!queue.isEmpty()) {
System.out.println("현재 큐 사이즈 : "+queue.size());
Function<Integer, Integer> event = queue.take();
IntUnaryOperator operator = event::apply;
System.out.println("결과 카운트 : "+count.updateAndGet(operator));
}
}
public int show(){ // 큐가 비어지는 마지막 순간에 if문이 true가 되어 count를 출력해버린다...
while(true){
if(queue.isEmpty()){
int ret = count.get();
System.out.println("정답은 ? : "+ret);
return ret;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.thread.concurrency.counter.producerCustomer;

import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

public class CounterProducer {
private final BlockingQueue<Function<Integer, Integer>> queue;

public CounterProducer(BlockingQueue<Function<Integer, Integer>> queue) {
this.queue = queue;
}

public void add(int value) throws InterruptedException {
queue.put((c) -> c + value);
}
}
32 changes: 32 additions & 0 deletions src/test/java/com/thread/concurrency/AsyncControllerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.thread.concurrency;

import com.thread.concurrency.async.controller.AsyncController;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@SpringBootTest
public class AsyncControllerTest {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class);

@Autowired
private AsyncController asyncController;

@Test
public void invokeMultiAsyncMethod() throws InterruptedException {
List<CompletableFuture<String>> hellos = new ArrayList<>();
for(int i=0; i<10; i++){
hellos.add(asyncController.calculateRunTime(10, 1000));
}
// 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기
List<String> results = hellos.stream().map(CompletableFuture::join)
.toList();
results.forEach(logger::info);
}
}
52 changes: 52 additions & 0 deletions src/test/java/com/thread/concurrency/AsyncServiceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.thread.concurrency;

import com.thread.concurrency.async.service.AsyncService;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@SpringBootTest
public class AsyncServiceTest {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class);
@Autowired
private AsyncService asyncService;

@Test
@DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출")
public void testGetString() throws ExecutionException, InterruptedException {
CompletableFuture<String> helloWorld = asyncService.voidParamStringReturn(1000, "기본 메세지");
Assertions.assertEquals("기본 메세지",helloWorld.get());
}

@Test
@DisplayName("입력은 void 출력은 String인 비동기 함수 다중 호출")
public void testGetMultiString() throws InterruptedException {
List<CompletableFuture<String>> hellos = new ArrayList<>();
for(int i=0; i<100; i++){
hellos.add(asyncService.voidParamStringReturn(1000,i+"번째 메세지"));
}
// 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기
List<String> results = hellos.stream().map(CompletableFuture::join)
.toList();
results.forEach(logger::info);
}

@Test
@DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출 타임아웃 발생.")
public void testGetStringTimeOutIsThisAsync() throws InterruptedException {
// voidParamStringReturn가 비동기 메서드인지 의문이 생김.
CompletableFuture<String> completableFuture = asyncService.voidParamStringReturn(4000, "타임아웃 발생 안 함!");
long timeOutValue = 1;
TimeUnit timeUnit = TimeUnit.SECONDS;
// 1초가 지난 후 타임아웃 발생
Assertions.assertThrows(ExecutionException.class, () -> completableFuture.orTimeout(timeOutValue,timeUnit).get());
}
}
35 changes: 35 additions & 0 deletions src/test/java/com/thread/concurrency/counter/BasicCounterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.thread.concurrency.counter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CompletableFuture;

@SpringBootTest
public class BasicCounterTest {

private final BasicCounter basicCounter;
private final int counteNumber = 1;
private final int totalCount = 100;

@Autowired
public BasicCounterTest(BasicCounter basicCounter) {
this.basicCounter = basicCounter;
}

@Test
@DisplayName("스레드 안전하지 않는 카운터로 동시에 여러 더하기 수행하기. 실패 예상")
public void 여러_더하기_수행(){
int initalCount = basicCounter.show();

for(int i=0; i<totalCount; i++){
CompletableFuture.runAsync(() -> {
basicCounter.add(counteNumber);
});
}
int finalCount = basicCounter.show();
Assertions.assertNotEquals(initalCount+totalCount*counteNumber, finalCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.thread.concurrency.counter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootTest
public class CompletableFutureCounterTest {
private final int counteNumber = 1;
private final int totalCount = 10000;
private static final Logger logger = LoggerFactory.getLogger(CompletableFutureCounterTest.class);

@Test
public void 여러_더하기_수행_Compltable() throws InterruptedException {
SynchronizedCounter counter = new SynchronizedCounter();
LocalTime lt1 = LocalTime.now();

int initalCount = counter.show();
ExecutorService service = Executors.newFixedThreadPool(15);
CountDownLatch latch = new CountDownLatch(totalCount);
for (int i = 0; i < totalCount; i++) {
service.submit(() -> {
counter.add(counteNumber);
latch.countDown();
});
}
latch.await();
int finalCount = counter.show();

LocalTime lt2 = LocalTime.now();
long dif = Duration.between(lt1, lt2).getNano();
logger.info("여러_더하기_수행_Compltable 테스트가 걸린 시간 : "+dif/1000000+"ms");
Assertions.assertEquals(initalCount+totalCount*counteNumber, finalCount);
}
}
Loading