Skip to content

Commit 6d2db1d

Browse files
e5losipxd
andauthored
KTOR-4766 Add Unix domain socket support for the CIO server and client engines (#4844)
* KTOR-4766 Add Unix domain socket support for the CIO server and client * Update ktor-server/ktor-server-cio/common/src/io/ktor/server/cio/CIOApplicationEngine.kt Co-authored-by: Osip Fatkullin <osip.fatkullin@jetbrains.com> --------- Co-authored-by: Osip Fatkullin <osip.fatkullin@jetbrains.com>
1 parent c40b2b6 commit 6d2db1d

File tree

40 files changed

+692
-123
lines changed

40 files changed

+692
-123
lines changed

gradle.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ version=3.2.0-SNAPSHOT
3333
# * '-Dkotlin.daemon.options=autoshutdownIdleSeconds=30'
3434
# To save some memory, shutting down Kotlin Daemon used for buildSrc compilation after 30 seconds of idle.
3535
# See: https://github.com/gradle/gradle/issues/29331
36-
# (CI-specific configurations can be found in 'teamcity.default.properties')
3736
org.gradle.jvmargs=-Xms2g -Xmx9g -XX:+HeapDumpOnOutOfMemoryError -XX:+UseParallelGC -Dfile.encoding=UTF-8 -Dkotlin.daemon.jvm.options=-Xmx512m,Xms256m,-XX:MaxMetaspaceSize=256m,XX:+HeapDumpOnOutOfMemoryError
3837
kotlin.daemon.jvmargs=-Xms512m -Xmx2g -XX:MaxMetaspaceSize=256m -XX:+HeapDumpOnOutOfMemoryError
3938
# Gradle Doctor might increase memory consumption when task monitoring is enabled, so it is disabled by default.

ktor-client/ktor-client-cio/common/src/io/ktor/client/engine/cio/CIOEngine.kt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,13 @@ internal class CIOEngine(
2424
override val config: CIOEngineConfig
2525
) : HttpClientEngineBase("ktor-cio") {
2626

27-
override val supportedCapabilities =
28-
setOf(HttpTimeoutCapability, WebSocketCapability, WebSocketExtensionsCapability, SSECapability)
27+
override val supportedCapabilities: Set<HttpClientEngineCapability<out Any>> = setOf(
28+
HttpTimeoutCapability,
29+
WebSocketCapability,
30+
WebSocketExtensionsCapability,
31+
SSECapability,
32+
UnixSocketCapability
33+
)
2934

3035
private val endpoints = ConcurrentMap<String, Endpoint>()
3136

@@ -74,7 +79,8 @@ internal class CIOEngine(
7479
val callContext = callContext()
7580

7681
while (coroutineContext.isActive) {
77-
val endpoint = selectEndpoint(data.url, proxy)
82+
val unixSocket = data.getCapabilityOrNull(UnixSocketCapability)
83+
val endpoint = selectEndpoint(data.url, proxy, unixSocket)
7884

7985
try {
8086
return endpoint.execute(data, callContext)
@@ -100,7 +106,7 @@ internal class CIOEngine(
100106
(requestsJob[Job] as CompletableJob).complete()
101107
}
102108

103-
private fun selectEndpoint(url: Url, proxy: ProxyConfig?): Endpoint {
109+
private fun selectEndpoint(url: Url, proxy: ProxyConfig?, unixSocket: UnixSocketSettings?): Endpoint {
104110
val host: String
105111
val port: Int
106112
val protocol: URLProtocol = url.protocol
@@ -114,7 +120,7 @@ internal class CIOEngine(
114120
port = url.port
115121
}
116122

117-
val endpointId = "$host:$port:$protocol"
123+
val endpointId = "$host:$port:$protocol:${unixSocket?.path}"
118124

119125
return endpoints.computeIfAbsent(endpointId) {
120126
val secure = (protocol.isSecure())
@@ -126,7 +132,8 @@ internal class CIOEngine(
126132
config,
127133
connectionFactory,
128134
coroutineContext,
129-
onDone = { endpoints.remove(endpointId) }
135+
onDone = { endpoints.remove(endpointId) },
136+
unixSocket,
130137
)
131138
}
132139
}

ktor-client/ktor-client-cio/common/src/io/ktor/client/engine/cio/ConnectionFactory.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ internal class ConnectionFactory(
1515
private val addressConnectionsLimit: Int
1616
) {
1717
private val limit = Semaphore(connectionsLimit)
18-
private val addressLimit = ConcurrentMap<InetSocketAddress, Semaphore>()
18+
private val addressLimit = ConcurrentMap<SocketAddress, Semaphore>()
1919

2020
suspend fun connect(
21-
address: InetSocketAddress,
21+
address: SocketAddress,
2222
configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
2323
): Socket {
2424
limit.acquire()
@@ -39,8 +39,8 @@ internal class ConnectionFactory(
3939
}
4040
}
4141

42-
fun release(address: InetSocketAddress) {
43-
addressLimit[address]!!.release()
42+
fun release(address: SocketAddress) {
43+
addressLimit[address]?.release()
4444
limit.release()
4545
}
4646
}

ktor-client/ktor-client-cio/common/src/io/ktor/client/engine/cio/Endpoint.kt

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.ktor.client.engine.cio
66

77
import io.ktor.client.engine.*
8-
import io.ktor.client.network.sockets.*
98
import io.ktor.client.plugins.*
109
import io.ktor.client.request.*
1110
import io.ktor.client.utils.*
@@ -16,13 +15,15 @@ import io.ktor.util.date.*
1615
import io.ktor.util.logging.*
1716
import io.ktor.utils.io.*
1817
import io.ktor.utils.io.core.*
19-
import kotlinx.atomicfu.*
18+
import kotlinx.atomicfu.AtomicInt
19+
import kotlinx.atomicfu.atomic
2020
import kotlinx.coroutines.*
21-
import kotlinx.coroutines.channels.*
22-
import kotlin.coroutines.*
21+
import kotlinx.coroutines.channels.Channel
22+
import kotlin.coroutines.CoroutineContext
2323

2424
private val LOGGER = KtorSimpleLogger("io.ktor.client.engine.cio.Endpoint")
2525

26+
@OptIn(InternalAPI::class)
2627
internal class Endpoint(
2728
private val host: String,
2829
private val port: Int,
@@ -31,7 +32,8 @@ internal class Endpoint(
3132
private val config: CIOEngineConfig,
3233
private val connectionFactory: ConnectionFactory,
3334
override val coroutineContext: CoroutineContext,
34-
private val onDone: () -> Unit
35+
private val onDone: () -> Unit,
36+
private val unixSocket: UnixSocketSettings?,
3537
) : CoroutineScope, Closeable {
3638
private val lastActivity = atomic(getTimeMillis())
3739
private val connections: AtomicInt = atomic(0)
@@ -194,7 +196,7 @@ internal class Endpoint(
194196
}
195197

196198
@Suppress("UNUSED_EXPRESSION")
197-
private suspend fun connect(requestData: HttpRequestData): Pair<InetSocketAddress, Connection> {
199+
private suspend fun connect(requestData: HttpRequestData): Pair<SocketAddress, Connection> {
198200
val connectAttempts = config.endpoint.connectAttempts
199201
val (connectTimeout, socketTimeout) = retrieveTimeouts(requestData)
200202
var timeoutFails = 0
@@ -203,7 +205,11 @@ internal class Endpoint(
203205

204206
try {
205207
repeat(connectAttempts) {
206-
val address = InetSocketAddress(host, port)
208+
val address = if (unixSocket != null) {
209+
UnixSocketAddress(unixSocket.path)
210+
} else {
211+
InetSocketAddress(host, port)
212+
}
207213

208214
val connect: suspend CoroutineScope.() -> Socket = {
209215
connectionFactory.connect(address) {
@@ -230,8 +236,16 @@ internal class Endpoint(
230236
if (proxy?.type == ProxyType.HTTP) {
231237
startTunnel(requestData, connection.output, connection.input)
232238
}
239+
240+
if (unixSocket != null) {
241+
throw IllegalArgumentException("TLS over Unix sockets is not supported")
242+
}
243+
233244
val realAddress = when (proxy) {
234-
null -> address
245+
null ->
246+
address as? InetSocketAddress
247+
?: throw IllegalArgumentException("Expected InetSocketAddress for TLS connection")
248+
235249
else -> InetSocketAddress(requestData.url.host, requestData.url.port)
236250
}
237251
val tlsSocket = connection.tls(coroutineContext) {
@@ -285,7 +299,7 @@ internal class Endpoint(
285299
return connectTimeout to socketTimeout
286300
}
287301

288-
private fun releaseConnection(address: InetSocketAddress) {
302+
private fun releaseConnection(address: SocketAddress) {
289303
connectionFactory.release(address)
290304
connections.decrementAndGet()
291305
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.client.engine.cio
6+
7+
import io.ktor.client.HttpClient
8+
import io.ktor.client.request.get
9+
import io.ktor.client.request.unixSocket
10+
import io.ktor.client.statement.bodyAsText
11+
import io.ktor.network.sockets.*
12+
import io.ktor.server.application.*
13+
import io.ktor.server.cio.*
14+
import io.ktor.server.cio.CIO
15+
import io.ktor.server.engine.*
16+
import io.ktor.server.request.*
17+
import io.ktor.server.response.*
18+
import io.ktor.server.routing.*
19+
import kotlinx.coroutines.delay
20+
import kotlinx.coroutines.runBlocking
21+
import kotlin.test.Test
22+
import kotlin.test.assertEquals
23+
24+
class UnixSocketTest {
25+
26+
@Test
27+
fun testUnixSocketClient() = runBlocking {
28+
if (!UnixSocketAddress.isSupported()) return@runBlocking
29+
30+
val server = embeddedServer(
31+
CIO,
32+
serverConfig {
33+
module {
34+
routing {
35+
get("/") {
36+
call.respondText("Hello, Unix socket world!")
37+
}
38+
}
39+
}
40+
},
41+
configure = {
42+
unixConnector("/tmp/test-unix-socket-client.sock")
43+
}
44+
)
45+
46+
val client = HttpClient(io.ktor.client.engine.cio.CIO)
47+
try {
48+
server.start(wait = false)
49+
delay(1000)
50+
51+
val response = client.get("http://localhost/") {
52+
unixSocket("/tmp/test-unix-socket-client.sock/")
53+
}
54+
assertEquals(200, response.status.value)
55+
assertEquals("Hello, Unix socket world!", response.bodyAsText())
56+
} finally {
57+
client.close()
58+
server.stop()
59+
}
60+
}
61+
}

ktor-client/ktor-client-core/api/ktor-client-core.api

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,6 +1300,22 @@ public final class io/ktor/client/request/SSEClientResponseAdapter : io/ktor/cli
13001300
public fun adapt (Lio/ktor/client/request/HttpRequestData;Lio/ktor/http/HttpStatusCode;Lio/ktor/http/Headers;Lio/ktor/utils/io/ByteReadChannel;Lio/ktor/http/content/OutgoingContent;Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object;
13011301
}
13021302

1303+
public final class io/ktor/client/request/UnixSocketCapability : io/ktor/client/engine/HttpClientEngineCapability {
1304+
public static final field INSTANCE Lio/ktor/client/request/UnixSocketCapability;
1305+
public fun equals (Ljava/lang/Object;)Z
1306+
public fun hashCode ()I
1307+
public fun toString ()Ljava/lang/String;
1308+
}
1309+
1310+
public final class io/ktor/client/request/UnixSocketSettings {
1311+
public fun <init> (Ljava/lang/String;)V
1312+
public final fun getPath ()Ljava/lang/String;
1313+
}
1314+
1315+
public final class io/ktor/client/request/UnixSockets_jvmAndPosixKt {
1316+
public static final fun unixSocket (Lio/ktor/client/request/HttpRequestBuilder;Ljava/lang/String;)V
1317+
}
1318+
13031319
public final class io/ktor/client/request/UtilsKt {
13041320
public static final fun accept (Lio/ktor/http/HttpMessageBuilder;Lio/ktor/http/ContentType;)V
13051321
public static final fun basicAuth (Lio/ktor/http/HttpMessageBuilder;Ljava/lang/String;Ljava/lang/String;)V

ktor-client/ktor-client-core/api/ktor-client-core.klib.api

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,13 @@ final class io.ktor.client.request/SSEClientResponseAdapter : io.ktor.client.req
992992
final fun adapt(io.ktor.client.request/HttpRequestData, io.ktor.http/HttpStatusCode, io.ktor.http/Headers, io.ktor.utils.io/ByteReadChannel, io.ktor.http.content/OutgoingContent, kotlin.coroutines/CoroutineContext): kotlin/Any? // io.ktor.client.request/SSEClientResponseAdapter.adapt|adapt(io.ktor.client.request.HttpRequestData;io.ktor.http.HttpStatusCode;io.ktor.http.Headers;io.ktor.utils.io.ByteReadChannel;io.ktor.http.content.OutgoingContent;kotlin.coroutines.CoroutineContext){}[0]
993993
}
994994

995+
final class io.ktor.client.request/UnixSocketSettings { // io.ktor.client.request/UnixSocketSettings|null[0]
996+
constructor <init>(kotlin/String) // io.ktor.client.request/UnixSocketSettings.<init>|<init>(kotlin.String){}[0]
997+
998+
final val path // io.ktor.client.request/UnixSocketSettings.path|{}path[0]
999+
final fun <get-path>(): kotlin/String // io.ktor.client.request/UnixSocketSettings.path.<get-path>|<get-path>(){}[0]
1000+
}
1001+
9951002
final class io.ktor.client.statement/DefaultHttpResponse : io.ktor.client.statement/HttpResponse { // io.ktor.client.statement/DefaultHttpResponse|null[0]
9961003
constructor <init>(io.ktor.client.call/HttpClientCall, io.ktor.client.request/HttpResponseData) // io.ktor.client.statement/DefaultHttpResponse.<init>|<init>(io.ktor.client.call.HttpClientCall;io.ktor.client.request.HttpResponseData){}[0]
9971004

@@ -1258,6 +1265,12 @@ final object io.ktor.client.plugins/SetupRequestContext : io.ktor.client.plugins
12581265
final fun install(io.ktor.client/HttpClient, kotlin.coroutines/SuspendFunction2<io.ktor.client.request/HttpRequestBuilder, kotlin.coroutines/SuspendFunction0<kotlin/Unit>, kotlin/Unit>) // io.ktor.client.plugins/SetupRequestContext.install|install(io.ktor.client.HttpClient;kotlin.coroutines.SuspendFunction2<io.ktor.client.request.HttpRequestBuilder,kotlin.coroutines.SuspendFunction0<kotlin.Unit>,kotlin.Unit>){}[0]
12591266
}
12601267

1268+
final object io.ktor.client.request/UnixSocketCapability : io.ktor.client.engine/HttpClientEngineCapability<io.ktor.client.request/UnixSocketSettings> { // io.ktor.client.request/UnixSocketCapability|null[0]
1269+
final fun equals(kotlin/Any?): kotlin/Boolean // io.ktor.client.request/UnixSocketCapability.equals|equals(kotlin.Any?){}[0]
1270+
final fun hashCode(): kotlin/Int // io.ktor.client.request/UnixSocketCapability.hashCode|hashCode(){}[0]
1271+
final fun toString(): kotlin/String // io.ktor.client.request/UnixSocketCapability.toString|toString(){}[0]
1272+
}
1273+
12611274
final object io.ktor.client.utils/CacheControl { // io.ktor.client.utils/CacheControl|null[0]
12621275
final const val MAX_AGE // io.ktor.client.utils/CacheControl.MAX_AGE|{}MAX_AGE[0]
12631276
final fun <get-MAX_AGE>(): kotlin/String // io.ktor.client.utils/CacheControl.MAX_AGE.<get-MAX_AGE>|<get-MAX_AGE>(){}[0]
@@ -1571,6 +1584,9 @@ final suspend inline fun <#A: reified kotlin/Any?> (io.ktor.client.plugins.webso
15711584
final suspend inline fun <#A: reified kotlin/Any?> (io.ktor.client.plugins.websocket/DefaultClientWebSocketSession).io.ktor.client.plugins.websocket/sendSerialized(#A) // io.ktor.client.plugins.websocket/sendSerialized|sendSerialized@io.ktor.client.plugins.websocket.DefaultClientWebSocketSession(0:0){0§<kotlin.Any?>}[0]
15721585
final suspend inline fun <#A: reified kotlin/Any?> (io.ktor.client.statement/HttpResponse).io.ktor.client.call/body(): #A // io.ktor.client.call/body|body@io.ktor.client.statement.HttpResponse(){0§<kotlin.Any?>}[0]
15731586

1587+
// Targets: [native]
1588+
final fun (io.ktor.client.request/HttpRequestBuilder).io.ktor.client.request/unixSocket(kotlin/String) // io.ktor.client.request/unixSocket|unixSocket@io.ktor.client.request.HttpRequestBuilder(kotlin.String){}[0]
1589+
15741590
// Targets: [js, wasmJs]
15751591
abstract interface io.ktor.client.fetch/AbortSignal : io.ktor.client.fetch/EventTarget { // io.ktor.client.fetch/AbortSignal|null[0]
15761592
abstract var aborted // io.ktor.client.fetch/AbortSignal.aborted|{}aborted[0]
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package io.ktor.client.request
5+
6+
import io.ktor.client.engine.*
7+
import io.ktor.utils.io.InternalAPI
8+
9+
@InternalAPI
10+
public class UnixSocketSettings(public val path: String)
11+
12+
@OptIn(InternalAPI::class)
13+
public data object UnixSocketCapability : HttpClientEngineCapability<UnixSocketSettings>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package io.ktor.client.request
5+
6+
import io.ktor.utils.io.*
7+
8+
/**
9+
* Sets the path to the Unix domain socket file.
10+
*
11+
* ```kotlin
12+
* val client = client.get("http://localhost/api") {
13+
* unixSocket("/var/run/docker.sock")
14+
* }
15+
* ```
16+
*/
17+
@OptIn(InternalAPI::class)
18+
public fun HttpRequestBuilder.unixSocket(path: String) {
19+
setCapability(UnixSocketCapability, UnixSocketSettings(path))
20+
}

ktor-network/api/ktor-network.api

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ public final class io/ktor/network/sockets/UDPSocketBuilder : io/ktor/network/so
360360
}
361361

362362
public final class io/ktor/network/sockets/UnixSocketAddress : io/ktor/network/sockets/SocketAddress {
363+
public static final field Companion Lio/ktor/network/sockets/UnixSocketAddress$Companion;
363364
public fun <init> (Ljava/lang/String;)V
364365
public final fun component1 ()Ljava/lang/String;
365366
public final fun copy (Ljava/lang/String;)Lio/ktor/network/sockets/UnixSocketAddress;
@@ -370,6 +371,10 @@ public final class io/ktor/network/sockets/UnixSocketAddress : io/ktor/network/s
370371
public fun toString ()Ljava/lang/String;
371372
}
372373

374+
public final class io/ktor/network/sockets/UnixSocketAddress$Companion {
375+
public final fun isSupported ()Z
376+
}
377+
373378
public final class io/ktor/network/util/PoolsKt {
374379
public static final fun getDefaultByteBufferPool ()Lio/ktor/utils/io/pool/ObjectPool;
375380
public static final fun getDefaultDatagramByteBufferPool ()Lio/ktor/utils/io/pool/ObjectPool;

0 commit comments

Comments
 (0)