Skip to content

Commit df00542

Browse files
committed
Fully clear reactive sender cache on Lifecycle stop
1 parent a2646d9 commit df00542

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

gradle/aggregate-jacoco-report.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ project.afterEvaluate {
1212

1313
tasks.create(name: 'aggregateJacocoTestReport', type: JacocoReport) {
1414

15+
dependsOn ':spring-pulsar-cache-provider:compileJava', ':spring-pulsar-cache-provider-caffeine:compileJava'
16+
1517
group = 'verification'
1618
description = 'Generates aggregate code coverage report for all projects test tasks'
1719

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.pulsar.core.TopicResolver;
3636
import org.springframework.util.Assert;
3737
import org.springframework.util.CollectionUtils;
38+
import org.springframework.util.ReflectionUtils;
3839

3940
/**
4041
* Default implementation of {@link ReactivePulsarSenderFactory}.
@@ -172,13 +173,44 @@ public LogAccessor logger() {
172173
@Override
173174
public void doStop() {
174175
try {
176+
this.reflectivelyClearCache();
175177
this.reactiveMessageSenderCache.close();
178+
176179
}
177180
catch (Exception e) {
178181
throw new RuntimeException(e);
179182
}
180183
}
181184

185+
/**
186+
* Workaround to reflectively clear the underlying producer cache.
187+
*
188+
* TODO: Remove once this is supported in the Reactive client.
189+
*/
190+
private void reflectivelyClearCache() {
191+
// reactiveMessageSenderCache
192+
// (org.apache.pulsar.reactive.client.internal.adapter.ProducerCache)
193+
var cacheProviderField = ReflectionUtils.findField(this.reactiveMessageSenderCache.getClass(), "cacheProvider");
194+
ReflectionUtils.makeAccessible(cacheProviderField);
195+
196+
// org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProvider
197+
var cacheProvider = ReflectionUtils.getField(cacheProviderField, this.reactiveMessageSenderCache);
198+
199+
// org.apache.pulsar.reactive.shade.com.github.benmanes.caffeine.cache.BoundedLocalCache$BoundedLocalAsyncCache
200+
var cacheField = ReflectionUtils.findField(cacheProvider.getClass(), "cache");
201+
ReflectionUtils.makeAccessible(cacheField);
202+
var cache = ReflectionUtils.getField(cacheField, cacheProvider);
203+
204+
// org.apache.pulsar.reactive.shade.com.github.benmanes.caffeine.cache.SSLMSAW
205+
var actualCacheField = ReflectionUtils.findField(cache.getClass(), "cache");
206+
ReflectionUtils.makeAccessible(actualCacheField);
207+
var actualCache = ReflectionUtils.getField(actualCacheField, cache);
208+
209+
var clearMethod = ReflectionUtils.findMethod(actualCache.getClass(), "clear");
210+
ReflectionUtils.makeAccessible(clearMethod);
211+
ReflectionUtils.invokeMethod(clearMethod, actualCache);
212+
}
213+
182214
/**
183215
* Builder for {@link DefaultReactivePulsarSenderFactory}.
184216
*

0 commit comments

Comments
 (0)