
Commit 56414f9

mccheah authored and ash211 committed
Added files should be in the working directories. (apache-spark-on-k8s#294)
* Added files should be in the working directories.
* Revert unintentional changes
* Fix test
1 parent 8f3d965 commit 56414f9

10 files changed, +75 -31 lines changed

docs/running-on-kubernetes.md

Lines changed: 16 additions & 0 deletions

@@ -558,6 +558,22 @@ from the other deployment modes. See the [configuration page](configuration.html
     disk as a secret into the init-containers.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.mountdependencies.jarsDownloadDir</code></td>
+  <td><code>/var/spark-data/spark-jars</code></td>
+  <td>
+    Location to download jars to in the driver and executors. This will be mounted as an empty directory volume
+    into the driver and executor containers.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.mountdependencies.filesDownloadDir</code></td>
+  <td><code>/var/spark-data/spark-files</code></td>
+  <td>
+    Location to download files to in the driver and executors. This will be mounted as an empty directory volume
+    into the driver and executor containers.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.report.interval</code></td>
   <td><code>1s</code></td>
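An application would override these from the submission side, typically through SparkConf or spark-submit `--conf`. A minimal sketch, assuming only the property names documented above (the paths here are just examples):

    import org.apache.spark.SparkConf

    // Sketch: overriding the new download directories. The property names come
    // from this commit; the example paths are arbitrary.
    val conf = new SparkConf()
      .set("spark.kubernetes.mountdependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
      .set("spark.kubernetes.mountdependencies.filesDownloadDir", "/var/spark-data/spark-files")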

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala

Lines changed: 4 additions & 0 deletions

@@ -93,6 +93,10 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
         .endVolume()
       .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
         .addToVolumeMounts(sharedVolumeMounts: _*)
+        .addNewEnv()
+          .withName(ENV_MOUNTED_FILES_DIR)
+          .withValue(filesDownloadPath)
+          .endEnv()
         .endContainer()
       .endSpec()
     resourceStagingServerSecretPlugin.map { plugin =>
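The env-var wiring above uses fabric8's fluent builders. A self-contained sketch of the same pattern, where the container name "main" is a placeholder and only the env-var name and default path come from this commit:

    import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

    // Sketch of the fabric8 builder pattern used above; "main" is a
    // hypothetical container name, not one taken from this commit.
    val container: Container = new ContainerBuilder()
      .withName("main")
      .addNewEnv()
        .withName("SPARK_MOUNTED_FILES_DIR")
        .withValue("/var/spark-data/spark-files")
        .endEnv()
      .build()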

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 2 additions & 2 deletions

@@ -447,15 +447,15 @@ package object config extends Logging {
         " spark-submit, this directory must be empty and will be mounted as an empty directory" +
         " volume on the driver and executor pod.")
       .stringConf
-      .createWithDefault("/var/spark-data/spark-submitted-jars")
+      .createWithDefault("/var/spark-data/spark-jars")

   private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
     ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
       .doc("Location to download files to in the driver and executors. When using" +
         " spark-submit, this directory must be empty and will be mounted as an empty directory" +
         " volume on the driver and executor pods.")
       .stringConf
-      .createWithDefault("/var/spark-data/spark-submitted-files")
+      .createWithDefault("/var/spark-data/spark-files")

   private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
     ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 1 addition & 0 deletions

@@ -92,6 +92,7 @@ package object constants {
   private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
   private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
   private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
+  private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

   // Annotation keys
   private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrapSuite.scala

Lines changed: 10 additions & 0 deletions

@@ -111,6 +111,16 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
     })
   }

+  test("Files download path is set as environment variable") {
+    val bootstrappedPod = bootstrapPodWithoutSubmittedDependencies()
+    val containers = bootstrappedPod.getSpec.getContainers.asScala
+    val maybeMainContainer = containers.find(_.getName === MAIN_CONTAINER_NAME)
+    assert(maybeMainContainer.exists { mainContainer =>
+      mainContainer.getEnv.asScala.exists(envVar =>
+        envVar.getName == ENV_MOUNTED_FILES_DIR && envVar.getValue == FILES_DOWNLOAD_PATH)
+    })
+  }
+
   test("Running with submitted dependencies modifies the init container with the plugin.") {
     val bootstrappedPod = bootstrapPodWithSubmittedDependencies()
     val podAnnotations = bootstrappedPod.getMetadata.getAnnotations.asScala

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile

Lines changed: 1 addition & 0 deletions

@@ -40,4 +40,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     exec ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
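The `cp -R "$SPARK_MOUNTED_FILES_DIR/." .` idiom copies the contents of the mounted directory (including dotfiles, but not the directory itself) into the image's working directory. For illustration only, a rough Scala equivalent of what that shell line does:

    import java.nio.file.{Files, Path, Paths, StandardCopyOption}
    import scala.collection.JavaConverters._

    // Rough equivalent of `cp -R "$SPARK_MOUNTED_FILES_DIR/." .`: walk the
    // mounted directory and copy every entry into the working directory.
    def copyContentsInto(sourceDir: Path, targetDir: Path): Unit = {
      Files.walk(sourceDir).iterator().asScala.foreach { path =>
        val dest = targetDir.resolve(sourceDir.relativize(path))
        if (Files.isDirectory(path)) Files.createDirectories(dest)
        else Files.copy(path, dest, StandardCopyOption.REPLACE_EXISTING)
      }
    }

    sys.env.get("SPARK_MOUNTED_FILES_DIR").foreach { dir =>
      copyContentsInto(Paths.get(dir), Paths.get("").toAbsolutePath)
    }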

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile

Lines changed: 1 addition & 0 deletions

@@ -40,4 +40,5 @@ WORKDIR /opt/spark
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala

Lines changed: 7 additions & 6 deletions

@@ -28,7 +28,9 @@ private[spark] object FileExistenceTest {

   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      throw new IllegalArgumentException("Usage: WordCount <source-file> <expected contents>")
+      throw new IllegalArgumentException(
+        s"Invalid args: ${args.mkString}, " +
+          "Usage: FileExistenceTest <source-file> <expected contents>")
     }
     // Can't use SparkContext.textFile since the file is local to the driver
     val file = Paths.get(args(0)).toFile
@@ -39,16 +41,15 @@ private[spark] object FileExistenceTest {
       val contents = Files.toString(file, Charsets.UTF_8)
       if (args(1) != contents) {
         throw new SparkException(s"Contents do not match. Expected: ${args(1)}," +
-          s" actual, $contents")
+          s" actual: $contents")
       } else {
         println(s"File found at ${file.getAbsolutePath} with correct contents.")
       }
       // scalastyle:on println
     }
-    val spark = SparkSession.builder()
-      .appName("Test")
-      .getOrCreate()
-    spark.stop()
+    while (true) {
+      Thread.sleep(600000)
+    }
   }

 }

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 32 additions & 22 deletions

@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.kubernetes.integrationtest

+import java.io.File
 import java.nio.file.Paths
 import java.util.UUID

@@ -35,11 +36,11 @@ import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minik
 import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
 import org.apache.spark.deploy.kubernetes.submit.{Client, KeyAndCertPem}
 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.Utils

 private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   import KubernetesSuite._
   private val testBackend = IntegrationTestBackendFactory.getTestBackend()
-
   private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkConf: SparkConf = _
@@ -124,7 +125,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service")
     sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
     sparkConf.set("spark.app.name", "group-by-test")
-    runSparkGroupByTestAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
+    runSparkApplicationAndVerifyCompletion(
+      SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      GROUP_BY_MAIN_CLASS,
+      "The Result is",
+      Array.empty[String])
   }

   test("Use remote resources without the resource staging server.") {
@@ -173,6 +178,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE)
   }

+  test("Added files should be placed in the driver's working directory.") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
+    val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
+    Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8)
+    launchStagingServer(SSLOptions(), None)
+    sparkConf.set("spark.files", testExistenceFile.getAbsolutePath)
+    runSparkApplicationAndVerifyCompletion(
+      SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      FILE_EXISTENCE_MAIN_CLASS,
+      s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
+      Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS))
+  }
+
   private def launchStagingServer(
       resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
@@ -190,27 +209,19 @@
   }

   private def runSparkPiAndVerifyCompletion(appResource: String): Unit = {
-    Client.run(sparkConf, appResource, SPARK_PI_MAIN_CLASS, Array.empty[String])
-    val driverPod = kubernetesTestComponents.kubernetesClient
-      .pods()
-      .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
-      .list()
-      .getItems
-      .get(0)
-    Eventually.eventually(TIMEOUT, INTERVAL) {
-      assert(kubernetesTestComponents.kubernetesClient
-        .pods()
-        .withName(driverPod.getMetadata.getName)
-        .getLog
-        .contains("Pi is roughly 3"), "The application did not compute the value of pi.")
-    }
+    runSparkApplicationAndVerifyCompletion(
+      appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String])
   }

-  private def runSparkGroupByTestAndVerifyCompletion(appResource: String): Unit = {
+  private def runSparkApplicationAndVerifyCompletion(
+      appResource: String,
+      mainClass: String,
+      expectedLogOnCompletion: String,
+      appArgs: Array[String]): Unit = {
     Client.run(
       sparkConf = sparkConf,
-      appArgs = Array.empty[String],
-      mainClass = GROUP_BY_MAIN_CLASS,
+      appArgs = appArgs,
+      mainClass = mainClass,
       mainAppResource = appResource)
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
@@ -223,7 +234,7 @@
         .pods()
         .withName(driverPod.getMetadata.getName)
         .getLog
-        .contains("The Result is"), "The application did not complete.")
+        .contains(expectedLogOnCompletion), "The application did not complete.")
     }
   }

@@ -285,8 +296,6 @@ private[spark] object KubernetesSuite {
   val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" +
     s"integration-tests-jars/${HELPER_JAR_FILE.getName}"

-  val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile
-  val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8)
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
   val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
@@ -295,6 +304,7 @@
     ".integrationtest.jobs.FileExistenceTest"
   val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
     ".integrationtest.jobs.GroupByTest"
+  val TEST_EXISTENCE_FILE_CONTENTS = "contents"

   case object ShuffleNotReadyException extends Exception
 }
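The log check in runSparkApplicationAndVerifyCompletion relies on ScalaTest's Eventually to poll until the driver log contains the expected line. A minimal standalone sketch of that polling pattern, with fetchDriverLog as a hypothetical stand-in for the kubernetes-client log call:

    import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
    import org.scalatest.time.{Minutes, Seconds, Span}

    // Sketch of the polling used by the suite: retry the assertion until it
    // passes or two minutes elapse, checking every two seconds.
    def awaitLogLine(fetchDriverLog: () => String, expected: String): Unit = {
      Eventually.eventually(
        PatienceConfiguration.Timeout(Span(2, Minutes)),
        PatienceConfiguration.Interval(Span(2, Seconds))) {
        assert(fetchDriverLog().contains(expected))
      }
    }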

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
     .set("spark.executor.memory", "500m")
     .set("spark.executor.cores", "1")
     .set("spark.executors.instances", "1")
-    .set("spark.app.name", "spark-pi")
+    .set("spark.app.name", "spark-test-app")
     .set("spark.ui.enabled", "true")
     .set("spark.testing", "false")
     .set(WAIT_FOR_APP_COMPLETION, false)
