This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 4b32134

Various fixes to the scheduler
- Move Kubernetes client calls out of synchronized blocks so that HTTP connection lag does not hold the scheduler's locks (sketched below).
- Fix a bug where pods that fail to launch through the API are not retried.
- Remove the map from executor pod name to executor ID by using the pod's labels to get the same information without having to track extra state.
1 parent 018f4d8 commit 4b32134
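The first bullet describes a pattern applied in several places in this commit: do the in-memory bookkeeping while holding the lock, and make the Kubernetes API call afterwards. A minimal sketch of that pattern, assuming the fabric8 `KubernetesClient` used throughout the diff (the class and method names here are illustrative, not the scheduler's actual code):

```scala
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

// Sketch only: mutate the shared map while holding the lock, but issue the
// (potentially slow) HTTP delete afterwards, so API-server connection lag
// cannot stall other threads waiting on the same lock.
class ExecutorBookkeepingSketch(kubernetesClient: KubernetesClient) {
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]

  def removeExecutor(executorId: String): Unit = {
    val maybePod = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      runningExecutorsToPods.remove(executorId) // in-memory bookkeeping only
    }
    maybePod.foreach(pod => kubernetesClient.pods().delete(pod)) // network call outside the lock
  }
}
```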

4 files changed: +124 -78 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 2 additions & 2 deletions
@@ -138,14 +138,14 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
           new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
         }
       }.getOrElse(Seq.empty[EnvVar])
-    val executorEnv = Seq(
+    val executorEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
       (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId),
-      (ENV_EXECUTOR_ID, executorId))
+      (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
       .map(env => new EnvVarBuilder()
         .withName(env._1)
         .withValue(env._2)
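The two-line change above threads the user-supplied executor environment (`executorEnvs`) into the same sequence as the required Spark variables before everything is converted to fabric8 `EnvVar` objects. A standalone sketch of that conversion with a hypothetical helper name (only `EnvVarBuilder` and the tuple shape come from the diff):

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

// Illustrative helper: append user-supplied executor environment entries to the
// required Spark variables, then build the EnvVar list for the container spec.
def buildExecutorEnv(
    requiredEnv: Seq[(String, String)],
    userExecutorEnvs: Seq[(String, String)]): Seq[EnvVar] = {
  (requiredEnv ++ userExecutorEnvs).map { case (name, value) =>
    new EnvVarBuilder()
      .withName(name)
      .withValue(value)
      .build()
  }
}
```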

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 45 additions & 40 deletions
@@ -50,12 +50,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-  // Indexed by executor IDs
   @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
-  // Indexed by executor pod names
-  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
-  private val runningPodsToExecutors = new mutable.HashMap[String, String]
   private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
   private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
   private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
@@ -117,7 +113,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
         } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
           logDebug("Maximum allowed executor limit reached. Not scaling up further.")
         } else {
-          val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
             currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
             val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
@@ -127,7 +122,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
               driverUrl,
               conf.getExecutorEnv,
               driverPod,
-              nodeToLocalTaskCount)
+              currentNodeToLocalTaskCount)
+            require(executorPod.getMetadata.getLabels.containsKey(SPARK_EXECUTOR_ID_LABEL),
+              s"Illegal internal state for pod with name ${executorPod.getMetadata.getName} - all" +
+                s" executor pods must contain the label $SPARK_EXECUTOR_ID_LABEL.")
+            val resolvedExecutorIdLabel = executorPod.getMetadata.getLabels.get(
+              SPARK_EXECUTOR_ID_LABEL)
+            require(resolvedExecutorIdLabel == executorId,
+              s"Illegal internal state for pod with name ${executorPod.getMetadata.getName} - all" +
+                s" executor pods must map the label with key ${SPARK_EXECUTOR_ID_LABEL} to the" +
+                s" executor's ID. This label mapped instead to: $resolvedExecutorIdLabel.")
             executorsToAllocate(executorId) = executorPod
             logInfo(
               s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
@@ -143,8 +147,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
           case (executorId, attemptedAllocatedExecutor) =>
             attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
               runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
-              runningPodsToExecutors.put(
-                successfullyAllocatedExecutor.getMetadata.getName, executorId)
             }
         }
       }
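This hunk is where the retry fix takes effect: only executors whose pods were actually created through the API end up in `runningExecutorsToPods`, so a failed creation leaves the executor unrecorded and a later allocation round requests it again. A rough sketch of the idea (not the patch's exact allocation code; the helper name is made up):

```scala
import scala.collection.mutable
import scala.util.Try

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

// Illustrative allocation round: attempt each create call, record only the
// successes, and leave failures out of the map so the next round retries them.
def allocateExecutors(
    kubernetesClient: KubernetesClient,
    toAllocate: Map[String, Pod],
    runningExecutorsToPods: mutable.HashMap[String, Pod],
    lock: AnyRef): Unit = {
  val attempted = toAllocate.map { case (executorId, podSpec) =>
    executorId -> Try(kubernetesClient.pods().create(podSpec)).toOption
  }
  lock.synchronized {
    attempted.foreach {
      case (executorId, Some(createdPod)) =>
        runningExecutorsToPods.put(executorId, createdPod)
      case (_, None) =>
        // Not recorded: the executor stays pending and is requested again later.
    }
  }
}
```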
@@ -166,11 +168,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
             // We keep around executors that have exit conditions caused by the application. This
             // allows them to be debugged later on. Otherwise, mark them as to be deleted from the
             // the API server.
-            if (!executorExited.exitCausedByApp) {
+            if (executorExited.exitCausedByApp) {
+              logInfo(s"Executor $executorId exited because of the application.")
+              deleteExecutorFromDataStructures(executorId)
+            } else {
               logInfo(s"Executor $executorId failed because of a framework error.")
               deleteExecutorFromClusterAndDataStructures(executorId)
-            } else {
-              logInfo(s"Executor $executorId exited because of the application.")
             }
           }
       }
@@ -187,19 +190,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
 
     def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
+      deleteExecutorFromDataStructures(executorId)
+        .foreach(pod => kubernetesClient.pods().delete(pod))
+    }
+
+    def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
       disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
       executorReasonCheckAttemptCounts -= executorId
-      podsWithKnownExitReasons -= executorId
-      val maybeExecutorPodToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorsToPods.remove(executorId).map { pod =>
-          runningPodsToExecutors.remove(pod.getMetadata.getName)
-          pod
-        }.orElse {
+      podsWithKnownExitReasons.remove(executorId)
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.remove(executorId).orElse {
          logWarning(s"Unable to remove pod for unknown executor $executorId")
          None
        }
      }
-      maybeExecutorPodToDelete.foreach(pod => kubernetesClient.pods().delete(pod))
     }
   }

@@ -231,14 +235,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     super.stop()
 
     // then delete the executor pods
-    // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
-    // When using Utils.tryLogNonFatalError some of the code fails but without any logs or
-    // indication as to why.
     Utils.tryLogNonFatalError {
       val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
         val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
         runningExecutorsToPods.clear()
-        runningPodsToExecutors.clear()
         runningExecutorPodsCopy
       }
       kubernetesClient.pods().delete(executorPodsToDelete: _*)
@@ -288,7 +288,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
         maybeRemovedExecutor.foreach { executorPod =>
           disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
-          runningPodsToExecutors.remove(executorPod.getMetadata.getName)
           podsToDelete += executorPod
         }
         if (maybeRemovedExecutor.isEmpty) {
@@ -300,11 +299,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
     true
   }
 
-  def getExecutorPodByIP(podIP: String): Option[Pod] = {
-    val pod = executorPodsByIPs.get(podIP)
-    Option(pod)
-  }
-
   private class ExecutorPodsWatcher extends Watcher[Pod] {
 
     private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
@@ -316,21 +310,33 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val clusterNodeName = pod.getSpec.getNodeName
         logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
         executorPodsByIPs.put(podIP, pod)
-      } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
-          action == Action.DELETED || action == Action.ERROR) {
+      } else if (action == Action.DELETED || action == Action.ERROR) {
+        val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+        require(executorId != null, "Unexpected pod metadata; expected all executor pods" +
+          s" to have label $SPARK_EXECUTOR_ID_LABEL.")
         val podName = pod.getMetadata.getName
         val podIP = pod.getStatus.getPodIP
         logDebug(s"Executor pod $podName at IP $podIP was at $action.")
         if (podIP != null) {
           executorPodsByIPs.remove(podIP)
         }
-        if (action == Action.ERROR) {
+        val executorExitReason = if (action == Action.ERROR) {
           logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
-          handleErroredPod(pod)
+          executorExitReasonOnError(pod)
         } else if (action == Action.DELETED) {
           logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
-          handleDeletedPod(pod)
+          executorExitReasonOnDelete(pod)
+        } else {
+          throw new IllegalStateException(
+            s"Unknown action that should only be DELETED or ERROR: $action")
+        }
+        podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
+        if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
+          log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
+            s" watch received an event of type $action for this executor. The executor may" +
+            s" have failed to start in the first place and never registered with the driver.")
         }
+        disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
       }
     }

@@ -356,15 +362,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
 
     def isPodAlreadyReleased(pod: Pod): Boolean = {
+      val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        !runningPodsToExecutors.contains(pod.getMetadata.getName)
+        !runningExecutorsToPods.contains(executorId)
       }
     }
 
-    def handleErroredPod(pod: Pod): Unit = {
+    def executorExitReasonOnError(pod: Pod): ExecutorExited = {
       val containerExitStatus = getExecutorExitStatus(pod)
       // container was probably actively killed by the driver.
-      val exitReason = if (isPodAlreadyReleased(pod)) {
+      if (isPodAlreadyReleased(pod)) {
        ExecutorExited(containerExitStatus, exitCausedByApp = false,
          s"Container in pod ${pod.getMetadata.getName} exited from explicit termination" +
            " request.")
@@ -373,18 +380,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
           s"exited with exit status code $containerExitStatus."
         ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
       }
-      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
 
-    def handleDeletedPod(pod: Pod): Unit = {
+    def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
       val exitMessage = if (isPodAlreadyReleased(pod)) {
         s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
       } else {
         s"Pod ${pod.getMetadata.getName} deleted or lost."
       }
-      val exitReason = ExecutorExited(
+      ExecutorExited(
         getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
-      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
   }

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 9 additions & 9 deletions
@@ -25,7 +25,7 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.config._
-import org.apache.spark.deploy.k8s.constants
+import org.apache.spark.deploy.k8s.constants._
 
 class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
   private val driverPodName: String = "driver-pod"
@@ -64,6 +64,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     // The executor pod name and default labels.
     assert(executor.getMetadata.getName === s"$executorPrefix-exec-1")
     assert(executor.getMetadata.getLabels.size() === 3)
+    assert(executor.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) === "1")
 
     // There is exactly 1 container with no volume mounts and default memory limits.
     // Default memory limit is 1024M + 384M (minimum overhead constant).
@@ -120,14 +121,13 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
   // Check that the expected environment variables are present.
   private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = {
     val defaultEnvs = Map(
-      constants.ENV_EXECUTOR_ID -> "1",
-      constants.ENV_DRIVER_URL -> "dummy",
-      constants.ENV_EXECUTOR_CORES -> "1",
-      constants.ENV_EXECUTOR_MEMORY -> "1g",
-      constants.ENV_APPLICATION_ID -> "dummy",
-      constants.ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*",
-      constants.ENV_EXECUTOR_POD_IP -> null,
-      constants.ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars
+      ENV_EXECUTOR_ID -> "1",
+      ENV_DRIVER_URL -> "dummy",
+      ENV_EXECUTOR_CORES -> "1",
+      ENV_EXECUTOR_MEMORY -> "1g",
+      ENV_APPLICATION_ID -> "dummy",
+      ENV_EXECUTOR_POD_IP -> null,
+      ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars
 
     assert(executor.getSpec.getContainers.size() === 1)
     assert(executor.getSpec.getContainers.get(0).getEnv().size() === defaultEnvs.size)
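As a usage note, the `executorEnvs` appended in `ExecutorPodFactory` originate from `spark.executorEnv.*` settings (surfaced through `conf.getExecutorEnv` in the scheduler-backend diff above). A small example of setting them with the standard `SparkConf` API; the application and variable names are made up:

```scala
import org.apache.spark.SparkConf

// Values set through spark.executorEnv.* are returned by SparkConf.getExecutorEnv
// and, with this change, appended to every executor pod's environment.
val conf = new SparkConf()
  .setAppName("env-passthrough-example")
  .setExecutorEnv("EXAMPLE_TOKEN", "example-value")     // stored as spark.executorEnv.EXAMPLE_TOKEN
  .set("spark.executorEnv.EXAMPLE_REGION", "us-east-1") // equivalent explicit form
```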
