Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIVY-702]: Submit Spark apps to Kubernetes #451

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ on:
pull_request:
types: [opened, reopened, synchronize]
env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
jobs:
build:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
name: Unit Tests
on: [push]
env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
jobs:
build:
Expand Down
55 changes: 55 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,58 @@
# livy.server.hdfs.safe-mode.interval = 5
# value specifies max attempts to retry when safe mode is ON in hdfs filesystem
# livy.server.hdfs.safe-mode.max.retry.attempts = 10

# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount
# if deployed to Kubernetes cluster as a Pod)
# Kubernetes oauth token file path
# livy.server.kubernetes.oauthTokenFile =
# Kubernetes oauth token string value
# livy.server.kubernetes.oauthTokenValue =
# Kubernetes CA cert file path
# livy.server.kubernetes.caCertFile =
# Kubernetes client key file path
# livy.server.kubernetes.clientKeyFile =
# Kubernetes client cert file path
# livy.server.kubernetes.clientCertFile =

# If Livy can't find the Kubernetes app within this time, consider it lost.
# livy.server.kubernetes.app-lookup-timeout = 600s
# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
# cause session leakage, so we need to check session leakage.
# How long to check livy session leakage
# livy.server.kubernetes.app-leakage.check-timeout = 600s
# How often to check livy session leakage
# livy.server.kubernetes.app-leakage.check-interval = 60s

# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description
# details, routes, etc...)
# livy.server.kubernetes.poll-interval = 15s

# Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired
# options below
# livy.server.kubernetes.ingress.create = false
# Kubernetes Nginx Ingress protocol. If set to https refer Ingress TLS section below
# livy.server.kubernetes.ingress.protocol = http
# Kubernetes Nginx Ingress host. Be sure to set it to the FQDN of your Nginx Ingress Controller
# proxy server
# livy.server.kubernetes.ingress.host = localhost
# Kubernetes secret name for Nginx Ingress TLS. Is omitted if 'livy.server.kubernetes.ingress.protocol'
# is not https
# livy.server.kubernetes.ingress.tls.secretName = spark-cluster-tls
# Kubernetes Nginx Ingress additional configuration snippet for specific config options
# livy.server.kubernetes.ingress.additionalConfSnippet =
# Kubernetes Nginx Ingress additional annotations for specific config options, eg. for configuring
# basic auth of external oauth2 proxy. Format: annotation1=value1;annotation2=value2;...
# livy.server.kubernetes.ingress.additionalAnnotations =

# Set to true to enable Grafana Loki integration and configure options below
livy.server.kubernetes.grafana.loki.enabled = false
# Grafana UI root endpoint to build links based on
# livy.server.kubernetes.grafana.url = http://localhost:3000
# Grafana Datasource name for Loki
# livy.server.kubernetes.grafana.loki.datasource = loki
# Time range from now to past to get logs for
# livy.server.kubernetes.grafana.timeRange = 6h

# Used to build links to Spark History Server pages on Spark App completion (Kubernetes only)
# livy.ui.history-server-url = http://spark-history-server
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>5.6.0</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.13</httpclient.version>
Expand Down Expand Up @@ -318,6 +319,18 @@
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ private void initializeServer() throws Exception {
// on the cluster, it would be tricky to solve that problem in a generic way.
livyConf.set(RPC_SERVER_ADDRESS, null);

// If we are running on Kubernetes, get RPC_SERVER_ADDRESS from "spark.driver.host" option
// this option is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
// line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
if (conf.get("spark.master").startsWith("k8s")) {
livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
}

if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
// Test flag is turned on so we will just infinite loop here. It should cause
// timeout and we should still see yarn application being cleaned up.
Expand Down
5 changes: 5 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>metrics-healthchecks</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>

<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ td .progress {
margin: 0;
}

.with-scroll-bar {
display: block;
overflow-y: scroll;
max-height: 200px;
}

#session-summary {
margin: 20px 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function loadSessionsTable(sessions) {
tdWrap(session.proxyUser) +
tdWrap(session.kind) +
tdWrap(session.state) +
tdWrap(logLinks(session, "session")) +
tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") +
"</tr>"
);
});
Expand All @@ -46,7 +46,7 @@ function loadBatchesTable(sessions) {
tdWrap(session.owner) +
tdWrap(session.proxyUser) +
tdWrap(session.state) +
tdWrap(logLinks(session, "batch")) +
tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") +
"</tr>"
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,23 @@ function driverLogLink(session) {
}
}

function executorsLogLinks(session) {
var executorLogUrls = session.appInfo.executorLogUrls;
if (executorLogUrls != null) {
return executorLogUrls.split(";").map(function (pair) {
var nameAndLink = pair.split("#");
return divWrap(anchorLink(nameAndLink[1], nameAndLink[0]));
}).join("");
} else {
return "";
}
}

function logLinks(session, kind) {
var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", "session"));
var driverLog = divWrap(driverLogLink(session));
return sessionLog + driverLog;
var executorsLogs = executorsLogLinks(session);
return sessionLog + driverLog + executorsLogs;
}

function appIdLink(session) {
Expand All @@ -75,6 +88,18 @@ function tdWrap(val) {
return "<td>" + inner + "</td>";
}

function tdWrapWithClass(val, cl) {
var inner = "";
if (val != null) {
inner = val;
}
var clVal = "";
if (cl != null) {
clVal = " class=\"" + cl + "\"";
}
return "<td" + clVal + ">" + inner + "</td>";
}

function preWrap(inner) {
return "<pre>" + escapeHtml(inner) + "</pre>";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ function sumWrap(name, val) {
}
}

function sumWrapWithClass(name, val, cl) {
var clVal = "";
if (cl != null) {
clVal = " class=\"" + cl + "\"";
}
if (val != null) {
return "<li" + clVal + "><strong>" + name + ": </strong>" + val + "</li>";
} else {
return "";
}
}

function formatError(output) {
var errStr = output.evalue + "\n";
var trace = output.traceback;
Expand Down Expand Up @@ -93,7 +105,7 @@ function appendSummary(session) {
sumWrap("Proxy User", session.proxyUser) +
sumWrap("Session Kind", session.kind) +
sumWrap("State", session.state) +
sumWrap("Logs", logLinks(session, "session")) +
sumWrapWithClass("Logs", logLinks(session, "session"), "with-scroll-bar") +
"</ul>"
);
}
Expand Down
60 changes: 60 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,63 @@ object LivyConf {
// value specifies max attempts to retry when safe mode is ON in hdfs filesystem
val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)

// Kubernetes oauth token file path.
val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
// Kubernetes oauth token string value.
val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
// Kubernetes CA cert file path.
val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
// Kubernetes client key file path.
val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
// Kubernetes client cert file path.
val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
// How often to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")

// Weather to create Kubernetes Nginx Ingress for Spark UI.
val KUBERNETES_INGRESS_CREATE = Entry("livy.server.kubernetes.ingress.create", false)
// Kubernetes Ingress class name.
val KUBERNETES_INGRESS_CLASS_NAME = Entry("livy.server.kubernetes.ingress.className", "")
// Kubernetes Nginx Ingress protocol.
val KUBERNETES_INGRESS_PROTOCOL = Entry("livy.server.kubernetes.ingress.protocol", "http")
// Kubernetes Nginx Ingress host.
val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", "localhost")
// Kubernetes Nginx Ingress additional configuration snippet.
val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET =
Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "")
// Kubernetes Nginx Ingress additional annotations: key1=value1;key2=value2;... .
val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS =
Entry("livy.server.kubernetes.ingress.additionalAnnotations", "")
// Kubernetes secret name for Nginx Ingress TLS.
// Is omitted if 'livy.server.kubernetes.ingress.protocol' value doesn't end with 's'
val KUBERNETES_INGRESS_TLS_SECRET_NAME =
Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls")

val KUBERNETES_GRAFANA_LOKI_ENABLED = Entry("livy.server.kubernetes.grafana.loki.enabled", false)
val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", "http://localhost:3000")
val KUBERNETES_GRAFANA_LOKI_DATASOURCE =
Entry("livy.server.kubernetes.grafana.loki.datasource", "loki")
val KUBERNETES_GRAFANA_TIME_RANGE = Entry("livy.server.kubernetes.grafana.timeRange", "6h")
Comment on lines +303 to +307
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to add some docs about expectations to Grafana and Loki installation to make use of this configs.
Same about installing Livy on K8s, we should give some guide to the community on how to set this up at least locally to play with. That will help to raise the adoption.
I can help you with connecting the dots, ping me if you wanna discuss.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I see it in https://github.com/askhatri/livycluster . Great at least to leave somewhere a link to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will try to add this as part of LIVY-979. I will connect with you via email.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@askhatri better to discuss it on LIVY-979 itself so that whole community is across it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// side car container for spark pods enabled?
val KUBERNETES_SPARK_SIDECAR_ENABLED =
Entry("livy.server.kubernetes.spark.sidecar.enabled", true)
Comment on lines +309 to +311
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for this flag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A sidecar container in Kubernetes is a secondary container that runs alongside the main application container in the same pod. We can set the sidecar configuration in Livy using this flag.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what could be the intention for running sidecar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A sidecar container can be used to collect logs from the Spark application and forward them to a centralized logging system like Elasticsearch, Fluentd, and Kibana (EFK) stack.

Copy link
Contributor

@jahstreet jahstreet Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, thx for explaining. Does Livy add this sidecar or should Livy be handling the Spark Pods with a sidecar differently? What would happen if Spark Pods have sidecar but this flag is false and how it is different from setting it to true? Just trying to understand it better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Livy does not add a sidecar container for Spark. Instead, Livy uses a flag to determine the status of the Spark pod. If the Spark pod is running with a sidecar, the flag is set to true; otherwise, it is set to false.

// container name to identify spark pod if running with sidecar containers
val KUBERNETES_SPARK_CONTAINER_NAME =
Entry("livy.server.kubernetes.spark.container.name", "spark-container")

val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", "http://spark-history-server")

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down Expand Up @@ -371,6 +428,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")

/** Return true if spark master starts with k8s. */
def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")

/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

Expand Down
12 changes: 7 additions & 5 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
import org.apache.livy.utils.SparkYarnApp

class LivyServer extends Logging {

Expand Down Expand Up @@ -142,10 +142,12 @@ class LivyServer extends Logging {

testRecovery(livyConf)

// Initialize YarnClient ASAP to save time.
// Initialize YarnClient/KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
} else if (livyConf.isRunningOnKubernetes()) {
SparkKubernetesApp.init(livyConf)
}

if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
Expand Down Expand Up @@ -415,10 +417,10 @@ class LivyServer extends Logging {
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
// If recovery is turned on, and we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
"Session recovery requires YARN.")
"Session recovery requires YARN or Kubernetes.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))

if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
None
// Create SparkApp for Kubernetes anyway
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
}
}

Expand Down Expand Up @@ -547,6 +547,8 @@ class InteractiveSession(
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
// We need to call #kill here explicitly to delete Interactive pods from the cluster
if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](

Future.sequence(all().filter(expired).map { s =>
s.state match {
case st: FinishedSessionState =>
case _: FinishedSessionState =>
info(s"Deleting $s because it finished before ${sessionStateRetainedInSec / 1e9} secs.")
case _ =>
info(s"Deleting $s because it was inactive or the time to leave the period is over.")
info(s"Deleting $s because it was inactive for more than ${sessionTimeout / 1e9} secs.")
}
delete(s)
})
Expand Down
Loading
Loading