From ed03d911e31a80a898e96f08068e6886d20fcb86 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Fri, 17 Nov 2023 15:46:46 +0100 Subject: [PATCH] put node id in thread names instead of MDC Signed-off-by: Ludovic Orban --- .../jetty/orchestrator/rpc/NodeProcess.java | 4 +--- .../jetty/orchestrator/rpc/RpcClient.java | 9 ++++++++- .../jetty/orchestrator/rpc/RpcServer.java | 17 +++++++++-------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java index da456af..89e28a6 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java @@ -27,11 +27,10 @@ import org.mortbay.jetty.orchestrator.configuration.Jvm; import org.mortbay.jetty.orchestrator.nodefs.NodeFileSystemProvider; import org.mortbay.jetty.orchestrator.util.IOUtil; -import org.mortbay.jetty.orchestrator.util.StreamCopier; import org.mortbay.jetty.orchestrator.util.ProcessHolder; +import org.mortbay.jetty.orchestrator.util.StreamCopier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; public class NodeProcess implements Serializable, AutoCloseable { @@ -77,7 +76,6 @@ public static void main(String[] args) throws Exception String nodeId = args[0]; String connectString = args[1]; - MDC.put("NodeId", nodeId); if (LOG.isDebugEnabled()) LOG.debug("Starting node [{}] with JVM version '{}' connecting to {}", nodeId, System.getProperty("java.version"), connectString); CuratorFramework curator = CuratorFrameworkFactory.newClient(connectString, new RetryNTimes(0, 0)); diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java index 8a723f8..36bc0f8 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.curator.framework.CuratorFramework; @@ -39,6 +40,7 @@ public class RpcClient implements AutoCloseable private final SimpleDistributedQueue commandQueue; private final SimpleDistributedQueue responseQueue; private final ExecutorService executorService; + private final AtomicInteger threadIdGenerator = new AtomicInteger(); private final ConcurrentMap> calls = new ConcurrentHashMap<>(); private final AtomicLong requestIdGenerator = new AtomicLong(); private final GlobalNodeId globalNodeId; @@ -48,7 +50,12 @@ public RpcClient(CuratorFramework curator, GlobalNodeId globalNodeId) this.globalNodeId = globalNodeId; commandQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/commandQ"); responseQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/responseQ"); - executorService = Executors.newSingleThreadExecutor(); + executorService = Executors.newSingleThreadExecutor(r -> + { + Thread t = new Thread(r); + t.setName("jco-" + threadIdGenerator.getAndIncrement()); + return t; + }); executorService.submit(() -> { while (true) diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java index 37001ef..94ee12c 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java @@ -20,14 +20,14 @@ import java.io.ObjectOutputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; -import org.mortbay.jetty.orchestrator.ClusterTools; -import org.mortbay.jetty.orchestrator.rpc.command.Command; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue; +import org.mortbay.jetty.orchestrator.ClusterTools; +import org.mortbay.jetty.orchestrator.rpc.command.Command; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; public class RpcServer implements AutoCloseable { @@ -37,6 +37,7 @@ public class RpcServer implements AutoCloseable private final SimpleDistributedQueue commandQueue; private final SimpleDistributedQueue responseQueue; private final ExecutorService executorService; + private final AtomicInteger threadIdGenerator = new AtomicInteger(); private volatile boolean active; private final ClusterTools clusterTools; @@ -46,11 +47,11 @@ public RpcServer(CuratorFramework curator, GlobalNodeId globalNodeId) commandQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/commandQ"); responseQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/responseQ"); executorService = Executors.newCachedThreadPool(r -> - new Thread(() -> - { - MDC.put("NodeId", globalNodeId.getNodeId()); - r.run(); - })); + { + Thread thread = new Thread(r); + thread.setName(threadIdGenerator.getAndIncrement() + "|" + this.globalNodeId.getNodeId()); + return thread; + }); clusterTools = new ClusterTools(curator, globalNodeId); }