Skip to content

Commit 24040e2

Browse files
committed
Merge pull request #42182 from onobc
* pr/42182: Polish "Add Pulsar container factory customizer infrastructure" Add Pulsar container factory customizer infrastructure Closes gh-42182
2 parents 920e3cc + 5b25a37 commit 24040e2

File tree

10 files changed

+395
-12
lines changed

10 files changed

+395
-12
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
178178
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
179179
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
180180
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
181-
Environment environment) {
181+
Environment environment, PulsarContainerFactoryCustomizers containerFactoryCustomizers) {
182182
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
183183
containerProperties.setSchemaResolver(schemaResolver);
184184
containerProperties.setTopicResolver(topicResolver);
@@ -187,7 +187,10 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
187187
}
188188
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
189189
this.propertiesMapper.customizeContainerProperties(containerProperties);
190-
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
190+
ConcurrentPulsarListenerContainerFactory<?> containerFactory = new ConcurrentPulsarListenerContainerFactory<>(
191+
pulsarConsumerFactory, containerProperties);
192+
containerFactoryCustomizers.customize(containerFactory);
193+
return containerFactory;
191194
}
192195

193196
@Bean
@@ -215,14 +218,18 @@ private void applyReaderBuilderCustomizers(List<ReaderBuilderCustomizer<?>> cust
215218
@Bean
216219
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
217220
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
218-
SchemaResolver schemaResolver, Environment environment) {
221+
SchemaResolver schemaResolver, Environment environment,
222+
PulsarContainerFactoryCustomizers containerFactoryCustomizers) {
219223
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
220224
readerContainerProperties.setSchemaResolver(schemaResolver);
221225
if (Threading.VIRTUAL.isActive(environment)) {
222226
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
223227
}
224228
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
225-
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
229+
DefaultPulsarReaderContainerFactory<?> containerFactory = new DefaultPulsarReaderContainerFactory<>(
230+
pulsarReaderFactory, readerContainerProperties);
231+
containerFactoryCustomizers.customize(containerFactory);
232+
return containerFactory;
226233
}
227234

228235
@Configuration(proxyBeanMethods = false)

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,11 @@ PulsarTopicBuilder pulsarTopicBuilder() {
188188
this.properties.getDefaults().getTopic().getNamespace());
189189
}
190190

191+
@Bean
192+
@ConditionalOnMissingBean
193+
PulsarContainerFactoryCustomizers pulsarContainerFactoryCustomizers(
194+
ObjectProvider<PulsarContainerFactoryCustomizer<?>> customizers) {
195+
return new PulsarContainerFactoryCustomizers(customizers.orderedStream().toList());
196+
}
197+
191198
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.config.PulsarContainerFactory;
20+
21+
/**
22+
* Callback interface that can be implemented by beans wishing to customize a
23+
* {@link PulsarContainerFactory} before it is fully initialized, in particular to tune
24+
* its configuration.
25+
*
26+
* @param <T> the type of the {@link PulsarContainerFactory}
27+
* @author Chris Bono
28+
* @since 3.4.0
29+
*/
30+
@FunctionalInterface
31+
public interface PulsarContainerFactoryCustomizer<T extends PulsarContainerFactory<?, ?>> {
32+
33+
/**
34+
* Customize the container factory.
35+
* @param containerFactory the {@code PulsarContainerFactory} to customize
36+
*/
37+
void customize(T containerFactory);
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
import org.springframework.boot.util.LambdaSafe;
24+
import org.springframework.pulsar.config.PulsarContainerFactory;
25+
import org.springframework.pulsar.core.PulsarConsumerFactory;
26+
27+
/**
28+
* Invokes the available {@link PulsarContainerFactoryCustomizer} instances in the context
29+
* for a given {@link PulsarConsumerFactory}.
30+
*
31+
* @author Chris Bono
32+
* @since 3.4.0
33+
*/
34+
public class PulsarContainerFactoryCustomizers {
35+
36+
private final List<PulsarContainerFactoryCustomizer<?>> customizers;
37+
38+
public PulsarContainerFactoryCustomizers(List<? extends PulsarContainerFactoryCustomizer<?>> customizers) {
39+
this.customizers = (customizers != null) ? new ArrayList<>(customizers) : Collections.emptyList();
40+
}
41+
42+
/**
43+
* Customize the specified {@link PulsarContainerFactory}. Locates all
44+
* {@link PulsarContainerFactoryCustomizer} beans able to handle the specified
45+
* instance and invoke {@link PulsarContainerFactoryCustomizer#customize} on them.
46+
* @param <T> the type of container factory
47+
* @param containerFactory the container factory to customize
48+
* @return the customized container factory
49+
*/
50+
@SuppressWarnings("unchecked")
51+
public <T extends PulsarContainerFactory<?, ?>> T customize(T containerFactory) {
52+
LambdaSafe.callbacks(PulsarContainerFactoryCustomizer.class, this.customizers, containerFactory)
53+
.withLogger(PulsarContainerFactoryCustomizers.class)
54+
.invoke((customizer) -> customizer.customize(containerFactory));
55+
return containerFactory;
56+
}
57+
58+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,15 @@ private void applyMessageConsumerBuilderCustomizers(List<ReactiveMessageConsumer
164164
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
165165
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
166166
ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver,
167-
TopicResolver topicResolver) {
167+
TopicResolver topicResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers) {
168168
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
169169
containerProperties.setSchemaResolver(schemaResolver);
170170
containerProperties.setTopicResolver(topicResolver);
171171
this.propertiesMapper.customizeContainerProperties(containerProperties);
172-
return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties);
172+
DefaultReactivePulsarListenerContainerFactory<?> containerFactory = new DefaultReactivePulsarListenerContainerFactory<>(
173+
reactivePulsarConsumerFactory, containerProperties);
174+
containerFactoryCustomizers.customize(containerFactory);
175+
return containerFactory;
173176
}
174177

175178
@Bean

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.springframework.pulsar.core.SchemaResolver;
7373
import org.springframework.pulsar.core.TopicResolver;
7474
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
75+
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
7576
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
7677
import org.springframework.test.util.ReflectionTestUtils;
7778

@@ -585,6 +586,45 @@ void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() {
585586
});
586587
}
587588

589+
@Test
590+
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
591+
this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class)
592+
.run((context) -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class)
593+
.hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo"));
594+
}
595+
596+
@TestConfiguration(proxyBeanMethods = false)
597+
static class ListenerContainerFactoryCustomizersConfig {
598+
599+
@Bean
600+
@Order(50)
601+
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
602+
return (containerFactory) -> {
603+
throw new IllegalStateException("should-not-have-matched");
604+
};
605+
}
606+
607+
@Bean
608+
@Order(200)
609+
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerFoo() {
610+
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo");
611+
}
612+
613+
@Bean
614+
@Order(100)
615+
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> customizerBar() {
616+
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar");
617+
}
618+
619+
private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory<?> containerFactory,
620+
String valueToAppend) {
621+
String subscriptionName = containerFactory.getContainerProperties().getSubscriptionName();
622+
String updatedValue = (subscriptionName != null) ? subscriptionName + valueToAppend : valueToAppend;
623+
containerFactory.getContainerProperties().setSubscriptionName(updatedValue);
624+
}
625+
626+
}
627+
588628
}
589629

590630
@Nested
@@ -617,7 +657,7 @@ void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
617657
}
618658

619659
@Test
620-
<T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
660+
<T> void whenHasUserDefinedReaderBuilderCustomizersAppliesInCorrectOrder() {
621661
this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer")
622662
.withUserConfiguration(ReaderBuilderCustomizersConfig.class)
623663
.run((context) -> {
@@ -654,6 +694,13 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThre
654694
});
655695
}
656696

697+
@Test
698+
void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() {
699+
this.contextRunner.withUserConfiguration(ReaderContainerFactoryCustomizersConfig.class)
700+
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class)
701+
.hasFieldOrPropertyWithValue("containerProperties.readerListener", ":bar:foo"));
702+
}
703+
657704
@TestConfiguration(proxyBeanMethods = false)
658705
static class ReaderBuilderCustomizersConfig {
659706

@@ -671,6 +718,38 @@ ReaderBuilderCustomizer<?> customizerBar() {
671718

672719
}
673720

721+
@TestConfiguration(proxyBeanMethods = false)
722+
static class ReaderContainerFactoryCustomizersConfig {
723+
724+
@Bean
725+
@Order(50)
726+
PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> customizerIgnored() {
727+
return (containerFactory) -> {
728+
throw new IllegalStateException("should-not-have-matched");
729+
};
730+
}
731+
732+
@Bean
733+
@Order(200)
734+
PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> customizerFoo() {
735+
return (containerFactory) -> appendToReaderListener(containerFactory, ":foo");
736+
}
737+
738+
@Bean
739+
@Order(100)
740+
PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> customizerBar() {
741+
return (containerFactory) -> appendToReaderListener(containerFactory, ":bar");
742+
}
743+
744+
private void appendToReaderListener(DefaultPulsarReaderContainerFactory<?> containerFactory,
745+
String valueToAppend) {
746+
Object readerListener = containerFactory.getContainerProperties().getReaderListener();
747+
String updatedValue = (readerListener != null) ? readerListener + valueToAppend : valueToAppend;
748+
containerFactory.getContainerProperties().setReaderListener(updatedValue);
749+
}
750+
751+
}
752+
674753
}
675754

676755
@Nested

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,15 @@ void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() {
8686
.isSameAs(customConnectionDetails));
8787
}
8888

89+
@Test
90+
void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() {
91+
PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class);
92+
this.contextRunner
93+
.withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers)
94+
.run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class)
95+
.isSameAs(customizers));
96+
}
97+
8998
@Nested
9099
class ClientTests {
91100

0 commit comments

Comments
 (0)