From 64276d177bf82cd1d631be9f61b7ea887eec2d89 Mon Sep 17 00:00:00 2001 From: Artem Labazin Date: Sat, 17 Feb 2018 01:08:51 +0300 Subject: [PATCH] Introducing NIO server Introducing NIO server. Added: - Netty server instead of simple IO-based. Changed: - The way of serialization/deserialization of EPMD messages. --- .codestyle/checkstyle.xml | 2 +- CHANGELOG.md | 12 ++ client/README.md | 4 +- client/pom.xml | 2 +- core/pom.xml | 2 +- .../mapper/deserializer/DataDeserializer.java | 21 +++- .../mapper/deserializer/EnumDeserializer.java | 6 +- .../deserializer/MessageDeserializer.java | 50 ++------ .../deserializer/RequestDeserializer.java | 80 +++++++++++++ .../mapper/serializer/DataSerializer.java | 16 ++- .../mapper/serializer/EnumSerializer.java | 8 +- .../mapper/serializer/MessageSerializer.java | 36 +----- .../mapper/serializer/RequestSerializer.java | 52 +++++++++ .../Message.java => model/TaggedMessage.java} | 23 +--- .../java/core/model/request/GetEpmdDump.java | 11 +- .../java/core/model/request/GetEpmdInfo.java | 11 +- .../java/core/model/request/GetNodeInfo.java | 11 +- .../epmd/java/core/model/request/Kill.java | 11 +- .../java/core/model/request/Registration.java | 11 +- .../epmd/java/core/model/request/Request.java | 28 +++++ .../epmd/java/core/model/request/Stop.java | 11 +- .../java/core/model/response/EpmdDump.java | 2 - .../java/core/model/response/EpmdInfo.java | 2 - .../java/core/model/response/KillResult.java | 3 - .../java/core/model/response/NodeInfo.java | 12 +- .../model/response/RegistrationResult.java | 12 +- .../java/core/model/response/StopResult.java | 3 - pom.xml | 6 +- server/README.md | 4 +- server/pom.xml | 10 +- .../epmd/java/server/command/server/Node.java | 7 -- .../java/server/command/server/Request.java | 71 ----------- .../command/server/ServerCommandExecutor.java | 97 +++++++-------- .../server/{Context.java => ServerState.java} | 7 +- .../server/command/server/ServerWorker.java | 91 --------------- .../server/handler/KillRequestHandler.java | 45 ------- .../server/handler/RequestDecoder.java | 107 +++++++++++++++++ .../server/handler/ResponseEncoder.java | 56 +++++++++ .../command/server/handler/ServerHandler.java | 74 ++++++++++++ .../GetEpmdDumpRequestHandler.java | 30 +++-- .../GetEpmdInfoRequestHandler.java | 30 +++-- .../GetNodeInfoRequestHandler.java | 33 ++++-- .../handler/command/KillRequestHandler.java | 64 ++++++++++ .../RegistrationRequestHandler.java | 44 +++++-- .../handler/{ => command}/RequestHandler.java | 18 ++- .../{ => command}/StopRequestHandler.java | 39 +++++-- .../command/stop/StopCommandExecutor.java | 2 + .../command/server/RequestTestUtil.java | 90 -------------- .../server/ServerCommandExecutorTest.java | 23 +++- .../GetEpmdDumpRequestHandlerTest.java | 99 ---------------- .../GetEpmdInfoRequestHandlerTest.java | 102 ---------------- .../GetNodeInfoRequestHandlerTest.java | 102 ---------------- .../handler/KillRequestHandlerTest.java | 65 ----------- .../RegistrationRequestHandlerTest.java | 110 ------------------ .../handler/StopRequestHandlerTest.java | 103 ---------------- .../command/server/util/TestNamePrinter.java | 33 ++++++ 56 files changed, 866 insertions(+), 1138 deletions(-) create mode 100644 core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/RequestDeserializer.java create mode 100644 core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/RequestSerializer.java rename core/src/main/java/io/appulse/epmd/java/core/{mapper/Message.java => model/TaggedMessage.java} (55%) create mode 100644 core/src/main/java/io/appulse/epmd/java/core/model/request/Request.java delete mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/Request.java rename server/src/main/java/io/appulse/epmd/java/server/command/server/{Context.java => ServerState.java} (92%) delete mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/ServerWorker.java delete mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandler.java create mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestDecoder.java create mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ResponseEncoder.java create mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ServerHandler.java rename server/src/main/java/io/appulse/epmd/java/server/command/server/handler/{ => command}/GetEpmdDumpRequestHandler.java (64%) rename server/src/main/java/io/appulse/epmd/java/server/command/server/handler/{ => command}/GetEpmdInfoRequestHandler.java (64%) rename server/src/main/java/io/appulse/epmd/java/server/command/server/handler/{ => command}/GetNodeInfoRequestHandler.java (61%) create mode 100644 server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/KillRequestHandler.java rename server/src/main/java/io/appulse/epmd/java/server/command/server/handler/{ => command}/RegistrationRequestHandler.java (62%) rename server/src/main/java/io/appulse/epmd/java/server/command/server/handler/{ => command}/RequestHandler.java (67%) rename server/src/main/java/io/appulse/epmd/java/server/command/server/handler/{ => command}/StopRequestHandler.java (53%) delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/RequestTestUtil.java delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandlerTest.java delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandlerTest.java delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandlerTest.java delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandlerTest.java delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandlerTest.java delete mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandlerTest.java create mode 100644 server/src/test/java/io/appulse/epmd/java/server/command/server/util/TestNamePrinter.java diff --git a/.codestyle/checkstyle.xml b/.codestyle/checkstyle.xml index e7ea362..cccd806 100644 --- a/.codestyle/checkstyle.xml +++ b/.codestyle/checkstyle.xml @@ -191,7 +191,7 @@ limitations under the License. - + diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ffad36..497963f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Add more unit and integration tests. +## [0.4.0](https://github.com/appulse-projects/epmd-java/releases/tag/0.4.0) - 2018-02-16 + +Introducing NIO server. + +### Added + +- Netty server instead of simple IO-based. + +### Changed + +- The way of serialization/deserialization of EPMD messages. + ## [0.3.3](https://github.com/appulse-projects/epmd-java/releases/tag/0.3.3) - 2018-02-09 Minor fix, using unsigned numbers where they are needed diff --git a/client/README.md b/client/README.md index 98903ca..9b4d874 100644 --- a/client/README.md +++ b/client/README.md @@ -14,7 +14,7 @@ Include the dependency to your project's pom.xml file: io.appulse.epmd.java client - 0.3.3 + 0.4.0 ... @@ -23,7 +23,7 @@ Include the dependency to your project's pom.xml file: or Gradle: ```groovy -compile 'io.appulse.epmd.java:client:0.3.3' +compile 'io.appulse.epmd.java:client:0.4.0' ``` ### Create client diff --git a/client/pom.xml b/client/pom.xml index 1e86ac2..276562b 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse epmd-java - 0.3.3 + 0.4.0 io.appulse.epmd.java diff --git a/core/pom.xml b/core/pom.xml index 4eedd00..6b29537 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse epmd-java - 0.3.3 + 0.4.0 io.appulse.epmd.java diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/DataDeserializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/DataDeserializer.java index 84dc165..40c79a3 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/DataDeserializer.java +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/DataDeserializer.java @@ -18,8 +18,14 @@ import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.deserializer.exception.DeserializationException; +import io.appulse.epmd.java.core.mapper.deserializer.exception.InvalidReceivedMessageTagException; +import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.TaggedMessage; import io.appulse.utils.Bytes; +import lombok.NonNull; +import lombok.val; + /** * * @author Artem Labazin @@ -28,19 +34,30 @@ class DataDeserializer implements Deserializer { @Override - public T deserialize (Bytes bytes, Class type) throws DeserializationException { + public T deserialize (@NonNull Bytes bytes, @NonNull Class type) throws DeserializationException { T result; try { result = type.newInstance(); } catch (IllegalAccessException | InstantiationException ex) { throw new DeserializationException(ex); } + + if (result instanceof TaggedMessage) { + val expectedTag = ((TaggedMessage) result).getTag(); + val tag = Tag.of(bytes.getByte()); + if (!expectedTag.equals(tag)) { + val message = String.format("Expected tag is: %s, but actual tag is: %s", + expectedTag, tag); + throw new InvalidReceivedMessageTagException(message); + } + } + ((DataSerializable) result).read(bytes); return result; } @Override - public boolean isApplicable (Class type) { + public boolean isApplicable (@NonNull Class type) { return DataSerializable.class.isAssignableFrom(type); } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/EnumDeserializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/EnumDeserializer.java index f1845f2..7c2b512 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/EnumDeserializer.java +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/EnumDeserializer.java @@ -18,13 +18,13 @@ import static java.nio.charset.StandardCharsets.ISO_8859_1; import static java.util.Arrays.asList; +import static java.util.Collections.synchronizedSet; import static java.util.Optional.empty; import static java.util.Optional.ofNullable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.util.Collections; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -46,12 +46,12 @@ class EnumDeserializer implements Deserializer { private static final Set ENUM_UNKNOWN_VALUE; static { - ENUM_CREATE_METHODS_NAMES = Collections.synchronizedSet(new HashSet<>(asList( + ENUM_CREATE_METHODS_NAMES = synchronizedSet(new HashSet<>(asList( "of", "parse", "from" ))); - ENUM_UNKNOWN_VALUE = Collections.synchronizedSet(new HashSet<>(asList( + ENUM_UNKNOWN_VALUE = synchronizedSet(new HashSet<>(asList( "UNDEFINED", "UNKNOWN" ))); diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/MessageDeserializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/MessageDeserializer.java index 37e77bc..575edd0 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/MessageDeserializer.java +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/MessageDeserializer.java @@ -16,23 +16,15 @@ package io.appulse.epmd.java.core.mapper.deserializer; -import static io.appulse.epmd.java.core.model.Tag.UNDEFINED; -import static io.appulse.utils.BytesUtils.asInteger; import static java.util.Arrays.asList; -import static java.util.Optional.ofNullable; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; -import io.appulse.epmd.java.core.mapper.Message; -import io.appulse.epmd.java.core.mapper.deserializer.exception.InvalidReceivedMessageLengthException; -import io.appulse.epmd.java.core.mapper.deserializer.exception.InvalidReceivedMessageTagException; import io.appulse.epmd.java.core.mapper.deserializer.exception.NoApplicableDeserializerException; -import io.appulse.epmd.java.core.mapper.exception.MessageAnnotationMissingException; -import io.appulse.epmd.java.core.model.Tag; import io.appulse.utils.Bytes; -import lombok.SneakyThrows; -import lombok.val; +import lombok.NonNull; /** * @@ -44,44 +36,22 @@ public final class MessageDeserializer { private static final List DESERIALIZERS; static { - DESERIALIZERS = asList( + DESERIALIZERS = new CopyOnWriteArrayList<>(asList( + new RequestDeserializer(), new DataDeserializer(), new EnumDeserializer() - ); + )); } - @SneakyThrows - public T deserialize (byte[] bytes, Class type) { - val buffer = ofNullable(bytes) - .map(Bytes::wrap) - .orElseThrow(NullPointerException::new); - - val annotation = ofNullable(type) - .map(it -> it.getAnnotation(Message.class)) - .orElseThrow(MessageAnnotationMissingException::new); - - if (annotation.lengthBytes() > 0) { - val receivedMessageLength = asInteger(buffer.getBytes(annotation.lengthBytes())); - if (receivedMessageLength != buffer.remaining()) { - val message = String.format("Expected length is %d - %d bytes, but actual length is %d bytes.", - annotation.lengthBytes(), receivedMessageLength, buffer.remaining()); - throw new InvalidReceivedMessageLengthException(message); - } - } - - if (annotation.value() != UNDEFINED) { - val tag = Tag.of(buffer.getByte()); - if (annotation.value() != tag) { - val message = String.format("Expected tag is: %s, but actual tag is: %s", - annotation.value(), tag); - throw new InvalidReceivedMessageTagException(message); - } - } + public T deserialize (@NonNull byte[] bytes, @NonNull Class type) { + return deserialize(Bytes.wrap(bytes), type); + } + public T deserialize (@NonNull Bytes bytes, @NonNull Class type) { return DESERIALIZERS.stream() .filter(it -> it.isApplicable(type)) .findAny() .orElseThrow(NoApplicableDeserializerException::new) - .deserialize(buffer, type); + .deserialize(bytes, type); } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/RequestDeserializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/RequestDeserializer.java new file mode 100644 index 0000000..62771c7 --- /dev/null +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/deserializer/RequestDeserializer.java @@ -0,0 +1,80 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.core.mapper.deserializer; + +import io.appulse.epmd.java.core.mapper.deserializer.exception.DeserializationException; +import io.appulse.epmd.java.core.mapper.deserializer.exception.InvalidReceivedMessageLengthException; +import io.appulse.epmd.java.core.mapper.deserializer.exception.InvalidReceivedMessageTagException; +import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.request.Request; +import io.appulse.utils.Bytes; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j +class RequestDeserializer implements Deserializer { + + @Override + public T deserialize (@NonNull Bytes bytes, @NonNull Class type) throws DeserializationException { + val length = bytes.getShort(); + if (length != bytes.remaining()) { + val message = String.format("Expected length is %d - %d bytes, but actual length is %d bytes.", + 2, length, bytes.remaining()); + log.error(message); + throw new InvalidReceivedMessageLengthException(message); + } + + T result; + try { + result = type.newInstance(); + } catch (IllegalAccessException | InstantiationException ex) { + log.error("Deserialized type instantiation error", ex); + throw new DeserializationException(ex); + } + + if (!(result instanceof Request)) { + val message = String.format("Deserializing type '%s' is not an instance of '%s'", + type.getSimpleName(), Request.class.getSimpleName()); + log.error(message); + throw new DeserializationException(message); + } + val request = (Request) result; + + val tag = Tag.of(bytes.getByte()); + if (tag != request.getTag()) { + val message = String.format("Expected tag is: %s, but actual tag is: %s", + request.getTag(), tag); + log.error(message); + throw new InvalidReceivedMessageTagException(message); + } + + request.read(bytes); + return result; + } + + @Override + public boolean isApplicable (@NonNull Class type) { + return Request.class.isAssignableFrom(type); + } +} diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/DataSerializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/DataSerializer.java index 0643717..b3627f1 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/DataSerializer.java +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/DataSerializer.java @@ -18,8 +18,10 @@ import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.serializer.exception.SerializationException; +import io.appulse.epmd.java.core.model.TaggedMessage; import io.appulse.utils.Bytes; +import lombok.NonNull; import lombok.val; /** @@ -30,14 +32,18 @@ class DataSerializer implements Serializer { @Override - public byte[] serialize (Object object, Class type) throws SerializationException { - val body = Bytes.allocate(); - ((DataSerializable) object).write(body); - return body.array(); + public byte[] serialize (@NonNull Object object, @NonNull Class type) throws SerializationException { + val serializable = (DataSerializable) object; + val bytes = serializable instanceof TaggedMessage + ? Bytes.allocate().put1B(((TaggedMessage) object).getTag().getCode()) + : Bytes.allocate(); + + serializable.write(bytes); + return bytes.array(); } @Override - public boolean isApplicable (Class type) { + public boolean isApplicable (@NonNull Class type) { return DataSerializable.class.isAssignableFrom(type); } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/EnumSerializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/EnumSerializer.java index 63ec690..9fe8d54 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/EnumSerializer.java +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/EnumSerializer.java @@ -17,11 +17,10 @@ package io.appulse.epmd.java.core.mapper.serializer; import static java.nio.charset.StandardCharsets.ISO_8859_1; -import static lombok.AccessLevel.PRIVATE; import io.appulse.epmd.java.core.mapper.serializer.exception.SerializationException; -import lombok.experimental.FieldDefaults; +import lombok.NonNull; import lombok.val; /** @@ -29,17 +28,16 @@ * @author Artem Labazin * @since 0.0.1 */ -@FieldDefaults(level = PRIVATE, makeFinal = true) class EnumSerializer implements Serializer { @Override - public byte[] serialize (Object object, Class type) throws SerializationException { + public byte[] serialize (@NonNull Object object, @NonNull Class type) throws SerializationException { val string = ((Enum) object).name(); return string.getBytes(ISO_8859_1); } @Override - public boolean isApplicable (Class type) { + public boolean isApplicable (@NonNull Class type) { return type.isEnum(); } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/MessageSerializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/MessageSerializer.java index 96ce7f4..81d11bb 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/MessageSerializer.java +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/MessageSerializer.java @@ -16,22 +16,15 @@ package io.appulse.epmd.java.core.mapper.serializer; -import static io.appulse.epmd.java.core.model.Tag.UNDEFINED; -import static io.appulse.utils.BytesUtils.align; -import static io.appulse.utils.BytesUtils.asBytes; import static java.util.Arrays.asList; -import static java.util.Optional.ofNullable; -import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import io.appulse.epmd.java.core.mapper.Message; -import io.appulse.epmd.java.core.mapper.exception.MessageAnnotationMissingException; import io.appulse.epmd.java.core.mapper.serializer.exception.NoApplicableSerializerException; +import lombok.NonNull; import lombok.SneakyThrows; -import lombok.val; /** * @@ -44,38 +37,19 @@ public final class MessageSerializer { static { SERIALIZERS = new CopyOnWriteArrayList<>(asList( + new RequestSerializer(), new DataSerializer(), new EnumSerializer() )); } @SneakyThrows - public byte[] serialize (Object obj) { - Class type = ofNullable(obj) - .map(Object::getClass) - .orElseThrow(NullPointerException::new); - - val annotation = ofNullable(type) - .map(it -> it.getAnnotation(Message.class)) - .orElseThrow(MessageAnnotationMissingException::new); - - val body = SERIALIZERS.stream() + public byte[] serialize (@NonNull Object obj) { + Class type = obj.getClass(); + return SERIALIZERS.stream() .filter(it -> it.isApplicable(type)) .findAny() .orElseThrow(NoApplicableSerializerException::new) .serialize(obj, type); - - val length = annotation.value() == UNDEFINED - ? body.length - : body.length + Byte.BYTES; - - val buffer = ByteBuffer.allocate(annotation.lengthBytes() + length); - if (annotation.lengthBytes() > 0) { - buffer.put(align(asBytes(length), annotation.lengthBytes())); - } - if (annotation.value() != UNDEFINED) { - buffer.put(annotation.value().getCode()); - } - return buffer.put(body).array(); } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/RequestSerializer.java b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/RequestSerializer.java new file mode 100644 index 0000000..e52c1e8 --- /dev/null +++ b/core/src/main/java/io/appulse/epmd/java/core/mapper/serializer/RequestSerializer.java @@ -0,0 +1,52 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.core.mapper.serializer; + +import io.appulse.epmd.java.core.mapper.serializer.exception.SerializationException; +import io.appulse.epmd.java.core.model.request.Request; +import io.appulse.utils.Bytes; + +import lombok.NonNull; +import lombok.val; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +class RequestSerializer implements Serializer { + + @Override + public byte[] serialize (@NonNull Object object, @NonNull Class type) throws SerializationException { + val request = (Request) object; + + val bytes = Bytes.allocate() + .put2B(0) + .put1B(request.getTag().getCode()); + + request.write(bytes); + + return bytes + .put2B(0, bytes.limit() - Short.BYTES) + .array(); + } + + @Override + public boolean isApplicable (@NonNull Class type) { + return Request.class.isAssignableFrom(type); + } +} diff --git a/core/src/main/java/io/appulse/epmd/java/core/mapper/Message.java b/core/src/main/java/io/appulse/epmd/java/core/model/TaggedMessage.java similarity index 55% rename from core/src/main/java/io/appulse/epmd/java/core/mapper/Message.java rename to core/src/main/java/io/appulse/epmd/java/core/model/TaggedMessage.java index 8366a50..49b6c11 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/mapper/Message.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/TaggedMessage.java @@ -14,29 +14,16 @@ * limitations under the License. */ -package io.appulse.epmd.java.core.mapper; +package io.appulse.epmd.java.core.model; -import static io.appulse.epmd.java.core.model.Tag.UNDEFINED; -import static java.lang.annotation.ElementType.TYPE; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -import java.lang.annotation.Documented; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; - -import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.mapper.DataSerializable; /** * * @author Artem Labazin - * @since 0.0.1 + * @since 0.4.0 */ -@Documented -@Target(TYPE) -@Retention(RUNTIME) -public @interface Message { - - int lengthBytes () default 2; +public interface TaggedMessage extends DataSerializable { - Tag value () default UNDEFINED; + Tag getTag (); } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdDump.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdDump.java index 97fa351..b4bbe54 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdDump.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdDump.java @@ -18,9 +18,8 @@ import static io.appulse.epmd.java.core.model.Tag.DUMP_REQUEST; -import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.ExpectedResponse; -import io.appulse.epmd.java.core.mapper.Message; +import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.response.EpmdDump; import io.appulse.utils.Bytes; @@ -32,9 +31,8 @@ * @since 0.0.1 */ @ToString -@Message(DUMP_REQUEST) @ExpectedResponse(EpmdDump.class) -public class GetEpmdDump implements DataSerializable { +public class GetEpmdDump implements Request { @Override public void write (Bytes bytes) { @@ -45,4 +43,9 @@ public void write (Bytes bytes) { public void read (Bytes bytes) { // nothing } + + @Override + public Tag getTag () { + return DUMP_REQUEST; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdInfo.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdInfo.java index 3eba9a6..6edd68f 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdInfo.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/GetEpmdInfo.java @@ -18,9 +18,8 @@ import static io.appulse.epmd.java.core.model.Tag.NAMES_REQUEST; -import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.ExpectedResponse; -import io.appulse.epmd.java.core.mapper.Message; +import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.response.EpmdInfo; import io.appulse.utils.Bytes; @@ -32,9 +31,8 @@ * @since 0.0.1 */ @ToString -@Message(NAMES_REQUEST) @ExpectedResponse(EpmdInfo.class) -public class GetEpmdInfo implements DataSerializable { +public class GetEpmdInfo implements Request { @Override public void write (Bytes bytes) { @@ -45,4 +43,9 @@ public void write (Bytes bytes) { public void read (Bytes bytes) { // nothing } + + @Override + public Tag getTag () { + return NAMES_REQUEST; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/GetNodeInfo.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/GetNodeInfo.java index da8d520..2e3abd0 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/request/GetNodeInfo.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/GetNodeInfo.java @@ -20,9 +20,8 @@ import static java.nio.charset.StandardCharsets.ISO_8859_1; import static lombok.AccessLevel.PRIVATE; -import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.ExpectedResponse; -import io.appulse.epmd.java.core.mapper.Message; +import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.response.NodeInfo; import io.appulse.utils.Bytes; @@ -40,10 +39,9 @@ @Data @NoArgsConstructor @AllArgsConstructor -@Message(PORT_PLEASE2_REQUEST) @FieldDefaults(level = PRIVATE) @ExpectedResponse(NodeInfo.class) -public class GetNodeInfo implements DataSerializable { +public class GetNodeInfo implements Request { @NonNull String name; @@ -57,4 +55,9 @@ public void write (@NonNull Bytes bytes) { public void read (@NonNull Bytes bytes) { name = bytes.getString(ISO_8859_1); } + + @Override + public Tag getTag () { + return PORT_PLEASE2_REQUEST; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/Kill.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/Kill.java index 0b71050..c6585e5 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/request/Kill.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/Kill.java @@ -18,9 +18,8 @@ import static io.appulse.epmd.java.core.model.Tag.KILL_REQUEST; -import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.ExpectedResponse; -import io.appulse.epmd.java.core.mapper.Message; +import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.response.KillResult; import io.appulse.utils.Bytes; @@ -32,9 +31,8 @@ * @since 0.0.1 */ @ToString -@Message(KILL_REQUEST) @ExpectedResponse(KillResult.class) -public class Kill implements DataSerializable { +public class Kill implements Request { @Override public void write (Bytes bytes) { @@ -45,4 +43,9 @@ public void write (Bytes bytes) { public void read (Bytes bytes) { // nothing } + + @Override + public Tag getTag () { + return KILL_REQUEST; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/Registration.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/Registration.java index 77dabd2..0caab78 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/request/Registration.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/Registration.java @@ -20,11 +20,10 @@ import static java.nio.charset.StandardCharsets.ISO_8859_1; import static lombok.AccessLevel.PRIVATE; -import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.ExpectedResponse; -import io.appulse.epmd.java.core.mapper.Message; import io.appulse.epmd.java.core.model.NodeType; import io.appulse.epmd.java.core.model.Protocol; +import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.Version; import io.appulse.epmd.java.core.model.response.RegistrationResult; import io.appulse.utils.Bytes; @@ -48,10 +47,9 @@ @ToString @NoArgsConstructor @AllArgsConstructor -@Message(ALIVE2_REQUEST) @FieldDefaults(level = PRIVATE) @ExpectedResponse(RegistrationResult.class) -public class Registration implements DataSerializable { +public class Registration implements Request { int port; @@ -93,4 +91,9 @@ public void read (@NonNull Bytes bytes) { val length = bytes.getShort(); name = bytes.getString(length, ISO_8859_1); } + + @Override + public Tag getTag () { + return ALIVE2_REQUEST; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/Request.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/Request.java new file mode 100644 index 0000000..8f2d104 --- /dev/null +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/Request.java @@ -0,0 +1,28 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.core.model.request; + +import io.appulse.epmd.java.core.model.TaggedMessage; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +public interface Request extends TaggedMessage { + +} diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/request/Stop.java b/core/src/main/java/io/appulse/epmd/java/core/model/request/Stop.java index 625d4c2..79439c6 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/request/Stop.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/request/Stop.java @@ -20,9 +20,8 @@ import static java.nio.charset.StandardCharsets.ISO_8859_1; import static lombok.AccessLevel.PRIVATE; -import io.appulse.epmd.java.core.mapper.DataSerializable; import io.appulse.epmd.java.core.mapper.ExpectedResponse; -import io.appulse.epmd.java.core.mapper.Message; +import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.response.StopResult; import io.appulse.utils.Bytes; @@ -42,10 +41,9 @@ @ToString @NoArgsConstructor @AllArgsConstructor -@Message(STOP_REQUEST) @FieldDefaults(level = PRIVATE) @ExpectedResponse(StopResult.class) -public class Stop implements DataSerializable { +public class Stop implements Request { @NonNull String name; @@ -59,4 +57,9 @@ public void write (@NonNull Bytes bytes) { public void read (@NonNull Bytes bytes) { name = bytes.getString(ISO_8859_1); } + + @Override + public Tag getTag () { + return STOP_REQUEST; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdDump.java b/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdDump.java index ee0ee61..6c9598d 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdDump.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdDump.java @@ -25,7 +25,6 @@ import java.util.stream.Stream; import io.appulse.epmd.java.core.mapper.DataSerializable; -import io.appulse.epmd.java.core.mapper.Message; import io.appulse.epmd.java.core.model.response.EpmdDump.NodeDump.Status; import io.appulse.utils.Bytes; @@ -46,7 +45,6 @@ */ @Getter @Builder -@Message(lengthBytes = 0) @ToString @NoArgsConstructor @AllArgsConstructor diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdInfo.java b/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdInfo.java index e144df0..42cfd7c 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdInfo.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/response/EpmdInfo.java @@ -25,7 +25,6 @@ import java.util.stream.Stream; import io.appulse.epmd.java.core.mapper.DataSerializable; -import io.appulse.epmd.java.core.mapper.Message; import io.appulse.utils.Bytes; import lombok.AllArgsConstructor; @@ -46,7 +45,6 @@ */ @Getter @Builder -@Message(lengthBytes = 0) @ToString @NoArgsConstructor @AllArgsConstructor diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/response/KillResult.java b/core/src/main/java/io/appulse/epmd/java/core/model/response/KillResult.java index a43c448..e9c51b6 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/response/KillResult.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/response/KillResult.java @@ -16,8 +16,6 @@ package io.appulse.epmd.java.core.model.response; -import io.appulse.epmd.java.core.mapper.Message; - import lombok.ToString; /** @@ -26,7 +24,6 @@ * @since 0.0.1 */ @ToString -@Message(lengthBytes = 0) public enum KillResult { OK, diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/response/NodeInfo.java b/core/src/main/java/io/appulse/epmd/java/core/model/response/NodeInfo.java index 82d8313..680c2b6 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/response/NodeInfo.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/response/NodeInfo.java @@ -25,10 +25,10 @@ import java.util.Optional; -import io.appulse.epmd.java.core.mapper.DataSerializable; -import io.appulse.epmd.java.core.mapper.Message; import io.appulse.epmd.java.core.model.NodeType; import io.appulse.epmd.java.core.model.Protocol; +import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.TaggedMessage; import io.appulse.epmd.java.core.model.Version; import io.appulse.utils.Bytes; @@ -46,9 +46,8 @@ */ @Getter @ToString -@Message(value = PORT2_RESPONSE, lengthBytes = 0) @FieldDefaults(level = PRIVATE) -public class NodeInfo implements DataSerializable { +public class NodeInfo implements TaggedMessage { boolean ok; @@ -120,4 +119,9 @@ public void read (@NonNull Bytes bytes) { val length = bytes.getShort(); name = of(bytes.getString(length, ISO_8859_1)); } + + @Override + public Tag getTag () { + return PORT2_RESPONSE; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/response/RegistrationResult.java b/core/src/main/java/io/appulse/epmd/java/core/model/response/RegistrationResult.java index 9c68967..dcacdd3 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/response/RegistrationResult.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/response/RegistrationResult.java @@ -19,8 +19,8 @@ import static io.appulse.epmd.java.core.model.Tag.ALIVE2_RESPONSE; import static lombok.AccessLevel.PRIVATE; -import io.appulse.epmd.java.core.mapper.DataSerializable; -import io.appulse.epmd.java.core.mapper.Message; +import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.TaggedMessage; import io.appulse.utils.Bytes; import lombok.AllArgsConstructor; @@ -41,9 +41,8 @@ @ToString @NoArgsConstructor @AllArgsConstructor -@Message(value = ALIVE2_RESPONSE, lengthBytes = 0) @FieldDefaults(level = PRIVATE) -public class RegistrationResult implements DataSerializable { +public class RegistrationResult implements TaggedMessage { boolean ok; @@ -66,4 +65,9 @@ public void read (@NonNull Bytes bytes) { creation = bytes.getUnsignedShort(); } } + + @Override + public Tag getTag () { + return ALIVE2_RESPONSE; + } } diff --git a/core/src/main/java/io/appulse/epmd/java/core/model/response/StopResult.java b/core/src/main/java/io/appulse/epmd/java/core/model/response/StopResult.java index e689726..c48f91f 100644 --- a/core/src/main/java/io/appulse/epmd/java/core/model/response/StopResult.java +++ b/core/src/main/java/io/appulse/epmd/java/core/model/response/StopResult.java @@ -16,8 +16,6 @@ package io.appulse.epmd.java.core.model.response; -import io.appulse.epmd.java.core.mapper.Message; - import lombok.ToString; /** @@ -26,7 +24,6 @@ * @since 0.0.1 */ @ToString -@Message(lengthBytes = 0) public enum StopResult { STOPPED, diff --git a/pom.xml b/pom.xml index 6d4b005..277f709 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ limitations under the License. io.appulse epmd-java - 0.3.3 + 0.4.0 pom @@ -68,7 +68,7 @@ limitations under the License. https://github.com/appulse-projects/epmd-java scm:git:https://github.com/appulse-projects/epmd-java.git scm:git:https://github.com/appulse-projects/epmd-java.git - 0.3.3 + 0.4.0 @@ -117,7 +117,7 @@ limitations under the License. io.appulse utils-java - 1.3.1 + 1.3.2 diff --git a/server/README.md b/server/README.md index 7b7a7ba..1c50193 100644 --- a/server/README.md +++ b/server/README.md @@ -18,7 +18,7 @@ Include the dependency to your project's pom.xml file: io.appulse.epmd.java server - 0.3.3 + 0.4.0 ... @@ -27,5 +27,5 @@ Include the dependency to your project's pom.xml file: or Gradle: ```groovy -compile 'io.appulse.epmd.java:server:0.3.3' +compile 'io.appulse.epmd.java:server:0.4.0' ``` diff --git a/server/pom.xml b/server/pom.xml index c5405cd..b8fd606 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -25,7 +25,7 @@ limitations under the License. io.appulse epmd-java - 0.3.3 + 0.4.0 io.appulse.epmd.java @@ -51,10 +51,16 @@ limitations under the License. 1.72 + + io.netty + netty-all + 4.1.21.Final + + org.mockito mockito-core - 2.13.0 + 2.15.0 test diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/Node.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/Node.java index 0c81222..a83cba2 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/Node.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/Node.java @@ -16,14 +16,11 @@ package io.appulse.epmd.java.server.command.server; -import java.net.Socket; - import io.appulse.epmd.java.core.model.NodeType; import io.appulse.epmd.java.core.model.Protocol; import io.appulse.epmd.java.core.model.Version; import lombok.Builder; -import lombok.EqualsAndHashCode; import lombok.NonNull; import lombok.Value; @@ -34,7 +31,6 @@ */ @Value @Builder -@EqualsAndHashCode(exclude = "socket") public class Node { @NonNull @@ -55,7 +51,4 @@ public class Node { Version low; int creation; - - @NonNull - Socket socket; } diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/Request.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/Request.java deleted file mode 100644 index cb5d15e..0000000 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/Request.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server; - -import java.net.Socket; - -import io.appulse.epmd.java.core.mapper.deserializer.MessageDeserializer; -import io.appulse.epmd.java.core.mapper.serializer.MessageSerializer; -import io.appulse.utils.Bytes; - -import lombok.Builder; -import lombok.NonNull; -import lombok.SneakyThrows; -import lombok.Value; - -@Value -@Builder -public final class Request { - - private static final MessageSerializer SERIALIZER; - - private static final MessageDeserializer DESERIALIZER; - - static { - SERIALIZER = new MessageSerializer(); - DESERIALIZER = new MessageDeserializer(); - } - - @NonNull - Context context; - - @NonNull - Socket socket; - - @NonNull - Bytes payload; - - public T parse (@NonNull Class type) { - return DESERIALIZER.deserialize(payload.array(), type); - } - - @SneakyThrows - public void respond (@NonNull Object response) { - byte[] bytes = SERIALIZER.serialize(response); - socket.getOutputStream().write(bytes); - } - - public void respondAndClose (@NonNull Object response) { - respond(response); - closeConnection(); - } - - @SneakyThrows - public void closeConnection () { - socket.close(); - } -} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutor.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutor.java index ada712a..1f8bd95 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutor.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutor.java @@ -16,20 +16,28 @@ package io.appulse.epmd.java.server.command.server; +import static io.netty.channel.ChannelOption.SO_BACKLOG; +import static io.netty.channel.ChannelOption.SO_KEEPALIVE; +import static io.netty.channel.ChannelOption.TCP_NODELAY; import static java.util.Optional.of; import static lombok.AccessLevel.PRIVATE; import java.io.Closeable; -import java.io.IOException; -import java.net.ServerSocket; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import io.appulse.epmd.java.server.cli.CommonOptions; import io.appulse.epmd.java.server.command.AbstractCommandExecutor; import io.appulse.epmd.java.server.command.CommandOptions; - +import io.appulse.epmd.java.server.command.server.handler.RequestDecoder; +import io.appulse.epmd.java.server.command.server.handler.ResponseEncoder; +import io.appulse.epmd.java.server.command.server.handler.ServerHandler; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.NonNull; import lombok.SneakyThrows; import lombok.experimental.FieldDefaults; @@ -46,11 +54,11 @@ @FieldDefaults(level = PRIVATE, makeFinal = true) public class ServerCommandExecutor extends AbstractCommandExecutor implements Closeable { - ServerSocket serverSocket; + ServerState serverState; - ExecutorService executor; + EventLoopGroup bossGroup; - Context context; + EventLoopGroup workerGroup; @NonFinal volatile boolean closed; @@ -63,42 +71,45 @@ public ServerCommandExecutor (CommonOptions commonOptions, @NonNull CommandOptio .map(it -> (ServerCommandOptions) it) .orElse(new ServerCommandOptions()); - serverSocket = new ServerSocket(getPort()); - - executor = Executors.newCachedThreadPool(); - - context = Context.builder() + serverState = ServerState.builder() .nodes(new ConcurrentHashMap<>()) .commonOptions(commonOptions) .serverOptions(serverOptions) .build(); + + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(2); } @Override - @SneakyThrows public void execute () { - log.debug("Server before start context: {}", context); + log.debug("Starting server on port {}", getPort()); try { - while (true) { - log.debug("Waiting new connection"); - - val socket = serverSocket.accept(); - log.debug("New connection was accepted"); - - if (context.getAddresses().contains(socket.getInetAddress())) { - socket.close(); - continue; - } - - if (isDebug()) { - System.out.println(); - } - - val handler = new ServerWorker(socket, context); - executor.execute(handler); - } + new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + + @Override + public void initChannel (SocketChannel channel) throws Exception { + channel.pipeline() + .addLast("decoder", new RequestDecoder()) + .addLast("encoder", new ResponseEncoder()) + .addLast("handler", new ServerHandler(serverState)); + } + }) + .option(SO_BACKLOG, 128) + .childOption(SO_KEEPALIVE, true) + .childOption(TCP_NODELAY, true) + .bind(getPort()) + .sync() + // Wait until the server socket is closed. + .channel().closeFuture().sync(); + } catch (InterruptedException ex) { + log.error("Server work exception", ex); } finally { - close(); + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); } } @@ -106,23 +117,17 @@ public void execute () { @SneakyThrows public void close () { if (closed) { + log.debug("Server was already closed"); return; } closed = true; - log.debug("Closing server..."); - - try { - serverSocket.close(); - } catch (IOException ex) { - } - log.debug("Server socket was closed"); - - if (!executor.isShutdown()) { - executor.shutdown(); - } + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + log.debug("Boss and workers groups are closed"); - context.getNodes().clear(); + serverState.getNodes().clear(); + log.debug("All nodes are removed"); log.info("Server was closed"); } diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/Context.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerState.java similarity index 92% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/Context.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/ServerState.java index 1480df2..d6fedbc 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/Context.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerState.java @@ -25,9 +25,14 @@ import lombok.Value; import lombok.experimental.Delegate; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ @Value @Builder -public class Context { +public class ServerState { @NonNull Map nodes; diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerWorker.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerWorker.java deleted file mode 100644 index e947d64..0000000 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/ServerWorker.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server; - -import static io.appulse.epmd.java.core.model.Tag.UNDEFINED; -import static lombok.AccessLevel.PRIVATE; - -import java.net.Socket; - -import io.appulse.epmd.java.core.model.Tag; -import io.appulse.epmd.java.server.command.server.handler.RequestHandler; -import io.appulse.utils.Bytes; -import io.appulse.utils.SocketUtils; - -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.experimental.FieldDefaults; -import lombok.extern.slf4j.Slf4j; -import lombok.val; - -/** - * - * @author Artem Labazin - * @since 0.3.2 - */ -@Slf4j -@RequiredArgsConstructor -@FieldDefaults(level = PRIVATE, makeFinal = true) -class ServerWorker implements Runnable { - - @NonNull - Socket socket; - - @NonNull - Context context; - - @Override - public void run () { - log.debug("Start server worker"); - - val length = SocketUtils.readBytes(socket, Short.BYTES).getShort(); - log.debug("Incoming message length is: {}", length); - - val body = SocketUtils.read(socket, length); - log.debug("Readed message body ({} bytes)", body.length); - - val bytes = Bytes.allocate(Short.BYTES + length) - .put2B(length) - .put(body); - - val tag = Tag.of(bytes.getByte(2)); - if (tag == UNDEFINED) { - log.error("Undefined incoming message tag"); - throw new IllegalArgumentException(); - } - log.debug("Incoming message tag: {}", tag); - - val handler = RequestHandler.ALL.get(tag); - if (handler == null) { - log.error("There is no handler for tag {}", tag); - throw new IllegalArgumentException(); - } - log.debug("Request's handler: {}", handler); - - val request = Request.builder() - .context(context) - .socket(socket) - .payload(bytes.position(0)) - .build(); - - log.debug("Incoming request: {}", request); - - handler.handle(request); - - log.debug("End server worker"); - } -} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandler.java deleted file mode 100644 index 9733752..0000000 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandler.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.KILL_REQUEST; -import static io.appulse.epmd.java.core.model.response.KillResult.OK; - -import io.appulse.epmd.java.core.model.Tag; -import io.appulse.epmd.java.server.command.server.Request; - -import lombok.NonNull; - -class KillRequestHandler implements RequestHandler { - - @Override - public void handle (@NonNull Request request) { - if (!request.getContext().getServerOptions().isChecks()) { - request.closeConnection(); - return; - } - - request.getContext().getNodes().clear(); - request.respondAndClose(OK); - Runtime.getRuntime().exit(1); - } - - @Override - public Tag getTag () { - return KILL_REQUEST; - } -} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestDecoder.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestDecoder.java new file mode 100644 index 0000000..83dda72 --- /dev/null +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestDecoder.java @@ -0,0 +1,107 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.server.command.server.handler; + +import java.util.List; + +import io.appulse.epmd.java.core.mapper.deserializer.MessageDeserializer; +import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.request.GetEpmdDump; +import io.appulse.epmd.java.core.model.request.GetEpmdInfo; +import io.appulse.epmd.java.core.model.request.GetNodeInfo; +import io.appulse.epmd.java.core.model.request.Kill; +import io.appulse.epmd.java.core.model.request.Registration; +import io.appulse.epmd.java.core.model.request.Request; +import io.appulse.epmd.java.core.model.request.Stop; +import io.appulse.utils.Bytes; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j +public class RequestDecoder extends ReplayingDecoder { + + private static final MessageDeserializer DESERIALIZER; + + static { + DESERIALIZER = new MessageDeserializer(); + } + + @Override + public void exceptionCaught (ChannelHandlerContext context, Throwable cause) throws Exception { + val message = String.format("Error during channel connection with %s", + context.channel().remoteAddress().toString()); + + log.error(message, cause); + context.close(); + } + + @Override + protected void decode (ChannelHandlerContext context, ByteBuf buffer, List out) throws Exception { + val length = buffer.readShort(); + log.debug("Received message length is: {}", length); + if (length == 0) { + return; + } + + ByteBuf buf = buffer.readBytes(length); + val body = new byte[length]; + buf.getBytes(0, body); + log.debug("Readed message body:\n{}", body); + + Bytes bytes = Bytes.allocate(Short.BYTES + length) + .put2B(length) + .put(body); + + bytes.position(0); + + val request = parse(bytes); + log.debug("Received request: {}", request); + out.add(request); + } + + private Request parse (@NonNull Bytes bytes) { + val tag = Tag.of(bytes.getByte(2)); + switch (tag) { + case DUMP_REQUEST: + return DESERIALIZER.deserialize(bytes, GetEpmdDump.class); + case NAMES_REQUEST: + return DESERIALIZER.deserialize(bytes, GetEpmdInfo.class); + case PORT_PLEASE2_REQUEST: + return DESERIALIZER.deserialize(bytes, GetNodeInfo.class); + case KILL_REQUEST: + return DESERIALIZER.deserialize(bytes, Kill.class); + case ALIVE2_REQUEST: + return DESERIALIZER.deserialize(bytes, Registration.class); + case STOP_REQUEST: + return DESERIALIZER.deserialize(bytes, Stop.class); + default: + val message = String.format("Unexpected decoded request tag: '%s'", tag); + log.error(message); + throw new IllegalArgumentException(message); + } + } +} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ResponseEncoder.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ResponseEncoder.java new file mode 100644 index 0000000..5612c2e --- /dev/null +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ResponseEncoder.java @@ -0,0 +1,56 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.server.command.server.handler; + +import io.appulse.epmd.java.core.mapper.serializer.MessageSerializer; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j +@Sharable +public class ResponseEncoder extends MessageToByteEncoder { + + private final MessageSerializer serializer = new MessageSerializer(); + + @Override + public void exceptionCaught (ChannelHandlerContext context, Throwable cause) throws Exception { + val message = String.format("Error during channel connection with %s", + context.channel().remoteAddress().toString()); + + log.error(message, cause); + context.close(); + } + + @Override + protected void encode (ChannelHandlerContext context, Object response, ByteBuf out) throws Exception { + log.debug("Encoding message {} for {}", response, context.channel().remoteAddress()); + val bytes = serializer.serialize(response); + log.debug("Output bytes:\n{}", bytes); + out.writeBytes(bytes); + log.debug("Message was sent"); + } +} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ServerHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ServerHandler.java new file mode 100644 index 0000000..e48c0d7 --- /dev/null +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/ServerHandler.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.server.command.server.handler; + +import static lombok.AccessLevel.PRIVATE; + +import io.appulse.epmd.java.core.model.request.Request; +import io.appulse.epmd.java.server.command.server.ServerState; +import io.appulse.epmd.java.server.command.server.handler.command.RequestHandler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j +@RequiredArgsConstructor +@FieldDefaults(level = PRIVATE, makeFinal = true) +public class ServerHandler extends ChannelInboundHandlerAdapter { + + @NonNull + ServerState serverState; + + @Override + public void handlerAdded (ChannelHandlerContext ctx) throws Exception { + log.debug("Server handler was added"); + } + + @Override + public void channelRead (ChannelHandlerContext context, Object obj) throws Exception { + val request = (Request) obj; + + val handler = RequestHandler.ALL.get(request.getTag()); + if (handler == null) { + val message = String.format("There is no handler for tag: '%s'", request.getTag()); + log.error(message); + throw new IllegalArgumentException(message); + } + log.debug("Request's handler: {}", handler); + + handler.handle(request, context, serverState); + } + + @Override + public void exceptionCaught (ChannelHandlerContext context, Throwable cause) throws Exception { + val message = String.format("Error during channel connection with %s", + context.channel().remoteAddress().toString()); + + log.error(message, cause); + context.close(); + } +} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetEpmdDumpRequestHandler.java similarity index 64% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandler.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetEpmdDumpRequestHandler.java index a2f6213..ce55cfb 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandler.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetEpmdDumpRequestHandler.java @@ -14,34 +14,50 @@ * limitations under the License. */ -package io.appulse.epmd.java.server.command.server.handler; +package io.appulse.epmd.java.server.command.server.handler.command; import static io.appulse.epmd.java.core.model.Tag.DUMP_REQUEST; import static io.appulse.epmd.java.core.model.response.EpmdDump.NodeDump.Status.ACTIVE; +import static io.netty.channel.ChannelFutureListener.CLOSE; import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.request.Request; import io.appulse.epmd.java.core.model.response.EpmdDump; import io.appulse.epmd.java.core.model.response.EpmdDump.EpmdDumpBuilder; import io.appulse.epmd.java.core.model.response.EpmdDump.NodeDump; -import io.appulse.epmd.java.server.command.server.Request; +import io.appulse.epmd.java.server.command.server.ServerState; +import io.netty.channel.ChannelHandlerContext; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j class GetEpmdDumpRequestHandler implements RequestHandler { @Override - public void handle (@NonNull Request request) { + public void handle (@NonNull Request request, @NonNull ChannelHandlerContext context, @NonNull ServerState state) { + log.debug("Processing {}", request); + EpmdDumpBuilder builder = EpmdDump.builder() - .port(request.getContext().getPort()); + .port(state.getPort()); - request.getContext() - .getNodes() + state.getNodes() .values() .stream() .map(it -> new NodeDump(ACTIVE, it.getName(), it.getPort(), -1)) .forEach(builder::node); - request.respondAndClose(builder.build()); + val response = builder.build(); + + log.debug("Response: {}", response); + context.writeAndFlush(response) + .addListener(CLOSE); } @Override diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetEpmdInfoRequestHandler.java similarity index 64% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandler.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetEpmdInfoRequestHandler.java index 642a685..1d5e2b4 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandler.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetEpmdInfoRequestHandler.java @@ -14,27 +14,39 @@ * limitations under the License. */ -package io.appulse.epmd.java.server.command.server.handler; +package io.appulse.epmd.java.server.command.server.handler.command; import static io.appulse.epmd.java.core.model.Tag.NAMES_REQUEST; +import static io.netty.channel.ChannelFutureListener.CLOSE; import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.request.Request; import io.appulse.epmd.java.core.model.response.EpmdInfo; import io.appulse.epmd.java.core.model.response.EpmdInfo.EpmdInfoBuilder; import io.appulse.epmd.java.core.model.response.EpmdInfo.NodeDescription; -import io.appulse.epmd.java.server.command.server.Request; +import io.appulse.epmd.java.server.command.server.ServerState; +import io.netty.channel.ChannelHandlerContext; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j class GetEpmdInfoRequestHandler implements RequestHandler { @Override - public void handle (@NonNull Request request) { + public void handle (@NonNull Request request, @NonNull ChannelHandlerContext context, @NonNull ServerState state) { + log.debug("Processing {}", request); + EpmdInfoBuilder builder = EpmdInfo.builder() - .port(request.getContext().getPort()); + .port(state.getPort()); - request.getContext() - .getNodes() + state.getNodes() .values() .stream() .map(it -> NodeDescription.builder() @@ -44,7 +56,11 @@ public void handle (@NonNull Request request) { ) .forEach(builder::node); - request.respondAndClose(builder.build()); + val response = builder.build(); + + log.debug("Response: {}", response); + context.writeAndFlush(response) + .addListener(CLOSE); } @Override diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetNodeInfoRequestHandler.java similarity index 61% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandler.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetNodeInfoRequestHandler.java index 99b28bf..4089508 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandler.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/GetNodeInfoRequestHandler.java @@ -14,26 +14,43 @@ * limitations under the License. */ -package io.appulse.epmd.java.server.command.server.handler; +package io.appulse.epmd.java.server.command.server.handler.command; import static io.appulse.epmd.java.core.model.Tag.PORT_PLEASE2_REQUEST; +import static io.netty.channel.ChannelFutureListener.CLOSE; import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.request.GetNodeInfo; +import io.appulse.epmd.java.core.model.request.Request; import io.appulse.epmd.java.core.model.response.NodeInfo; -import io.appulse.epmd.java.server.command.server.Request; +import io.appulse.epmd.java.server.command.server.ServerState; +import io.netty.channel.ChannelHandlerContext; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import lombok.val; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j class GetNodeInfoRequestHandler implements RequestHandler { @Override - public void handle (@NonNull Request request) { - val getNodeInfo = request.parse(GetNodeInfo.class); + public void handle (@NonNull Request request, @NonNull ChannelHandlerContext context, @NonNull ServerState state) { + log.debug("Processing {}", request); + + if (!(request instanceof GetNodeInfo)) { + val message = String.format("Invalid request object:%n%s", request); + log.error(message); + throw new IllegalArgumentException(message); + } + + val getNodeInfo = (GetNodeInfo) request; - val node = request.getContext() - .getNodes() + val node = state.getNodes() .get(getNodeInfo.getName()); NodeInfo response; @@ -52,8 +69,10 @@ public void handle (@NonNull Request request) { .name(node.getName()) .build(); } + log.debug("Response: {}", response); - request.respondAndClose(response); + context.writeAndFlush(response) + .addListener(CLOSE); } @Override diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/KillRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/KillRequestHandler.java new file mode 100644 index 0000000..1ec8b84 --- /dev/null +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/KillRequestHandler.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.server.command.server.handler.command; + +import static io.appulse.epmd.java.core.model.Tag.KILL_REQUEST; +import static io.appulse.epmd.java.core.model.response.KillResult.OK; +import static io.netty.channel.ChannelFutureListener.CLOSE; + +import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.request.Request; +import io.appulse.epmd.java.server.command.server.ServerState; + +import io.netty.channel.ChannelHandlerContext; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j +class KillRequestHandler implements RequestHandler { + + @Override + public void handle (@NonNull Request request, @NonNull ChannelHandlerContext context, @NonNull ServerState state) { + log.debug("Processing {}", request); + + if (!state.getServerOptions().isChecks()) { + log.warn("Option '-relaxed_command_check' is false, but someone trying to kill this EPMD"); + context.close(); + return; + } + + state.getNodes().clear(); + log.debug("Nodes registry was cleared"); + + context.writeAndFlush(OK) + .addListener(CLOSE) + .addListener(future -> { + log.debug("Shutting down this EPMD"); + Runtime.getRuntime().exit(1); + }); + } + + @Override + public Tag getTag () { + return KILL_REQUEST; + } +} diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/RegistrationRequestHandler.java similarity index 62% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandler.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/RegistrationRequestHandler.java index 7d4141b..6a75e3b 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandler.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/RegistrationRequestHandler.java @@ -14,33 +14,49 @@ * limitations under the License. */ -package io.appulse.epmd.java.server.command.server.handler; +package io.appulse.epmd.java.server.command.server.handler.command; import static io.appulse.epmd.java.core.model.Tag.ALIVE2_REQUEST; +import static io.netty.channel.ChannelFutureListener.CLOSE; import java.util.concurrent.atomic.AtomicInteger; import io.appulse.epmd.java.core.model.Tag; import io.appulse.epmd.java.core.model.request.Registration; +import io.appulse.epmd.java.core.model.request.Request; import io.appulse.epmd.java.core.model.response.RegistrationResult; import io.appulse.epmd.java.server.command.server.Node; -import io.appulse.epmd.java.server.command.server.Request; +import io.appulse.epmd.java.server.command.server.ServerState; +import io.netty.channel.ChannelHandlerContext; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import lombok.val; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ @Slf4j class RegistrationRequestHandler implements RequestHandler { AtomicInteger count = new AtomicInteger(0); @Override - public void handle (@NonNull Request request) { - Registration registration = request.parse(Registration.class); + public void handle (@NonNull Request request, @NonNull ChannelHandlerContext context, @NonNull ServerState state) { + if (!(request instanceof Registration)) { + val message = String.format("Invalid request object:%n%s", request); + log.error(message); + throw new IllegalArgumentException(message); + } + + Registration registration = (Registration) request; log.info("Registering {} node...", registration.getName()); - val node = register(request, registration); + val node = register(registration, state); + log.debug("Registration result: {}", node); + val response = RegistrationResult.builder() .ok(node != null) .creation(node != null @@ -48,10 +64,17 @@ public void handle (@NonNull Request request) { : 0 ) .build(); + log.debug("Response: {}", response); - request.respond(response); - if (node == null) { - request.closeConnection(); + val future = context.writeAndFlush(response); + if (!response.isOk()) { + future.addListener(CLOSE); + } else { + context.channel().closeFuture().addListener(f -> { + val name = node.getName(); + state.getNodes().remove(name); + log.debug("Node {} was disconnected", name); + }); } } @@ -60,8 +83,8 @@ public Tag getTag () { return ALIVE2_REQUEST; } - private Node register (Request request, Registration registration) { - return request.getContext().getNodes() + private Node register (Registration registration, ServerState serverState) { + return serverState.getNodes() .compute(registration.getName(), (key, value) -> { if (value != null) { return null; @@ -74,7 +97,6 @@ private Node register (Request request, Registration registration) { .high(registration.getHigh()) .low(registration.getLow()) .creation(count.incrementAndGet()) - .socket(request.getSocket()) .build(); }); } diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/RequestHandler.java similarity index 67% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestHandler.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/RequestHandler.java index e0de95a..60d1032 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/RequestHandler.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/RequestHandler.java @@ -14,18 +14,24 @@ * limitations under the License. */ -package io.appulse.epmd.java.server.command.server.handler; +package io.appulse.epmd.java.server.command.server.handler.command; -import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toConcurrentMap; import java.util.Map; import java.util.stream.Stream; import io.appulse.epmd.java.core.model.Tag; -import io.appulse.epmd.java.server.command.server.Request; +import io.appulse.epmd.java.core.model.request.Request; +import io.appulse.epmd.java.server.command.server.ServerState; -import lombok.NonNull; +import io.netty.channel.ChannelHandlerContext; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ public interface RequestHandler { Map ALL = Stream.of( @@ -35,9 +41,9 @@ public interface RequestHandler { new KillRequestHandler(), new RegistrationRequestHandler(), new StopRequestHandler() - ).collect(toMap(RequestHandler::getTag, it -> it)); + ).collect(toConcurrentMap(RequestHandler::getTag, it -> it)); - void handle (@NonNull Request request); + void handle (Request request, ChannelHandlerContext context, ServerState state); Tag getTag (); } diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandler.java b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/StopRequestHandler.java similarity index 53% rename from server/src/main/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandler.java rename to server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/StopRequestHandler.java index 3e82108..31e46e0 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandler.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/server/handler/command/StopRequestHandler.java @@ -14,38 +14,59 @@ * limitations under the License. */ -package io.appulse.epmd.java.server.command.server.handler; +package io.appulse.epmd.java.server.command.server.handler.command; import static io.appulse.epmd.java.core.model.Tag.STOP_REQUEST; import static io.appulse.epmd.java.core.model.response.StopResult.NOEXIST; import static io.appulse.epmd.java.core.model.response.StopResult.STOPPED; +import static io.netty.channel.ChannelFutureListener.CLOSE; import io.appulse.epmd.java.core.model.Tag; +import io.appulse.epmd.java.core.model.request.Request; import io.appulse.epmd.java.core.model.request.Stop; -import io.appulse.epmd.java.server.command.server.Request; +import io.appulse.epmd.java.server.command.server.ServerState; +import io.netty.channel.ChannelHandlerContext; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import lombok.val; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +@Slf4j class StopRequestHandler implements RequestHandler { @Override - public void handle (@NonNull Request request) { - if (!request.getContext().getServerOptions().isChecks()) { - request.closeConnection(); + public void handle (@NonNull Request request, @NonNull ChannelHandlerContext context, @NonNull ServerState state) { + log.debug("Processing {}", request); + + if (!state.getServerOptions().isChecks()) { + log.warn("Option '-relaxed_command_check' is false, but someone trying to stop a node"); + context.close(); return; } - val stop = request.parse(Stop.class); - val result = request.getContext() - .getNodes() + if (!(request instanceof Stop)) { + val message = String.format("Invalid request object:%n%s", request); + log.error(message); + throw new IllegalArgumentException(message); + } + + val stop = (Stop) request; + val result = state.getNodes() .remove(stop.getName()); val response = result == null ? NOEXIST : STOPPED; - request.respondAndClose(response); + log.debug("Stopping '{}' result is: {}", stop.getName(), response); + + context.writeAndFlush(response) + .addListener(CLOSE); } @Override diff --git a/server/src/main/java/io/appulse/epmd/java/server/command/stop/StopCommandExecutor.java b/server/src/main/java/io/appulse/epmd/java/server/command/stop/StopCommandExecutor.java index cd0e5f1..2b82f62 100644 --- a/server/src/main/java/io/appulse/epmd/java/server/command/stop/StopCommandExecutor.java +++ b/server/src/main/java/io/appulse/epmd/java/server/command/stop/StopCommandExecutor.java @@ -45,6 +45,8 @@ public StopCommandExecutor (CommonOptions commonOptions, @NonNull CommandOptions .filter(it -> it instanceof StopCommandOptions) .map(it -> (StopCommandOptions) it) .orElse(new StopCommandOptions()); + + log.debug("Stop options: {}", options); } @Override diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/RequestTestUtil.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/RequestTestUtil.java deleted file mode 100644 index c8da174..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/RequestTestUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server; - -import static org.mockito.Mockito.*; - -import java.io.ByteArrayOutputStream; -import java.net.Socket; -import java.util.concurrent.ConcurrentHashMap; - -import io.appulse.epmd.java.core.mapper.serializer.MessageSerializer; -import io.appulse.epmd.java.server.cli.CommonOptions; -import io.appulse.utils.Bytes; - -import lombok.SneakyThrows; - -public class RequestTestUtil { - - @SneakyThrows - public static Request createRequest (ByteArrayOutputStream output) { - return createRequest(Bytes.allocate(), new ServerCommandOptions(), output); - } - - @SneakyThrows - public static Request createRequest (ServerCommandOptions options, ByteArrayOutputStream output) { - return createRequest(Bytes.allocate(), options, output); - } - - @SneakyThrows - public static Request createRequest (Object request, ByteArrayOutputStream output) { - byte[] bytes = serialize(request); - return createRequest(Bytes.wrap(bytes), new ServerCommandOptions(), output); - } - - @SneakyThrows - public static Request createRequest (Object request, ServerCommandOptions options, ByteArrayOutputStream output) { - byte[] bytes = serialize(request); - - Context context = Context.builder() - .nodes(new ConcurrentHashMap<>()) - .commonOptions(new CommonOptions()) - .serverOptions(options) - .build(); - - Socket socket = mock(Socket.class); - when(socket.getOutputStream()).thenReturn(output); - - return Request.builder() - .context(context) - .socket(socket) - .payload(Bytes.wrap(bytes)) - .build(); - } - - @SneakyThrows - public static Request createRequest (Bytes payload, ServerCommandOptions options, ByteArrayOutputStream output) { - Context context = Context.builder() - .nodes(new ConcurrentHashMap<>()) - .commonOptions(new CommonOptions()) - .serverOptions(options) - .build(); - - Socket socket = mock(Socket.class); - when(socket.getOutputStream()).thenReturn(output); - - return Request.builder() - .context(context) - .socket(socket) - .payload(payload) - .build(); - } - - public static byte[] serialize (Object object) { - return new MessageSerializer().serialize(object); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutorTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutorTest.java index 35ddb68..3a603c1 100644 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutorTest.java +++ b/server/src/test/java/io/appulse/epmd/java/server/command/server/ServerCommandExecutorTest.java @@ -17,6 +17,7 @@ package io.appulse.epmd.java.server.command.server; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static io.appulse.epmd.java.core.model.NodeType.R3_ERLANG; import static io.appulse.epmd.java.core.model.Protocol.TCP; import static io.appulse.epmd.java.core.model.Version.R6; @@ -24,19 +25,32 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import io.appulse.epmd.java.client.EpmdClient; +import io.appulse.epmd.java.client.exception.EpmdRegistrationException; import io.appulse.epmd.java.server.cli.CommonOptions; +import io.appulse.epmd.java.server.command.server.util.TestNamePrinter; import io.appulse.utils.SocketUtils; import lombok.val; import org.assertj.core.api.SoftAssertions; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestRule; +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ public class ServerCommandExecutorTest { + @Rule + public TestRule watcher = new TestNamePrinter(); + EpmdClient client; ServerCommandExecutor server; @@ -44,7 +58,7 @@ public class ServerCommandExecutorTest { ExecutorService executorService; @Before - public void before () { + public void before () throws Exception { val port = SocketUtils.findFreePort() .orElseThrow(RuntimeException::new); @@ -55,6 +69,8 @@ public void before () { executorService = Executors.newSingleThreadExecutor(); executorService.execute(() -> server.execute()); + TimeUnit.SECONDS.sleep(2); + client = new EpmdClient(port); } @@ -101,6 +117,11 @@ public void register () { .isPresent() .hasValue(R6); }); + + assertThatThrownBy(() -> client.register("register", 8971, R3_ERLANG, TCP, R6, R6)) + .isInstanceOf(EpmdRegistrationException.class); + + client.stop("register"); } @Test diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandlerTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandlerTest.java deleted file mode 100644 index 4cfd2b4..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdDumpRequestHandlerTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.DUMP_REQUEST; -import static org.assertj.core.api.Assertions.assertThat; -import static java.util.Collections.emptyList; -import static io.appulse.epmd.java.core.model.NodeType.R3_ERLANG; -import static io.appulse.epmd.java.core.model.Protocol.SCTP; -import static io.appulse.epmd.java.core.model.Version.R3; -import static io.appulse.epmd.java.core.model.Version.R4; -import static java.util.stream.Collectors.toList; -import static io.appulse.epmd.java.core.model.response.EpmdDump.NodeDump.Status.ACTIVE; - -import java.io.ByteArrayOutputStream; -import java.net.Socket; -import java.util.stream.IntStream; - -import io.appulse.epmd.java.core.model.response.EpmdDump; -import io.appulse.epmd.java.core.model.response.EpmdDump.NodeDump; -import io.appulse.epmd.java.server.command.server.Node; -import io.appulse.epmd.java.server.command.server.RequestTestUtil; - -import org.junit.Test; -import lombok.val; - -public class GetEpmdDumpRequestHandlerTest { - - RequestHandler handler = new GetEpmdDumpRequestHandler(); - - @Test - public void handleEmpty () { - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(output); - - val dump = EpmdDump.builder() - .port(request.getContext().getPort()) - .nodes(emptyList()) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(dump)); - } - - @Test - public void handleWithNodes () { - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(output); - - val nodes = IntStream.range(0, 3) - .boxed() - .map(it -> Node.builder() - .name("node-" + it) - .port(9090) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .creation(it) - .socket(new Socket()) - .build() - ) - .peek(it -> request.getContext().getNodes().put(it.getName(), it)) - .collect(toList()); - - val dump = EpmdDump.builder() - .port(request.getContext().getPort()) - .nodes(nodes.stream() - .map(it -> new NodeDump(ACTIVE, it.getName(), it.getPort(), -1)) - .collect(toList()) - ) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(dump)); - } - - @Test - public void getTag () { - assertThat(handler.getTag()) - .isEqualTo(DUMP_REQUEST); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandlerTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandlerTest.java deleted file mode 100644 index d1af35b..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetEpmdInfoRequestHandlerTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.NAMES_REQUEST; -import static org.assertj.core.api.Assertions.assertThat; -import static java.util.Collections.emptyList; -import static io.appulse.epmd.java.core.model.NodeType.R3_ERLANG; -import static io.appulse.epmd.java.core.model.Protocol.SCTP; -import static io.appulse.epmd.java.core.model.Version.R3; -import static io.appulse.epmd.java.core.model.Version.R4; -import static java.util.stream.Collectors.toList; - -import java.io.ByteArrayOutputStream; -import java.net.Socket; -import java.util.stream.IntStream; - -import io.appulse.epmd.java.core.model.response.EpmdInfo; -import io.appulse.epmd.java.core.model.response.EpmdInfo.NodeDescription; -import io.appulse.epmd.java.server.command.server.Node; -import io.appulse.epmd.java.server.command.server.RequestTestUtil; - -import org.junit.Test; -import lombok.val; - -public class GetEpmdInfoRequestHandlerTest { - - RequestHandler handler = new GetEpmdInfoRequestHandler(); - - @Test - public void handleEmpty () { - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(output); - - val info = EpmdInfo.builder() - .port(request.getContext().getPort()) - .nodes(emptyList()) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(info)); - } - - @Test - public void handleWithNodes () { - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(output); - - val nodes = IntStream.range(0, 3) - .boxed() - .map(it -> Node.builder() - .name("node-" + it) - .port(9090) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .creation(it) - .socket(new Socket()) - .build() - ) - .peek(it -> request.getContext().getNodes().put(it.getName(), it)) - .collect(toList()); - - val info = EpmdInfo.builder() - .port(request.getContext().getPort()) - .nodes(nodes.stream() - .map(it -> NodeDescription.builder() - .name(it.getName()) - .port(it.getPort()) - .build() - ) - .collect(toList()) - ) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(info)); - } - - @Test - public void getTag () { - assertThat(handler.getTag()) - .isEqualTo(NAMES_REQUEST); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandlerTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandlerTest.java deleted file mode 100644 index c9e114c..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/GetNodeInfoRequestHandlerTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.PORT_PLEASE2_REQUEST; -import static org.assertj.core.api.Assertions.assertThat; -import static io.appulse.epmd.java.core.model.NodeType.R3_ERLANG; -import static io.appulse.epmd.java.core.model.Protocol.SCTP; -import static io.appulse.epmd.java.core.model.Version.R3; -import static io.appulse.epmd.java.core.model.Version.R4; -import static java.util.stream.Collectors.toList; - -import java.io.ByteArrayOutputStream; -import java.net.Socket; -import java.util.stream.IntStream; - -import io.appulse.epmd.java.core.model.request.GetNodeInfo; -import io.appulse.epmd.java.core.model.response.NodeInfo; -import io.appulse.epmd.java.server.command.server.Node; -import io.appulse.epmd.java.server.command.server.RequestTestUtil; - -import org.junit.Test; -import lombok.val; - -public class GetNodeInfoRequestHandlerTest { - - RequestHandler handler = new GetNodeInfoRequestHandler(); - - @Test - public void handleEmpty () { - val requestObject = new GetNodeInfo("popa"); - - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(requestObject, output); - - val info = NodeInfo.builder() - .ok(false) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(info)); - } - - @Test - public void handleWithNodes () { - val requestObject = new GetNodeInfo("node-2"); - - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(requestObject, output); - - val nodes = IntStream.range(0, 3) - .boxed() - .map(it -> Node.builder() - .name("node-" + it) - .port(9090) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .creation(it) - .socket(new Socket()) - .build() - ) - .peek(it -> request.getContext().getNodes().put(it.getName(), it)) - .collect(toList()); - - val info = NodeInfo.builder() - .ok(true) - .port(nodes.get(2).getPort()) - .type(nodes.get(2).getType()) - .protocol(nodes.get(2).getProtocol()) - .high(nodes.get(2).getHigh()) - .low(nodes.get(2).getLow()) - .name(nodes.get(2).getName()) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(info)); - } - - @Test - public void getTag () { - assertThat(handler.getTag()) - .isEqualTo(PORT_PLEASE2_REQUEST); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandlerTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandlerTest.java deleted file mode 100644 index e86f3e9..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/KillRequestHandlerTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.KILL_REQUEST; -import static org.assertj.core.api.Assertions.assertThat; -import static io.appulse.epmd.java.core.model.response.KillResult.OK; - -import java.io.ByteArrayOutputStream; - -import io.appulse.epmd.java.server.command.server.RequestTestUtil; -import io.appulse.epmd.java.server.command.server.ServerCommandOptions; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.contrib.java.lang.system.ExpectedSystemExit; -import lombok.val; - -public class KillRequestHandlerTest { - - @Rule - public final ExpectedSystemExit exit = ExpectedSystemExit.none(); - - RequestHandler handler = new KillRequestHandler(); - - @Test - public void handleOk () { - exit.expectSystemExitWithStatus(1); - - val output = new ByteArrayOutputStream(); - val options = new ServerCommandOptions(); - options.setChecks(true); - val request = RequestTestUtil.createRequest(options, output); - - handler.handle(request); - - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(OK)); - } - - @Test - public void handleNok () { - - } - - @Test - public void getTag () { - assertThat(handler.getTag()) - .isEqualTo(KILL_REQUEST); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandlerTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandlerTest.java deleted file mode 100644 index 1e0cb13..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/RegistrationRequestHandlerTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.ALIVE2_REQUEST; -import static org.assertj.core.api.Assertions.assertThat; -import static io.appulse.epmd.java.core.model.NodeType.R3_ERLANG; -import static io.appulse.epmd.java.core.model.Protocol.SCTP; -import static io.appulse.epmd.java.core.model.Version.R3; -import static io.appulse.epmd.java.core.model.Version.R4; - -import java.io.ByteArrayOutputStream; -import java.net.Socket; -import java.util.stream.IntStream; - -import io.appulse.epmd.java.core.model.request.Registration; -import io.appulse.epmd.java.core.model.response.RegistrationResult; -import io.appulse.epmd.java.server.command.server.Node; -import io.appulse.epmd.java.server.command.server.RequestTestUtil; - -import org.junit.Test; -import lombok.val; - -public class RegistrationRequestHandlerTest { - - RequestHandler handler = new RegistrationRequestHandler(); - - @Test - public void handleEmpty () { - val registration = Registration.builder() - .name("popa") - .port(8934) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .build(); - - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(registration, output); - - val result = RegistrationResult.builder() - .ok(true) - .creation(1) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(result)); - } - - @Test - public void handleWithNodes () { - val registration = Registration.builder() - .name("node-1") - .port(8934) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .build(); - - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(registration, output); - - IntStream.range(0, 3) - .boxed() - .map(it -> Node.builder() - .name("node-" + it) - .port(9090) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .creation(it) - .socket(new Socket()) - .build() - ) - .forEach(it -> request.getContext().getNodes().put(it.getName(), it)); - - val result = RegistrationResult.builder() - .ok(false) - .creation(0) - .build(); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(result)); - } - - @Test - public void getTag () { - assertThat(handler.getTag()) - .isEqualTo(ALIVE2_REQUEST); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandlerTest.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandlerTest.java deleted file mode 100644 index 06232da..0000000 --- a/server/src/test/java/io/appulse/epmd/java/server/command/server/handler/StopRequestHandlerTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.appulse.epmd.java.server.command.server.handler; - -import static io.appulse.epmd.java.core.model.Tag.STOP_REQUEST; -import static org.assertj.core.api.Assertions.assertThat; -import static io.appulse.epmd.java.core.model.response.StopResult.STOPPED; -import static io.appulse.epmd.java.core.model.response.StopResult.NOEXIST; -import static io.appulse.epmd.java.core.model.NodeType.R3_ERLANG; -import static io.appulse.epmd.java.core.model.Protocol.SCTP; -import static io.appulse.epmd.java.core.model.Version.R3; -import static io.appulse.epmd.java.core.model.Version.R4; - -import java.io.ByteArrayOutputStream; -import java.net.Socket; -import java.util.stream.IntStream; - -import io.appulse.epmd.java.core.model.request.Stop; -import io.appulse.epmd.java.server.command.server.Node; -import io.appulse.epmd.java.server.command.server.RequestTestUtil; -import io.appulse.epmd.java.server.command.server.ServerCommandOptions; - -import org.junit.Test; -import lombok.val; - -public class StopRequestHandlerTest { - - RequestHandler handler = new StopRequestHandler(); - - @Test - public void handleOk () { - Stop requestObject = new Stop("node-1"); - - val output = new ByteArrayOutputStream(); - val options = new ServerCommandOptions(); - options.setChecks(true); - val request = RequestTestUtil.createRequest(requestObject, options, output); - - IntStream.range(0, 3) - .boxed() - .map(it -> Node.builder() - .name("node-" + it) - .port(9090) - .type(R3_ERLANG) - .protocol(SCTP) - .high(R4) - .low(R3) - .creation(it) - .socket(new Socket()) - .build() - ) - .forEach(it -> request.getContext().getNodes().put(it.getName(), it)); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(STOPPED)); - } - - @Test - public void handleNok () { - Stop requestObject = new Stop("popa"); - - val output = new ByteArrayOutputStream(); - val options = new ServerCommandOptions(); - options.setChecks(true); - val request = RequestTestUtil.createRequest(requestObject, options, output); - - handler.handle(request); - assertThat(output.toByteArray()) - .isEqualTo(RequestTestUtil.serialize(NOEXIST)); - } - - @Test - public void handleNotAllowed () { - val output = new ByteArrayOutputStream(); - val request = RequestTestUtil.createRequest(output); - - handler.handle(request); - - assertThat(output.toByteArray()) - .isEmpty(); - } - - @Test - public void getTag () { - assertThat(handler.getTag()) - .isEqualTo(STOP_REQUEST); - } -} diff --git a/server/src/test/java/io/appulse/epmd/java/server/command/server/util/TestNamePrinter.java b/server/src/test/java/io/appulse/epmd/java/server/command/server/util/TestNamePrinter.java new file mode 100644 index 0000000..543ca2e --- /dev/null +++ b/server/src/test/java/io/appulse/epmd/java/server/command/server/util/TestNamePrinter.java @@ -0,0 +1,33 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.appulse.epmd.java.server.command.server.util; + +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +/** + * + * @author Artem Labazin + * @since 0.4.0 + */ +public class TestNamePrinter extends TestWatcher { + + @Override + protected void starting (Description description) { + System.out.println("\nRUNNING TEST: " + description.getMethodName() + '\n'); + } +}