Skip to content

Commit 0012125

Browse files
authored
Add IT for config props driven listener (#822)
This commit adds an integration test to verify the following Spring Boot config props can be used to configure `@PulsarListener` and `@ReactivePulsarListener`: - `spring.pulsar.consumer.topic` - `spring.pulsar.consumer.subscription.name` - `spring.pulsar.consumer.subscription.type` See spring-projects/spring-boot#42053
1 parent a518e11 commit 0012125

File tree

2 files changed

+177
-16
lines changed

2 files changed

+177
-16
lines changed

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import org.apache.pulsar.client.api.Consumer;
2526
import org.apache.pulsar.client.api.Schema;
27+
import org.apache.pulsar.client.api.SubscriptionType;
28+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
2629
import org.apache.pulsar.common.schema.SchemaType;
30+
import org.assertj.core.api.InstanceOfAssertFactories;
31+
import org.junit.jupiter.api.Nested;
2732
import org.junit.jupiter.api.Test;
2833

2934
import org.springframework.boot.SpringApplication;
@@ -62,12 +67,11 @@ class PulsarListenerIntegrationTests implements PulsarTestContainerSupport {
6267
void basicPulsarListener() throws Exception {
6368
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
6469
app.setWebApplicationType(WebApplicationType.NONE);
65-
6670
try (ConfigurableApplicationContext context = app
6771
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
6872
@SuppressWarnings("unchecked")
6973
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
70-
pulsarTemplate.send("plt-basic-topic", "John Doe");
74+
pulsarTemplate.send("plit-basic-topic", "John Doe");
7175
assertThat(LATCH_1.await(20, TimeUnit.SECONDS)).isTrue();
7276
}
7377
}
@@ -76,12 +80,11 @@ void basicPulsarListener() throws Exception {
7680
void basicPulsarListenerCustomType() throws Exception {
7781
SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class);
7882
app.setWebApplicationType(WebApplicationType.NONE);
79-
8083
try (ConfigurableApplicationContext context = app
8184
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
8285
@SuppressWarnings("unchecked")
8386
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
84-
pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
87+
pulsarTemplate.send("plit-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
8588
assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue();
8689
}
8790
}
@@ -90,12 +93,11 @@ void basicPulsarListenerCustomType() throws Exception {
9093
void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
9194
SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class);
9295
app.setWebApplicationType(WebApplicationType.NONE);
93-
9496
try (ConfigurableApplicationContext context = app
9597
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
9698
@SuppressWarnings("unchecked")
9799
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
98-
pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe"));
100+
pulsarTemplate.send("plit-foo-topic2", new Foo("John Doe"));
99101
assertThat(LATCH_3.await(20, TimeUnit.SECONDS)).isTrue();
100102
}
101103
}
@@ -104,12 +106,11 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
104106
void basicPulsarListenerWithTopicMapping() throws Exception {
105107
SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class);
106108
app.setWebApplicationType(WebApplicationType.NONE);
107-
108109
try (ConfigurableApplicationContext context = app
109110
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
110111
@SuppressWarnings("unchecked")
111112
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
112-
pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
113+
pulsarTemplate.send("plit-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
113114
assertThat(LATCH_4.await(20, TimeUnit.SECONDS)).isTrue();
114115
}
115116
}
@@ -118,23 +119,63 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
118119
void batchPulsarListener() throws Exception {
119120
SpringApplication app = new SpringApplication(BatchListenerConfig.class);
120121
app.setWebApplicationType(WebApplicationType.NONE);
121-
122122
try (ConfigurableApplicationContext context = app
123123
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
124124
@SuppressWarnings("unchecked")
125125
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
126126
for (int i = 0; i < 10; i++) {
127-
pulsarTemplate.send("plt-batch-topic", "John Doe");
127+
pulsarTemplate.send("plit-batch-topic", "John Doe");
128128
}
129129
assertThat(LATCH_5.await(10, TimeUnit.SECONDS)).isTrue();
130130
}
131131
}
132132

133+
@Nested
134+
class ConfigPropsDrivenListener {
135+
136+
private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);
137+
138+
@Test
139+
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
140+
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
141+
app.setWebApplicationType(WebApplicationType.NONE);
142+
try (ConfigurableApplicationContext context = app.run(
143+
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
144+
"--my.env=dev", "--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}",
145+
"--spring.pulsar.consumer.subscription.type=Shared",
146+
"--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) {
147+
@SuppressWarnings("unchecked")
148+
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
149+
pulsarTemplate.send("plit-config-props-topic-dev", "hello config props driven");
150+
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
151+
}
152+
153+
}
154+
155+
@EnableAutoConfiguration
156+
@SpringBootConfiguration
157+
static class ConfigPropsDrivenListenerConfig {
158+
159+
@PulsarListener
160+
public void listen(String ignored, Consumer<String> consumer) {
161+
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
162+
.satisfies((conf) -> {
163+
assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev");
164+
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
165+
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
166+
});
167+
LATCH_CONFIG_PROPS.countDown();
168+
}
169+
170+
}
171+
172+
}
173+
133174
@EnableAutoConfiguration
134175
@SpringBootConfiguration
135176
static class BasicListenerConfig {
136177

137-
@PulsarListener(subscriptionName = "plt-basic-sub", topics = "plt-basic-topic")
178+
@PulsarListener(subscriptionName = "plit-basic-sub", topics = "plit-basic-topic")
138179
public void listen(String ignored) {
139180
LATCH_1.countDown();
140181
}
@@ -145,7 +186,7 @@ public void listen(String ignored) {
145186
@SpringBootConfiguration
146187
static class BasicListenerCustomTypeConfig {
147188

148-
@PulsarListener(subscriptionName = "plt-foo-sub1", topics = "plt-foo-topic1", schemaType = SchemaType.JSON)
189+
@PulsarListener(subscriptionName = "plit-foo-sub1", topics = "plit-foo-topic1", schemaType = SchemaType.JSON)
149190
public void listen(Foo ignored) {
150191
LATCH_2.countDown();
151192
}
@@ -163,7 +204,7 @@ SchemaResolver customSchemaResolver() {
163204
return resolver;
164205
}
165206

166-
@PulsarListener(subscriptionName = "plt-foo-sub2", topics = "plt-foo-topic2")
207+
@PulsarListener(subscriptionName = "plit-foo-sub2", topics = "plit-foo-topic2")
167208
public void listen(Foo ignored) {
168209
LATCH_3.countDown();
169210
}
@@ -177,11 +218,11 @@ static class BasicListenerWithTopicMappingConfig {
177218
@Bean
178219
TopicResolver customTopicResolver() {
179220
DefaultTopicResolver resolver = new DefaultTopicResolver();
180-
resolver.addCustomTopicMapping(Foo.class, "plt-topicMapping-topic");
221+
resolver.addCustomTopicMapping(Foo.class, "plit-topicMapping-topic");
181222
return resolver;
182223
}
183224

184-
@PulsarListener(subscriptionName = "plt-topicMapping-sub", schemaType = SchemaType.JSON)
225+
@PulsarListener(subscriptionName = "plit-topicMapping-sub", schemaType = SchemaType.JSON)
185226
public void listen(Foo ignored) {
186227
LATCH_4.countDown();
187228
}
@@ -192,7 +233,7 @@ public void listen(Foo ignored) {
192233
@SpringBootConfiguration
193234
static class BatchListenerConfig {
194235

195-
@PulsarListener(subscriptionName = "plt-batch-sub", topics = "plt-batch-topic", batch = true)
236+
@PulsarListener(subscriptionName = "plit-batch-sub", topics = "plit-batch-topic", batch = true)
196237
public void listen(List<String> foo) {
197238
foo.forEach(t -> LATCH_5.countDown());
198239
}

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,25 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.TimeUnit;
2326

2427
import org.apache.pulsar.client.api.Message;
2528
import org.apache.pulsar.client.api.Schema;
2629
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.SubscriptionType;
2731
import org.apache.pulsar.common.schema.SchemaType;
2832
import org.apache.pulsar.reactive.client.api.MessageResult;
33+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
34+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
35+
import org.junit.jupiter.api.Nested;
2936
import org.junit.jupiter.api.Test;
3037

38+
import org.springframework.beans.BeansException;
39+
import org.springframework.beans.factory.config.BeanPostProcessor;
3140
import org.springframework.boot.SpringApplication;
3241
import org.springframework.boot.SpringBootConfiguration;
3342
import org.springframework.boot.WebApplicationType;
@@ -43,8 +52,12 @@
4352
import org.springframework.pulsar.core.TopicResolver;
4453
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
4554
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
55+
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
56+
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
4657
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
4758
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
59+
import org.springframework.test.util.ReflectionTestUtils;
60+
import org.springframework.util.ObjectUtils;
4861

4962
import reactor.core.publisher.Flux;
5063
import reactor.core.publisher.Mono;
@@ -134,6 +147,75 @@ void fluxListener() throws Exception {
134147
}
135148
}
136149

150+
@Nested
151+
class ConfigPropsDrivenListener {
152+
153+
private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);
154+
155+
@Test
156+
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
157+
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
158+
app.setWebApplicationType(WebApplicationType.NONE);
159+
try (ConfigurableApplicationContext context = app.run(
160+
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
161+
"--my.env=dev", "--spring.pulsar.consumer.topics=rplit-config-props-topic-${my.env}",
162+
"--spring.pulsar.consumer.subscription.type=Shared",
163+
"--spring.pulsar.consumer.subscription.name=rplit-config-props-subs-${my.env}")) {
164+
var topic = "persistent://public/default/rplit-config-props-topic-dev";
165+
@SuppressWarnings("unchecked")
166+
ReactivePulsarTemplate<String> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
167+
pulsarTemplate.send(topic, "hello config props driven").block();
168+
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
169+
@SuppressWarnings("unchecked")
170+
ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory = (ConsumerTrackingReactivePulsarConsumerFactory<String>) context
171+
.getBean(ReactivePulsarConsumerFactory.class);
172+
assertThat(consumerFactory.getSpec(topic)).isNotNull().satisfies((consumerSpec) -> {
173+
assertThat(consumerSpec.getTopicNames()).containsExactly(topic);
174+
assertThat(consumerSpec.getSubscriptionName()).isEqualTo("rplit-config-props-subs-dev");
175+
assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
176+
});
177+
}
178+
}
179+
180+
@EnableAutoConfiguration
181+
@SpringBootConfiguration
182+
@Import(ConsumerCustomizerConfig.class)
183+
static class ConfigPropsDrivenListenerConfig {
184+
185+
/**
186+
* Post process the Reactive consumer factory and replace it with a tracking
187+
* wrapper around it. Because this test requires the Spring Boot config props
188+
* to be applied to the auto-configured consumer factory we can't simply
189+
* replace the consumer factory bean as the config props will not be set on
190+
* the custom consumer factory.
191+
* @return post processor to wrap a tracker around the reactive consumer
192+
* factory
193+
*/
194+
@Bean
195+
static BeanPostProcessor consumerTrackingConsumerFactory() {
196+
return new BeanPostProcessor() {
197+
@SuppressWarnings({ "rawtypes", "unchecked" })
198+
@Override
199+
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
200+
if (bean instanceof ReactivePulsarConsumerFactory rcf) {
201+
return new ConsumerTrackingReactivePulsarConsumerFactory<>(
202+
(ReactivePulsarConsumerFactory<String>) rcf);
203+
}
204+
return bean;
205+
}
206+
};
207+
}
208+
209+
@ReactivePulsarListener(consumerCustomizer = "consumerCustomizer")
210+
public Mono<Void> listen(String ignored) {
211+
LATCH_CONFIG_PROPS.countDown();
212+
return Mono.empty();
213+
}
214+
215+
}
216+
217+
}
218+
137219
@EnableAutoConfiguration
138220
@SpringBootConfiguration
139221
@Import(ConsumerCustomizerConfig.class)
@@ -230,4 +312,42 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> consumerCustomize
230312
record Foo(String value) {
231313
}
232314

315+
static class ConsumerTrackingReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {
316+
317+
private Map<String, ReactiveMessageConsumerSpec> topicNameToConsumerSpec = new HashMap<>();
318+
319+
private ReactivePulsarConsumerFactory<T> delegate;
320+
321+
ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory<T> delegate) {
322+
this.delegate = delegate;
323+
}
324+
325+
@Override
326+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
327+
var consumer = this.delegate.createConsumer(schema);
328+
storeSpec(consumer);
329+
return consumer;
330+
}
331+
332+
@Override
333+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
334+
List<ReactiveMessageConsumerBuilderCustomizer<T>> reactiveMessageConsumerBuilderCustomizers) {
335+
var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers);
336+
storeSpec(consumer);
337+
return consumer;
338+
}
339+
340+
private void storeSpec(ReactiveMessageConsumer<T> consumer) {
341+
var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec");
342+
var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0)
343+
: "no-topics-set";
344+
this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec);
345+
}
346+
347+
ReactiveMessageConsumerSpec getSpec(String topic) {
348+
return this.topicNameToConsumerSpec.get(topic);
349+
}
350+
351+
}
352+
233353
}

0 commit comments

Comments
 (0)