Skip to content

Commit

Permalink
put node id in thread names instead of MDC
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Nov 17, 2023
1 parent 5076dd7 commit ed03d91
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
import org.mortbay.jetty.orchestrator.configuration.Jvm;
import org.mortbay.jetty.orchestrator.nodefs.NodeFileSystemProvider;
import org.mortbay.jetty.orchestrator.util.IOUtil;
import org.mortbay.jetty.orchestrator.util.StreamCopier;
import org.mortbay.jetty.orchestrator.util.ProcessHolder;
import org.mortbay.jetty.orchestrator.util.StreamCopier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class NodeProcess implements Serializable, AutoCloseable
{
Expand Down Expand Up @@ -77,7 +76,6 @@ public static void main(String[] args) throws Exception
String nodeId = args[0];
String connectString = args[1];

MDC.put("NodeId", nodeId);
if (LOG.isDebugEnabled())
LOG.debug("Starting node [{}] with JVM version '{}' connecting to {}", nodeId, System.getProperty("java.version"), connectString);
CuratorFramework curator = CuratorFrameworkFactory.newClient(connectString, new RetryNTimes(0, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.curator.framework.CuratorFramework;
Expand All @@ -39,6 +40,7 @@ public class RpcClient implements AutoCloseable
private final SimpleDistributedQueue commandQueue;
private final SimpleDistributedQueue responseQueue;
private final ExecutorService executorService;
private final AtomicInteger threadIdGenerator = new AtomicInteger();
private final ConcurrentMap<Long, CompletableFuture<Object>> calls = new ConcurrentHashMap<>();
private final AtomicLong requestIdGenerator = new AtomicLong();
private final GlobalNodeId globalNodeId;
Expand All @@ -48,7 +50,12 @@ public RpcClient(CuratorFramework curator, GlobalNodeId globalNodeId)
this.globalNodeId = globalNodeId;
commandQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/commandQ");
responseQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/responseQ");
executorService = Executors.newSingleThreadExecutor();
executorService = Executors.newSingleThreadExecutor(r ->
{
Thread t = new Thread(r);
t.setName("jco-" + threadIdGenerator.getAndIncrement());
return t;
});
executorService.submit(() ->
{
while (true)
Expand Down
17 changes: 9 additions & 8 deletions src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import java.io.ObjectOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.mortbay.jetty.orchestrator.ClusterTools;
import org.mortbay.jetty.orchestrator.rpc.command.Command;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue;
import org.mortbay.jetty.orchestrator.ClusterTools;
import org.mortbay.jetty.orchestrator.rpc.command.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class RpcServer implements AutoCloseable
{
Expand All @@ -37,6 +37,7 @@ public class RpcServer implements AutoCloseable
private final SimpleDistributedQueue commandQueue;
private final SimpleDistributedQueue responseQueue;
private final ExecutorService executorService;
private final AtomicInteger threadIdGenerator = new AtomicInteger();
private volatile boolean active;
private final ClusterTools clusterTools;

Expand All @@ -46,11 +47,11 @@ public RpcServer(CuratorFramework curator, GlobalNodeId globalNodeId)
commandQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/commandQ");
responseQueue = new SimpleDistributedQueue(curator, "/clients/" + globalNodeId.getNodeId() + "/responseQ");
executorService = Executors.newCachedThreadPool(r ->
new Thread(() ->
{
MDC.put("NodeId", globalNodeId.getNodeId());
r.run();
}));
{
Thread thread = new Thread(r);
thread.setName(threadIdGenerator.getAndIncrement() + "|" + this.globalNodeId.getNodeId());
return thread;
});
clusterTools = new ClusterTools(curator, globalNodeId);
}

Expand Down

0 comments on commit ed03d91

Please sign in to comment.