diff --git a/.github/workflows/deploy-websocket.yml b/.github/workflows/deploy-websocket.yml new file mode 100644 index 00000000..9a7bba5c --- /dev/null +++ b/.github/workflows/deploy-websocket.yml @@ -0,0 +1,101 @@ +name: Fintory Websocket Module CI/CD +on: + push: + branches: [ "dev" ] + pull_request: + branches: [ "dev" ] + +env: + AWS_REGION: ap-northeast-2 + +jobs: + build-and-push: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up JDK 17 + uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: '17' + + - name: Grant permission to gradlew + run: chmod +x ./gradlew + + - name: Cache Gradle + uses: actions/cache@v3 + with: + path: | + ~/.gradle/caches + ~/.gradle/wrapper + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: | + ${{ runner.os }}-gradle- + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ env.AWS_REGION }} + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build and push websocket + run: | + ./gradlew :websocket:clean build + docker build -t websocket:latest ./websocket + docker tag websocket:latest ${{ steps.login-ecr.outputs.registry }}/fintory-child:websocket-latest + docker push ${{ steps.login-ecr.outputs.registry }}/fintory-child:websocket-latest + + deploy-websocket: + needs: build-and-push + runs-on: ubuntu-latest + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ env.AWS_REGION }} + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Deploy fintory-websocket to EC2 + uses: appleboy/ssh-action@v1.0.0 + with: + host: ${{ secrets.FINTORY_WEBSOCKET_HOST }} + username: ubuntu + key: ${{ secrets.EC2_WEBSOCKET_SSH_KEY }} + script: | + cat > .env << 'EOF' + ECR_REGISTRY=${{ steps.login-ecr.outputs.registry }} + RDS_URL=${{ secrets.RDS_URL }} + RDS_USERNAME=${{ secrets.RDS_USERNAME }} + RDS_PASSWORD=${{ secrets.RDS_PASSWORD }} + AWS_REDIS_HOST=${{ secrets.FINTORY_CHILD_PRIVATE_HOST }} + AWS_REDIS_PASSWORD=${{ secrets.AWS_REDIS_PASSWORD }} + HANTU_APPKEY=${{ secrets.HANTU_APPKEY}} + HANTU_APPSECRET=${{ secrets.HANTU_APPSECRET}} + DB_APPKEY=${{ secrets.DB_APPKEY}} + DB_APPSECRET=${{ secrets.DB_APPSECRET}} + EOS_API_KEY=${{ secrets.EOS_API_KEY}} + EOF + + + aws ecr get-login-password --region ap-northeast-2 | \ + docker login --username AWS --password-stdin ${{ steps.login-ecr.outputs.registry }} + + docker pull ${{ steps.login-ecr.outputs.registry }}/fintory-child:websocket-latest + + docker compose down + docker compose up -d + + docker image prune -f + diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 34cca234..80db38df 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -95,10 +95,11 @@ jobs: export OPENAI_MODEL=${{ secrets.OPENAI_MODEL}} export OPENAI_TEMPERATURE=${{secrets.OPENAI_TEMPERATURE}} export OPENAI_MAX_TOKENS=${{secrets.OPENAI_MAX_TOKENS}} + export WEBSOCKET_PRIVATE_HOST=${{secrets.WEBSOCKET_PRIVATE_HOST}} export FIREBASE_CONFIG='${{secrets.FIREBASE_CONFIG}}' aws ecr get-login-password --region ap-northeast-2 | \ - docker login --username AWS --password-stdin ${{ steps.login-ecr.outputs.registry }} + docker login --username AWS --password-stdin ${{ steps.login-ecr.outputs.registry }} docker pull ${{ steps.login-ecr.outputs.registry }}/fintory-child:latest @@ -106,4 +107,4 @@ jobs: docker compose up -d docker image prune -f - + diff --git a/app-child/Dockerfile b/app-child/Dockerfile index 241a224c..26a1452e 100644 --- a/app-child/Dockerfile +++ b/app-child/Dockerfile @@ -10,4 +10,4 @@ WORKDIR /app COPY build/libs/*.jar app.jar COPY src/main/resources/application-deploy.yml /app/application-deploy.yml -ENTRYPOINT ["java", "-jar","app.jar", "--spring.profiles.active=deploy"] +ENTRYPOINT ["java", "-jar", "app.jar", "--spring.profiles.active=deploy"] diff --git a/app-child/src/main/java/com/fintory/child/ChildApplication.java b/app-child/src/main/java/com/fintory/child/ChildApplication.java index 48274f26..83c90930 100644 --- a/app-child/src/main/java/com/fintory/child/ChildApplication.java +++ b/app-child/src/main/java/com/fintory/child/ChildApplication.java @@ -15,7 +15,7 @@ @SpringBootApplication(scanBasePackages = { "com.fintory.infra", "com.fintory.auth", - "com.fintory.child" + "com.fintory.child", }) @ConfigurationPropertiesScan(basePackages = { "com.fintory.auth" diff --git a/app-child/src/main/java/com/fintory/child/domain/stock/controller/common/CommonStockControllerImpl.java b/app-child/src/main/java/com/fintory/child/domain/stock/controller/common/CommonStockControllerImpl.java index da22cbbe..19a1a0a9 100644 --- a/app-child/src/main/java/com/fintory/child/domain/stock/controller/common/CommonStockControllerImpl.java +++ b/app-child/src/main/java/com/fintory/child/domain/stock/controller/common/CommonStockControllerImpl.java @@ -4,13 +4,14 @@ import com.fintory.domain.stock.dto.korean.response.StockSearchResponse; import com.fintory.domain.stock.dto.websocket.MarketStatusResponse; import com.fintory.domain.stock.service.common.CommonStockService; -import com.fintory.domain.stock.service.websocket.LiveStockPriceWebsocketService; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.client.RestTemplate; import java.util.List; @@ -20,22 +21,27 @@ public class CommonStockControllerImpl implements CommonStockController { private final CommonStockService commonStockService; - private final LiveStockPriceWebsocketService websocketService; + private final RestTemplate restTemplate; + + @Value("${websocket.server.url}") + private String websocketServerUrl; //주식 종목 검색 @Override @GetMapping("/search") public ResponseEntity>> searchStock(@RequestParam String keyword) { - List stockSearchRespons = commonStockService.searchStock(keyword); - return ResponseEntity.ok(ApiResponse.ok(stockSearchRespons)); + List stockSearchResponse = commonStockService.searchStock(keyword); + return ResponseEntity.ok(ApiResponse.ok(stockSearchResponse)); } //장시간 리턴 @Override @GetMapping("/opened-market") public ResponseEntity> getMarketStatus(){ - return ResponseEntity.ok(ApiResponse.ok(websocketService.getMarketStatus())); + String url = "http://"+websocketServerUrl+":8080" + "/api/websocket/market/status"; + MarketStatusResponse response = restTemplate.getForObject(url, MarketStatusResponse.class); + return ResponseEntity.ok(ApiResponse.ok(response)); } } diff --git a/app-child/src/main/java/com/fintory/child/domain/stock/controller/websocket/StockWebSocketControllerImpl.java b/app-child/src/main/java/com/fintory/child/domain/stock/controller/websocket/StockWebSocketControllerImpl.java deleted file mode 100644 index ae07653e..00000000 --- a/app-child/src/main/java/com/fintory/child/domain/stock/controller/websocket/StockWebSocketControllerImpl.java +++ /dev/null @@ -1,118 +0,0 @@ -package com.fintory.child.domain.stock.controller.websocket; -import com.fintory.domain.stock.dto.websocket.StockMessageRequest; -import com.fintory.domain.stock.model.Stock; -import com.fintory.domain.stock.service.websocket.LiveStockPriceWebsocketService; -import com.fintory.infra.domain.stock.repository.StockRepository; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.messaging.handler.annotation.MessageMapping; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Controller; - -import java.util.List; - - -@Controller //Http ResponseBody가 아닌 WebSocket 메시지 브로드 캐스트용 -@RequiredArgsConstructor -@Slf4j -public class StockWebSocketControllerImpl { - - private final LiveStockPriceWebsocketService liveStockWebSocketService; - private final StockRepository stockRepository; - - /** - * 프론트엔드에서 STOMP로 한국 주식 구독 요청 - * - * @param request 종목 코드가 포함된 요청 - */ - @MessageMapping("/stock/subscribe/korean") - public void subscribeKoreanStock(StockMessageRequest request) { - log.info("STOMP를 통한 한국 종목 {} 구독 요청", request.code()); - // 실제 WebSocket 구독 시작 (자동 연결 포함) - liveStockWebSocketService.koreanStockSubscribe(request.code()); - } - - /** - * 프론트엔드에서 STOMP로 한국 주식 구독 해제 요청 - */ - @MessageMapping("/stock/unsubscribe/korean") - public void unsubscribeKoreanStock(StockMessageRequest request) { - log.info("STOMP를 통한 한국 종목 {} 구독 해제 요청", request.code()); - liveStockWebSocketService.koreanStockUnsubscribe(request.code()); - } - - /** - * 프론트엔드에서 STOMP로 해외 주식 구독 요청 - */ - @MessageMapping("/stock/subscribe/overseas") - public void subscribeOverseasStock(StockMessageRequest request) { - log.info("STOMP를 통한 해외 종목 {} 구독 요청", request.code()); - liveStockWebSocketService.overseasStockSubscribe(request.code()); - } - - /** - * 프론트엔드에서 STOMP로 해외 주식 구독 해제 요청 - */ - @MessageMapping("/stock/unsubscribe/overseas") - public void unsubscribeOverseasStock(StockMessageRequest request) { - log.info("STOMP를 통한 해외 종목 {} 구독 해제 요청", request.code()); - liveStockWebSocketService.overseasStockUnsubscribe(request.code()); - } - - //NOTE 프론트에서 어떻게 구현할지 정해지지 않아서 현재처럼 전체 구독 + 개별 구독을 같이 구현 - /** - * 전체 구독 - * - */ - @MessageMapping("/stock/subscribe-all/korean") - @Async - public void subscribeKoreanStockAll() { - List stockList = stockRepository.findByCurrencyName("KRW"); - for (Stock stock : stockList) { - log.info("STOMP를 통한 국내 종목 {} 구독 요청", stock.getCode()); - // 실제 WebSocket 구독 시작 (자동 연결 포함) - liveStockWebSocketService.koreanStockSubscribe(stock.getCode()); - } - log.info("국내 종목 일괄 구독 완료"); - - } - - @MessageMapping("/stock/unsubscribe-all/korean") - @Async - public void unsubscribeKoreanStockAll() { - List stockList = stockRepository.findByCurrencyName("KRW"); - for (Stock stock : stockList) { - log.info("STOMP를 통한 국내 종목 {} 구독 취소 요청", stock.getCode()); - // 실제 WebSocket 구독 시작 (자동 연결 포함) - liveStockWebSocketService.koreanStockUnsubscribe(stock.getCode()); - } - log.info("국내 종목 일괄 구독 취소 완료"); - - } - - @MessageMapping("/stock/subscribe-all/overseas") - @Async - public void subscribeOverseasStockAll() { - List stockList = stockRepository.findByCurrencyName("USD"); - for (Stock stock : stockList) { - log.info("STOMP를 통한 해외 종목 {} 구독 요청", stock.getCode()); - // 실제 WebSocket 구독 시작 (자동 연결 포함) - liveStockWebSocketService.overseasStockSubscribe(stock.getCode()); - } - log.info("해외 종목 일괄 구독 완료"); - - } - - @MessageMapping("/stock/unsubscribe-all/overseas") - @Async - public void unsubscribeOverseasStockAll() { - List stockList = stockRepository.findByCurrencyName("USD"); - for (Stock stock : stockList) { - log.info("STOMP를 통한 해외 종목 {} 구독 취소 요청", stock.getCode()); - // 실제 WebSocket 구독 시작 (자동 연결 포함) - liveStockWebSocketService.overseasStockUnsubscribe(stock.getCode()); - } - log.info("해외 종목 일괄 구독 취소 완료"); - } - -} \ No newline at end of file diff --git a/app-child/src/main/java/com/fintory/child/domain/stock/controller/websocket/StockWebsocketController.java b/app-child/src/main/java/com/fintory/child/domain/stock/controller/websocket/StockWebsocketController.java deleted file mode 100644 index bc7d57aa..00000000 --- a/app-child/src/main/java/com/fintory/child/domain/stock/controller/websocket/StockWebsocketController.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.fintory.child.domain.stock.controller.websocket; - -import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.responses.ApiResponse; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.PathVariable; - -public interface StockWebsocketController { - - /* 개별 구독 */ - @Operation(summary = "한국 주식 실시간 구독", description = "한국 주식 종목의 실시간 가격 정보를 구독합니다.") - @ApiResponse(responseCode = "200", description = "한국 주식 구독 성공") - ResponseEntity> subscribeKoreanStock( - @Parameter(description = "종목 코드", example = "005930") @PathVariable String code); - - @Operation(summary = "한국 주식 실시간 구독 해제", description = "한국 주식 종목의 실시간 가격 정보 구독을 해제합니다") - @ApiResponse(responseCode = "200", description = "한국 주식 구독 해제 성공") - ResponseEntity> unsubscribeKoreanStock( - @Parameter(description = "종목 코드", example = "005930") @PathVariable String code); - - @Operation(summary = "해외 주식 실시간 구독", description = "해외 주식 종목의 실시간 가격 정보를 구독합니다.") - @ApiResponse(responseCode = "200", description = "해외 주식 구독 성공") - ResponseEntity> subscribeOverseasStock( - @Parameter(description = "종목 코드", example = "AAPL") @PathVariable String code); - - @Operation(summary = "해외 주식 실시간 구독 해제", description = "해외 주식 종목의 실시간 가격 정보 구독을 해제합니다") - @ApiResponse(responseCode = "200", description = "해외 주식 구독 해제 성공") - ResponseEntity> unsubscribeOverseasStock( - @Parameter(description = "종목 코드", example = "AAPL") @PathVariable String code); - - - /* 전체 구독 */ - @Operation(summary = "한국 주식 전체 구독", description = "모든 한국 주식을 일괄 구독합니다") - @ApiResponse(responseCode = "200", description = "구독 시작") - ResponseEntity> subscribeAllKoreanStocks(); - - @Operation(summary = "한국 주식 전체 구독 해제", description = "모든 한국 주식 구독을 해제합니다") - @ApiResponse(responseCode = "200", description = "구독 해제 시작") - ResponseEntity> unsubscribeAllKoreanStocks(); - - @Operation(summary = "해외 주식 전체 구독", description = "모든 해외 주식을 일괄 구독합니다") - @ApiResponse(responseCode = "200", description = "구독 시작") - ResponseEntity> subscribeAllOverseasStocks(); - - @Operation(summary = "해외 주식 전체 구독 해제", description = "모든 해외 주식 구독을 해제합니다") - @ApiResponse(responseCode = "200", description = "구독 해제 시작") - ResponseEntity> unsubscribeAllOverseasStocks(); - -} diff --git a/app-child/src/main/java/com/fintory/child/domain/stock/metrics/WebSocketMetrics.java b/app-child/src/main/java/com/fintory/child/domain/stock/metrics/WebSocketMetrics.java deleted file mode 100644 index 4918f31c..00000000 --- a/app-child/src/main/java/com/fintory/child/domain/stock/metrics/WebSocketMetrics.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.fintory.child.domain.stock.metrics; - -import com.fintory.domain.stock.service.websocket.LiveStockPriceWebsocketService; -import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.MeterRegistry; -import jakarta.annotation.PostConstruct; -import lombok.RequiredArgsConstructor; -import org.springframework.messaging.simp.user.SimpUserRegistry; -import org.springframework.stereotype.Component; - -//REVIEW 혹시 해당 파일의 위치를 바꾸길 원하시면 리뷰 주세요! ->Micrometer가 있는 모듈에서 작업하기 위해 해당 위치를 선택함 -@Component -@RequiredArgsConstructor -public class WebSocketMetrics { - - private final MeterRegistry meterRegistry; - private final LiveStockPriceWebsocketService websocketService; - private final SimpUserRegistry simpUserRegistry; - - @PostConstruct - public void registerMetrics(){ - - //실제 연결된 클라이언트 수 - Gauge.builder("websocket.clients.connected", - simpUserRegistry, registry -> registry.getUserCount()) - .description("Number of websocket clients connected") - .register(meterRegistry); - - //활성 세션 수 - Gauge.builder("websocket.sessions.active", - simpUserRegistry, registry -> registry.getUsers().stream() - .mapToInt(user-> user.getSessions().size()) - .sum()) - .description("Number of websocket sessions active") - .register(meterRegistry); - - // 국내 주식 활성 구독 종목 수 - Gauge.builder("websocket.korean.subscriptions.active", - websocketService, service -> service.getKoreanSubscribedStocks().size()) - .description("Active Korean Stock subscriptions count") - .register(meterRegistry); - - // 해외 주식 활성 구독 종목 수 - Gauge.builder("websocket.overseas.subscriptions.active", - websocketService, service -> service.getOverseasSubscribedStocks().size()) - .description("Active Overseas Stock subscriptions count") - .register(meterRegistry); - - // 국내 Websocket 연결 상태 - Gauge.builder("websocket.korean.connected", - websocketService, service -> service.isKoreanConnected() ? 1.0 : 0.0) - .description("Korean WebSocket connection status") - .register(meterRegistry); - - // 해외 Websocket 연결 상태 - Gauge.builder("websocket.overseas.connected", - websocketService, service -> service.isOverseasConnected() ? 1.0 : 0.0) - .description("Overseas WebSocket connection status") - .register(meterRegistry); - - } -} - - diff --git a/app-child/src/main/java/com/fintory/child/exceptionhandler/GlobalExceptionHandler.java b/app-child/src/main/java/com/fintory/child/exceptionhandler/GlobalExceptionHandler.java index eeb9dcd3..436c5e77 100644 --- a/app-child/src/main/java/com/fintory/child/exceptionhandler/GlobalExceptionHandler.java +++ b/app-child/src/main/java/com/fintory/child/exceptionhandler/GlobalExceptionHandler.java @@ -3,6 +3,7 @@ import com.fintory.common.exception.DomainErrorCode; import com.fintory.common.exception.ExceptionResponse; +import jakarta.servlet.http.HttpServletRequest; import jakarta.validation.ConstraintViolation; import jakarta.validation.ConstraintViolationException; import lombok.extern.slf4j.Slf4j; @@ -73,5 +74,4 @@ public ResponseEntity handleUnhandledException(Exception e) { .body(new ExceptionResponse(DomainErrorCode.INTERNAL_SERVER_ERROR)); } - } diff --git a/app-child/src/main/resources/application-deploy.yml b/app-child/src/main/resources/application-deploy.yml index fcd29fb3..c7d09f66 100644 --- a/app-child/src/main/resources/application-deploy.yml +++ b/app-child/src/main/resources/application-deploy.yml @@ -6,6 +6,7 @@ spring: activate: on-profile: deploy + datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: ${RDS_URL} @@ -73,7 +74,7 @@ management: endpoints: web: exposure: - include: health, prometheus + include: health, prometheus,metrics endpoint: health: show-details: always @@ -92,4 +93,8 @@ eos: api-key: ${EOS_API_KEY} firebase: - config: ${FIREBASE_CONFIG} \ No newline at end of file + config: ${FIREBASE_CONFIG} + +websocket: + server: + url: ${WEBSOCKET_PRIVATE_HOST} \ No newline at end of file diff --git a/app-child/src/main/resources/application-local.yml b/app-child/src/main/resources/application-local.yml index 707f6b63..f4ab3474 100644 --- a/app-child/src/main/resources/application-local.yml +++ b/app-child/src/main/resources/application-local.yml @@ -100,6 +100,9 @@ db-openapi: firebase: config: ${FIREBASE_CONFIG} +websocket: + server: + url: http://localhost:8081 diff --git a/domain/build.gradle b/domain/build.gradle index b52bb85a..a8a32633 100644 --- a/domain/build.gradle +++ b/domain/build.gradle @@ -50,7 +50,6 @@ dependencies { /* Micrometer(Prometheus) */ implementation("io.micrometer:micrometer-registry-prometheus:1.15.2") - } test { diff --git a/domain/src/main/java/com/fintory/domain/portfolio/service/ExchangeRateService.java b/domain/src/main/java/com/fintory/domain/portfolio/service/ExchangeRateService.java index b309ca3c..47f8ceb5 100644 --- a/domain/src/main/java/com/fintory/domain/portfolio/service/ExchangeRateService.java +++ b/domain/src/main/java/com/fintory/domain/portfolio/service/ExchangeRateService.java @@ -5,7 +5,7 @@ public interface ExchangeRateService { /** - * ECOS 에서 제공하는 오늘의 환율 값 조회 메소드 + * ECOS 에서 제공하는 오늘의 환율 값 조회 메소드(레디스에 저장된 값 불러오기) * @return 환율 값 */ BigDecimal getExchangeRate(); diff --git a/domain/src/main/java/com/fintory/domain/stock/dto/websocket/LiveStockPriceStream.java b/domain/src/main/java/com/fintory/domain/stock/dto/websocket/LiveStockPriceStream.java index c69197d6..f761858d 100644 --- a/domain/src/main/java/com/fintory/domain/stock/dto/websocket/LiveStockPriceStream.java +++ b/domain/src/main/java/com/fintory/domain/stock/dto/websocket/LiveStockPriceStream.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.Serializable; import java.math.BigDecimal; @JsonIgnoreProperties(ignoreUnknown = true) @@ -10,4 +11,6 @@ public record LiveStockPriceStream( BigDecimal currentPrice, BigDecimal priceChange, BigDecimal priceChangeRate -){ } +) implements Serializable { + private static final long serialVersionUID = 1L; +} diff --git a/domain/src/main/java/com/fintory/domain/stock/service/websocket/LiveStockPriceWebSocketSaverService.java b/domain/src/main/java/com/fintory/domain/stock/service/websocket/LiveStockPriceWebSocketSaverService.java deleted file mode 100644 index 34b668c8..00000000 --- a/domain/src/main/java/com/fintory/domain/stock/service/websocket/LiveStockPriceWebSocketSaverService.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.fintory.domain.stock.service.websocket; - -import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; - -public interface LiveStockPriceWebSocketSaverService { - void saveStockData(LiveStockPriceStream dto); -} diff --git a/domain/src/main/java/com/fintory/domain/stock/service/websocket/LiveStockPriceWebsocketService.java b/domain/src/main/java/com/fintory/domain/stock/service/websocket/LiveStockPriceWebsocketService.java deleted file mode 100644 index 825fa2bc..00000000 --- a/domain/src/main/java/com/fintory/domain/stock/service/websocket/LiveStockPriceWebsocketService.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.fintory.domain.stock.service.websocket; - -import com.fintory.domain.stock.dto.websocket.MarketStatusResponse; - -import java.util.Map; -import java.util.Set; - -/** - * 실시간 주식 가격 WebSocket 서비스 인터페이스 - * 한국 및 해외 주식의 실시간 가격 구독/해제 및 데이터 전송을 담당 - */ -public interface LiveStockPriceWebsocketService { - - /** - * 한국 주식 종목 구독 - * @param code 주식 종목 코드 - * @throws com.fintory.common.exception.DomainException 구독 실패 시 - */ - void koreanStockSubscribe(String code); - - /** - * 한국 주식 종목 구독 해제 - * @param code 주식 종목 코드 - * @throws com.fintory.common.exception.DomainException 구독 해제 실패 시 - */ - void koreanStockUnsubscribe(String code); - - /** - * 해외 주식 종목 구독 - * @param code 주식 종목 코드 - * @throws com.fintory.common.exception.DomainException 구독 실패 시 - */ - void overseasStockSubscribe(String code); - - /** - * 해외 주식 종목 구독 해제 - * @param code 주식 종목 코드 - * @throws com.fintory.common.exception.DomainException 구독 해제 실패 시 - */ - void overseasStockUnsubscribe(String code); - - /** - * 해당 토픽을 구독한 사용자(프론트)들에게 주식 데이터 브로드캐스트 - * @param stockCode 주식 종목 코드 - * @param stockData 전송할 주식 데이터 (KoreanLiveStockPriceStream 또는 OverseasLiveStockPriceStream) - */ - void sendStockData(String stockCode, Object stockData); - - - /** - * stomp 구독 시 어떤 장이 열린건지 확인 - * @return korean, overseas, no 중 하나 - */ - MarketStatusResponse getMarketStatus(); - - - /* 매트릭용 Getter 함수 추가 */ - Set getKoreanSubscribedStocks(); - Set getOverseasSubscribedStocks(); - boolean isKoreanConnected(); - boolean isOverseasConnected(); -} \ No newline at end of file diff --git a/infra/src/main/java/com/fintory/infra/domain/alarm/config/RedisSubscribeConfig.java b/infra/src/main/java/com/fintory/infra/domain/alarm/config/RedisSubscribeConfig.java new file mode 100644 index 00000000..b7898a8a --- /dev/null +++ b/infra/src/main/java/com/fintory/infra/domain/alarm/config/RedisSubscribeConfig.java @@ -0,0 +1,36 @@ +package com.fintory.infra.domain.alarm.config; + +import com.fintory.infra.domain.alarm.serviceImpl.PriceAlertEventListener; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; + +/** + * Spring이 Redis Pub/Sub을 자동으로 구독하도록 설정 => 누가 어떤 채널을 듣고 어떤 메소드로 처리하는지 정의 + */ +@Configuration +@RequiredArgsConstructor +public class RedisSubscribeConfig { + + private static final String PRICE_ALERT_CHANNEL = "price:alert:channel"; + + @Bean + public RedisMessageListenerContainer redisMessageListenerContainer( //Redis로부터 메시지를 받아서 지정된 리스너에게 전달하는 도우미 백그라운드 쓰레드 풀 + RedisConnectionFactory connectionFactory, + MessageListenerAdapter listenerAdapter) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(listenerAdapter, new ChannelTopic(PRICE_ALERT_CHANNEL)); //Redis 메시지가 들어올 때 자동으로 PriceAlertEventListener 호출 + return container; + } + + @Bean + public MessageListenerAdapter listenerAdapter(PriceAlertEventListener subscriber) { + return new MessageListenerAdapter(subscriber, "onMessage"); + } +} \ No newline at end of file diff --git a/infra/src/main/java/com/fintory/infra/domain/alarm/event/PriceAlertEvent.java b/infra/src/main/java/com/fintory/infra/domain/alarm/event/PriceAlertEvent.java deleted file mode 100644 index 7a11f80f..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/alarm/event/PriceAlertEvent.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.fintory.infra.domain.alarm.event; - -import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; -import lombok.Getter; -import org.springframework.context.ApplicationEvent; - -@Getter -public class PriceAlertEvent extends ApplicationEvent { - - private final LiveStockPriceStream stockPriceStream; - - - public PriceAlertEvent(Object source, LiveStockPriceStream stockPriceStream) { - super(source); - this.stockPriceStream = stockPriceStream; - } -} diff --git a/infra/src/main/java/com/fintory/infra/domain/alarm/serviceImpl/PriceAlertEventListener.java b/infra/src/main/java/com/fintory/infra/domain/alarm/serviceImpl/PriceAlertEventListener.java index cbe09fe7..7b28c3e9 100644 --- a/infra/src/main/java/com/fintory/infra/domain/alarm/serviceImpl/PriceAlertEventListener.java +++ b/infra/src/main/java/com/fintory/infra/domain/alarm/serviceImpl/PriceAlertEventListener.java @@ -4,16 +4,15 @@ import com.fintory.domain.alarm.model.NotificationType; import com.fintory.domain.alarm.model.PriceAlert; import com.fintory.domain.alarm.service.AlarmService; -import com.fintory.infra.domain.alarm.event.PriceAlertEvent; +import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; import com.fintory.infra.domain.alarm.repository.PriceAlertRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.event.EventListener; +import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; - +import org.springframework.data.redis.connection.Message; import java.math.BigDecimal; import java.time.Duration; import java.util.List; @@ -21,20 +20,30 @@ @Component @RequiredArgsConstructor @Slf4j -public class PriceAlertEventListener { +public class PriceAlertEventListener implements MessageListener { private final RedisTemplate redisTemplate; private final PriceAlertRepository priceAlertRepository; private final AlarmService alarmService; - @Async("alertExecutor") - @EventListener + @Override + public void onMessage(Message message, byte[] pattern){ + try{ + LiveStockPriceStream stockPriceStream = (LiveStockPriceStream) redisTemplate.getValueSerializer() + .deserialize(message.getBody()); + log.debug("Redis로부터 감시가 체크 요청 수신: {}", stockPriceStream.code()); + handlePriceAlert(stockPriceStream); + } catch (Exception e) { + log.error("Redis 메시지 처리 실패", e); + } + } + @Transactional - public void handlePriceAlert(PriceAlertEvent event){ + public void handlePriceAlert(LiveStockPriceStream stockData){ - String stockCode = event.getStockPriceStream().code(); - BigDecimal currentPrice = event.getStockPriceStream().currentPrice(); + String stockCode = stockData.code(); + BigDecimal currentPrice = stockData.currentPrice(); String cachedKey = "priceAlert:"+stockCode; List priceAlertList = getPriceAlertFromCache(cachedKey,stockCode); diff --git a/infra/src/main/java/com/fintory/infra/domain/portfolio/serviceImpl/ExchangeRateServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/portfolio/serviceImpl/ExchangeRateServiceImpl.java index 5462b3d8..64faa7f0 100644 --- a/infra/src/main/java/com/fintory/infra/domain/portfolio/serviceImpl/ExchangeRateServiceImpl.java +++ b/infra/src/main/java/com/fintory/infra/domain/portfolio/serviceImpl/ExchangeRateServiceImpl.java @@ -6,12 +6,15 @@ import com.fintory.infra.domain.portfolio.properties.EosProperties; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import javax.annotation.PostConstruct; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import java.io.ByteArrayInputStream; @@ -19,15 +22,43 @@ import java.math.BigDecimal; import java.net.URL; import java.net.URLConnection; +import java.util.concurrent.TimeUnit; @Service @Slf4j @RequiredArgsConstructor public class ExchangeRateServiceImpl implements ExchangeRateService { + private final RedisTemplate redisTemplate; private final EosProperties eosProperties; + + /* 앱 시작 시 환율 초기화 */ + @PostConstruct + public void initExchangeRate() { + log.info("앱 시작 - 환율 정보 갱신"); + fetchExchangeRate(); + } + + /* 매일 오전 9시에 환율 갱신 */ + @Scheduled(cron = "0 0 9 * * * ") //매일 오전 9시에 시행 + public void refreshExchangeRate(){ + log.info("스케쥴러 실행 - 환율 정보 갱신"); + fetchExchangeRate(); + } + + /* Redis 조회 */ @Override public BigDecimal getExchangeRate(){ + String cached = (String) redisTemplate.opsForValue().get("exchangeRate"); + if(cached == null){ + return fetchExchangeRate(); + } + return new BigDecimal(cached); + } + + + /* API 호출 및 Redis 저장 */ + private BigDecimal fetchExchangeRate(){ try{ String stringUrl = "https://ecos.bok.or.kr/api/KeyStatisticList/"+eosProperties.getApiKey()+"/xml/kr/1/10"; @@ -47,9 +78,18 @@ public BigDecimal getExchangeRate(){ String data = sb.toString(); - log.info(data); - return parseExchangeRate(data); + BigDecimal exchangeRate = parseExchangeRate(data); + + redisTemplate.opsForValue().set("exchangeRate",exchangeRate.toString(),26, TimeUnit.HOURS); //여유 있게 26시간 설정 + + return exchangeRate; }catch (Exception e){ + + String cached = (String) redisTemplate.opsForValue().get("exchangeRate"); + if (cached != null) { + log.warn("API 호출 실패 - 기존 캐시 사용"); + return new BigDecimal(cached); + } log.error("환율 정보 조회 시 에러 발생"); throw new DomainException(DomainErrorCode.EXCHANGE_RATE_ERROR); } @@ -78,7 +118,4 @@ private BigDecimal parseExchangeRate(String data){ throw new DomainException(DomainErrorCode.PARSING_ERROR); } } - - - } diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/repository/StockRankRepository.java b/infra/src/main/java/com/fintory/infra/domain/stock/repository/StockRankRepository.java deleted file mode 100644 index 22b563a4..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/stock/repository/StockRankRepository.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.fintory.infra.domain.stock.repository; - -import com.fintory.domain.stock.model.StockRank; -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.data.jpa.repository.Query; -import org.springframework.stereotype.Repository; - -import java.util.List; -import java.util.Optional; - -@Repository -public interface StockRankRepository extends JpaRepository { - Optional findByStockCode(String code); - - @Query("SELECT sr, lsp FROM StockRank sr JOIN FETCH sr.stock s JOIN LiveStockPrice lsp ON lsp.stock =s WHERE sr.stock.currencyName=:currencyName ORDER BY sr.marketCapRank ASC") - List findMarketCapTop20(String currencyName); - - @Query("SELECT sr, lsp FROM StockRank sr JOIN FETCH sr.stock s JOIN LiveStockPrice lsp ON lsp.stock = s WHERE sr.stock.currencyName=:currencyName ORDER BY sr.rocRank ASC") - List findROCTop20(String currencyName); - - @Query("SELECT sr, lsp FROM StockRank sr JOIN FETCH sr.stock s JOIN LiveStockPrice lsp ON lsp.stock = s WHERE sr.stock.currencyName=:currencyName ORDER BY sr.tradingVolumeRank ASC") - List findTradingVolumeTop20(String currencyName); - - @Query("SELECT sr FROM StockRank sr WHERE sr.stock.currencyName=:currencyName") - List findByCurrencyName(String currencyName); -} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockRankServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockRankServiceImpl.java deleted file mode 100644 index fefec73a..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockRankServiceImpl.java +++ /dev/null @@ -1,164 +0,0 @@ -package com.fintory.infra.domain.stock.service.korean; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fintory.common.exception.DomainErrorCode; -import com.fintory.common.exception.DomainException; -import com.fintory.domain.stock.dto.korean.core.KoreanStockRankData; -import com.fintory.domain.stock.dto.korean.wrapper.KoreanStockRankDataWrapper; -import com.fintory.domain.stock.model.Stock; -import com.fintory.domain.stock.model.StockRank; -import com.fintory.domain.stock.service.korean.KoreanStockRankService; -import com.fintory.infra.domain.stock.repository.StockRankRepository; -import com.fintory.infra.domain.stock.repository.StockRepository; -import com.fintory.infra.domain.stock.service.korean.saver.KoreanStockRankSaverService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.http.*; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.client.ResourceAccessException; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.util.UriComponentsBuilder; - -import java.util.Comparator; -import java.util.List; - -@Service -@Slf4j -@RequiredArgsConstructor -public class KoreanStockRankServiceImpl implements KoreanStockRankService { - - - private final StockRepository stockRepository; - private final StockRankRepository stockRankRepository; - private final KoreanStockRankSaverService koreanStockRankSaverService; - private final RedisTemplate redisTemplate; - private final RestTemplate restTemplate; - private final ObjectMapper objectMapper; - - @Value("${hantu-openapi.appkey}") - private String appkey; - - @Value("${hantu-openapi.appsecret}") - private String appsecret; - - @Value("${hantu-openapi.base-url}") - private String baseUrl; - - //REVIEW API호출 실패 문제는 대부분 시스템 레벨 문제 -> 개별 종목만 실패할 확률은 낮고 대부분 전체적으로 실패하므로 처음부터 다시 시작하도록 설정 - @Retryable(maxAttempts=5, backoff = @Backoff(delay = 1000)) - @Override - public void initiateKoreanStockRank(){ - - List stockList = stockRepository.findByCurrencyName("KRW"); - String token = (String) redisTemplate.opsForValue().get("kis-access-token"); - - int failCount=0; - int successCount=0; - - // 토큰 null 체크 추가 - if (token == null || token.trim().isEmpty()) { - log.error("KIS 액세스 토큰을 찾을 수 없습니다."); - throw new DomainException(DomainErrorCode.TOKEN_EMPTY); - } - - for(Stock stock: stockList){ - try { - processStockRankData(stock.getCode(), token); - successCount++; - }catch(Exception e){ - failCount++; - log.warn("주식 {} 처리 실패: {}", stock.getCode(), e.getMessage()); //로그 기록 남기기 - } - } - - //하나라도 성공을 못 시킬때만 재시작 - if(successCount == 0){ - log.error("순위 데이터 초기화 작업 중 모든 종목 처리 실패"); - throw new DomainException(DomainErrorCode.COMPLETE_INITIALIZATION_FAILURE); - } - - //순위 데이터 생성 및 저장 - processStockRank(); - } - - //NOTE LiveStockPrice에서도 동일한 호출을 진행하지만 클래스의 책임 분리가 모호해서 따로따로 호출하기로 함 -> 대신 상위 클래스에서 초기화 메소드 실행 시 순서만 조정 - //순위를 얻는데 필요한 데이터 조회 - private void processStockRankData(String code, String token) { - try { - //URL 생성 - String url = UriComponentsBuilder.fromHttpUrl(baseUrl) - .path("/uapi/domestic-stock/v1/quotations/inquire-price") - .queryParam("FID_COND_MRKT_DIV_CODE", "UN") - .queryParam("FID_INPUT_ISCD", code) - .build() - .toUriString(); - - // 헤더 설정 - HttpHeaders headers = new HttpHeaders(); - headers.set("authorization", "Bearer " + token); - headers.set("appkey", appkey); - headers.set("appsecret", appsecret); - headers.set("tr_id", "FHKST01010100"); - headers.set("custtype", "P"); - headers.setContentType(MediaType.APPLICATION_JSON); - - - HttpEntity entity = new HttpEntity<>(headers); - - // API 호출 - ResponseEntity response = restTemplate.exchange(url, HttpMethod.GET, entity, String.class); - - - if (response.getStatusCode().is2xxSuccessful()) { - KoreanStockRankDataWrapper wrapper = objectMapper.readValue(response.getBody(), KoreanStockRankDataWrapper.class); - - koreanStockRankSaverService.saveStockRankData(code, wrapper); - } else { - log.error("순위 관련 데이터 조회 실패: {} - 응답이 비어있음", code); - throw new DomainException(DomainErrorCode.API_RESPONSE_EMPTY); - } - - } catch (DomainException e) { - throw e; - } catch (JsonProcessingException e) { - log.error("JSON 파싱 실패: {} - {}", code, e.getMessage()); - throw new DomainException(DomainErrorCode.JSON_PARSING_ERROR); - } catch (ResourceAccessException e) { - log.error("API 연결 실패: {} - {}", code, e.getMessage()); - throw new DomainException(DomainErrorCode.API_CONNECTION_ERROR); - } catch (Exception e) { - log.error("예상치 못한 오류: {} - {}", code, e.getMessage()); - throw new DomainException(DomainErrorCode.INTERNAL_SERVER_ERROR); - } - } - - //순위 데이터 생성 및 저장 - private void processStockRank(){ - List stockRankList = stockRankRepository.findByCurrencyName("KRW"); - - stockRankList.sort(Comparator.comparing(StockRank::getMarketCap).reversed()); - for (int i = 0; i < stockRankList.size(); i++) { - stockRankList.get(i).updateMarketCapRank(i + 1); - } - - stockRankList.sort(Comparator.comparing((StockRank sr) -> sr.getRocRate().abs()).thenComparing(StockRank::getRocRate).reversed()); //같은 절댓값이면 실제값으로 재정렬 - for (int i = 0; i < stockRankList.size(); i++) { - stockRankList.get(i).updateRocRank(i + 1); - } - - stockRankList.sort(Comparator.comparing(StockRank::getTradingVolume).reversed()); - for (int i = 0; i < stockRankList.size(); i++) { - stockRankList.get(i).updateTradingVolumeRank(i + 1); - } - - stockRankRepository.saveAll(stockRankList); - } - - -} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockServiceImpl.java index 580bad35..29b6da61 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockServiceImpl.java +++ b/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/KoreanStockServiceImpl.java @@ -6,9 +6,6 @@ import com.fintory.domain.stock.dto.korean.response.*; import com.fintory.domain.stock.model.Stock; import com.fintory.domain.stock.service.korean.*; -import com.fintory.infra.domain.stock.repository.LiveStockPriceRepository; -import com.fintory.infra.domain.stock.repository.StockPriceHistoryRepository; -import com.fintory.infra.domain.stock.repository.StockRankRepository; import com.fintory.infra.domain.stock.repository.StockRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -26,13 +23,8 @@ @RequiredArgsConstructor public class KoreanStockServiceImpl implements KoreanStockService { - private final KoreanStockRankService koreanStockRankService; private final KoreanLiveStockPriceService koreanLiveStockPriceService; private final KoreanStockPriceHistoryService koreanStockPriceHistoryService; - private final LiveStockPriceRepository liveStockPriceRepository; - private final StockPriceHistoryRepository stockPriceHistoryRepository; - - private final StockRankRepository stockRankRepository; private final StockRepository stockRepository; // 어플리케이션이 완전히 준비된 후 한번만 실행됨 @@ -109,7 +101,6 @@ public KoreanLiveStockPriceResponse getLiveStockPrice(String code) { private boolean executeWithErrorHandling(String taskName,Runnable task){ try{ task.run(); - log.info("{} 초기화 성공",taskName); return true; }catch(Exception e){ log.error("{} 초기화 실패 {}",taskName,e.getMessage()); diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/saver/KoreanStockRankSaverService.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/saver/KoreanStockRankSaverService.java deleted file mode 100644 index 801d6ee7..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/korean/saver/KoreanStockRankSaverService.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.fintory.infra.domain.stock.service.korean.saver; - -import com.fintory.common.exception.DomainErrorCode; -import com.fintory.common.exception.DomainException; -import com.fintory.domain.stock.dto.korean.core.KoreanStockRankData; -import com.fintory.domain.stock.dto.korean.wrapper.KoreanStockRankDataWrapper; -import com.fintory.domain.stock.model.Stock; -import com.fintory.domain.stock.model.StockRank; -import com.fintory.infra.domain.stock.repository.StockRankRepository; -import com.fintory.infra.domain.stock.repository.StockRepository; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -@Service -@Slf4j -@RequiredArgsConstructor -public class KoreanStockRankSaverService { - - - private final StockRepository stockRepository; - private final StockRankRepository stockRankRepository; - - //순위를 얻는데 필요한 데이터 저장 메서드 - @Transactional - public void saveStockRankData(String code, KoreanStockRankDataWrapper response) { - - if (response == null || response.output() == null) { - log.warn("순위 관련 데이터 응답이 비어있음: {}", code); - throw new DomainException(DomainErrorCode.API_RESPONSE_EMPTY); - } - - KoreanStockRankData item = response.output(); - - if (item == null) { - log.warn("순위 관련 응답에서 데이터를 찾을 수 없음"); - throw new DomainException(DomainErrorCode.STOCK_DATA_NOT_FOUND); - } - - StockRank stockRank = stockRankRepository.findByStockCode(code).orElse(null); - Stock stock = stockRepository.findByCode(code).orElseThrow(()->new DomainException(DomainErrorCode.STOCK_NOT_FOUND)); - - if (stockRank == null) { - stockRank = StockRank.builder() - .tradingVolume(item.tradingVolume()) - .rocRate(item.roc()) - .marketCap(item.marketCap()) - .stock(stock) - .build(); - } else { - stockRank.updateStockRankData(item.marketCap(), item.roc(), item.tradingVolume()); - } - - stockRankRepository.save(stockRank); - } - -} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockRankServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockRankServiceImpl.java deleted file mode 100644 index 85c56cb5..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockRankServiceImpl.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.fintory.infra.domain.stock.service.overseas; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fintory.common.exception.DomainErrorCode; -import com.fintory.common.exception.DomainException; -import com.fintory.domain.stock.dto.overseas.wrapper.OverseasStockRankDataWrapper; -import com.fintory.domain.stock.model.Stock; -import com.fintory.domain.stock.model.StockRank; -import com.fintory.domain.stock.service.overseas.OverseasStockRankService; -import com.fintory.infra.domain.stock.repository.StockRankRepository; -import com.fintory.infra.domain.stock.repository.StockRepository; -import com.fintory.infra.domain.stock.service.overseas.saver.OverseasStockRankSaverService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.http.*; -import org.springframework.retry.annotation.Backoff; -import org.springframework.retry.annotation.Retryable; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.client.ResourceAccessException; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.util.UriComponentsBuilder; - -import java.util.Comparator; -import java.util.List; - -@Service -@Slf4j -@RequiredArgsConstructor -public class OverseasStockRankServiceImpl implements OverseasStockRankService { - - private final StockRepository stockRepository; - private final StockRankRepository stockRankRepository; - private final OverseasStockRankSaverService overseasStockRankSaverService; - private final RedisTemplate redisTemplate; - private final RestTemplate restTemplate; - private final ObjectMapper objectMapper; - - @Value("${hantu-openapi.appkey}") - private String appkey; - - @Value("${hantu-openapi.appsecret}") - private String appsecret; - - @Value("${hantu-openapi.base-url}") - private String baseUrl; - - //NOTE 랭킹 데이터는 상대적 비교가 필요해서 전체적인 일관성이 필요함 -> 랭킹이 동일하더라도 프론트에서 받은 데이터를 정렬해서 표시 - @Override - @Retryable(maxAttempts=3, backoff = @Backoff(delay = 1000)) - public void initiateOverseasStockRank(){ - List stockList = stockRepository.findByCurrencyName("USD"); - String token = (String) redisTemplate.opsForValue().get("kis-access-token"); - int successCount = 0; - - if (token == null || token.trim().isEmpty()) { - log.error("KIS 액세스 토큰을 찾을 수 없습니다."); - throw new DomainException(DomainErrorCode.TOKEN_EMPTY); - } - - for(Stock stock : stockList){ - try { - processStockRankData(stock.getCode(), token); - successCount++; - }catch(Exception e){ - log.warn("주식 {} 처리 실패: {}", stock.getCode(), e.getMessage()); //로그 기록 남기기 - } - } - - //하나라도 성공을 못 시킬때만 재시작 - if(successCount == 0){ - log.error("순위 데이터 초기화 작업 중 모든 종목 처리 실패"); - throw new DomainException(DomainErrorCode.COMPLETE_INITIALIZATION_FAILURE); - } - - // 순위 데이터 생성 및 저장 - processStockRank(); - } - - //순위를 얻는데 필요한 데이터 조회 - @Transactional - public void processStockRankData(String code, String token) { - try { - String url = UriComponentsBuilder.fromHttpUrl(baseUrl) - .path("/uapi/overseas-price/v1/quotations/price-detail") - .queryParam("AUTH", "") - .queryParam("EXCD", "NAS") - .queryParam("SYMB", code) - .build() - .toUriString(); - - HttpHeaders headers = new HttpHeaders(); - headers.set("authorization", "Bearer " + token); - headers.set("appkey", appkey); - headers.set("appsecret", appsecret); - headers.set("tr_id", "HHDFS76200200"); - headers.set("custtype", "P"); - headers.setContentType(MediaType.APPLICATION_JSON); - - HttpEntity entity = new HttpEntity<>(headers); - - ResponseEntity response = restTemplate.exchange(url, HttpMethod.GET, entity, String.class); - - if (response.getStatusCode().is2xxSuccessful()) { - OverseasStockRankDataWrapper wrapper = objectMapper.readValue(response.getBody(), OverseasStockRankDataWrapper.class); - overseasStockRankSaverService.saveStockRankData(code, wrapper); //db에 데이터 저장 - } else { - log.error("순위 관련 데이터 조회 실패: {} - 응답이 비어있음", code); - throw new DomainException(DomainErrorCode.API_RESPONSE_EMPTY); - } - - } catch (DomainException e) { - throw e; - } catch (JsonProcessingException e) { - log.error("JSON 파싱 실패: {} - {}", code, e.getMessage()); - throw new DomainException(DomainErrorCode.JSON_PARSING_ERROR); - } catch (ResourceAccessException e) { - log.error("API 연결 실패: {} - {}", code, e.getMessage()); - throw new DomainException(DomainErrorCode.API_CONNECTION_ERROR); - } catch (Exception e) { - log.error("예상치 못한 오류: {} - {}", code, e.getMessage()); - throw new DomainException(DomainErrorCode.INTERNAL_SERVER_ERROR); - } - } - - //순위 데이터 생성 및 저장 - private void processStockRank(){ - List stockRankList = stockRankRepository.findByCurrencyName("USD"); - - stockRankList.sort(Comparator.comparing(StockRank::getMarketCap).reversed()); - for (int i = 0; i < stockRankList.size(); i++) { - stockRankList.get(i).updateMarketCapRank(i + 1); - } - - stockRankList.sort(Comparator.comparing((StockRank sr) -> sr.getRocRate().abs()).thenComparing(StockRank::getRocRate).reversed()); - for (int i = 0; i < stockRankList.size(); i++) { - stockRankList.get(i).updateRocRank(i + 1); - } - - stockRankList.sort(Comparator.comparing(StockRank::getTradingVolume).reversed()); - for (int i = 0; i < stockRankList.size(); i++) { - stockRankList.get(i).updateTradingVolumeRank(i + 1); - } - - stockRankRepository.saveAll(stockRankList); - } -} \ No newline at end of file diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockServiceImpl.java index d3a96a93..7ba899db 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockServiceImpl.java +++ b/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/OverseasStockServiceImpl.java @@ -5,8 +5,6 @@ import com.fintory.domain.stock.dto.overseas.response.*; import com.fintory.domain.stock.model.Stock; import com.fintory.domain.stock.service.overseas.*; -import com.fintory.infra.domain.stock.repository.StockPriceHistoryRepository; -import com.fintory.infra.domain.stock.repository.StockRankRepository; import com.fintory.infra.domain.stock.repository.StockRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -15,10 +13,8 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import java.math.BigDecimal; import java.util.Comparator; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; @@ -27,13 +23,9 @@ @RequiredArgsConstructor public class OverseasStockServiceImpl implements OverseasStockService { - private final OverseasStockRankService overseasStockRankService; private final OverseasLiveStockPriceService overseasLiveStockPriceService; private final OverseasStockPriceHistoryService overseasStockPriceHistoryService; - - private final StockRankRepository stockRankRepository; private final StockRepository stockRepository; - private final StockPriceHistoryRepository stockPriceHistoryRepository; @EventListener(ApplicationReadyEvent.class) @@ -129,7 +121,6 @@ public OverseasLiveStockPriceResponse getLiveStockPrice(String code) { private boolean executeWithErrorHandling(String taskName, Runnable task){ try{ task.run(); - log.info("{} 초기화 성공",taskName); return true; }catch(Exception e){ log.error("{} 초기화 실패 {}",taskName,e.getMessage()); diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/saver/OverseasStockRankSaverService.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/saver/OverseasStockRankSaverService.java deleted file mode 100644 index 0f46b806..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/overseas/saver/OverseasStockRankSaverService.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.fintory.infra.domain.stock.service.overseas.saver; - -import com.fintory.common.exception.DomainErrorCode; -import com.fintory.common.exception.DomainException; -import com.fintory.domain.stock.dto.overseas.core.OverseasStockRankData; -import com.fintory.domain.stock.dto.overseas.wrapper.OverseasStockRankDataWrapper; -import com.fintory.domain.stock.model.Stock; -import com.fintory.domain.stock.model.StockRank; -import com.fintory.infra.domain.stock.repository.StockRankRepository; -import com.fintory.infra.domain.stock.repository.StockRepository; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -@Service -@RequiredArgsConstructor -@Slf4j -public class OverseasStockRankSaverService { - - private final StockRankRepository stockRankRepository; - private final StockRepository stockRepository; - - //순위를 얻는데 필요한 데이터 저장 메서드 - @Transactional - public void saveStockRankData(String code, OverseasStockRankDataWrapper response) { - if (response == null || response.output() == null) { - log.warn("순위 관련 데이터 응답이 비어있음: {}", code); - throw new DomainException(DomainErrorCode.API_RESPONSE_EMPTY); - } - - OverseasStockRankData item = response.output(); - - if (item == null) { - log.warn("순위 관련 응답에서 데이터를 찾을 수 없음"); - throw new DomainException(DomainErrorCode.STOCK_DATA_NOT_FOUND); - } - - StockRank stockRank = stockRankRepository.findByStockCode(code).orElse(null); - Stock stock = stockRepository.findByCode(code).orElseThrow(() -> new DomainException(DomainErrorCode.STOCK_NOT_FOUND)); - - if (stockRank == null) { - stockRank = StockRank.builder() - .tradingVolume(item.tradingVolume()) - .rocRate(item.roc()) - .marketCap(item.marketCap()) - .stock(stock) - .build(); - } else { - stockRank.updateStockRankData(item.marketCap(), item.roc(), item.tradingVolume()); - } - - stockRankRepository.save(stockRank); - } - -} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/token/KisTokenIssueServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/token/KisTokenIssueServiceImpl.java index 1181d7d8..2a19f518 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/token/KisTokenIssueServiceImpl.java +++ b/infra/src/main/java/com/fintory/infra/domain/stock/service/token/KisTokenIssueServiceImpl.java @@ -39,6 +39,7 @@ public class KisTokenIssueServiceImpl implements KisTokenIssueService { @PostConstruct public void refreshKisToken() { try { + if(redisTemplate.opsForValue().get("kis-access-token")== null || redisTemplate.opsForValue().get("kis-websocket-access-token") == null || redisTemplate.getExpire("kis-access-token")<=0 || redisTemplate.getExpire("kis-websocket-access-token")<=0) { // REST API 토큰 발급 @@ -61,8 +62,8 @@ public void refreshKisToken() { } } - // 23시간마다 토큰 갱신 - @Scheduled(fixedRate = 82800000, initialDelay = 82800000) + // 8시간마다 토큰 갱신 + @Scheduled(fixedRate = 28800000, initialDelay = 28800000) public void changeRefreshToken() { try { // REST API 토큰 갱신 diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/websocket/LiveStockPriceWebsocketServiceImpl.java b/infra/src/main/java/com/fintory/infra/domain/stock/service/websocket/LiveStockPriceWebsocketServiceImpl.java deleted file mode 100644 index 505baca7..00000000 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/websocket/LiveStockPriceWebsocketServiceImpl.java +++ /dev/null @@ -1,629 +0,0 @@ -package com.fintory.infra.domain.stock.service.websocket; - -import com.fintory.common.exception.DomainErrorCode; -import com.fintory.common.exception.DomainException; -import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; -import com.fintory.domain.stock.dto.websocket.MarketStatusResponse; -import com.fintory.domain.stock.model.Stock; -import com.fintory.domain.stock.service.websocket.LiveStockPriceWebSocketSaverService; -import com.fintory.domain.stock.service.websocket.LiveStockPriceWebsocketService; -import com.fintory.infra.domain.alarm.event.PriceAlertEvent; -import com.fintory.infra.domain.stock.handler.KoreanLiveStockPriceWebSocketHandler; -import com.fintory.infra.domain.stock.handler.OverseasLiveStockPriceWebSocketHandler; -import com.fintory.infra.domain.stock.repository.StockRepository; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.messaging.simp.SimpMessagingTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.socket.client.WebSocketConnectionManager; - -import java.math.BigDecimal; -import java.time.*; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; - -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; - -//NOTE 구독 시도시 -> 에러 코드를 보고 프론트에서 DB API 호출 -//NOTE 구독 성공 후 일정시간 동안 데이터가 오지 않으면 -> 프론트에서 연결 끊김 판단 -@Service -@Slf4j -public class LiveStockPriceWebsocketServiceImpl implements LiveStockPriceWebsocketService { - - @Value("${db-openapi.base-url}") - private String baseUrl; - - private final RedisTemplate redisTemplate; - private final WebSocketConnectionManager koreanConnectionManager; - private final WebSocketConnectionManager overseasConnectionManager; - private final KoreanLiveStockPriceWebSocketHandler koreanHandler; - private final OverseasLiveStockPriceWebSocketHandler overseasHandler; - private final StockRepository stockRepository; - private final SimpMessagingTemplate messageTemplate; - - // 공통 데이터 구조들 - //구독 중인 종목 코드 저장 - private final Set koreanSubscribedStocks = ConcurrentHashMap.newKeySet(); - private final Set overseasSubscribedStocks = ConcurrentHashMap.newKeySet(); - - //db에 저장되지 않은 주식 데이터 임시 저장용 - private final Map koreanPendingData = new ConcurrentHashMap<>(); - private final Map overseasPendingData = new ConcurrentHashMap<>(); - - // 이전에 받은 주식 데이터 저장 -> 중복 데이터 필터링용 - private final Map previousKoreanData = new ConcurrentHashMap<>(); - private final Map previousOverseasData = new ConcurrentHashMap<>(); - private final RestTemplate restTemplate; - - private volatile AtomicBoolean isKoreanConnected = new AtomicBoolean(false); - private volatile AtomicBoolean isOverseasConnected = new AtomicBoolean(false); - private String cachedAccessToken; - - private final LiveStockPriceWebSocketSaverService liveStockPriceWebSocketSaverService; - - //이벤트 - private final ApplicationEventPublisher applicationEventPublisher; - - //그라파나용 매트릭 -> 레이턴시, 효율성 - private final Timer dataProcessingTime; - - - public LiveStockPriceWebsocketServiceImpl( - @Qualifier("koreanLiveStockPriceWebSocketConnectionManager") WebSocketConnectionManager koreanConnectionManager, - @Qualifier("overseasLiveStockPriceWebSocketConnectionManager") WebSocketConnectionManager overseasConnectionManager, - KoreanLiveStockPriceWebSocketHandler koreanHandler, - OverseasLiveStockPriceWebSocketHandler overseasHandler, - StockRepository stockRepository, - SimpMessagingTemplate messageTemplate, RestTemplate restTemplate, RedisTemplate redisTemplate, LiveStockPriceWebSocketSaverService liveStockPriceWebSocketSaverService, ApplicationEventPublisher applicationEventPublisher,MeterRegistry meterRegistry) { - - this.koreanConnectionManager = koreanConnectionManager; - this.overseasConnectionManager = overseasConnectionManager; - this.koreanHandler = koreanHandler; - this.overseasHandler = overseasHandler; - this.stockRepository = stockRepository; - this.messageTemplate = messageTemplate; - this.restTemplate = restTemplate; - this.redisTemplate = redisTemplate; - this.liveStockPriceWebSocketSaverService = liveStockPriceWebSocketSaverService; - this.applicationEventPublisher = applicationEventPublisher; - - this.dataProcessingTime = Timer.builder("websocket.data.processing.time") - .description("Time to process and send stock data") - .register(meterRegistry); - } - - /* 구독 관리 메서드 */ - @Override - public void koreanStockSubscribe(String code) { - subscribeStock(code, "국내", isKoreanConnected, koreanSubscribedStocks, - this::connectKoreanWebSocket, koreanHandler::subscribe); - } - - @Override - public void koreanStockUnsubscribe(String code) { - unsubscribeStock(code, "국내", koreanSubscribedStocks, previousKoreanData, - koreanPendingData, koreanHandler::unsubscribe); - } - - @Override - public void overseasStockSubscribe(String code) { - subscribeStock(code, "해외", isOverseasConnected, overseasSubscribedStocks, - this::connectOverseasWebSocket, overseasHandler::subscribe); - } - - @Override - public void overseasStockUnsubscribe(String code) { - unsubscribeStock(code, "해외", overseasSubscribedStocks, previousOverseasData, - overseasPendingData, overseasHandler::unsubscribe); - } - - @Override - public void sendStockData(String stockCode, Object stockData) { - if (stockData instanceof LiveStockPriceStream stream) { - if (stream.priceChange() == null || stream.priceChange().compareTo(BigDecimal.ZERO) == 0) { - log.debug("변동 없음 - 전송 스킵: {}", stockCode); - return; - } - messageTemplate.convertAndSend("/topic/stock/live-Price/" + stockCode, stockData); - } - } - - /* 구독 자동 실행 메소드 */ - @Scheduled(cron="0 30 09 * * MON-FRI", zone="America/New_York") - public void scheduledOverseasMarketSubscription(){ - startOverseasMarketSubscription(); - } - - @Scheduled(cron="0 0 9 * * MON-FRI", zone="Asia/Seoul") - public void scheduledKoreanMarketSubscription(){ - startKoreanMarketSubscription(); - } - - @PostConstruct - public void initMarketSubscriptions() { - // 국내 장 체크 및 구독 - if (isKoreanMarketOpen()) { - log.info("애플리케이션 시작 - 국내 장 열림, 자동 구독 시작"); - startKoreanMarketSubscription(); - } else { - log.info("국내 장이 열려있지 않아 자동 구독 스킵"); - } - - // 해외 장 체크 및 구독 - if (isOverseasMarketOpen()) { - log.info("애플리케이션 시작 - 해외 장 열림, 자동 구독 시작"); - startOverseasMarketSubscription(); - } else { - log.info("해외 장이 열려있지 않아 자동 구독 스킵"); - } - } - - /* 통합 구독/구독해제 로직 */ - private void subscribeStock(String code, String marketName, AtomicBoolean isConnected, - Set subscribedStocks, Runnable connectAction, - Consumer subscribeAction) { - try { - - //해외, 국내 주식 토큰 분리 - if("해외".equals(marketName)){ - cachedAccessToken = (String) redisTemplate.opsForValue().get("db-access-token"); - }else{ - cachedAccessToken = (String) redisTemplate.opsForValue().get("kis-websocket-access-token"); - } - - boolean isMarketClosed = ("해외".equals(marketName) && !isOverseasMarketOpen()) || - ("국내".equals(marketName) && !isKoreanMarketOpen()); - - if (isMarketClosed) { - throw new DomainException(DomainErrorCode.MARKET_CLOSED); - } - - if (!isConnected.get()) { - log.info("{} 주식 WebSocket이 연결되어 있지 않아 자동 연결을 시작합니다.", marketName); - connectAction.run(); - } - - if (!subscribedStocks.contains(code)) { - subscribedStocks.add(code); - } - log.info("{} 종목 {} 구독", marketName, code); - - subscribeAction.accept(code); - } catch (Exception e) { - subscribedStocks.remove(code); - log.error("{} 종목 구독 실패: {}", marketName, e.getMessage()); - throw new DomainException(DomainErrorCode.STOCK_SUBSCRIBE_FAILED); - } - } - - private void unsubscribeStock(String code, String marketName, Set subscribedStocks, - Map previousData, - Map pendingData, - Consumer unsubscribeAction) { - try { - if (subscribedStocks.contains(code)) { - unsubscribeAction.accept(code); - subscribedStocks.remove(code); - - // 메모리 정리 - previousData.remove(code); - pendingData.remove(code); - log.info("{} 종목 {} 구독 해제", marketName, code); - } - } catch (Exception e) { - log.error("{} 종목 구독 해제 실패: {}", marketName, e.getMessage()); - throw new DomainException(DomainErrorCode.STOCK_UNSUBSCRIBE_FAILED); - } - } - - /* WebSocket 연결 관리 */ - private void connectKoreanWebSocket() { - if (isKoreanConnected.get()) { - log.info("국내 주식 WebSocket이 이미 연결되어 있습니다."); - return; - } - - Consumer callback = dto -> - processStreamData(dto, previousKoreanData, koreanPendingData, "국내"); - - koreanHandler.setDataCallBack(callback); - koreanConnectionManager.start(); - - boolean connected = koreanHandler.waitForConnection(30); - if (!connected) { - log.info("국내 장시간임에도 WebSocket 연결 실패 - 공휴일이거나 기술적 문제일 수 있음"); - throw new DomainException(DomainErrorCode.WEBSOCKET_CONNECTION_FAILED); - } - - isKoreanConnected.set(true); - log.info("국내 주식 WebSocket 연결 완료"); - } - - private void connectOverseasWebSocket() { - if (isOverseasConnected.get()) { - log.info("해외 주식 WebSocket이 이미 연결되어 있습니다."); - return; - } - - Consumer callback = dto -> - processStreamData(dto, previousOverseasData, overseasPendingData, "해외"); - - overseasHandler.setDataCallBack(callback); - overseasConnectionManager.start(); - - boolean connected = overseasHandler.waitForConnection(30); - if (!connected) { - log.info("해외 장시간임에도 WebSocket 연결 실패 - 공휴일이거나 기술적 문제일 수 있음"); - throw new DomainException(DomainErrorCode.WEBSOCKET_CONNECTION_FAILED); - } - - isOverseasConnected.set(true); - log.info("해외 주식 WebSocket 연결 완료"); - } - - //웹소켓으로 받은 데이터를 처리하는 메서드 - private void processStreamData(LiveStockPriceStream dto, - Map previousData, - Map pendingData, - String marketName) { - LiveStockPriceStream previous = previousData.get(dto.code()); - - Timer.Sample sample = Timer.start(); - try { - - //이전 데이터와 비교하여 중복 체크 - if (previous != null && previous.equals(dto)) { - log.debug("{} 주식 중복 데이터 스킵: {}", marketName, dto.code()); - return; //똑같은 데이터면 무시 - } - - //새로운 데이터를 받으면 -> 감시가 이벤트 발행 - applicationEventPublisher.publishEvent( - new PriceAlertEvent(this, dto) - ); - - //스케쥴러 + 웹소켓 연결 시작하자마자 받은 데이터 값(첫 데이터) 저장 - if (previous == null) { - try { - liveStockPriceWebSocketSaverService.saveStockData(dto); //DB에 바로 저장 - log.debug("{} 종목 {} 실시간 저장 완료", marketName, dto.code()); - } catch (Exception e) { - // 실패 시 배치 저장을 위해 pendingData에 보관 - pendingData.put(dto.code(), dto); - log.error("{} 종목 {} 실시간 저장 실패, 배치 저장 대기: {}", marketName, dto.code(), e.getMessage()); - } - } - - //새로운 데이터면 다음 중복 체크용으로 저장 - previousData.put(dto.code(), dto); - pendingData.put(dto.code(), dto); //배치 저장 대기 - sendStockData(dto.code(), dto); //클라이언트에게 전송 - }finally { - sample.stop(dataProcessingTime); - - } - } - - /* 스케줄링 - 배치 저장 */ - @Scheduled(cron = "0 * 9-15 * * MON-FRI", zone = "Asia/Seoul") - public void saveKoreanStockDataBatch() { - if (!isKoreanMarketOpen()) { - log.debug("국내 장 마감으로 인한 배치 저장 중단"); - return; - } - saveBatchData("국내", koreanPendingData); - } - - @Scheduled(cron = "0 * 9-15 * * MON-FRI", zone = "America/New_York") - public void saveOverseasStockDataBatch() { - if (!isOverseasMarketOpen()) { - log.debug("해외 장 마감으로 인한 배치 저장 중단"); - return; - } - saveBatchData("해외", overseasPendingData); - } - - private void saveBatchData(String marketName, Map pendingData) { - if (pendingData.isEmpty()) return; - - Map dataToSave = new HashMap<>(pendingData); - pendingData.clear(); - - dataToSave.values().forEach(dto -> { - try { - liveStockPriceWebSocketSaverService.saveStockData(dto); - } catch (Exception e) { - log.error("{} 종목 {} 저장 실패: {}", marketName, dto.code(), e.getMessage()); - } - }); - - log.info("{} 주식 배치 저장 완료 - 저장된 종목 수: {}", marketName, dataToSave.size()); - } - - /* 장 시작 시 자동으로 필요한 종목 전부 구독*/ - public void startKoreanMarketSubscription(){ - if (!isKoreanMarketOpen()) { - log.info("국내 장이 열려있지 않아 자동 구독 스킵"); - return; - } - - connectKoreanWebSocket(); - - List targetStocks = stockRepository.findByCurrencyName("KRW"); - int beforeSize = koreanSubscribedStocks.size(); - - targetStocks.forEach(dto -> { - if(!koreanSubscribedStocks.contains(dto.getCode())) { - try { - koreanHandler.subscribe(dto.getCode()); - koreanSubscribedStocks.add(dto.getCode()); - }catch (Exception e){ - log.error("종목 {} 구독 실패: {}", dto.getCode(), e.getMessage()); - } - } - }); - int successCount = koreanSubscribedStocks.size() - beforeSize; - log.info("장 시작 - 총 {} 종목 중 {} 종목 구독 완료", - targetStocks.size(), successCount); - } - - public void startOverseasMarketSubscription(){ - - if (!isOverseasMarketOpen()) { - log.info("해외 장이 열려있지 않아 자동 구독 스킵"); - return; - } - - connectOverseasWebSocket(); - - - List targetStocks = stockRepository.findByCurrencyName("USD"); - int beforeSize = overseasSubscribedStocks.size(); - - targetStocks.forEach(stock -> { - if (!overseasSubscribedStocks.contains(stock.getCode())) { - try { - overseasHandler.subscribe(stock.getCode()); - overseasSubscribedStocks.add(stock.getCode()); - }catch(Exception e){ - log.error("종목 {} 구독 실패: {}", stock.getCode(), e.getMessage()); - } - } - }); - - int successCount = overseasSubscribedStocks.size() - beforeSize; - log.info("장 시작 - 총 {} 종목 중 {} 종목 구독 완료", - targetStocks.size(), successCount); - } - - @Override - public MarketStatusResponse getMarketStatus() { - // 국내 장 시간이면 "korean" - if (isKoreanConnected.get() && isKoreanMarketOpen()) { - return new MarketStatusResponse("korean"); - } - - // 해외 장 시간이면 "overseas" - if (isOverseasConnected.get() && isOverseasMarketOpen()) { - return new MarketStatusResponse("overseas"); - } - - // 둘 다 아니면 "no" - return new MarketStatusResponse("no"); - } - - /* 스케줄링 - 장 마감 정리 */ - @Scheduled(cron = "0 20 15 * * MON-FRI", zone = "Asia/Seoul") - public void cleanUpAfterKoreanMarketClose() { - log.debug("국내 장 마감 - 마지막 데이터 저장 및 정리 시작"); - saveRemainingData("국내", koreanPendingData); - disconnectKoreanWebSocket(); - log.info("국내 장 마감 정리 완료"); - } - - @Scheduled(cron = "0 0 16 * * MON-FRI", zone = "America/New_York") - public void cleanUpAfterOverseasMarketClose() { - log.debug("해외 장 마감 - 마지막 데이터 저장 및 정리 시작"); - saveRemainingData("해외", overseasPendingData); - disconnectOverseasWebSocket(); - log.info("해외 장 마감 정리 완료"); - } - - private void saveRemainingData(String marketName, Map pendingData) { - if (!pendingData.isEmpty()) { - - Map dataToSave = new HashMap<>(pendingData); - pendingData.clear(); - - dataToSave.values().forEach(dto -> { - try { - liveStockPriceWebSocketSaverService.saveStockData(dto); - } catch (Exception e) { - log.error("{} 종목 {} 마지막 저장 실패: {}", marketName, dto.code(), e.getMessage()); - } - }); - - log.info("{} 주식 마지막 배치 저장 완료 - 저장된 종목 수: {}", marketName, dataToSave.size()); - } - } - - private void disconnectKoreanWebSocket() { - if (!isKoreanConnected.get()) return; - - log.info("국내 WebSocket 연결 해제 시작"); - try { - new ArrayList<>(koreanSubscribedStocks).forEach(code -> { - try { - koreanStockUnsubscribe(code); - } catch (Exception e) { - log.warn("국내 종목 {} 구독 해제 중 에러 발생: {}", code, e.getMessage()); - } - }); - Thread.sleep(1000); //서버 처리 대기 - } catch (Exception e) { - log.error("구독 해제 중 에러: {}", e.getMessage()); - } finally { - // 반드시 실행 - koreanConnectionManager.stop(); - koreanSubscribedStocks.clear(); - previousKoreanData.clear(); - koreanPendingData.clear(); - } - - log.info("국내 WebSocket 연결 해제 완료"); - } - - - private void disconnectOverseasWebSocket() { - if (!isOverseasConnected.get()) return; - - log.info("해외 WebSocket 연결 해제 시작"); - try { - new ArrayList<>(overseasSubscribedStocks).forEach(code -> { - try { - overseasStockUnsubscribe(code); - } catch (Exception e) { - log.warn("해외 종목 {} 구독 해제 중 에러 발생: {}", code, e.getMessage()); - } - }); - - Thread.sleep(1000); //서버 처리 대기 - try { - disconnectDBSession(); //db증권은 세션 정리를 하지 않을 경우 에러 발생함 - Thread.sleep(500); - } catch (Exception e) { - log.warn("세션 종료 실패 (무시): {}", e.getMessage()); // 에러 무시 - } - - } catch (Exception e) { - log.error("구독 해제 중 에러: {}", e.getMessage()); - } finally { - overseasConnectionManager.stop(); - isOverseasConnected.set(false); - overseasSubscribedStocks.clear(); - previousOverseasData.clear(); - overseasPendingData.clear(); - } - - log.info("해외 WebSocket 연결 해제 완료"); - } - - - public void disconnectDBSession(){ - try { - //ERROR LettuceConnectionFactory has been STOPPED. Use start() to initialize it - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - headers.set("authorization", "Bearer " + cachedAccessToken); - - HttpEntity> entity = new HttpEntity<>(new HashMap<>(), headers); - - ResponseEntity response = restTemplate.postForEntity( - baseUrl + "/api/v1/websocket/disconnectSession", - entity, - Map.class - ); - log.info(response.getBody().toString()); - if (response.getStatusCode().is2xxSuccessful()) { - Map body = response.getBody(); - log.info("웹소켓 세션 초기화 성공: {}", body.get("result")); - } else { - log.warn("웹소켓 세션 초기화 응답 이상: {}", response.getStatusCode()); - } - - }catch (Exception e) { - log.error("웹소켓 세션 초기화 중 오류 발생", e); - throw new RuntimeException("웹소켓 세션 초기화 실패: " + e.getMessage()); - } - } - - /* 유틸리티 메서드 */ - private boolean isKoreanMarketOpen() { - ZonedDateTime now = ZonedDateTime.now(ZoneId.of("Asia/Seoul")); - boolean weekday = now.getDayOfWeek() != DayOfWeek.SATURDAY && now.getDayOfWeek() != DayOfWeek.SUNDAY; - return weekday - && !now.toLocalTime().isBefore(LocalTime.of(9, 0)) - && now.toLocalTime().isBefore(LocalTime.of(15, 30)); - } - - private boolean isOverseasMarketOpen() { - ZonedDateTime now = ZonedDateTime.now(ZoneId.of("America/New_York")); - boolean weekday = now.getDayOfWeek() != DayOfWeek.SATURDAY && now.getDayOfWeek() != DayOfWeek.SUNDAY; - return weekday - && !now.toLocalTime().isBefore(LocalTime.of(9, 0)) - && now.toLocalTime().isBefore(LocalTime.of(16, 0)); - } - - @PreDestroy - public void cleanUp() { - log.info("애플리케이션 종료로 인한 WebSocket 연결 해제 시작"); - - try { - // 남은 데이터 저장 - saveRemainingData("국내", koreanPendingData); - saveRemainingData("해외", overseasPendingData); - - // 연결 해제 - if (isKoreanConnected.get()) { - disconnectKoreanWebSocket(); - } - if (isOverseasConnected.get()) { - disconnectOverseasWebSocket(); - } - - // 최종 리소스 정리 -> (안전장치) - koreanSubscribedStocks.clear(); - overseasSubscribedStocks.clear(); - previousKoreanData.clear(); - previousOverseasData.clear(); - koreanPendingData.clear(); - overseasPendingData.clear(); - - } catch (Exception e) { - log.error("WebSocket cleanup 중 에러 발생", e); - - // 에러 발생해도 리소스는 강제 정리 - koreanSubscribedStocks.clear(); - overseasSubscribedStocks.clear(); - previousKoreanData.clear(); - previousOverseasData.clear(); - koreanPendingData.clear(); - overseasPendingData.clear(); - } - - log.info("WebSocket 연결 해제 완료"); - } - - - - /* 메트릭용 Getter 추가 */ - public Set getKoreanSubscribedStocks() { - return koreanSubscribedStocks; - } - - public Set getOverseasSubscribedStocks() { - return overseasSubscribedStocks; - } - - public boolean isKoreanConnected() { - return isKoreanConnected.get(); - } - - public boolean isOverseasConnected() { - return isOverseasConnected.get(); - } -} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 8bc68d65..bf9b00ba 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,4 +4,5 @@ include 'app-child' include 'domain' include 'infra' include 'common' -include 'auth' \ No newline at end of file +include 'auth' +include 'websocket' diff --git a/websocket/Dockerfile b/websocket/Dockerfile new file mode 100644 index 00000000..241a224c --- /dev/null +++ b/websocket/Dockerfile @@ -0,0 +1,13 @@ +FROM eclipse-temurin:17-jdk + +RUN apt-get update && apt-get install -y tzdata \ + && rm -rf /var/lib/apt/lists/* + +ENV TZ=Asia/Seoul + +WORKDIR /app + +COPY build/libs/*.jar app.jar +COPY src/main/resources/application-deploy.yml /app/application-deploy.yml + +ENTRYPOINT ["java", "-jar","app.jar", "--spring.profiles.active=deploy"] diff --git a/websocket/build.gradle b/websocket/build.gradle new file mode 100644 index 00000000..3fcc4ea5 --- /dev/null +++ b/websocket/build.gradle @@ -0,0 +1,80 @@ +plugins { + id 'java' + id 'org.springframework.boot' + id 'io.spring.dependency-management' +} + +group = 'com.fintory' +version = '0.0.1-SNAPSHOT' + + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(17) + } +} + +configurations { + compileOnly { + extendsFrom annotationProcessor + } +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(":domain") + implementation project(":common") + implementation project(':infra') + + /* db */ + runtimeOnly 'com.mysql:mysql-connector-j' + + /* .env 자동 로딩 */ + implementation 'me.paulschwarz:spring-dotenv:4.0.0' + + /* jpa */ + implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + + /* lombok */ + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + + /* webFlux */ + implementation 'org.springframework.boot:spring-boot-starter-webflux' + + /* Jackson */ + implementation 'com.fasterxml.jackson.core:jackson-databind' + + /* Redis */ + implementation 'org.springframework.boot:spring-boot-starter-data-redis' + implementation 'io.lettuce:lettuce-core' + + /* WebSocket */ + implementation 'org.springframework.boot:spring-boot-starter-websocket' + implementation 'org.springframework:spring-messaging' + + /* Micrometer(Prometheus) */ + implementation("io.micrometer:micrometer-registry-prometheus:1.15.2") + + /* Actuator */ + implementation 'org.springframework.boot:spring-boot-starter-actuator' + + /* Security */ + implementation 'org.springframework.boot:spring-boot-starter-security' + +} + +test { + useJUnitPlatform() +} + +bootJar { + enabled = true +} + +jar { + enabled = false +} \ No newline at end of file diff --git a/websocket/src/main/java/com/fintory/websocket/WebsocketApplication.java b/websocket/src/main/java/com/fintory/websocket/WebsocketApplication.java new file mode 100644 index 00000000..66e68560 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/WebsocketApplication.java @@ -0,0 +1,31 @@ +package com.fintory.websocket; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.data.jpa.repository.config.EnableJpaAuditing; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; +import org.springframework.scheduling.annotation.EnableScheduling; + + + +@EnableScheduling +@SpringBootApplication(scanBasePackages = { + "com.fintory.websocket", + "com.fintory.domain", + "com.fintory.common", + "com.fintory.infra.config", + "com.fintory.infra.domain.stock.service.token" +}) +@EntityScan(basePackages = "com.fintory.domain") +@EnableJpaRepositories( + basePackages = { + "com.fintory.websocket.publisher.repository" + } +) +@EnableJpaAuditing +public class WebsocketApplication { + public static void main(String[] args) { + SpringApplication.run(WebsocketApplication.class, args); + } +} diff --git a/websocket/src/main/java/com/fintory/websocket/config/SecurityConfig.java b/websocket/src/main/java/com/fintory/websocket/config/SecurityConfig.java new file mode 100644 index 00000000..985a8bd8 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/config/SecurityConfig.java @@ -0,0 +1,67 @@ +package com.fintory.websocket.config; + +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; +import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; +import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer; +import org.springframework.security.web.SecurityFilterChain; +import org.springframework.web.cors.CorsConfiguration; +import org.springframework.web.cors.CorsConfigurationSource; +import org.springframework.web.cors.UrlBasedCorsConfigurationSource; + +import java.util.Arrays; +import java.util.List; + +@Configuration +@EnableWebSecurity +@RequiredArgsConstructor +@DependsOn("liveStockPriceWebSocketService") +public class SecurityConfig { + + + @Bean + public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { + http + .csrf(AbstractHttpConfigurer::disable) + .authorizeHttpRequests(auth -> auth.anyRequest().permitAll()); + return http.build(); + } + + /** + * CORS 설정을 위한 CorsConfigurationSource 빈을 정의 + * JWT 인증을 위해 특정 헤더를 허용하도록 설정 + */ + @Bean + public CorsConfigurationSource corsConfigurationSource() { + + CorsConfiguration configuration = new CorsConfiguration(); + + // 허용할 프론트 도메인 설정 + configuration.setAllowedOriginPatterns(List.of( + "http://localhost:8081", + "https://localhost:8081", + "http://fintory.xyz", + "https://fintory.xyz" + )); + // 허용할 HTTP 메서드 + configuration.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS")); + // 자격 증명(쿠키, HTTP 인증 등) 허용 + configuration.setAllowCredentials(true); + // 모든 헤더 허용 + configuration.setAllowedHeaders(List.of("*")); + // 클라이언트가 접근할 수 있도록 노출할 응답 헤더 + configuration.setExposedHeaders(Arrays.asList("Authorization", "AccessToken", "RefreshToken")); + + configuration.setMaxAge(3600L); + + UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(); + source.registerCorsConfiguration("/**", configuration); + + return source; + } + + +} diff --git a/websocket/src/main/java/com/fintory/websocket/monitoring/config/StompStatsMetricsConfiguration.java b/websocket/src/main/java/com/fintory/websocket/monitoring/config/StompStatsMetricsConfiguration.java new file mode 100644 index 00000000..074ea517 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/monitoring/config/StompStatsMetricsConfiguration.java @@ -0,0 +1,138 @@ +package com.fintory.websocket.monitoring.config; + +import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.WebSocketMessageBrokerStats; +import org.springframework.web.socket.messaging.StompSubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; + +import java.util.Arrays; +import java.util.Collections; + +@Configuration +public class StompStatsMetricsConfiguration { + + private final WebSocketMessageBrokerStats stats; + private final MeterRegistry registry; + + public StompStatsMetricsConfiguration(WebSocketMessageBrokerStats stats, MeterRegistry registry) { + this.stats = stats; + this.registry = registry; + registerMetrics(); + } + + private void registerMetrics() { + // 1. WebSocket 세션 통계 (Gauge) + registerSessionMetrics(); + + // 2. STOMP 프로토콜 통계 (FunctionCounter) + registerStompProtocolMetrics(); + + // 3. 채널 Executor 큐 통계 (Gauge) + registerChannelExecutorMetrics(); + } + + // --- 1. WebSocket 세션 통계 등록 --- + private void registerSessionMetrics() { + final String sessionsGaugeName = "websocket.sessions"; + final String sessionsGaugeNameTotal = "websocket.total.sessions"; + + // 현재 활성 세션 수 (Gauge) + registry.gauge(sessionsGaugeName, Arrays.asList(Tag.of("status", "current"), Tag.of("type", "total")), + this.stats, s -> { + SubProtocolWebSocketHandler.Stats sessionStats = s.getWebSocketSessionStats(); + return (sessionStats != null) ? sessionStats.getWebSocketSessions() : 0; + }); + + // 누적 총 세션 수 (FunctionCounter) + FunctionCounter.builder(sessionsGaugeNameTotal, this.stats, s -> { + SubProtocolWebSocketHandler.Stats sessionStats = s.getWebSocketSessionStats(); + return (sessionStats != null) ? sessionStats.getTotalSessions() : 0; + }) + .description("Total number of WebSocket sessions (accumulated)") + .tag("type", "total_accumulated") + .register(registry); + + // 전송 오류로 종료된 세션 수 (FunctionCounter) + FunctionCounter.builder(sessionsGaugeName + ".closed.abnormally", this.stats, s -> { + SubProtocolWebSocketHandler.Stats sessionStats = s.getWebSocketSessionStats(); + return (sessionStats != null) ? sessionStats.getTransportErrorSessions() : 0; + }) + .description("Number of sessions closed due to transport error") + .tag("reason", "transport_error") + .register(registry); + } + + // --- 2. STOMP 프로토콜 통계 등록 --- + private void registerStompProtocolMetrics() { + final String metricName = "stomp.messages.processed"; + + // CONNECT 프레임 처리 수 (FunctionCounter) + FunctionCounter.builder(metricName, this.stats, s -> { + StompSubProtocolHandler.Stats stompStats = s.getStompSubProtocolStats(); + return (stompStats != null) ? stompStats.getTotalConnect() : 0; + }) + .description("Total number of STOMP CONNECT frames processed") + .tag("action", "CONNECT") + .register(registry); + + // CONNECTED 프레임 처리 수 (FunctionCounter) + FunctionCounter.builder(metricName, this.stats, s -> { + StompSubProtocolHandler.Stats stompStats = s.getStompSubProtocolStats(); + return (stompStats != null) ? stompStats.getTotalConnected() : 0; + }) + .description("Total number of STOMP CONNECTED frames sent") + .tag("action", "CONNECTED") + .register(registry); + + // DISCONNECT 프레임 처리 수 (FunctionCounter) + FunctionCounter.builder(metricName, this.stats, s -> { + StompSubProtocolHandler.Stats stompStats = s.getStompSubProtocolStats(); + return (stompStats != null) ? stompStats.getTotalDisconnect() : 0; + }) + .description("Total number of STOMP DISCONNECT frames processed") + .tag("action", "DISCONNECT") + .register(registry); + } + + // --- 3. 채널 Executor 큐 통계 등록 --- + private void registerChannelExecutorMetrics() { + // 이미 Micrometer Actuator가 executor_queued_tasks 등으로 등록할 가능성이 높지만, + // 명시적인 이름으로 재등록하여 보장성을 높입니다. + + // 인바운드 채널 대기 큐 크기 (Gauge) + registry.gauge("websocket.channel.queue.size", Collections.singletonList(Tag.of("channel", "inbound")), + this.stats, s -> { + String info = s.getClientInboundExecutorStatsInfo(); + return extractQueuedTasks(info); + }); + + // 아웃바운드 채널 대기 큐 크기 (Gauge) + registry.gauge("websocket.channel.queue.size", Collections.singletonList(Tag.of("channel", "outbound")), + this.stats, s -> { + String info = s.getClientOutboundExecutorStatsInfo(); + return extractQueuedTasks(info); + }); + } + + // ThreadPoolExecutor 문자열에서 queued tasks 값을 추출하는 헬퍼 메서드 + private double extractQueuedTasks(String statsInfo) { + if (statsInfo.contains("queued tasks = ")) { + try { + int start = statsInfo.indexOf("queued tasks = ") + "queued tasks = ".length(); + int end = statsInfo.indexOf(",", start); + if (end == -1) { + end = statsInfo.indexOf("]", start); + } + String value = statsInfo.substring(start, end).trim(); + return Double.parseDouble(value); + } catch (Exception e) { + // 파싱 오류 시 0 반환 + return 0; + } + } + return 0; + } +} diff --git a/websocket/src/main/java/com/fintory/websocket/monitoring/config/WebSocketMetrics.java b/websocket/src/main/java/com/fintory/websocket/monitoring/config/WebSocketMetrics.java new file mode 100644 index 00000000..0f5aeb64 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/monitoring/config/WebSocketMetrics.java @@ -0,0 +1,82 @@ +package com.fintory.websocket.monitoring.config; + +import com.fintory.websocket.publisher.service.LiveStockPriceWebSocketService; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import jakarta.annotation.PostConstruct; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicInteger; + +//REVIEW 혹시 해당 파일의 위치를 바꾸길 원하시면 리뷰 주세요! ->WebSocketMetrics는 Micrometer와 Prometheus 같은 외부 기술에 의존하기 때문에 infra 모듈에 위치시켰습니다 +@Component +public class WebSocketMetrics { + + private final LiveStockPriceWebSocketService websocketService; + private final MeterRegistry meterRegistry; + private final AtomicInteger activeConnections = new AtomicInteger(0); + private Counter messageSent; + + //REVIEW @Lazy를 쓰기 위해 명시적 생성자 사용 -> @Lazy는 생성자 파라미터에 직접 붙어 있어야 동작함 + // @RequiredConstructor는 생성자 파라미터별 어노테이션을 직접 지원하지 않는 것으로 알고 있음. + public WebSocketMetrics(@Lazy LiveStockPriceWebSocketService websocketService, MeterRegistry meterRegistry) { + this.websocketService = websocketService; + this.meterRegistry = meterRegistry; + } + + @PostConstruct + public void registerMetrics(){ + + // STOMP 활성 연결 수 + Gauge.builder("stomp.connections.active", + activeConnections, AtomicInteger::get) + .description("Active STOMP connections (클라이언트 수)") + .register(meterRegistry); + + // TODO 활성 구독 종목 수 -> 그라파나로 확인한 후 없애기 + // 국내 주식 활성 구독 종목 수 + Gauge.builder("websocket.korean.subscriptions.active", + websocketService, service -> service.getKoreanSubscribedStocks().size()) + .description("Active Korean Stock subscriptions count") + .register(meterRegistry); + + // 해외 주식 활성 구독 종목 수 + Gauge.builder("websocket.overseas.subscriptions.active", + websocketService, service -> service.getOverseasSubscribedStocks().size()) + .description("Active Overseas Stock subscriptions count") + .register(meterRegistry); + + // 국내 Websocket 연결 상태 + Gauge.builder("websocket.korean.connected", + websocketService, service -> service.isKoreanConnected() ? 1.0 : 0.0) + .description("Korean WebSocket connection status") + .register(meterRegistry); + + // 해외 Websocket 연결 상태 + Gauge.builder("websocket.overseas.connected", + websocketService, service -> service.isOverseasConnected() ? 1.0 : 0.0) + .description("Overseas WebSocket connection status") + .register(meterRegistry); + + this.messageSent = Counter.builder("websocket.messages.sent") + .description("Messages sent to Front") + .register(meterRegistry); + } + + // 연결 관리 + public void incrementConnection() { + activeConnections.incrementAndGet(); + } + + public void decrementConnection() { + activeConnections.decrementAndGet(); + } + + public void incrementMessageSent(){ + messageSent.increment(); + } +} + + diff --git a/websocket/src/main/java/com/fintory/websocket/monitoring/config/WebSocketStatsConfig.java b/websocket/src/main/java/com/fintory/websocket/monitoring/config/WebSocketStatsConfig.java new file mode 100644 index 00000000..d5a1f947 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/monitoring/config/WebSocketStatsConfig.java @@ -0,0 +1,27 @@ +package com.fintory.websocket.monitoring.config; + +import jakarta.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.WebSocketMessageBrokerStats; + +@Configuration +public class WebSocketStatsConfig { + + // 설정 파일의 속성 값을 읽어와 주입 + @Value("${spring.websocket.stomp.stats-log-period:30000}") + private long loggingPeriodMillis; + + private final WebSocketMessageBrokerStats stats; + + public WebSocketStatsConfig(WebSocketMessageBrokerStats stats) { + this.stats = stats; + } + + @PostConstruct + public void setCustomLoggingPeriod() { + if (this.loggingPeriodMillis > 0) { + this.stats.setLoggingPeriod(this.loggingPeriodMillis); + } + } +} diff --git a/websocket/src/main/java/com/fintory/websocket/monitoring/listener/WebSocketEventListener.java b/websocket/src/main/java/com/fintory/websocket/monitoring/listener/WebSocketEventListener.java new file mode 100644 index 00000000..0d4899f3 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/monitoring/listener/WebSocketEventListener.java @@ -0,0 +1,30 @@ +package com.fintory.websocket.monitoring.listener; + +import com.fintory.websocket.monitoring.config.WebSocketMetrics; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.messaging.SessionConnectedEvent; +import org.springframework.web.socket.messaging.SessionDisconnectEvent; + + +//REVIEW 서비스 코드는 아니고, 리스너는 인터페이스가 필요없어서 따로 패키지를 만들었는데, 논리상 파일 위치에 문제 있을 시 리뷰 주시면 반영하겠습니다! +@Component +@RequiredArgsConstructor +@Slf4j +public class WebSocketEventListener { + + private final WebSocketMetrics webSocketMetrics; + + // 클라이언트 stomp 연결(STMOP CONNECTED로 응답 완료 후 ) -> SessionConntecdEvent 발행 + @EventListener + public void handleSessionConnect(SessionConnectedEvent event) { + webSocketMetrics.incrementConnection(); + } + + @EventListener + public void handleSessionDisconnect(SessionDisconnectEvent event) { + webSocketMetrics.decrementConnection(); + } +} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketClientConfig.java b/websocket/src/main/java/com/fintory/websocket/provider/config/WebSocketClientConfig.java similarity index 91% rename from infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketClientConfig.java rename to websocket/src/main/java/com/fintory/websocket/provider/config/WebSocketClientConfig.java index d42b7be2..04af4ab0 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketClientConfig.java +++ b/websocket/src/main/java/com/fintory/websocket/provider/config/WebSocketClientConfig.java @@ -1,6 +1,7 @@ -package com.fintory.infra.domain.stock.config; +package com.fintory.websocket.provider.config; -import com.fintory.infra.domain.stock.handler.*; +import com.fintory.websocket.provider.handler.KoreanLiveStockPriceWebSocketHandler; +import com.fintory.websocket.provider.handler.OverseasLiveStockPriceWebSocketHandler; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketInterceptor.java b/websocket/src/main/java/com/fintory/websocket/provider/config/WebSocketInterceptor.java similarity index 96% rename from infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketInterceptor.java rename to websocket/src/main/java/com/fintory/websocket/provider/config/WebSocketInterceptor.java index 5d08f786..57db5ae3 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketInterceptor.java +++ b/websocket/src/main/java/com/fintory/websocket/provider/config/WebSocketInterceptor.java @@ -1,4 +1,4 @@ -package com.fintory.infra.domain.stock.config; +package com.fintory.websocket.provider.config; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/handler/KoreanLiveStockPriceWebSocketHandler.java b/websocket/src/main/java/com/fintory/websocket/provider/handler/KoreanLiveStockPriceWebSocketHandler.java similarity index 91% rename from infra/src/main/java/com/fintory/infra/domain/stock/handler/KoreanLiveStockPriceWebSocketHandler.java rename to websocket/src/main/java/com/fintory/websocket/provider/handler/KoreanLiveStockPriceWebSocketHandler.java index 7701ac03..bd8e7be5 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/handler/KoreanLiveStockPriceWebSocketHandler.java +++ b/websocket/src/main/java/com/fintory/websocket/provider/handler/KoreanLiveStockPriceWebSocketHandler.java @@ -1,4 +1,4 @@ -package com.fintory.infra.domain.stock.handler; +package com.fintory.websocket.provider.handler; import com.fasterxml.jackson.databind.ObjectMapper; import com.fintory.common.exception.DomainErrorCode; @@ -44,10 +44,6 @@ public void setDataCallBack(Consumer dataCallBack) { this.dataCallBack = dataCallBack; } - public void setSaveCallBack(Consumer saveCallBack) { - this.saveCallBack = saveCallBack; - } - //연결 상태 확인 public boolean isConnected(){ @@ -68,14 +64,8 @@ public boolean waitForConnection(long timeoutSeconds){ @Override public void afterConnectionEstablished(WebSocketSession session) { this.session = session; - isConnected.set(true); - connectionLatch.countDown(); - log.info("국내 주식 웹소켓 연결 성공"); - log.info("세션 ID: {}", session.getId()); - log.info("웹소켓 handshake 헤더: {}", session.getHandshakeHeaders()); - log.info("웹소켓 URI: {}", session.getUri()); - log.info("웹소켓 로컬 주소: {}", session.getLocalAddress()); - log.info("웹소켓 리모트 주소: {}", session.getRemoteAddress()); + this.isConnected.set(true); + this.connectionLatch.countDown(); } @Override @@ -91,7 +81,7 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus status){ public void subscribe(String code){ try { sendSubscribeMessage(code); - Thread.sleep(100); + Thread.sleep(200); }catch (InterruptedException e){ Thread.currentThread().interrupt(); @@ -186,7 +176,6 @@ public void sendUnsubscribeMessage(String code) { //메시지를 받으면 실행되는 메소드 public void handleTextMessage(WebSocketSession session, TextMessage message){ String payload = message.getPayload(); - //log.info(payload); try{ String[] fields = payload.split("\\^"); if (fields.length < 40) return; @@ -206,7 +195,6 @@ public void handleTextMessage(WebSocketSession session, TextMessage message){ executeCallbacks(stockData); - log.debug("주식 데이터 처리 완료: {}", stockData); }catch(Exception e){ log.error("KIS Developer 실시간 현재가 조회 시 응답 받는 과정에서 에러 발생:{}",e.getMessage()); throw new DomainException(DomainErrorCode.WEBSOCKET_MESSAGE_PARSE_FAILED); diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/handler/OverseasLiveStockPriceWebSocketHandler.java b/websocket/src/main/java/com/fintory/websocket/provider/handler/OverseasLiveStockPriceWebSocketHandler.java similarity index 90% rename from infra/src/main/java/com/fintory/infra/domain/stock/handler/OverseasLiveStockPriceWebSocketHandler.java rename to websocket/src/main/java/com/fintory/websocket/provider/handler/OverseasLiveStockPriceWebSocketHandler.java index dba0c9cd..a2b9bd34 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/handler/OverseasLiveStockPriceWebSocketHandler.java +++ b/websocket/src/main/java/com/fintory/websocket/provider/handler/OverseasLiveStockPriceWebSocketHandler.java @@ -1,4 +1,4 @@ -package com.fintory.infra.domain.stock.handler; +package com.fintory.websocket.provider.handler; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -43,15 +43,18 @@ public void setDataCallBack(Consumer dataCallBack) { this.dataCallBack = dataCallBack; } - public void setSaveCallBack(Consumer saveCallBack) { - this.saveCallBack = saveCallBack; - } - // 연결 상태 확인 public boolean isConnected() { return isConnected.get() && session != null && session.isOpen(); } + @Override + public void afterConnectionEstablished(WebSocketSession session) { + this.session = session; + this.isConnected.set(true); + this.connectionLatch.countDown(); + } + // 연결 대기 public boolean waitForConnection(long timeoutSeconds) { try { @@ -63,19 +66,6 @@ public boolean waitForConnection(long timeoutSeconds) { } } - @Override - public void afterConnectionEstablished(WebSocketSession session) { - this.session = session; - isConnected.set(true); - connectionLatch.countDown(); - log.info("해외 주식 웹소켓 연결 성공"); - log.info("세션 ID: {}", session.getId()); - log.info("웹소켓 handshake 헤더: {}", session.getHandshakeHeaders()); - log.info("웹소켓 URI: {}", session.getUri()); - log.info("웹소켓 로컬 주소: {}", session.getLocalAddress()); - log.info("웹소켓 리모트 주소: {}", session.getRemoteAddress()); - } - @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { this.session = null; @@ -107,7 +97,6 @@ public void sendSubscribeMessage(String code) { try { String token = (String) redisTemplate.opsForValue().get("db-access-token"); - log.info("토큰 확인"+token); if (token == null || token.trim().isEmpty()) { log.error("Redis에서 DB 토큰을 찾을 수 없습니다."); throw new DomainException(DomainErrorCode.TOKEN_NOT_FOUND); @@ -130,7 +119,6 @@ public void sendSubscribeMessage(String code) { String message = objectMapper.writeValueAsString(request); session.sendMessage(new TextMessage(message)); - log.info("해외 주식 구독 메시지 전송 완료 - 종목: {}", code); } catch (Exception e) { log.error("DB API 실시간 현재가 데이터 조회 메시지 요청 중 에러 발생 - 종목: {}, 에러: {}", code, e.getMessage()); @@ -177,7 +165,6 @@ public void sendUnsubscribeMessage(String code) { String jsonMessage = objectMapper.writeValueAsString(request); session.sendMessage(new TextMessage(jsonMessage)); - log.info("해외 주식 구독 해제 메시지 전송 완료 - 종목: {}", code); } catch (Exception e) { log.error("DB API 실시간 현재가 데이터 구독 해제 요청 중 에러 발생 - 종목: {}, 에러: {}", code, e.getMessage()); @@ -190,9 +177,6 @@ public void sendUnsubscribeMessage(String code) { @Override public void handleTextMessage(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); - //log.info("수신된 payload: {}", payload); - - synchronized (sendLock) { try { JsonNode root = objectMapper.readTree(payload); @@ -217,9 +201,6 @@ private void parseAndProcessMessage(String payload) { try { JsonNode root = objectMapper.readTree(payload); - JsonNode header = root.get("header"); - - JsonNode body = root.get("body"); // 안전한 필드 추출 @@ -239,8 +220,6 @@ private void parseAndProcessMessage(String payload) { // 콜백 실행 executeCallbacks(stockData); - //log.debug("해외 주식 데이터 처리 완료: {}", stockData); - } catch (Exception e) { log.error("메시지 파싱 중 에러 발생: {}", e.getMessage()); throw new DomainException(DomainErrorCode.WEBSOCKET_MESSAGE_PARSE_FAILED); diff --git a/websocket/src/main/java/com/fintory/websocket/provider/service/StockSubscriptionService.java b/websocket/src/main/java/com/fintory/websocket/provider/service/StockSubscriptionService.java new file mode 100644 index 00000000..e8bc6a3d --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/provider/service/StockSubscriptionService.java @@ -0,0 +1,84 @@ +package com.fintory.websocket.provider.service; + + +import com.fintory.domain.stock.model.Stock; +import com.fintory.websocket.provider.handler.KoreanLiveStockPriceWebSocketHandler; +import com.fintory.websocket.provider.handler.OverseasLiveStockPriceWebSocketHandler; +import com.fintory.websocket.publisher.repository.StockRepository; +import com.fintory.websocket.publisher.service.MarketTimeService; +import com.fintory.websocket.publisher.state.StockDataHolder; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import java.util.List; + + +@Service +@Slf4j +@RequiredArgsConstructor +public class StockSubscriptionService { + private final StockDataHolder stockDataHolder; + private final KoreanLiveStockPriceWebSocketHandler koreanHandler; + private final OverseasLiveStockPriceWebSocketHandler overseasHandler; + + private final WebSocketConnectionService connectionService; + private final StockRepository stockRepository; + private final MarketTimeService marketTimeService; + + /* 장 시작 시 자동으로 필요한 종목 전부 구독*/ + public void startKoreanMarketSubscription(){ + List targetStocks = stockRepository.findByCurrencyName("KRW"); + + if (!marketTimeService.isKoreanMarketOpen()) { + return; + } + + connectionService.connectKoreanWebSocket(); + + int beforeSize = stockDataHolder.getKoreanSubscribedStocks().size(); + + targetStocks.forEach(dto -> { + if(!stockDataHolder.getKoreanSubscribedStocks().contains(dto.getCode())) { + try { + koreanHandler.subscribe(dto.getCode()); + stockDataHolder.getKoreanSubscribedStocks().add(dto.getCode()); + }catch (Exception e){ + log.error("종목 {} 구독 실패: {}", dto.getCode(), e.getMessage()); + } + } + }); + int successCount = stockDataHolder.getKoreanSubscribedStocks().size() - beforeSize; + log.info("장 시작 - 총 {} 종목 중 {} 종목 구독 완료", + targetStocks.size(), successCount); + } + + + public void startOverseasMarketSubscription(){ + + List targetStocks = stockRepository.findByCurrencyName("USD"); + + if (!marketTimeService.isOverseasMarketOpen()) { + return; + } + connectionService.connectOverseasWebSocket(); + + int beforeSize = stockDataHolder.getOverseasSubscribedStocks().size(); + + targetStocks.forEach(stock -> { + if (!stockDataHolder.getOverseasSubscribedStocks().contains(stock.getCode())) { + try { + overseasHandler.subscribe(stock.getCode()); + stockDataHolder.getOverseasSubscribedStocks().add(stock.getCode()); + }catch(Exception e){ + log.error("종목 {} 구독 실패: {}", stock.getCode(), e.getMessage()); + } + } + }); + + int successCount = stockDataHolder.getOverseasSubscribedStocks().size() - beforeSize; + log.info("장 시작 - 총 {} 종목 중 {} 종목 구독 완료", + targetStocks.size(), successCount); + } + + +} diff --git a/websocket/src/main/java/com/fintory/websocket/provider/service/WebSocketConnectionService.java b/websocket/src/main/java/com/fintory/websocket/provider/service/WebSocketConnectionService.java new file mode 100644 index 00000000..013eff46 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/provider/service/WebSocketConnectionService.java @@ -0,0 +1,203 @@ +package com.fintory.websocket.provider.service; + +import com.fintory.common.exception.DomainErrorCode; +import com.fintory.common.exception.DomainException; +import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; +import com.fintory.websocket.provider.handler.KoreanLiveStockPriceWebSocketHandler; +import com.fintory.websocket.provider.handler.OverseasLiveStockPriceWebSocketHandler; +import com.fintory.websocket.publisher.service.StockDataProcessService; +import com.fintory.websocket.publisher.state.StockDataHolder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.socket.client.WebSocketConnectionManager; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +@Service +@Slf4j +public class WebSocketConnectionService { + + private final RedisTemplate redisTemplate; + @Value("${db-openapi.base-url}") + private String baseUrl; + + private final StockDataHolder stockDataHolder; + private final WebSocketConnectionManager koreanConnectionManager; + private final WebSocketConnectionManager overseasConnectionManager; + private final StockDataProcessService stockDataProcessService; + private final RestTemplate restTemplate; + private final KoreanLiveStockPriceWebSocketHandler koreanHandler; + private final OverseasLiveStockPriceWebSocketHandler overseasHandler; + + public WebSocketConnectionService(StockDataHolder stockDataHolder, + @Qualifier("koreanLiveStockPriceWebSocketConnectionManager")WebSocketConnectionManager koreanConnectionManager, + @Qualifier("overseasLiveStockPriceWebSocketConnectionManager")WebSocketConnectionManager overseasConnectionManager, + StockDataProcessService stockDataProcessService, RestTemplate restTemplate, + KoreanLiveStockPriceWebSocketHandler koreanHandler, + OverseasLiveStockPriceWebSocketHandler overseasHandler, + RedisTemplate redisTemplate) { + this.stockDataHolder = stockDataHolder; + this.koreanConnectionManager = koreanConnectionManager; + this.overseasConnectionManager = overseasConnectionManager; + this.stockDataProcessService = stockDataProcessService; + this.restTemplate = restTemplate; + this.koreanHandler = koreanHandler; + this.overseasHandler = overseasHandler; + this.redisTemplate = redisTemplate; + } + + + /* WebSocket 연결 관리 */ + public void connectKoreanWebSocket() { + if (stockDataHolder.getIsKoreanConnected().get()) { + return; + } + + Consumer callback = dto -> + stockDataProcessService.processStreamData(dto, + stockDataHolder.getPreviousKoreanData(), + stockDataHolder.getKoreanPendingData(), "국내"); + + koreanHandler.setDataCallBack(callback); + koreanConnectionManager.start(); + + boolean connected = koreanHandler.waitForConnection(30); + if (!connected) { + log.error("연결 실패!"); + throw new DomainException(DomainErrorCode.WEBSOCKET_CONNECTION_FAILED); + } + + stockDataHolder.getIsKoreanConnected().set(true); + log.info("국내 주식 WebSocket 연결 완료"); + } + + + public void connectOverseasWebSocket() { + if (stockDataHolder.getIsOverseasConnected().get()) { + return; + } + Consumer callback = dto -> + stockDataProcessService.processStreamData(dto, stockDataHolder.getPreviousOverseasData(), stockDataHolder.getOverseasPendingData(), "해외"); + + overseasHandler.setDataCallBack(callback); + overseasConnectionManager.start(); + + boolean connected = overseasHandler.waitForConnection(30); + if (!connected) { + log.info("해외 장시간임에도 WebSocket 연결 실패 - 공휴일이거나 기술적 문제일 수 있음"); + throw new DomainException(DomainErrorCode.WEBSOCKET_CONNECTION_FAILED); + } + + stockDataHolder.getIsOverseasConnected().set(true); + log.info("해외 주식 WebSocket 연결 완료"); + } + + + + + public void disconnectKoreanWebSocket() { + if (!stockDataHolder.getIsKoreanConnected().get()) return; + + log.info("국내 WebSocket 연결 해제 시작"); + try { + new ArrayList<>(stockDataHolder.getKoreanSubscribedStocks()).forEach(code -> { + try { + koreanHandler.unsubscribe(code); + stockDataHolder.getKoreanSubscribedStocks().remove(code); + } catch (Exception e) { + log.warn("국내 종목 {} 구독 해제 중 에러 발생: {}", code, e.getMessage()); + } + }); + Thread.sleep(1000); //서버 처리 대기 + } catch (Exception e) { + log.error("구독 해제 중 에러: {}", e.getMessage()); + } finally { + // 반드시 실행 + koreanConnectionManager.stop(); + stockDataHolder.getKoreanSubscribedStocks().clear(); + stockDataHolder.getPreviousKoreanData().clear(); + stockDataHolder.getKoreanPendingData().clear(); + } + + log.info("국내 WebSocket 연결 해제 완료"); + } + + + public void disconnectOverseasWebSocket() { + if (!stockDataHolder.getIsOverseasConnected().get()) return; + + log.info("해외 WebSocket 연결 해제 시작"); + try { + new ArrayList<>(stockDataHolder.getOverseasSubscribedStocks()).forEach(code -> { + try { + //overseasHandler.unsubscribe(code); + stockDataHolder.getOverseasSubscribedStocks().remove(code); + } catch (Exception e) { + log.warn("해외 종목 {} 구독 해제 중 에러 발생: {}", code, e.getMessage()); + } + }); + Thread.sleep(1000); //서버 처리 대기 + try { + disconnectDBSession(); //db증권은 세션 정리를 하지 않을 경우 에러 발생함 + Thread.sleep(500); + } catch (Exception e) { + log.warn("세션 종료 실패 (무시): {}", e.getMessage()); // 에러 무시 + } + } catch (Exception e) { + log.error("구독 해제 중 에러: {}", e.getMessage()); + } finally { + overseasConnectionManager.stop(); + stockDataHolder.getIsOverseasConnected().set(false); + stockDataHolder.getOverseasSubscribedStocks().clear(); + stockDataHolder.getPreviousOverseasData().clear(); + stockDataHolder.getOverseasPendingData().clear(); + + } + + log.info("해외 WebSocket 연결 해제 완료"); + } + + + public void disconnectDBSession(){ + try { + //ERROR LettuceConnectionFactory has been STOPPED. Use start() to initialize it + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + String token = stockDataHolder.getCachedAccessToken(); + headers.set("authorization", "Bearer " +token); + + HttpEntity> entity = new HttpEntity<>(new HashMap<>(), headers); + + ResponseEntity response = restTemplate.postForEntity( + baseUrl + "/api/v1/websocket/disconnectSession", + entity, + Map.class + ); + log.info(response.getBody().toString()); + if (response.getStatusCode().is2xxSuccessful()) { + Map body = response.getBody(); + log.info("웹소켓 세션 초기화 성공: {}", body.get("result")); + } else { + log.warn("웹소켓 세션 초기화 응답 이상: {}", response.getStatusCode()); + } + + }catch (Exception e) { + log.error("웹소켓 세션 초기화 중 오류 발생", e); + throw new RuntimeException("웹소켓 세션 초기화 실패: " + e.getMessage()); + } + } + + +} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketBrokerConfig.java b/websocket/src/main/java/com/fintory/websocket/publisher/config/WebSocketBrokerConfig.java similarity index 83% rename from infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketBrokerConfig.java rename to websocket/src/main/java/com/fintory/websocket/publisher/config/WebSocketBrokerConfig.java index 7639fdb8..731a926b 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/config/WebSocketBrokerConfig.java +++ b/websocket/src/main/java/com/fintory/websocket/publisher/config/WebSocketBrokerConfig.java @@ -1,6 +1,5 @@ -package com.fintory.infra.domain.stock.config; +package com.fintory.websocket.publisher.config; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; @@ -16,15 +15,6 @@ @EnableWebSocketMessageBroker public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { - // REVIEW 현재 dev를 pull 받으니까 순환 참조 문제 발생 - /* - private final TaskScheduler messageBrokerTaskScheduler; - - @Autowired - public WebSocketBrokerConfig(TaskScheduler webSocketTaskScheduler) { - this.messageBrokerTaskScheduler = webSocketTaskScheduler; - } -*/ @Bean(name = "webSocketTaskScheduler") public TaskScheduler messageBrokerTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/controller/MarketApiController.java b/websocket/src/main/java/com/fintory/websocket/publisher/controller/MarketApiController.java new file mode 100644 index 00000000..f2f26d00 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/controller/MarketApiController.java @@ -0,0 +1,20 @@ +package com.fintory.websocket.publisher.controller; + +import com.fintory.domain.stock.dto.websocket.MarketStatusResponse; +import com.fintory.websocket.publisher.service.MarketTimeService; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/websocket/market") +@RequiredArgsConstructor +public class MarketApiController { + private final MarketTimeService marketTimeService; + + @GetMapping("/status") + public MarketStatusResponse getMarketStatus() { + return marketTimeService.getMarketStatus(); + } +} diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/repository/LiveStockPriceRepository.java b/websocket/src/main/java/com/fintory/websocket/publisher/repository/LiveStockPriceRepository.java new file mode 100644 index 00000000..3ee7a125 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/repository/LiveStockPriceRepository.java @@ -0,0 +1,11 @@ +package com.fintory.websocket.publisher.repository; + +import com.fintory.domain.stock.model.LiveStockPrice; +import com.fintory.domain.stock.model.Stock; +import org.springframework.data.jpa.repository.JpaRepository; + +import java.util.Optional; + +public interface LiveStockPriceRepository extends JpaRepository { + Optional findByStock(Stock stock); +} diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/repository/StockPriceHistoryRepository.java b/websocket/src/main/java/com/fintory/websocket/publisher/repository/StockPriceHistoryRepository.java new file mode 100644 index 00000000..918080ec --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/repository/StockPriceHistoryRepository.java @@ -0,0 +1,24 @@ +package com.fintory.websocket.publisher.repository; + +import com.fintory.domain.stock.model.IntervalType; +import com.fintory.domain.stock.model.Stock; +import com.fintory.domain.stock.model.StockPriceHistory; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.stereotype.Repository; + +import java.time.LocalDate; +import java.util.List; +import java.util.Optional; + +@Repository +public interface StockPriceHistoryRepository extends JpaRepository { + + + @Query("SELECT sph FROM StockPriceHistory sph WHERE sph.stock=:stock AND sph.intervalType=:intervalType AND sph.date=:now ORDER BY sph.updatedAt ASC LIMIT 1") + Optional findOldestByStockAndIntervalTypeAndDate(Stock stock, IntervalType intervalType, LocalDate now); + + void deleteByStockAndIntervalTypeAndDateBefore(Stock stock, IntervalType intervalType, LocalDate now); + + List findByStockAndIntervalTypeAndDate(Stock stock, IntervalType intervalType, LocalDate localDate); +} diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/repository/StockRepository.java b/websocket/src/main/java/com/fintory/websocket/publisher/repository/StockRepository.java new file mode 100644 index 00000000..f24cf28d --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/repository/StockRepository.java @@ -0,0 +1,15 @@ +package com.fintory.websocket.publisher.repository; + +import com.fintory.domain.stock.model.Stock; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; + +@Repository +public interface StockRepository extends JpaRepository { + Optional findByCode(String code); + + List findByCurrencyName(String krw); +} diff --git a/infra/src/main/java/com/fintory/infra/domain/stock/service/websocket/LiveStockPriceWebSocketSaverServiceImpl.java b/websocket/src/main/java/com/fintory/websocket/publisher/service/LiveStockPriceWebSocketSaverService.java similarity index 91% rename from infra/src/main/java/com/fintory/infra/domain/stock/service/websocket/LiveStockPriceWebSocketSaverServiceImpl.java rename to websocket/src/main/java/com/fintory/websocket/publisher/service/LiveStockPriceWebSocketSaverService.java index f292417f..68745d6f 100644 --- a/infra/src/main/java/com/fintory/infra/domain/stock/service/websocket/LiveStockPriceWebSocketSaverServiceImpl.java +++ b/websocket/src/main/java/com/fintory/websocket/publisher/service/LiveStockPriceWebSocketSaverService.java @@ -1,4 +1,4 @@ -package com.fintory.infra.domain.stock.service.websocket; +package com.fintory.websocket.publisher.service; import com.fintory.common.exception.DomainErrorCode; import com.fintory.common.exception.DomainException; @@ -7,10 +7,9 @@ import com.fintory.domain.stock.model.LiveStockPrice; import com.fintory.domain.stock.model.Stock; import com.fintory.domain.stock.model.StockPriceHistory; -import com.fintory.domain.stock.service.websocket.LiveStockPriceWebSocketSaverService; -import com.fintory.infra.domain.stock.repository.LiveStockPriceRepository; -import com.fintory.infra.domain.stock.repository.StockPriceHistoryRepository; -import com.fintory.infra.domain.stock.repository.StockRepository; +import com.fintory.websocket.publisher.repository.LiveStockPriceRepository; +import com.fintory.websocket.publisher.repository.StockPriceHistoryRepository; +import com.fintory.websocket.publisher.repository.StockRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -26,7 +25,7 @@ @Service @RequiredArgsConstructor @Slf4j -public class LiveStockPriceWebSocketSaverServiceImpl implements LiveStockPriceWebSocketSaverService { +public class LiveStockPriceWebSocketSaverService { private final StockPriceHistoryRepository stockPriceHistoryRepository; private final StockRepository stockRepository; @@ -35,7 +34,6 @@ public class LiveStockPriceWebSocketSaverServiceImpl implements LiveStockPriceWe private static final Map todayOpenPrices = new ConcurrentHashMap<>(); //데이터 DB에 저장 메소드 - @Override @Transactional public void saveStockData(LiveStockPriceStream dto) { Stock stock = stockRepository.findByCode(dto.code()) diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/service/LiveStockPriceWebSocketService.java b/websocket/src/main/java/com/fintory/websocket/publisher/service/LiveStockPriceWebSocketService.java new file mode 100644 index 00000000..47a47e1a --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/service/LiveStockPriceWebSocketService.java @@ -0,0 +1,155 @@ +package com.fintory.websocket.publisher.service; + + + +import com.fintory.websocket.provider.service.StockSubscriptionService; +import com.fintory.websocket.provider.service.WebSocketConnectionService; +import com.fintory.websocket.publisher.state.StockDataHolder; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.context.annotation.DependsOn; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import java.util.*; + + + +//NOTE 구독 시도시 -> 에러 코드를 보고 프론트에서 DB API 호출 +//NOTE 구독 성공 후 일정시간 동안 데이터가 오지 않으면 -> 프론트에서 연결 끊김 판단 +@Service +@Slf4j +@RequiredArgsConstructor +public class LiveStockPriceWebSocketService { + + + private final StockDataHolder stockDataHolder; + private final StockSubscriptionService stockSubscriptionService; + private final StockDataBatchSaveService stockDataBatchSaveService; + private final WebSocketConnectionService webSocketConnectionService; + private final MarketTimeService marketTimeService; + private final RedisTemplate redisTemplate; + + /* 구독 자동 실행 메소드 */ + @Scheduled(cron="0 30 09 * * MON-FRI", zone="America/New_York") + public void scheduledOverseasMarketSubscription(){ + stockSubscriptionService.startOverseasMarketSubscription(); + } + + @Scheduled(cron="0 0 9 * * MON-FRI", zone="Asia/Seoul") + public void scheduledKoreanMarketSubscription(){ + stockSubscriptionService.startKoreanMarketSubscription(); + } + + + @PostConstruct + public void initMarketSubscriptions() { + // 국내 장 체크 및 구독 + if (marketTimeService.isKoreanMarketOpen()) { + log.info("애플리케이션 시작 - 국내 장 열림, 자동 구독 시작"); + stockDataHolder.setCachedAccessToken((String) redisTemplate.opsForValue().get("kis-access-token")); + stockSubscriptionService.startKoreanMarketSubscription(); + } else { + log.info("국내 장이 열려있지 않아 자동 구독 스킵"); + } + + // 해외 장 체크 및 구독 + if (marketTimeService.isOverseasMarketOpen()) { + log.info("애플리케이션 시작 - 해외 장 열림, 자동 구독 시작"); + stockDataHolder.setCachedAccessToken((String) redisTemplate.opsForValue().get("db-access-token")); + stockSubscriptionService.startOverseasMarketSubscription(); + } else { + log.info("해외 장이 열려있지 않아 자동 구독 스킵"); + } + } + + /* 스케줄링 - 배치 저장 */ + @Scheduled(cron = "0 * 9-15 * * MON-FRI", zone = "Asia/Seoul") + public void saveKoreanStockDataBatch() { + if (!marketTimeService.isKoreanMarketOpen()) { + return; + } + stockDataBatchSaveService.saveBatchData("국내", stockDataHolder.getKoreanPendingData()); + } + + @Scheduled(cron = "0 * 9-15 * * MON-FRI", zone = "America/New_York") + public void saveOverseasStockDataBatch() { + if (!marketTimeService.isOverseasMarketOpen()) { + return; + } + stockDataBatchSaveService.saveBatchData("해외", stockDataHolder.getOverseasPendingData()); + } + + /* 스케줄링 - 장 마감 정리 */ + @Scheduled(cron = "0 20 15 * * MON-FRI", zone = "Asia/Seoul") + public void cleanUpAfterKoreanMarketClose() { + stockDataBatchSaveService.saveRemainingData("국내", stockDataHolder.getKoreanPendingData()); + webSocketConnectionService.disconnectKoreanWebSocket(); + log.info("국내 장 마감 정리 완료"); + } + + @Scheduled(cron = "0 0 16 * * MON-FRI", zone = "America/New_York") + public void cleanUpAfterOverseasMarketClose() { + stockDataBatchSaveService.saveRemainingData("해외", stockDataHolder.getOverseasPendingData()); + webSocketConnectionService.disconnectOverseasWebSocket(); + log.info("해외 장 마감 정리 완료"); + } + + + @PreDestroy + public void cleanUp() { + try { + // 남은 데이터 저장 + stockDataBatchSaveService.saveRemainingData("국내", stockDataHolder.getKoreanPendingData()); + stockDataBatchSaveService.saveRemainingData("해외", stockDataHolder.getOverseasPendingData()); + // 웹소켓 연결 해제 + if (stockDataHolder.getIsKoreanConnected().get()) { + webSocketConnectionService.disconnectKoreanWebSocket(); + } + if (stockDataHolder.getIsOverseasConnected().get()) { + webSocketConnectionService.disconnectOverseasWebSocket(); + } + + // 최종 리소스 정리 -> (안전장치) + stockDataHolder.getKoreanSubscribedStocks().clear(); + stockDataHolder.getOverseasSubscribedStocks().clear(); + stockDataHolder.getPreviousKoreanData().clear(); + stockDataHolder.getPreviousOverseasData().clear(); + stockDataHolder.getKoreanPendingData().clear(); + stockDataHolder.getOverseasPendingData().clear(); + + } catch (Exception e) { + log.error("WebSocket cleanup 중 에러 발생", e); + + // 에러 발생해도 리소스는 강제 정리 + stockDataHolder.getKoreanSubscribedStocks().clear(); + stockDataHolder.getOverseasSubscribedStocks().clear(); + stockDataHolder.getPreviousKoreanData().clear(); + stockDataHolder.getPreviousOverseasData().clear(); + stockDataHolder.getKoreanPendingData().clear(); + stockDataHolder.getOverseasPendingData().clear(); + } + + log.info("WebSocket 연결 해제 완료"); + } + + /* 메트릭용 Getter 추가 */ + public Set getKoreanSubscribedStocks() { + return stockDataHolder.getKoreanSubscribedStocks(); + } + + public Set getOverseasSubscribedStocks() { + return stockDataHolder.getOverseasSubscribedStocks(); + } + + public boolean isKoreanConnected() { + return stockDataHolder.getIsKoreanConnected().get(); + } + + public boolean isOverseasConnected() { + return stockDataHolder.getIsOverseasConnected().get(); + } +} \ No newline at end of file diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/service/MarketTimeService.java b/websocket/src/main/java/com/fintory/websocket/publisher/service/MarketTimeService.java new file mode 100644 index 00000000..753943ef --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/service/MarketTimeService.java @@ -0,0 +1,51 @@ +package com.fintory.websocket.publisher.service; + +import com.fintory.domain.stock.dto.websocket.MarketStatusResponse; +import com.fintory.websocket.publisher.state.StockDataHolder; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.time.DayOfWeek; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +@Service +@RequiredArgsConstructor +public class MarketTimeService { + + private final StockDataHolder stockDataHolder; + + public boolean isKoreanMarketOpen() { + ZonedDateTime now = ZonedDateTime.now(ZoneId.of("Asia/Seoul")); + boolean weekday = now.getDayOfWeek() != DayOfWeek.SATURDAY && now.getDayOfWeek() != DayOfWeek.SUNDAY; + return weekday + && !now.toLocalTime().isBefore(LocalTime.of(9, 0)) + && now.toLocalTime().isBefore(LocalTime.of(15, 30)); + } + + public boolean isOverseasMarketOpen() { + ZonedDateTime now = ZonedDateTime.now(ZoneId.of("America/New_York")); + boolean weekday = now.getDayOfWeek() != DayOfWeek.SATURDAY && now.getDayOfWeek() != DayOfWeek.SUNDAY; + return weekday + && !now.toLocalTime().isBefore(LocalTime.of(9, 0)) + && now.toLocalTime().isBefore(LocalTime.of(16, 0)); + } + + + public MarketStatusResponse getMarketStatus() { + // 국내 장 시간이면 "korean" + if (stockDataHolder.getIsKoreanConnected().get() && isKoreanMarketOpen()) { + return new MarketStatusResponse("korean"); + } + + // 해외 장 시간이면 "overseas" + if (stockDataHolder.getIsOverseasConnected().get() && isOverseasMarketOpen()) { + return new MarketStatusResponse("overseas"); + } + + // 둘 다 아니면 "no" + return new MarketStatusResponse("no"); + } + +} diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/service/StockDataBatchSaveService.java b/websocket/src/main/java/com/fintory/websocket/publisher/service/StockDataBatchSaveService.java new file mode 100644 index 00000000..c2d58ac6 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/service/StockDataBatchSaveService.java @@ -0,0 +1,54 @@ +package com.fintory.websocket.publisher.service; + +import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.Map; + + +@Service +@Slf4j +@RequiredArgsConstructor +public class StockDataBatchSaveService { + + private final LiveStockPriceWebSocketSaverService liveStockPriceWebSocketSaverService; + + public void saveBatchData(String marketName, Map pendingData) { + if (pendingData.isEmpty()) return; + + Map dataToSave = new HashMap<>(pendingData); + pendingData.clear(); + + dataToSave.values().forEach(dto -> { + try { + liveStockPriceWebSocketSaverService.saveStockData(dto); + } catch (Exception e) { + log.error("{} 종목 {} 저장 실패: {}", marketName, dto.code(), e.getMessage()); + } + }); + + log.info("{} 주식 배치 저장 완료 - 저장된 종목 수: {}", marketName, dataToSave.size()); + } + + public void saveRemainingData(String marketName, Map pendingData) { + if (!pendingData.isEmpty()) { + + Map dataToSave = new HashMap<>(pendingData); + pendingData.clear(); + + dataToSave.values().forEach(dto -> { + try { + liveStockPriceWebSocketSaverService.saveStockData(dto); + } catch (Exception e) { + log.error("{} 종목 {} 마지막 저장 실패: {}", marketName, dto.code(), e.getMessage()); + } + }); + + log.info("{} 주식 마지막 배치 저장 완료 - 저장된 종목 수: {}", marketName, dataToSave.size()); + } + } + +} diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/service/StockDataProcessService.java b/websocket/src/main/java/com/fintory/websocket/publisher/service/StockDataProcessService.java new file mode 100644 index 00000000..3c8d216d --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/service/StockDataProcessService.java @@ -0,0 +1,102 @@ +package com.fintory.websocket.publisher.service; + +import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; +import com.fintory.websocket.monitoring.config.WebSocketMetrics; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Service; +import io.micrometer.core.instrument.Timer; + +import java.math.BigDecimal; +import java.util.Map; + +@Service +@Slf4j +public class StockDataProcessService { + private final WebSocketMetrics webSocketMetrics; + private final SimpMessagingTemplate messageTemplate; + private final Timer dataProcessingTime; + private final LiveStockPriceWebSocketSaverService liveStockPriceWebSocketSaverService; + private final RedisTemplate redisTemplate; + private static final String PRICE_ALERT_CHANNEL = "price:alert:channel"; + + public StockDataProcessService(@Lazy WebSocketMetrics webSocketMetrics, + SimpMessagingTemplate messageTemplate, + MeterRegistry meterRegistry, + LiveStockPriceWebSocketSaverService liveStockPriceWebSocketSaverService, RedisTemplate redisTemplate) { + this.webSocketMetrics = webSocketMetrics; + this.messageTemplate = messageTemplate; + this.dataProcessingTime = Timer.builder("websocket.data.processing.time") + .description("Time to process and send stock data") + .publishPercentiles(0.5,0.95,0.99) + .register(meterRegistry); + this.liveStockPriceWebSocketSaverService = liveStockPriceWebSocketSaverService; + this.redisTemplate = redisTemplate; + } + + //웹소켓으로 받은 데이터를 처리하는 메서드 + public void processStreamData(LiveStockPriceStream dto, + Map previousData, + Map pendingData, + String marketName) { + LiveStockPriceStream previous = previousData.get(dto.code()); + + Timer.Sample sample = Timer.start(); + try { + //이전 데이터와 비교하여 중복 체크 + if (previous != null && previous.equals(dto)) { + return; //똑같은 데이터면 무시 + } + //새로운 데이터를 받으면 -> 감시가 이벤트 발행 + // @EventListener는 같은 JVM 내에서만 동작함 -> 다른 통신 방법 필요 -> redis pub/sub 활용 + /* 알림 기능 -> 잠깐 미룬 상태 + try { + + redisTemplate.convertAndSend(PRICE_ALERT_CHANNEL, dto); + } catch (Exception e) { + log.error("Redis Pub/Sub 전송 실패: {}", dto.code(), e); + }*/ + + //스케쥴러 + 웹소켓 연결 시작하자마자 받은 데이터 값(첫 데이터) 저장 + if (previous == null) { + try { + liveStockPriceWebSocketSaverService.saveStockData(dto); //DB에 바로 저장 + } catch (Exception e) { + // 실패 시 배치 저장을 위해 pendingData에 보관 + pendingData.put(dto.code(), dto); + log.error("{} 종목 {} 실시간 저장 실패, 배치 저장 대기: {}", marketName, dto.code(), e.getMessage()); + } + } + + //새로운 데이터면 다음 중복 체크용으로 저장 + previousData.put(dto.code(), dto); + pendingData.put(dto.code(), dto); //배치 저장 대기 + sendStockData(dto.code(), dto); //클라이언트에게 전송 + }finally { + sample.stop(dataProcessingTime); + + } + } + + public void sendStockData(String stockCode, Object stockData) { + if (stockData instanceof LiveStockPriceStream stream) { + /* + if (stream.priceChange() == null || stream.priceChange().compareTo(BigDecimal.ZERO) == 0) { + return; + }*/ + // 지연 시간을 측정하기 위해 STOMP 헤더에 타임스탬프 추가 + // REVIEW 헤더에 데이터를 추가한 것일 뿐 바디는 바뀌지 않으므로 프론트 코드에는 문제가 없는 것으로 알고 있는데 아니라면 수정 필수 + SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(); + headerAccessor.setNativeHeader("sentTimestamp", String.valueOf(System.currentTimeMillis())); + + webSocketMetrics.incrementMessageSent(); + messageTemplate.convertAndSend("/topic/stock/live-Price/" + stockCode, stockData, headerAccessor.getMessageHeaders()); + } + } + + +} diff --git a/websocket/src/main/java/com/fintory/websocket/publisher/state/StockDataHolder.java b/websocket/src/main/java/com/fintory/websocket/publisher/state/StockDataHolder.java new file mode 100644 index 00000000..c4544cb2 --- /dev/null +++ b/websocket/src/main/java/com/fintory/websocket/publisher/state/StockDataHolder.java @@ -0,0 +1,35 @@ +package com.fintory.websocket.publisher.state; + +import com.fintory.domain.stock.dto.websocket.LiveStockPriceStream; + + +import lombok.Data; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +@Component +@Data +public class StockDataHolder { + + private final Set koreanSubscribedStocks = ConcurrentHashMap.newKeySet(); + private final Set overseasSubscribedStocks = ConcurrentHashMap.newKeySet(); + + // 이전에 받은 주식 데이터 저장 -> 중복 데이터 필터링용 + private final Map previousKoreanData = new ConcurrentHashMap<>(); + private final Map previousOverseasData = new ConcurrentHashMap<>(); + + // db에 저장되지 않은 주식 데이터 임시 저장용 + private final Map koreanPendingData = new ConcurrentHashMap<>(); + private final Map overseasPendingData = new ConcurrentHashMap<>(); + + private final AtomicBoolean isKoreanConnected = new AtomicBoolean(false); + private final AtomicBoolean isOverseasConnected = new AtomicBoolean(false); + + private String cachedAccessToken; + + +} diff --git a/websocket/src/main/resources/application-deploy.yml b/websocket/src/main/resources/application-deploy.yml new file mode 100644 index 00000000..6a17f530 --- /dev/null +++ b/websocket/src/main/resources/application-deploy.yml @@ -0,0 +1,54 @@ +server: + port: 8080 + +spring: + config: + activate: + on-profile: deploy + + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: ${RDS_URL} + username: ${RDS_USERNAME} + password: ${RDS_PASSWORD} + + jpa: + show-sql: false + hibernate: + ddl-auto: update + + main: + allow-bean-definition-overriding: true + + data: + redis: + host: ${AWS_REDIS_HOST} + port: 6379 + password: ${AWS_REDIS_PASSWORD} + + websocket: + stomp: + stats-log-period: 30000 + +management: + endpoints: + web: + exposure: + include: health, prometheus,metrics + endpoint: + health: + show-details: always + +hantu-openapi: + appkey: ${HANTU_APPKEY} + appsecret: ${HANTU_APPSECRET} + base-url: https://openapi.koreainvestment.com:9443 + +db-openapi: + db-appkey: ${DB_APPKEY} + db-appsecret: ${DB_APPSECRET} + base-url: https://openapi.dbsec.co.kr:8443 + +eos: + api-key: ${EOS_API_KEY} + diff --git a/websocket/src/main/resources/application-local.yml b/websocket/src/main/resources/application-local.yml new file mode 100644 index 00000000..1778e769 --- /dev/null +++ b/websocket/src/main/resources/application-local.yml @@ -0,0 +1,50 @@ +server: + port: 8081 + +spring: + config: + activate: + on-profile: local + + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: ${RDS_URL} + username: ${RDS_USERNAME} + password: ${RDS_PASSWORD} + + jpa: + show-sql: false + hibernate: + ddl-auto: update + + main: + allow-bean-definition-overriding: true + + data: + redis: + host: ${AWS_REDIS_HOST} + port: 6379 + + +management: + endpoints: + web: + exposure: + include: health, prometheus,metrics + endpoint: + health: + show-details: always + +hantu-openapi: + appkey: ${HANTU_APPKEY} + appsecret: ${HANTU_APPSECRET} + base-url: https://openapi.koreainvestment.com:9443 + +db-openapi: + db-appkey: ${DB_APPKEY} + db-appsecret: ${DB_APPSECRET} + base-url: https://openapi.dbsec.co.kr:8443 + +eos: + api-key: ${EOS_API_KEY} + diff --git a/websocket/src/main/resources/application.yml b/websocket/src/main/resources/application.yml new file mode 100644 index 00000000..5558aa3a --- /dev/null +++ b/websocket/src/main/resources/application.yml @@ -0,0 +1,21 @@ +spring: + config: + import: + - optional:file:.env + profiles: + active: local + jpa: + show-sql: true + properties: + hibernate.jdbc.time_zone: Asia/Seoul + + messages: + basename: messages + encoding: UTF-8 + + + + + + +