
Commit 23af2d7

ScrapCodes authored and cloud-fan committed
[SPARK-20025][CORE] Ignore SPARK_LOCAL* env, while deploying via cluster mode.
## What changes were proposed in this pull request?

On a bare-metal system with no DNS setup, Spark may be configured with SPARK_LOCAL* environment variables for its IP and host properties. During a driver failover in cluster deployment mode, SPARK_LOCAL* should be ignored when the driver restarts on another node; the values should instead be picked up from the target system's local environment.

## How was this patch tested?

Distributed deployment against a Spark standalone cluster of 6 workers. Tested by killing the JVMs running drivers and verifying that the restarted JVMs had the right configuration.

Author: Prashant Sharma <[email protected]>
Author: Prashant Sharma <[email protected]>

Closes apache#17357 from ScrapCodes/driver-failover-fix.
1 parent b8a08f2 commit 23af2d7
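For background on why a shipped-along SPARK_LOCAL_* value is harmful here: Spark derives its advertised host from these variables when they are set. The following is a simplified sketch, not Spark's actual implementation — the explicit `env` map parameter and the `localHostName` helper are assumptions for illustration only:

```scala
object LocalHostResolutionSketch {
  // Simplified stand-in for host resolution: SPARK_LOCAL_* wins when present,
  // otherwise fall back to whatever the OS reports for this machine.
  def localHostName(env: Map[String, String]): String =
    env.get("SPARK_LOCAL_HOSTNAME")
      .orElse(env.get("SPARK_LOCAL_IP"))
      .getOrElse(java.net.InetAddress.getLocalHost.getHostName)

  def main(args: Array[String]): Unit = {
    // On the original node, the env pins the address...
    println(localHostName(Map("SPARK_LOCAL_IP" -> "10.0.0.5"))) // 10.0.0.5

    // ...so if that map were forwarded on failover, the restarted driver
    // would bind to the dead node's address. With the variables filtered
    // out, the target system's own identity is used instead.
    println(localHostName(Map.empty))
  }
}
```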

3 files changed: +12 −7 lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -93,19 +93,19 @@ private class ClientEndpoint(
           driverArgs.cores,
           driverArgs.supervise,
           command)
-        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
+        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
           RequestSubmitDriver(driverDescription))

       case "kill" =>
         val driverId = driverArgs.driverId
-        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
+        asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
     }
   }

   /**
    * Send the message to master and forward the reply to self asynchronously.
    */
-  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
+  private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
     for (masterEndpoint <- masterEndpoints) {
       masterEndpoint.ask[T](message).onComplete {
         case Success(v) => self.send(v)
```
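Aside from the typo fix in the method name, the helper is a standard fan-out ask-and-forward pattern: fire an asynchronous request at every master and push each successful reply back into the endpoint's own inbox. A minimal, self-contained sketch of the same pattern using plain `scala.concurrent` Futures instead of Spark's RpcEndpointRef — `MasterStub` and `selfSend` are hypothetical stand-ins for `masterEndpoint` and `self.send`:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical stand-in for an RpcEndpointRef pointing at one master.
final case class MasterStub(name: String) {
  def ask[T](message: Any)(handler: Any => T): Future[T] =
    Future(handler(message)) // simulate an asynchronous remote call
}

object AskAndForwardSketch {
  // Hypothetical stand-in for the endpoint's own inbox (self.send).
  private def selfSend(reply: Any): Unit =
    println(s"forwarded to self: $reply")

  // Send `message` to every master; forward successes, report failures.
  def asyncSendToMastersAndForwardReply(masters: Seq[MasterStub], message: Any): Unit =
    for (master <- masters) {
      master.ask[String](message)(m => s"${master.name} acked $m").onComplete {
        case Success(reply) => selfSend(reply)
        case Failure(e)     => println(s"error from ${master.name}: ${e.getMessage}")
      }
    }

  def main(args: Array[String]): Unit = {
    asyncSendToMastersAndForwardReply(
      Seq(MasterStub("master1"), MasterStub("master2")), "RequestKillDriver(driver-1)")
    Thread.sleep(500) // let the async callbacks run before the JVM exits
  }
}
```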

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -139,7 +139,9 @@ private[rest] class StandaloneSubmitRequestServlet(
     val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
     val superviseDriver = sparkProperties.get("spark.driver.supervise")
     val appArgs = request.appArgs
-    val environmentVariables = request.environmentVariables
+    // Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system.
+    val environmentVariables =
+      request.environmentVariables.filterNot(x => x._1.matches("SPARK_LOCAL_(IP|HOSTNAME)"))

     // Construct driver description
     val conf = new SparkConf(false)
```
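Note that `String.matches` anchors the regex to the entire key, so only the exact names SPARK_LOCAL_IP and SPARK_LOCAL_HOSTNAME are dropped, while keys that merely share the prefix (such as SPARK_LOCAL_DIRS) pass through. A minimal sketch of the same predicate against an assumed sample environment map:

```scala
object FilterLocalEnvSketch {
  def main(args: Array[String]): Unit = {
    // Assumed sample of what a submission's environmentVariables might contain.
    val environmentVariables = Map(
      "SPARK_LOCAL_IP"       -> "10.0.0.5",   // dropped: exact match
      "SPARK_LOCAL_HOSTNAME" -> "node-a",     // dropped: exact match
      "SPARK_LOCAL_DIRS"     -> "/tmp/spark", // kept: regex must match the whole key
      "SPARK_USER"           -> "prashant"    // kept
    )

    // Same predicate as the patch: String.matches requires a full-string match.
    val filtered =
      environmentVariables.filterNot(x => x._1.matches("SPARK_LOCAL_(IP|HOSTNAME)"))

    filtered.keys.toSeq.sorted.foreach(println)
    // Prints: SPARK_LOCAL_DIRS, SPARK_USER
  }
}
```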

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -23,14 +23,15 @@ import org.apache.commons.lang3.StringUtils

 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
+import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
  * This is used in standalone cluster mode only.
  */
-object DriverWrapper {
+object DriverWrapper extends Logging {
   def main(args: Array[String]) {
     args.toList match {
       /*
@@ -41,8 +42,10 @@ object DriverWrapper {
        */
       case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
-        val rpcEnv = RpcEnv.create("Driver",
-          Utils.localHostName(), 0, conf, new SecurityManager(conf))
+        val host: String = Utils.localHostName()
+        val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
+        val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
+        logInfo(s"Driver address: ${rpcEnv.address}")
         rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

         val currentLoader = Thread.currentThread.getContextClassLoader
```
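The port change means the wrapper now honors `spark.driver.port` when it arrives as a JVM system property, falling back to 0 so the OS assigns an ephemeral port. A minimal sketch of just that resolution logic — the helper name `resolveDriverPort` is hypothetical:

```scala
object DriverPortSketch {
  // Hypothetical helper mirroring the patch: read spark.driver.port from
  // JVM system properties, defaulting to 0 (bind to an OS-chosen free port).
  def resolveDriverPort(): Int =
    sys.props.getOrElse("spark.driver.port", "0").toInt

  def main(args: Array[String]): Unit = {
    println(s"without property: ${resolveDriverPort()}") // 0 -> ephemeral port

    sys.props("spark.driver.port") = "7078"
    println(s"with property: ${resolveDriverPort()}")    // 7078
  }
}
```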
