Skip to content

Commit

Permalink
Address review comment: rename rpc_server to rpc_service
Browse files Browse the repository at this point in the history
  • Loading branch information
Mridul Muralidharan committed Apr 17, 2024
1 parent 3d6ba99 commit 9b41e6e
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
masterRpcEnvInUse =
RpcEnv.create(
RpcNameConstants.LIFECYCLE_MANAGER_MASTER_SYS,
TransportModuleConstants.RPC_SERVER_MODULE,
TransportModuleConstants.RPC_SERVICE_MODULE,
lifecycleHost,
0,
conf,
Expand All @@ -190,7 +190,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
workerRpcEnvInUse =
RpcEnv.create(
RpcNameConstants.LIFECYCLE_MANAGER_WORKER_SYS,
TransportModuleConstants.RPC_SERVER_MODULE,
TransportModuleConstants.RPC_SERVICE_MODULE,
lifecycleHost,
0,
conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class TransportModuleConstants {
public static final String RPC_APP_MODULE = "rpc_app";
// RPC module used to communicate with/between server components
// This is used both at server (master/worker) and application side.
public static final String RPC_SERVER_MODULE = "rpc_server";
public static final String RPC_SERVICE_MODULE = "rpc_service";

// Both RPC_APP and RPC_SERVER fallsback to earlier RPC_MODULE for backward
// compatibility
Expand Down
22 changes: 11 additions & 11 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
object CelebornConf extends Logging {

val TRANSPORT_MODULE_FALLBACKS = Map(
"rpc_server" -> "rpc",
"rpc_service" -> "rpc",
"rpc_app" -> "rpc",
// only for testing
"test_child_module" -> "test_parent_module")
Expand Down Expand Up @@ -1705,7 +1705,7 @@ object CelebornConf extends Logging {
.doc("If true, we will prefer allocating off-heap byte buffers within Netty. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand All @@ -1724,7 +1724,7 @@ object CelebornConf extends Logging {
.doc("Socket connect timeout. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand All @@ -1738,7 +1738,7 @@ object CelebornConf extends Logging {
.doc("Connection active timeout. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand All @@ -1756,7 +1756,7 @@ object CelebornConf extends Logging {
.doc("Number of concurrent connections between two nodes. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand All @@ -1772,7 +1772,7 @@ object CelebornConf extends Logging {
"Requested maximum length of the queue of incoming connections. Default 0 for no backlog. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
Expand All @@ -1789,7 +1789,7 @@ object CelebornConf extends Logging {
.doc("Number of threads used in the server thread pool. Default to 0, which is 2x#cores. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
Expand All @@ -1806,7 +1806,7 @@ object CelebornConf extends Logging {
.doc("Number of threads used in the client thread pool. Default to 0, which is 2x#cores. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand All @@ -1823,7 +1823,7 @@ object CelebornConf extends Logging {
"buffer size should be ~ 1.25MB. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand All @@ -1843,7 +1843,7 @@ object CelebornConf extends Logging {
.doc("Send buffer size (SO_SNDBUF). " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand Down Expand Up @@ -1980,7 +1980,7 @@ object CelebornConf extends Logging {
.doc("The heartbeat interval between worker and client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVER_MODULE}`, " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@ private[celeborn] case class RpcEnvConfig(
numUsableCores: Int,
securityContext: Option[RpcSecurityContext]) {
assert(TransportModuleConstants.RPC_APP_MODULE == transportModule ||
TransportModuleConstants.RPC_SERVER_MODULE == transportModule ||
TransportModuleConstants.RPC_SERVICE_MODULE == transportModule ||
TransportModuleConstants.RPC_MODULE == transportModule)
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,10 @@ class CelebornConfSuite extends CelebornFunSuite {
validateDefauitTransportConfValue(conf, "test_child_module")
}

test("rpc_server and rpc_client should default to rpc if not configured") {
test("rpc_service and rpc_client should default to rpc if not configured") {
val conf = setupCelebornConfForTransportTests("rpc")
// set in rpc, so should work for specific rpc servers
validateDefauitTransportConfValue(conf, "rpc_server")
validateDefauitTransportConfValue(conf, "rpc_service")
validateDefauitTransportConfValue(conf, "rpc_app")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
try {
rpcEnv = RpcEnv.create(
"mockEnv",
TransportModuleConstants.RPC_SERVER_MODULE,
TransportModuleConstants.RPC_SERVICE_MODULE,
"localhost",
"localhost",
12345,
Expand Down
Loading

0 comments on commit 9b41e6e

Please sign in to comment.