@@ -20,6 +20,7 @@ import java.io.Closeable
 import java.net.InetAddress
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+import javax.annotation.concurrent.GuardedBy

 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
@@ -49,9 +50,11 @@ private[spark] class KubernetesClusterSchedulerBackend(

   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  // Indexed by executor IDs
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
-  // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  // Indexed by executor pod names
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
   private val runningPodsToExecutors = new mutable.HashMap[String, String]
   private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
   private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
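
For context on the hunk above: `@GuardedBy` (from `javax.annotation.concurrent`) only documents which lock protects a field and feeds static-analysis tools; it enforces nothing at runtime, so callers still have to synchronize explicitly. A minimal sketch of the idiom, with illustrative names:

import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

class PodRegistry {
  private val REGISTRY_LOCK = new Object

  // The annotation records the guard; the compiler does not check it.
  @GuardedBy("REGISTRY_LOCK")
  private val podsByExecutorId = mutable.HashMap[String, String]()

  def register(executorId: String, podName: String): Unit = REGISTRY_LOCK.synchronized {
    podsByExecutorId(executorId) = podName
  }
}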
@@ -105,21 +108,44 @@ private[spark] class KubernetesClusterSchedulerBackend(

     override def run(): Unit = {
       handleDisconnectedExecutors()
+      val executorsToAllocate = mutable.Map[String, Pod]()
+      val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
+      val currentTotalExpectedExecutors = totalExpectedExecutors.get
+      val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts
+      if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
+        logDebug("Waiting for pending executors before scaling")
+      } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
+        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+      } else {
+        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
+        for (i <- 0 until math.min(
+          currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
+          val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
+          val executorPod = executorPodFactory.createExecutorPod(
+            executorId,
+            applicationId(),
+            driverUrl,
+            conf.getExecutorEnv,
+            driverPod,
+            nodeToLocalTaskCount)
+          executorsToAllocate(executorId) = executorPod
+          logInfo(
+            s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+        }
+      }
+      val allocatedExecutors = executorsToAllocate.mapValues { pod =>
+        Utils.tryLog {
+          kubernetesClient.pods().create(pod)
+        }
+      }
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
-          logDebug("Waiting for pending executors before scaling")
-        } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
-          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-        } else {
-          val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
-          for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
-            val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
-            runningExecutorsToPods.put(executorId, pod)
-            runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
-            logInfo(
-              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
-          }
+        allocatedExecutors.map {
+          case (executorId, attemptedAllocatedExecutor) =>
+            attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
+              runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
+              runningPodsToExecutors.put(
+                successfullyAllocatedExecutor.getMetadata.getName, executorId)
+            }
         }
       }
     }
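
The reworked `run()` above follows a build-outside-the-lock, commit-inside-the-lock shape: pod specs are assembled and the blocking create calls are issued without holding `RUNNING_EXECUTOR_PODS_LOCK`, each attempt is captured by `Utils.tryLog` so one failed create does not abort the loop, and only the successful creations are recorded under the lock. A simplified, self-contained sketch of that pattern, modelling the captured attempt with `scala.util.Try`; the names and the `createRemote` stand-in are illustrative, not the actual Kubernetes client API:

import scala.collection.mutable
import scala.util.Try

object AllocateOutsideLock {
  private val LOCK = new Object
  private val registry = mutable.HashMap[String, String]()  // committed bookkeeping, guarded by LOCK

  // Hypothetical blocking call standing in for kubernetesClient.pods().create(...)
  private def createRemote(executorId: String): String = s"pod-$executorId"

  def allocate(executorIds: Seq[String]): Unit = {
    // Slow I/O runs with no lock held; every attempt is captured as a Try.
    val attempts = executorIds.map(id => id -> Try(createRemote(id)))
    // Only quick bookkeeping happens under the lock.
    LOCK.synchronized {
      attempts.foreach { case (id, attempt) =>
        attempt.foreach(podName => registry.put(id, podName))
      }
    }
  }
}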
@@ -128,25 +154,25 @@ private[spark] class KubernetesClusterSchedulerBackend(
       // For each disconnected executor, synchronize with the loss reasons that may have been found
       // by the executor pod watcher. If the loss reason was discovered by the watcher,
       // inform the parent class with removeExecutor.
-      disconnectedPodsByExecutorIdPendingRemoval.keys().asScala.foreach { case (executorId) =>
-        val executorPod = disconnectedPodsByExecutorIdPendingRemoval.get(executorId)
-        val knownExitReason = Option(podsWithKnownExitReasons.remove(
-          executorPod.getMetadata.getName))
-        knownExitReason.fold {
-          removeExecutorOrIncrementLossReasonCheckCount(executorId)
-        } { executorExited =>
-          logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
-          removeExecutor(executorId, executorExited)
-          // We keep around executors that have exit conditions caused by the application. This
-          // allows them to be debugged later on. Otherwise, mark them as to be deleted from the
-          // the API server.
-          if (!executorExited.exitCausedByApp) {
-            logInfo(s"Executor $executorId failed because of a framework error.")
-            deleteExecutorFromClusterAndDataStructures(executorId)
-          } else {
-            logInfo(s"Executor $executorId exited because of the application.")
+      disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
+        case (executorId, executorPod) =>
+          val knownExitReason = Option(podsWithKnownExitReasons.remove(
+            executorPod.getMetadata.getName))
+          knownExitReason.fold {
+            removeExecutorOrIncrementLossReasonCheckCount(executorId)
+          } { executorExited =>
+            logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
+            removeExecutor(executorId, executorExited)
+            // We keep around executors that have exit conditions caused by the application. This
+            // allows them to be debugged later on. Otherwise, mark them as to be deleted from
+            // the API server.
+            if (!executorExited.exitCausedByApp) {
+              logInfo(s"Executor $executorId failed because of a framework error.")
+              deleteExecutorFromClusterAndDataStructures(executorId)
+            } else {
+              logInfo(s"Executor $executorId exited because of the application.")
+            }
           }
-        }
       }
     }

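
In the hunk above, the loop now walks the map's entries directly (`asScala.foreach { case (executorId, executorPod) => ... }`) instead of iterating the key set and looking each value up again, which drops one lookup per executor and avoids racing with a concurrent removal between `keys()` and `get()`. A small sketch of entry iteration over a `java.util.concurrent.ConcurrentHashMap` from Scala, with illustrative contents:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object EntryIteration {
  def main(args: Array[String]): Unit = {
    val pending = new ConcurrentHashMap[String, String]()
    pending.put("1", "exec-pod-1")
    pending.put("2", "exec-pod-2")
    // asScala wraps the Java map; each entry is visited once, with no extra get() per key.
    pending.asScala.foreach { case (executorId, podName) =>
      println(s"executor $executorId -> $podName")
    }
  }
}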
@@ -163,12 +189,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
     def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
       disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
       executorReasonCheckAttemptCounts -= executorId
-      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+      podsWithKnownExitReasons -= executorId
+      val maybeExecutorPodToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
         runningExecutorsToPods.remove(executorId).map { pod =>
-          kubernetesClient.pods().delete(pod)
           runningPodsToExecutors.remove(pod.getMetadata.getName)
-        }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+          pod
+        }.orElse {
+          logWarning(s"Unable to remove pod for unknown executor $executorId")
+          None
+        }
       }
+      maybeExecutorPodToDelete.foreach(pod => kubernetesClient.pods().delete(pod))
     }
   }

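
Here the `synchronized` block is used as an expression: it removes the executor from the bookkeeping maps and yields the pod (if any), and the Kubernetes delete call is issued only after the lock has been released. A small sketch of that shape, with illustrative names and a stand-in for the remote delete:

import scala.collection.mutable

object RemoveThenDelete {
  private val LOCK = new Object
  private val running = mutable.HashMap("42" -> "exec-pod-42")  // guarded by LOCK

  // Stand-in for the blocking kubernetesClient.pods().delete(...) call
  private def deleteRemote(podName: String): Unit = println(s"deleting $podName")

  def remove(executorId: String): Unit = {
    // The block yields Option[String]; no I/O happens while the lock is held.
    val maybePod = LOCK.synchronized {
      running.remove(executorId).orElse {
        println(s"Unable to remove pod for unknown executor $executorId")
        None
      }
    }
    maybePod.foreach(deleteRemote)
  }
}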
@@ -203,25 +234,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
     // When using Utils.tryLogNonFatalError some of the code fails but without any logs or
     // indication as to why.
-    try {
-      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
+    Utils.tryLogNonFatalError {
+      val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
         runningExecutorsToPods.clear()
         runningPodsToExecutors.clear()
+        runningExecutorPodsCopy
       }
+      kubernetesClient.pods().delete(executorPodsToDelete: _*)
       executorPodsByIPs.clear()
       val resource = executorWatchResource.getAndSet(null)
       if (resource != null) {
         resource.close()
       }
-    } catch {
-      case e: Throwable => logError("Uncaught exception while shutting down controllers.", e)
     }
-    try {
+    Utils.tryLogNonFatalError {
       logInfo("Closing kubernetes client")
       kubernetesClient.close()
-    } catch {
-      case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e)
     }
   }

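
In the `stop()` hunk above, the hand-rolled try/catch-and-log blocks give way to `Utils.tryLogNonFatalError`, so each cleanup step is attempted even if an earlier one throws. A rough, self-contained sketch of what such a helper typically looks like; this is an approximation, not Spark's exact implementation:

import scala.util.control.NonFatal

object Shutdown {
  // Run the block, log non-fatal failures, and let fatal errors (e.g. OutOfMemoryError) propagate.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) => System.err.println(s"Ignoring non-fatal error during shutdown: ${t.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    tryLogNonFatalError { throw new IllegalStateException("cleanup step failed") }
    tryLogNonFatalError { println("later cleanup still runs") }
  }
}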
@@ -231,7 +260,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
    */
   private def getNodesWithLocalTaskCounts(): Map[String, Int] = {
     val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
-      KubernetesClusterSchedulerBackend.this.synchronized {
+      synchronized {
         hostToLocalTaskCount
       }
     for (pod <- executorPodsByIPs.values().asScala) {
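
The one-line change above works because `getNodesWithLocalTaskCounts` is defined directly on the backend class: inside such a method, a bare `synchronized { ... }` already locks the enclosing instance, so the `KubernetesClusterSchedulerBackend.this` qualifier was redundant (the qualified form only matters inside a nested or anonymous class, where `this` would refer to the inner object). A small illustration with hypothetical names:

class Backend {
  private var hostToLocalTaskCount = Map.empty[String, Int]

  // Explicitly qualified: locks the Backend instance.
  def snapshotQualified: Map[String, Int] = Backend.this.synchronized {
    hostToLocalTaskCount
  }

  // Equivalent in a method defined directly on the class: synchronized resolves to this.synchronized.
  def snapshotBare: Map[String, Int] = synchronized {
    hostToLocalTaskCount
  }
}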
@@ -247,58 +276,31 @@ private[spark] class KubernetesClusterSchedulerBackend(
     nodeToLocalTaskCount.toMap[String, Int]
   }

-  /**
-   * Allocates a new executor pod
-   *
-   * @param nodeToLocalTaskCount A map of K8s cluster nodes to the number of tasks that could
-   *                             benefit from data locality if an executor launches on the cluster
-   *                             node.
-   * @return A tuple of the new executor name and the Pod data structure.
-   */
-  private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
-    val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-    val executorPod = executorPodFactory.createExecutorPod(
-      executorId,
-      applicationId(),
-      driverUrl,
-      conf.getExecutorEnv,
-      driverPod,
-      nodeToLocalTaskCount)
-    try {
-      (executorId, kubernetesClient.pods.create(executorPod))
-    } catch {
-      case throwable: Throwable =>
-        logError("Failed to allocate executor pod.", throwable)
-        throw throwable
-    }
-  }
-
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
     totalExpectedExecutors.set(requestedTotal)
     true
   }

   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
+    val podsToDelete = mutable.Buffer[Pod]()
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
         val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
         maybeRemovedExecutor.foreach { executorPod =>
-          kubernetesClient.pods().delete(executorPod)
           disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
           runningPodsToExecutors.remove(executorPod.getMetadata.getName)
+          podsToDelete += executorPod
         }
         if (maybeRemovedExecutor.isEmpty) {
           logWarning(s"Unable to remove pod for unknown executor $executor")
         }
       }
     }
+    kubernetesClient.pods().delete(podsToDelete: _*)
     true
   }

   def getExecutorPodByIP(podIP: String): Option[Pod] = {
-    // Note: Per https://github.com/databricks/scala-style-guide#concurrency, we don't
-    // want to be switching to scala.collection.concurrent.Map on
-    // executorPodsByIPs.
     val pod = executorPodsByIPs.get(podIP)
     Option(pod)
   }
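
In the last hunk, `doKillExecutors` now collects the pods to remove in a buffer while holding the lock and issues one batched delete after releasing it, while `getExecutorPodByIP` keeps wrapping the possibly-null `ConcurrentHashMap.get` result in `Option(...)`. A minimal sketch of the collect-then-act shape, with illustrative names and a stand-in for the batched delete call:

import scala.collection.mutable

object KillExecutors {
  private val LOCK = new Object
  private val running = mutable.HashMap("1" -> "exec-pod-1", "2" -> "exec-pod-2")  // guarded by LOCK

  // Stand-in for a batched kubernetesClient.pods().delete(pods: _*) call
  private def deleteAll(podNames: Seq[String]): Unit =
    println(s"deleting ${podNames.mkString(", ")}")

  def kill(executorIds: Seq[String]): Unit = {
    val podsToDelete = mutable.Buffer[String]()
    LOCK.synchronized {
      executorIds.foreach { id =>
        running.remove(id) match {
          case Some(podName) => podsToDelete += podName
          case None => println(s"Unable to remove pod for unknown executor $id")
        }
      }
    }
    deleteAll(podsToDelete)  // one remote call, made after the lock is released
  }
}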