From fef7960a95d2a2c7a28509a8adbb30650f5d31a1 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 7 Oct 2024 09:46:56 +0300 Subject: [PATCH] Upgrade to jdk17 and pom cleanup (#395) * Set new parent (with jdk 17). * Updated CI yaml files. * Removed slf4j from dependencies. * Cleanup poms. * Bump `jackson` to `2.18.0`. --- .github/workflows/branch-ci.yml | 17 +- .github/workflows/pre-release-ci.yml | 15 +- .github/workflows/release-ci.yml | 24 +-- checkstyle-suppressions.xml | 14 ++ .../cluster/utils/NetworkEmulator.java | 34 ++-- .../io/scalecube/cluster/utils/BaseTest.java | 10 +- cluster/pom.xml | 10 -- .../io/scalecube/cluster/ClusterImpl.java | 51 ++++-- .../fdetector/FailureDetectorImpl.java | 117 ++++++++----- .../cluster/gossip/GossipProtocolImpl.java | 53 ++++-- .../membership/MembershipProtocolImpl.java | 155 +++++++++++------- .../cluster/metadata/MetadataStoreImpl.java | 63 ++++--- .../java/io/scalecube/cluster/BaseTest.java | 10 +- .../io/scalecube/cluster/ClusterTest.java | 16 +- .../cluster/gossip/GossipProtocolTest.java | 48 ++++-- examples/pom.xml | 8 +- pom.xml | 101 ++++++------ transport-parent/pom.xml | 21 +-- .../transport/netty/TransportImpl.java | 57 ++++--- .../scalecube/transport/netty/BaseTest.java | 17 +- .../netty/tcp/TcpTransportSendOrderTest.java | 32 ++-- .../transport/netty/tcp/TcpTransportTest.java | 19 ++- .../WebsocketTransportSendOrderTest.java | 32 ++-- .../websocket/WebsocketTransportTest.java | 19 ++- 24 files changed, 557 insertions(+), 386 deletions(-) create mode 100644 checkstyle-suppressions.xml diff --git a/.github/workflows/branch-ci.yml b/.github/workflows/branch-ci.yml index 603791e5..5787348a 100644 --- a/.github/workflows/branch-ci.yml +++ b/.github/workflows/branch-ci.yml @@ -14,25 +14,24 @@ jobs: name: Branch CI runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/cache@v1 + - uses: actions/checkout@v4 + - uses: actions/cache@v3 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 + - name: Set up JDK + uses: actions/setup-java@v4 with: - java-version: 1.8 + java-version: 17 + distribution: zulu server-id: github server-username: GITHUB_ACTOR server-password: GITHUB_TOKEN - name: Maven Build - run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -Ddockerfile.skip=true -B -V + run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Maven Verify - run: | - sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts - mvn verify -B + run: mvn verify -B diff --git a/.github/workflows/pre-release-ci.yml b/.github/workflows/pre-release-ci.yml index 4948f5bc..245e4d48 100644 --- a/.github/workflows/pre-release-ci.yml +++ b/.github/workflows/pre-release-ci.yml @@ -9,23 +9,23 @@ jobs: name: Pre-release CI runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/cache@v1 + - uses: actions/checkout@v4 + - uses: actions/cache@v3 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- - name: Set up Java for publishing to GitHub Packages - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.8 + java-version: 17 + distribution: zulu server-id: github server-username: GITHUB_ACTOR server-password: GITHUB_TOKEN - name: Deploy pre-release version to GitHub Packages run: | - sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts pre_release_version=${{ github.event.release.tag_name }} echo Pre-release version $pre_release_version mvn versions:set -DnewVersion=$pre_release_version -DgenerateBackupPoms=false @@ -34,9 +34,10 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Set up Java for publishing to Maven Central Repository - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.8 + java-version: 17 + distribution: zulu server-id: ossrh server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD diff --git a/.github/workflows/release-ci.yml b/.github/workflows/release-ci.yml index 32e3623a..4924f526 100644 --- a/.github/workflows/release-ci.yml +++ b/.github/workflows/release-ci.yml @@ -9,31 +9,30 @@ jobs: name: Release CI runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: fetch-depth: 0 - run: git checkout ${{ github.event.release.target_commitish }} - - uses: actions/cache@v1 + - uses: actions/cache@v3 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- - name: Set up Java for publishing to GitHub Packages - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.8 + java-version: 17 + distribution: zulu server-id: github server-username: GITHUB_ACTOR server-password: GITHUB_TOKEN - name: Maven Build - run: mvn clean install -DskipTests=true -Ddockerfile.skip=true -B -V + run: mvn clean install -DskipTests=true -B -V env: GITHUB_TOKEN: ${{ secrets.ORGANIZATION_TOKEN }} - name: Maven Verify - run: | - sudo echo "127.0.0.1 $(eval hostname)" | sudo tee -a /etc/hosts - mvn verify -B + run: mvn verify -B - name: Configure git run: | git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com" @@ -43,17 +42,18 @@ jobs: run: | mvn -B build-helper:parse-version release:prepare \ -DreleaseVersion=\${parsedVersion.majorVersion}.\${parsedVersion.minorVersion}.\${parsedVersion.incrementalVersion} \ - -Darguments="-DskipTests=true -Ddockerfile.skip=true" + -Darguments="-DskipTests=true" echo release_tag=$(git describe --tags --abbrev=0) >> $GITHUB_OUTPUT - name: Perform release - run: mvn -B release:perform -Pdeploy2Github -Darguments="-DskipTests=true -Ddockerfile.skip=true -Pdeploy2Github" + run: mvn -B release:perform -Pdeploy2Github -Darguments="-DskipTests=true -Pdeploy2Github" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_REPOSITORY: ${{ secrets.GITHUB_REPOSITORY }} - name: Set up Java for publishing to Maven Central Repository - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.8 + java-version: 17 + distribution: zulu server-id: ossrh server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD diff --git a/checkstyle-suppressions.xml b/checkstyle-suppressions.xml new file mode 100644 index 00000000..dbd0fdb1 --- /dev/null +++ b/checkstyle-suppressions.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java index 552c10bc..030081e3 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java @@ -1,6 +1,8 @@ package io.scalecube.cluster.utils; import io.scalecube.cluster.transport.api.Message; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -9,8 +11,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** @@ -24,7 +24,7 @@ */ public final class NetworkEmulator { - private static final Logger LOGGER = LoggerFactory.getLogger(NetworkEmulator.class); + private static final Logger LOGGER = System.getLogger(NetworkEmulator.class.getName()); private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0); private volatile InboundSettings defaultInboundSettings = new InboundSettings(true); @@ -69,7 +69,8 @@ public OutboundSettings outboundSettings(String destination) { public void outboundSettings(String destination, int lossPercent, int meanDelay) { OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay); outboundSettings.put(destination, settings); - LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination); + LOGGER.log( + Level.DEBUG, "[{0}] Set outbound settings {1} to {2}", address, settings, destination); } /** @@ -80,21 +81,22 @@ public void outboundSettings(String destination, int lossPercent, int meanDelay) */ public void setDefaultOutboundSettings(int lossPercent, int meanDelay) { defaultOutboundSettings = new OutboundSettings(lossPercent, meanDelay); - LOGGER.debug("[{}] Set default outbound settings {}", address, defaultOutboundSettings); + LOGGER.log( + Level.DEBUG, "[{0}] Set default outbound settings {1}", address, defaultOutboundSettings); } /** Blocks outbound messages to all destinations. */ public void blockAllOutbound() { outboundSettings.clear(); setDefaultOutboundSettings(100, 0); - LOGGER.debug("[{}] Blocked outbound to all destinations", address); + LOGGER.log(Level.DEBUG, "[{0}] Blocked outbound to all destinations", address); } /** Unblocks outbound messages to all destinations. */ public void unblockAllOutbound() { outboundSettings.clear(); setDefaultOutboundSettings(0, 0); - LOGGER.debug("[{}] Unblocked outbound to all destinations", address); + LOGGER.log(Level.DEBUG, "[{0}] Unblocked outbound to all destinations", address); } /** @@ -115,7 +117,7 @@ public void blockOutbound(Collection destinations) { for (String destination : destinations) { outboundSettings.put(destination, new OutboundSettings(100, 0)); } - LOGGER.debug("[{}] Blocked outbound to {}", address, destinations); + LOGGER.log(Level.DEBUG, "[{0}] Blocked outbound to {1}", address, destinations); } /** @@ -134,7 +136,7 @@ public void unblockOutbound(String... destinations) { */ public void unblockOutbound(Collection destinations) { destinations.forEach(outboundSettings::remove); - LOGGER.debug("[{}] Unblocked outbound {}", address, destinations); + LOGGER.log(Level.DEBUG, "[{0}] Unblocked outbound {1}", address, destinations); } /** @@ -220,7 +222,8 @@ public InboundSettings inboundSettings(String destination) { public void inboundSettings(String destination, boolean shallPass) { InboundSettings settings = new InboundSettings(shallPass); inboundSettings.put(destination, settings); - LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); + LOGGER.log( + Level.DEBUG, "[{0}] Set inbound settings {1} to {2}", address, settings, destination); } /** @@ -230,21 +233,22 @@ public void inboundSettings(String destination, boolean shallPass) { */ public void setDefaultInboundSettings(boolean shallPass) { defaultInboundSettings = new InboundSettings(shallPass); - LOGGER.debug("[{}] Set default inbound settings {}", address, defaultInboundSettings); + LOGGER.log( + Level.DEBUG, "[{0}] Set default inbound settings {1}", address, defaultInboundSettings); } /** Blocks inbound messages from all destinations. */ public void blockAllInbound() { inboundSettings.clear(); setDefaultInboundSettings(false); - LOGGER.debug("[{}] Blocked inbound from all destinations", address); + LOGGER.log(Level.DEBUG, "[{0}] Blocked inbound from all destinations", address); } /** Unblocks inbound messages to all destinations. */ public void unblockAllInbound() { inboundSettings.clear(); setDefaultInboundSettings(true); - LOGGER.debug("[{}] Unblocked inbound from all destinations", address); + LOGGER.log(Level.DEBUG, "[{0}] Unblocked inbound from all destinations", address); } /** @@ -265,7 +269,7 @@ public void blockInbound(Collection destinations) { for (String destination : destinations) { inboundSettings.put(destination, new InboundSettings(false)); } - LOGGER.debug("[{}] Blocked inbound from {}", address, destinations); + LOGGER.log(Level.DEBUG, "[{0}] Blocked inbound from {1}", address, destinations); } /** @@ -284,7 +288,7 @@ public void unblockInbound(String... destinations) { */ public void unblockInbound(Collection destinations) { destinations.forEach(inboundSettings::remove); - LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations); + LOGGER.log(Level.DEBUG, "[{0}] Unblocked inbound from {1}", address, destinations); } /** diff --git a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/BaseTest.java b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/BaseTest.java index 4519b3ba..610c017b 100644 --- a/cluster-testlib/src/test/java/io/scalecube/cluster/utils/BaseTest.java +++ b/cluster-testlib/src/test/java/io/scalecube/cluster/utils/BaseTest.java @@ -1,23 +1,23 @@ package io.scalecube.cluster.utils; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Base test class. */ public class BaseTest { - protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class); + protected static final Logger LOGGER = System.getLogger(BaseTest.class.getName()); @BeforeEach public final void baseSetUp(TestInfo testInfo) { - LOGGER.info("***** Test started : " + testInfo.getDisplayName() + " *****"); + LOGGER.log(Level.INFO, "***** Test started : " + testInfo.getDisplayName() + " *****"); } @AfterEach public final void baseTearDown(TestInfo testInfo) { - LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****"); + LOGGER.log(Level.INFO, "***** Test finished : " + testInfo.getDisplayName() + " *****"); } } diff --git a/cluster/pom.xml b/cluster/pom.xml index 8b9287ed..d9666d18 100644 --- a/cluster/pom.xml +++ b/cluster/pom.xml @@ -38,16 +38,6 @@ ${project.version} test - - org.apache.logging.log4j - log4j-slf4j-impl - test - - - org.apache.logging.log4j - log4j-core - test - diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 27309586..566cfe03 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -19,6 +19,8 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.transport.api.TransportFactory; import java.io.Serializable; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; @@ -35,8 +37,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; @@ -48,7 +48,7 @@ /** Cluster implementation. */ public final class ClusterImpl implements Cluster { - private static final Logger LOGGER = LoggerFactory.getLogger(Cluster.class); + private static final Logger LOGGER = System.getLogger(Cluster.class.getName()); private static final Pattern NAMESPACE_PATTERN = Pattern.compile("^(\\w+[\\w\\-./]*\\w)+"); @@ -115,7 +115,9 @@ private void initLifecycle() { .then(doStart()) .doOnSuccess(avoid -> onStart.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .doOnError(th -> onStart.emitError(th, busyLooping(Duration.ofSeconds(3)))) - .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th)); + .subscribe( + null, + th -> LOGGER.log(Level.ERROR, "[{0}][doStart] Exception occurred:", localMember, th)); shutdown .asMono() @@ -124,7 +126,11 @@ private void initLifecycle() { .subscribe( null, th -> - LOGGER.warn("[{}][doShutdown] Exception occurred: {}", localMember, th.toString())); + LOGGER.log( + Level.WARNING, + "[{0}][doShutdown] Exception occurred: {1}", + localMember, + th.toString())); } /** @@ -287,7 +293,9 @@ private Mono doStart0() { .subscribe( event -> membershipSink.emitNext(event, busyLooping(Duration.ofSeconds(3))), - ex -> LOGGER.error("[{}][membership][error] cause:", localMember, ex), + ex -> + LOGGER.log( + Level.ERROR, "[{0}][membership][error] cause:", localMember, ex), () -> membershipSink.emitComplete(busyLooping(Duration.ofSeconds(3))))); return Mono.fromRunnable(() -> failureDetector.start()) @@ -297,8 +305,10 @@ private Mono doStart0() { .then(membership.start()) .then(); }) - .doOnSubscribe(s -> LOGGER.info("[{}][doStart] Starting, config: {}", localMember, config)) - .doOnSuccess(avoid -> LOGGER.info("[{}][doStart] Started", localMember)) + .doOnSubscribe( + s -> + LOGGER.log(Level.INFO, "[{0}][doStart] Starting, config: {1}", localMember, config)) + .doOnSuccess(avoid -> LOGGER.log(Level.INFO, "[{0}][doStart] Started", localMember)) .thenReturn(this); } @@ -339,17 +349,19 @@ private void startHandler() { listenMessage() .subscribe( handler::onMessage, - ex -> LOGGER.error("[{}][onMessage][error] cause:", localMember, ex))); + ex -> LOGGER.log(Level.ERROR, "[{0}][onMessage][error] cause:", localMember, ex))); actionsDisposables.add( listenMembership() .subscribe( handler::onMembershipEvent, - ex -> LOGGER.error("[{}][onMembershipEvent][error] cause:", localMember, ex))); + ex -> + LOGGER.log( + Level.ERROR, "[{0}][onMembershipEvent][error] cause:", localMember, ex))); actionsDisposables.add( listenGossip() .subscribe( handler::onGossip, - ex -> LOGGER.error("[{}][onGossip][error] cause:", localMember, ex))); + ex -> LOGGER.log(Level.ERROR, "[{0}][onGossip][error] cause:", localMember, ex))); } private Flux listenMessage() { @@ -457,11 +469,12 @@ public void shutdown() { private Mono doShutdown() { return Mono.defer( () -> { - LOGGER.info("[{}][doShutdown] Shutting down", localMember); + LOGGER.log(Level.INFO, "[{0}][doShutdown] Shutting down", localMember); return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop()) .then() .doFinally(s -> scheduler.dispose()) - .doOnSuccess(avoid -> LOGGER.info("[{}][doShutdown] Shutdown", localMember)); + .doOnSuccess( + avoid -> LOGGER.log(Level.INFO, "[{0}][doShutdown] Shutdown", localMember)); }); } @@ -469,12 +482,16 @@ private Mono leaveCluster() { return membership .leaveCluster() .subscribeOn(scheduler) - .doOnSubscribe(s -> LOGGER.info("[{}][leaveCluster] Leaving cluster", localMember)) - .doOnSuccess(s -> LOGGER.info("[{}][leaveCluster] Left cluster", localMember)) + .doOnSubscribe( + s -> LOGGER.log(Level.INFO, "[{0}][leaveCluster] Leaving cluster", localMember)) + .doOnSuccess(s -> LOGGER.log(Level.INFO, "[{0}][leaveCluster] Left cluster", localMember)) .doOnError( ex -> - LOGGER.warn( - "[{}][leaveCluster] Exception occurred: {}", localMember, ex.toString())) + LOGGER.log( + Level.WARNING, + "[{0}][leaveCluster] Exception occurred: {1}", + localMember, + ex.toString())) .then(); } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 274f5a08..a85b4b3c 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -8,6 +8,8 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -17,8 +19,6 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; @@ -27,7 +27,7 @@ public final class FailureDetectorImpl implements FailureDetector { - private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class); + private static final Logger LOGGER = System.getLogger(FailureDetector.class.getName()); // Qualifiers @@ -89,15 +89,20 @@ public FailureDetectorImpl( .subscribe( this::onMessage, ex -> - LOGGER.error( - "[{}][{}][onMessage][error] cause:", localMember, currentPeriod, ex)), + LOGGER.log( + Level.ERROR, + "[{0}][{1}][onMessage][error] cause:", + localMember, + currentPeriod, + ex)), membershipProcessor // Listen membership events to update remoteMembers .publishOn(scheduler) .subscribe( this::onMembershipEvent, ex -> - LOGGER.error( - "[{}][{}][onMembershipEvent][error] cause:", + LOGGER.log( + Level.ERROR, + "[{0}][{1}][onMembershipEvent][error] cause:", localMember, currentPeriod, ex)))); @@ -143,7 +148,7 @@ private void doPing() { PingData pingData = new PingData(localMember, pingMember); Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); - LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); + LOGGER.log(Level.DEBUG, "[{0}][{1}] Send Ping to {2}", localMember, period, pingMember); String address = pingMember.address(); transport .requestResponse(address, pingMsg) @@ -151,13 +156,18 @@ private void doPing() { .publishOn(scheduler) .subscribe( message -> { - LOGGER.debug( - "[{}][{}] Received PingAck from {}", localMember, period, message.sender()); + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Received PingAck from {2}", + localMember, + period, + message.sender()); publishPingResult(period, pingMember, computeMemberStatus(message, period)); }, ex -> { - LOGGER.debug( - "[{}][{}] Failed to get PingAck from {} within {} ms", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Failed to get PingAck from {2} within {3} ms", localMember, period, pingMember, @@ -167,7 +177,7 @@ private void doPing() { final List pingReqMembers = selectPingReqMembers(pingMember); if (timeLeft <= 0 || pingReqMembers.isEmpty()) { - LOGGER.debug("[{}][{}] No PingReq occurred", localMember, period); + LOGGER.log(Level.DEBUG, "[{0}][{1}] No PingReq occurred", localMember, period); publishPingResult(period, pingMember, MemberStatus.SUSPECT); } else { doPingReq(currentPeriod, pingMember, pingReqMembers, cid); @@ -182,8 +192,13 @@ private void doPingReq( .qualifier(PING_REQ) .correlationId(cid) .build(); - LOGGER.debug( - "[{}][{}] Send PingReq to {} for {}", localMember, period, pingReqMembers, pingMember); + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Send PingReq to {2} for {3}", + localMember, + period, + pingReqMembers, + pingMember); Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout()); pingReqMembers.forEach( @@ -194,8 +209,9 @@ private void doPingReq( .publishOn(scheduler) .subscribe( message -> { - LOGGER.debug( - "[{}][{}] Received transit PingAck from {} to {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Received transit PingAck from {2} to {3}", localMember, period, message.sender(), @@ -203,8 +219,10 @@ private void doPingReq( publishPingResult(period, pingMember, computeMemberStatus(message, period)); }, throwable -> { - LOGGER.debug( - "[{}][{}] Timeout getting transit PingAck from {} to {} within {} ms", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] " + + "Timeout getting transit PingAck from {2} to {3} within {4} ms", localMember, period, member, @@ -232,12 +250,13 @@ private void onMessage(Message message) { private void onPing(Message message) { long period = this.currentPeriod; String sender = message.sender(); - LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); + LOGGER.log(Level.DEBUG, "[{0}][{1}] Received Ping from {2}", localMember, period, sender); PingData data = message.data(); data = data.withAckType(AckType.DEST_OK); if (!data.getTo().id().equals(localMember.id())) { - LOGGER.debug( - "[{}][{}] Received Ping from {} to {}, but local member is {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Received Ping from {2} to {3}, but local member is {4}", localMember, period, sender, @@ -249,14 +268,15 @@ private void onPing(Message message) { Message ackMessage = Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); String address = data.getFrom().address(); - LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); + LOGGER.log(Level.DEBUG, "[{0}][{1}] Send PingAck to {2}", localMember, period, address); transport .send(address, ackMessage) .subscribe( null, ex -> - LOGGER.debug( - "[{}][{}] Failed to send PingAck to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Failed to send PingAck to {2}, cause: {3}", localMember, period, address, @@ -266,7 +286,8 @@ private void onPing(Message message) { /** Listens to PING_REQ message and sends PING to requested cluster member. */ private void onPingReq(Message message) { long period = this.currentPeriod; - LOGGER.debug("[{}][{}] Received PingReq from {}", localMember, period, message.sender()); + LOGGER.log( + Level.DEBUG, "[{0}][{1}] Received PingReq from {2}", localMember, period, message.sender()); PingData data = message.data(); Member target = data.getTo(); Member originalIssuer = data.getFrom(); @@ -275,14 +296,15 @@ private void onPingReq(Message message) { Message pingMessage = Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); String address = target.address(); - LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); + LOGGER.log(Level.DEBUG, "[{0}][{1}] Send transit Ping to {2}", localMember, period, address); transport .send(address, pingMessage) .subscribe( null, ex -> - LOGGER.debug( - "[{}][{}] Failed to send transit Ping to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Failed to send transit Ping to {2}, cause: {3}", localMember, period, address, @@ -295,8 +317,12 @@ private void onPingReq(Message message) { */ private void onTransitPingAck(Message message) { long period = this.currentPeriod; - LOGGER.debug( - "[{}][{}] Received transit PingAck from {}", localMember, period, message.sender()); + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Received transit PingAck from {2}", + localMember, + period, + message.sender()); PingData data = message.data(); AckType ackType = data.getAckType(); Member target = data.getOriginalIssuer(); @@ -305,14 +331,16 @@ private void onTransitPingAck(Message message) { Message originalAckMessage = Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); String address = target.address(); - LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); + LOGGER.log( + Level.DEBUG, "[{0}][{1}] Resend transit PingAck to {2}", localMember, period, address); transport .send(address, originalAckMessage) .subscribe( null, ex -> - LOGGER.debug( - "[{}][{}] Failed to resend transit PingAck to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Failed to resend transit PingAck to {2}, cause: {3}", localMember, period, address, @@ -324,8 +352,9 @@ private void onMembershipEvent(MembershipEvent event) { if (event.isRemoved()) { boolean removed = pingMembers.remove(member); if (removed) { - LOGGER.debug( - "[{}][{}] Removed {} from pingMembers list (size={})", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Removed {2} from pingMembers list (size={3})", localMember, currentPeriod, member, @@ -337,8 +366,9 @@ private void onMembershipEvent(MembershipEvent event) { int size = pingMembers.size(); int index = size > 0 ? ThreadLocalRandom.current().nextInt(size) : 0; pingMembers.add(index, member); - LOGGER.debug( - "[{}][{}] Added {} to pingMembers list (size={})", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Added {2} to pingMembers list (size={3})", localMember, currentPeriod, member, @@ -376,7 +406,8 @@ private List selectPingReqMembers(Member pingMember) { } private void publishPingResult(long period, Member member, MemberStatus status) { - LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status); + LOGGER.log( + Level.DEBUG, "[{0}][{1}] Member {2} detected as {3}", localMember, period, member, status); sink.emitNext(new FailureDetectorEvent(member, status), busyLooping(Duration.ofSeconds(3))); } @@ -397,8 +428,12 @@ private MemberStatus computeMemberStatus(Message message, long period) { memberStatus = MemberStatus.DEAD; break; default: - LOGGER.warn( - "[{}][{}] Unknown PingData.AckType received '{}'", localMember, period, ackType); + LOGGER.log( + Level.WARNING, + "[{0}][{1}] Unknown PingData.AckType received: {2}", + localMember, + period, + ackType); memberStatus = MemberStatus.SUSPECT; } return memberStatus; diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 3657c06b..28cb2857 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -7,6 +7,8 @@ import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -19,8 +21,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; @@ -31,7 +31,7 @@ public final class GossipProtocolImpl implements GossipProtocol { - private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class); + private static final Logger LOGGER = System.getLogger(GossipProtocol.class.getName()); // Qualifiers @@ -94,14 +94,24 @@ public GossipProtocolImpl( .publishOn(scheduler) .subscribe( this::onMembershipEvent, - ex -> LOGGER.error("[{}][onMembershipEvent][error] cause:", localMember, ex)), + ex -> + LOGGER.log( + Level.ERROR, + "[{0}][onMembershipEvent][error] cause:", + localMember, + ex)), transport .listen() // Listen gossip requests .publishOn(scheduler) .filter(this::isGossipRequest) .subscribe( this::onGossipRequest, - ex -> LOGGER.error("[{}][onGossipRequest][error] cause:", localMember, ex)))); + ex -> + LOGGER.log( + Level.ERROR, + "[{0}][onGossipRequest][error] cause:", + localMember, + ex)))); } @Override @@ -158,7 +168,8 @@ private void doSpreadGossip() { // Sweep gossips Set gossipsToRemove = getGossipsToRemove(period); if (!gossipsToRemove.isEmpty()) { - LOGGER.debug("[{}][{}] Sweep gossips: {}", localMember, period, gossipsToRemove); + LOGGER.log( + Level.DEBUG, "[{0}][{1}] Sweep gossips: {2}", localMember, period, gossipsToRemove); for (String gossipId : gossipsToRemove) { gossips.remove(gossipId); } @@ -167,8 +178,9 @@ private void doSpreadGossip() { // Check spread gossips Set gossipsThatSpread = getGossipsThatMostLikelyDisseminated(period); if (!gossipsThatSpread.isEmpty()) { - LOGGER.debug( - "[{}][{}] Most likely disseminated gossips: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Most likely disseminated gossips: {2}", localMember, period, gossipsThatSpread); @@ -180,7 +192,8 @@ private void doSpreadGossip() { } } } catch (Exception ex) { - LOGGER.warn("[{}][{}][doSpreadGossip] Exception occurred:", localMember, period, ex); + LOGGER.log( + Level.WARNING, "[{0}][{1}][doSpreadGossip] Exception occurred:", localMember, period, ex); } } @@ -225,9 +238,10 @@ private void checkGossipSegmentation() { // or network issue final SequenceIdCollector sequenceIdCollector = entry.getValue(); if (sequenceIdCollector.size() > intervalsThreshold) { - LOGGER.warn( - "[{}][{}] Too many missed gossip messages from original gossiper: '{}', " - + "current node({}) was SUSPECTED much for a long time or connection problem", + LOGGER.log( + Level.WARNING, + "[{0}][{1}] Too many missed gossip messages from original gossiper: {2}, " + + "current node({3}) was SUSPECTED much for a long time or connection problem", localMember, currentPeriod, entry.getKey(), @@ -244,8 +258,9 @@ private void onMembershipEvent(MembershipEvent event) { boolean removed = remoteMembers.remove(member); sequenceIdCollectors.remove(member.id()); if (removed) { - LOGGER.debug( - "[{}][{}] Removed {} from remoteMembers list (size={})", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Removed {2} from remoteMembers list (size={3})", localMember, currentPeriod, member, @@ -254,8 +269,9 @@ private void onMembershipEvent(MembershipEvent event) { } if (event.isAdded()) { remoteMembers.add(member); - LOGGER.debug( - "[{}][{}] Added {} to remoteMembers list (size={})", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Added {2} to remoteMembers list (size={3})", localMember, currentPeriod, member, @@ -298,8 +314,9 @@ private void spreadGossipsTo(long period, Member member) { .subscribe( null, ex -> - LOGGER.debug( - "[{}][{}] Failed to send GossipReq({}) to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Failed to send GossipReq({2}) to {3}, cause: {4}", localMember, period, message, diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 5c73af81..855319aa 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -16,6 +16,8 @@ import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -39,8 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; @@ -52,7 +52,7 @@ @SuppressWarnings({"FieldCanBeLocal", "unused"}) public final class MembershipProtocolImpl implements MembershipProtocol { - private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class); + private static final Logger LOGGER = System.getLogger(MembershipProtocol.class.getName()); private enum MembershipUpdateReason { FAILURE_DETECTOR_EVENT, @@ -142,22 +142,30 @@ public MembershipProtocolImpl( .publishOn(scheduler) .subscribe( this::onMessage, - ex -> LOGGER.error("[{}][onMessage][error] cause:", localMember, ex)), + ex -> + LOGGER.log(Level.ERROR, "[{0}][onMessage][error] cause:", localMember, ex)), failureDetector .listen() // Listen to events from failure detector .publishOn(scheduler) .subscribe( this::onFailureDetectorEvent, ex -> - LOGGER.error( - "[{}][onFailureDetectorEvent][error] cause:", localMember, ex)), + LOGGER.log( + Level.ERROR, + "[{0}][onFailureDetectorEvent][error] cause:", + localMember, + ex)), gossipProtocol .listen() // Listen to membership gossips .publishOn(scheduler) .subscribe( this::onMembershipGossip, ex -> - LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)))); + LOGGER.log( + Level.DEBUG, + "[{0}][onMembershipGossip][error] cause:", + localMember, + ex)))); } // Remove duplicates and local address(es) @@ -197,7 +205,7 @@ private boolean checkAddressesNotEqual(String address0, String address1) { if (!address0.equals(address1)) { return true; } else { - LOGGER.warn("[{}] Filtering out seed address: {}", localMember, address0); + LOGGER.log(Level.WARNING, "[{0}] Filtering out seed address: {1}", localMember, address0); return false; } } @@ -256,7 +264,8 @@ private void start0(MonoSink sink) { return; } // If seed addresses are specified in config - send initial sync to those nodes - LOGGER.info("[{}] Making initial Sync to all seed members: {}", localMember, seedMembers); + LOGGER.log( + Level.INFO, "[{0}] Making initial Sync to all seed members: {1}", localMember, seedMembers); //noinspection unchecked Mono[] syncs = @@ -268,8 +277,9 @@ private void start0(MonoSink sink) { address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) .doOnError( ex -> - LOGGER.warn( - "[{}] Exception on initial Sync, cause: {}", + LOGGER.log( + Level.WARNING, + "[{0}] Exception on initial Sync, cause: {1}", localMember, ex.toString())) .onErrorResume(Exception.class, e -> Mono.empty())) @@ -289,8 +299,11 @@ address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString())) .subscribe( null, ex -> - LOGGER.warn( - "[{}] Exception on initial SyncAck, cause: {}", localMember, ex.toString())); + LOGGER.log( + Level.WARNING, + "[{0}] Exception on initial SyncAck, cause: {1}", + localMember, + ex.toString())); } @Override @@ -346,14 +359,15 @@ private void doSync() { } Message message = prepareSyncDataMsg(SYNC, null); - LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address); + LOGGER.log(Level.DEBUG, "[{0}][doSync] Send Sync to {1}", localMember, address); transport .send(address, message) .subscribe( null, ex -> - LOGGER.debug( - "[{}][doSync] Failed to send Sync to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][doSync] Failed to send Sync to {1}, cause: {2}", localMember, address, ex.toString())); @@ -366,11 +380,14 @@ private void doSync() { private void onMessage(Message message) { if (isSync(message)) { onSync(message) - .subscribe(null, ex -> LOGGER.error("[{}][onSync][error] cause:", localMember, ex)); + .subscribe( + null, ex -> LOGGER.log(Level.ERROR, "[{0}][onSync][error] cause:", localMember, ex)); } else if (isSyncAck(message)) { if (message.correlationId() == null) { // filter out initial sync onSyncAck(message, false) - .subscribe(null, ex -> LOGGER.error("[{}][onSyncAck][error] cause:", localMember, ex)); + .subscribe( + null, + ex -> LOGGER.log(Level.ERROR, "[{0}][onSyncAck][error] cause:", localMember, ex)); } } } @@ -390,7 +407,8 @@ private boolean isSyncAck(Message message) { private Mono onSyncAck(Message syncAckMsg, boolean onStart) { return Mono.defer( () -> { - LOGGER.debug("[{}] Received SyncAck from {}", localMember, syncAckMsg.sender()); + LOGGER.log( + Level.DEBUG, "[{0}] Received SyncAck from {1}", localMember, syncAckMsg.sender()); return syncMembership(syncAckMsg.data(), onStart); }); } @@ -400,7 +418,7 @@ private Mono onSync(Message syncMsg) { return Mono.defer( () -> { final String sender = syncMsg.sender(); - LOGGER.debug("[{}] Received Sync from {}", localMember, sender); + LOGGER.log(Level.DEBUG, "[{0}] Received Sync from {1}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( avoid -> { @@ -410,8 +428,9 @@ private Mono onSync(Message syncMsg) { .subscribe( null, ex -> - LOGGER.debug( - "[{}] Failed to send SyncAck to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}] Failed to send SyncAck to {}, cause: {1}", localMember, sender, ex.toString())); @@ -428,7 +447,11 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { if (r0.status() == fdEvent.status()) { // status not changed return; } - LOGGER.debug("[{}][onFailureDetectorEvent] Received status change: {}", localMember, fdEvent); + LOGGER.log( + Level.DEBUG, + "[{0}][onFailureDetectorEvent] Received status change: {1}", + localMember, + fdEvent); if (fdEvent.status() == ALIVE) { // TODO: Consider to make more elegant solution // Alive won't override SUSPECT so issue instead extra sync with member to force it spread @@ -440,8 +463,9 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { .subscribe( null, ex -> - LOGGER.debug( - "[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][onFailureDetectorEvent] Failed to send Sync to {1}, cause: {2}", localMember, address, ex.toString())); @@ -452,8 +476,9 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { .subscribe( null, ex -> - LOGGER.error( - "[{}][onFailureDetectorEvent][updateMembership][error] cause:", + LOGGER.log( + Level.ERROR, + "[{0}][onFailureDetectorEvent][updateMembership][error] cause:", localMember, ex)); } @@ -463,13 +488,16 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { private void onMembershipGossip(Message message) { if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) { MembershipRecord record = message.data(); - LOGGER.debug("[{}] Received membership gossip: {}", localMember, record); + LOGGER.log(Level.DEBUG, "[{0}] Received membership gossip: {1}", localMember, record); updateMembership(record, MembershipUpdateReason.MEMBERSHIP_GOSSIP) .subscribe( null, ex -> - LOGGER.error( - "[{}][onMembershipGossip][updateMembership][error] cause:", localMember, ex)); + LOGGER.log( + Level.ERROR, + "[{0}][onMembershipGossip][updateMembership][error] cause:", + localMember, + ex)); } } @@ -517,8 +545,9 @@ private Mono syncMembership(SyncData syncData, boolean onStart) { updateMembership(r1, reason) .doOnError( ex -> - LOGGER.warn( - "[{}][syncMembership][{}][error] cause: {}", + LOGGER.log( + Level.WARNING, + "[{0}][syncMembership][{1}][error] cause: {2}", localMember, reason, ex.toString())) @@ -571,9 +600,10 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason String localNamespace = membershipConfig.namespace(); String namespace = r1.member().namespace(); if (!areNamespacesRelated(localNamespace, namespace)) { - LOGGER.debug( - "[{}][updateMembership][{}] Skipping update, " - + "namespace not matched, local: {}, inbound: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][updateMembership][{1}] Skipping update, " + + "namespace not matched, local: {2}, inbound: {3}", localMember, reason, localNamespace, @@ -587,9 +617,10 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason // if current record is LEAVING then we want to process other event too // Check if new record r1 overrides existing membership record r0 if ((r0 == null || !r0.isLeaving()) && !r1.isOverrides(r0)) { - LOGGER.debug( - "[{}][updateMembership][{}] Skipping update, " - + "can't override r0: {} with received r1: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][updateMembership][{1}] Skipping update, " + + "can't override r0: {2} with received r1: {3}", localMember, reason, r0, @@ -634,9 +665,10 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason .fetchMetadata(r1.member()) .doOnError( ex -> - LOGGER.warn( - "[{}][updateMembership][{}] Skipping to add/update member: {}, " - + "due to failed fetchMetadata call (cause: {})", + LOGGER.log( + Level.WARNING, + "[{0}][updateMembership][{1}] Skipping to add/update member: {2}, " + + "due to failed fetchMetadata call (cause: {3})", localMember, reason, r1, @@ -689,10 +721,11 @@ private Mono onSelfMemberDetected( membershipTable.put(localMember.id(), r2); - LOGGER.debug( - "[{}][updateMembership][{}] Updating incarnation, " - + "local record r0: {} to received r1: {}, " - + "spreading with increased incarnation r2: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][updateMembership][{1}] Updating incarnation, " + + "local record r0: {2} to received r1: {3}, " + + "spreading with increased incarnation r2: {4}", localMember, reason, r0, @@ -734,7 +767,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) { } private void publishEvent(MembershipEvent event) { - LOGGER.info("[{}][publishEvent] {}", localMember, event); + LOGGER.log(Level.INFO, "[{0}][publishEvent] {1}", localMember, event); sink.emitNext(event, busyLooping(Duration.ofSeconds(3))); } @@ -757,9 +790,10 @@ private Mono onDeadMemberDetected(MembershipRecord r1) { // Log that member left gracefully or without notification if (r0.isLeaving()) { - LOGGER.info("[{}] Member left gracefully: {}", localMember, member); + LOGGER.log(Level.INFO, "[{0}] Member left gracefully: {1}", localMember, member); } else { - LOGGER.info("[{}] Member left without notification: {}", localMember, member); + LOGGER.log( + Level.INFO, "[{0}] Member left without notification: {1}", localMember, member); } final long timestamp = System.currentTimeMillis(); @@ -798,7 +832,8 @@ private void onAliveMemberDetected( private void cancelSuspicionTimeoutTask(String memberId) { Disposable future = suspicionTimeoutTasks.remove(memberId); if (future != null && !future.isDisposed()) { - LOGGER.debug("[{}] Cancelled SuspicionTimeoutTask for {}", localMember, memberId); + LOGGER.log( + Level.DEBUG, "[{0}] Cancelled SuspicionTimeoutTask for {1}", localMember, memberId); future.dispose(); } } @@ -813,8 +848,9 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord r) { suspicionTimeoutTasks.computeIfAbsent( r.member().id(), id -> { - LOGGER.debug( - "[{}] Scheduled SuspicionTimeoutTask for {}, suspicionTimeout: {}", + LOGGER.log( + Level.DEBUG, + "[{0}] Scheduled SuspicionTimeoutTask for {1}, suspicionTimeout: {2}", localMember, id, suspicionTimeout); @@ -827,14 +863,18 @@ private void onSuspicionTimeout(String memberId) { suspicionTimeoutTasks.remove(memberId); MembershipRecord r = membershipTable.get(memberId); if (r != null) { - LOGGER.debug("[{}] Declare SUSPECTED member {} as DEAD by timeout", localMember, r); + LOGGER.log( + Level.DEBUG, "[{0}] Declare SUSPECTED member {1} as DEAD by timeout", localMember, r); MembershipRecord deadRecord = new MembershipRecord(r.member(), DEAD, r.incarnation()); updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT) .subscribe( null, ex -> - LOGGER.error( - "[{}][onSuspicionTimeout][updateMembership][error] cause:", localMember, ex)); + LOGGER.log( + Level.ERROR, + "[{0}][onSuspicionTimeout][updateMembership][error] cause:", + localMember, + ex)); } } @@ -856,13 +896,14 @@ private Mono spreadMembershipGossip(MembershipRecord r) { return Mono.defer( () -> { Message msg = Message.withData(r).qualifier(MEMBERSHIP_GOSSIP).build(); - LOGGER.debug("[{}] Send membership with gossip", localMember); + LOGGER.log(Level.DEBUG, "[{0}] Send membership with gossip", localMember); return gossipProtocol .spread(msg) .doOnError( ex -> - LOGGER.debug( - "[{}] Failed to send membership with gossip, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}] Failed to send membership with gossip, cause: {1}", localMember, ex.toString())) .then(); diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index f86680cf..52e0c29a 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -4,6 +4,8 @@ import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; @@ -11,8 +13,6 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Mono; @@ -20,7 +20,7 @@ public class MetadataStoreImpl implements MetadataStore { - private static final Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class); + private static final Logger LOGGER = System.getLogger(MetadataStore.class.getName()); // Qualifiers @@ -80,7 +80,7 @@ public void start() { .publishOn(scheduler) .subscribe( this::onMessage, - ex -> LOGGER.error("[{}][onMessage][error] cause:", localMember, ex))); + ex -> LOGGER.log(Level.ERROR, "[{0}][onMessage][error] cause:", localMember, ex))); } @Override @@ -115,11 +115,19 @@ public ByteBuffer updateMetadata(Member member, ByteBuffer metadata) { ByteBuffer result = membersMetadata.put(member, value); if (result == null) { - LOGGER.debug( - "[{}] Added metadata(size={}) for member {}", localMember, value.remaining(), member); + LOGGER.log( + Level.DEBUG, + "[{0}] Added metadata(size={1}) for member {2}", + localMember, + value.remaining(), + member); } else { - LOGGER.debug( - "[{}] Updated metadata(size={}) for member {}", localMember, value.remaining(), member); + LOGGER.log( + Level.DEBUG, + "[{0}] Updated metadata(size={1}) for member {2}", + localMember, + value.remaining(), + member); } return result; } @@ -132,8 +140,9 @@ public ByteBuffer removeMetadata(Member member) { // remove ByteBuffer metadata = membersMetadata.remove(member); if (metadata != null) { - LOGGER.debug( - "[{}] Removed metadata(size={}) for member {}", + LOGGER.log( + Level.DEBUG, + "[{0}] Removed metadata(size={1}) for member {2}", localMember, metadata.remaining(), member); @@ -149,7 +158,8 @@ public Mono fetchMetadata(Member member) { final String cid = UUID.randomUUID().toString(); final String targetAddress = member.address(); - LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); + LOGGER.log( + Level.DEBUG, "[{0}][{1}] Getting metadata for member {2}", localMember, cid, member); Message request = Message.builder() @@ -164,8 +174,9 @@ public Mono fetchMetadata(Member member) { .publishOn(scheduler) .doOnSuccess( s -> - LOGGER.debug( - "[{}][{}] Received GetMetadataResp from {}", + LOGGER.log( + Level.DEBUG, + "[{0}][{1}] Received GetMetadataResp from {2}", localMember, cid, targetAddress)) @@ -173,9 +184,10 @@ public Mono fetchMetadata(Member member) { .map(GetMetadataResponse::getMetadata) .doOnError( th -> - LOGGER.warn( - "[{}][{}] Timeout getting GetMetadataResp " - + "from {} within {} ms, cause: {}", + LOGGER.log( + Level.WARNING, + "[{0}][{1}] Timeout getting GetMetadataResp " + + "from {2} within {3} ms, cause: {4}", localMember, cid, targetAddress, @@ -196,15 +208,16 @@ private void onMessage(Message message) { private void onMetadataRequest(Message message) { final String sender = message.sender(); - LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); + LOGGER.log(Level.DEBUG, "[{0}] Received GetMetadataReq from {1}", localMember, sender); GetMetadataRequest reqData = message.data(); Member targetMember = reqData.getMember(); // Validate target member if (!targetMember.id().equals(localMember.id())) { - LOGGER.warn( - "[{}] Received GetMetadataReq from {} to {}, but local member is {}", + LOGGER.log( + Level.WARNING, + "[{0}] Received GetMetadataReq from {1} to {2}, but local member is {3}", localMember, sender, targetMember, @@ -222,14 +235,15 @@ private void onMetadataRequest(Message message) { .data(respData) .build(); - LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); + LOGGER.log(Level.DEBUG, "[{0}] Send GetMetadataResp to {1}", localMember, sender); transport .send(sender, response) .subscribe( null, ex -> - LOGGER.debug( - "[{}] Failed to send GetMetadataResp to {}, cause: {}", + LOGGER.log( + Level.DEBUG, + "[{0}] Failed to send GetMetadataResp to {1}, cause: {2}", localMember, sender, ex.toString())); @@ -240,8 +254,9 @@ private ByteBuffer encodeMetadata() { try { result = config.metadataCodec().serialize(localMetadata); } catch (Exception e) { - LOGGER.error( - "[{}] Failed to encode metadata: {}, cause: {}", + LOGGER.log( + Level.ERROR, + "[{0}] Failed to encode metadata: {1}, cause: {2}", localMember, localMetadata, e.toString()); diff --git a/cluster/src/test/java/io/scalecube/cluster/BaseTest.java b/cluster/src/test/java/io/scalecube/cluster/BaseTest.java index 6ba2895d..2a786007 100644 --- a/cluster/src/test/java/io/scalecube/cluster/BaseTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/BaseTest.java @@ -6,29 +6,29 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.transport.netty.tcp.TcpTransportFactory; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.lang.reflect.Field; import java.time.Duration; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Exceptions; /** Base test class. */ public class BaseTest { - protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class); + protected static final Logger LOGGER = System.getLogger(BaseTest.class.getName()); @BeforeEach public final void baseSetUp(TestInfo testInfo) { - LOGGER.info("***** Test started : " + testInfo.getDisplayName() + " *****"); + LOGGER.log(Level.INFO, "***** Test started : " + testInfo.getDisplayName() + " *****"); } @AfterEach public final void baseTearDown(TestInfo testInfo) { - LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****"); + LOGGER.log(Level.INFO, "***** Test finished : " + testInfo.getDisplayName() + " *****"); } public static T getField(Object obj, String fieldName) { diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index f8ac8eda..9c4068a0 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -10,6 +10,7 @@ import io.scalecube.cluster.membership.MembershipEvent.Type; import io.scalecube.cluster.metadata.MetadataCodec; import io.scalecube.transport.netty.tcp.TcpTransportFactory; +import java.lang.System.Logger.Level; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; @@ -178,9 +179,9 @@ public void testJoinDynamicPort() { .transportFactory(TcpTransportFactory::new) .startAwait()); } - LOGGER.info("Start up time: {} ms", System.currentTimeMillis() - startAt); + LOGGER.log(Level.INFO, "Start up time: {0} ms", System.currentTimeMillis() - startAt); assertEquals(membersNum + 1, seedNode.members().size()); - LOGGER.info("Cluster nodes: {}", seedNode.members()); + LOGGER.log(Level.INFO, "Cluster nodes: {0}", seedNode.members()); } finally { // Shutdown all nodes shutdown( @@ -222,7 +223,8 @@ public void testUpdateMetadata() throws Exception { @Override public void onMembershipEvent(MembershipEvent event) { if (event.isUpdated()) { - LOGGER.info("Received membership update event: {}", event); + LOGGER.log( + Level.INFO, "Received membership update event: {0}", event); updateLatch.countDown(); } } @@ -295,7 +297,8 @@ public void testUpdateMetadataProperty() throws Exception { @Override public void onMembershipEvent(MembershipEvent event) { if (event.isUpdated()) { - LOGGER.info("Received membership update event: {}", event); + LOGGER.log( + Level.INFO, "Received membership update event: {0}", event); updateLatch.countDown(); } } @@ -373,7 +376,8 @@ public void testRemoveMetadataProperty() throws Exception { @Override public void onMembershipEvent(MembershipEvent event) { if (event.isUpdated()) { - LOGGER.info("Received membership update event: {}", event); + LOGGER.log( + Level.INFO, "Received membership update event: {0}", event); updateLatch.countDown(); } } @@ -603,7 +607,7 @@ private void shutdown(List nodes) { .collect(Collectors.toList())) .block(TIMEOUT); } catch (Exception ex) { - LOGGER.error("Exception on cluster shutdown", ex); + LOGGER.log(Level.ERROR, "Exception on cluster shutdown", ex); } } } diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index 9842c07c..444fa910 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -16,6 +16,8 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.utils.NetworkEmulatorTransport; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -33,8 +35,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -42,7 +42,7 @@ class GossipProtocolTest extends BaseTest { - private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolTest.class); + private static final Logger LOGGER = System.getLogger(GossipProtocolTest.class.getName()); private static final List experiments = Arrays.asList( @@ -138,7 +138,8 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E if (firstTimeAdded) { latch.countDown(); } else { - LOGGER.error("Delivered gossip twice to: {}", transport.address()); + LOGGER.log( + Level.ERROR, "Delivered gossip twice to: {0}", transport.address()); doubleDelivery.set(true); } } @@ -176,10 +177,16 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E assertFalse(doubleDelivery.get(), "Delivered gossip twice to same member"); } finally { // Print theoretical results - LOGGER.info( + LOGGER.log( + Level.INFO, "Experiment params: " - + "N={}, Gfanout={}, Grepeat_mult={}, Tgossip={}ms Ploss={}%, Tmean={}ms", - membersNum, gossipFanout, gossipRepeatMultiplier, gossipInterval, lossPercent, meanDelay); + + "N={0}, Gfanout={1}, Grepeat_mult={2}, Tgossip={3}ms Ploss={4}%, Tmean={5}ms", + membersNum, + gossipFanout, + gossipRepeatMultiplier, + gossipInterval, + lossPercent, + meanDelay); double convergProb = gossipConvergencePercent(gossipFanout, gossipRepeatMultiplier, membersNum, lossPercent); long expDissemTime = @@ -187,21 +194,28 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E int maxMsgPerNode = maxMessagesPerGossipPerNode(gossipFanout, gossipRepeatMultiplier, membersNum); int maxMsgTotal = maxMessagesPerGossipTotal(gossipFanout, gossipRepeatMultiplier, membersNum); - LOGGER.info( - "Expected dissemination time is {}ms with probability {}%", expDissemTime, convergProb); - LOGGER.info("Max messages sent per node {} and total {}", maxMsgPerNode, maxMsgTotal); + LOGGER.log( + Level.INFO, + "Expected dissemination time is {0}ms with probability {1}%", + expDissemTime, + convergProb); + LOGGER.log( + Level.INFO, "Max messages sent per node {0} and total {1}", maxMsgPerNode, maxMsgTotal); // Print actual results - LOGGER.info( - "Actual dissemination time: {}ms (timeout {}ms)", disseminationTime, gossipTimeout); - LOGGER.info("Messages sent stats (diss.): {}", messageSentStatsDissemination); + LOGGER.log( + Level.INFO, + "Actual dissemination time: {0}ms (timeout {1}ms)", + disseminationTime, + gossipTimeout); + LOGGER.log(Level.INFO, "Messages sent stats (diss.): {0}", messageSentStatsDissemination); if (lossPercent > 0) { - LOGGER.info("Messages lost stats (diss.): {}", messageLostStatsDissemination); + LOGGER.log(Level.INFO, "Messages lost stats (diss.): {0}", messageLostStatsDissemination); } if (awaitFullCompletion) { - LOGGER.info("Messages sent stats (total): {}", messageSentStatsOverall); + LOGGER.log(Level.INFO, "Messages sent stats (total): {0}", messageSentStatsOverall); if (lossPercent > 0) { - LOGGER.info("Messages lost stats (total): {}", messageLostStatsOverall); + LOGGER.log(Level.INFO, "Messages lost stats (total): {0}", messageLostStatsOverall); } } @@ -289,7 +303,7 @@ private static void destroyGossipProtocols(List gossipProtoc try { Mono.when(futures).block(Duration.ofSeconds(30)); } catch (Exception ex) { - LOGGER.warn("Failed to await transport termination: " + ex); + LOGGER.log(Level.WARNING, "Failed to await transport termination: " + ex); } // Await a bit diff --git a/examples/pom.xml b/examples/pom.xml index a90c00a3..d0b2d934 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -1,4 +1,6 @@ - + 4.0.0 @@ -39,6 +41,10 @@ org.apache.logging.log4j log4j-core + + org.apache.logging.log4j + log4j-jpl + diff --git a/pom.xml b/pom.xml index 584671d8..7a95f881 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.scalecube scalecube-parent - 0.2.20 + 0.3.1 scalecube-cluster-parent @@ -35,17 +35,17 @@ - 1.7.36 - 2.17.2 2020.0.32 - 2.15.1 + 2.18.0 5.3.1 5.9.3 1.3 + 2.17.2 https://maven.pkg.github.com/scalecube/scalecube-cluster + checkstyle-suppressions.xml @@ -57,6 +57,44 @@ codec-parent + + + + + io.projectreactor + reactor-bom + ${reactor.version} + pom + import + + + + + com.fasterxml.jackson + jackson-bom + ${jackson.version} + pom + import + + + + + org.apache.logging.log4j + log4j-bom + ${log4j.version} + pom + import + + + + + org.junit.jupiter + junit-jupiter-api + ${junit-jupiter.version} + + + + @@ -88,51 +126,16 @@ reactor-test test + + org.apache.logging.log4j + log4j-core + test + + + org.apache.logging.log4j + log4j-jpl + test + - - - - - org.slf4j - slf4j-api - ${slf4j.version} - - - - - org.apache.logging.log4j - log4j-bom - ${log4j.version} - pom - import - - - - - io.projectreactor - reactor-bom - ${reactor.version} - pom - import - - - - - com.fasterxml.jackson - jackson-bom - ${jackson.version} - pom - import - - - - - org.junit.jupiter - junit-jupiter-api - ${junit-jupiter.version} - - - - diff --git a/transport-parent/pom.xml b/transport-parent/pom.xml index b5ba01c7..061d3fdc 100644 --- a/transport-parent/pom.xml +++ b/transport-parent/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -22,23 +24,6 @@ io.projectreactor reactor-core - - - org.slf4j - slf4j-api - - - - - org.apache.logging.log4j - log4j-slf4j-impl - test - - - org.apache.logging.log4j - log4j-core - test - diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index bcb93ad8..86c66949 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -14,6 +14,8 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; import io.scalecube.cluster.transport.api.Transport; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -22,8 +24,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; @@ -35,7 +35,7 @@ public final class TransportImpl implements Transport { - private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class); + private static final Logger LOGGER = System.getLogger(Transport.class.getName()); private static final DistinctErrors DISTINCT_ERRORS = new DistinctErrors(Duration.ofMinutes(1)); @@ -108,7 +108,13 @@ private void init(DisposableServer server) { .then(doStop()) .doFinally(s -> onStop.emitEmpty(busyLooping(Duration.ofSeconds(3)))) .subscribe( - null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString())); + null, + ex -> + LOGGER.log( + Level.WARNING, + "[{0}][doStop] Exception occurred: {1}", + address, + ex.toString())); } /** @@ -120,8 +126,12 @@ private void init(DisposableServer server) { public Mono start() { return Mono.deferContextual(context -> receiver.bind()) .doOnNext(this::init) - .doOnSuccess(t -> LOGGER.info("[start][{}] Bound cluster transport", t.address())) - .doOnError(ex -> LOGGER.error("[start][{}] Exception occurred: {}", address, ex.toString())) + .doOnSuccess( + t -> LOGGER.log(Level.INFO, "[start][{0}] Bound cluster transport", t.address())) + .doOnError( + ex -> + LOGGER.log( + Level.ERROR, "[start][{0}] Exception occurred: {1}", address, ex.toString())) .thenReturn(this) .cast(Transport.class) .contextWrite( @@ -153,13 +163,13 @@ public Mono stop() { private Mono doStop() { return Mono.defer( () -> { - LOGGER.info("[{}][doStop] Stopping", address); + LOGGER.log(Level.INFO, "[{0}][doStop] Stopping", address); // Complete incoming messages observable sink.emitComplete(busyLooping(Duration.ofSeconds(3))); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) - .doOnSuccess(avoid -> LOGGER.info("[{}][doStop] Stopped", address)); + .doOnSuccess(avoid -> LOGGER.log(Level.INFO, "[{0}][doStop] Stopped", address)); }); } @@ -213,7 +223,8 @@ private Message decodeMessage(ByteBuf byteBuf) { return messageCodec.deserialize(stream); } catch (Exception e) { if (!DISTINCT_ERRORS.contains(e)) { - LOGGER.warn("[{}][decodeMessage] Exception occurred: {}", address, e.toString()); + LOGGER.log( + Level.WARNING, "[{0}][decodeMessage] Exception occurred: {1}", address, e.toString()); } throw new DecoderException(e); } @@ -227,7 +238,8 @@ private ByteBuf encodeMessage(Message message) { } catch (Exception e) { byteBuf.release(); if (!DISTINCT_ERRORS.contains(e)) { - LOGGER.warn("[{}][encodeMessage] Exception occurred: {}", address, e.toString()); + LOGGER.log( + Level.WARNING, "[{0}][encodeMessage] Exception occurred: {1}", address, e.toString()); } throw new EncoderException(e); } @@ -244,8 +256,9 @@ private Mono connect(String remoteAddress) { .onDispose() .doOnTerminate(() -> connections.remove(remoteAddress)) .subscribe(); - LOGGER.debug( - "[{}][connect][success] remoteAddress: {}, channel: {}", + LOGGER.log( + Level.DEBUG, + "[{0}][connect][success] remoteAddress: {1}, channel: {2}", address, remoteAddress, connection.channel()); @@ -253,8 +266,9 @@ private Mono connect(String remoteAddress) { .doOnError( th -> { if (!DISTINCT_ERRORS.contains(th)) { - LOGGER.warn( - "[{}][connect][error] remoteAddress: {}, cause: {}", + LOGGER.log( + Level.WARNING, + "[{0}][connect][error] remoteAddress: {1}, cause: {2}", address, remoteAddress, th.toString()); @@ -270,14 +284,19 @@ private Mono closeServer() { if (server == null) { return Mono.empty(); } - LOGGER.info("[{}][closeServer] Closing server channel", address); + LOGGER.log(Level.INFO, "[{0}][closeServer] Closing server channel", address); return Mono.fromRunnable(server::dispose) .then(server.onDispose()) - .doOnSuccess(avoid -> LOGGER.info("[{}][closeServer] Closed server channel", address)) + .doOnSuccess( + avoid -> + LOGGER.log(Level.INFO, "[{0}][closeServer] Closed server channel", address)) .doOnError( e -> - LOGGER.warn( - "[{}][closeServer] Exception occurred: {}", address, e.toString())); + LOGGER.log( + Level.WARNING, + "[{0}][closeServer] Exception occurred: {1}", + address, + e.toString())); }); } @@ -325,7 +344,7 @@ public void onMessage(ByteBuf byteBuf) { final Message message = messageDecoder.apply(byteBuf); sink.emitNext(message, busyLooping(Duration.ofSeconds(3))); } catch (Exception e) { - LOGGER.error("[{}][onMessage] Exception occurred:", address, e); + LOGGER.log(Level.ERROR, "[{0}][onMessage] Exception occurred", address, e); } } } diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java index a8638bf8..57cc5877 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java @@ -6,27 +6,27 @@ import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.time.Duration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** Base test class. */ public class BaseTest { - protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class); + protected static final Logger LOGGER = System.getLogger(BaseTest.class.getName()); @BeforeEach public final void baseSetUp(TestInfo testInfo) { - LOGGER.info("***** Test started : " + testInfo.getDisplayName() + " *****"); + LOGGER.log(Level.INFO, "***** Test started : " + testInfo.getDisplayName() + " *****"); } @AfterEach public final void baseTearDown(TestInfo testInfo) { - LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****"); + LOGGER.log(Level.INFO, "***** Test finished : " + testInfo.getDisplayName() + " *****"); } /** @@ -41,8 +41,9 @@ protected Mono send(Transport transport, String to, Message msg) { .send(to, msg) .doOnError( th -> - LOGGER.error( - "Failed to send {} to {} from transport: {}, cause: {}", + LOGGER.log( + Level.ERROR, + "Failed to send {0} to {1} from transport: {2}, cause: {3}", msg, to, transport, @@ -59,7 +60,7 @@ protected void destroyTransport(Transport transport) { try { transport.stop().block(Duration.ofSeconds(1)); } catch (Exception ex) { - LOGGER.warn("Failed to await transport termination: " + ex); + LOGGER.log(Level.WARNING, "Failed to await transport termination: {0}", ex.toString()); } } } diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java index 417c6f0b..532af84c 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java @@ -5,6 +5,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.transport.netty.BaseTest; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -45,7 +46,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E int sentPerIteration = 1000; long[] iterationTimeSeries = new long[iterationNum - 1]; for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); client = createTcpTransport(); final List received = new ArrayList<>(); @@ -65,7 +66,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E Message message = Message.withQualifier("q" + j).build(); client .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } latch.await(20, TimeUnit.SECONDS); long iterationTime = System.currentTimeMillis() - startAt; @@ -74,7 +75,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E } assertSendOrder(sentPerIteration, received); - LOGGER.debug("Iteration time: {} ms", iterationTime); + LOGGER.log(Level.DEBUG, "Iteration time: {0} ms", iterationTime); serverSubscriber.dispose(); destroyTransport(client); @@ -82,7 +83,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E LongSummaryStatistics iterationTimeStats = LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); + LOGGER.log(Level.DEBUG, "Iteration time stats (ms): {0}", iterationTimeStats); } @Test @@ -94,7 +95,7 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { long[] iterationTimeSeries = new long[iterationNum - 1]; List totalSentTimeSeries = new ArrayList<>(sentPerIteration * (iterationNum - 1)); for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); List iterSentTimeSeries = new ArrayList<>(sentPerIteration); client = createTcpTransport(); @@ -119,8 +120,9 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { .subscribe( avoid -> iterSentTimeSeries.add(System.currentTimeMillis() - sentAt), th -> - LOGGER.error( - "Failed to send message in {} ms", + LOGGER.log( + Level.ERROR, + "Failed to send message in {0} ms", System.currentTimeMillis() - sentAt, th)); } @@ -137,15 +139,15 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { LongSummaryStatistics iterSentTimeStats = iterSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); if (i == 0) { // warm up iteration - LOGGER.debug("Warm up iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats warm up iter (ms): {}", iterSentTimeStats); + LOGGER.log(Level.DEBUG, "Warm up iteration time: {0} ms", iterationTime); + LOGGER.log(Level.DEBUG, "Sent time stats warm up iter (ms): {0}", iterSentTimeStats); } else { totalSentTimeSeries.addAll(iterSentTimeSeries); LongSummaryStatistics totalSentTimeStats = totalSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - LOGGER.debug("Iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats iter (ms): {}", iterSentTimeStats); - LOGGER.debug("Sent time stats total (ms): {}", totalSentTimeStats); + LOGGER.log(Level.DEBUG, "Iteration time: {0} ms", iterationTime); + LOGGER.log(Level.DEBUG, "Sent time stats iter (ms): {0}", iterSentTimeStats); + LOGGER.log(Level.DEBUG, "Sent time stats total (ms): {0}", totalSentTimeStats); } serverSubscriber.dispose(); @@ -154,7 +156,7 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { LongSummaryStatistics iterationTimeStats = LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); + LOGGER.log(Level.DEBUG, "Iteration time stats (ms): {0}", iterationTimeStats); } @Test @@ -163,7 +165,7 @@ public void testSendOrderMultiThread(TestInfo testInfo) throws Exception { final int total = 1000; for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); ExecutorService exec = Executors.newFixedThreadPool( 4, @@ -227,7 +229,7 @@ private Callable sender(int id, Transport client, String address, int tota Message message = Message.withQualifier("q").correlationId(correlationId).build(); client.send(address, message).block(Duration.ofSeconds(3)); } catch (Exception e) { - LOGGER.error("Failed to send message: j = {} id = {}", j, id, e); + LOGGER.log(Level.ERROR, "Failed to send message: j = {0} id = {1}", j, id, e); throw Exceptions.propagate(e); } } diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java index 9c5ed4b9..37ac22d5 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java @@ -10,6 +10,7 @@ import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.transport.netty.BaseTest; import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -58,7 +59,7 @@ public void testUnresolvedHostConnection() { public void testInteractWithNoConnection(TestInfo testInfo) { String serverAddress = "localhost:49255"; for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); client = createTcpTransport(); @@ -146,7 +147,7 @@ public void testPingPongOnSingleChannel() throws Exception { Message echo = Message.withData("echo/" + message.qualifier()).build(); server .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } }); @@ -158,10 +159,10 @@ public void testPingPongOnSingleChannel() throws Exception { client .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); client .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); List target = targetFuture.get(1, TimeUnit.SECONDS); assertNotNull(target); @@ -216,7 +217,7 @@ public void testPingPongOnSeparateChannel() throws Exception { Message echo = Message.withData("echo/" + message.qualifier()).build(); server .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } }); @@ -228,10 +229,10 @@ public void testPingPongOnSeparateChannel() throws Exception { client .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); client .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); List target = targetFuture.get(1, TimeUnit.SECONDS); assertNotNull(target); @@ -281,7 +282,7 @@ public void testObserverThrowsException() throws Exception { Message echo = Message.withData("echo/" + message.qualifier()).build(); server .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } }, Throwable::printStackTrace); @@ -292,7 +293,7 @@ public void testObserverThrowsException() throws Exception { Message message = Message.withData("throw").build(); client .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); Message message0 = null; try { message0 = messageFuture0.get(1, TimeUnit.SECONDS); diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java index 8ed11235..9b353247 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java @@ -5,6 +5,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.transport.netty.BaseTest; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -45,7 +46,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E int sentPerIteration = 1000; long[] iterationTimeSeries = new long[iterationNum - 1]; for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); client = createWebsocketTransport(); final List received = new ArrayList<>(); @@ -65,7 +66,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E Message message = Message.withQualifier("q" + j).build(); client .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } latch.await(20, TimeUnit.SECONDS); long iterationTime = System.currentTimeMillis() - startAt; @@ -74,7 +75,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E } assertSendOrder(sentPerIteration, received); - LOGGER.debug("Iteration time: {} ms", iterationTime); + LOGGER.log(Level.DEBUG, "Iteration time: {0} ms", iterationTime); serverSubscriber.dispose(); destroyTransport(client); @@ -82,7 +83,7 @@ public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws E LongSummaryStatistics iterationTimeStats = LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); + LOGGER.log(Level.DEBUG, "Iteration time stats (ms): {0}", iterationTimeStats); } @Test @@ -94,7 +95,7 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { long[] iterationTimeSeries = new long[iterationNum - 1]; List totalSentTimeSeries = new ArrayList<>(sentPerIteration * (iterationNum - 1)); for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); List iterSentTimeSeries = new ArrayList<>(sentPerIteration); client = createWebsocketTransport(); @@ -119,8 +120,9 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { .subscribe( avoid -> iterSentTimeSeries.add(System.currentTimeMillis() - sentAt), th -> - LOGGER.error( - "Failed to send message in {} ms", + LOGGER.log( + Level.ERROR, + "Failed to send message in {0} ms", System.currentTimeMillis() - sentAt, th)); } @@ -137,15 +139,15 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { LongSummaryStatistics iterSentTimeStats = iterSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); if (i == 0) { // warm up iteration - LOGGER.debug("Warm up iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats warm up iter (ms): {}", iterSentTimeStats); + LOGGER.log(Level.DEBUG, "Warm up iteration time: {0} ms", iterationTime); + LOGGER.log(Level.DEBUG, "Sent time stats warm up iter (ms): {0}", iterSentTimeStats); } else { totalSentTimeSeries.addAll(iterSentTimeSeries); LongSummaryStatistics totalSentTimeStats = totalSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - LOGGER.debug("Iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats iter (ms): {}", iterSentTimeStats); - LOGGER.debug("Sent time stats total (ms): {}", totalSentTimeStats); + LOGGER.log(Level.DEBUG, "Iteration time: {0} ms", iterationTime); + LOGGER.log(Level.DEBUG, "Sent time stats iter (ms): {0}", iterSentTimeStats); + LOGGER.log(Level.DEBUG, "Sent time stats total (ms): {0}", totalSentTimeStats); } serverSubscriber.dispose(); @@ -154,7 +156,7 @@ public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { LongSummaryStatistics iterationTimeStats = LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); + LOGGER.log(Level.DEBUG, "Iteration time stats (ms): {0}", iterationTimeStats); } @Test @@ -163,7 +165,7 @@ public void testSendOrderMultiThread(TestInfo testInfo) throws Exception { final int total = 1000; for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); ExecutorService exec = Executors.newFixedThreadPool( 4, @@ -227,7 +229,7 @@ private Callable sender(int id, Transport client, String address, int tota Message message = Message.withQualifier("q").correlationId(correlationId).build(); client.send(address, message).block(Duration.ofSeconds(3)); } catch (Exception e) { - LOGGER.error("Failed to send message: j = {} id = {}", j, id, e); + LOGGER.log(Level.ERROR, "Failed to send message: j = {0} id = {1}", j, id, e); throw Exceptions.propagate(e); } } diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java index 9d5867c4..9d8021ca 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java @@ -10,6 +10,7 @@ import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.transport.netty.BaseTest; import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -58,7 +59,7 @@ public void testUnresolvedHostConnection() { public void testInteractWithNoConnection(TestInfo testInfo) { String serverAddress = "localhost:49255"; for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); + LOGGER.log(Level.DEBUG, "####### {0} : iteration = {1}", testInfo.getDisplayName(), i); client = createWebsocketTransport(); @@ -146,7 +147,7 @@ public void testPingPongOnSingleChannel() throws Exception { Message echo = Message.withData("echo/" + message.qualifier()).build(); server .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.DEBUG, "Failed to send message", th)); } }); @@ -158,10 +159,10 @@ public void testPingPongOnSingleChannel() throws Exception { client .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); client .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); List target = targetFuture.get(1, TimeUnit.SECONDS); assertNotNull(target); @@ -216,7 +217,7 @@ public void testPingPongOnSeparateChannel() throws Exception { Message echo = Message.withData("echo/" + message.qualifier()).build(); server .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } }); @@ -228,10 +229,10 @@ public void testPingPongOnSeparateChannel() throws Exception { client .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); client .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); List target = targetFuture.get(1, TimeUnit.SECONDS); assertNotNull(target); @@ -281,7 +282,7 @@ public void testObserverThrowsException() throws Exception { Message echo = Message.withData("echo/" + message.qualifier()).build(); server .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); } }, Throwable::printStackTrace); @@ -292,7 +293,7 @@ public void testObserverThrowsException() throws Exception { Message message = Message.withData("throw").build(); client .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); + .subscribe(null, th -> LOGGER.log(Level.ERROR, "Failed to send message", th)); Message message0 = null; try { message0 = messageFuture0.get(1, TimeUnit.SECONDS);