Skip to content

Add IT for config props driven listeners #822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.schema.SchemaType;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;
Expand Down Expand Up @@ -62,12 +67,11 @@ class PulsarListenerIntegrationTests implements PulsarTestContainerSupport {
void basicPulsarListener() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-basic-topic", "John Doe");
pulsarTemplate.send("plit-basic-topic", "John Doe");
assertThat(LATCH_1.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -76,12 +80,11 @@ void basicPulsarListener() throws Exception {
void basicPulsarListenerCustomType() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
pulsarTemplate.send("plit-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -90,12 +93,11 @@ void basicPulsarListenerCustomType() throws Exception {
void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe"));
pulsarTemplate.send("plit-foo-topic2", new Foo("John Doe"));
assertThat(LATCH_3.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -104,12 +106,11 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
void basicPulsarListenerWithTopicMapping() throws Exception {
SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
pulsarTemplate.send("plit-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
assertThat(LATCH_4.await(20, TimeUnit.SECONDS)).isTrue();
}
}
Expand All @@ -118,23 +119,63 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
void batchPulsarListener() throws Exception {
SpringApplication app = new SpringApplication(BatchListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);

try (ConfigurableApplicationContext context = app
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
@SuppressWarnings("unchecked")
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
for (int i = 0; i < 10; i++) {
pulsarTemplate.send("plt-batch-topic", "John Doe");
pulsarTemplate.send("plit-batch-topic", "John Doe");
}
assertThat(LATCH_5.await(10, TimeUnit.SECONDS)).isTrue();
}
}

@Nested
class ConfigPropsDrivenListener {

private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);

@Test
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
"--my.env=dev", "--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}",
"--spring.pulsar.consumer.subscription.type=Shared",
"--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) {
@SuppressWarnings("unchecked")
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
pulsarTemplate.send("plit-config-props-topic-dev", "hello config props driven");
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
}

}

@EnableAutoConfiguration
@SpringBootConfiguration
static class ConfigPropsDrivenListenerConfig {

@PulsarListener
public void listen(String ignored, Consumer<String> consumer) {
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
.satisfies((conf) -> {
assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev");
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
});
LATCH_CONFIG_PROPS.countDown();
}

}

}

@EnableAutoConfiguration
@SpringBootConfiguration
static class BasicListenerConfig {

@PulsarListener(subscriptionName = "plt-basic-sub", topics = "plt-basic-topic")
@PulsarListener(subscriptionName = "plit-basic-sub", topics = "plit-basic-topic")
public void listen(String ignored) {
LATCH_1.countDown();
}
Expand All @@ -145,7 +186,7 @@ public void listen(String ignored) {
@SpringBootConfiguration
static class BasicListenerCustomTypeConfig {

@PulsarListener(subscriptionName = "plt-foo-sub1", topics = "plt-foo-topic1", schemaType = SchemaType.JSON)
@PulsarListener(subscriptionName = "plit-foo-sub1", topics = "plit-foo-topic1", schemaType = SchemaType.JSON)
public void listen(Foo ignored) {
LATCH_2.countDown();
}
Expand All @@ -163,7 +204,7 @@ SchemaResolver customSchemaResolver() {
return resolver;
}

@PulsarListener(subscriptionName = "plt-foo-sub2", topics = "plt-foo-topic2")
@PulsarListener(subscriptionName = "plit-foo-sub2", topics = "plit-foo-topic2")
public void listen(Foo ignored) {
LATCH_3.countDown();
}
Expand All @@ -177,11 +218,11 @@ static class BasicListenerWithTopicMappingConfig {
@Bean
TopicResolver customTopicResolver() {
DefaultTopicResolver resolver = new DefaultTopicResolver();
resolver.addCustomTopicMapping(Foo.class, "plt-topicMapping-topic");
resolver.addCustomTopicMapping(Foo.class, "plit-topicMapping-topic");
return resolver;
}

@PulsarListener(subscriptionName = "plt-topicMapping-sub", schemaType = SchemaType.JSON)
@PulsarListener(subscriptionName = "plit-topicMapping-sub", schemaType = SchemaType.JSON)
public void listen(Foo ignored) {
LATCH_4.countDown();
}
Expand All @@ -192,7 +233,7 @@ public void listen(Foo ignored) {
@SpringBootConfiguration
static class BatchListenerConfig {

@PulsarListener(subscriptionName = "plt-batch-sub", topics = "plt-batch-topic", batch = true)
@PulsarListener(subscriptionName = "plit-batch-sub", topics = "plit-batch-topic", batch = true)
public void listen(List<String> foo) {
foo.forEach(t -> LATCH_5.countDown());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.WebApplicationType;
Expand All @@ -43,8 +52,12 @@
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -134,6 +147,75 @@ void fluxListener() throws Exception {
}
}

@Nested
class ConfigPropsDrivenListener {

private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);

@Test
void subscriptionConfigPropsAreRespectedOnListener() throws Exception {
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
"--my.env=dev", "--spring.pulsar.consumer.topics=rplit-config-props-topic-${my.env}",
"--spring.pulsar.consumer.subscription.type=Shared",
"--spring.pulsar.consumer.subscription.name=rplit-config-props-subs-${my.env}")) {
var topic = "persistent://public/default/rplit-config-props-topic-dev";
@SuppressWarnings("unchecked")
ReactivePulsarTemplate<String> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
pulsarTemplate.send(topic, "hello config props driven").block();
assertThat(LATCH_CONFIG_PROPS.await(10, TimeUnit.SECONDS)).isTrue();
@SuppressWarnings("unchecked")
ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory = (ConsumerTrackingReactivePulsarConsumerFactory<String>) context
.getBean(ReactivePulsarConsumerFactory.class);
assertThat(consumerFactory.getSpec(topic)).isNotNull().satisfies((consumerSpec) -> {
assertThat(consumerSpec.getTopicNames()).containsExactly(topic);
assertThat(consumerSpec.getSubscriptionName()).isEqualTo("rplit-config-props-subs-dev");
assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
});
}
}

@EnableAutoConfiguration
@SpringBootConfiguration
@Import(ConsumerCustomizerConfig.class)
static class ConfigPropsDrivenListenerConfig {

/**
* Post process the Reactive consumer factory and replace it with a tracking
* wrapper around it. Because this test requires the Spring Boot config props
* to be applied to the auto-configured consumer factory we can't simply
* replace the consumer factory bean as the config props will not be set on
* the custom consumer factory.
* @return post processor to wrap a tracker around the reactive consumer
* factory
*/
@Bean
static BeanPostProcessor consumerTrackingConsumerFactory() {
return new BeanPostProcessor() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof ReactivePulsarConsumerFactory rcf) {
return new ConsumerTrackingReactivePulsarConsumerFactory<>(
(ReactivePulsarConsumerFactory<String>) rcf);
}
return bean;
}
};
}

@ReactivePulsarListener(consumerCustomizer = "consumerCustomizer")
public Mono<Void> listen(String ignored) {
LATCH_CONFIG_PROPS.countDown();
return Mono.empty();
}

}

}

@EnableAutoConfiguration
@SpringBootConfiguration
@Import(ConsumerCustomizerConfig.class)
Expand Down Expand Up @@ -230,4 +312,42 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> consumerCustomize
record Foo(String value) {
}

static class ConsumerTrackingReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {

private Map<String, ReactiveMessageConsumerSpec> topicNameToConsumerSpec = new HashMap<>();

private ReactivePulsarConsumerFactory<T> delegate;

ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory<T> delegate) {
this.delegate = delegate;
}

@Override
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
var consumer = this.delegate.createConsumer(schema);
storeSpec(consumer);
return consumer;
}

@Override
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
List<ReactiveMessageConsumerBuilderCustomizer<T>> reactiveMessageConsumerBuilderCustomizers) {
var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers);
storeSpec(consumer);
return consumer;
}

private void storeSpec(ReactiveMessageConsumer<T> consumer) {
var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec");
var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0)
: "no-topics-set";
this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec);
}

ReactiveMessageConsumerSpec getSpec(String topic) {
return this.topicNameToConsumerSpec.get(topic);
}

}

}