Skip to content

Commit d1096a2

Browse files
committed
Add IT for config props driven listener
This commit adds an integration test to verify the following Spring Boot config props can be used to configure `@PulsarListener` and `@ReactivePulsarListener`: - `spring.pulsar.consumer.topic` - `spring.pulsar.consumer.subscription.name` - `spring.pulsar.consumer.subscription.type` See spring-projects/spring-boot#42053
1 parent c254d6b commit d1096a2

File tree

2 files changed

+177
-16
lines changed

2 files changed

+177
-16
lines changed

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import org.apache.pulsar.client.api.Consumer;
2526
import org.apache.pulsar.client.api.Schema;
27+
import org.apache.pulsar.client.api.SubscriptionType;
28+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
2629
import org.apache.pulsar.common.schema.SchemaType;
30+
import org.assertj.core.api.InstanceOfAssertFactories;
31+
import org.junit.jupiter.api.Nested;
2732
import org.junit.jupiter.api.Test;
2833

2934
import org.springframework.boot.SpringApplication;
@@ -62,12 +67,11 @@ class PulsarListenerIntegrationTests implements PulsarTestContainerSupport {
6267
void basicPulsarListener() throws Exception {
6368
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
6469
app.setWebApplicationType(WebApplicationType.NONE);
65-
6670
try (ConfigurableApplicationContext context = app
6771
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
6872
@SuppressWarnings("unchecked")
6973
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
70-
pulsarTemplate.send("plt-basic-topic", "John Doe");
74+
pulsarTemplate.send("plit-basic-topic", "John Doe");
7175
assertThat(LATCH_1.await(20, TimeUnit.SECONDS)).isTrue();
7276
}
7377
}
@@ -76,12 +80,11 @@ void basicPulsarListener() throws Exception {
7680
void basicPulsarListenerCustomType() throws Exception {
7781
SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class);
7882
app.setWebApplicationType(WebApplicationType.NONE);
79-
8083
try (ConfigurableApplicationContext context = app
8184
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
8285
@SuppressWarnings("unchecked")
8386
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
84-
pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
87+
pulsarTemplate.send("plit-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
8588
assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue();
8689
}
8790
}
@@ -90,12 +93,11 @@ void basicPulsarListenerCustomType() throws Exception {
9093
void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
9194
SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class);
9295
app.setWebApplicationType(WebApplicationType.NONE);
93-
9496
try (ConfigurableApplicationContext context = app
9597
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
9698
@SuppressWarnings("unchecked")
9799
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
98-
pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe"));
100+
pulsarTemplate.send("plit-foo-topic2", new Foo("John Doe"));
99101
assertThat(LATCH_3.await(20, TimeUnit.SECONDS)).isTrue();
100102
}
101103
}
@@ -104,12 +106,11 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
104106
void basicPulsarListenerWithTopicMapping() throws Exception {
105107
SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class);
106108
app.setWebApplicationType(WebApplicationType.NONE);
107-
108109
try (ConfigurableApplicationContext context = app
109110
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
110111
@SuppressWarnings("unchecked")
111112
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
112-
pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
113+
pulsarTemplate.send("plit-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
113114
assertThat(LATCH_4.await(20, TimeUnit.SECONDS)).isTrue();
114115
}
115116
}
@@ -118,23 +119,63 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
118119
void batchPulsarListener() throws Exception {
119120
SpringApplication app = new SpringApplication(BatchListenerConfig.class);
120121
app.setWebApplicationType(WebApplicationType.NONE);
121-
122122
try (ConfigurableApplicationContext context = app
123123
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
124124
@SuppressWarnings("unchecked")
125125
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
126126
for (int i = 0; i < 10; i++) {
127-
pulsarTemplate.send("plt-batch-topic", "John Doe");
127+
pulsarTemplate.send("plit-batch-topic", "John Doe");
128128
}
129129
assertThat(LATCH_5.await(10, TimeUnit.SECONDS)).isTrue();
130130
}
131131
}
132132

133+
@Nested
134+
class ConfigPropsDrivenListener {
135+
136+
private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);
137+
138+
@Test
139+
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
140+
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
141+
app.setWebApplicationType(WebApplicationType.NONE);
142+
try (ConfigurableApplicationContext context = app.run(
143+
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
144+
"--my.env=dev", "--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}",
145+
"--spring.pulsar.consumer.subscription.type=Shared",
146+
"--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) {
147+
@SuppressWarnings("unchecked")
148+
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
149+
pulsarTemplate.send("plit-config-props-topic-dev", "hello config props driven");
150+
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
151+
}
152+
153+
}
154+
155+
@EnableAutoConfiguration
156+
@SpringBootConfiguration
157+
static class ConfigPropsDrivenListenerConfig {
158+
159+
@PulsarListener
160+
public void listen(String ignored, Consumer<String> consumer) {
161+
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
162+
.satisfies((conf) -> {
163+
assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev");
164+
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
165+
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
166+
});
167+
LATCH_CONFIG_PROPS.countDown();
168+
}
169+
170+
}
171+
172+
}
173+
133174
@EnableAutoConfiguration
134175
@SpringBootConfiguration
135176
static class BasicListenerConfig {
136177

137-
@PulsarListener(subscriptionName = "plt-basic-sub", topics = "plt-basic-topic")
178+
@PulsarListener(subscriptionName = "plit-basic-sub", topics = "plit-basic-topic")
138179
public void listen(String ignored) {
139180
LATCH_1.countDown();
140181
}
@@ -145,7 +186,7 @@ public void listen(String ignored) {
145186
@SpringBootConfiguration
146187
static class BasicListenerCustomTypeConfig {
147188

148-
@PulsarListener(subscriptionName = "plt-foo-sub1", topics = "plt-foo-topic1", schemaType = SchemaType.JSON)
189+
@PulsarListener(subscriptionName = "plit-foo-sub1", topics = "plit-foo-topic1", schemaType = SchemaType.JSON)
149190
public void listen(Foo ignored) {
150191
LATCH_2.countDown();
151192
}
@@ -163,7 +204,7 @@ SchemaResolver customSchemaResolver() {
163204
return resolver;
164205
}
165206

166-
@PulsarListener(subscriptionName = "plt-foo-sub2", topics = "plt-foo-topic2")
207+
@PulsarListener(subscriptionName = "plit-foo-sub2", topics = "plit-foo-topic2")
167208
public void listen(Foo ignored) {
168209
LATCH_3.countDown();
169210
}
@@ -177,11 +218,11 @@ static class BasicListenerWithTopicMappingConfig {
177218
@Bean
178219
TopicResolver customTopicResolver() {
179220
DefaultTopicResolver resolver = new DefaultTopicResolver();
180-
resolver.addCustomTopicMapping(Foo.class, "plt-topicMapping-topic");
221+
resolver.addCustomTopicMapping(Foo.class, "plit-topicMapping-topic");
181222
return resolver;
182223
}
183224

184-
@PulsarListener(subscriptionName = "plt-topicMapping-sub", schemaType = SchemaType.JSON)
225+
@PulsarListener(subscriptionName = "plit-topicMapping-sub", schemaType = SchemaType.JSON)
185226
public void listen(Foo ignored) {
186227
LATCH_4.countDown();
187228
}
@@ -192,7 +233,7 @@ public void listen(Foo ignored) {
192233
@SpringBootConfiguration
193234
static class BatchListenerConfig {
194235

195-
@PulsarListener(subscriptionName = "plt-batch-sub", topics = "plt-batch-topic", batch = true)
236+
@PulsarListener(subscriptionName = "plit-batch-sub", topics = "plit-batch-topic", batch = true)
196237
public void listen(List<String> foo) {
197238
foo.forEach(t -> LATCH_5.countDown());
198239
}

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,25 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.TimeUnit;
2326

2427
import org.apache.pulsar.client.api.Message;
2528
import org.apache.pulsar.client.api.Schema;
2629
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.SubscriptionType;
2731
import org.apache.pulsar.common.schema.SchemaType;
2832
import org.apache.pulsar.reactive.client.api.MessageResult;
33+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
34+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
35+
import org.junit.jupiter.api.Nested;
2936
import org.junit.jupiter.api.Test;
3037

38+
import org.springframework.beans.BeansException;
39+
import org.springframework.beans.factory.config.BeanPostProcessor;
3140
import org.springframework.boot.SpringApplication;
3241
import org.springframework.boot.SpringBootConfiguration;
3342
import org.springframework.boot.WebApplicationType;
@@ -43,8 +52,12 @@
4352
import org.springframework.pulsar.core.TopicResolver;
4453
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
4554
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
55+
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
56+
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
4657
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
4758
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
59+
import org.springframework.test.util.ReflectionTestUtils;
60+
import org.springframework.util.ObjectUtils;
4861

4962
import reactor.core.publisher.Flux;
5063
import reactor.core.publisher.Mono;
@@ -134,6 +147,75 @@ void fluxListener() throws Exception {
134147
}
135148
}
136149

150+
@Nested
151+
class ConfigPropsDrivenListener {
152+
153+
private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);
154+
155+
@Test
156+
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
157+
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
158+
app.setWebApplicationType(WebApplicationType.NONE);
159+
try (ConfigurableApplicationContext context = app.run(
160+
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
161+
"--my.env=dev", "--spring.pulsar.consumer.topics=rplit-config-props-topic-${my.env}",
162+
"--spring.pulsar.consumer.subscription.type=Shared",
163+
"--spring.pulsar.consumer.subscription.name=rplit-config-props-subs-${my.env}")) {
164+
var topic = "persistent://public/default/rplit-config-props-topic-dev";
165+
@SuppressWarnings("unchecked")
166+
ReactivePulsarTemplate<String> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
167+
pulsarTemplate.send(topic, "hello config props driven").block();
168+
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
169+
@SuppressWarnings("unchecked")
170+
ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory = (ConsumerTrackingReactivePulsarConsumerFactory<String>) context
171+
.getBean(ReactivePulsarConsumerFactory.class);
172+
assertThat(consumerFactory.getSpec(topic)).isNotNull().satisfies((consumerSpec) -> {
173+
assertThat(consumerSpec.getTopicNames()).containsExactly(topic);
174+
assertThat(consumerSpec.getSubscriptionName()).isEqualTo("rplit-config-props-subs-dev");
175+
assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
176+
});
177+
}
178+
}
179+
180+
@EnableAutoConfiguration
181+
@SpringBootConfiguration
182+
@Import(ConsumerCustomizerConfig.class)
183+
static class ConfigPropsDrivenListenerConfig {
184+
185+
/**
186+
* Post process the Reactive consumer factory and replace it with a tracking
187+
* wrapper around it. Because this test requires the Spring Boot config props
188+
* to be applied to the auto-configured consumer factory we can't simply
189+
* replace the consumer factory bean as the config props will not be set on
190+
* the custom consumer factory.
191+
* @return post processor to wrap a tracker around the reactive consumer
192+
* factory
193+
*/
194+
@Bean
195+
static BeanPostProcessor consumerTrackingConsumerFactory() {
196+
return new BeanPostProcessor() {
197+
@SuppressWarnings({ "rawtypes", "unchecked" })
198+
@Override
199+
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
200+
if (bean instanceof ReactivePulsarConsumerFactory rcf) {
201+
return new ConsumerTrackingReactivePulsarConsumerFactory<>(
202+
(ReactivePulsarConsumerFactory<String>) rcf);
203+
}
204+
return bean;
205+
}
206+
};
207+
}
208+
209+
@ReactivePulsarListener(consumerCustomizer = "consumerCustomizer")
210+
public Mono<Void> listen(String ignored) {
211+
LATCH_CONFIG_PROPS.countDown();
212+
return Mono.empty();
213+
}
214+
215+
}
216+
217+
}
218+
137219
@EnableAutoConfiguration
138220
@SpringBootConfiguration
139221
@Import(ConsumerCustomizerConfig.class)
@@ -230,4 +312,42 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> consumerCustomize
230312
record Foo(String value) {
231313
}
232314

315+
static class ConsumerTrackingReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {
316+
317+
private Map<String, ReactiveMessageConsumerSpec> topicNameToConsumerSpec = new HashMap<>();
318+
319+
private ReactivePulsarConsumerFactory<T> delegate;
320+
321+
ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory<T> delegate) {
322+
this.delegate = delegate;
323+
}
324+
325+
@Override
326+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
327+
var consumer = this.delegate.createConsumer(schema);
328+
storeSpec(consumer);
329+
return consumer;
330+
}
331+
332+
@Override
333+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
334+
List<ReactiveMessageConsumerBuilderCustomizer<T>> reactiveMessageConsumerBuilderCustomizers) {
335+
var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers);
336+
storeSpec(consumer);
337+
return consumer;
338+
}
339+
340+
private void storeSpec(ReactiveMessageConsumer<T> consumer) {
341+
var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec");
342+
var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0)
343+
: "no-topics-set";
344+
this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec);
345+
}
346+
347+
ReactiveMessageConsumerSpec getSpec(String topic) {
348+
return this.topicNameToConsumerSpec.get(topic);
349+
}
350+
351+
}
352+
233353
}

0 commit comments

Comments
 (0)