Skip to content

Commit

Permalink
[CELEBORN-1257][FOLLOWUP] Removed the additional secured port from Ce…
Browse files Browse the repository at this point in the history
…leborn Master

### What changes were proposed in this pull request?
#2292 (comment)
Based on the above discussion, removing the additional secured port. The existing port will be used for secured communication when auth is enabled.

### Why are the changes needed?
These changes are for enabling authentication

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
This removed additional secured port.

Closes #2327 from otterc/CELEBORN-1257.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
otterc authored and waitinfuture committed Feb 24, 2024
1 parent ff0cf15 commit 9185cae
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,24 +266,13 @@ private RpcEndpointRef setupEndpointRef(String endpoint) {
}

private List<String> resolveMasterEndpoints() {
if (isWorker) {
if (isWorker && conf.internalPortEnabled()) {
// For worker, we should use the internal endpoints if internal port is enabled.
if (conf.internalPortEnabled()) {
masterEndpointName = RpcNameConstants.MASTER_INTERNAL_EP;
return Arrays.asList(conf.masterInternalEndpoints());
} else {
masterEndpointName = RpcNameConstants.MASTER_EP;
return Arrays.asList(conf.masterEndpoints());
}
masterEndpointName = RpcNameConstants.MASTER_INTERNAL_EP;
return Arrays.asList(conf.masterInternalEndpoints());
} else {
// This is for client, so we should use the secured endpoints if auth is enabled.
if (conf.authEnabled()) {
masterEndpointName = RpcNameConstants.MASTER_SECURED_EP;
return Arrays.asList(conf.masterSecuredEndpoints());
} else {
masterEndpointName = RpcNameConstants.MASTER_EP;
return Arrays.asList(conf.masterEndpoints());
}
masterEndpointName = RpcNameConstants.MASTER_EP;
return Arrays.asList(conf.masterEndpoints());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ public class RpcNameConstants {
// For Master
public static String MASTER_SYS = "Master";
public static String MASTER_INTERNAL_SYS = "MasterInternal";
public static String MASTER_SECURED_SYS = "MasterSecured";

// Master Endpoint Name
public static String MASTER_EP = "MasterEndpoint";
public static String MASTER_INTERNAL_EP = "MasterInternalEndpoint";
public static String MASTER_SECURED_EP = "MasterSecuredEndpoint";

// For Worker
public static String WORKER_SYS = "Worker";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1142,21 +1142,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
return authEnabled && internalPortEnabled
}

def haMasterNodeSecuredPort(nodeId: String): Int = {
val key = HA_MASTER_NODE_SECURED_PORT.key.replace("<id>", nodeId)
getInt(key, HA_MASTER_NODE_SECURED_PORT.defaultValue.get)
}

def masterSecuredPort: Int = get(MASTER_SECURED_PORT)

def masterSecuredEndpoints: Array[String] =
get(MASTER_SECURED_ENDPOINTS).toArray.map { endpoint =>
Utils.parseHostPort(endpoint.replace("<localhost>", Utils.localHostName(this))) match {
case (host, 0) => s"$host:${HA_MASTER_NODE_SECURED_PORT.defaultValue.get}"
case (host, port) => s"$host:$port"
}
}

// //////////////////////////////////////////////////////
// Internal Port //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -4620,38 +4605,4 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

val MASTER_SECURED_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.secured.port")
.categories("master", "auth")
.version("0.5.0")
.doc(
"Secured port on the master where clients connect.")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(19097)

val HA_MASTER_NODE_SECURED_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.ha.node.<id>.secured.port")
.categories("ha", "auth")
.doc(
"Secured port for the clients to bind to a master node <id> in HA mode.")
.version("0.5.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(19097)

val MASTER_SECURED_ENDPOINTS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.secured.endpoints")
.categories("client", "auth")
.doc("Endpoints of master nodes for celeborn client to connect for secured communication, allowed pattern " +
"is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:19097,clb2:19097,clb3:19097`. " +
"If the port is omitted, 19097 will be used.")
.version("0.5.0")
.stringConf
.toSequence
.checkValue(
endpoints => endpoints.map(_ => Try(Utils.parseHostPort(_))).forall(_.isSuccess),
"Allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`")
.createWithDefaultString(s"<localhost>:19097")

}
1 change: 0 additions & 1 deletion docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 2147483647 | false | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.3.0 | celeborn.shuffle.forceFallback.numPartitionsThreshold |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | |
| celeborn.master.secured.endpoints | &lt;localhost&gt;:19097 | false | Endpoints of master nodes for celeborn client to connect for secured communication, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:19097,clb2:19097,clb3:19097`. If the port is omitted, 19097 will be used. | 0.5.0 | |
| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
<!--end-include-->
1 change: 0 additions & 1 deletion docs/configuration/ha.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ license: |
| celeborn.master.ha.node.&lt;id&gt;.internal.port | 8097 | false | Internal port for the workers and other masters to bind to a master node <id> in HA mode. | 0.5.0 | |
| celeborn.master.ha.node.&lt;id&gt;.port | 9097 | false | Port to bind of master node <id> in HA mode. | 0.3.0 | celeborn.ha.master.node.&lt;id&gt;.port |
| celeborn.master.ha.node.&lt;id&gt;.ratis.port | 9872 | false | Ratis port to bind of master node <id> in HA mode. | 0.3.0 | celeborn.ha.master.node.&lt;id&gt;.ratis.port |
| celeborn.master.ha.node.&lt;id&gt;.secured.port | 19097 | false | Secured port for the clients to bind to a master node <id> in HA mode. | 0.5.0 | |
| celeborn.master.ha.ratis.raft.rpc.type | netty | false | RPC type for Ratis, available options: netty, grpc. | 0.3.0 | celeborn.ha.master.ratis.raft.rpc.type |
| celeborn.master.ha.ratis.raft.server.storage.dir | /tmp/ratis | false | | 0.3.0 | celeborn.ha.master.ratis.raft.server.storage.dir |
<!--end-include-->
1 change: 0 additions & 1 deletion docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ license: |
| celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | |
| celeborn.master.secured.port | 19097 | false | Secured port on the master where clients connect. | 0.5.0 | |
| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots |
| celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,54 +75,53 @@ private[celeborn] class Master(
metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_MASTER))
metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_MASTER))

override val rpcEnv: RpcEnv = RpcEnv.create(
RpcNameConstants.MASTER_SYS,
masterArgs.host,
masterArgs.host,
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()))

// Visible for testing
private[master] var internalRpcEnvInUse = rpcEnv

if (conf.internalPortEnabled) {
val internalRpcEnv: RpcEnv = RpcEnv.create(
RpcNameConstants.MASTER_INTERNAL_SYS,
masterArgs.host,
masterArgs.host,
masterArgs.internalPort,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()))
logInfo(
s"Internal port enabled, using internal port ${masterArgs.internalPort} for internal RPC.")
internalRpcEnvInUse = internalRpcEnv
}

private val rackResolver = new CelebornRackResolver(conf)
private val authEnabled = conf.authEnabled
private val secretRegistry = new SecretRegistryImpl()

override val rpcEnv: RpcEnv =
if (!authEnabled) {
RpcEnv.create(
RpcNameConstants.MASTER_SYS,
masterArgs.host,
masterArgs.host,
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()))
} else {
val externalSecurityContext = new RpcSecurityContextBuilder()
.withServerSaslContext(
new ServerSaslContextBuilder()
.withAddRegistrationBootstrap(true)
.withSecretRegistry(secretRegistry).build()).build()
logInfo(
s"Secure port enabled ${masterArgs.port} for secured RPC.")
RpcEnv.create(
RpcNameConstants.MASTER_SYS,
masterArgs.host,
masterArgs.host,
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Some(externalSecurityContext))
}

// Visible for testing
private[master] var securedRpcEnv: RpcEnv = _
if (authEnabled) {
val externalSecurityContext = new RpcSecurityContextBuilder()
.withServerSaslContext(
new ServerSaslContextBuilder()
.withAddRegistrationBootstrap(true)
.withSecretRegistry(secretRegistry).build()).build()

securedRpcEnv = RpcEnv.create(
RpcNameConstants.MASTER_SECURED_SYS,
masterArgs.host,
masterArgs.host,
masterArgs.securedPort,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Some(externalSecurityContext))
logInfo(
s"Secure port enabled ${masterArgs.securedPort} for secured RPC.")
}
private[master] var internalRpcEnvInUse: RpcEnv =
if (!conf.internalPortEnabled) {
rpcEnv
} else {
logInfo(
s"Internal port enabled, using internal port ${masterArgs.internalPort} for internal RPC.")
RpcEnv.create(
RpcNameConstants.MASTER_INTERNAL_SYS,
masterArgs.host,
masterArgs.host,
masterArgs.internalPort,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()))
}

private val rackResolver = new CelebornRackResolver(conf)
private val statusSystem =
if (conf.haEnabled) {
val sys = new HAMasterMetaManager(internalRpcEnvInUse, conf, rackResolver)
Expand Down Expand Up @@ -254,16 +253,6 @@ private[celeborn] class Master(
internalRpcEndpoint)
}

// Visible for testing
private[master] var securedRpcEndpoint: RpcEndpoint = _
private var securedRpcEndpointRef: RpcEndpointRef = _
if (authEnabled) {
securedRpcEndpoint = new SecuredRpcEndpoint(this, securedRpcEnv, conf)
securedRpcEndpointRef = securedRpcEnv.setupEndpoint(
RpcNameConstants.MASTER_SECURED_EP,
securedRpcEndpoint)
}

// start threads to check timeout for workers and applications
override def onStart(): Unit = {
if (!threadsStarted.compareAndSet(false, true)) {
Expand Down Expand Up @@ -387,22 +376,16 @@ private[celeborn] class Master(
requestId,
shouldResponse) =>
logDebug(s"Received heartbeat from app $appId")
if (checkAuthStatus(appId, context)) {
// TODO: [CELEBORN-1261] For the workers to be able to check whether an auth-enabled app is talking to it on
// unsecured port, Master will need to maintain a list of unauthenticated apps and send it to workers.
// This wasn't part of the original proposal because that proposal didn't target the Celeborn server to support
// both secured and unsecured communication.
executeWithLeaderChecker(
executeWithLeaderChecker(
context,
handleHeartbeatFromApplication(
context,
handleHeartbeatFromApplication(
context,
appId,
totalWritten,
fileCount,
needCheckedWorkerList,
requestId,
shouldResponse))
}
appId,
totalWritten,
fileCount,
needCheckedWorkerList,
requestId,
shouldResponse))

case pbRegisterWorker: PbRegisterWorker =>
val requestId = pbRegisterWorker.getRequestId
Expand Down Expand Up @@ -437,31 +420,22 @@ private[celeborn] class Master(
context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))

case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _) =>
if (checkAuthStatus(applicationId, context)) {
// TODO: [CELEBORN-1261]
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))
}
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))

case pb: PbUnregisterShuffle =>
val applicationId = pb.getAppId
val shuffleId = pb.getShuffleId
val requestId = pb.getRequestId
if (checkAuthStatus(applicationId, context)) {
// TODO: [CELEBORN-1261]
logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId")
executeWithLeaderChecker(
context,
handleUnregisterShuffle(context, applicationId, shuffleId, requestId))
}
logDebug(s"Received UnregisterShuffle request $requestId, $applicationId, $shuffleId")
executeWithLeaderChecker(
context,
handleUnregisterShuffle(context, applicationId, shuffleId, requestId))

case ApplicationLost(appId, requestId) =>
if (context.senderAddress.equals(self.address) || checkAuthStatus(appId, context)) {
// TODO: [CELEBORN-1261]
logDebug(
s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.")
executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))
}
logDebug(
s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.")
executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))

case HeartbeatFromWorker(
host,
Expand Down Expand Up @@ -526,8 +500,6 @@ private[celeborn] class Master(
handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, replicatePort, requestId))

case CheckQuota(userIdentifier) =>
// TODO: CheckQuota doesn't have application id in it, so we can't check auth status here.
// Will have to add application id to CheckQuota message to check auth status.
executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context))

case _: PbCheckWorkersAvailable =>
Expand Down Expand Up @@ -960,7 +932,7 @@ private[celeborn] class Master(
}
}

private[master] def handleHeartbeatFromApplication(
private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,
totalWritten: Long,
Expand Down Expand Up @@ -1090,16 +1062,6 @@ private[celeborn] class Master(
}.asJava
}

private def checkAuthStatus(appId: String, context: RpcCallContext): Boolean = {
if (conf.authEnabled && secretRegistry.isRegistered(appId)) {
context.sendFailure(new SecurityException(
s"Auth enabled application $appId sending messages on unsecured port!"))
false
} else {
true
}
}

override def getMasterGroupInfo: String = {
val sb = new StringBuilder
sb.append("====================== Master Group INFO ==============================\n")
Expand Down Expand Up @@ -1305,9 +1267,6 @@ private[celeborn] class Master(
if (conf.internalPortEnabled) {
internalRpcEnvInUse.awaitTermination()
}
if (authEnabled) {
securedRpcEnv.awaitTermination()
}
}

override def stop(exitKind: Int): Unit = synchronized {
Expand All @@ -1317,9 +1276,6 @@ private[celeborn] class Master(
if (conf.internalPortEnabled) {
internalRpcEnvInUse.stop(internalRpcEndpointRef)
}
if (authEnabled) {
securedRpcEnv.stop(securedRpcEndpointRef)
}
super.stop(exitKind)
logInfo("Master stopped.")
stopped = true
Expand Down
Loading

0 comments on commit 9185cae

Please sign in to comment.