diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 719437b5..462d0ab2 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -43,8 +43,6 @@ public final class FailureDetectorImpl implements FailureDetector { private final Transport transport; private final FailureDetectorConfig config; - private final TransportWrapper transportWrapper; - // State private final List pingMembers = new ArrayList<>(); @@ -84,8 +82,6 @@ public FailureDetectorImpl( this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); - this.transportWrapper = new TransportWrapper(this.transport); - // Subscribe actionsDisposables.addAll( Arrays.asList( @@ -151,8 +147,7 @@ private void doPing() { LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); List
addresses = pingMember.addresses(); - transportWrapper - .requestResponse(addresses, pingMsg) + TransportWrapper.requestResponse(transport, addresses, pingMsg) .timeout(Duration.ofMillis(config.pingTimeout()), scheduler) .publishOn(scheduler) .subscribe( @@ -194,8 +189,7 @@ private void doPingReq( Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout()); pingReqMembers.forEach( member -> - transportWrapper - .requestResponse(member.addresses(), pingReqMsg) + TransportWrapper.requestResponse(transport, member.addresses(), pingReqMsg) .timeout(timeout, scheduler) .publishOn(scheduler) .subscribe( @@ -256,8 +250,7 @@ private void onPing(Message message) { Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); List
addresses = data.getFrom().addresses(); LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses); - transportWrapper - .send(addresses, ackMessage) + TransportWrapper.send(transport, addresses, ackMessage) .subscribe( null, ex -> @@ -282,8 +275,7 @@ private void onPingReq(Message message) { Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); List
addresses = target.addresses(); LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses); - transportWrapper - .send(addresses, pingMessage) + TransportWrapper.send(transport, addresses, pingMessage) .subscribe( null, ex -> @@ -312,8 +304,7 @@ private void onTransitPingAck(Message message) { Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); List
addresses = target.addresses(); LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses); - transportWrapper - .send(addresses, originalAckMessage) + TransportWrapper.send(transport, addresses, originalAckMessage) .subscribe( null, ex -> diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index af16480e..24de5ca3 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -80,8 +80,6 @@ private enum MembershipUpdateReason { private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; - private final TransportWrapper transportWrapper; - // State private final Map membershipTable = new HashMap<>(); @@ -129,8 +127,6 @@ public MembershipProtocolImpl( this.membershipConfig = Objects.requireNonNull(config).membershipConfig(); this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig(); - this.transportWrapper = new TransportWrapper(this.transport); - // Prepare seeds seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers()); @@ -355,8 +351,7 @@ private void doSync() { Message message = prepareSyncDataMsg(SYNC, null); LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses); - transportWrapper - .send(addresses, message) + TransportWrapper.send(transport, addresses, message) .subscribe( null, ex -> @@ -413,8 +408,7 @@ private Mono onSync(Message syncMsg) { .doOnSuccess( avoid -> { Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); - transportWrapper - .send(sender, message) + TransportWrapper.send(transport, sender, message) .subscribe( null, ex -> @@ -443,8 +437,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { // alive with inc + 1 Message syncMsg = prepareSyncDataMsg(SYNC, null); List
addresses = fdEvent.member().addresses(); - transportWrapper - .send(addresses, syncMsg) + TransportWrapper.send(transport, addresses, syncMsg) .subscribe( null, ex -> @@ -532,11 +525,11 @@ private Mono syncMembership(SyncData syncData, boolean onStart) { updateMembership(r1, reason) .doOnError( ex -> - LOGGER.warn( + LOGGER.error( "[{}][syncMembership][{}][error] cause: {}", localMember, reason, - ex.toString())) + ex)) .onErrorResume(ex -> Mono.empty())) .toArray(Mono[]::new); diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 1ace90eb..e9ebe785 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -39,8 +39,6 @@ public class MetadataStoreImpl implements MetadataStore { private final Transport transport; private final ClusterConfig config; - private final TransportWrapper transportWrapper; - // State private final Map membersMetadata = new HashMap<>(); @@ -73,8 +71,6 @@ public MetadataStoreImpl( this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); this.localMetadata = localMetadata; // optional - - this.transportWrapper = new TransportWrapper(this.transport); } @Override @@ -164,12 +160,9 @@ public Mono fetchMetadata(Member member) { .data(new GetMetadataRequest(member)) .build(); - // TODO. Make transport abstraction around this logic - List
addresses = member.addresses(); - return transportWrapper - .requestResponse(addresses, request) + return TransportWrapper.requestResponse(transport, addresses, request) .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler) .publishOn(scheduler) .doOnSuccess( @@ -230,8 +223,7 @@ private void onMetadataRequest(Message message) { .build(); LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); - transportWrapper - .send(sender, response) + TransportWrapper.send(transport, sender, response) .subscribe( null, ex -> diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java index e9a3c7a9..d4fb5569 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java @@ -201,7 +201,6 @@ public List
sender() { } return Arrays.stream(headerValue.split(",")) - .map(String::trim) // Removes leading and trailing spaces. .map(Address::from) .collect(Collectors.toList()); } diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java index 2d025a8b..3bbe6d81 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportWrapper.java @@ -6,16 +6,6 @@ public class TransportWrapper { - private final Transport transport; - - public TransportWrapper(Transport transport) { - this.transport = transport; - } - - public Mono requestResponse(List
addresses, Message request) { - return requestResponse(transport, addresses, 0, request); - } - public static Mono requestResponse( Transport transport, List
addresses, Message request) { return requestResponse(transport, addresses, 0, request); @@ -24,7 +14,7 @@ public static Mono requestResponse( private static Mono requestResponse( Transport transport, List
addresses, int currentIndex, Message request) { if (currentIndex >= addresses.size()) { - return Mono.error(new RuntimeException("All addresses have been tried and failed.")); + return Mono.error(new RuntimeException("All addresses have been tried and failed")); } return transport @@ -32,10 +22,6 @@ private static Mono requestResponse( .onErrorResume(th -> requestResponse(transport, addresses, currentIndex + 1, request)); } - public Mono send(List
addresses, Message request) { - return send(transport, addresses, 0, request); - } - public static Mono send(Transport transport, List
addresses, Message request) { return send(transport, addresses, 0, request); }