diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index 90db88a166..c34e1050bf 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -266,24 +266,13 @@ private RpcEndpointRef setupEndpointRef(String endpoint) { } private List resolveMasterEndpoints() { - if (isWorker) { + if (isWorker && conf.internalPortEnabled()) { // For worker, we should use the internal endpoints if internal port is enabled. - if (conf.internalPortEnabled()) { - masterEndpointName = RpcNameConstants.MASTER_INTERNAL_EP; - return Arrays.asList(conf.masterInternalEndpoints()); - } else { - masterEndpointName = RpcNameConstants.MASTER_EP; - return Arrays.asList(conf.masterEndpoints()); - } + masterEndpointName = RpcNameConstants.MASTER_INTERNAL_EP; + return Arrays.asList(conf.masterInternalEndpoints()); } else { - // This is for client, so we should use the secured endpoints if auth is enabled. - if (conf.authEnabled()) { - masterEndpointName = RpcNameConstants.MASTER_SECURED_EP; - return Arrays.asList(conf.masterSecuredEndpoints()); - } else { - masterEndpointName = RpcNameConstants.MASTER_EP; - return Arrays.asList(conf.masterEndpoints()); - } + masterEndpointName = RpcNameConstants.MASTER_EP; + return Arrays.asList(conf.masterEndpoints()); } } } diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java index fed7b827b4..7be02b1607 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java @@ -21,11 +21,10 @@ public class RpcNameConstants { // For Master public static String MASTER_SYS = "Master"; public static String MASTER_INTERNAL_SYS = "MasterInternal"; - public static String MASTER_SECURED_SYS = "MasterSecured"; + // Master Endpoint Name public static String MASTER_EP = "MasterEndpoint"; public static String MASTER_INTERNAL_EP = "MasterInternalEndpoint"; - public static String MASTER_SECURED_EP = "MasterSecuredEndpoint"; // For Worker public static String WORKER_SYS = "Worker"; diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index a7a45abaaa..8b0d9ef1d2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1142,21 +1142,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se return authEnabled && internalPortEnabled } - def haMasterNodeSecuredPort(nodeId: String): Int = { - val key = HA_MASTER_NODE_SECURED_PORT.key.replace("", nodeId) - getInt(key, HA_MASTER_NODE_SECURED_PORT.defaultValue.get) - } - - def masterSecuredPort: Int = get(MASTER_SECURED_PORT) - - def masterSecuredEndpoints: Array[String] = - get(MASTER_SECURED_ENDPOINTS).toArray.map { endpoint => - Utils.parseHostPort(endpoint.replace("", Utils.localHostName(this))) match { - case (host, 0) => s"$host:${HA_MASTER_NODE_SECURED_PORT.defaultValue.get}" - case (host, port) => s"$host:$port" - } - } - // ////////////////////////////////////////////////////// // Internal Port // // ////////////////////////////////////////////////////// @@ -4620,38 +4605,4 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("30s") - val MASTER_SECURED_PORT: ConfigEntry[Int] = - buildConf("celeborn.master.secured.port") - .categories("master", "auth") - .version("0.5.0") - .doc( - "Secured port on the master where clients connect.") - .intConf - .checkValue(p => p >= 1024 && p < 65535, "Invalid port") - .createWithDefault(19097) - - val HA_MASTER_NODE_SECURED_PORT: ConfigEntry[Int] = - buildConf("celeborn.master.ha.node..secured.port") - .categories("ha", "auth") - .doc( - "Secured port for the clients to bind to a master node in HA mode.") - .version("0.5.0") - .intConf - .checkValue(p => p >= 1024 && p < 65535, "Invalid port") - .createWithDefault(19097) - - val MASTER_SECURED_ENDPOINTS: ConfigEntry[Seq[String]] = - buildConf("celeborn.master.secured.endpoints") - .categories("client", "auth") - .doc("Endpoints of master nodes for celeborn client to connect for secured communication, allowed pattern " + - "is: `:[,:]*`, e.g. `clb1:19097,clb2:19097,clb3:19097`. " + - "If the port is omitted, 19097 will be used.") - .version("0.5.0") - .stringConf - .toSequence - .checkValue( - endpoints => endpoints.map(_ => Try(Utils.parseHostPort(_))).forall(_.isSuccess), - "Allowed pattern is: `:[,:]*`") - .createWithDefaultString(s":19097") - } diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 5675ab576e..b3de107cc3 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -111,7 +111,6 @@ license: | | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 2147483647 | false | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.3.0 | celeborn.shuffle.forceFallback.numPartitionsThreshold | | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer | | celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | -| celeborn.master.secured.endpoints | <localhost>:19097 | false | Endpoints of master nodes for celeborn client to connect for secured communication, allowed pattern is: `:[,:]*`, e.g. `clb1:19097,clb2:19097,clb3:19097`. If the port is omitted, 19097 will be used. | 0.5.0 | | | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes | | celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | diff --git a/docs/configuration/ha.md b/docs/configuration/ha.md index a588d27b1e..15044ea613 100644 --- a/docs/configuration/ha.md +++ b/docs/configuration/ha.md @@ -24,7 +24,6 @@ license: | | celeborn.master.ha.node.<id>.internal.port | 8097 | false | Internal port for the workers and other masters to bind to a master node in HA mode. | 0.5.0 | | | celeborn.master.ha.node.<id>.port | 9097 | false | Port to bind of master node in HA mode. | 0.3.0 | celeborn.ha.master.node.<id>.port | | celeborn.master.ha.node.<id>.ratis.port | 9872 | false | Ratis port to bind of master node in HA mode. | 0.3.0 | celeborn.ha.master.node.<id>.ratis.port | -| celeborn.master.ha.node.<id>.secured.port | 19097 | false | Secured port for the clients to bind to a master node in HA mode. | 0.5.0 | | | celeborn.master.ha.ratis.raft.rpc.type | netty | false | RPC type for Ratis, available options: netty, grpc. | 0.3.0 | celeborn.ha.master.ratis.raft.rpc.type | | celeborn.master.ha.ratis.raft.server.storage.dir | /tmp/ratis | false | | 0.3.0 | celeborn.ha.master.ratis.raft.server.storage.dir | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index f01cdba9e6..9cc78f5e7b 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -44,7 +44,6 @@ license: | | celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | | | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | | | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | | -| celeborn.master.secured.port | 19097 | false | Secured port on the master where clients connect. | 0.5.0 | | | celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots | | celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient | | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index a0090474f5..c309dd216c 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -75,54 +75,53 @@ private[celeborn] class Master( metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_MASTER)) metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_MASTER)) - override val rpcEnv: RpcEnv = RpcEnv.create( - RpcNameConstants.MASTER_SYS, - masterArgs.host, - masterArgs.host, - masterArgs.port, - conf, - Math.max(64, Runtime.getRuntime.availableProcessors())) - - // Visible for testing - private[master] var internalRpcEnvInUse = rpcEnv - - if (conf.internalPortEnabled) { - val internalRpcEnv: RpcEnv = RpcEnv.create( - RpcNameConstants.MASTER_INTERNAL_SYS, - masterArgs.host, - masterArgs.host, - masterArgs.internalPort, - conf, - Math.max(64, Runtime.getRuntime.availableProcessors())) - logInfo( - s"Internal port enabled, using internal port ${masterArgs.internalPort} for internal RPC.") - internalRpcEnvInUse = internalRpcEnv - } - - private val rackResolver = new CelebornRackResolver(conf) private val authEnabled = conf.authEnabled private val secretRegistry = new SecretRegistryImpl() + + override val rpcEnv: RpcEnv = + if (!authEnabled) { + RpcEnv.create( + RpcNameConstants.MASTER_SYS, + masterArgs.host, + masterArgs.host, + masterArgs.port, + conf, + Math.max(64, Runtime.getRuntime.availableProcessors())) + } else { + val externalSecurityContext = new RpcSecurityContextBuilder() + .withServerSaslContext( + new ServerSaslContextBuilder() + .withAddRegistrationBootstrap(true) + .withSecretRegistry(secretRegistry).build()).build() + logInfo( + s"Secure port enabled ${masterArgs.port} for secured RPC.") + RpcEnv.create( + RpcNameConstants.MASTER_SYS, + masterArgs.host, + masterArgs.host, + masterArgs.port, + conf, + Math.max(64, Runtime.getRuntime.availableProcessors()), + Some(externalSecurityContext)) + } + // Visible for testing - private[master] var securedRpcEnv: RpcEnv = _ - if (authEnabled) { - val externalSecurityContext = new RpcSecurityContextBuilder() - .withServerSaslContext( - new ServerSaslContextBuilder() - .withAddRegistrationBootstrap(true) - .withSecretRegistry(secretRegistry).build()).build() - - securedRpcEnv = RpcEnv.create( - RpcNameConstants.MASTER_SECURED_SYS, - masterArgs.host, - masterArgs.host, - masterArgs.securedPort, - conf, - Math.max(64, Runtime.getRuntime.availableProcessors()), - Some(externalSecurityContext)) - logInfo( - s"Secure port enabled ${masterArgs.securedPort} for secured RPC.") - } + private[master] var internalRpcEnvInUse: RpcEnv = + if (!conf.internalPortEnabled) { + rpcEnv + } else { + logInfo( + s"Internal port enabled, using internal port ${masterArgs.internalPort} for internal RPC.") + RpcEnv.create( + RpcNameConstants.MASTER_INTERNAL_SYS, + masterArgs.host, + masterArgs.host, + masterArgs.internalPort, + conf, + Math.max(64, Runtime.getRuntime.availableProcessors())) + } + private val rackResolver = new CelebornRackResolver(conf) private val statusSystem = if (conf.haEnabled) { val sys = new HAMasterMetaManager(internalRpcEnvInUse, conf, rackResolver) @@ -254,16 +253,6 @@ private[celeborn] class Master( internalRpcEndpoint) } - // Visible for testing - private[master] var securedRpcEndpoint: RpcEndpoint = _ - private var securedRpcEndpointRef: RpcEndpointRef = _ - if (authEnabled) { - securedRpcEndpoint = new SecuredRpcEndpoint(this, securedRpcEnv, conf) - securedRpcEndpointRef = securedRpcEnv.setupEndpoint( - RpcNameConstants.MASTER_SECURED_EP, - securedRpcEndpoint) - } - // start threads to check timeout for workers and applications override def onStart(): Unit = { if (!threadsStarted.compareAndSet(false, true)) { @@ -387,22 +376,16 @@ private[celeborn] class Master( requestId, shouldResponse) => logDebug(s"Received heartbeat from app $appId") - if (checkAuthStatus(appId, context)) { - // TODO: [CELEBORN-1261] For the workers to be able to check whether an auth-enabled app is talking to it on - // unsecured port, Master will need to maintain a list of unauthenticated apps and send it to workers. - // This wasn't part of the original proposal because that proposal didn't target the Celeborn server to support - // both secured and unsecured communication. - executeWithLeaderChecker( + executeWithLeaderChecker( + context, + handleHeartbeatFromApplication( context, - handleHeartbeatFromApplication( - context, - appId, - totalWritten, - fileCount, - needCheckedWorkerList, - requestId, - shouldResponse)) - } + appId, + totalWritten, + fileCount, + needCheckedWorkerList, + requestId, + shouldResponse)) case pbRegisterWorker: PbRegisterWorker => val requestId = pbRegisterWorker.getRequestId @@ -437,31 +420,22 @@ private[celeborn] class Master( context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS)) case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _) => - if (checkAuthStatus(applicationId, context)) { - // TODO: [CELEBORN-1261] - logTrace(s"Received RequestSlots request $requestSlots.") - executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots)) - } + logTrace(s"Received RequestSlots request $requestSlots.") + executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots)) case pb: PbUnregisterShuffle => val applicationId = pb.getAppId val shuffleId = pb.getShuffleId val requestId = pb.getRequestId - if (checkAuthStatus(applicationId, context)) { - // TODO: [CELEBORN-1261] - logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId") - executeWithLeaderChecker( - context, - handleUnregisterShuffle(context, applicationId, shuffleId, requestId)) - } + logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId") + executeWithLeaderChecker( + context, + handleUnregisterShuffle(context, applicationId, shuffleId, requestId)) case ApplicationLost(appId, requestId) => - if (context.senderAddress.equals(self.address) || checkAuthStatus(appId, context)) { - // TODO: [CELEBORN-1261] - logDebug( - s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.") - executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId)) - } + logDebug( + s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.") + executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId)) case HeartbeatFromWorker( host, @@ -526,8 +500,6 @@ private[celeborn] class Master( handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, replicatePort, requestId)) case CheckQuota(userIdentifier) => - // TODO: CheckQuota doesn't have application id in it, so we can't check auth status here. - // Will have to add application id to CheckQuota message to check auth status. executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context)) case _: PbCheckWorkersAvailable => @@ -960,7 +932,7 @@ private[celeborn] class Master( } } - private[master] def handleHeartbeatFromApplication( + private def handleHeartbeatFromApplication( context: RpcCallContext, appId: String, totalWritten: Long, @@ -1090,16 +1062,6 @@ private[celeborn] class Master( }.asJava } - private def checkAuthStatus(appId: String, context: RpcCallContext): Boolean = { - if (conf.authEnabled && secretRegistry.isRegistered(appId)) { - context.sendFailure(new SecurityException( - s"Auth enabled application $appId sending messages on unsecured port!")) - false - } else { - true - } - } - override def getMasterGroupInfo: String = { val sb = new StringBuilder sb.append("====================== Master Group INFO ==============================\n") @@ -1305,9 +1267,6 @@ private[celeborn] class Master( if (conf.internalPortEnabled) { internalRpcEnvInUse.awaitTermination() } - if (authEnabled) { - securedRpcEnv.awaitTermination() - } } override def stop(exitKind: Int): Unit = synchronized { @@ -1317,9 +1276,6 @@ private[celeborn] class Master( if (conf.internalPortEnabled) { internalRpcEnvInUse.stop(internalRpcEndpointRef) } - if (authEnabled) { - securedRpcEnv.stop(securedRpcEndpointRef) - } super.stop(exitKind) logInfo("Master stopped.") stopped = true diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala index 95dda30b41..701116a2e2 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala @@ -28,7 +28,6 @@ class MasterArguments(args: Array[String], conf: CelebornConf) { private var _host: Option[String] = None private var _port: Option[Int] = None private var _internalPort: Option[Int] = None - private var _securedPort: Option[Int] = None private var _propertiesFile: Option[String] = None private var _masterClusterInfo: Option[MasterClusterInfo] = None @@ -48,15 +47,11 @@ class MasterArguments(args: Array[String], conf: CelebornConf) { _internalPort = _internalPort.orElse { if (conf.internalPortEnabled) Some(conf.haMasterNodeInternalPort(localNode.nodeId)) else None } - _securedPort = _securedPort.orElse { - if (conf.authEnabled) Some(conf.haMasterNodeSecuredPort(localNode.nodeId)) else None - } _masterClusterInfo = Some(clusterInfo) } else { _host = _host.orElse(Some(conf.masterHost)) _port = _port.orElse(Some(conf.masterPort)) _internalPort = _internalPort.orElse(Some(conf.masterInternalPort)) - _securedPort = _securedPort.orElse(Some(conf.masterSecuredPort)) } def host: String = _host.get @@ -65,8 +60,6 @@ class MasterArguments(args: Array[String], conf: CelebornConf) { def internalPort: Int = _internalPort.get - def securedPort: Int = _securedPort.get - def masterClusterInfo: Option[MasterClusterInfo] = _masterClusterInfo @tailrec @@ -84,10 +77,6 @@ class MasterArguments(args: Array[String], conf: CelebornConf) { _internalPort = Some(value) parse(tail) - case ("--secured-port") :: IntParam(value) :: tail => - _securedPort = Some(value) - parse(tail) - case "--properties-file" :: value :: tail => _propertiesFile = Some(value) parse(tail) @@ -113,7 +102,6 @@ class MasterArguments(args: Array[String], conf: CelebornConf) { | -h HOST, --host HOST Hostname to listen on | -p PORT, --port PORT Port to listen on (default: 9097) | --internal-port PORT Internal port for internal communication (default: 8097) - | --secured-port PORT Secured port for secured communication (default: 19097) | --properties-file FILE Path to a custom Celeborn properties file, | default is conf/celeborn-defaults.conf. |""".stripMargin) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/SecuredRpcEndpoint.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/SecuredRpcEndpoint.scala deleted file mode 100644 index e8b1eb9669..0000000000 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/SecuredRpcEndpoint.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.celeborn.service.deploy.master - -import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.protocol.PbUnregisterShuffle -import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, CheckQuota, HeartbeatFromApplication, RequestSlots} -import org.apache.celeborn.common.rpc._ - -/** - * Secured RPC endpoint used by the Master to communicate with the Clients. - */ -private[celeborn] class SecuredRpcEndpoint( - val master: Master, - override val rpcEnv: RpcEnv, - val conf: CelebornConf) - extends RpcEndpoint with Logging { - - // start threads to check timeout for workers and applications - override def onStart(): Unit = { - master.onStart() - } - - override def onStop(): Unit = { - master.onStop() - } - - override def onDisconnected(address: RpcAddress): Unit = { - logDebug(s"Client $address got disconnected.") - } - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case HeartbeatFromApplication( - appId, - totalWritten, - fileCount, - needCheckedWorkerList, - requestId, - shouldResponse) => - logDebug(s"Received heartbeat from app $appId") - master.executeWithLeaderChecker( - context, - master.handleHeartbeatFromApplication( - context, - appId, - totalWritten, - fileCount, - needCheckedWorkerList, - requestId, - shouldResponse)) - - case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _, _) => - logTrace(s"Received RequestSlots request $requestSlots.") - master.executeWithLeaderChecker(context, master.handleRequestSlots(context, requestSlots)) - - case pb: PbUnregisterShuffle => - val applicationId = pb.getAppId - val shuffleId = pb.getShuffleId - val requestId = pb.getRequestId - logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId") - master.executeWithLeaderChecker( - context, - master.handleUnregisterShuffle(context, applicationId, shuffleId, requestId)) - - case ApplicationLost(appId, requestId) => - logDebug( - s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.") - master.executeWithLeaderChecker( - context, - master.handleApplicationLost(context, appId, requestId)) - - case CheckQuota(userIdentifier) => - master.executeWithLeaderChecker(context, master.handleCheckQuota(userIdentifier, context)) - } -} diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala index 62e733691f..26ea0b0407 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala @@ -97,44 +97,4 @@ class MasterSuite extends AnyFunSuite master.rpcEnv.shutdown() master.internalRpcEnvInUse.shutdown() } - - test("test secured port receives") { - val conf = new CelebornConf() - conf.set(CelebornConf.HA_ENABLED.key, "false") - conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) - conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) - conf.set(CelebornConf.METRICS_ENABLED.key, "true") - conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true") - conf.set(CelebornConf.AUTH_ENABLED.key, "true") - - val args = - Array("-h", "localhost", "-p", "9097", "--internal-port", "8097", "--secured-port", "19097") - - val masterArgs = new MasterArguments(args, conf) - val master = new Master(conf, masterArgs) - new Thread() { - override def run(): Unit = { - master.initialize() - } - }.start() - Thread.sleep(5000L) - master.securedRpcEndpoint.receiveAndReply( - mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])) - .applyOrElse( - HeartbeatFromApplication("appId", 0L, 0L, null), - (_: Any) => fail("Unexpected message")) - master.securedRpcEndpoint.receiveAndReply( - mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])) - .applyOrElse(ApplicationLost("appId"), (_: Any) => fail("Unexpected message")) - - assertThrows[scala.MatchError] { - master.securedRpcEndpoint.receiveAndReply( - mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext]))( - PbRegisterWorker.newBuilder().build()) - } - master.stop(CelebornExitKind.EXIT_IMMEDIATELY) - master.rpcEnv.shutdown() - master.internalRpcEnvInUse.shutdown() - master.securedRpcEnv.shutdown() - } }