This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 71a971f

Addressed more comments
1 parent 1f271be

6 files changed: +62 −59 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala

Lines changed: 10 additions & 9 deletions
@@ -20,6 +20,16 @@ package org.apache.spark.deploy.k8s
 import org.apache.spark.SparkConf
 
 private[spark] object ConfigurationUtils {
+
+  /**
+   * Extract and parse Spark configuration properties with a given name prefix and
+   * return the result as a Map. Keys must not have more than one value.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @param configType a descriptive note on the type of entities of interest
+   * @return a Map storing the configuration property keys and values
+   */
   def parsePrefixedKeyValuePairs(
       sparkConf: SparkConf,
       prefix: String,
@@ -34,15 +44,6 @@ private[spark] object ConfigurationUtils {
     fromPrefix.toMap
   }
 
-  def requireSecondIfFirstIsDefined(
-      opt1: Option[_],
-      opt2: Option[_],
-      errMessageWhenSecondIsMissing: String): Unit = {
-    opt1.foreach { _ =>
-      require(opt2.isDefined, errMessageWhenSecondIsMissing)
-    }
-  }
-
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }
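
For orientation, a minimal usage sketch of the newly documented helper. The prefix, keys, and values below are illustrative, not taken from this commit, and the prefix-stripped result assumes the behavior that the fromPrefix naming in the method body suggests:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils

val conf = new SparkConf()
  .set("spark.kubernetes.driver.label.team", "data-platform") // illustrative key/value
  .set("spark.kubernetes.driver.label.env", "staging")        // illustrative key/value

// Assumed result, with the prefix stripped from each key:
// Map("team" -> "data-platform", "env" -> "staging")
val driverLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.driver.label.", "driver label")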

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

Lines changed: 2 additions & 2 deletions
@@ -51,8 +51,8 @@ private[spark] object SparkKubernetesClientFactory {
     ConfigurationUtils.requireNandDefined(
       oauthTokenFile,
       oauthTokenValue,
-      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
-        s" value $oauthTokenConf.")
+      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +
+        s"value $oauthTokenConf.")
 
     val caCertFile = sparkConf
       .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
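
The only change here moves the separating space from the start of the second string fragment to the end of the first, so the concatenated message keeps a space between "a" and "value". For reference, a small sketch of what requireNandDefined (shown in full in ConfigurationUtils above) enforces; the argument values are made up:

// Passes: at most one of the two options is defined.
ConfigurationUtils.requireNandDefined(Some("token-file"), None, "unused message")
ConfigurationUtils.requireNandDefined(None, None, "unused message")
// Throws IllegalArgumentException: both options are defined at once.
ConfigurationUtils.requireNandDefined(
  Some("token-file"), Some("token-value"),
  "Cannot specify OAuth token through both a file and a value.")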

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

Lines changed: 2 additions & 1 deletion
@@ -44,7 +44,6 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("IfNotPresent")
 
-
   private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver"
   private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
@@ -95,12 +94,14 @@ package object config extends Logging {
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
       .intConf
+      .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
       .createWithDefault(5)
 
   private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
       .doc("Number of seconds to wait between each round of executor allocation.")
       .longConf
+      .checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
       .createWithDefault(1)
 
   private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
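
These checkValue guards replace the ad-hoc require checks removed from KubernetesClusterSchedulerBackend later in this commit, so a bad value now fails as soon as the config entry is read. A sketch of the resulting behavior; the failing value is made up, and the exception follows Spark's ConfigBuilder contract of rejecting the value with an IllegalArgumentException carrying the message above:

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.kubernetes.allocation.batch.size", "0")
// conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
// => java.lang.IllegalArgumentException: Allocation batch size should be a positive integer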

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 3 additions & 6 deletions
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.ConfigurationUtils
 import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
@@ -77,11 +77,8 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
   private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
-  private val blockmanagerPort = sparkConf
+  private val blockManagerPort = sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
-  private val kubernetesDriverPodName = sparkConf
-    .get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(throw new SparkException("Must specify the driver pod name"))
 
   private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
 
@@ -163,7 +160,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
-      (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
+      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
       .map(port => {
         new ContainerPortBuilder()
           .withName(port._1)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 4 additions & 2 deletions
@@ -36,8 +36,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     new TaskSchedulerImpl(sc)
   }
 
-  override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler)
-    : SchedulerBackend = {
+  override def createSchedulerBackend(
+      sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
     val sparkConf = sc.getConf
 
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 41 additions & 39 deletions
@@ -87,14 +87,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
   private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
-  require(podAllocationInterval > 0, "Allocation batch delay " +
-    s"${KUBERNETES_ALLOCATION_BATCH_DELAY} " +
-    s"is ${podAllocationInterval}, should be a positive integer")
 
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
-  require(podAllocationSize > 0, "Allocation batch size " +
-    s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
-    s"is ${podAllocationSize}, should be a positive integer")
 
   private val allocatorRunnable = new Runnable {
@@ -304,39 +298,40 @@ private[spark] class KubernetesClusterSchedulerBackend(
     private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
 
     override def eventReceived(action: Action, pod: Pod): Unit = {
-      if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
-        && pod.getMetadata.getDeletionTimestamp == null) {
-        val podIP = pod.getStatus.getPodIP
-        val clusterNodeName = pod.getSpec.getNodeName
-        logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
-        executorPodsByIPs.put(podIP, pod)
-      } else if (action == Action.DELETED || action == Action.ERROR) {
-        val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
-        require(executorId != null, "Unexpected pod metadata; expected all executor pods" +
-          s" to have label $SPARK_EXECUTOR_ID_LABEL.")
-        val podName = pod.getMetadata.getName
-        val podIP = pod.getStatus.getPodIP
-        logDebug(s"Executor pod $podName at IP $podIP was at $action.")
-        if (podIP != null) {
-          executorPodsByIPs.remove(podIP)
-        }
-        val executorExitReason = if (action == Action.ERROR) {
-          logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
-          executorExitReasonOnError(pod)
-        } else if (action == Action.DELETED) {
-          logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
-          executorExitReasonOnDelete(pod)
-        } else {
-          throw new IllegalStateException(
-            s"Unknown action that should only be DELETED or ERROR: $action")
-        }
-        podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
-        if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
-          log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
-            s" watch received an event of type $action for this executor. The executor may" +
-            s" have failed to start in the first place and never registered with the driver.")
-        }
-        disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+      action match {
+        case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+            && pod.getMetadata.getDeletionTimestamp == null) =>
+          val podIP = pod.getStatus.getPodIP
+          val clusterNodeName = pod.getSpec.getNodeName
+          logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
+          executorPodsByIPs.put(podIP, pod)
+        case Action.DELETED | Action.ERROR =>
+          val executorId = getExecutorId(pod)
+          val podName = pod.getMetadata.getName
+          val podIP = pod.getStatus.getPodIP
+          logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+          if (podIP != null) {
+            executorPodsByIPs.remove(podIP)
+          }
+
+          val executorExitReason = if (action == Action.ERROR) {
+            logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
+            executorExitReasonOnError(pod)
+          } else if (action == Action.DELETED) {
+            logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
+            executorExitReasonOnDelete(pod)
+          } else {
+            throw new IllegalStateException(
+              s"Unknown action that should only be DELETED or ERROR: $action")
+          }
+          podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
+
+          if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
+            log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
+              s" watch received an event of type $action for this executor. The executor may" +
+              s" have failed to start in the first place and never registered with the driver.")
+          }
+          disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
       }
     }
 
@@ -391,6 +386,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
         ExecutorExited(
           getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
       }
+
+    def getExecutorId(pod: Pod): String = {
+      val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+      require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
+        s"to have label $SPARK_EXECUTOR_ID_LABEL.")
+      executorId
+    }
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
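
The net effect of this refactor: the if/else-if chain becomes a single match on the watch action, and the executor-id lookup with its null check moves into getExecutorId. A stripped-down sketch of the same shape; isRunning, trackPod, and recordExitReason are hypothetical stand-ins for the inlined logic above, and the catch-all case is the sketch's own addition (the commit's match covers only the actions shown):

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.Watcher.Action

// Pod is up and not being torn down (mirrors the MODIFIED guard above).
def isRunning(pod: Pod): Boolean =
  pod.getStatus.getPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null

def trackPod(pod: Pod): Unit = ()                         // hypothetical stand-in
def recordExitReason(action: Action, pod: Pod): Unit = () // hypothetical stand-in

def eventReceivedSketch(action: Action, pod: Pod): Unit = action match {
  case Action.MODIFIED if isRunning(pod) => trackPod(pod)
  case Action.DELETED | Action.ERROR => recordExitReason(action, pod) // shared exit bookkeeping
  case _ => // sketch-only catch-all: ignore other actions (e.g. ADDED)
}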
