Skip to content

Commit

Permalink
dev: agent status update, emitter completion
Browse files Browse the repository at this point in the history
  • Loading branch information
ghkdqhrbals committed Mar 29, 2024
1 parent 045c181 commit 9e79744
Show file tree
Hide file tree
Showing 23 changed files with 112 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import jakarta.servlet.http.HttpServletRequest;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.benchmarker.bmagent.AgentInfo;
Expand Down Expand Up @@ -42,15 +43,16 @@ public class AgentApiController {
* @param action String
* @return SseEmitter
*/
@PostMapping("/templates/{template_id}")
@PostMapping("/groups/{group_id}/templates/{template_id}")
public SseEmitter manageSSE(@PathVariable("template_id") Long templateId,
@PathVariable("group_id") String groupId,
@RequestParam("action") String action, @RequestBody TemplateInfo templateInfo) {
log.info(templateInfo.toString());

if (action.equals("start")) {
agentStatusManager.getAndUpdateStatusIfReady(
AgentStatus.TESTING).orElseThrow(() -> new RuntimeException("agent is not ready"));
return sseManageService.start(templateId, templateInfo);
return sseManageService.start(templateId, groupId, templateInfo);
} else {
sseManageService.stop(templateId);
return null;
Expand All @@ -73,10 +75,12 @@ public AgentInfo getStatus() {
String scheme = request.getScheme(); // http or https
String serverName = request.getServerName();
int serverPort = request.getServerPort();
Set<Long> longs = scheduledTaskService.getStatus().keySet();

String agentServerUrl = scheme + "://" + serverName + ":" + serverPort;

return AgentInfo.builder()
.templateId(longs)
.cpuUsage(agentStatusManager.getCpuUsage())
.memoryUsage(agentStatusManager.getMemoryUsage())
.startedAt(agentStatusManager.getStartedAt())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
import java.util.stream.IntStream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.benchmarker.bmagent.AgentStatus;
import org.benchmarker.bmagent.service.IScheduledTaskService;
import org.benchmarker.bmagent.status.AgentStatusManager;

import org.benchmarker.bmagent.util.WebClientSupport;
import org.benchmarker.bmcommon.dto.TemplateInfo;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -73,7 +72,7 @@ public HttpSender(ResultManagerService resultManagerService,
*
* @param templateInfo {@link TemplateInfo}
*/
public void sendRequests(TemplateInfo templateInfo) throws MalformedURLException {
public void sendRequests(SseEmitter sseEmitter, TemplateInfo templateInfo) throws MalformedURLException {

URL url = new URL(templateInfo.getUrl());
RequestHeadersSpec<?> req = WebClientSupport.create(templateInfo.getMethod(),
Expand Down Expand Up @@ -106,7 +105,7 @@ public void sendRequests(TemplateInfo templateInfo) throws MalformedURLException
// 만약 running 이 아니거나 시간이 끝났다면,
if (!isRunning || System.currentTimeMillis() > endTime) {
agentStatusManager.updateAgentStatus(AgentStatus.READY);
return;
break;
}
long requestStartTime = System.currentTimeMillis(); // 요청 시작 시간 기록
req.exchangeToMono(resp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface ISseManageService extends SseManageConsts {
* @param templateInfo TemplateInfo
* @return SseEmitter
*/
SseEmitter start(Long id, TemplateInfo templateInfo);
SseEmitter start(Long id, String groupId, TemplateInfo templateInfo);

/**
* Stop the SSE emitter for the given id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class SseManageService extends AbstractSseManageService {
* @see ScheduledTaskService
*/
@Override
public SseEmitter start(Long id, TemplateInfo templateInfo) {
public SseEmitter start(Long id, String groupId, TemplateInfo templateInfo) {
SseEmitter emitter = new SseEmitter(SSE_TIMEOUT);

LocalDateTime startAt = LocalDateTime.now();
// when the client disconnects, complete the SseEmitter
alwaysDoStop(id, emitter);

Expand All @@ -70,11 +70,10 @@ public SseEmitter start(Long id, TemplateInfo templateInfo) {

// 1초마다 TestResult 를 보내는 스케줄러 시작
scheduledTaskService.start(id, () -> {
LocalDateTime cur = LocalDateTime.now();
LocalDateTime c = LocalDateTime.now();
Map<Double, Double> tpsP = htps.calculateTpsPercentile(percentiles);
Map<Double, Double> mttfbP = htps.calculateMttfbPercentile(percentiles);
CommonTestResult data = getCommonTestResult(templateInfo, htps, now, cur, tpsP, mttfbP);
log.info(data.toString());
CommonTestResult data = getCommonTestResult(groupId,templateInfo, htps, now, c, tpsP, mttfbP);
resultManagerService.save(id, data);
send(id, resultManagerService.find(id));
}, 0, 1, TimeUnit.SECONDS);
Expand All @@ -83,7 +82,15 @@ public SseEmitter start(Long id, TemplateInfo templateInfo) {
// async + non-blocking 필수
CompletableFuture.runAsync(() -> {
try {
htps.sendRequests(templateInfo);
htps.sendRequests(emitter, templateInfo);
LocalDateTime finished = LocalDateTime.now();
Map<Double, Double> tpsP = htps.calculateTpsPercentile(percentiles);
Map<Double, Double> mttfbP = htps.calculateMttfbPercentile(percentiles);
CommonTestResult data = getCommonTestResult(groupId,templateInfo, htps, now, finished, tpsP, mttfbP);
data.setFinishedAt(finished.toString());
data.setTestStatus(AgentStatus.TESTING_FINISH);
send(id, data);
emitter.complete();
} catch (MalformedURLException e) {
log.error(e.getMessage());
}
Expand All @@ -98,17 +105,18 @@ public SseEmitter start(Long id, TemplateInfo templateInfo) {
*
* @param templateInfo
* @param htps
* @param now
* @param start
* @param cur
* @param tpsP
* @param mttfbP
* @return CommonTestResult
*/
private CommonTestResult getCommonTestResult(TemplateInfo templateInfo, HttpSender htps,
LocalDateTime now, LocalDateTime cur, Map<Double, Double> tpsP,
private CommonTestResult getCommonTestResult(String groupId,TemplateInfo templateInfo, HttpSender htps,
LocalDateTime start, LocalDateTime cur, Map<Double, Double> tpsP,
Map<Double, Double> mttfbP) {
return CommonTestResult.builder()
.startedAt(now.toString())
.groupId(groupId)
.startedAt(start.toString())
.totalRequests(htps.getTotalRequests().get())
.totalSuccess(htps.getTotalSuccess().get())
.totalErrors(htps.getTotalErrors().get())
Expand All @@ -117,14 +125,14 @@ private CommonTestResult getCommonTestResult(TemplateInfo templateInfo, HttpSend
.url(templateInfo.getUrl())
.method(templateInfo.getMethod())
.totalUsers(templateInfo.getVuser())
.totalDuration(Duration.between(now, cur).toString())
.totalDuration(Duration.between(start, cur).toString())
.MTTFBPercentiles(mttfbP)
.TPSPercentiles(tpsP)
.testStatus(agentStatusManager.getStatus().get())
.finishedAt(cur.toString())
// TODO temp
.mttfbAverage("0")
.tpsAverage(0)
.finishedAt("-")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ public void testStartSSE() throws IOException {
mockSseEmitter.send("Data 2");
mockSseEmitter.complete();
return null;
}).when(sseManageService).start(eq(1L), any());
}).when(sseManageService).start(eq(1L),any(), any());

// 호출
TemplateInfo build = TemplateInfo.builder().build();
agentApiController.manageSSE(1L, "start", build);
agentApiController.manageSSE(1L, "groupId","start", build);

// then
// SseEmitter 로 전송된 메시지 모두 캡처
Expand All @@ -100,7 +100,7 @@ public void testStopSSE() throws IOException {

// when
TemplateInfo build = TemplateInfo.builder().build();
agentApiController.manageSSE(templateId, "stop", build);
agentApiController.manageSSE(templateId, "groupId","stop", build);

// then
// sseManageService.stop() 메서드가 호출되었는지 검증
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
* Schel
Expand Down Expand Up @@ -53,7 +54,7 @@ void test2() throws MalformedURLException {

assertThrows((MalformedURLException.class), () -> {
// when
httpSender.sendRequests(get);
httpSender.sendRequests(new SseEmitter(),get);
httpSender.cancelRequests();
});

Expand Down Expand Up @@ -82,7 +83,7 @@ void test() throws MalformedURLException {
.build();

// when
httpSender.sendRequests(get);
httpSender.sendRequests(new SseEmitter(),get);
scheduledTaskService.shutdown(1L);

// then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void start_ShouldStartSseEmitterAndScheduledTask() throws InterruptedException {
resultManagerService.save(id, resultStub);

// when
SseEmitter result = sseManageService.start(id, new TemplateInfo());
SseEmitter result = sseManageService.start(id, "groupId", new TemplateInfo());

// then
assertThat(result).isNotNull();
Expand All @@ -64,10 +64,10 @@ void startAndShutdown() throws InterruptedException {
Long id = 1L;
CommonTestResult resultStub = RandomUtils.generateRandomTestResult();
resultManagerService.save(id, resultStub);
SseEmitter result = sseManageService.start(id, new TemplateInfo());
SseEmitter result = sseManageService.start(id, "groupId", new TemplateInfo());

// when
SseEmitter res = sseManageService.start(id, new TemplateInfo());
SseEmitter res = sseManageService.start(id, "groupId", new TemplateInfo());

// then
assertThat(res).isNull();
Expand All @@ -81,7 +81,7 @@ void stop_ShouldDoNothingIfEmitterAlreadyStopped() throws InterruptedException {
Long id = 1L;
CommonTestResult resultStub = RandomUtils.generateRandomTestResult();
resultManagerService.save(id, resultStub);
sseManageService.start(id, new TemplateInfo());
sseManageService.start(id, "groupId", new TemplateInfo());
sseManageService.stop(id);

// when
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.benchmarker.bmagent;

import java.time.ZonedDateTime;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -16,6 +17,7 @@
@AllArgsConstructor
public class AgentInfo {
private AgentStatus status;
private Set<Long> templateId;
private double cpuUsage;
private double memoryUsage;
private String serverUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class CommonTestResult {

@JsonProperty("test_id")
private int testId;
@JsonProperty("group_id")
private String groupId;
@JsonProperty("started_at")
private String startedAt;
@JsonProperty("finished_at")
Expand Down
2 changes: 2 additions & 0 deletions bm-controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ testlogger {
def excludeJacocoTestCoverageReport = [
'org/benchmarker/bmcontroller/home/**',
'org/benchmarker/bmcontroller/template/**',
'org/benchmarker/bmcontroller/prerun/**',
'org/benchmarker/bmcontroller/preftest/**',
'org/benchmarker/bmcontroller/common/beans/**',
'org/benchmarker/bmcontroller/user/controller/UserController.class',
'org/benchmarker/BmControllerApplication.class',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.benchmarker.bmcontroller.common.controller.annotation.GlobalControllerModel;
import org.benchmarker.bmcontroller.common.error.ErrorCode;
import org.benchmarker.bmcontroller.common.error.GlobalException;

import org.benchmarker.bmcontroller.preftest.service.PerftestService;
import org.benchmarker.bmcontroller.template.service.ITestTemplateService;
import org.benchmarker.bmcontroller.user.service.UserContext;
Expand Down Expand Up @@ -76,11 +75,14 @@ public ResponseEntity send(@PathVariable("group_id") String groupId,

TemplateInfo templateInfo = testTemplateService.getTemplateInfo(userId, templateId);


Flux<ServerSentEvent<CommonTestResult>> eventStream = perftestService.executePerformanceTest(
templateId, action, webClient, templateInfo);
templateId, groupId, action, webClient, templateInfo);
perftestService.saveRunning(groupId, templateId);

eventStream
.doOnComplete(() -> {
perftestService.removeRunning(groupId,templateId);
// TODO : CommonTestResult 저장 logic 구현 필요
// 코드 한줄
if (action.equals("stop")) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.benchmarker.bmcontroller.preftest.service;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.benchmarker.bmcommon.dto.CommonTestResult;
import org.benchmarker.bmcommon.dto.TemplateInfo;
Expand All @@ -10,9 +14,35 @@
import reactor.core.publisher.Flux;

@Service
@Getter
@Slf4j
public class PerftestService {

private ConcurrentHashMap<String, Set<Integer>> runningTemplates = new ConcurrentHashMap<>();

public void saveRunning(String groupId, Integer templateId) {
Set<Integer> templates = runningTemplates.get(groupId);
if (templates != null) {
templates.add(templateId);
} else {
HashSet<Integer> temp = new HashSet<Integer>();
temp.add(templateId);
runningTemplates.put(groupId, temp);
}
log.info(runningTemplates.toString());
}

public void removeRunning(String groupId, Integer templateId) {
Set<Integer> templates = runningTemplates.get(groupId);
if (templates != null) {
templates.remove(templateId);
if (templates.size()==0){
runningTemplates.remove(groupId);
}
}
;
}

/**
* Execute a performance test request to the bm-agent API and receive intermediate results via
* Server-Sent Events (SSE).
Expand All @@ -24,12 +54,14 @@ public class PerftestService {
* @return Flux {@link ServerSentEvent} {@link CommonTestResult}
*/
public Flux<ServerSentEvent<CommonTestResult>> executePerformanceTest(Integer templateId,
String groupId,
String action, WebClient webClient, TemplateInfo templateInfo) {
ParameterizedTypeReference<ServerSentEvent<CommonTestResult>> typeReference =
new ParameterizedTypeReference<ServerSentEvent<CommonTestResult>>() {
};
return webClient.post()
.uri("/api/templates/{templateId}?action={action}", templateId, action)
.uri("/api/groups/{groupId}/templates/{templateId}?action={action}", groupId,
templateId, action)
.bodyValue(templateInfo)
.retrieve()
.bodyToFlux(typeReference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class DataLoader implements CommandLineRunner {
private final PasswordEncoder passwordEncoder;
private final ScheduledTaskService scheduledTaskService;
private final AgentServerManager agentServerManager;

@Value("${admin.id}")
private String adminId;
@Value("${admin.password}")
Expand Down Expand Up @@ -89,7 +90,6 @@ public void run(String... args) throws Exception {

// remove & add agent in every seconds
scheduledTaskService.start(-100L, () -> {
log.info(agentServerManager.getAgentsUrl().values().toString());
// agent health check
Iterator<Entry<String, AgentInfo>> iterator = agentServerManager.getAgentsUrl()
.entrySet().iterator();
Expand Down Expand Up @@ -135,7 +135,7 @@ public void run(String... args) throws Exception {
}
messagingTemplate.convertAndSend("/topic/server",
agentServerManager.getAgentsUrl().values());
}, 0, 2, TimeUnit.SECONDS);
}, 0, 500, TimeUnit.MILLISECONDS);


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ public void startChild(Long id, String schedulerName, Runnable runnable, long de
schedulerChild.put(id, Map.of(schedulerName, scheduler));
scheduler.scheduleAtFixedRate(runnable, delay, period, timeUnit);
}

}
Loading

0 comments on commit 9e79744

Please sign in to comment.