Skip to content

Commit

Permalink
LIVY-702: Submit Spark apps to Kubernetes
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Sasnouskikh <jahstreetlove@gmail.com>
  • Loading branch information
Asif Khatri and jahstreet committed Jul 9, 2024
1 parent 03ceb4a commit 8efecc7
Show file tree
Hide file tree
Showing 17 changed files with 1,208 additions and 19 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ on:
pull_request:
types: [opened, reopened, synchronize]
env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
jobs:
build:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
name: Unit Tests
on: [push]
env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
jobs:
build:
Expand Down
55 changes: 55 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,58 @@
# livy.server.hdfs.safe-mode.interval = 5
# value specifies max attempts to retry when safe mode is ON in hdfs filesystem
# livy.server.hdfs.safe-mode.max.retry.attempts = 10

# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount
# if deployed to Kubernetes cluster as a Pod)
# Kubernetes oauth token file path
# livy.server.kubernetes.oauthTokenFile =
# Kubernetes oauth token string value
# livy.server.kubernetes.oauthTokenValue =
# Kubernetes CA cert file path
# livy.server.kubernetes.caCertFile =
# Kubernetes client key file path
# livy.server.kubernetes.clientKeyFile =
# Kubernetes client cert file path
# livy.server.kubernetes.clientCertFile =

# If Livy can't find the Kubernetes app within this time, consider it lost.
# livy.server.kubernetes.app-lookup-timeout = 600s
# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
# cause session leakage, so we need to check session leakage.
# How long to check livy session leakage
# livy.server.kubernetes.app-leakage.check-timeout = 600s
# How often to check livy session leakage
# livy.server.kubernetes.app-leakage.check-interval = 60s

# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description
# details, routes, etc...)
# livy.server.kubernetes.poll-interval = 15s

# Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired
# options below
# livy.server.kubernetes.ingress.create = false
# Kubernetes Nginx Ingress protocol. If set to https refer Ingress TLS section below
# livy.server.kubernetes.ingress.protocol = http
# Kubernetes Nginx Ingress host. Be sure to set it to the FQDN of your Nginx Ingress Controller
# proxy server
# livy.server.kubernetes.ingress.host = localhost
# Kubernetes secret name for Nginx Ingress TLS. Is omitted if 'livy.server.kubernetes.ingress.protocol'
# is not https
# livy.server.kubernetes.ingress.tls.secretName = spark-cluster-tls
# Kubernetes Nginx Ingress additional configuration snippet for specific config options
# livy.server.kubernetes.ingress.additionalConfSnippet =
# Kubernetes Nginx Ingress additional annotations for specific config options, eg. for configuring
# basic auth of external oauth2 proxy. Format: annotation1=value1;annotation2=value2;...
# livy.server.kubernetes.ingress.additionalAnnotations =

# Set to true to enable Grafana Loki integration and configure options below
livy.server.kubernetes.grafana.loki.enabled = false
# Grafana UI root endpoint to build links based on
# livy.server.kubernetes.grafana.url = http://localhost:3000
# Grafana Datasource name for Loki
# livy.server.kubernetes.grafana.loki.datasource = loki
# Time range from now to past to get logs for
# livy.server.kubernetes.grafana.timeRange = 6h

# Used to build links to Spark History Server pages on Spark App completion (Kubernetes only)
# livy.ui.history-server-url = http://spark-history-server
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>5.6.0</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.13</httpclient.version>
Expand Down Expand Up @@ -318,6 +319,18 @@
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ private void initializeServer() throws Exception {
// on the cluster, it would be tricky to solve that problem in a generic way.
livyConf.set(RPC_SERVER_ADDRESS, null);

// If we are running on Kubernetes, get RPC_SERVER_ADDRESS from "spark.driver.host" option
// this option is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
// line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
if (conf.get("spark.master").startsWith("k8s")) {
livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
}

if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
// Test flag is turned on so we will just infinite loop here. It should cause
// timeout and we should still see yarn application being cleaned up.
Expand Down
5 changes: 5 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>metrics-healthchecks</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>

<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ td .progress {
margin: 0;
}

.with-scroll-bar {
display: block;
overflow-y: scroll;
max-height: 200px;
}

#session-summary {
margin: 20px 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function loadSessionsTable(sessions) {
tdWrap(session.proxyUser) +
tdWrap(session.kind) +
tdWrap(session.state) +
tdWrap(logLinks(session, "session")) +
tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") +
"</tr>"
);
});
Expand All @@ -46,7 +46,7 @@ function loadBatchesTable(sessions) {
tdWrap(session.owner) +
tdWrap(session.proxyUser) +
tdWrap(session.state) +
tdWrap(logLinks(session, "batch")) +
tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") +
"</tr>"
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,23 @@ function driverLogLink(session) {
}
}

function executorsLogLinks(session) {
var executorLogUrls = session.appInfo.executorLogUrls;
if (executorLogUrls != null) {
return executorLogUrls.split(";").map(function (pair) {
var nameAndLink = pair.split("#");
return divWrap(anchorLink(nameAndLink[1], nameAndLink[0]));
}).join("");
} else {
return "";
}
}

function logLinks(session, kind) {
var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", "session"));
var driverLog = divWrap(driverLogLink(session));
return sessionLog + driverLog;
var executorsLogs = executorsLogLinks(session);
return sessionLog + driverLog + executorsLogs;
}

function appIdLink(session) {
Expand All @@ -75,6 +88,18 @@ function tdWrap(val) {
return "<td>" + inner + "</td>";
}

function tdWrapWithClass(val, cl) {
var inner = "";
if (val != null) {
inner = val;
}
var clVal = "";
if (cl != null) {
clVal = " class=\"" + cl + "\"";
}
return "<td" + clVal + ">" + inner + "</td>";
}

function preWrap(inner) {
return "<pre>" + escapeHtml(inner) + "</pre>";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ function sumWrap(name, val) {
}
}

function sumWrapWithClass(name, val, cl) {
var clVal = "";
if (cl != null) {
clVal = " class=\"" + cl + "\"";
}
if (val != null) {
return "<li" + clVal + "><strong>" + name + ": </strong>" + val + "</li>";
} else {
return "";
}
}

function formatError(output) {
var errStr = output.evalue + "\n";
var trace = output.traceback;
Expand Down Expand Up @@ -93,7 +105,7 @@ function appendSummary(session) {
sumWrap("Proxy User", session.proxyUser) +
sumWrap("Session Kind", session.kind) +
sumWrap("State", session.state) +
sumWrap("Logs", logLinks(session, "session")) +
sumWrapWithClass("Logs", logLinks(session, "session"), "with-scroll-bar") +
"</ul>"
);
}
Expand Down
60 changes: 60 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,63 @@ object LivyConf {
// value specifies max attempts to retry when safe mode is ON in hdfs filesystem
val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)

// Kubernetes oauth token file path.
val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
// Kubernetes oauth token string value.
val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
// Kubernetes CA cert file path.
val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
// Kubernetes client key file path.
val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
// Kubernetes client cert file path.
val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
// How often to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")

// Weather to create Kubernetes Nginx Ingress for Spark UI.
val KUBERNETES_INGRESS_CREATE = Entry("livy.server.kubernetes.ingress.create", false)
// Kubernetes Ingress class name.
val KUBERNETES_INGRESS_CLASS_NAME = Entry("livy.server.kubernetes.ingress.className", "")
// Kubernetes Nginx Ingress protocol.
val KUBERNETES_INGRESS_PROTOCOL = Entry("livy.server.kubernetes.ingress.protocol", "http")
// Kubernetes Nginx Ingress host.
val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", "localhost")
// Kubernetes Nginx Ingress additional configuration snippet.
val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET =
Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "")
// Kubernetes Nginx Ingress additional annotations: key1=value1;key2=value2;... .
val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS =
Entry("livy.server.kubernetes.ingress.additionalAnnotations", "")
// Kubernetes secret name for Nginx Ingress TLS.
// Is omitted if 'livy.server.kubernetes.ingress.protocol' value doesn't end with 's'
val KUBERNETES_INGRESS_TLS_SECRET_NAME =
Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls")

val KUBERNETES_GRAFANA_LOKI_ENABLED = Entry("livy.server.kubernetes.grafana.loki.enabled", false)
val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", "http://localhost:3000")
val KUBERNETES_GRAFANA_LOKI_DATASOURCE =
Entry("livy.server.kubernetes.grafana.loki.datasource", "loki")
val KUBERNETES_GRAFANA_TIME_RANGE = Entry("livy.server.kubernetes.grafana.timeRange", "6h")

// side car container for spark pods enabled?
val KUBERNETES_SPARK_SIDECAR_ENABLED =
Entry("livy.server.kubernetes.spark.sidecar.enabled", true)
// container name to identify spark pod if running with sidecar containers
val KUBERNETES_SPARK_CONTAINER_NAME =
Entry("livy.server.kubernetes.spark.container.name", "spark-container")

val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", "http://spark-history-server")

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down Expand Up @@ -371,6 +428,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")

/** Return true if spark master starts with k8s. */
def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")

/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

Expand Down
12 changes: 7 additions & 5 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
import org.apache.livy.utils.SparkYarnApp

class LivyServer extends Logging {

Expand Down Expand Up @@ -142,10 +142,12 @@ class LivyServer extends Logging {

testRecovery(livyConf)

// Initialize YarnClient ASAP to save time.
// Initialize YarnClient/KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
} else if (livyConf.isRunningOnKubernetes()) {
SparkKubernetesApp.init(livyConf)
}

if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
Expand Down Expand Up @@ -415,10 +417,10 @@ class LivyServer extends Logging {
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
// If recovery is turned on, and we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
"Session recovery requires YARN.")
"Session recovery requires YARN or Kubernetes.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))

if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
None
// Create SparkApp for Kubernetes anyway
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
}
}

Expand Down Expand Up @@ -547,6 +547,8 @@ class InteractiveSession(
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
// We need to call #kill here explicitly to delete Interactive pods from the cluster
if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](

Future.sequence(all().filter(expired).map { s =>
s.state match {
case st: FinishedSessionState =>
case _: FinishedSessionState =>
info(s"Deleting $s because it finished before ${sessionStateRetainedInSec / 1e9} secs.")
case _ =>
info(s"Deleting $s because it was inactive or the time to leave the period is over.")
info(s"Deleting $s because it was inactive for more than ${sessionTimeout / 1e9} secs.")
}
delete(s)
})
Expand Down
Loading

0 comments on commit 8efecc7

Please sign in to comment.