Commit 8f3d965

mccheah authored and ash211 committed

Replace submission v1 with submission v2. (apache-spark-on-k8s#286)

* Replace submission v1 with submission v2.
* Address documentation changes.
* Fix documentation

1 parent 408c65f · commit 8f3d965

File tree: 67 files changed, +668 −3324 lines changed

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: spark-resource-staging-server
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        resource-staging-server-instance: default
+    spec:
+      volumes:
+      - name: resource-staging-server-properties
+        configMap:
+          name: spark-resource-staging-server-config
+      containers:
+      - name: spark-resource-staging-server
+        image: kubespark/spark-resource-staging-server:v2.1.0-kubernetes-0.1.0-alpha.3
+        resources:
+          requests:
+            cpu: 100m
+            memory: 256Mi
+          limits:
+            cpu: 100m
+            memory: 256Mi
+        volumeMounts:
+        - name: resource-staging-server-properties
+          mountPath: '/etc/spark-resource-staging-server'
+        args:
+        - '/etc/spark-resource-staging-server/resource-staging-server.properties'
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: spark-resource-staging-server-config
+data:
+  resource-staging-server.properties: |
+    spark.kubernetes.resourceStagingServer.port=10000
+    spark.ssl.kubernetes.resourceStagingServer.enabled=false
+    # Other possible properties are listed below, primarily for setting up TLS. The paths given by KeyStore, password, and PEM files here should correspond to
+    # files that are securely mounted into the resource staging server container, via e.g. secret volumes.
+    # spark.ssl.kubernetes.resourceStagingServer.keyStore=/mnt/secrets/resource-staging-server/keyStore.jks
+    # spark.ssl.kubernetes.resourceStagingServer.keyStorePassword=changeit
+    # spark.ssl.kubernetes.resourceStagingServer.keyPassword=changeit
+    # spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile=/mnt/secrets/resource-staging-server/keystore-password.txt
+    # spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile=/mnt/secrets/resource-staging-server/keystore-key-password.txt
+    # spark.ssl.kubernetes.resourceStagingServer.keyPem=/mnt/secrets/resource-staging-server/key.pem
+    # spark.ssl.kubernetes.resourceStagingServer.serverCertPem=/mnt/secrets/resource-staging-server/cert.pem
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: spark-resource-staging-service
+spec:
+  type: NodePort
+  selector:
+    resource-staging-server-instance: default
+  ports:
+  - protocol: TCP
+    port: 10000
+    targetPort: 10000
+    nodePort: 31000
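
For orientation, a submission points at this staging server through spark.kubernetes.resourceStagingServer.uri (defined in the config.scala diff below). A minimal Scala sketch, assuming hypothetical hostnames; port 31000 matches the Service's NodePort above:

import org.apache.spark.SparkConf

// Sketch only: the master URL and node hostname are hypothetical placeholders;
// 31000 is the NodePort exposed by spark-resource-staging-service above.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes-master.example.com:443")
  .set("spark.kubernetes.resourceStagingServer.uri", "http://any-cluster-node.example.com:31000")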

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
@@ -619,7 +619,7 @@ object SparkSubmit {
     }

     if (isKubernetesCluster) {
-      childMainClass = "org.apache.spark.deploy.kubernetes.submit.v1.Client"
+      childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
       childArgs += args.primaryResource
       childArgs += args.mainClass
       childArgs ++= args.childArgs

dev/.rat-excludes

Lines changed: 0 additions & 1 deletion
@@ -103,4 +103,3 @@ org.apache.spark.scheduler.ExternalClusterManager
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
 structured-streaming/*
-org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager

docs/running-on-kubernetes.md

Lines changed: 266 additions & 150 deletions
Large diffs are not rendered by default.

resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager

Lines changed: 0 additions & 2 deletions
This file was deleted.
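
For background, the deleted file was a Java ServiceLoader registration: its path names the service interface and its lines list implementation classes to discover at runtime. A generic sketch of that mechanism (GreeterService is purely illustrative, not from this codebase):

import java.util.ServiceLoader
import scala.collection.JavaConverters._

trait GreeterService { def greet(): String }

// Scans the classpath for META-INF/services/<fully.qualified.GreeterService>
// entries and instantiates each listed implementation. With the v1
// DriverServiceManager registration deleted, nothing remains to discover.
val impls = ServiceLoader.load(classOf[GreeterService]).asScala.toList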

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/CompressionUtils.scala

Lines changed: 6 additions & 68 deletions
@@ -16,50 +16,24 @@
  */
 package org.apache.spark.deploy.kubernetes

-import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream, InputStream, OutputStream}
+import java.io.{File, FileInputStream, FileOutputStream, InputStream, OutputStream}
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}

 import com.google.common.io.Files
-import org.apache.commons.codec.binary.Base64
 import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
 import org.apache.commons.compress.utils.CharsetNames
 import org.apache.commons.io.IOUtils
 import scala.collection.mutable

-import org.apache.spark.deploy.rest.kubernetes.v1.TarGzippedData
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ByteBufferOutputStream, Utils}
+import org.apache.spark.util.Utils

 private[spark] object CompressionUtils extends Logging {
   // Defaults from TarArchiveOutputStream
   private val BLOCK_SIZE = 10240
   private val RECORD_SIZE = 512
   private val ENCODING = CharsetNames.UTF_8

-  /**
-   * Compresses all of the given paths into a gzipped-tar archive, returning the compressed data in
-   * memory as an instance of {@link TarGzippedData}. The files are taken without consideration to
-   * their original folder structure, and are added to the tar archive in a flat hierarchy.
-   * Directories are not allowed, and duplicate file names are de-duplicated by appending a numeric
-   * suffix to the file name, before the file extension. For example, if paths a/b.txt and b/b.txt
-   * were provided, then the files added to the tar archive would be b.txt and b-1.txt.
-   * @param paths A list of file paths to be archived
-   * @return An in-memory representation of the compressed data.
-   */
-  def createTarGzip(paths: Iterable[String]): TarGzippedData = {
-    val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
-      writeTarGzipToStream(raw, paths)
-      raw
-    }
-    val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
-    TarGzippedData(
-      dataBase64 = compressedAsBase64,
-      blockSize = BLOCK_SIZE,
-      recordSize = RECORD_SIZE,
-      encoding = ENCODING
-    )
-  }
-
   def writeTarGzipToStream(outputStream: OutputStream, paths: Iterable[String]): Unit = {
     Utils.tryWithResource(new GZIPOutputStream(outputStream)) { gzipping =>
       Utils.tryWithResource(new TarArchiveOutputStream(
@@ -98,50 +72,14 @@ private[spark] object CompressionUtils extends Logging {
     }
   }

-  /**
-   * Decompresses the provided tar archive to a directory.
-   * @param compressedData In-memory representation of the compressed data, ideally created via
-   *                       {@link createTarGzip}.
-   * @param rootOutputDir Directory to write the output files to. All files from the tarball
-   *                      are written here in a flat hierarchy.
-   * @return List of file paths for each file that was unpacked from the archive.
-   */
-  def unpackAndWriteCompressedFiles(
-      compressedData: TarGzippedData,
-      rootOutputDir: File): Seq[String] = {
-    val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
-    if (!rootOutputDir.exists) {
-      if (!rootOutputDir.mkdirs) {
-        throw new IllegalStateException(s"Failed to create output directory for unpacking" +
-          s" files at ${rootOutputDir.getAbsolutePath}")
-      }
-    } else if (rootOutputDir.isFile) {
-      throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
-        s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
-    }
-    Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
-      unpackTarStreamToDirectory(
-        compressedBytesStream,
-        rootOutputDir,
-        compressedData.blockSize,
-        compressedData.recordSize,
-        compressedData.encoding)
-    }
-  }
-
-  def unpackTarStreamToDirectory(
-      inputStream: InputStream,
-      outputDir: File,
-      blockSize: Int = BLOCK_SIZE,
-      recordSize: Int = RECORD_SIZE,
-      encoding: String = ENCODING): Seq[String] = {
+  def unpackTarStreamToDirectory(inputStream: InputStream, outputDir: File): Seq[String] = {
     val paths = mutable.Buffer.empty[String]
     Utils.tryWithResource(new GZIPInputStream(inputStream)) { gzipped =>
       Utils.tryWithResource(new TarArchiveInputStream(
         gzipped,
-        blockSize,
-        recordSize,
-        encoding)) { tarInputStream =>
+        BLOCK_SIZE,
+        RECORD_SIZE,
+        ENCODING)) { tarInputStream =>
         var nextTarEntry = tarInputStream.getNextTarEntry
         while (nextTarEntry != null) {
           val outputFile = new File(outputDir, nextTarEntry.getName)
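
After this commit CompressionUtils is a purely streaming API; the in-memory Base64 TarGzippedData path used by submission v1 is gone. A small round-trip sketch of the two surviving methods, under assumed file paths (all paths hypothetical; Utils.tryWithResource is Spark-internal):

import java.io.{File, FileInputStream, FileOutputStream}
import org.apache.spark.deploy.kubernetes.CompressionUtils
import org.apache.spark.util.Utils

// Pack two local files into a flat gzipped tarball on disk...
val archive = new File("/tmp/deps.tgz")
Utils.tryWithResource(new FileOutputStream(archive)) { out =>
  CompressionUtils.writeTarGzipToStream(out, Seq("/tmp/app.jar", "/tmp/extra.jar"))
}

// ...then unpack it into an existing directory, getting back the unpacked paths.
val outputDir = new File("/tmp/unpacked")
outputDir.mkdirs()
val unpacked: Seq[String] = Utils.tryWithResource(new FileInputStream(archive)) { in =>
  CompressionUtils.unpackTarStreamToDirectory(in, outputDir)
}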

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, InitContainerUtil}
+import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}

 private[spark] trait SparkPodInitContainerBootstrap {
   /**

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 2 additions & 82 deletions
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit

 import org.apache.spark.{SPARK_VERSION => sparkVersion}
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.network.util.ByteUnit
@@ -212,77 +211,6 @@ package object config extends Logging {
       .stringConf
       .createOptional

-  private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
-    ConfigBuilder("spark.kubernetes.driverSubmissionTimeout")
-      .doc("Time to wait for the driver process to start running before aborting its execution.")
-      .timeConf(TimeUnit.SECONDS)
-      .createWithDefault(60L)
-
-  private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE =
-    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyStore")
-      .doc("KeyStore file for the driver submission server listening on SSL. Can be pre-mounted" +
-        " on the driver container or uploaded from the submitting client.")
-      .stringConf
-      .createOptional
-
-  private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE =
-    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.trustStore")
-      .doc("TrustStore containing certificates for communicating to the driver submission server" +
-        " over SSL.")
-      .stringConf
-      .createOptional
-
-  private[spark] val DRIVER_SUBMIT_SSL_ENABLED =
-    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.enabled")
-      .doc("Whether or not to use SSL when sending the application dependencies to the driver pod.")
-      .booleanConf
-      .createWithDefault(false)
-
-  private[spark] val DRIVER_SUBMIT_SSL_KEY_PEM =
-    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyPem")
-      .doc("Key PEM file that the driver submission server will use when setting up TLS" +
-        " connections. Can be pre-mounted on the driver pod's disk or uploaded from the" +
-        " submitting client's machine.")
-      .stringConf
-      .createOptional
-
-  private[spark] val DRIVER_SUBMIT_SSL_SERVER_CERT_PEM =
-    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.serverCertPem")
-      .doc("Certificate PEM file that is associated with the key PEM file" +
-        " the submission server uses to set up TLS connections. Can be pre-mounted" +
-        " on the driver pod's disk or uploaded from the submitting client's machine.")
-      .stringConf
-      .createOptional
-
-  private[spark] val DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM =
-    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.clientCertPem")
-      .doc("Certificate pem file that the submission client uses to connect to the submission" +
-        " server over TLS. This should often be the same as the server certificate, but can be" +
-        " different if the submission client will contact the driver through a proxy instead of" +
-        " the driver service directly.")
-      .stringConf
-      .createOptional
-
-  private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
-    ConfigBuilder("spark.kubernetes.driver.service.name")
-      .doc("Kubernetes service that exposes the driver pod for external access.")
-      .internal()
-      .stringConf
-      .createOptional
-
-  private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
-    ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
-      .doc("The amount of memory to allocate for the driver submission server.")
-      .bytesConf(ByteUnit.MiB)
-      .createWithDefaultString("256m")
-
-  private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
-    ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
-      .doc("Whether to expose the driver Web UI port as a service NodePort. Turned off by default" +
-        " because NodePort is a limited resource. Use alternatives if possible.")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")
@@ -327,13 +255,6 @@ package object config extends Logging {
       .longConf
       .createWithDefault(1)

-  private[spark] val DRIVER_SERVICE_MANAGER_TYPE =
-    ConfigBuilder("spark.kubernetes.driver.serviceManagerType")
-      .doc("A tag indicating which class to use for creating the Kubernetes service and" +
-        " determining its URI for the submission client.")
-      .stringConf
-      .createWithDefault(NodePortUrisDriverServiceManager.TYPE)
-
   private[spark] val WAIT_FOR_APP_COMPLETION =
     ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
       .doc("In cluster mode, whether to wait for the application to finish before exiting the" +
@@ -347,8 +268,7 @@ package object config extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1s")

-  // Spark dependency server for submission v2
-
+  // Spark resource staging server.
   private[spark] val RESOURCE_STAGING_SERVER_PORT =
     ConfigBuilder("spark.kubernetes.resourceStagingServer.port")
       .doc("Port for the Kubernetes resource staging server to listen on.")
@@ -451,7 +371,7 @@ package object config extends Logging {
       .stringConf
       .createOptional

-  // Driver and Init-Container parameters for submission v2
+  // Driver and Init-Container parameters
   private[spark] val RESOURCE_STAGING_SERVER_URI =
     ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
       .doc("Base URI for the Spark resource staging server.")
Lines changed: 2 additions & 3 deletions
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.submit.v2
+package org.apache.spark.deploy.kubernetes.submit

 import java.io.File
 import java.util.Collections
@@ -25,8 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl}
-import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
+import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.submit.v2
+package org.apache.spark.deploy.kubernetes.submit

 import java.io.File

0 commit comments

Comments
 (0)