
Commit 992acbe

Extract initial executor count to utils class
1 parent 2fb596d commit 992acbe

6 files changed: +53 −45 lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
+import org.apache.spark.util.Utils
+
+private[spark] object SchedulerBackendUtils {
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
+  /**
+   * Getting the initial target number of executors depends on whether dynamic allocation is
+   * enabled.
+   * If not using dynamic allocation it gets the number of executors requested by the user.
+   */
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
+      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must between min executor number " +
+        s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
+    }
+  }
+}
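For orientation, a minimal usage sketch of the extracted helper follows. It is hypothetical and not part of the commit: the file must live under the org.apache.spark package tree because the object is private[spark], and the config values are illustrative. With dynamic allocation off, the helper honors spark.executor.instances and otherwise falls back to DEFAULT_NUMBER_EXECUTORS.

// Hypothetical sketch, not part of this commit. It must sit inside the
// org.apache.spark package tree because SchedulerBackendUtils is private[spark].
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf

object InitialExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation disabled: spark.executor.instances wins when set...
    val conf = new SparkConf().set("spark.executor.instances", "5")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(conf))            // 5
    // ...otherwise the numExecutors default (DEFAULT_NUMBER_EXECUTORS = 2) applies.
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(new SparkConf())) // 2
  }
}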

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 2 additions & 19 deletions
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
-
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
@@ -34,7 +33,7 @@ import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
 import org.apache.spark.util.Utils
 
 private[spark] class KubernetesClusterSchedulerBackend(
@@ -86,7 +85,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
-  private val initialExecutors = getInitialTargetExecutorNumber()
+  private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
   private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   require(podAllocationInterval > 0, s"Allocation batch delay " +
@@ -173,22 +172,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
-      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
-      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
-      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", 1)
-      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
-        s"initial executor number $initialNumExecutors must between min executor number " +
-        s"$minNumExecutors and max executor number $maxNumExecutors")
-
-      initialNumExecutors
-    } else {
-      conf.getInt("spark.executor.instances", defaultNumExecutors)
-    }
-
-  }
-
   override def sufficientResourcesRegistered(): Boolean = {
     totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
   }

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.control.NonFatal
-
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -41,6 +40,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
@@ -109,7 +109,7 @@ private[yarn] class YarnAllocator(
     sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
 
   @volatile private var targetNumExecutors =
-    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+    SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
 
   private var currentNodeBlacklist = Set.empty[String]
 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 0 additions & 22 deletions
@@ -279,27 +279,5 @@ object YarnSparkHadoopUtil {
       securityMgr.getModifyAclsGroups)
     )
   }
-
-  /**
-   * Getting the initial target number of executors depends on whether dynamic allocation is
-   * enabled.
-   * If not using dynamic allocation it gets the number of executors requested by the user.
-   */
-  def getInitialTargetExecutorNumber(
-      conf: SparkConf,
-      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
-    if (Utils.isDynamicAllocationEnabled(conf)) {
-      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
-      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
-      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
-      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
-        s"initial executor number $initialNumExecutors must between min executor number " +
-        s"$minNumExecutors and max executor number $maxNumExecutors")
-
-      initialNumExecutors
-    } else {
-      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
-    }
-  }
 }
 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray)
-    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
+    totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
     client = new Client(args, conf)
    bindToYarn(client.submitApplication(), None)
 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ private[spark] class YarnClusterSchedulerBackend(
     val attemptId = ApplicationMaster.getAttemptId
     bindToYarn(attemptId.getApplicationId(), Some(attemptId))
     super.start()
-    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
+    totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sc.conf)
   }
 
   override def getDriverLogUrls: Option[Map[String, String]] = {
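A companion sketch of the dynamic-allocation branch, again hypothetical and with illustrative config values: the require in the new utility rejects any initial count outside [min, max].

// Hypothetical sketch of the dynamic-allocation branch; same private[spark]
// packaging caveat as the sketch above.
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf

object DynAllocInitialExecutorsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.initialExecutors", "3")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    // Utils.getDynamicAllocationInitialExecutors takes the max of the min,
    // initial, and spark.executor.instances settings, so this prints 3; an
    // initial count outside [min, max] would trip the require() and fail fast.
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(conf))
  }
}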
