Skip to content

Commit

Permalink
[KYUUBI #5606][REST] Handle engine listing request properly for users…
Browse files Browse the repository at this point in the history
… who have not created engine

### _Why are the changes needed?_

Close #5606

```
2023-11-03 19:30:13.215 ERROR KyuubiRestFrontendService-57 org.apache.kyuubi.server.api.v1.AdminResource: No such engine for user: anonymous, engine type: SPARK_SQL, share level: USER, subdomain: null
org.apache.kyuubi.shaded.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kyuubi_1.8.1-SNAPSHOT_USER_SPARK_SQL/anonymous
	at org.apache.kyuubi.shaded.zookeeper.KeeperException.create(KeeperException.java:114)
	at org.apache.kyuubi.shaded.zookeeper.KeeperException.create(KeeperException.java:54)
	at org.apache.kyuubi.shaded.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1659)
	at org.apache.kyuubi.shaded.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:230)
	at org.apache.kyuubi.shaded.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:219)
	at org.apache.kyuubi.shaded.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
	at org.apache.kyuubi.shaded.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:216)
	at org.apache.kyuubi.shaded.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:207)
	at org.apache.kyuubi.shaded.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:40)
	at org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient.getChildren(ZookeeperDiscoveryClient.scala:82)
	at org.apache.kyuubi.server.api.v1.AdminResource.$anonfun$listEngines$5(AdminResource.scala:306)
	at org.apache.kyuubi.ha.client.DiscoveryClientProvider$.withDiscoveryClient(DiscoveryClientProvider.scala:36)
	at org.apache.kyuubi.server.api.v1.AdminResource.listEngines(AdminResource.scala:304)
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5619 from pan3793/list-engine.

Closes #5606

783894b [Cheng Pan] nit
52eb538 [Cheng Pan] nit
8d8097f [Cheng Pan] [KYUUBI #5606][REST] Handle engine listing request properly for users who have not created engine

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 7926cc6)
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
pan3793 committed Nov 3, 2023
1 parent 0353706 commit 6298657
Showing 1 changed file with 24 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils}
import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
import org.apache.kyuubi.shaded.zookeeper.KeeperException.NoNodeException

@Tag(name = "Admin")
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -252,8 +251,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
} else {
fe.getSessionUser(hs2ProxyUser)
}
val engine = getEngine(userName, engineType, shareLevel, subdomain, "default")
val engineSpace = getEngineSpace(engine)
val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "default")
val engineSpace = calculateEngineSpace(engine)

withDiscoveryClient(fe.getConf) { discoveryClient =>
val engineNodes = discoveryClient.getChildren(engineSpace)
Expand Down Expand Up @@ -290,33 +289,24 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
} else {
fe.getSessionUser(hs2ProxyUser)
}
val engine = getEngine(userName, engineType, shareLevel, subdomain, "")
val engineSpace = getEngineSpace(engine)
val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "")
val engineSpace = calculateEngineSpace(engine)

val engineNodes = ListBuffer[ServiceNodeInfo]()
Option(subdomain).filter(_.nonEmpty) match {
case Some(_) =>
withDiscoveryClient(fe.getConf) { discoveryClient =>
info(s"Listing engine nodes for $engineSpace")
withDiscoveryClient(fe.getConf) { discoveryClient =>
Option(subdomain).filter(_.nonEmpty) match {
case Some(_) =>
info(s"Listing engine nodes under $engineSpace")
engineNodes ++= discoveryClient.getServiceNodesInfo(engineSpace)
}
case None =>
withDiscoveryClient(fe.getConf) { discoveryClient =>
try {
discoveryClient.getChildren(engineSpace).map { child =>
info(s"Listing engine nodes for $engineSpace/$child")
engineNodes ++= discoveryClient.getServiceNodesInfo(s"$engineSpace/$child")
}
} catch {
case nne: NoNodeException =>
error(
s"No such engine for user: $userName, " +
s"engine type: $engineType, share level: $shareLevel, subdomain: $subdomain",
nne)
throw new NotFoundException(s"No such engine for user: $userName, " +
s"engine type: $engineType, share level: $shareLevel, subdomain: $subdomain")
case None if discoveryClient.pathNonExists(engineSpace) =>
warn(s"Path $engineSpace does not exist. user: $userName, engine type: $engineType, " +
s"share level: $shareLevel, subdomain: $subdomain")
case None =>
discoveryClient.getChildren(engineSpace).map { child =>
info(s"Listing engine nodes under $engineSpace/$child")
engineNodes ++= discoveryClient.getServiceNodesInfo(s"$engineSpace/$child")
}
}
}
}
engineNodes.map(node =>
new Engine(
Expand Down Expand Up @@ -360,7 +350,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
servers.toSeq
}

private def getEngine(
private def normalizeEngineInfo(
userName: String,
engineType: String,
shareLevel: String,
Expand All @@ -373,6 +363,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
.foreach(_ => clonedConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN, Option(subdomain)))
Option(shareLevel).filter(_.nonEmpty).foreach(clonedConf.set(ENGINE_SHARE_LEVEL, _))

val serverSpace = clonedConf.get(HA_NAMESPACE)
val normalizedEngineType = clonedConf.get(ENGINE_TYPE)
val engineSubdomain = clonedConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse(subdomainDefault)
val engineShareLevel = clonedConf.get(ENGINE_SHARE_LEVEL)
Expand All @@ -384,22 +375,20 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
engineShareLevel,
engineSubdomain,
null,
null,
serverSpace,
Collections.emptyMap())
}

private def getEngineSpace(engine: Engine): String = {
val serverSpace = fe.getConf.get(HA_NAMESPACE)
val appUser = engine.getSharelevel match {
private def calculateEngineSpace(engine: Engine): String = {
val userOrGroup = engine.getSharelevel match {
case "GROUP" =>
fe.sessionManager.groupProvider.primaryGroup(engine.getUser, fe.getConf.getAll.asJava)
case _ => engine.getUser
}

DiscoveryPaths.makePath(
s"${serverSpace}_${engine.getVersion}_${engine.getSharelevel}_${engine.getEngineType}",
appUser,
engine.getSubdomain)
val engineSpace =
s"${engine.getNamespace}_${engine.getVersion}_${engine.getSharelevel}_${engine.getEngineType}"
DiscoveryPaths.makePath(engineSpace, userOrGroup, engine.getSubdomain)
}

@ApiResponse(
Expand Down

0 comments on commit 6298657

Please sign in to comment.