Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletRequest;
Expand All @@ -37,10 +38,18 @@ public class HttpLoggingFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {

final HttpServletRequest httpServletRequest = (HttpServletRequest)request;
final HttpServletResponse httpServletResponse = (HttpServletResponse)response;

if (isAcceptTextEventStream(httpServletRequest)) {
chain.doFilter(request, response);
return;
}

HttpRequestBodyCachedWrapper requestWrapper =
new HttpRequestBodyCachedWrapper((HttpServletRequest)request);
new HttpRequestBodyCachedWrapper(httpServletRequest);
HttpResponseBodyCachedWrapper responseWrapper =
new HttpResponseBodyCachedWrapper((HttpServletResponse)response);
new HttpResponseBodyCachedWrapper(httpServletResponse);

logRequest(requestWrapper);

Expand All @@ -50,6 +59,10 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
writeResponseBody(responseWrapper, response);
}

private boolean isAcceptTextEventStream(final HttpServletRequest httpServletRequest) {
return MediaType.TEXT_EVENT_STREAM_VALUE.equalsIgnoreCase(httpServletRequest.getHeader(HttpHeaders.ACCEPT));
}

private void logRequest(HttpRequestBodyCachedWrapper request) throws IOException {
String url = request.getRequestURI();
String method = request.getMethod();
Expand All @@ -76,7 +89,7 @@ private String getRequestHeadersAsString(HttpServletRequest request) {
return headers.toString().trim();
}

private String getRequestBodyAsString(final HttpRequestBodyCachedWrapper request) throws IOException {
private String getRequestBodyAsString(final HttpRequestBodyCachedWrapper request) {
return new String(request.getCachedBody(), StandardCharsets.UTF_8);
}

Expand Down Expand Up @@ -144,6 +157,14 @@ private String getResponseHeadersAsString(Map<String, String> responseHeadersMap
private void writeResponseBody(HttpResponseBodyCachedWrapper cachedResponse, ServletResponse response)
throws IOException {
final byte[] responseBody = cachedResponse.getCachedBody();
response.getOutputStream().write(responseBody);

try (final ServletOutputStream outputStream = response.getOutputStream()) {
outputStream.write(responseBody);
} catch (IOException e) {
log.error("Error writing response body", e);
throw e;
} finally {
cachedResponse.setContentLength(responseBody.length);
}
}
}
103 changes: 103 additions & 0 deletions src/main/java/kr/mywork/common/sse/SseEmitters.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package kr.mywork.common.sse;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import kr.mywork.domain.notification.errors.NotificationEmitterNotFoundException;
import kr.mywork.domain.notification.errors.NotificationErrorType;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class SseEmitters {

// key : memberId
private final Map<UUID, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

public SseEmitter add(UUID clientId) {
final SseEmitter sseEmitter = new SseEmitter(600 * 1000L);

if (sseEmitters.containsKey(clientId)) {
return sseEmitters.get(clientId);
} else {
sseEmitters.put(clientId, sseEmitter);
}

sseEmitter.onCompletion(() -> {
log.info("onCompletion sseEmitter : {}", sseEmitter);
sseEmitters.remove(clientId);
});
sseEmitter.onTimeout(() -> {
log.info("onTimeout sseEmitter : {}", sseEmitter);
sseEmitters.remove(clientId);
});
sseEmitter.onError(e -> {
log.info("onError sseEmitter : {}, error: {}", sseEmitter, e.getMessage());
this.sseEmitters.remove(clientId);
});

log.info("added sseEmitter id : {}, sseEmitters size: {}", clientId, sseEmitters.size());
return sseEmitter;
}

public SseEmitter remove(UUID memberId) {
if (!sseEmitters.containsKey(memberId)) {
throw new NotificationEmitterNotFoundException(NotificationErrorType.EMITTER_NOT_FOUND);
}

log.debug("removing sseEmitter for memberId: {}", memberId);

final SseEmitter removedEmitter = sseEmitters.remove(memberId);
removedEmitter.complete();

return removedEmitter;
}

public <T> boolean send(UUID clientId, String eventName, T data) {
if (!sseEmitters.containsKey(clientId)) {
return false;
}

SseEmitter emitter = sseEmitters.get(clientId);

try {
emitter.send(
SseEmitter.event()
.name(eventName)
.data(data)
.build());

return true;
} catch (Exception exception) {
log.info("send sseEmitter error : {}", exception.getMessage());
sseEmitters.remove(clientId);
emitter.completeWithError(exception);
}

return false;
}

@Scheduled(fixedDelay = 30 * 1000L)
public void ping() {
for (Map.Entry<UUID, SseEmitter> emitterEntry : sseEmitters.entrySet()) {
final SseEmitter emitter = emitterEntry.getValue();
try {
emitter.send(SseEmitter.event().name("ping").data("ping"));
} catch (IOException e) {
log.error("Error sending ping to emitter for clientId {}: {}", emitterEntry.getKey(), e.getMessage());
sseEmitters.remove(emitterEntry.getKey());
emitter.completeWithError(e);
} catch (Exception e) {
log.error("Unexpected error while sending ping: {}", e.getMessage());
sseEmitters.remove(emitterEntry.getKey());
emitter.completeWithError(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,57 @@

import java.util.List;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import kr.mywork.domain.activityLog.listener.eventObject.CreateEventObject;
import kr.mywork.domain.activityLog.listener.eventObject.DeleteEventObject;
import kr.mywork.domain.activityLog.listener.eventObject.ModifyEventObject;
import kr.mywork.domain.activityLog.listener.eventObject.ActivityLogCreateEvent;
import kr.mywork.domain.activityLog.listener.eventObject.ActivityLogDeleteEvent;
import kr.mywork.domain.activityLog.listener.eventObject.ActivityModifyEvent;
import kr.mywork.domain.activityLog.model.ActivityLog;
import kr.mywork.domain.activityLog.model.LogDetail;
import kr.mywork.domain.activityLog.repository.ActivityLogRepository;
import kr.mywork.domain.activityLog.service.ActivityLogService;
import kr.mywork.domain.activityLog.service.LogDetailService;
import kr.mywork.domain.company.repository.CompanyRepository;
import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class ActivityLogListener {
public class ActivityLogTxListener {

private final ActivityLogRepository activityLogRepository;
private final CompanyRepository companyRepository;
private final LogDetailService logDetailService;
private final ActivityLogService activityLogService;

@Async(value = "eventTaskExecutor")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleProjectCreated(CreateEventObject event) {
public void handleProjectCreated(final ActivityLogCreateEvent event) {

ActivityLog activityLog = activityLogService.makeCreatedActivityLog(event);
activityLogRepository.save(activityLog);

activityLogService.save(activityLog);
}

@Async(value = "eventTaskExecutor")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleProjectModified(ModifyEventObject event) {
public void handleProjectModified(final ActivityModifyEvent event) {

ActivityLog activityLog = activityLogService.makeModifiedActivityLog(event);
ActivityLog savedLog = activityLogRepository.save(activityLog);
ActivityLog savedLog = activityLogService.save(activityLog);

List<LogDetail> diffValue = logDetailService.makeLogDetails(event, savedLog.getId());

logDetailService.saveAll(diffValue);
}

@Async(value = "eventTaskExecutor")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleProjectDeleted(DeleteEventObject event) {
public void handleProjectDeleted(final ActivityLogDeleteEvent event) {

ActivityLog activityLog = activityLogService.makeDeletedActivityLog(event);
activityLogRepository.save(activityLog);
activityLogService.save(activityLog);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package kr.mywork.domain.activityLog.listener.eventObject;

import kr.mywork.common.auth.components.dto.LoginMemberDetail;

public record ActivityLogCreateEvent(Object created, LoginMemberDetail loginMemberDetail) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kr.mywork.domain.activityLog.listener.eventObject;

import kr.mywork.common.auth.components.dto.LoginMemberDetail;

public record ActivityLogDeleteEvent(Object deleted, LoginMemberDetail loginMemberDetail) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kr.mywork.domain.activityLog.listener.eventObject;

import kr.mywork.common.auth.components.dto.LoginMemberDetail;

public record ActivityModifyEvent(Object before, Object after, LoginMemberDetail loginMemberDetail) {

}

This file was deleted.

This file was deleted.

This file was deleted.

Loading
Loading