diff --git a/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java b/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java index ad87421ba..f88549196 100644 --- a/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java +++ b/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java @@ -22,8 +22,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.schema.SchemaType; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.boot.SpringApplication; @@ -62,12 +67,11 @@ class PulsarListenerIntegrationTests implements PulsarTestContainerSupport { void basicPulsarListener() throws Exception { SpringApplication app = new SpringApplication(BasicListenerConfig.class); app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); - pulsarTemplate.send("plt-basic-topic", "John Doe"); + pulsarTemplate.send("plit-basic-topic", "John Doe"); assertThat(LATCH_1.await(20, TimeUnit.SECONDS)).isTrue(); } } @@ -76,12 +80,11 @@ void basicPulsarListener() throws Exception { void basicPulsarListenerCustomType() throws Exception { SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class); app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); - pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class)); + pulsarTemplate.send("plit-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class)); assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue(); } } @@ -90,12 +93,11 @@ void basicPulsarListenerCustomType() throws Exception { void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception { SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class); app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); - pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe")); + pulsarTemplate.send("plit-foo-topic2", new Foo("John Doe")); assertThat(LATCH_3.await(20, TimeUnit.SECONDS)).isTrue(); } } @@ -104,12 +106,11 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception { void basicPulsarListenerWithTopicMapping() throws Exception { SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class); app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); - pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class)); + pulsarTemplate.send("plit-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class)); assertThat(LATCH_4.await(20, TimeUnit.SECONDS)).isTrue(); } } @@ -118,23 +119,63 @@ void basicPulsarListenerWithTopicMapping() throws Exception { void batchPulsarListener() throws Exception { SpringApplication app = new SpringApplication(BatchListenerConfig.class); app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app .run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) { @SuppressWarnings("unchecked") PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); for (int i = 0; i < 10; i++) { - pulsarTemplate.send("plt-batch-topic", "John Doe"); + pulsarTemplate.send("plit-batch-topic", "John Doe"); } assertThat(LATCH_5.await(10, TimeUnit.SECONDS)).isTrue(); } } + @Nested + class ConfigPropsDrivenListener { + + private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1); + + @Test + void subscriptionConfigPropsAreRespectedOnListener() throws Exception { + SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class); + app.setWebApplicationType(WebApplicationType.NONE); + try (ConfigurableApplicationContext context = app.run( + "--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), + "--my.env=dev", "--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}", + "--spring.pulsar.consumer.subscription.type=Shared", + "--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) { + @SuppressWarnings("unchecked") + PulsarTemplate pulsarTemplate = context.getBean(PulsarTemplate.class); + pulsarTemplate.send("plit-config-props-topic-dev", "hello config props driven"); + assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue(); + } + + } + + @EnableAutoConfiguration + @SpringBootConfiguration + static class ConfigPropsDrivenListenerConfig { + + @PulsarListener + public void listen(String ignored, Consumer consumer) { + assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class)) + .satisfies((conf) -> { + assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev"); + assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); + assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev"); + }); + LATCH_CONFIG_PROPS.countDown(); + } + + } + + } + @EnableAutoConfiguration @SpringBootConfiguration static class BasicListenerConfig { - @PulsarListener(subscriptionName = "plt-basic-sub", topics = "plt-basic-topic") + @PulsarListener(subscriptionName = "plit-basic-sub", topics = "plit-basic-topic") public void listen(String ignored) { LATCH_1.countDown(); } @@ -145,7 +186,7 @@ public void listen(String ignored) { @SpringBootConfiguration static class BasicListenerCustomTypeConfig { - @PulsarListener(subscriptionName = "plt-foo-sub1", topics = "plt-foo-topic1", schemaType = SchemaType.JSON) + @PulsarListener(subscriptionName = "plit-foo-sub1", topics = "plit-foo-topic1", schemaType = SchemaType.JSON) public void listen(Foo ignored) { LATCH_2.countDown(); } @@ -163,7 +204,7 @@ SchemaResolver customSchemaResolver() { return resolver; } - @PulsarListener(subscriptionName = "plt-foo-sub2", topics = "plt-foo-topic2") + @PulsarListener(subscriptionName = "plit-foo-sub2", topics = "plit-foo-topic2") public void listen(Foo ignored) { LATCH_3.countDown(); } @@ -177,11 +218,11 @@ static class BasicListenerWithTopicMappingConfig { @Bean TopicResolver customTopicResolver() { DefaultTopicResolver resolver = new DefaultTopicResolver(); - resolver.addCustomTopicMapping(Foo.class, "plt-topicMapping-topic"); + resolver.addCustomTopicMapping(Foo.class, "plit-topicMapping-topic"); return resolver; } - @PulsarListener(subscriptionName = "plt-topicMapping-sub", schemaType = SchemaType.JSON) + @PulsarListener(subscriptionName = "plit-topicMapping-sub", schemaType = SchemaType.JSON) public void listen(Foo ignored) { LATCH_4.countDown(); } @@ -192,7 +233,7 @@ public void listen(Foo ignored) { @SpringBootConfiguration static class BatchListenerConfig { - @PulsarListener(subscriptionName = "plt-batch-sub", topics = "plt-batch-topic", batch = true) + @PulsarListener(subscriptionName = "plit-batch-sub", topics = "plit-batch-topic", batch = true) public void listen(List foo) { foo.forEach(t -> LATCH_5.countDown()); } diff --git a/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java b/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java index c883bedb2..9e9493661 100644 --- a/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java +++ b/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java @@ -18,16 +18,25 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.reactive.client.api.MessageResult; +import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; +import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.WebApplicationType; @@ -43,8 +52,12 @@ import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.util.ObjectUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -134,6 +147,75 @@ void fluxListener() throws Exception { } } + @Nested + class ConfigPropsDrivenListener { + + private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1); + + @Test + void subscriptionConfigPropsAreRespectedOnListener() throws Exception { + SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class); + app.setWebApplicationType(WebApplicationType.NONE); + try (ConfigurableApplicationContext context = app.run( + "--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), + "--my.env=dev", "--spring.pulsar.consumer.topics=rplit-config-props-topic-${my.env}", + "--spring.pulsar.consumer.subscription.type=Shared", + "--spring.pulsar.consumer.subscription.name=rplit-config-props-subs-${my.env}")) { + var topic = "persistent://public/default/rplit-config-props-topic-dev"; + @SuppressWarnings("unchecked") + ReactivePulsarTemplate pulsarTemplate = context.getBean(ReactivePulsarTemplate.class); + pulsarTemplate.send(topic, "hello config props driven").block(); + assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue(); + @SuppressWarnings("unchecked") + ConsumerTrackingReactivePulsarConsumerFactory consumerFactory = (ConsumerTrackingReactivePulsarConsumerFactory) context + .getBean(ReactivePulsarConsumerFactory.class); + assertThat(consumerFactory.getSpec(topic)).isNotNull().satisfies((consumerSpec) -> { + assertThat(consumerSpec.getTopicNames()).containsExactly(topic); + assertThat(consumerSpec.getSubscriptionName()).isEqualTo("rplit-config-props-subs-dev"); + assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); + }); + } + } + + @EnableAutoConfiguration + @SpringBootConfiguration + @Import(ConsumerCustomizerConfig.class) + static class ConfigPropsDrivenListenerConfig { + + /** + * Post process the Reactive consumer factory and replace it with a tracking + * wrapper around it. Because this test requires the Spring Boot config props + * to be applied to the auto-configured consumer factory we can't simply + * replace the consumer factory bean as the config props will not be set on + * the custom consumer factory. + * @return post processor to wrap a tracker around the reactive consumer + * factory + */ + @Bean + static BeanPostProcessor consumerTrackingConsumerFactory() { + return new BeanPostProcessor() { + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof ReactivePulsarConsumerFactory rcf) { + return new ConsumerTrackingReactivePulsarConsumerFactory<>( + (ReactivePulsarConsumerFactory) rcf); + } + return bean; + } + }; + } + + @ReactivePulsarListener(consumerCustomizer = "consumerCustomizer") + public Mono listen(String ignored) { + LATCH_CONFIG_PROPS.countDown(); + return Mono.empty(); + } + + } + + } + @EnableAutoConfiguration @SpringBootConfiguration @Import(ConsumerCustomizerConfig.class) @@ -230,4 +312,42 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer consumerCustomize record Foo(String value) { } + static class ConsumerTrackingReactivePulsarConsumerFactory implements ReactivePulsarConsumerFactory { + + private Map topicNameToConsumerSpec = new HashMap<>(); + + private ReactivePulsarConsumerFactory delegate; + + ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory delegate) { + this.delegate = delegate; + } + + @Override + public ReactiveMessageConsumer createConsumer(Schema schema) { + var consumer = this.delegate.createConsumer(schema); + storeSpec(consumer); + return consumer; + } + + @Override + public ReactiveMessageConsumer createConsumer(Schema schema, + List> reactiveMessageConsumerBuilderCustomizers) { + var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers); + storeSpec(consumer); + return consumer; + } + + private void storeSpec(ReactiveMessageConsumer consumer) { + var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec"); + var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0) + : "no-topics-set"; + this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec); + } + + ReactiveMessageConsumerSpec getSpec(String topic) { + return this.topicNameToConsumerSpec.get(topic); + } + + } + }