From 9b41e6efd101aa58632d01b6c451c4044afdf6cd Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Wed, 17 Apr 2024 00:58:23 -0500 Subject: [PATCH] Address review comment: rename rpc_server to rpc_service --- .../celeborn/client/LifecycleManager.scala | 4 ++-- .../protocol/TransportModuleConstants.java | 4 ++-- .../apache/celeborn/common/CelebornConf.scala | 22 +++++++++---------- .../apache/celeborn/common/rpc/RpcEnv.scala | 2 +- .../celeborn/common/CelebornConfSuite.scala | 4 ++-- .../common/meta/WorkerInfoSuite.scala | 2 +- docs/configuration/network.md | 20 ++++++++--------- .../service/deploy/master/Master.scala | 6 ++--- .../service/deploy/worker/Worker.scala | 6 ++--- 9 files changed, 35 insertions(+), 35 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index a31afd78d2..8e577a130a 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -179,7 +179,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends masterRpcEnvInUse = RpcEnv.create( RpcNameConstants.LIFECYCLE_MANAGER_MASTER_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, lifecycleHost, 0, conf, @@ -190,7 +190,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends workerRpcEnvInUse = RpcEnv.create( RpcNameConstants.LIFECYCLE_MANAGER_WORKER_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, lifecycleHost, 0, conf, diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/TransportModuleConstants.java b/common/src/main/java/org/apache/celeborn/common/protocol/TransportModuleConstants.java index ff2e506739..9e813ae3a4 100644 --- 
a/common/src/main/java/org/apache/celeborn/common/protocol/TransportModuleConstants.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/TransportModuleConstants.java @@ -27,7 +27,7 @@ public class TransportModuleConstants { public static final String RPC_APP_MODULE = "rpc_app"; // RPC module used to communicate with/between server components // This is used both at server (master/worker) and application side. - public static final String RPC_SERVER_MODULE = "rpc_server"; + public static final String RPC_SERVICE_MODULE = "rpc_service"; - // Both RPC_APP and RPC_SERVER fallsback to earlier RPC_MODULE for backward + // Both RPC_APP and RPC_SERVICE fall back to earlier RPC_MODULE for backward // compatibility diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index d12fc2d1a4..c237039795 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1368,7 +1368,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se object CelebornConf extends Logging { val TRANSPORT_MODULE_FALLBACKS = Map( - "rpc_server" -> "rpc", + "rpc_service" -> "rpc", "rpc_app" -> "rpc", // only for testing "test_child_module" -> "test_parent_module") @@ -1705,7 +1705,7 @@ object CelebornConf extends Logging { .doc("If true, we will prefer allocating off-heap byte buffers within Netty. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1724,7 +1724,7 @@ object CelebornConf extends Logging { .doc("Socket connect timeout. 
" + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1738,7 +1738,7 @@ object CelebornConf extends Logging { .doc("Connection active timeout. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1756,7 +1756,7 @@ object CelebornConf extends Logging { .doc("Number of concurrent connections between two nodes. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1772,7 +1772,7 @@ object CelebornConf extends Logging { "Requested maximum length of the queue of incoming connections. Default 0 for no backlog. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.PUSH_MODULE}`, " + s"it works for worker receiving push data. 
" + @@ -1789,7 +1789,7 @@ object CelebornConf extends Logging { .doc("Number of threads used in the server thread pool. Default to 0, which is 2x#cores. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.PUSH_MODULE}`, " + s"it works for worker receiving push data. " + @@ -1806,7 +1806,7 @@ object CelebornConf extends Logging { .doc("Number of threads used in the client thread pool. Default to 0, which is 2x#cores. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1823,7 +1823,7 @@ object CelebornConf extends Logging { "buffer size should be ~ 1.25MB. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1843,7 +1843,7 @@ object CelebornConf extends Logging { .doc("Send buffer size (SO_SNDBUF). " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. 
" + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + @@ -1980,7 +1980,7 @@ object CelebornConf extends Logging { .doc("The heartbeat interval between worker and client. " + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + s"works for shuffle client. " + - s"If setting to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + s"works for master or worker. " + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + s"it works for shuffle client push and fetch data. " + diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 6e7423837c..90847f71b9 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -194,6 +194,6 @@ private[celeborn] case class RpcEnvConfig( numUsableCores: Int, securityContext: Option[RpcSecurityContext]) { assert(TransportModuleConstants.RPC_APP_MODULE == transportModule || - TransportModuleConstants.RPC_SERVER_MODULE == transportModule || + TransportModuleConstants.RPC_SERVICE_MODULE == transportModule || TransportModuleConstants.RPC_MODULE == transportModule) } diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index 5126f39349..6f05f6b658 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -376,10 +376,10 @@ class CelebornConfSuite extends CelebornFunSuite { validateDefauitTransportConfValue(conf, "test_child_module") } - test("rpc_server and rpc_client should default to rpc if not configured") { + test("rpc_service and rpc_app should default to rpc if not 
configured") { val conf = setupCelebornConfForTransportTests("rpc") // set in rpc, so should work for specific rpc servers - validateDefauitTransportConfValue(conf, "rpc_server") + validateDefauitTransportConfValue(conf, "rpc_service") validateDefauitTransportConfValue(conf, "rpc_app") } diff --git a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala index 1adbca91f8..2dc817c9d9 100644 --- a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala @@ -270,7 +270,7 @@ class WorkerInfoSuite extends CelebornFunSuite { try { rpcEnv = RpcEnv.create( "mockEnv", - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, "localhost", "localhost", 12345, diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 750f5e720d..3cfad02e8d 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -21,22 +21,22 @@ license: | | --- | ------- | --------- | ----------- | ----- | ---------- | | celeborn.<module>.fetch.timeoutCheck.interval | 5s | false | Interval for checking fetch data timeout. It only support setting to `data` since it works for shuffle client fetch data. | 0.3.0 | | | celeborn.<module>.fetch.timeoutCheck.threads | 4 | false | Threads num for checking fetch data timeout. It only support setting to `data` since it works for shuffle client fetch data. | 0.3.0 | | -| celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat interval between worker and client. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. 
If setting to `replicate`, it works for replicate client of worker replicating data to peer worker.If you are using the "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.rpc.heartbeat.interval", "celeborn.data.heartbeat.interval" and"celeborn.replicate.heartbeat.interval". | 0.3.0 | celeborn.client.heartbeat.interval | -| celeborn.<module>.io.backLog | 0 | false | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | -| celeborn.<module>.io.clientThreads | 0 | false | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | -| celeborn.<module>.io.connectTimeout | <value of celeborn.network.connect.timeout> | false | Socket connect timeout. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for the replicate client of worker replicating data to peer worker. | | | -| celeborn.<module>.io.connectionTimeout | <value of celeborn.network.timeout> | false | Connection active timeout. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. 
If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | +| celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat interval between worker and client. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker.If you are using the "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.rpc.heartbeat.interval", "celeborn.data.heartbeat.interval" and"celeborn.replicate.heartbeat.interval". | 0.3.0 | celeborn.client.heartbeat.interval | +| celeborn.<module>.io.backLog | 0 | false | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | +| celeborn.<module>.io.clientThreads | 0 | false | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | +| celeborn.<module>.io.connectTimeout | <value of celeborn.network.connect.timeout> | false | Socket connect timeout. If setting to `rpc_app`, works for shuffle client. 
If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for the replicate client of worker replicating data to peer worker. | | | +| celeborn.<module>.io.connectionTimeout | <value of celeborn.network.timeout> | false | Connection active timeout. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `push`, it works for Flink shuffle client push data. | | | | celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. | | | -| celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. 
If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | -| celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | -| celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | +| celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | +| celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. 
If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | +| celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | | celeborn.<module>.io.retryWait | 5s | false | Time that we will wait in order to perform a retry after an IOException. Only relevant if maxIORetries > 0. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for Flink shuffle client push data. | 0.2.0 | | | celeborn.<module>.io.saslTimeout | 30s | false | Timeout for a single round trip of auth message exchange, in milliseconds. | 0.5.0 | | -| celeborn.<module>.io.sendBuffer | 0b | false | Send buffer size (SO_SNDBUF). If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. 
| 0.2.0 | | -| celeborn.<module>.io.serverThreads | 0 | false | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_server`, works for master or worker. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | +| celeborn.<module>.io.sendBuffer | 0b | false | Send buffer size (SO_SNDBUF). If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | +| celeborn.<module>.io.serverThreads | 0 | false | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.push.timeoutCheck.interval | 5s | false | Interval for checking push data timeout. If setting to `data`, it works for shuffle client push data. If setting to `push`, it works for Flink shuffle client push data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 | | | celeborn.<module>.push.timeoutCheck.threads | 4 | false | Threads num for checking push data timeout. If setting to `data`, it works for shuffle client push data. 
If setting to `push`, it works for Flink shuffle client push data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | 0.3.0 | | | celeborn.<role>.rpc.dispatcher.threads | <value of celeborn.rpc.dispatcher.threads> | false | Threads number of message dispatcher event loop for roles | | | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 69602d066a..8f9c0417ed 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -90,7 +90,7 @@ private[celeborn] class Master( if (!authEnabled) { RpcEnv.create( RpcNameConstants.MASTER_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, masterArgs.host, masterArgs.host, masterArgs.port, @@ -106,7 +106,7 @@ private[celeborn] class Master( s"Secure port enabled ${masterArgs.port} for secured RPC.") RpcEnv.create( RpcNameConstants.MASTER_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, masterArgs.host, masterArgs.host, masterArgs.port, @@ -124,7 +124,7 @@ private[celeborn] class Master( s"Internal port enabled, using internal port ${masterArgs.internalPort} for internal RPC.") RpcEnv.create( RpcNameConstants.MASTER_INTERNAL_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, masterArgs.host, masterArgs.host, masterArgs.internalPort, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 7888c9cd0f..07d29e4651 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -95,7 
+95,7 @@ private[celeborn] class Worker( if (!authEnabled) { RpcEnv.create( RpcNameConstants.WORKER_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, workerArgs.host, workerArgs.host, workerArgs.port, @@ -111,7 +111,7 @@ private[celeborn] class Worker( s"Secure port enabled ${workerArgs.port} for secured RPC.") RpcEnv.create( RpcNameConstants.WORKER_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, workerArgs.host, workerArgs.host, workerArgs.port, @@ -126,7 +126,7 @@ private[celeborn] class Worker( } else { RpcEnv.create( RpcNameConstants.WORKER_INTERNAL_SYS, - TransportModuleConstants.RPC_SERVER_MODULE, + TransportModuleConstants.RPC_SERVICE_MODULE, workerArgs.host, workerArgs.host, workerArgs.internalPort,