|
17 | 17 | package org.apache.spark.deploy.rest.kubernetes
|
18 | 18 |
|
19 | 19 | import java.io.FileInputStream
|
| 20 | +import java.net.{InetSocketAddress, URI} |
20 | 21 | import java.security.{KeyStore, SecureRandom}
|
21 | 22 | import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
|
22 | 23 |
|
23 | 24 | import com.fasterxml.jackson.databind.ObjectMapper
|
24 | 25 | import com.fasterxml.jackson.module.scala.DefaultScalaModule
|
| 26 | +import io.fabric8.kubernetes.client.Config |
25 | 27 | import okhttp3.{Dispatcher, OkHttpClient}
|
26 | 28 | import retrofit2.Retrofit
|
27 | 29 | import retrofit2.converter.jackson.JacksonConverterFactory
|
28 | 30 | import retrofit2.converter.scalars.ScalarsConverterFactory
|
29 | 31 |
|
30 | 32 | import org.apache.spark.SSLOptions
|
| 33 | +import org.apache.spark.internal.Logging |
31 | 34 | import org.apache.spark.util.{ThreadUtils, Utils}
|
32 | 35 |
|
33 | 36 | private[spark] trait RetrofitClientFactory {
|
34 | 37 | def createRetrofitClient[T](baseUrl: String, serviceType: Class[T], sslOptions: SSLOptions): T
|
35 | 38 | }
|
36 | 39 |
|
37 |
| -private[spark] object RetrofitClientFactoryImpl extends RetrofitClientFactory { |
| 40 | +private[spark] object RetrofitClientFactoryImpl extends RetrofitClientFactory with Logging { |
38 | 41 |
|
39 | 42 | private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule)
|
40 | 43 | private val SECURE_RANDOM = new SecureRandom()
|
41 | 44 |
|
42 | 45 | def createRetrofitClient[T](baseUrl: String, serviceType: Class[T], sslOptions: SSLOptions): T = {
|
43 | 46 | val dispatcher = new Dispatcher(ThreadUtils.newDaemonCachedThreadPool(s"http-client-$baseUrl"))
|
44 |
| - val okHttpClientBuilder = new OkHttpClient.Builder().dispatcher(dispatcher) |
| 47 | + val serviceUri = URI.create(baseUrl) |
| 48 | + val maybeAllProxy = Option.apply(System.getProperty(Config.KUBERNETES_ALL_PROXY)) |
| 49 | + val serviceUriScheme = serviceUri.getScheme |
| 50 | + val maybeHttpProxy = (if (serviceUriScheme.equalsIgnoreCase("https")) { |
| 51 | + Option.apply(System.getProperty(Config.KUBERNETES_HTTPS_PROXY)) |
| 52 | + } else if (serviceUriScheme.equalsIgnoreCase("http")) { |
| 53 | + Option.apply(System.getProperty(Config.KUBERNETES_HTTP_PROXY)) |
| 54 | + } else { |
| 55 | + maybeAllProxy |
| 56 | + }).map(uriStringToProxy) |
| 57 | + val maybeNoProxy = Option.apply(System.getProperty(Config.KUBERNETES_NO_PROXY)) |
| 58 | + .map(_.split(",")) |
| 59 | + .toSeq |
| 60 | + .flatten |
| 61 | + val resolvedProxy = maybeNoProxy.find(_ == serviceUri.getHost) |
| 62 | + .map( _ => java.net.Proxy.NO_PROXY) |
| 63 | + .orElse(maybeHttpProxy) |
| 64 | + .getOrElse(java.net.Proxy.NO_PROXY) |
| 65 | + val okHttpClientBuilder = new OkHttpClient.Builder() |
| 66 | + .dispatcher(dispatcher) |
| 67 | + .proxy(resolvedProxy) |
| 68 | + logDebug(s"Proxying to $baseUrl through address ${resolvedProxy.address()} with proxy of" + |
| 69 | + s" type ${resolvedProxy.`type`()}") |
45 | 70 | sslOptions.trustStore.foreach { trustStoreFile =>
|
46 | 71 | require(trustStoreFile.isFile, s"TrustStore provided at ${trustStoreFile.getAbsolutePath}"
|
47 | 72 | + " does not exist, or is not a file.")
|
@@ -69,4 +94,9 @@ private[spark] object RetrofitClientFactoryImpl extends RetrofitClientFactory {
|
69 | 94 | .create(serviceType)
|
70 | 95 | }
|
71 | 96 |
|
| 97 | + private def uriStringToProxy(uriString: String): java.net.Proxy = { |
| 98 | + val uriObject = URI.create(uriString) |
| 99 | + new java.net.Proxy(java.net.Proxy.Type.HTTP, |
| 100 | + new InetSocketAddress(uriObject.getHost, uriObject.getPort)) |
| 101 | + } |
72 | 102 | }
|
0 commit comments