Skip to content

Commit d22c049

Browse files
committed
Support kotlin coroutines
Resolves: #383 Inspired by https://github.com/stas29a/coroutine-feign-client ## TODO - [ ] Separate Kotlin support module - [ ] Enhance test case
1 parent 8e5e8ce commit d22c049

File tree

10 files changed

+219
-6
lines changed

10 files changed

+219
-6
lines changed

feign-reactor-core/pom.xml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
<version>3.2.4-SNAPSHOT</version>
2727
</parent>
2828

29+
<properties>
30+
<kotlin.version>1.5.30</kotlin.version>
31+
<kotlinx.coroutines.version>1.5.2</kotlinx.coroutines.version>
32+
</properties>
33+
2934
<artifactId>feign-reactor-core</artifactId>
3035
<packaging>jar</packaging>
3136
<name>Feign Reactive Core</name>
@@ -63,6 +68,23 @@
6368
<optional>true</optional>
6469
</dependency>
6570

71+
<dependency>
72+
<groupId>org.jetbrains.kotlin</groupId>
73+
<artifactId>kotlin-stdlib-jdk8</artifactId>
74+
<version>${kotlin.version}</version>
75+
</dependency>
76+
77+
<dependency>
78+
<groupId>org.jetbrains.kotlin</groupId>
79+
<artifactId>kotlin-reflect</artifactId>
80+
</dependency>
81+
82+
<dependency>
83+
<groupId>org.jetbrains.kotlinx</groupId>
84+
<artifactId>kotlinx-coroutines-reactor</artifactId>
85+
<version>${kotlinx.coroutines.version}</version>
86+
</dependency>
87+
6688
<!-- Tests -->
6789
<dependency>
6890
<groupId>io.projectreactor</groupId>
@@ -185,6 +207,37 @@
185207
</execution>
186208
</executions>
187209
</plugin>
210+
<plugin>
211+
<artifactId>kotlin-maven-plugin</artifactId>
212+
<groupId>org.jetbrains.kotlin</groupId>
213+
<version>${kotlin.version}</version>
214+
<executions>
215+
<execution>
216+
<id>compile</id>
217+
<goals>
218+
<goal>compile</goal>
219+
</goals>
220+
<configuration>
221+
<sourceDirs>
222+
<sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
223+
<sourceDir>${project.basedir}/src/main/java</sourceDir>
224+
</sourceDirs>
225+
</configuration>
226+
</execution>
227+
<execution>
228+
<id>test-compile</id>
229+
<goals>
230+
<goal>test-compile</goal>
231+
</goals>
232+
<configuration>
233+
<sourceDirs>
234+
<sourceDir>${project.basedir}/src/test/kotlin</sourceDir>
235+
<sourceDir>${project.basedir}/src/test/java</sourceDir>
236+
</sourceDirs>
237+
</configuration>
238+
</execution>
239+
</executions>
240+
</plugin>
188241
</plugins>
189242
</build>
190243

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
@file:JvmName("KotlinExtensions")
2+
3+
package reactivefeign
4+
5+
import kotlinx.coroutines.reactor.awaitSingle
6+
import reactivefeign.client.ReactiveHttpResponse
7+
import reactor.core.publisher.Mono
8+
import java.lang.reflect.Method
9+
import java.lang.reflect.Type
10+
import kotlin.reflect.jvm.javaType
11+
import kotlin.reflect.jvm.kotlinFunction
12+
13+
internal suspend fun Mono<*>.awaitReactiveHttpResponse(): Any {
14+
val result = awaitSingle()
15+
if (result is ReactiveHttpResponse<*>) {
16+
val body = result.body()
17+
require(body is Mono<*>) { "Only Mono type is allowed for suspend method" }
18+
return body.awaitSingle()
19+
}
20+
21+
return result
22+
}
23+
24+
internal fun Method.isSuspendMethod(): Boolean =
25+
kotlinFunction?.isSuspend ?: false
26+
27+
internal fun Method.getKotlinMethodReturnType(): Type? =
28+
kotlinFunction?.returnType?.javaType

feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
import static feign.Util.checkNotNull;
2828
import static java.util.Arrays.asList;
29+
import static reactivefeign.KotlinExtensions.getKotlinMethodReturnType;
30+
import static reactivefeign.KotlinExtensions.isSuspendMethod;
2931
import static reactivefeign.utils.FeignUtils.*;
3032

3133
/**
@@ -48,7 +50,13 @@ public List<MethodMetadata> parseAndValidateMetadata(final Class<?> targetType)
4850

4951
for (final MethodMetadata metadata : methodsMetadata) {
5052
final Type type = metadata.returnType();
51-
if (!isReactorType(type)) {
53+
54+
boolean isSuspend = isSuspendMethod(metadata.method());
55+
if (isSuspend) {
56+
modifySuspendMethodMetadata(metadata);
57+
}
58+
59+
if (!isReactorType(type) && !isSuspend) {
5260
throw new IllegalArgumentException(String.format(
5361
"Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux",
5462
metadata.configKey(), targetType.getSimpleName()));
@@ -64,6 +72,23 @@ public List<MethodMetadata> parseAndValidateMetadata(final Class<?> targetType)
6472
return methodsMetadata;
6573
}
6674

75+
private static void modifySuspendMethodMetadata(MethodMetadata metadata) {
76+
Type kotlinMethodReturnType = getKotlinMethodReturnType(metadata.method());
77+
if (kotlinMethodReturnType == null) {
78+
throw new IllegalArgumentException(String.format(
79+
"Method %s can't have continuation argument, only kotlin method is allowed",
80+
metadata.configKey()));
81+
}
82+
metadata.returnType(kotlinMethodReturnType);
83+
84+
int continuationIndex = metadata.method().getParameterCount() - 1;
85+
metadata.ignoreParamater(continuationIndex);
86+
87+
if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) {
88+
metadata.bodyIndex(null);
89+
}
90+
}
91+
6792
private static final Set<Class> REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class));
6893

6994
private boolean isReactorType(final Type type) {

feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import static feign.Util.checkNotNull;
4747
import static feign.Util.isDefault;
48+
import static reactivefeign.KotlinExtensions.isSuspendMethod;
4849
import static reactivefeign.client.ReactiveHttpExchangeFilterFunction.ofRequestProcessor;
4950
import static reactivefeign.client.ReactiveHttpExchangeFilterFunction.ofResponseProcessor;
5051
import static reactivefeign.client.StatusHandlerPostProcessor.handleStatus;
@@ -287,7 +288,9 @@ public static PublisherHttpClient retry(
287288
MethodMetadata methodMetadata,
288289
Retry retry) {
289290
Type returnPublisherType = returnPublisherType(methodMetadata);
290-
if(returnPublisherType == Mono.class){
291+
if (isSuspendMethod(methodMetadata.method())) {
292+
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
293+
} else if (returnPublisherType == Mono.class) {
291294
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
292295
} else if(returnPublisherType == Flux.class) {
293296
return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
@@ -297,7 +300,9 @@ public static PublisherHttpClient retry(
297300
}
298301

299302
protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, MethodMetadata methodMetadata){
300-
if(isResponsePublisher(methodMetadata.returnType())){
303+
if (isSuspendMethod(methodMetadata.method())) {
304+
return new ResponsePublisherHttpClient(reactiveHttpClient);
305+
} else if (isResponsePublisher(methodMetadata.returnType())) {
301306
return new ResponsePublisherHttpClient(reactiveHttpClient);
302307
}
303308

feign-reactor-core/src/main/java/reactivefeign/ReactiveInvocationHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616
import feign.InvocationHandlerFactory;
1717
import feign.InvocationHandlerFactory.MethodHandler;
1818
import feign.Target;
19+
import kotlin.coroutines.Continuation;
20+
import reactor.core.publisher.Mono;
1921

2022
import java.lang.reflect.InvocationHandler;
2123
import java.lang.reflect.Method;
2224
import java.lang.reflect.Proxy;
25+
import java.util.Arrays;
2326
import java.util.Map;
2427

2528
import static feign.Util.checkNotNull;
29+
import static reactivefeign.KotlinExtensions.awaitReactiveHttpResponse;
30+
import static reactivefeign.KotlinExtensions.isSuspendMethod;
2631

2732
/**
2833
* {@link InvocationHandler} implementation that transforms calls to methods of feign contract into
@@ -61,6 +66,13 @@ private void defineObjectMethodsHandlers() {
6166

6267
@Override
6368
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
69+
if (isSuspendMethod(method)) {
70+
Object[] newArgs = Arrays.copyOfRange(args, 0, args.length - 1);
71+
Mono<?> result = (Mono<?>) dispatch.get(method).invoke(newArgs);
72+
Continuation<Object> continuation = (Continuation<Object>) args[args.length - 1];
73+
return awaitReactiveHttpResponse(result, continuation);
74+
}
75+
6476
return dispatch.get(method).invoke(args);
6577
}
6678

feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.lang.reflect.Type;
1212

1313
import static feign.Util.checkNotNull;
14+
import static reactivefeign.KotlinExtensions.isSuspendMethod;
1415
import static reactivefeign.utils.FeignUtils.isResponsePublisher;
1516
import static reactivefeign.utils.FeignUtils.returnPublisherType;
1617

@@ -35,7 +36,9 @@ public MethodHandler create(MethodMetadata metadata) {
3536
MethodHandler methodHandler = new PublisherClientMethodHandler(
3637
target, metadata, publisherClientFactory.create(metadata));
3738

38-
if(isResponsePublisher(metadata.returnType())){
39+
if (isSuspendMethod(metadata.method())) {
40+
return new MonoMethodHandler(methodHandler);
41+
} else if (isResponsePublisher(metadata.returnType())) {
3942
return new MonoMethodHandler(methodHandler);
4043
}
4144

@@ -53,7 +56,9 @@ public MethodHandler create(MethodMetadata metadata) {
5356
public MethodHandler createDefault(Method method) {
5457
MethodHandler defaultMethodHandler = new DefaultMethodHandler(method);
5558

56-
if(method.getReturnType() == Mono.class){
59+
if (isSuspendMethod(method)) {
60+
return new MonoMethodHandler(defaultMethodHandler);
61+
} else if (method.getReturnType() == Mono.class) {
5762
return new MonoMethodHandler(defaultMethodHandler);
5863
} else if(method.getReturnType() == Flux.class) {
5964
return new FluxMethodHandler(defaultMethodHandler);

feign-reactor-core/src/main/java/reactivefeign/utils/FeignUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public static Type returnActualType(MethodMetadata methodMetadata) {
6464
}
6565

6666
public static Type returnActualType(Type returnType) {
67+
if (!(returnType instanceof ParameterizedType)) {
68+
return returnType;
69+
}
70+
6771
Class<?> publisher = (Class)((ParameterizedType) returnType).getRawType();
6872
Type typeInPublisher = resolveLastTypeParameter(returnType, publisher);
6973
if(isResponsePublisher(publisher, typeInPublisher)){
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package reactivefeign
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException
4+
import kotlinx.coroutines.runBlocking
5+
import org.assertj.core.api.Assertions.assertThat
6+
import org.junit.AfterClass
7+
import org.junit.BeforeClass
8+
import org.junit.Test
9+
import reactivefeign.resttemplate.client.RestTemplateFakeReactiveFeign
10+
import reactivefeign.testcase.SuspendIceCreamServiceApi
11+
import reactivefeign.testcase.domain.OrderGenerator
12+
import reactor.core.publisher.Mono
13+
import reactor.netty.DisposableServer
14+
import reactor.netty.http.HttpProtocol
15+
import reactor.netty.http.server.HttpServer
16+
import reactor.netty.http.server.HttpServerRequest
17+
import reactor.netty.http.server.HttpServerResponse
18+
import reactor.netty.http.server.HttpServerRoutes
19+
import java.time.Duration
20+
21+
class SuspendTest {
22+
companion object {
23+
private lateinit var server: DisposableServer
24+
private const val DELAY_IN_MILLIS: Long = 500L
25+
private val cannedValue = OrderGenerator().generate(1)
26+
27+
@BeforeClass
28+
@JvmStatic
29+
@Throws(JsonProcessingException::class)
30+
fun startServer() {
31+
val data = TestUtils.MAPPER.writeValueAsString(cannedValue).toByteArray()
32+
server = HttpServer.create()
33+
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C)
34+
.route { routes: HttpServerRoutes ->
35+
routes[
36+
"/icecream/orders/1", { req: HttpServerRequest?, res: HttpServerResponse ->
37+
res.header("Content-Type", "application/json")
38+
Mono.delay(Duration.ofMillis(DELAY_IN_MILLIS))
39+
.thenEmpty(res.sendByteArray(Mono.just(data)))
40+
}
41+
]
42+
}
43+
.bindNow()
44+
}
45+
46+
@JvmStatic
47+
@AfterClass
48+
fun stopServer() {
49+
server.disposeNow()
50+
}
51+
}
52+
53+
@Test
54+
fun shouldRun(): Unit = runBlocking {
55+
val client = RestTemplateFakeReactiveFeign
56+
.builder<SuspendIceCreamServiceApi>()
57+
.target(
58+
SuspendIceCreamServiceApi::class.java,
59+
"http://localhost:" + server.port()
60+
)
61+
62+
val firstOrder = client.findOrder(orderId = 1)
63+
64+
assertThat(firstOrder).usingRecursiveComparison().isEqualTo(cannedValue)
65+
}
66+
}

feign-reactor-core/src/test/java/reactivefeign/resttemplate/client/RestTemplateFakeReactiveHttpClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Map;
3636

3737
import static org.springframework.core.ParameterizedTypeReference.forType;
38+
import static reactivefeign.KotlinExtensions.isSuspendMethod;
3839
import static reactivefeign.utils.FeignUtils.returnActualType;
3940
import static reactivefeign.utils.FeignUtils.returnPublisherType;
4041

@@ -51,7 +52,11 @@ public class RestTemplateFakeReactiveHttpClient implements ReactiveHttpClient {
5152
this.restTemplate = restTemplate;
5253
this.acceptGzip = acceptGzip;
5354

54-
returnPublisherType = returnPublisherType(methodMetadata);
55+
if (isSuspendMethod(methodMetadata.method())) {
56+
returnPublisherType = Mono.class;
57+
} else {
58+
returnPublisherType = returnPublisherType(methodMetadata);
59+
}
5560
returnActualType = forType(returnActualType(methodMetadata));
5661
}
5762

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package reactivefeign.testcase
2+
3+
import feign.Param
4+
import feign.RequestLine
5+
import reactivefeign.testcase.domain.IceCreamOrder
6+
7+
interface SuspendIceCreamServiceApi {
8+
@RequestLine("GET /icecream/orders/{orderId}")
9+
suspend fun findOrder(@Param("orderId") orderId: Int): IceCreamOrder
10+
}

0 commit comments

Comments
 (0)