From 5ce00b14c39dba93541687f1a611c2f38238ffb2 Mon Sep 17 00:00:00 2001
From: Artem Vysochyn <artem.vysochyn@gmail.com>
Date: Sun, 22 Sep 2024 16:52:47 +0300
Subject: [PATCH] WIP

---
 .../cluster/utils/NetworkEmulator.java        |  41 ++-
 .../utils/NetworkEmulatorTransport.java       |   7 +-
 .../cluster/utils/NetworkEmulatorTest.java    |  14 +-
 .../io/scalecube/cluster/ClusterImpl.java     |  10 +-
 .../fdetector/FailureDetectorImpl.java        |  12 +-
 .../cluster/gossip/GossipProtocolImpl.java    |   9 +-
 .../membership/MembershipProtocolImpl.java    |  19 +-
 .../cluster/metadata/MetadataStoreImpl.java   |   6 +-
 .../fdetector/FailureDetectorTest.java        | 294 ++++++++++++++----
 .../cluster/gossip/GossipDelayTest.java       |  21 +-
 .../cluster/gossip/GossipProtocolTest.java    |   8 +-
 .../membership/MembershipProtocolTest.java    | 158 +++++++---
 transport-parent/transport-api/pom.xml        |  11 +-
 .../cluster/transport/api/DistinctErrors.java | 139 +++++++++
 .../cluster/transport/api/Message.java        |  17 +-
 .../cluster/transport/api/Transport.java      |  64 +++-
 .../transport/api/TransportConfig.java        |   7 +-
 .../io/scalecube/transport/netty/Sender.java  |   3 +-
 .../transport/netty/TransportImpl.java        |  60 ++--
 .../transport/netty/tcp/TcpSender.java        |  12 +-
 .../netty/websocket/WebsocketSender.java      |  12 +-
 .../scalecube/transport/netty/BaseTest.java   |   3 +-
 .../netty/tcp/TcpTransportSendOrderTest.java  |   3 +-
 .../transport/netty/tcp/TcpTransportTest.java |   7 +-
 .../WebsocketTransportSendOrderTest.java      |   3 +-
 .../websocket/WebsocketTransportTest.java     |   7 +-
 26 files changed, 698 insertions(+), 249 deletions(-)
 create mode 100644 transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java

diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
index d51a7526..552c10bc 100644
--- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
+++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
@@ -1,7 +1,6 @@
 package io.scalecube.cluster.utils;
 
 import io.scalecube.cluster.transport.api.Message;
-import io.scalecube.net.Address;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -30,21 +29,21 @@ public final class NetworkEmulator {
   private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0);
   private volatile InboundSettings defaultInboundSettings = new InboundSettings(true);
 
-  private final Map<Address, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
-  private final Map<Address, InboundSettings> inboundSettings = new ConcurrentHashMap<>();
+  private final Map<String, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
+  private final Map<String, InboundSettings> inboundSettings = new ConcurrentHashMap<>();
 
   private final AtomicLong totalMessageSentCount = new AtomicLong();
   private final AtomicLong totalOutboundMessageLostCount = new AtomicLong();
   private final AtomicLong totalInboundMessageLostCount = new AtomicLong();
 
-  private final Address address;
+  private final String address;
 
   /**
    * Creates new instance of network emulator.
    *
    * @param address local address
    */
-  NetworkEmulator(Address address) {
+  NetworkEmulator(String address) {
     this.address = address;
   }
 
@@ -56,7 +55,7 @@ public final class NetworkEmulator {
    * @param destination address of target endpoint
    * @return network outbound settings
    */
-  public OutboundSettings outboundSettings(Address destination) {
+  public OutboundSettings outboundSettings(String destination) {
     return outboundSettings.getOrDefault(destination, defaultOutboundSettings);
   }
 
@@ -67,7 +66,7 @@ public OutboundSettings outboundSettings(Address destination) {
    * @param lossPercent loss in percents
    * @param meanDelay mean delay
    */
-  public void outboundSettings(Address destination, int lossPercent, int meanDelay) {
+  public void outboundSettings(String destination, int lossPercent, int meanDelay) {
     OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay);
     outboundSettings.put(destination, settings);
     LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination);
@@ -103,7 +102,7 @@ public void unblockAllOutbound() {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void blockOutbound(Address... destinations) {
+  public void blockOutbound(String... destinations) {
     blockOutbound(Arrays.asList(destinations));
   }
 
@@ -112,8 +111,8 @@ public void blockOutbound(Address... destinations) {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void blockOutbound(Collection<Address> destinations) {
-    for (Address destination : destinations) {
+  public void blockOutbound(Collection<String> destinations) {
+    for (String destination : destinations) {
       outboundSettings.put(destination, new OutboundSettings(100, 0));
     }
     LOGGER.debug("[{}] Blocked outbound to {}", address, destinations);
@@ -124,7 +123,7 @@ public void blockOutbound(Collection<Address> destinations) {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void unblockOutbound(Address... destinations) {
+  public void unblockOutbound(String... destinations) {
     unblockOutbound(Arrays.asList(destinations));
   }
 
@@ -133,7 +132,7 @@ public void unblockOutbound(Address... destinations) {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void unblockOutbound(Collection<Address> destinations) {
+  public void unblockOutbound(Collection<String> destinations) {
     destinations.forEach(outboundSettings::remove);
     LOGGER.debug("[{}] Unblocked outbound {}", address, destinations);
   }
@@ -164,7 +163,7 @@ public long totalOutboundMessageLostCount() {
    * @param address target address
    * @return mono message
    */
-  public Mono<Message> tryFailOutbound(Message msg, Address address) {
+  public Mono<Message> tryFailOutbound(Message msg, String address) {
     return Mono.defer(
         () -> {
           totalMessageSentCount.incrementAndGet();
@@ -187,7 +186,7 @@ public Mono<Message> tryFailOutbound(Message msg, Address address) {
    * @param address target address
    * @return mono message
    */
-  public Mono<Message> tryDelayOutbound(Message msg, Address address) {
+  public Mono<Message> tryDelayOutbound(Message msg, String address) {
     return Mono.defer(
         () -> {
           totalMessageSentCount.incrementAndGet();
@@ -209,7 +208,7 @@ public Mono<Message> tryDelayOutbound(Message msg, Address address) {
    * @param destination address of target endpoint
    * @return network inbound settings
    */
-  public InboundSettings inboundSettings(Address destination) {
+  public InboundSettings inboundSettings(String destination) {
     return inboundSettings.getOrDefault(destination, defaultInboundSettings);
   }
 
@@ -218,7 +217,7 @@ public InboundSettings inboundSettings(Address destination) {
    *
    * @param shallPass shallPass inbound flag
    */
-  public void inboundSettings(Address destination, boolean shallPass) {
+  public void inboundSettings(String destination, boolean shallPass) {
     InboundSettings settings = new InboundSettings(shallPass);
     inboundSettings.put(destination, settings);
     LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
@@ -253,7 +252,7 @@ public void unblockAllInbound() {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void blockInbound(Address... destinations) {
+  public void blockInbound(String... destinations) {
     blockInbound(Arrays.asList(destinations));
   }
 
@@ -262,8 +261,8 @@ public void blockInbound(Address... destinations) {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void blockInbound(Collection<Address> destinations) {
-    for (Address destination : destinations) {
+  public void blockInbound(Collection<String> destinations) {
+    for (String destination : destinations) {
       inboundSettings.put(destination, new InboundSettings(false));
     }
     LOGGER.debug("[{}] Blocked inbound from {}", address, destinations);
@@ -274,7 +273,7 @@ public void blockInbound(Collection<Address> destinations) {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void unblockInbound(Address... destinations) {
+  public void unblockInbound(String... destinations) {
     unblockInbound(Arrays.asList(destinations));
   }
 
@@ -283,7 +282,7 @@ public void unblockInbound(Address... destinations) {
    *
    * @param destinations collection of target endpoints where to apply
    */
-  public void unblockInbound(Collection<Address> destinations) {
+  public void unblockInbound(Collection<String> destinations) {
     destinations.forEach(inboundSettings::remove);
     LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations);
   }
diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java
index 381042c5..dcfbf677 100644
--- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java
+++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java
@@ -2,7 +2,6 @@
 
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.Transport;
-import io.scalecube.net.Address;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -26,7 +25,7 @@ public NetworkEmulator networkEmulator() {
   }
 
   @Override
-  public Address address() {
+  public String address() {
     return transport.address();
   }
 
@@ -46,7 +45,7 @@ public boolean isStopped() {
   }
 
   @Override
-  public Mono<Void> send(Address address, Message message) {
+  public Mono<Void> send(String address, Message message) {
     return Mono.defer(
         () ->
             Mono.just(enhanceWithSender(message))
@@ -56,7 +55,7 @@ public Mono<Void> send(Address address, Message message) {
   }
 
   @Override
-  public Mono<Message> requestResponse(Address address, Message request) {
+  public Mono<Message> requestResponse(String address, Message request) {
     return Mono.defer(
         () ->
             Mono.just(enhanceWithSender(request))
diff --git a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java
index 1919bc6f..b65137db 100644
--- a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java
+++ b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/NetworkEmulatorTest.java
@@ -1,7 +1,6 @@
 package io.scalecube.cluster.utils;
 
 import io.scalecube.cluster.utils.NetworkEmulator.OutboundSettings;
-import io.scalecube.net.Address;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -10,24 +9,23 @@ public class NetworkEmulatorTest extends BaseTest {
   @Test
   public void testResolveLinkSettingsBySocketAddress() {
     // Init network emulator
-    Address address = Address.from("localhost:1234");
-    NetworkEmulator networkEmulator = new NetworkEmulator(address);
-    networkEmulator.outboundSettings(Address.create("localhost", 5678), 25, 10);
-    networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765), 10, 20);
+    NetworkEmulator networkEmulator = new NetworkEmulator("localhost:1234");
+    networkEmulator.outboundSettings("localhost:" + 5678, 25, 10);
+    networkEmulator.outboundSettings("192.168.0.1:" + 8765, 10, 20);
     networkEmulator.setDefaultOutboundSettings(0, 2);
 
     // Check resolve by hostname:port
-    OutboundSettings link1 = networkEmulator.outboundSettings(Address.create("localhost", 5678));
+    OutboundSettings link1 = networkEmulator.outboundSettings("localhost:" + 5678);
     Assertions.assertEquals(25, link1.lossPercent());
     Assertions.assertEquals(10, link1.meanDelay());
 
     // Check resolve by ipaddr:port
-    OutboundSettings link2 = networkEmulator.outboundSettings(Address.create("192.168.0.1", 8765));
+    OutboundSettings link2 = networkEmulator.outboundSettings("192.168.0.1:" + 8765);
     Assertions.assertEquals(10, link2.lossPercent());
     Assertions.assertEquals(20, link2.meanDelay());
 
     // Check default link settings
-    OutboundSettings link3 = networkEmulator.outboundSettings(Address.create("localhost", 8765));
+    OutboundSettings link3 = networkEmulator.outboundSettings("localhost:" + 8765);
     Assertions.assertEquals(0, link3.lossPercent());
     Assertions.assertEquals(2, link3.meanDelay());
   }
diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
index 77942be8..ff8a67fb 100644
--- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
@@ -241,7 +241,7 @@ private Mono<Cluster> doStart0() {
     return Transport.bind(config.transportConfig())
         .flatMap(
             boundTransport -> {
-              localMember = createLocalMember(boundTransport.address());
+              localMember = createLocalMember(Address.from(boundTransport.address()));
               transport = new SenderAwareTransport(boundTransport, localMember.address());
 
               scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
@@ -506,7 +506,7 @@ private SenderAwareTransport(Transport transport, Address address) {
     }
 
     @Override
-    public Address address() {
+    public String address() {
       return transport.address();
     }
 
@@ -526,12 +526,12 @@ public boolean isStopped() {
     }
 
     @Override
-    public Mono<Void> send(Address address, Message message) {
+    public Mono<Void> send(String address, Message message) {
       return Mono.defer(() -> transport.send(address, enhanceWithSender(message)));
     }
 
     @Override
-    public Mono<Message> requestResponse(Address address, Message request) {
+    public Mono<Message> requestResponse(String address, Message request) {
       return Mono.defer(() -> transport.requestResponse(address, enhanceWithSender(request)));
     }
 
@@ -541,7 +541,7 @@ public Flux<Message> listen() {
     }
 
     private Message enhanceWithSender(Message message) {
-      return Message.with(message).sender(address).build();
+      return Message.with(message).sender(address.toString()).build();
     }
   }
 }
diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
index c7540bc0..40860a89 100644
--- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
@@ -147,7 +147,7 @@ private void doPing() {
     LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
     Address address = pingMember.address();
     transport
-        .requestResponse(address, pingMsg)
+        .requestResponse(address.toString(), pingMsg)
         .timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
         .publishOn(scheduler)
         .subscribe(
@@ -190,7 +190,7 @@ private void doPingReq(
     pingReqMembers.forEach(
         member ->
             transport
-                .requestResponse(member.address(), pingReqMsg)
+                .requestResponse(member.address().toString(), pingReqMsg)
                 .timeout(timeout, scheduler)
                 .publishOn(scheduler)
                 .subscribe(
@@ -232,7 +232,7 @@ private void onMessage(Message message) {
   /** Listens to PING message and answers with ACK. */
   private void onPing(Message message) {
     long period = this.currentPeriod;
-    Address sender = message.sender();
+    Address sender = Address.from(message.sender());
     LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender);
     PingData data = message.data();
     data = data.withAckType(AckType.DEST_OK);
@@ -252,7 +252,7 @@ private void onPing(Message message) {
     Address address = data.getFrom().address();
     LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address);
     transport
-        .send(address, ackMessage)
+        .send(address.toString(), ackMessage)
         .subscribe(
             null,
             ex ->
@@ -278,7 +278,7 @@ private void onPingReq(Message message) {
     Address address = target.address();
     LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address);
     transport
-        .send(address, pingMessage)
+        .send(address.toString(), pingMessage)
         .subscribe(
             null,
             ex ->
@@ -308,7 +308,7 @@ private void onTransitPingAck(Message message) {
     Address address = target.address();
     LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address);
     transport
-        .send(address, originalAckMessage)
+        .send(address.toString(), originalAckMessage)
         .subscribe(
             null,
             ex ->
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
index 1acfd060..6e76bd5c 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
@@ -1,6 +1,6 @@
 package io.scalecube.cluster.gossip;
 
-import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
+import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;
 
 import io.scalecube.cluster.ClusterMath;
 import io.scalecube.cluster.Member;
@@ -8,6 +8,7 @@
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.Transport;
 import io.scalecube.net.Address;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -120,7 +121,7 @@ public void stop() {
     actionsDisposables.dispose();
 
     // Stop publishing events
-    sink.emitComplete(RETRY_NON_SERIALIZED);
+    sink.emitComplete(busyLooping(Duration.ofSeconds(3)));
   }
 
   @Override
@@ -208,7 +209,7 @@ private void onGossipRequest(Message message) {
         if (gossipState == null) { // new gossip
           gossipState = new GossipState(gossip, period);
           gossips.put(gossip.gossipId(), gossipState);
-          sink.emitNext(gossip.message(), RETRY_NON_SERIALIZED);
+          sink.emitNext(gossip.message(), busyLooping(Duration.ofSeconds(3)));
         }
       }
       if (gossipState != null) {
@@ -294,7 +295,7 @@ private void spreadGossipsTo(long period, Member member) {
         .forEach(
             message ->
                 transport
-                    .send(address, message)
+                    .send(address.toString(), message)
                     .subscribe(
                         null,
                         ex ->
diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
index fad3bca3..ad3240d5 100644
--- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
@@ -3,7 +3,7 @@
 import static io.scalecube.cluster.membership.MemberStatus.ALIVE;
 import static io.scalecube.cluster.membership.MemberStatus.DEAD;
 import static io.scalecube.cluster.membership.MemberStatus.LEAVING;
-import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
+import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;
 
 import io.scalecube.cluster.ClusterConfig;
 import io.scalecube.cluster.ClusterMath;
@@ -167,7 +167,7 @@ private List<Address> cleanUpSeedMembers(Collection<Address> seedMembers) {
     String hostName = localIpAddress.getHostName();
 
     Address memberAddr = localMember.address();
-    Address transportAddr = transport.address();
+    Address transportAddr = Address.from(transport.address());
     Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port());
     Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port());
     Address memberAddByHostName = Address.create(hostName, memberAddr.port());
@@ -256,7 +256,8 @@ private void start0(MonoSink<Object> sink) {
                 address ->
                     transport
                         .requestResponse(
-                            address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString()))
+                            address.toString(),
+                            prepareSyncDataMsg(SYNC, UUID.randomUUID().toString()))
                         .doOnError(
                             ex ->
                                 LOGGER.warn(
@@ -300,7 +301,7 @@ public void stop() {
     suspicionTimeoutTasks.clear();
 
     // Stop publishing events
-    sink.emitComplete(RETRY_NON_SERIALIZED);
+    sink.emitComplete(busyLooping(Duration.ofSeconds(3)));
   }
 
   @Override
@@ -339,7 +340,7 @@ private void doSync() {
     Message message = prepareSyncDataMsg(SYNC, null);
     LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address);
     transport
-        .send(address, message)
+        .send(address.toString(), message)
         .subscribe(
             null,
             ex ->
@@ -390,14 +391,14 @@ private Mono<Void> onSyncAck(Message syncAckMsg, boolean onStart) {
   private Mono<Void> onSync(Message syncMsg) {
     return Mono.defer(
         () -> {
-          final Address sender = syncMsg.sender();
+          final Address sender = Address.from(syncMsg.sender());
           LOGGER.debug("[{}] Received Sync from {}", localMember, sender);
           return syncMembership(syncMsg.data(), false)
               .doOnSuccess(
                   avoid -> {
                     Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
                     transport
-                        .send(sender, message)
+                        .send(sender.toString(), message)
                         .subscribe(
                             null,
                             ex ->
@@ -427,7 +428,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
       Message syncMsg = prepareSyncDataMsg(SYNC, null);
       Address address = fdEvent.member().address();
       transport
-          .send(address, syncMsg)
+          .send(address.toString(), syncMsg)
           .subscribe(
               null,
               ex ->
@@ -726,7 +727,7 @@ private Mono<Void> onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {
 
   private void publishEvent(MembershipEvent event) {
     LOGGER.info("[{}][publishEvent] {}", localMember, event);
-    sink.emitNext(event, RETRY_NON_SERIALIZED);
+    sink.emitNext(event, busyLooping(Duration.ofSeconds(3)));
   }
 
   private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {
diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
index 35ba5328..e7d0ad9a 100644
--- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
@@ -160,7 +160,7 @@ public Mono<ByteBuffer> fetchMetadata(Member member) {
                   .build();
 
           return transport
-              .requestResponse(targetAddress, request)
+              .requestResponse(targetAddress.toString(), request)
               .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler)
               .publishOn(scheduler)
               .doOnSuccess(
@@ -196,7 +196,7 @@ private void onMessage(Message message) {
   }
 
   private void onMetadataRequest(Message message) {
-    final Address sender = message.sender();
+    final Address sender = Address.from(message.sender());
     LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender);
 
     GetMetadataRequest reqData = message.data();
@@ -225,7 +225,7 @@ private void onMetadataRequest(Message message) {
 
     LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender);
     transport
-        .send(sender, response)
+        .send(sender.toString(), response)
         .subscribe(
             null,
             ex ->
diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java
index 3c2241d8..0ca750b1 100644
--- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java
@@ -53,7 +53,9 @@ public void testTrusted() {
     Transport a = createTransport();
     Transport b = createTransport();
     Transport c = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -68,9 +70,24 @@ public void testTrusted() {
       Future<List<FailureDetectorEvent>> listB = listenNextEventFor(fdB, members);
       Future<List<FailureDetectorEvent>> listC = listenNextEventFor(fdC, members);
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address());
-      assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA),
+          Address.from(b.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB),
+          Address.from(a.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(c.address()),
+          ALIVE,
+          awaitEvents(listC),
+          Address.from(a.address()),
+          Address.from(b.address()));
     } finally {
       stop(fdetectors);
     }
@@ -82,7 +99,9 @@ public void testSuspected() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -91,9 +110,12 @@ public void testSuspected() {
     List<FailureDetectorImpl> fdetectors = Arrays.asList(fdA, fdB, fdC);
 
     // block all traffic
-    a.networkEmulator().blockOutbound(members);
-    b.networkEmulator().blockOutbound(members);
-    c.networkEmulator().blockOutbound(members);
+    a.networkEmulator()
+        .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList()));
+    b.networkEmulator()
+        .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList()));
+    c.networkEmulator()
+        .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList()));
 
     try {
       start(fdetectors);
@@ -102,9 +124,24 @@ public void testSuspected() {
       Future<List<FailureDetectorEvent>> listB = listenNextEventFor(fdB, members);
       Future<List<FailureDetectorEvent>> listC = listenNextEventFor(fdC, members);
 
-      assertStatus(a.address(), SUSPECT, awaitEvents(listA), b.address(), c.address());
-      assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address(), c.address());
-      assertStatus(c.address(), SUSPECT, awaitEvents(listC), a.address(), b.address());
+      assertStatus(
+          Address.from(a.address()),
+          SUSPECT,
+          awaitEvents(listA),
+          Address.from(b.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(b.address()),
+          SUSPECT,
+          awaitEvents(listB),
+          Address.from(a.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(c.address()),
+          SUSPECT,
+          awaitEvents(listC),
+          Address.from(a.address()),
+          Address.from(b.address()));
     } finally {
       a.networkEmulator().unblockAllOutbound();
       b.networkEmulator().unblockAllOutbound();
@@ -119,7 +156,9 @@ public void testTrustedDespiteBadNetwork() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -137,9 +176,24 @@ public void testTrustedDespiteBadNetwork() {
     try {
       start(fdetectors);
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address());
-      assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA),
+          Address.from(b.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB),
+          Address.from(a.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(c.address()),
+          ALIVE,
+          awaitEvents(listC),
+          Address.from(a.address()),
+          Address.from(b.address()));
     } finally {
       stop(fdetectors);
     }
@@ -151,7 +205,9 @@ public void testTrustedDespiteDifferentPingTimings() {
     Transport a = createTransport();
     Transport b = createTransport();
     Transport c = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -168,9 +224,24 @@ public void testTrustedDespiteDifferentPingTimings() {
       Future<List<FailureDetectorEvent>> listB = listenNextEventFor(fdB, members);
       Future<List<FailureDetectorEvent>> listC = listenNextEventFor(fdC, members);
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), c.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), c.address());
-      assertStatus(c.address(), ALIVE, awaitEvents(listC), a.address(), b.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA),
+          Address.from(b.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB),
+          Address.from(a.address()),
+          Address.from(c.address()));
+      assertStatus(
+          Address.from(c.address()),
+          ALIVE,
+          awaitEvents(listC),
+          Address.from(a.address()),
+          Address.from(b.address()));
     } finally {
       stop(fdetectors);
     }
@@ -183,7 +254,12 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
     NetworkEmulatorTransport d = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address(), d.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()),
+            Address.from(b.address()),
+            Address.from(c.address()),
+            Address.from(d.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -193,7 +269,8 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception
     List<FailureDetectorImpl> fdetectors = Arrays.asList(fdA, fdB, fdC, fdD);
 
     // Block traffic on member A to all cluster members
-    a.networkEmulator().blockOutbound(members);
+    a.networkEmulator()
+        .blockOutbound(members.stream().map(Address::toString).collect(Collectors.toList()));
 
     try {
       final Future<List<FailureDetectorEvent>> listA = listenNextEventFor(fdA, members);
@@ -204,16 +281,19 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception
       start(fdetectors);
 
       assertStatus(
-          a.address(),
+          Address.from(a.address()),
           SUSPECT,
           awaitEvents(listA),
-          b.address(),
-          c.address(),
-          d.address()); // node A
+          Address.from(b.address()),
+          Address.from(c.address()),
+          Address.from(d.address())); // node A
       // partitioned
-      assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address());
-      assertStatus(c.address(), SUSPECT, awaitEvents(listC), a.address());
-      assertStatus(d.address(), SUSPECT, awaitEvents(listD), a.address());
+      assertStatus(
+          Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(a.address()));
+      assertStatus(
+          Address.from(c.address()), SUSPECT, awaitEvents(listC), Address.from(a.address()));
+      assertStatus(
+          Address.from(d.address()), SUSPECT, awaitEvents(listD), Address.from(a.address()));
 
       // Unblock traffic on member A
       a.networkEmulator().unblockAllOutbound();
@@ -226,10 +306,34 @@ public void testSuspectedMemberWithBadNetworkGetsPartitioned() throws Exception
 
       // Check member A recovers
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA0), b.address(), c.address(), d.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB0), a.address(), c.address(), d.address());
-      assertStatus(c.address(), ALIVE, awaitEvents(listC0), a.address(), b.address(), d.address());
-      assertStatus(d.address(), ALIVE, awaitEvents(listD0), a.address(), b.address(), c.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA0),
+          Address.from(b.address()),
+          Address.from(c.address()),
+          Address.from(d.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB0),
+          Address.from(a.address()),
+          Address.from(c.address()),
+          Address.from(d.address()));
+      assertStatus(
+          Address.from(c.address()),
+          ALIVE,
+          awaitEvents(listC0),
+          Address.from(a.address()),
+          Address.from(b.address()),
+          Address.from(d.address()));
+      assertStatus(
+          Address.from(d.address()),
+          ALIVE,
+          awaitEvents(listD0),
+          Address.from(a.address()),
+          Address.from(b.address()),
+          Address.from(c.address()));
     } finally {
       stop(fdetectors);
     }
@@ -242,7 +346,12 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
     NetworkEmulatorTransport d = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address(), d.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()),
+            Address.from(b.address()),
+            Address.from(c.address()),
+            Address.from(d.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -264,16 +373,19 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti
 
       start(fdetectors);
 
-      assertStatus(a.address(), SUSPECT, awaitEvents(listA), d.address());
-      assertStatus(b.address(), SUSPECT, awaitEvents(listB), d.address());
-      assertStatus(c.address(), SUSPECT, awaitEvents(listC), d.address());
       assertStatus(
-          d.address(),
+          Address.from(a.address()), SUSPECT, awaitEvents(listA), Address.from(d.address()));
+      assertStatus(
+          Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(d.address()));
+      assertStatus(
+          Address.from(c.address()), SUSPECT, awaitEvents(listC), Address.from(d.address()));
+      assertStatus(
+          Address.from(d.address()),
           SUSPECT,
           awaitEvents(listD),
-          a.address(),
-          b.address(),
-          c.address()); // node D
+          Address.from(a.address()),
+          Address.from(b.address()),
+          Address.from(c.address())); // node D
       // partitioned
 
       // Unblock traffic to member D on other members
@@ -289,10 +401,34 @@ public void testSuspectedMemberWithNormalNetworkGetsPartitioned() throws Excepti
 
       // Check member D recovers
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA0), b.address(), c.address(), d.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB0), a.address(), c.address(), d.address());
-      assertStatus(c.address(), ALIVE, awaitEvents(listC0), a.address(), b.address(), d.address());
-      assertStatus(d.address(), ALIVE, awaitEvents(listD0), a.address(), b.address(), c.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA0),
+          Address.from(b.address()),
+          Address.from(c.address()),
+          Address.from(d.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB0),
+          Address.from(a.address()),
+          Address.from(c.address()),
+          Address.from(d.address()));
+      assertStatus(
+          Address.from(c.address()),
+          ALIVE,
+          awaitEvents(listC0),
+          Address.from(a.address()),
+          Address.from(b.address()),
+          Address.from(d.address()));
+      assertStatus(
+          Address.from(d.address()),
+          ALIVE,
+          awaitEvents(listD0),
+          Address.from(a.address()),
+          Address.from(b.address()),
+          Address.from(c.address()));
     } finally {
       stop(fdetectors);
     }
@@ -303,7 +439,7 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception {
     // Create transports
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address());
+    List<Address> members = Arrays.asList(Address.from(a.address()), Address.from(b.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -320,8 +456,10 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception {
     try {
       start(fdetectors);
 
-      assertStatus(a.address(), SUSPECT, awaitEvents(listA), b.address());
-      assertStatus(b.address(), SUSPECT, awaitEvents(listB), a.address());
+      assertStatus(
+          Address.from(a.address()), SUSPECT, awaitEvents(listA), Address.from(b.address()));
+      assertStatus(
+          Address.from(b.address()), SUSPECT, awaitEvents(listB), Address.from(a.address()));
 
       // Unblock A and B members: A-->B, B-->A
       a.networkEmulator().unblockAllOutbound();
@@ -333,8 +471,8 @@ public void testMemberStatusChangeAfterNetworkRecovery() throws Exception {
       listA = listenNextEventFor(fdA, members);
       listB = listenNextEventFor(fdB, members);
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address());
+      assertStatus(Address.from(a.address()), ALIVE, awaitEvents(listA), Address.from(b.address()));
+      assertStatus(Address.from(b.address()), ALIVE, awaitEvents(listB), Address.from(a.address()));
     } finally {
       stop(fdetectors);
     }
@@ -346,7 +484,9 @@ public void testStatusChangeAfterMemberRestart() throws Exception {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport x = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), x.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(x.address()));
 
     // Create failure detectors
     FailureDetectorImpl fdA = createFd(a, members);
@@ -365,16 +505,31 @@ public void testStatusChangeAfterMemberRestart() throws Exception {
     try {
       start(fdetectors);
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), x.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), x.address());
-      assertStatus(x.address(), ALIVE, awaitEvents(listX), a.address(), b.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA),
+          Address.from(b.address()),
+          Address.from(x.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB),
+          Address.from(a.address()),
+          Address.from(x.address()));
+      assertStatus(
+          Address.from(x.address()),
+          ALIVE,
+          awaitEvents(listX),
+          Address.from(a.address()),
+          Address.from(b.address()));
 
       // stop node X
       stop(Collections.singletonList(fdX));
       TimeUnit.SECONDS.sleep(2);
 
       // restart node X as XX
-      xx = createTransport(new TransportConfig().port(x.address().port()));
+      xx = createTransport(new TransportConfig().port(Address.from(x.address()).port()));
       assertEquals(x.address(), xx.address());
       fdetectors = Arrays.asList(fdA, fdB, fdXx = createFd(xx, members));
 
@@ -389,9 +544,24 @@ public void testStatusChangeAfterMemberRestart() throws Exception {
       // TODO [AK]: It would be more correct to consider restarted member as a new member, so x is
       // still suspected!
 
-      assertStatus(a.address(), ALIVE, awaitEvents(listA), b.address(), xx.address());
-      assertStatus(b.address(), ALIVE, awaitEvents(listB), a.address(), xx.address());
-      assertStatus(xx.address(), ALIVE, awaitEvents(listXx), a.address(), b.address());
+      assertStatus(
+          Address.from(a.address()),
+          ALIVE,
+          awaitEvents(listA),
+          Address.from(b.address()),
+          Address.from(xx.address()));
+      assertStatus(
+          Address.from(b.address()),
+          ALIVE,
+          awaitEvents(listB),
+          Address.from(a.address()),
+          Address.from(xx.address()));
+      assertStatus(
+          Address.from(xx.address()),
+          ALIVE,
+          awaitEvents(listXx),
+          Address.from(a.address()),
+          Address.from(b.address()));
     } finally {
       stop(fdetectors);
     }
@@ -410,11 +580,15 @@ private FailureDetectorImpl createFd(
       Transport transport, List<Address> addresses, FailureDetectorConfig config) {
 
     Member localMember =
-        new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE);
+        new Member(
+            "member-" + Address.from(transport.address()).port(),
+            null,
+            Address.from(transport.address()),
+            NAMESPACE);
 
     Flux<MembershipEvent> membershipFlux =
         Flux.fromIterable(addresses)
-            .filter(address -> !transport.address().equals(address))
+            .filter(address -> !transport.address().equals(address.toString()))
             .map(address -> new Member("member-" + address.port(), null, address, NAMESPACE))
             .map(member -> MembershipEvent.createAdded(member, null, 0));
 
@@ -465,7 +639,7 @@ private static Future<List<FailureDetectorEvent>> listenNextEventFor(
       FailureDetectorImpl fd, List<Address> addresses) {
     final Transport transport = BaseTest.getField(fd, "transport");
     addresses = new ArrayList<>(addresses);
-    addresses.remove(transport.address()); // exclude self
+    addresses.remove(Address.from(transport.address())); // exclude self
     if (addresses.isEmpty()) {
       throw new IllegalArgumentException();
     }
diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java
index 3857d1e6..823459c3 100644
--- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java
@@ -38,15 +38,24 @@ public void testMessageDelayMoreThanGossipSweepTime() throws InterruptedExceptio
     final GossipProtocolImpl gossipProtocol1 =
         initGossipProtocol(
             transport1,
-            Arrays.asList(transport1.address(), transport2.address(), transport3.address()));
+            Arrays.asList(
+                Address.from(transport1.address()),
+                Address.from(transport2.address()),
+                Address.from(transport3.address())));
     final GossipProtocolImpl gossipProtocol2 =
         initGossipProtocol(
             transport2,
-            Arrays.asList(transport1.address(), transport2.address(), transport3.address()));
+            Arrays.asList(
+                Address.from(transport1.address()),
+                Address.from(transport2.address()),
+                Address.from(transport3.address())));
     final GossipProtocolImpl gossipProtocol3 =
         initGossipProtocol(
             transport3,
-            Arrays.asList(transport1.address(), transport2.address(), transport3.address()));
+            Arrays.asList(
+                Address.from(transport1.address()),
+                Address.from(transport2.address()),
+                Address.from(transport3.address())));
 
     final AtomicInteger protocol1GossipCounter = new AtomicInteger(0);
     final AtomicInteger protocol2GossipCounter = new AtomicInteger(0);
@@ -82,7 +91,11 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
             .gossipRepeatMult(gossipRepeatMultiplier);
 
     Member localMember =
-        new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE);
+        new Member(
+            "member-" + Address.from(transport.address()).port(),
+            null,
+            Address.from(transport.address()),
+            NAMESPACE);
 
     Flux<MembershipEvent> membershipFlux =
         Flux.fromIterable(members)
diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java
index ae82bca2..12e5856f 100644
--- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java
@@ -232,7 +232,7 @@ private List<GossipProtocolImpl> initGossipProtocols(int count, int lostPercent,
     final List<Transport> transports = initTransports(count, lostPercent, meanDelay);
     List<Address> members = new ArrayList<>();
     for (Transport transport : transports) {
-      members.add(transport.address());
+      members.add(Address.from(transport.address()));
     }
     List<GossipProtocolImpl> gossipProtocols = new ArrayList<>();
     for (Transport transport : transports) {
@@ -259,7 +259,11 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
             .gossipRepeatMult(gossipRepeatMultiplier);
 
     Member localMember =
-        new Member("member-" + transport.address().port(), null, transport.address(), NAMESPACE);
+        new Member(
+            "member-" + Address.from(transport.address()).port(),
+            null,
+            Address.from(transport.address()),
+            NAMESPACE);
 
     Flux<MembershipEvent> membershipFlux =
         Flux.fromIterable(members)
diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
index 4fced8e2..ee1fdc53 100644
--- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
@@ -76,7 +76,9 @@ public void testLeaveCluster() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -111,7 +113,8 @@ public void testLeaveClusterCameBeforeAlive() {
     final NetworkEmulatorTransport b = createTransport();
     final Member anotherMember =
         new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE);
-    final List<Address> addresses = Arrays.asList(a.address(), b.address());
+    final List<Address> addresses =
+        Arrays.asList(Address.from(a.address()), Address.from(b.address()));
 
     final MembershipProtocolImpl cmA = createMembership(a, addresses);
     final MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -156,7 +159,8 @@ public void testLeaveClusterOnly() {
     final NetworkEmulatorTransport b = createTransport();
     final Member anotherMember =
         new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE);
-    final List<Address> addresses = Arrays.asList(a.address(), b.address());
+    final List<Address> addresses =
+        Arrays.asList(Address.from(a.address()), Address.from(b.address()));
 
     final MembershipProtocolImpl cmA = createMembership(a, addresses);
     final MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -190,7 +194,8 @@ public void testLeaveClusterOnSuspectedNode() {
     final NetworkEmulatorTransport b = createTransport();
     final Member anotherMember =
         new Member("leavingNodeId-1", null, Address.from("localhost:9236"), NAMESPACE);
-    final List<Address> addresses = Arrays.asList(a.address(), b.address());
+    final List<Address> addresses =
+        Arrays.asList(Address.from(a.address()), Address.from(b.address()));
 
     final MembershipProtocolImpl cmA = createMembership(a, addresses);
     final MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -234,7 +239,8 @@ public void testLeaveClusterOnSuspectedNode() {
   public void testLeaveClusterOnAliveAndSuspectedNode() {
     final NetworkEmulatorTransport a = createTransport();
     final NetworkEmulatorTransport b = createTransport();
-    final List<Address> addresses = Arrays.asList(a.address(), b.address());
+    final List<Address> addresses =
+        Arrays.asList(Address.from(a.address()), Address.from(b.address()));
 
     final MembershipProtocolImpl cmA = createMembership(a, addresses);
     final MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -270,7 +276,9 @@ public void testInitialPhaseOk() {
     Transport a = createTransport();
     Transport b = createTransport();
     Transport c = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -295,7 +303,9 @@ public void testNetworkPartitionDueNoOutboundThenRecover() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -304,9 +314,12 @@ public void testNetworkPartitionDueNoOutboundThenRecover() {
     awaitSeconds(3);
 
     // Block traffic
-    a.networkEmulator().blockOutbound(addresses);
-    b.networkEmulator().blockOutbound(addresses);
-    c.networkEmulator().blockOutbound(addresses);
+    a.networkEmulator()
+        .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList()));
+    b.networkEmulator()
+        .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList()));
+    c.networkEmulator()
+        .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList()));
 
     try {
 
@@ -341,7 +354,9 @@ public void testMemberLostNetworkDueNoOutboundThenRecover() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> members = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> members =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, members);
     MembershipProtocolImpl cmB = createMembership(b, members);
@@ -397,7 +412,9 @@ public void testNetworkPartitionTwiceDueNoOutboundThenRecover() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -467,7 +484,9 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() {
     NetworkEmulatorTransport a = createTransport();
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()), Address.from(b.address()), Address.from(c.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -483,9 +502,12 @@ public void testNetworkLostOnAllNodesDueNoOutboundThenRecover() {
       assertTrusted(cmC, cmB.member(), cmA.member());
       assertNoSuspected(cmC);
 
-      a.networkEmulator().blockOutbound(addresses);
-      b.networkEmulator().blockOutbound(addresses);
-      c.networkEmulator().blockOutbound(addresses);
+      a.networkEmulator()
+          .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList()));
+      b.networkEmulator()
+          .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList()));
+      c.networkEmulator()
+          .blockOutbound(addresses.stream().map(Address::toString).collect(Collectors.toList()));
 
       awaitSeconds(1);
 
@@ -523,7 +545,12 @@ public void testLongNetworkPartitionDueNoOutboundThenRemoved() {
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
     NetworkEmulatorTransport d = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()),
+            Address.from(b.address()),
+            Address.from(c.address()),
+            Address.from(d.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -576,7 +603,12 @@ public void testRestartStoppedMembers() {
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
     NetworkEmulatorTransport d = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()),
+            Address.from(b.address()),
+            Address.from(c.address()),
+            Address.from(d.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -656,7 +688,12 @@ public void testRestartStoppedMembersOnSameAddresses() {
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
     NetworkEmulatorTransport d = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()),
+            Address.from(b.address()),
+            Address.from(c.address()),
+            Address.from(d.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -691,8 +728,8 @@ public void testRestartStoppedMembersOnSameAddresses() {
       assertSuspected(cmB, cmC.member(), cmD.member());
 
       // Restart C and D on same ports
-      c_Restarted = createTransport(new TransportConfig().port(c.address().port()));
-      d_Restarted = createTransport(new TransportConfig().port(d.address().port()));
+      c_Restarted = createTransport(new TransportConfig().port(Address.from(c.address()).port()));
+      d_Restarted = createTransport(new TransportConfig().port(Address.from(d.address()).port()));
       cmC_Restarted = createMembership(c_Restarted, addresses);
       cmD_Restarted = createMembership(d_Restarted, addresses);
 
@@ -728,10 +765,14 @@ public void testLimitedSeedMembers() {
     NetworkEmulatorTransport e = createTransport();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmD = createMembership(d, Collections.singletonList(b.address()));
-    MembershipProtocolImpl cmE = createMembership(e, Collections.singletonList(b.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmC =
+        createMembership(c, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmD =
+        createMembership(d, Collections.singletonList(Address.from(b.address())));
+    MembershipProtocolImpl cmE =
+        createMembership(e, Collections.singletonList(Address.from(b.address())));
 
     try {
       awaitSeconds(3);
@@ -765,16 +806,24 @@ public void testOverrideMemberAddress() throws UnknownHostException {
         createMembership(a, testConfig(Collections.emptyList()).externalHost(localAddress));
     MembershipProtocolImpl cmB =
         createMembership(
-            b, testConfig(Collections.singletonList(a.address())).externalHost(localAddress));
+            b,
+            testConfig(Collections.singletonList(Address.from(a.address())))
+                .externalHost(localAddress));
     MembershipProtocolImpl cmC =
         createMembership(
-            c, testConfig(Collections.singletonList(a.address())).externalHost(localAddress));
+            c,
+            testConfig(Collections.singletonList(Address.from(a.address())))
+                .externalHost(localAddress));
     MembershipProtocolImpl cmD =
         createMembership(
-            d, testConfig(Collections.singletonList(b.address())).externalHost(localAddress));
+            d,
+            testConfig(Collections.singletonList(Address.from(b.address())))
+                .externalHost(localAddress));
     MembershipProtocolImpl cmE =
         createMembership(
-            e, testConfig(Collections.singletonList(b.address())).externalHost(localAddress));
+            e,
+            testConfig(Collections.singletonList(Address.from(b.address())))
+                .externalHost(localAddress));
 
     try {
       awaitSeconds(3);
@@ -804,9 +853,10 @@ public void testNodeJoinClusterWithNoInbound() {
     c_noInbound.networkEmulator().blockAllInbound();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
     MembershipProtocolImpl cm_noInbound =
-        createMembership(c_noInbound, Collections.singletonList(a.address()));
+        createMembership(c_noInbound, Collections.singletonList(Address.from(a.address())));
 
     awaitSeconds(3);
 
@@ -831,9 +881,11 @@ public void testNodeJoinClusterWithNoInboundThenInboundRecover() {
     c_noInboundThenInboundOk.networkEmulator().blockAllInbound();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
     MembershipProtocolImpl cm_noInboundThenInboundOk =
-        createMembership(c_noInboundThenInboundOk, Collections.singletonList(a.address()));
+        createMembership(
+            c_noInboundThenInboundOk, Collections.singletonList(Address.from(a.address())));
 
     awaitSeconds(3);
 
@@ -865,8 +917,10 @@ public void testNetworkPartitionDueNoInboundThenRemoved() {
     NetworkEmulatorTransport c = createTransport();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmC =
+        createMembership(c, Collections.singletonList(Address.from(a.address())));
 
     try {
       awaitSeconds(3);
@@ -905,8 +959,10 @@ public void testNetworkPartitionDueNoInboundUntilRemovedThenInboundRecover() {
     NetworkEmulatorTransport c = createTransport();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmC =
+        createMembership(c, Collections.singletonList(Address.from(a.address())));
 
     try {
       awaitSeconds(3);
@@ -957,8 +1013,10 @@ public void testNetworkPartitionBetweenTwoMembersDueNoInbound() {
     NetworkEmulatorTransport c = createTransport();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmC =
+        createMembership(c, Collections.singletonList(Address.from(a.address())));
 
     try {
       awaitSeconds(3);
@@ -987,8 +1045,10 @@ public void testNetworkPartitionBetweenTwoMembersDueNoOutbound() {
     NetworkEmulatorTransport c = createTransport();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmC =
+        createMembership(c, Collections.singletonList(Address.from(a.address())));
 
     try {
       awaitSeconds(3);
@@ -1017,8 +1077,10 @@ public void testNetworkPartitionBetweenTwoMembersDueNoTrafficAtAll() {
     NetworkEmulatorTransport c = createTransport();
 
     MembershipProtocolImpl cmA = createMembership(a, Collections.emptyList());
-    MembershipProtocolImpl cmB = createMembership(b, Collections.singletonList(a.address()));
-    MembershipProtocolImpl cmC = createMembership(c, Collections.singletonList(a.address()));
+    MembershipProtocolImpl cmB =
+        createMembership(b, Collections.singletonList(Address.from(a.address())));
+    MembershipProtocolImpl cmC =
+        createMembership(c, Collections.singletonList(Address.from(a.address())));
 
     try {
       awaitSeconds(3);
@@ -1047,7 +1109,12 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() {
     NetworkEmulatorTransport b = createTransport();
     NetworkEmulatorTransport c = createTransport();
     NetworkEmulatorTransport d = createTransport();
-    List<Address> addresses = Arrays.asList(a.address(), b.address(), c.address(), d.address());
+    List<Address> addresses =
+        Arrays.asList(
+            Address.from(a.address()),
+            Address.from(b.address()),
+            Address.from(c.address()),
+            Address.from(d.address()));
 
     MembershipProtocolImpl cmA = createMembership(a, addresses);
     MembershipProtocolImpl cmB = createMembership(b, addresses);
@@ -1135,7 +1202,8 @@ private MembershipProtocolImpl createMembership(
   }
 
   private MembershipProtocolImpl createMembership(Transport transport, ClusterConfig config) {
-    Member localMember = new Member(newMemberId(), null, transport.address(), NAMESPACE);
+    Member localMember =
+        new Member(newMemberId(), null, Address.from(transport.address()), NAMESPACE);
 
     Sinks.Many<MembershipEvent> membershipProcessor = Sinks.many().multicast().directBestEffort();
 
diff --git a/transport-parent/transport-api/pom.xml b/transport-parent/transport-api/pom.xml
index 9a1f4c38..b0a2d71a 100644
--- a/transport-parent/transport-api/pom.xml
+++ b/transport-parent/transport-api/pom.xml
@@ -1,5 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <artifactId>scalecube-transport-parent</artifactId>
     <groupId>io.scalecube</groupId>
@@ -10,11 +12,4 @@
   <artifactId>scalecube-transport-api</artifactId>
   <name>ScaleCube/ClusterTransportApi</name>
 
-  <dependencies>
-    <dependency>
-      <groupId>io.scalecube</groupId>
-      <artifactId>scalecube-commons</artifactId>
-    </dependency>
-  </dependencies>
-
 </project>
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java
new file mode 100644
index 00000000..a421de0a
--- /dev/null
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/DistinctErrors.java
@@ -0,0 +1,139 @@
+package io.scalecube.cluster.transport.api;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class DistinctErrors {
+
+  private final List<DistinctObservation> distinctObservations = new ArrayList<>();
+  private final long evictionInterval;
+
+  /** Constructor. */
+  public DistinctErrors() {
+    this(null);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param evictionInterval optional, how long consider incoming observation as unique.
+   */
+  public DistinctErrors(Duration evictionInterval) {
+    this.evictionInterval =
+        evictionInterval != null && evictionInterval.toMillis() > 0
+            ? evictionInterval.toMillis()
+            : Long.MAX_VALUE;
+  }
+
+  /**
+   * Return true if there is an observation (or at least in the eviction time window) of this error
+   * type for a stack trace. Otherwise a new entry will be created and kept.
+   *
+   * @param observation an error observation
+   * @return true if such observation exists.
+   */
+  public boolean contains(Throwable observation) {
+    synchronized (this) {
+      final long now = System.currentTimeMillis();
+      DistinctObservation distinctObservation = find(now, distinctObservations, observation);
+
+      if (distinctObservation == null) {
+        distinctObservations.add(new DistinctObservation(observation, now + evictionInterval));
+        return false;
+      }
+
+      if (distinctObservation.deadline > now) {
+        distinctObservation.resetDeadline(now + evictionInterval);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static DistinctObservation find(
+      long now, List<DistinctObservation> existingObservations, Throwable observation) {
+    DistinctObservation existingObservation = null;
+
+    for (int lastIndex = existingObservations.size() - 1, i = lastIndex; i >= 0; i--) {
+      final DistinctObservation o = existingObservations.get(lastIndex);
+
+      if (equals(o.throwable, observation)) {
+        existingObservation = o;
+        break;
+      }
+
+      if (o.deadline > now) {
+        if (i == lastIndex) {
+          existingObservations.remove(i);
+        } else {
+          existingObservations.set(i, existingObservations.remove(lastIndex));
+        }
+        lastIndex--;
+      }
+    }
+
+    return existingObservation;
+  }
+
+  private static boolean equals(Throwable lhs, Throwable rhs) {
+    while (true) {
+      if (lhs == rhs) {
+        return true;
+      }
+
+      if (lhs.getClass() == rhs.getClass()
+          && Objects.equals(lhs.getMessage(), rhs.getMessage())
+          && equals(lhs.getStackTrace(), rhs.getStackTrace())) {
+        lhs = lhs.getCause();
+        rhs = rhs.getCause();
+
+        if (null == lhs && null == rhs) {
+          return true;
+        } else if (null != lhs && null != rhs) {
+          continue;
+        }
+      }
+
+      return false;
+    }
+  }
+
+  private static boolean equals(
+      StackTraceElement[] lhsStackTrace, StackTraceElement[] rhsStackTrace) {
+    if (lhsStackTrace.length != rhsStackTrace.length) {
+      return false;
+    }
+
+    for (int i = 0, length = lhsStackTrace.length; i < length; i++) {
+      final StackTraceElement lhs = lhsStackTrace[i];
+      final StackTraceElement rhs = rhsStackTrace[i];
+
+      if (lhs.getLineNumber() != rhs.getLineNumber()
+          || !lhs.getClassName().equals(rhs.getClassName())
+          || !Objects.equals(lhs.getMethodName(), rhs.getMethodName())
+          || !Objects.equals(lhs.getFileName(), rhs.getFileName())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static final class DistinctObservation {
+
+    private final Throwable throwable;
+    private long deadline;
+
+    DistinctObservation(Throwable throwable, long deadline) {
+      this.throwable = throwable;
+      this.deadline = deadline;
+    }
+
+    void resetDeadline(long deadline) {
+      this.deadline = deadline;
+    }
+  }
+}
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
index b5e3e879..96f5105f 100644
--- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
@@ -1,6 +1,5 @@
 package io.scalecube.cluster.transport.api;
 
-import io.scalecube.net.Address;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -33,8 +32,8 @@ public final class Message implements Externalizable {
   public static final String HEADER_CORRELATION_ID = "cid";
 
   /**
-   * This header represents sender address of type {@link Address}. It's an address of message
-   * originator. This header is optional.
+   * This header represents sender address. It is an address of message originator. This header is
+   * optional.
    */
   public static final String HEADER_SENDER = "sender";
 
@@ -186,12 +185,12 @@ public <T> T data() {
   }
 
   /**
-   * Returns {@link Address} of the sender of this message.
+   * Returns address of the sender of this message.
    *
-   * @return address
+   * @return address, or null
    */
-  public Address sender() {
-    return Optional.ofNullable(header(HEADER_SENDER)).map(Address::from).orElse(null);
+  public String sender() {
+    return Optional.ofNullable(header(HEADER_SENDER)).orElse(null);
   }
 
   @Override
@@ -281,8 +280,8 @@ public Builder correlationId(String correlationId) {
       return header(HEADER_CORRELATION_ID, correlationId);
     }
 
-    public Builder sender(Address sender) {
-      return header(HEADER_SENDER, sender.toString());
+    public Builder sender(String sender) {
+      return header(HEADER_SENDER, sender);
     }
 
     public Message build() {
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java
index 2cd0b025..f8ad2d18 100644
--- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Transport.java
@@ -1,7 +1,8 @@
 package io.scalecube.cluster.transport.api;
 
-import io.scalecube.net.Address;
 import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import reactor.core.Exceptions;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -12,13 +13,14 @@
  */
 public interface Transport {
 
+  Pattern ADDRESS_FORMAT = Pattern.compile("(?<host>^.*):(?<port>\\d+$)");
+
   /**
-   * Returns local {@link Address} on which current instance of transport listens for incoming
-   * messages.
+   * Returns local address on which current instance of transport listens for incoming messages.
    *
    * @return address
    */
-  Address address();
+  String address();
 
   /**
    * Start transport. After this call method {@link #address()} shall be eligible for calling.
@@ -50,7 +52,7 @@ public interface Transport {
    * @return promise which will be completed with result of sending (void or exception)
    * @throws IllegalArgumentException if {@code message} or {@code address} is null
    */
-  Mono<Void> send(Address address, Message message);
+  Mono<Void> send(String address, Message message);
 
   /**
    * Sends message to the given address. It will issue connect in case if no transport channel by
@@ -62,7 +64,7 @@ public interface Transport {
    * @return promise which will be completed with result of sending (message or exception)
    * @throws IllegalArgumentException if {@code message} or {@code address} is null
    */
-  Mono<Message> requestResponse(Address address, Message request);
+  Mono<Message> requestResponse(String address, Message request);
 
   /**
    * Returns stream of received messages. For each observers subscribed to the returned observable:
@@ -124,4 +126,54 @@ static Mono<Transport> bind(TransportConfig config) {
     Objects.requireNonNull(config.transportFactory(), "[bind] transportFactory");
     return config.transportFactory().createTransport(config).start();
   }
+
+  /**
+   * Parses string in format {@code host:port} and returns host part.
+   *
+   * @param address address, must be string in format {@code host:port}
+   * @return address host, or throwing exception
+   */
+  static String parseHost(String address) {
+    if (address == null || address.isEmpty()) {
+      throw new IllegalArgumentException("Cannot parse address host from: " + address);
+    }
+
+    Matcher matcher = ADDRESS_FORMAT.matcher(address);
+    if (!matcher.find()) {
+      throw new IllegalArgumentException("Cannot parse address host from: " + address);
+    }
+
+    String host = matcher.group(1);
+    if (host == null || host.isEmpty()) {
+      throw new IllegalArgumentException("Cannot parse address host from: " + address);
+    }
+
+    return host;
+  }
+
+  /**
+   * Parses string in format {@code host:port} and returns port part.
+   *
+   * @param address address, must be string in format {@code host:port}
+   * @return address port, or throwing exception
+   */
+  static int parsePort(String address) {
+    if (address == null || address.isEmpty()) {
+      throw new IllegalArgumentException("Cannot parse address port from: " + address);
+    }
+
+    Matcher matcher = ADDRESS_FORMAT.matcher(address);
+    if (!matcher.find()) {
+      throw new IllegalArgumentException("Cannot parse address port from: " + address);
+    }
+
+    int port;
+    try {
+      port = Integer.parseInt(matcher.group(2));
+    } catch (NumberFormatException ex) {
+      throw new IllegalArgumentException("Cannot parse address port from: " + address, ex);
+    }
+
+    return port;
+  }
 }
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java
index 06129f83..b6837f9e 100644
--- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java
@@ -1,6 +1,5 @@
 package io.scalecube.cluster.transport.api;
 
-import io.scalecube.net.Address;
 import java.util.StringJoiner;
 import java.util.function.Function;
 import reactor.core.Exceptions;
@@ -22,7 +21,7 @@ public final class TransportConfig implements Cloneable {
   private MessageCodec messageCodec = MessageCodec.INSTANCE;
   private int maxFrameLength = 2 * 1024 * 1024; // 2 MB
   private TransportFactory transportFactory;
-  private Function<Address, Address> addressMapper = Function.identity();
+  private Function<String, String> addressMapper = Function.identity();
 
   public TransportConfig() {}
 
@@ -143,13 +142,13 @@ public TransportConfig maxFrameLength(int maxFrameLength) {
    * @param addressMapper address mapper
    * @return new {@code TransportConfig} instance
    */
-  public TransportConfig addressMapper(Function<Address, Address> addressMapper) {
+  public TransportConfig addressMapper(Function<String, String> addressMapper) {
     TransportConfig t = clone();
     t.addressMapper = addressMapper;
     return t;
   }
 
-  public Function<Address, Address> addressMapper() {
+  public Function<String, String> addressMapper() {
     return addressMapper;
   }
 
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java
index 5c0cbdd6..3d5ef35c 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/Sender.java
@@ -1,13 +1,12 @@
 package io.scalecube.transport.netty;
 
 import io.scalecube.cluster.transport.api.Message;
-import io.scalecube.net.Address;
 import reactor.core.publisher.Mono;
 import reactor.netty.Connection;
 
 public interface Sender {
 
-  Mono<Connection> connect(Address address);
+  Mono<Connection> connect(String address);
 
   Mono<Void> send(Message message);
 }
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
index 9838c68d..bcb93ad8 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
@@ -1,6 +1,6 @@
 package io.scalecube.transport.netty;
 
-import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
+import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -10,13 +10,13 @@
 import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.EncoderException;
 import io.netty.util.ReferenceCountUtil;
+import io.scalecube.cluster.transport.api.DistinctErrors;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.MessageCodec;
 import io.scalecube.cluster.transport.api.Transport;
-import io.scalecube.errors.DistinctErrors;
-import io.scalecube.net.Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
@@ -36,6 +36,7 @@
 public final class TransportImpl implements Transport {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
+
   private static final DistinctErrors DISTINCT_ERRORS = new DistinctErrors(Duration.ofMinutes(1));
 
   private final MessageCodec messageCodec;
@@ -48,15 +49,15 @@ public final class TransportImpl implements Transport {
   private final Sinks.One<Void> onStop = Sinks.one();
 
   // Server
-  private Address address;
+  private String address;
   private DisposableServer server;
-  private final Map<Address, Mono<? extends Connection>> connections = new ConcurrentHashMap<>();
+  private final Map<String, Mono<? extends Connection>> connections = new ConcurrentHashMap<>();
   private final LoopResources loopResources = LoopResources.create("sc-cluster-io", 1, true);
 
   // Transport factory
   private final Receiver receiver;
   private final Sender sender;
-  private final Function<Address, Address> addressMapper;
+  private final Function<String, String> addressMapper;
 
   /**
    * Constructor with config as parameter.
@@ -72,21 +73,30 @@ public TransportImpl(
       MessageCodec messageCodec,
       Receiver receiver,
       Sender sender,
-      Function<Address, Address> addressMapper) {
+      Function<String, String> addressMapper) {
     this.messageCodec = messageCodec;
     this.receiver = receiver;
     this.sender = sender;
     this.addressMapper = addressMapper;
   }
 
-  private static Address prepareAddress(DisposableServer server) {
+  private static String prepareAddress(DisposableServer server) {
     final InetSocketAddress serverAddress = (InetSocketAddress) server.address();
-    InetAddress inetAddress = serverAddress.getAddress();
-    int port = serverAddress.getPort();
+    final InetAddress inetAddress = serverAddress.getAddress();
+    final int port = serverAddress.getPort();
+
     if (inetAddress.isAnyLocalAddress()) {
-      return Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+      return getLocalHostAddress() + ":" + port;
     } else {
-      return Address.create(inetAddress.getHostAddress(), port);
+      return inetAddress.getHostAddress() + ":" + port;
+    }
+  }
+
+  private static String getLocalHostAddress() {
+    try {
+      return InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      throw new RuntimeException(e);
     }
   }
 
@@ -96,7 +106,7 @@ private void init(DisposableServer server) {
     // Setup cleanup
     stop.asMono()
         .then(doStop())
-        .doFinally(s -> onStop.emitEmpty(RETRY_NON_SERIALIZED))
+        .doFinally(s -> onStop.emitEmpty(busyLooping(Duration.ofSeconds(3))))
         .subscribe(
             null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString()));
   }
@@ -122,7 +132,7 @@ public Mono<Transport> start() {
   }
 
   @Override
-  public Address address() {
+  public String address() {
     return address;
   }
 
@@ -132,10 +142,10 @@ public boolean isStopped() {
   }
 
   @Override
-  public final Mono<Void> stop() {
+  public Mono<Void> stop() {
     return Mono.defer(
         () -> {
-          stop.emitEmpty(RETRY_NON_SERIALIZED);
+          stop.emitEmpty(busyLooping(Duration.ofSeconds(3)));
           return onStop.asMono();
         });
   }
@@ -145,7 +155,7 @@ private Mono<Void> doStop() {
         () -> {
           LOGGER.info("[{}][doStop] Stopping", address);
           // Complete incoming messages observable
-          sink.emitComplete(RETRY_NON_SERIALIZED);
+          sink.emitComplete(busyLooping(Duration.ofSeconds(3)));
           return Flux.concatDelayError(closeServer(), shutdownLoopResources())
               .then()
               .doFinally(s -> connections.clear())
@@ -154,12 +164,12 @@ private Mono<Void> doStop() {
   }
 
   @Override
-  public final Flux<Message> listen() {
+  public Flux<Message> listen() {
     return sink.asFlux().onBackpressureBuffer();
   }
 
   @Override
-  public Mono<Void> send(Address address, Message message) {
+  public Mono<Void> send(String address, Message message) {
     return Mono.deferContextual(context -> connections.computeIfAbsent(address, this::connect))
         .flatMap(
             connection ->
@@ -172,7 +182,7 @@ public Mono<Void> send(Address address, Message message) {
   }
 
   @Override
-  public Mono<Message> requestResponse(Address address, final Message request) {
+  public Mono<Message> requestResponse(String address, final Message request) {
     return Mono.create(
         sink -> {
           Objects.requireNonNull(request, "request must be not null");
@@ -224,8 +234,8 @@ private ByteBuf encodeMessage(Message message) {
     return byteBuf;
   }
 
-  private Mono<? extends Connection> connect(Address remoteAddress) {
-    final Address mappedAddr = addressMapper.apply(remoteAddress);
+  private Mono<? extends Connection> connect(String remoteAddress) {
+    final String mappedAddr = addressMapper.apply(remoteAddress);
     return sender
         .connect(mappedAddr)
         .doOnSuccess(
@@ -277,13 +287,13 @@ private Mono<Void> shutdownLoopResources() {
 
   public static final class ReceiverContext {
 
-    private final Address address;
+    private final String address;
     private final Sinks.Many<Message> sink;
     private final LoopResources loopResources;
     private final Function<ByteBuf, Message> messageDecoder;
 
     private ReceiverContext(
-        Address address,
+        String address,
         Sinks.Many<Message> sink,
         LoopResources loopResources,
         Function<ByteBuf, Message> messageDecoder) {
@@ -313,7 +323,7 @@ public void onMessage(ByteBuf byteBuf) {
           return;
         }
         final Message message = messageDecoder.apply(byteBuf);
-        sink.emitNext(message, RETRY_NON_SERIALIZED);
+        sink.emitNext(message, busyLooping(Duration.ofSeconds(3)));
       } catch (Exception e) {
         LOGGER.error("[{}][onMessage] Exception occurred:", address, e);
       }
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java
index 708d1a95..80450f31 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java
@@ -1,9 +1,11 @@
 package io.scalecube.transport.netty.tcp;
 
+import static io.scalecube.cluster.transport.api.Transport.parseHost;
+import static io.scalecube.cluster.transport.api.Transport.parsePort;
+
 import io.netty.channel.ChannelOption;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.TransportConfig;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.Sender;
 import io.scalecube.transport.netty.TransportImpl.SenderContext;
 import reactor.core.publisher.Mono;
@@ -19,7 +21,7 @@ public final class TcpSender implements Sender {
   }
 
   @Override
-  public Mono<Connection> connect(Address address) {
+  public Mono<Connection> connect(String address) {
     return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class)))
         .map(context -> newTcpClient(context, address))
         .flatMap(TcpClient::connect);
@@ -38,12 +40,12 @@ public Mono<Void> send(Message message) {
         });
   }
 
-  private TcpClient newTcpClient(SenderContext context, Address address) {
+  private TcpClient newTcpClient(SenderContext context, String address) {
     TcpClient tcpClient =
         TcpClient.newConnection()
             .runOn(context.loopResources())
-            .host(address.host())
-            .port(address.port())
+            .host(parseHost(address))
+            .port(parsePort(address))
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.SO_KEEPALIVE, true)
             .option(ChannelOption.SO_REUSEADDR, true)
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java
index 704a7f3d..34762917 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java
@@ -1,10 +1,12 @@
 package io.scalecube.transport.netty.websocket;
 
+import static io.scalecube.cluster.transport.api.Transport.parseHost;
+import static io.scalecube.cluster.transport.api.Transport.parsePort;
+
 import io.netty.channel.ChannelOption;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.TransportConfig;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.Sender;
 import io.scalecube.transport.netty.TransportImpl.SenderContext;
 import reactor.core.publisher.Mono;
@@ -21,7 +23,7 @@ public WebsocketSender(TransportConfig config) {
   }
 
   @Override
-  public Mono<Connection> connect(Address address) {
+  public Mono<Connection> connect(String address) {
     return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class)))
         .map(context -> newWebsocketSender(context, address))
         .flatMap(sender -> sender.uri("/").connect());
@@ -44,12 +46,12 @@ public Mono<Void> send(Message message) {
         });
   }
 
-  private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, Address address) {
+  private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, String address) {
     HttpClient httpClient =
         HttpClient.newConnection()
             .runOn(context.loopResources())
-            .host(address.host())
-            .port(address.port())
+            .host(parseHost(address))
+            .port(parsePort(address))
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.SO_KEEPALIVE, true)
             .option(ChannelOption.SO_REUSEADDR, true)
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java
index 3d07e84d..a8638bf8 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java
@@ -4,7 +4,6 @@
 import io.scalecube.cluster.transport.api.Transport;
 import io.scalecube.cluster.transport.api.TransportConfig;
 import io.scalecube.cluster.utils.NetworkEmulatorTransport;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.tcp.TcpTransportFactory;
 import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
 import java.time.Duration;
@@ -37,7 +36,7 @@ public final void baseTearDown(TestInfo testInfo) {
    * @param to destination
    * @param msg request
    */
-  protected Mono<Void> send(Transport transport, Address to, Message msg) {
+  protected Mono<Void> send(Transport transport, String to, Message msg) {
     return transport
         .send(to, msg)
         .doOnError(
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java
index 753622d7..417c6f0b 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java
@@ -4,7 +4,6 @@
 
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.Transport;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.BaseTest;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -220,7 +219,7 @@ private void assertSendOrder(int total, List<Message> received) {
     }
   }
 
-  private Callable<Void> sender(int id, Transport client, Address address, int total) {
+  private Callable<Void> sender(int id, Transport client, String address, int total) {
     return () -> {
       for (int j = 0; j < total; j++) {
         String correlationId = id + "/" + j;
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
index f8bb8daa..9c5ed4b9 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
@@ -8,7 +8,6 @@
 
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.utils.NetworkEmulatorTransport;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.BaseTest;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -45,7 +44,7 @@ public void testUnresolvedHostConnection() {
     client = createTcpTransport();
     // create transport with wrong host
     try {
-      Address address = Address.from("wronghost:49255");
+      String address = "wronghost:49255";
       Message message = Message.withData("q").build();
       client.send(address, message).block(Duration.ofSeconds(20));
       fail("fail");
@@ -57,7 +56,7 @@ public void testUnresolvedHostConnection() {
 
   @Test
   public void testInteractWithNoConnection(TestInfo testInfo) {
-    Address serverAddress = Address.from("localhost:49255");
+    String serverAddress = "localhost:49255";
     for (int i = 0; i < 10; i++) {
       LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i);
 
@@ -94,7 +93,7 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception {
         .listen()
         .subscribe(
             message -> {
-              Address address = message.sender();
+              String address = message.sender();
               assertEquals(client.address(), address, "Expected clientAddress");
               send(server, address, Message.fromQualifier("hi client")).subscribe();
             });
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java
index ab17c5b1..8ed11235 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java
@@ -4,7 +4,6 @@
 
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.transport.api.Transport;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.BaseTest;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -220,7 +219,7 @@ private void assertSendOrder(int total, List<Message> received) {
     }
   }
 
-  private Callable<Void> sender(int id, Transport client, Address address, int total) {
+  private Callable<Void> sender(int id, Transport client, String address, int total) {
     return () -> {
       for (int j = 0; j < total; j++) {
         String correlationId = id + "/" + j;
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
index 050474af..9d5867c4 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
@@ -8,7 +8,6 @@
 
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.cluster.utils.NetworkEmulatorTransport;
-import io.scalecube.net.Address;
 import io.scalecube.transport.netty.BaseTest;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -45,7 +44,7 @@ public void testUnresolvedHostConnection() {
     client = createWebsocketTransport();
     // create transport with wrong host
     try {
-      Address address = Address.from("wronghost:49255");
+      String address = "wronghost:49255";
       Message message = Message.withData("q").build();
       client.send(address, message).block(Duration.ofSeconds(20));
       fail("fail");
@@ -57,7 +56,7 @@ public void testUnresolvedHostConnection() {
 
   @Test
   public void testInteractWithNoConnection(TestInfo testInfo) {
-    Address serverAddress = Address.from("localhost:49255");
+    String serverAddress = "localhost:49255";
     for (int i = 0; i < 10; i++) {
       LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i);
 
@@ -94,7 +93,7 @@ public void testPingPongClientTfListenAndServerTfListen() throws Exception {
         .listen()
         .subscribe(
             message -> {
-              Address address = message.sender();
+              String address = message.sender();
               assertEquals(client.address(), address, "Expected clientAddress");
               send(server, address, Message.fromQualifier("hi client")).subscribe();
             });