-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathLogService.java
81 lines (70 loc) · 2.74 KB
/
LogService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.uet.microservices.services.log;
import com.uet.microservices.lib.model.NodeType;
import com.uet.microservices.lib.protocol.RpcBasicOperation;
import com.uet.microservices.lib.service.AbstractClusterService;
import com.uet.microservices.services.ServiceType;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.reactor.schedule.ScheduledRunnable;
import io.activej.rpc.server.RpcRequestHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
public class LogService extends AbstractClusterService {
protected LogService(
Eventloop eventloop,
InetSocketAddress discoveryAddr,
String serviceName,
NodeType nodeType,
List<NodeType> seedTypes
) {
super(eventloop, discoveryAddr, serviceName, nodeType, seedTypes);
}
public static LogService create(Eventloop eventloop, InetSocketAddress discoveryAddr) {
return new LogService(
eventloop,
discoveryAddr,
"log-service",
ServiceType.LOG,
List.of()
);
}
@Override
protected Map<Class, RpcRequestHandler> makeRpcRequestHandlers() {
RpcRequestHandler<String, RpcBasicOperation> messageHandler =
msg -> {
logger.info(">> Received message: {}", msg);
var task = LogTask.create(msg);
var sender = this.seedNodeManager.getSender(ServiceType.LOG);
return sender.sendRequest(task)
.map($ -> RpcBasicOperation.ACCEPT);
};
RpcRequestHandler<LogTask, RpcBasicOperation> taskHandler =
task -> {
switch (task.taskType) {
case INFO -> logger.info(">> Received a task: {}", task.message);
case WARN -> logger.warn(">> Received a task: {}", task.message);
case ERROR -> logger.error(">> Received a task: {}", task.message);
}
return Promise.of(RpcBasicOperation.ACCEPT);
};
return Map.of(
String.class, messageHandler,
LogTask.class, taskHandler
);
}
@Override
protected Map<NodeType, List<Class<?>>> getConnectionClassTypes() {
return Map.of(
ServiceType.LOG, List.of(String.class, RpcBasicOperation.class, LogTask.class)
);
}
public static void main(String[] args) throws IOException {
var eventloop = Eventloop.create();
var discoveryAddr = new InetSocketAddress("localhost", 9000);
var logService = LogService.create(eventloop, discoveryAddr);
logService.startService();
eventloop.run();
}
}