Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Support custom labels on the driver pod. #27

Merged
merged 5 commits into from
Jan 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ from the other deployment modes. See the [configuration page](configuration.html
(typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.labels</code></td>
<td>(none)</td>
<td>
Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
where each label is in the format <code>key=value</code>.
</td>
</tr>
</table>

## Current Limitations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private[spark] class Client(
private val serviceAccount = sparkConf.get("spark.kubernetes.submit.serviceAccountName",
"default")

private val customLabels = sparkConf.get("spark.kubernetes.driver.labels", "")

private implicit val retryableExecutionContext = ExecutionContext
.fromExecutorService(
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
Expand All @@ -85,6 +87,7 @@ private[spark] class Client(
.build()))

def run(): Unit = {
val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
Expand All @@ -109,14 +112,15 @@ private[spark] class Client(
.withType("Opaque")
.done()
try {
val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava
val resolvedSelectors = (Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue)
++ parsedCustomLabels).asJava
val (servicePorts, containerPorts) = configurePorts()
val service = kubernetesClient.services().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.endMetadata()
.withNewSpec()
.withSelector(selectors)
.withSelector(resolvedSelectors)
.withPorts(servicePorts.asJava)
.endSpec()
.done()
Expand All @@ -137,7 +141,7 @@ private[spark] class Client(
.asScala
.find(status =>
status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
case Some(status) =>
case Some(_) =>
try {
val driverLauncher = getDriverLauncherService(
k8ClientConfig, master)
Expand Down Expand Up @@ -184,7 +188,7 @@ private[spark] class Client(
kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.withLabels(selectors)
.withLabels(resolvedSelectors)
.endMetadata()
.withNewSpec()
.withRestartPolicy("OnFailure")
Expand Down Expand Up @@ -291,7 +295,7 @@ private[spark] class Client(

Utils.tryWithResource(kubernetesClient
.pods()
.withLabels(selectors)
.withLabels(resolvedSelectors)
.watch(podWatcher)) { createDriverPod }
} finally {
kubernetesClient.secrets().delete(secret)
Expand Down Expand Up @@ -336,7 +340,7 @@ private[spark] class Client(
.getOption("spark.ui.port")
.map(_.toInt)
.getOrElse(DEFAULT_UI_PORT))
(servicePorts.toSeq, containerPorts.toSeq)
(servicePorts, containerPorts)
}

private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
Expand Down Expand Up @@ -366,7 +370,7 @@ private[spark] class Client(
uploadedJarsBase64Contents = uploadJarsBase64Contents)
}

def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
private def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
maybeFilePaths
.map(_.split(","))
.map(CompressionUtils.createTarGzip(_))
Expand All @@ -391,6 +395,23 @@ private[spark] class Client(
sslSocketFactory = sslContext.getSocketFactory,
trustContext = trustManager)
}

private def parseCustomLabels(labels: String): Map[String, String] = {
labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
label.split("=", 2).toSeq match {
case Seq(k, v) =>
require(k != DRIVER_LAUNCHER_SELECTOR_LABEL, "Label with key" +
s" $DRIVER_LAUNCHER_SELECTOR_LABEL cannot be used in" +
" spark.kubernetes.driver.labels, as it is reserved for Spark's" +
" internal configuration.")
(k, v)
case _ =>
throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
" must be a comma-separated list of key-value pairs, with format <key>=<value>." +
s" Got label: $label. All labels: $labels")
}
}).toMap
}
}

private object Client {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,38 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"spark-pi", NAMESPACE, "spark-ui-port")
expectationsForStaticAllocation(sparkMetricsService)
}

test("Run with custom labels") {
val args = Array(
"--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
"--deploy-mode", "cluster",
"--kubernetes-namespace", NAMESPACE,
"--name", "spark-pi",
"--executor-memory", "512m",
"--executor-cores", "1",
"--num-executors", "1",
"--upload-jars", HELPER_JAR,
"--class", MAIN_CLASS,
"--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
"--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
"--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
"--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
"--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
"--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
EXAMPLES_JAR)
SparkSubmit.main(args)
val driverPodLabels = minikubeKubernetesClient
.pods
.withName("spark-pi")
.get
.getMetadata
.getLabels
// We can't match all of the selectors directly since one of the selectors is based on the
// launch time.
assert(driverPodLabels.size == 3, "Unexpected number of pod labels.")
assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
" selector label to be present.")
assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
}
}