Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 85f02bf

Browse files
mccheah authored and foxish committed
Support custom labels on the driver pod. (#27)
* Support custom labels on the driver pod. * Add integration test and fix logic. * Fix tests * Fix minor formatting mistake * Reduce unnecessary diff
1 parent a89b4b0 commit 85f02bf

File tree

3 files changed

+70
-7
lines changed

3 files changed

+70
-7
lines changed

docs/running-on-kubernetes.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ from the other deployment modes. See the [configuration page](configuration.html
213213
(typically 6-10%).
214214
</td>
215215
</tr>
216+
<tr>
217+
<td><code>spark.kubernetes.driver.labels</code></td>
218+
<td>(none)</td>
219+
<td>
220+
Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
221+
where each label is in the format <code>key=value</code>.
222+
</td>
223+
</tr>
216224
</table>
217225

218226
## Current Limitations

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ private[spark] class Client(
7777
private val serviceAccount = sparkConf.get("spark.kubernetes.submit.serviceAccountName",
7878
"default")
7979

80+
private val customLabels = sparkConf.get("spark.kubernetes.driver.labels", "")
81+
8082
private implicit val retryableExecutionContext = ExecutionContext
8183
.fromExecutorService(
8284
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
@@ -85,6 +87,7 @@ private[spark] class Client(
8587
.build()))
8688

8789
def run(): Unit = {
90+
val parsedCustomLabels = parseCustomLabels(customLabels)
8891
var k8ConfBuilder = new ConfigBuilder()
8992
.withApiVersion("v1")
9093
.withMasterUrl(master)
@@ -109,14 +112,15 @@ private[spark] class Client(
109112
.withType("Opaque")
110113
.done()
111114
try {
112-
val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava
115+
val resolvedSelectors = (Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue)
116+
++ parsedCustomLabels).asJava
113117
val (servicePorts, containerPorts) = configurePorts()
114118
val service = kubernetesClient.services().createNew()
115119
.withNewMetadata()
116120
.withName(kubernetesAppId)
117121
.endMetadata()
118122
.withNewSpec()
119-
.withSelector(selectors)
123+
.withSelector(resolvedSelectors)
120124
.withPorts(servicePorts.asJava)
121125
.endSpec()
122126
.done()
@@ -137,7 +141,7 @@ private[spark] class Client(
137141
.asScala
138142
.find(status =>
139143
status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
140-
case Some(status) =>
144+
case Some(_) =>
141145
try {
142146
val driverLauncher = getDriverLauncherService(
143147
k8ClientConfig, master)
@@ -184,7 +188,7 @@ private[spark] class Client(
184188
kubernetesClient.pods().createNew()
185189
.withNewMetadata()
186190
.withName(kubernetesAppId)
187-
.withLabels(selectors)
191+
.withLabels(resolvedSelectors)
188192
.endMetadata()
189193
.withNewSpec()
190194
.withRestartPolicy("OnFailure")
@@ -291,7 +295,7 @@ private[spark] class Client(
291295

292296
Utils.tryWithResource(kubernetesClient
293297
.pods()
294-
.withLabels(selectors)
298+
.withLabels(resolvedSelectors)
295299
.watch(podWatcher)) { createDriverPod }
296300
} finally {
297301
kubernetesClient.secrets().delete(secret)
@@ -336,7 +340,7 @@ private[spark] class Client(
336340
.getOption("spark.ui.port")
337341
.map(_.toInt)
338342
.getOrElse(DEFAULT_UI_PORT))
339-
(servicePorts.toSeq, containerPorts.toSeq)
343+
(servicePorts, containerPorts)
340344
}
341345

342346
private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
@@ -366,7 +370,7 @@ private[spark] class Client(
366370
uploadedJarsBase64Contents = uploadJarsBase64Contents)
367371
}
368372

369-
def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
373+
private def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
370374
maybeFilePaths
371375
.map(_.split(","))
372376
.map(CompressionUtils.createTarGzip(_))
@@ -391,6 +395,23 @@ private[spark] class Client(
391395
sslSocketFactory = sslContext.getSocketFactory,
392396
trustContext = trustManager)
393397
}
398+
399+
/**
 * Parses the comma-separated label list from `spark.kubernetes.driver.labels`
 * into a key-value map.
 *
 * Each entry must have the form `key=value`; values may themselves contain `=`
 * because only the first `=` is treated as the separator. Blank entries
 * (e.g. from trailing commas) are ignored.
 *
 * @param labels raw configuration string, possibly empty
 * @return parsed labels as an immutable map
 * @throws IllegalArgumentException if a label reuses the reserved selector key
 * @throws SparkException if an entry is not in `key=value` form
 */
private def parseCustomLabels(labels: String): Map[String, String] = {
  val entries = labels.split(",").map(_.trim).filterNot(_.isEmpty)
  entries.map { entry =>
    // Split on the first '=' only, so values containing '=' survive intact.
    entry.split("=", 2).toSeq match {
      case Seq(key, value) =>
        // The launcher-selector key is reserved for Spark's own pod selection.
        require(key != DRIVER_LAUNCHER_SELECTOR_LABEL, "Label with key" +
          s" $DRIVER_LAUNCHER_SELECTOR_LABEL cannot be used in" +
          " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
          " internal configuration.")
        key -> value
      case _ =>
        throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
          " must be a comma-separated list of key-value pairs, with format <key>=<value>." +
          s" Got label: $entry. All labels: $labels")
    }
  }.toMap
}
394415
}
395416

396417
private object Client {

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,4 +161,38 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
161161
"spark-pi", NAMESPACE, "spark-ui-port")
162162
expectationsForStaticAllocation(sparkMetricsService)
163163
}
164+
165+
test("Run with custom labels") {
  // Submit a spark-pi job in cluster mode with two user-supplied driver labels,
  // then verify those labels land on the created driver pod.
  val submitArgs = Array(
    "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
    "--deploy-mode", "cluster",
    "--kubernetes-namespace", NAMESPACE,
    "--name", "spark-pi",
    "--executor-memory", "512m",
    "--executor-cores", "1",
    "--num-executors", "1",
    "--upload-jars", HELPER_JAR,
    "--class", MAIN_CLASS,
    "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
    "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
    "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
    "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
    "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
    "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
    EXAMPLES_JAR)
  SparkSubmit.main(submitArgs)
  val driverPod = minikubeKubernetesClient
    .pods
    .withName("spark-pi")
    .get
  val driverPodLabels = driverPod.getMetadata.getLabels
  // We can't match all of the selectors directly since one of the selectors is based on the
  // launch time.
  assert(driverPodLabels.size == 3, "Unexpected number of pod labels.")
  assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
    " selector label to be present.")
  assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
  assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
}
164198
}

0 commit comments

Comments
 (0)