From fdbd619d4de0deb4e24616403d3bed84c9a89352 Mon Sep 17 00:00:00 2001 From: Asif Khatri Date: Mon, 24 Jun 2024 15:24:41 +0530 Subject: [PATCH] LIVY-702: Submit Spark apps to Kubernetes Co-authored-by: Alex Sasnouskikh --- conf/livy.conf.template | 55 ++ pom.xml | 13 + .../org/apache/livy/rsc/driver/RSCDriver.java | 7 + server/pom.xml | 5 + .../livy/server/ui/static/css/livy-ui.css | 6 + .../livy/server/ui/static/js/all-sessions.js | 4 +- .../livy/server/ui/static/js/livy-ui.js | 27 +- .../livy/server/ui/static/js/session.js | 14 +- .../main/scala/org/apache/livy/LivyConf.scala | 60 ++ .../org/apache/livy/server/LivyServer.scala | 12 +- .../interactive/InteractiveSession.scala | 10 +- .../apache/livy/sessions/SessionManager.scala | 4 +- .../org/apache/livy/utils/SparkApp.scala | 27 +- .../livy/utils/SparkKubernetesApp.scala | 739 ++++++++++++++++++ .../livy/utils/SparkKubernetesAppSpec.scala | 242 ++++++ 15 files changed, 1206 insertions(+), 19 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala create mode 100644 server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala diff --git a/conf/livy.conf.template b/conf/livy.conf.template index e99251d02..a5e47a9db 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -200,3 +200,58 @@ # livy.server.hdfs.safe-mode.interval = 5 # value specifies max attempts to retry when safe mode is ON in hdfs filesystem # livy.server.hdfs.safe-mode.max.retry.attempts = 10 + +# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount +# if deployed to Kubernetes cluster as a Pod) +# Kubernetes oauth token file path +# livy.server.kubernetes.oauthTokenFile = +# Kubernetes oauth token string value +# livy.server.kubernetes.oauthTokenValue = +# Kubernetes CA cert file path +# livy.server.kubernetes.caCertFile = +# Kubernetes client key file path +# livy.server.kubernetes.clientKeyFile = +# Kubernetes client cert file path +# livy.server.kubernetes.clientCertFile = + +# If Livy can't find the Kubernetes app within this time, consider it lost. +# livy.server.kubernetes.app-lookup-timeout = 600s +# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would +# cause session leakage, so we need to check session leakage. +# How long to check livy session leakage +# livy.server.kubernetes.app-leakage.check-timeout = 600s +# How often to check livy session leakage +# livy.server.kubernetes.app-leakage.check-interval = 60s + +# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description +# details, routes, etc...) +# livy.server.kubernetes.poll-interval = 15s + +# Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired +# options below +# livy.server.kubernetes.ingress.create = false +# Kubernetes Nginx Ingress protocol. If set to https refer Ingress TLS section below +# livy.server.kubernetes.ingress.protocol = http +# Kubernetes Nginx Ingress host. Be sure to set it to the FQDN of your Nginx Ingress Controller +# proxy server +# livy.server.kubernetes.ingress.host = localhost +# Kubernetes secret name for Nginx Ingress TLS. Is omitted if 'livy.server.kubernetes.ingress.protocol' +# is not https +# livy.server.kubernetes.ingress.tls.secretName = spark-cluster-tls +# Kubernetes Nginx Ingress additional configuration snippet for specific config options +# livy.server.kubernetes.ingress.additionalConfSnippet = +# Kubernetes Nginx Ingress additional annotations for specific config options, eg. for configuring +# basic auth of external oauth2 proxy. Format: annotation1=value1;annotation2=value2;... +# livy.server.kubernetes.ingress.additionalAnnotations = + +# Set to true to enable Grafana Loki integration and configure options below +livy.server.kubernetes.grafana.loki.enabled = false +# Grafana UI root endpoint to build links based on +# livy.server.kubernetes.grafana.url = http://localhost:3000 +# Grafana Datasource name for Loki +# livy.server.kubernetes.grafana.loki.datasource = loki +# Time range from now to past to get logs for +# livy.server.kubernetes.grafana.timeRange = 6h + +# Used to build links to Spark History Server pages on Spark App completion (Kubernetes only) +# livy.ui.history-server-url = http://spark-history-server diff --git a/pom.xml b/pom.xml index 23561962d..8c5e10503 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,7 @@ 2.4.5 2.4.5 ${spark.scala-2.11.version} + 5.6.0 3.0.0 1.9 4.5.13 @@ -318,6 +319,18 @@ ${metrics.version} + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.fasterxml.jackson.core + * + + + + io.netty netty-all diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index b93c5cc71..b5b99f624 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -169,6 +169,13 @@ private void initializeServer() throws Exception { // on the cluster, it would be tricky to solve that problem in a generic way. livyConf.set(RPC_SERVER_ADDRESS, null); + // If we are running on Kubernetes, get RPC_SERVER_ADDRESS from "spark.driver.host" option + // this option is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep: + // line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc" + if (conf.get("spark.master").startsWith("k8s")) { + livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host")); + } + if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) { // Test flag is turned on so we will just infinite loop here. It should cause // timeout and we should still see yarn application being cleaned up. diff --git a/server/pom.xml b/server/pom.xml index c2a8ef4f1..f9c296e51 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -84,6 +84,11 @@ metrics-healthchecks + + io.fabric8 + kubernetes-client + + javax.servlet javax.servlet-api diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css index fc2ca3075..a7df0ec2a 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css +++ b/server/src/main/resources/org/apache/livy/server/ui/static/css/livy-ui.css @@ -41,6 +41,12 @@ td .progress { margin: 0; } +.with-scroll-bar { + display: block; + overflow-y: scroll; + max-height: 200px; +} + #session-summary { margin: 20px 0; } \ No newline at end of file diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js index d8a84a761..fd68ff715 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js +++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/all-sessions.js @@ -30,7 +30,7 @@ function loadSessionsTable(sessions) { tdWrap(session.proxyUser) + tdWrap(session.kind) + tdWrap(session.state) + - tdWrap(logLinks(session, "session")) + + tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") + "" ); }); @@ -46,7 +46,7 @@ function loadBatchesTable(sessions) { tdWrap(session.owner) + tdWrap(session.proxyUser) + tdWrap(session.state) + - tdWrap(logLinks(session, "batch")) + + tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") + "" ); }); diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js index f2d743ae6..af9352512 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js +++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/livy-ui.js @@ -52,10 +52,23 @@ function driverLogLink(session) { } } +function executorsLogLinks(session) { + var executorLogUrls = session.appInfo.executorLogUrls; + if (executorLogUrls != null) { + return executorLogUrls.split(";").map(function (pair) { + var nameAndLink = pair.split("#"); + return divWrap(anchorLink(nameAndLink[1], nameAndLink[0])); + }).join(""); + } else { + return ""; + } +} + function logLinks(session, kind) { var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", "session")); var driverLog = divWrap(driverLogLink(session)); - return sessionLog + driverLog; + var executorsLogs = executorsLogLinks(session); + return sessionLog + driverLog + executorsLogs; } function appIdLink(session) { @@ -75,6 +88,18 @@ function tdWrap(val) { return "" + inner + ""; } +function tdWrapWithClass(val, cl) { + var inner = ""; + if (val != null) { + inner = val; + } + var clVal = ""; + if (cl != null) { + clVal = " class=\"" + cl + "\""; + } + return "" + inner + ""; +} + function preWrap(inner) { return "
" + escapeHtml(inner) + "
"; } diff --git a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js index c87e5ca40..3a23dc982 100644 --- a/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js +++ b/server/src/main/resources/org/apache/livy/server/ui/static/js/session.js @@ -23,6 +23,18 @@ function sumWrap(name, val) { } } +function sumWrapWithClass(name, val, cl) { + var clVal = ""; + if (cl != null) { + clVal = " class=\"" + cl + "\""; + } + if (val != null) { + return "" + name + ": " + val + ""; + } else { + return ""; + } +} + function formatError(output) { var errStr = output.evalue + "\n"; var trace = output.traceback; @@ -93,7 +105,7 @@ function appendSummary(session) { sumWrap("Proxy User", session.proxyUser) + sumWrap("Session Kind", session.kind) + sumWrap("State", session.state) + - sumWrap("Logs", logLinks(session, "session")) + + sumWrapWithClass("Logs", logLinks(session, "session"), "with-scroll-bar") + "" ); } diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 720aa4e15..6bef09748 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -258,6 +258,63 @@ object LivyConf { // value specifies max attempts to retry when safe mode is ON in hdfs filesystem val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12) + // Kubernetes oauth token file path. + val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "") + // Kubernetes oauth token string value. + val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "") + // Kubernetes CA cert file path. + val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "") + // Kubernetes client key file path. + val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "") + // Kubernetes client cert file path. + val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "") + + // If Livy can't find the Kubernetes app within this time, consider it lost. + val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s") + // How often Livy polls Kubernetes to refresh Kubernetes app state. + val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s") + + // How long to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT = + Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s") + // How often to check livy session leakage. + val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL = + Entry("livy.server.kubernetes.app-leakage.check-interval", "60s") + + // Weather to create Kubernetes Nginx Ingress for Spark UI. + val KUBERNETES_INGRESS_CREATE = Entry("livy.server.kubernetes.ingress.create", false) + // Kubernetes Ingress class name. + val KUBERNETES_INGRESS_CLASS_NAME = Entry("livy.server.kubernetes.ingress.className", "") + // Kubernetes Nginx Ingress protocol. + val KUBERNETES_INGRESS_PROTOCOL = Entry("livy.server.kubernetes.ingress.protocol", "http") + // Kubernetes Nginx Ingress host. + val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", "localhost") + // Kubernetes Nginx Ingress additional configuration snippet. + val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET = + Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "") + // Kubernetes Nginx Ingress additional annotations: key1=value1;key2=value2;... . + val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS = + Entry("livy.server.kubernetes.ingress.additionalAnnotations", "") + // Kubernetes secret name for Nginx Ingress TLS. + // Is omitted if 'livy.server.kubernetes.ingress.protocol' value doesn't end with 's' + val KUBERNETES_INGRESS_TLS_SECRET_NAME = + Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls") + + val KUBERNETES_GRAFANA_LOKI_ENABLED = Entry("livy.server.kubernetes.grafana.loki.enabled", false) + val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", "http://localhost:3000") + val KUBERNETES_GRAFANA_LOKI_DATASOURCE = + Entry("livy.server.kubernetes.grafana.loki.datasource", "loki") + val KUBERNETES_GRAFANA_TIME_RANGE = Entry("livy.server.kubernetes.grafana.timeRange", "6h") + + // side car container for spark pods enabled? + val KUBERNETES_SPARK_SIDECAR_ENABLED = + Entry("livy.server.kubernetes.spark.sidecar.enabled", true) + // container name to identify spark pod if running with sidecar containers + val KUBERNETES_SPARK_CONTAINER_NAME = + Entry("livy.server.kubernetes.spark.container.name", "spark-container") + + val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", "http://spark-history-server") + // Whether session timeout should be checked, by default it will be checked, which means inactive // session will be stopped after "livy.server.session.timeout" val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true) @@ -371,6 +428,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return true if spark master starts with yarn. */ def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn") + /** Return true if spark master starts with k8s. */ + def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s") + /** Return the spark deploy mode Livy sessions should use. */ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 3e715bdf1..c7c7fe756 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag import org.apache.livy.server.ui.UIServlet import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager} import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF +import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp} import org.apache.livy.utils.LivySparkUtils._ -import org.apache.livy.utils.SparkYarnApp class LivyServer extends Logging { @@ -142,10 +142,12 @@ class LivyServer extends Logging { testRecovery(livyConf) - // Initialize YarnClient ASAP to save time. + // Initialize YarnClient/KubernetesClient ASAP to save time. if (livyConf.isRunningOnYarn()) { SparkYarnApp.init(livyConf) Future { SparkYarnApp.yarnClient } + } else if (livyConf.isRunningOnKubernetes()) { + SparkKubernetesApp.init(livyConf) } if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") { @@ -415,10 +417,10 @@ class LivyServer extends Logging { } private[livy] def testRecovery(livyConf: LivyConf): Unit = { - if (!livyConf.isRunningOnYarn()) { - // If recovery is turned on but we are not running on YARN, quit. + if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) { + // If recovery is turned on, and we are not running on YARN or Kubernetes, quit. require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF, - "Session recovery requires YARN.") + "Session recovery requires YARN or Kubernetes.") } } } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 34499d34c..d8c4a16b5 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -462,11 +462,11 @@ class InteractiveSession( app = mockApp.orElse { val driverProcess = client.flatMap { c => Option(c.getDriverProcess) } .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))) - - if (livyConf.isRunningOnYarn() || driverProcess.isDefined) { - Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) + if (!livyConf.isRunningOnKubernetes()) { + driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) } else { - None + // Create SparkApp for Kubernetes anyway + Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this))) } } @@ -547,6 +547,8 @@ class InteractiveSession( transition(SessionState.ShuttingDown) sessionStore.remove(RECOVERY_SESSION_TYPE, id) client.foreach { _.stop(true) } + // We need to call #kill here explicitly to delete Interactive pods from the cluster + if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill()) } catch { case _: Exception => app.foreach { diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala index b81bbc036..8742f65e7 100644 --- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala @@ -192,10 +192,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( Future.sequence(all().filter(expired).map { s => s.state match { - case st: FinishedSessionState => + case _: FinishedSessionState => info(s"Deleting $s because it finished before ${sessionStateRetainedInSec / 1e9} secs.") case _ => - info(s"Deleting $s because it was inactive or the time to leave the period is over.") + info(s"Deleting $s because it was inactive for more than ${sessionTimeout / 1e9} secs.") } delete(s) }) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala index 9afe28162..e424f80fc 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala @@ -24,12 +24,20 @@ import org.apache.livy.LivyConf object AppInfo { val DRIVER_LOG_URL_NAME = "driverLogUrl" val SPARK_UI_URL_NAME = "sparkUiUrl" + val EXECUTORS_LOG_URLS_NAME = "executorLogUrls" } -case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) { +case class AppInfo( + var driverLogUrl: Option[String] = None, + var sparkUiUrl: Option[String] = None, + var executorLogUrls: Option[String] = None) { import AppInfo._ def asJavaMap: java.util.Map[String, String] = - Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava + Map( + DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, + SPARK_UI_URL_NAME -> sparkUiUrl.orNull, + EXECUTORS_LOG_URLS_NAME -> executorLogUrls.orNull + ).asJava } trait SparkAppListener { @@ -71,13 +79,21 @@ object SparkApp { sparkConf ++ Map( SPARK_YARN_TAG_KEY -> mergedYarnTags, "spark.yarn.submit.waitAppCompletion" -> "false") + } else if (livyConf.isRunningOnKubernetes()) { + import KubernetesConstants._ + sparkConf ++ Map( + s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag, + "spark.kubernetes.submission.waitAppCompletion" -> "false", + "spark.ui.proxyBase" -> s"/$uniqueAppTag") } else { sparkConf } } /** - * Return a SparkApp object to control the underlying Spark application via YARN or spark-submit. + * Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes + * or spark-submit. * * @param uniqueAppTag A tag that can uniquely identify the application. */ @@ -89,8 +105,11 @@ object SparkApp { listener: Option[SparkAppListener]): SparkApp = { if (livyConf.isRunningOnYarn()) { new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf) + } else if (livyConf.isRunningOnKubernetes()) { + new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf) } else { - require(process.isDefined, "process must not be None when Livy master is not YARN.") + require(process.isDefined, "process must not be None when Livy master is not YARN or" + + "Kubernetes.") new SparkProcApp(process.get, listener) } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala new file mode 100644 index 000000000..b5160aaf1 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -0,0 +1,739 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.utils + +import java.net.URLEncoder +import java.util.Collections +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder} +import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _} +import org.apache.commons.lang.StringUtils + +import org.apache.livy.{LivyConf, Logging, Utils} + +object SparkKubernetesApp extends Logging { + + private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() + + private val leakedAppsGCThread = new Thread() { + override def run(): Unit = { + import KubernetesExtensions._ + while (true) { + if (!leakedAppTags.isEmpty) { + // kill the app if found it and remove it if exceeding a threshold + val iter = leakedAppTags.entrySet().iterator() + var isRemoved = false + val now = System.currentTimeMillis() + val apps = withRetry(kubernetesClient.getApplications()) + while (iter.hasNext) { + val entry = iter.next() + apps.find(_.getApplicationTag.contains(entry.getKey)) + .foreach({ + app => + info(s"Kill leaked app ${app.getApplicationId}") + withRetry(kubernetesClient.killApplication(app)) + iter.remove() + isRemoved = true + }) + if (!isRemoved) { + if ((entry.getValue - now) > sessionLeakageCheckTimeout) { + iter.remove() + info(s"Remove leaked Kubernetes app tag ${entry.getKey}") + } + } + } + } + Thread.sleep(sessionLeakageCheckInterval) + } + } + } + + val RefreshServiceAccountTokenThread = new Thread() { + override def run(): Unit = { + while (true) { + var currentContext = new Context() + var currentContextName = new String + val config = kubernetesClient.getConfiguration + if (config.getCurrentContext != null) { + currentContext = config.getCurrentContext.getContext + currentContextName = config.getCurrentContext.getName + } + + var newAccessToken = new String + val newestConfig = Config.autoConfigure(currentContextName) + newAccessToken = newestConfig.getOauthToken + info(s"Refresh a new token ${newAccessToken}") + + config.setOauthToken(newAccessToken) + kubernetesClient = new DefaultKubernetesClient(config) + + // Token will expire 1 hour default, community recommend to update every 5 minutes + Thread.sleep(300000) + } + } + } + + private var livyConf: LivyConf = _ + + private var cacheLogSize: Int = _ + private var appLookupTimeout: FiniteDuration = _ + private var pollInterval: FiniteDuration = _ + + private var sessionLeakageCheckTimeout: Long = _ + private var sessionLeakageCheckInterval: Long = _ + + var kubernetesClient: DefaultKubernetesClient = _ + + def init(livyConf: LivyConf): Unit = { + this.livyConf = livyConf + + // KubernetesClient is thread safe. Create once, share it across threads. + kubernetesClient = + KubernetesClientFactory.createKubernetesClient(livyConf) + + cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE) + appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds + pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds + + sessionLeakageCheckInterval = + livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL) + sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT) + + leakedAppsGCThread.setDaemon(true) + leakedAppsGCThread.setName("LeakedAppsGCThread") + leakedAppsGCThread.start() + + RefreshServiceAccountTokenThread. + setName("RefreshServiceAccountTokenThread") + RefreshServiceAccountTokenThread.setDaemon(true) + RefreshServiceAccountTokenThread.start() + } + + // Returning T, throwing the exception on failure + // When istio-proxy restarts, the access to K8s API from livy could be down + // until envoy comes back, which could take upto 30 seconds + @tailrec + private def withRetry[T](fn: => T, n: Int = 10, retryBackoff: Long = 3000): T = { + Try { fn } match { + case Success(x) => x + case _ if n > 1 => + Thread.sleep(Math.max(retryBackoff, 3000)) + withRetry(fn, n - 1) + case Failure(e) => throw e + } + } + +} + +class SparkKubernetesApp private[utils] ( + appTag: String, + appIdOption: Option[String], + process: Option[LineBufferedProcess], + listener: Option[SparkAppListener], + livyConf: LivyConf, + kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test. + extends SparkApp + with Logging { + + import KubernetesExtensions._ + import SparkKubernetesApp._ + + private val appPromise: Promise[KubernetesApplication] = Promise() + private[utils] var state: SparkApp.State = SparkApp.State.STARTING + private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] + private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String] + + // Exposed for unit test. + // TODO Instead of spawning a thread for every session, create a centralized thread and + // batch Kubernetes queries. + private[utils] val kubernetesAppMonitorThread = Utils + .startDaemonThread(s"kubernetesAppMonitorThread-$this") { + try { + // Get KubernetesApplication by appTag. + val app: KubernetesApplication = try { + getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow) + } catch { + case e: Exception => + appPromise.failure(e) + throw e + } + appPromise.success(app) + val appId = app.getApplicationId + + Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId") + listener.foreach(_.appIdKnown(appId)) + + if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) { + withRetry(kubernetesClient.createSparkUIIngress(app, livyConf)) + } + + var appInfo = AppInfo() + while (isRunning) { + try { + Clock.sleep(pollInterval.toMillis) + + // Refresh application state + val appReport = withRetry { + debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " + + s"namespace: ${app.getApplicationNamespace} " + + s"applicationTag: ${app.getApplicationTag}") + val report = kubernetesClient.getApplicationReport(livyConf, app, + cacheLogSize = cacheLogSize) + report + } + + kubernetesAppLog = appReport.getApplicationLog + kubernetesDiagnostics = appReport.getApplicationDiagnostics + changeState(mapKubernetesState(appReport.getApplicationState, appTag)) + + val latestAppInfo = AppInfo( + appReport.getDriverLogUrl, + appReport.getTrackingUrl, + appReport.getExecutorsLogUrls + ) + if (appInfo != latestAppInfo) { + listener.foreach(_.infoChanged(latestAppInfo)) + appInfo = latestAppInfo + } + } catch { + // TODO analyse available exceptions + case e: Throwable => + throw e + } + } + debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}") + } catch { + case _: InterruptedException => + kubernetesDiagnostics = ArrayBuffer("Application stopped by user.") + changeState(SparkApp.State.KILLED) + case NonFatal(e) => + error(s"Error while refreshing Kubernetes state", e) + kubernetesDiagnostics = ArrayBuffer(e.getMessage) + changeState(SparkApp.State.FAILED) + } finally { + if (!isRunning) { + listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = Option(buildHistoryServerUiUrl( + livyConf, Try(appPromise.future.value.get.get.getApplicationId).getOrElse("unknown") + ))))) + } + } + } + + override def log(): IndexedSeq[String] = + ("stdout: " +: kubernetesAppLog) ++ + ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ + process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++ + ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics) + + override def kill(): Unit = synchronized { + try { + withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout))) + } catch { + // We cannot kill the Kubernetes app without the appTag. + // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure. + // We don't want a stuck session that can't be deleted. Emit a warning and move on. + case _: TimeoutException | _: InterruptedException => + warn("Deleting a session while its Kubernetes application is not found.") + kubernetesAppMonitorThread.interrupt() + } finally { + process.foreach(_.destroy()) + } + } + + private def isRunning: Boolean = { + state != SparkApp.State.FAILED && + state != SparkApp.State.FINISHED && + state != SparkApp.State.KILLED + } + + private def changeState(newState: SparkApp.State.Value): Unit = { + if (state != newState) { + listener.foreach(_.stateChanged(state, newState)) + state = newState + } + } + + /** + * Find the corresponding KubernetesApplication from an application tag. + * + * @param appTag The application tag tagged on the target application. + * If the tag is not unique, it returns the first application it found. + * @return KubernetesApplication or the failure. + */ + @tailrec + private def getAppFromTag( + appTag: String, + pollInterval: duration.Duration, + deadline: Deadline): KubernetesApplication = { + import KubernetesExtensions._ + + withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) + match { + case Some(app) => app + case None => + if (deadline.isOverdue) { + process.foreach(_.destroy()) + leakedAppTags.put(appTag, System.currentTimeMillis()) + throw new IllegalStateException(s"No Kubernetes application is found with tag" + + s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" + + " seconds. This may be because 1) spark-submit failed to submit application to " + + "Kubernetes; or 2) Kubernetes cluster doesn't have enough resources to start the " + + "application in time. Please check Livy log and Kubernetes log to know the details.") + } else if (process.exists(p => !p.isAlive && p.exitValue() != 0)) { + throw new IllegalStateException(s"Failed to submit Kubernetes application with tag" + + s" $appTag. 'spark-submit' exited with non-zero status. " + + s"Please check Livy log and Kubernetes log to know the details.") + } else { + Clock.sleep(pollInterval.toMillis) + getAppFromTag(appTag, pollInterval, deadline) + } + } + } + + // Exposed for unit test. + private[utils] def mapKubernetesState( + kubernetesAppState: String, + appTag: String + ): SparkApp.State.Value = { + import KubernetesApplicationState._ + kubernetesAppState.toLowerCase match { + case PENDING | CONTAINER_CREATING => + SparkApp.State.STARTING + case RUNNING => + SparkApp.State.RUNNING + case COMPLETED | SUCCEEDED => + SparkApp.State.FINISHED + case FAILED | ERROR => + SparkApp.State.FAILED + case other => // any other combination is invalid, so FAIL the application. + error(s"Unknown Kubernetes state $other for app with tag $appTag.") + SparkApp.State.FAILED + } + } + + private def buildHistoryServerUiUrl(livyConf: LivyConf, appId: String): String = + s"${livyConf.get(LivyConf.UI_HISTORY_SERVER_URL)}/history/$appId/jobs/" + +} + +object KubernetesApplicationState { + val PENDING = "pending" + val CONTAINER_CREATING = "containercreating" + val RUNNING = "running" + val COMPLETED = "completed" + val SUCCEEDED = "succeeded" + val FAILED = "failed" + val ERROR = "error" +} + +object KubernetesConstants { + val SPARK_APP_ID_LABEL = "spark-app-selector" + val SPARK_APP_TAG_LABEL = "spark-app-tag" + val SPARK_ROLE_LABEL = "spark-role" + val SPARK_EXEC_ID_LABEL = "spark-exec-id" + val SPARK_ROLE_DRIVER = "driver" + val SPARK_ROLE_EXECUTOR = "executor" + val SPARK_UI_PORT_NAME = "spark-ui" + val CREATED_BY_LIVY_LABEL = Map("created-by" -> "livy") +} + +class KubernetesApplication(driverPod: Pod) { + + import KubernetesConstants._ + + private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL) + private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) + private val namespace = driverPod.getMetadata.getNamespace + + def getApplicationTag: String = appTag + + def getApplicationId: String = appId + + def getApplicationNamespace: String = namespace + + def getApplicationPod: Pod = driverPod +} + +private[utils] case class KubernetesAppReport(driver: Option[Pod], executors: Seq[Pod], + appLog: IndexedSeq[String], ingress: Option[Ingress], livyConf: LivyConf) { + + import KubernetesConstants._ + + private val grafanaUrl = livyConf.get(LivyConf.KUBERNETES_GRAFANA_URL) + private val timeRange = livyConf.get(LivyConf.KUBERNETES_GRAFANA_TIME_RANGE) + private val lokiDatasource = livyConf.get(LivyConf.KUBERNETES_GRAFANA_LOKI_DATASOURCE) + private val sparkAppTagLogLabel = SPARK_APP_TAG_LABEL.replaceAll("-", "_") + private val sparkRoleLogLabel = SPARK_ROLE_LABEL.replaceAll("-", "_") + private val sparkExecIdLogLabel = SPARK_EXEC_ID_LABEL.replaceAll("-", "_") + + def getApplicationState: String = + driver.map(getDriverState).getOrElse("unknown") + + // if 'KUBERNETES_SPARK_SIDECAR_ENABLED' is set + // inspect the spark container status to figure out the termination status + // if spark container cannot be detected, default to pod phase. + def getDriverState(driverPod: Pod): String = { + val podStatus = driverPod.getStatus + val phase = podStatus.getPhase.toLowerCase + // if not running with sidecars, just return the pod phase + if (!livyConf.getBoolean(LivyConf.KUBERNETES_SPARK_SIDECAR_ENABLED)) { + return phase + } + if (phase != KubernetesApplicationState.RUNNING) { + return phase + } + // if the POD is still running, check spark container termination status + // default to pod phase if container state is indeterminate. + getTerminalState(podStatus).getOrElse(phase) + } + + // if the spark container has terminated + // try to figure out status based on termination status + def getTerminalState(podStatus: PodStatus): Option[String] = { + import scala.collection.JavaConverters._ + val sparkContainerName = livyConf.get(LivyConf.KUBERNETES_SPARK_CONTAINER_NAME) + for (c <- podStatus.getContainerStatuses.asScala) { + if (c.getName == sparkContainerName && c.getState.getTerminated != null) { + val exitCode = c.getState.getTerminated.getExitCode + if (exitCode == 0) { + return Some(KubernetesApplicationState.SUCCEEDED) + } else { + return Some(KubernetesApplicationState.FAILED) + } + } + } + None + } + + def getApplicationLog: IndexedSeq[String] = appLog + + def getDriverLogUrl: Option[String] = { + if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) { + val appTag = driver.map(_.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL)) + if (appTag.isDefined && appTag.get != null) { + return Some( + s"""$grafanaUrl/explore?left=""" + URLEncoder.encode( + s"""["now-$timeRange","now","$lokiDatasource",""" + + s"""{"expr":"{$sparkAppTagLogLabel=\\"${appTag.get}\\",""" + + s"""$sparkRoleLogLabel=\\"$SPARK_ROLE_DRIVER\\"}"},""" + + s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8") + ) + } + } + None + } + + def getExecutorsLogUrls: Option[String] = { + if (livyConf.getBoolean(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED)) { + val urls = executors.map(_.getMetadata.getLabels).flatMap(labels => { + val sparkAppTag = labels.get(SPARK_APP_TAG_LABEL) + val sparkExecId = labels.get(SPARK_EXEC_ID_LABEL) + if (sparkAppTag != null && sparkExecId != null) { + val sparkRole = labels.getOrDefault(SPARK_ROLE_LABEL, SPARK_ROLE_EXECUTOR) + Some(s"executor-$sparkExecId#$grafanaUrl/explore?left=" + URLEncoder.encode( + s"""["now-$timeRange","now","$lokiDatasource",""" + + s"""{"expr":"{$sparkAppTagLogLabel=\\"$sparkAppTag\\",""" + + s"""$sparkRoleLogLabel=\\"$sparkRole\\",""" + + s"""$sparkExecIdLogLabel=\\"$sparkExecId\\"}"},""" + + s"""{"ui":[true,true,true,"exact"]}]""", "UTF-8")) + } else { + None + } + }) + if (urls.nonEmpty) return Some(urls.mkString(";")) + } + None + } + + def getTrackingUrl: Option[String] = { + val host = ingress.flatMap(i => Try(i.getSpec.getRules.get(0).getHost).toOption) + val path = driver + .map(_.getMetadata.getLabels.getOrDefault(SPARK_APP_TAG_LABEL, "unknown")) + val protocol = livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL) + if (host.isDefined && path.isDefined) Some(s"$protocol://${host.get}/${path.get}") + else None + } + + def getApplicationDiagnostics: IndexedSeq[String] = { + (Seq(driver) ++ executors.sortBy(_.getMetadata.getName).map(Some(_))) + .filter(_.nonEmpty) + .map(opt => buildSparkPodDiagnosticsPrettyString(opt.get)) + .flatMap(_.split("\n")).toIndexedSeq + } + + private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = { + import scala.collection.JavaConverters._ + def printMap(map: Map[_, _]): String = map.map { + case (key, value) => s"$key=$value" + }.mkString(", ") + + if (pod == null) return "unknown" + + s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" + + s"\n\tnode: ${pod.getSpec.getNodeName}" + + s"\n\thostname: ${pod.getSpec.getHostname}" + + s"\n\tpodIp: ${pod.getStatus.getPodIP}" + + s"\n\tstartTime: ${pod.getStatus.getStartTime}" + + s"\n\tphase: ${pod.getStatus.getPhase}" + + s"\n\treason: ${pod.getStatus.getReason}" + + s"\n\tmessage: ${pod.getStatus.getMessage}" + + s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" + + s"\n\tcontainers:" + + s"\n\t\t${ + pod.getSpec.getContainers.asScala.map(container => + s"${container.getName}:" + + s"\n\t\t\timage: ${container.getImage}" + + s"\n\t\t\trequests: ${printMap(container.getResources.getRequests.asScala.toMap)}" + + s"\n\t\t\tlimits: ${printMap(container.getResources.getLimits.asScala.toMap)}" + + s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}" + ).mkString("\n\t\t") + }" + + s"\n\tconditions:" + + s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}" + } + +} + +private[utils] object KubernetesExtensions { + import KubernetesConstants._ + + implicit class KubernetesClientExtensions(client: KubernetesClient) { + import scala.collection.JavaConverters._ + + private val NGINX_CONFIG_SNIPPET: String = + """ + |proxy_set_header Accept-Encoding ""; + |sub_filter_last_modified off; + |sub_filter_once off; + |sub_filter_types text/html text/css text/javascript application/javascript; + """.stripMargin + + def getApplications( + labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER), + appTagLabel: String = SPARK_APP_TAG_LABEL, + appIdLabel: String = SPARK_APP_ID_LABEL + ): Seq[KubernetesApplication] = { + client.pods.inAnyNamespace + .withLabels(labels.asJava) + .withLabel(appTagLabel) + .withLabel(appIdLabel) + .list.getItems.asScala.map(new KubernetesApplication(_)) + } + + def killApplication(app: KubernetesApplication): Boolean = { + client.pods.inAnyNamespace.delete(app.getApplicationPod) + } + + def getApplicationReport( + livyConf: LivyConf, + app: KubernetesApplication, + cacheLogSize: Int, + appTagLabel: String = SPARK_APP_TAG_LABEL + ): KubernetesAppReport = { + val pods = client.pods.inNamespace(app.getApplicationNamespace) + .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) + .list.getItems.asScala + val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER) + val executors = + pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR) + val appLog = Try( + client.pods.inNamespace(app.getApplicationNamespace) + .withName(app.getApplicationPod.getMetadata.getName) + .tailingLines(cacheLogSize).getLog.split("\n").toIndexedSeq + ).getOrElse(IndexedSeq.empty) + val ingress = client.network.v1.ingresses.inNamespace(app.getApplicationNamespace) + .withLabel(SPARK_APP_TAG_LABEL, app.getApplicationTag) + .list.getItems.asScala.headOption + KubernetesAppReport(driver, executors, appLog, ingress, livyConf) + } + + def createSparkUIIngress(app: KubernetesApplication, livyConf: LivyConf): Unit = { + val annotationsString = livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS) + var annotations: Seq[(String, String)] = Seq.empty + if (annotationsString != null && annotationsString.trim.nonEmpty) { + annotations = annotationsString + .split(";").map(_.split("=")) + .map(array => array.head -> array.tail.mkString("=")).toSeq + } + + val sparkUIIngress = buildSparkUIIngress( + app, + livyConf.get(LivyConf.KUBERNETES_INGRESS_CLASS_NAME), + livyConf.get(LivyConf.KUBERNETES_INGRESS_PROTOCOL), + livyConf.get(LivyConf.KUBERNETES_INGRESS_HOST), + livyConf.get(LivyConf.KUBERNETES_INGRESS_TLS_SECRET_NAME), + livyConf.get(LivyConf.KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET), + annotations: _* + ) + val resources: Seq[HasMetadata] = Seq(sparkUIIngress) + addOwnerReference(app.getApplicationPod, resources: _*) + client.network.v1.ingresses.inNamespace(app.getApplicationNamespace). + createOrReplace(sparkUIIngress) + } + + private[utils] def buildSparkUIIngress( + app: KubernetesApplication, className: String, protocol: String, host: String, + tlsSecretName: String, additionalConfSnippet: String, additionalAnnotations: (String, String)* + ): Ingress = { + val appTag = app.getApplicationTag + val serviceHost = s"${getServiceName(app)}.${app.getApplicationNamespace}.svc.cluster.local" + + // Common annotations + val annotations = Map( + "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1", + "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/$appTag/", + "nginx.ingress.kubernetes.io/proxy-redirect-from" -> s"http://$serviceHost/", + "nginx.ingress.kubernetes.io/upstream-vhost" -> s"$serviceHost", + "nginx.ingress.kubernetes.io/service-upstream" -> "true", + "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/$appTag", + "nginx.ingress.kubernetes.io/configuration-snippet" -> + NGINX_CONFIG_SNIPPET.concat(additionalConfSnippet) + ) ++ additionalAnnotations + + val builder = new IngressBuilder() + .withApiVersion("networking.k8s.io/v1") + .withNewMetadata() + .withName(getServiceName(app)) + .withNamespace(app.getApplicationNamespace) + .addToAnnotations(annotations.asJava) + .addToLabels(SPARK_APP_TAG_LABEL, appTag) + .addToLabels(CREATED_BY_LIVY_LABEL.asJava) + .endMetadata() + .withNewSpec() + .withIngressClassName(className) + .addNewRule() + .withHost(host) + .withNewHttp() + .addNewPath() + .withPath(s"/$appTag/?(.*)") + .withPathType("ImplementationSpecific") + .withNewBackend() + .withNewService() + .withName(getServiceName(app)) + .withNewPort() + .withName(SPARK_UI_PORT_NAME).endPort() + .endService() + .endBackend() + .endPath() + .endHttp() + .endRule() + if (protocol.endsWith("s") && tlsSecretName != null && tlsSecretName.nonEmpty) { + builder.addNewTl().withSecretName(tlsSecretName).addToHosts(host).endTl() + } + builder.endSpec().build() + } + + private def getServiceName(app: KubernetesApplication): String = + StringUtils.stripEnd( + StringUtils.left(s"${app.getApplicationPod.getMetadata.getName}-svc", 63), "-" + ).toLowerCase + + // Add a OwnerReference to the given resources making the driver pod an owner of them so when + // the driver pod is deleted, the resources are garbage collected. + private def addOwnerReference(owner: Pod, resources: HasMetadata*): Unit = { + val driverPodOwnerReference = new OwnerReferenceBuilder() + .withName(owner.getMetadata.getName) + .withApiVersion(owner.getApiVersion) + .withUid(owner.getMetadata.getUid) + .withKind(owner.getKind) + .withController(true) + .build() + resources.foreach { + resource => + val originalMetadata = resource.getMetadata + originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference)) + } + } + + } + +} + +private[utils] object KubernetesClientFactory { + import java.io.File + import com.google.common.base.Charsets + import com.google.common.io.Files + + private implicit class OptionString(val string: String) extends AnyVal { + def toOption: Option[String] = if (string == null || string.isEmpty) None else Option(string) + } + + def createKubernetesClient(livyConf: LivyConf): DefaultKubernetesClient = { + val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster()) + + val oauthTokenFile = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE).toOption + val oauthTokenValue = livyConf.get(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE).toOption + require(oauthTokenFile.isEmpty || oauthTokenValue.isEmpty, + s"Cannot specify OAuth token through both " + + s"a file $oauthTokenFile and a value $oauthTokenValue.") + + val caCertFile = livyConf.get(LivyConf.KUBERNETES_CA_CERT_FILE).toOption + val clientKeyFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_KEY_FILE).toOption + val clientCertFile = livyConf.get(LivyConf.KUBERNETES_CLIENT_CERT_FILE).toOption + + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(masterUrl) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + } + .withOption(oauthTokenFile) { + (file, configBuilder) => + configBuilder + .withOauthToken(Files.toString(new File(file), Charsets.UTF_8)) + } + .withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + } + .withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + } + .withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + } + .build() + new DefaultKubernetesClient(config) + } + + def sparkMasterToKubernetesApi(sparkMaster: String): String = { + val replaced = sparkMaster.replaceFirst("k8s://", "") + if (!replaced.startsWith("http")) s"https://$replaced" + else replaced + } + + private implicit class OptionConfigurableConfigBuilder( + val configBuilder: ConfigBuilder) extends AnyVal { + def withOption[T] + (option: Option[T]) + (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = { + option.map { + opt => configurator(opt, configBuilder) + }.getOrElse(configBuilder) + } + } + +} diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala new file mode 100644 index 000000000..00257acd8 --- /dev/null +++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.utils + +import java.util.Objects._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec} +import org.mockito.Mockito.when +import org.scalatest.FunSpec +import org.scalatestplus.mockito.MockitoSugar._ + +import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} +import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL + +class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite { + + describe("KubernetesAppReport") { + import scala.collection.JavaConverters._ + + it("should return application state") { + val status = when(mock[PodStatus].getPhase).thenReturn("Status").getMock[PodStatus] + val driver = when(mock[Pod].getStatus).thenReturn(status).getMock[Pod] + assertResult("status") { + KubernetesAppReport(Option(driver), Seq.empty, IndexedSeq.empty, None, new LivyConf(false)) + .getApplicationState + } + assertResult("unknown") { + KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, new LivyConf(false)) + .getApplicationState + } + } + + def livyConf(lokiEnabled: Boolean): LivyConf = new LivyConf(false) + .set(LivyConf.KUBERNETES_GRAFANA_LOKI_ENABLED, lokiEnabled) + + def podMockWithLabels(labelMap: Map[String, String]): Pod = { + val metaWithLabel = when(mock[ObjectMeta].getLabels).thenReturn(labelMap.asJava) + .getMock[ObjectMeta] + when(mock[Pod].getMetadata).thenReturn(metaWithLabel).getMock[Pod] + } + + def driverMock(labelExists: Boolean): Option[Pod] = { + val labels = if (labelExists) Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag") + else Map.empty[String, String] + Some(podMockWithLabels(labels)) + } + + it("should return driver log url") { + + def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit = + assertResult(shouldBeDefined) { + KubernetesAppReport( + driverMock(labelExists), Seq.empty, IndexedSeq.empty, None, livyConf(lokiEnabled) + ).getDriverLogUrl.isDefined + } + + test(labelExists = false, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = false, lokiEnabled = true, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = true, shouldBeDefined = true) + assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true)) + .getDriverLogUrl.isEmpty) + } + + it("should return executors log urls") { + def executorMock(labelsExist: Boolean): Option[Pod] = { + val labels = if (labelsExist) { + Map(KubernetesConstants.SPARK_APP_TAG_LABEL -> "app_tag", + KubernetesConstants.SPARK_EXEC_ID_LABEL -> "exec-1") + } else { + Map.empty[String, String] + } + Some(podMockWithLabels(labels)) + } + + def test(labelExists: Boolean, lokiEnabled: Boolean, shouldBeDefined: Boolean): Unit = + assertResult(shouldBeDefined) { + KubernetesAppReport( + None, Seq(executorMock(labelExists).get), IndexedSeq.empty, None, livyConf(lokiEnabled) + ).getExecutorsLogUrls.isDefined + } + + test(labelExists = false, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = false, lokiEnabled = true, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = false, shouldBeDefined = false) + test(labelExists = true, lokiEnabled = true, shouldBeDefined = true) + assert(KubernetesAppReport(None, Seq.empty, IndexedSeq.empty, None, livyConf(true)) + .getExecutorsLogUrls.isEmpty) + } + + it("should return driver ingress url") { + + def livyConf(protocol: Option[String]): LivyConf = { + val conf = new LivyConf() + protocol.map(conf.set(LivyConf.KUBERNETES_INGRESS_PROTOCOL, _)).getOrElse(conf) + } + + def ingressMock(host: Option[String]): Ingress = { + val ingressRules = host.map(h => + List(when(mock[IngressRule].getHost).thenReturn(h).getMock[IngressRule])) + .getOrElse(List.empty).asJava + val ingressSpec = when(mock[IngressSpec].getRules) + .thenReturn(ingressRules).getMock[IngressSpec] + when(mock[Ingress].getSpec).thenReturn(ingressSpec).getMock[Ingress] + } + + def test(driver: Option[Pod], ingress: Option[Ingress], + protocol: Option[String], shouldBeDefined: Boolean): Unit = { + assertResult(shouldBeDefined) { + KubernetesAppReport(driver, Seq.empty, IndexedSeq.empty, ingress, livyConf(protocol)) + .getTrackingUrl.isDefined + } + } + + val hostname = Some("hostname") + val protocol = Some("protocol") + + test(None, None, None, shouldBeDefined = false) + test(None, None, protocol, shouldBeDefined = false) + test(None, Some(ingressMock(None)), None, shouldBeDefined = false) + test(None, Some(ingressMock(None)), protocol, shouldBeDefined = false) + test(None, Some(ingressMock(hostname)), None, shouldBeDefined = false) + test(None, Some(ingressMock(hostname)), protocol, shouldBeDefined = false) + + test(driverMock(true), None, None, shouldBeDefined = false) + test(driverMock(true), None, protocol, shouldBeDefined = false) + test(driverMock(true), Some(ingressMock(None)), None, shouldBeDefined = false) + test(driverMock(true), Some(ingressMock(None)), protocol, shouldBeDefined = false) + test(driverMock(true), Some(ingressMock(hostname)), None, shouldBeDefined = true) + test(driverMock(true), Some(ingressMock(hostname)), protocol, shouldBeDefined = true) + + test(driverMock(false), None, None, shouldBeDefined = false) + test(driverMock(false), None, protocol, shouldBeDefined = false) + test(driverMock(false), Some(ingressMock(None)), None, shouldBeDefined = false) + test(driverMock(false), Some(ingressMock(None)), protocol, shouldBeDefined = false) + test(driverMock(false), Some(ingressMock(hostname)), None, shouldBeDefined = true) + test(driverMock(false), Some(ingressMock(hostname)), protocol, shouldBeDefined = true) + + assertResult(s"${protocol.get}://${hostname.get}/app_tag") { + KubernetesAppReport(driverMock(true), Seq.empty, IndexedSeq.empty, + Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get + } + assertResult(s"${protocol.get}://${hostname.get}/unknown") { + KubernetesAppReport(driverMock(false), Seq.empty, IndexedSeq.empty, + Some(ingressMock(hostname)), livyConf(protocol)).getTrackingUrl.get + } + } + + } + + describe("KubernetesClientFactory") { + it("should build KubernetesApi url from LivyConf masterUrl") { + def actual(sparkMaster: String): String = + KubernetesClientFactory.sparkMasterToKubernetesApi(sparkMaster) + + val masterUrl = "kubernetes.default.svc:443" + + assertResult(s"https://local")(actual(s"https://local")) + assertResult(s"https://$masterUrl")(actual(s"k8s://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"k8s://http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"k8s://https://$masterUrl")) + assertResult(s"http://$masterUrl")(actual(s"http://$masterUrl")) + assertResult(s"https://$masterUrl")(actual(s"https://$masterUrl")) + } + + it("should create KubernetesClient with default configs") { + assert(nonNull(KubernetesClientFactory.createKubernetesClient(new LivyConf(false)))) + } + + it("should throw IllegalArgumentException in both oauth file and token provided") { + val conf = new LivyConf(false) + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_FILE, "dummy_path") + .set(LivyConf.KUBERNETES_OAUTH_TOKEN_VALUE, "dummy_value") + intercept[IllegalArgumentException] { + KubernetesClientFactory.createKubernetesClient(conf) + } + } + } + + describe("KubernetesClientExtensions") { + it("should build an ingress from the supplied KubernetesApplication") { + def test(app: KubernetesApplication, expectedAnnotations: Map[String, String]): Unit = { + import scala.collection.JavaConverters._ + val livyConf = new LivyConf(false) + val client = KubernetesClientFactory.createKubernetesClient(livyConf) + val clientExtensions = KubernetesExtensions.KubernetesClientExtensions(client) + val ingress = clientExtensions.buildSparkUIIngress(app, "ingress-class", "https", + "cluster.example.com", "tlsSecret", "") + val diff = expectedAnnotations.toSet diff ingress.getMetadata.getAnnotations.asScala.toSet + assert(ingress.getMetadata.getName == s"${app.getApplicationPod.getMetadata.getName}-svc") + assert(diff.isEmpty) + } + + def mockPod(name: String, namespace: String, tag: String): Pod = { + new PodBuilder().withNewMetadata().withName(name).withNamespace(namespace). + addToLabels(SPARK_APP_TAG_LABEL, tag).endMetadata().withNewSpec().endSpec().build() + } + + def app(name: String, namespace: String, tag: String): KubernetesApplication = { + new KubernetesApplication(mockPod(name, namespace, tag)) + } + + test(app("app1", "ns-1", "tag-1"), Map( + "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1", + "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/tag-1/", + "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/tag-1", + "nginx.ingress.kubernetes.io/proxy-redirect-from" -> + s"http://app1-svc.ns-1.svc.cluster.local/", + "nginx.ingress.kubernetes.io/upstream-vhost" -> + s"app1-svc.ns-1.svc.cluster.local", + "nginx.ingress.kubernetes.io/service-upstream" -> "true" + )) + + test(app("app2", "ns-2", "tag-2"), Map( + "nginx.ingress.kubernetes.io/rewrite-target" -> "/$1", + "nginx.ingress.kubernetes.io/proxy-redirect-to" -> s"/tag-2/", + "nginx.ingress.kubernetes.io/x-forwarded-prefix" -> s"/tag-2", + "nginx.ingress.kubernetes.io/proxy-redirect-from" -> + s"http://app2-svc.ns-2.svc.cluster.local/", + "nginx.ingress.kubernetes.io/upstream-vhost" -> + s"app2-svc.ns-2.svc.cluster.local", + "nginx.ingress.kubernetes.io/service-upstream" -> "true" + )) + } + } + +}