Skip to content

Commit 3ce88ff

Browse files
committed
Improve Pulsar listener container concurrency configuration
This is a follow-up to gh-42062 that utilizes newly introduced `concurrency` property in `PulsarContainerProperties` to simplify auto-configuration support for Pulsar listener container concurrency. See: spring-projects/spring-pulsar#820
1 parent 5bd2b19 commit 3ce88ff

File tree

3 files changed

+4
-24
lines changed

3 files changed

+4
-24
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
* @author Alexander Preuß
7070
* @author Phillip Webb
7171
* @author Jonas Geiregat
72-
* @author Vedran Pavic
7372
* @since 3.2.0
7473
*/
7574
@AutoConfiguration
@@ -188,10 +187,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
188187
}
189188
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
190189
this.propertiesMapper.customizeContainerProperties(containerProperties);
191-
ConcurrentPulsarListenerContainerFactory<Object> listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
192-
pulsarConsumerFactory, containerProperties);
193-
this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
194-
return listenerContainerFactory;
190+
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
195191
}
196192

197193
@Bean

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,16 +196,10 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
196196
PulsarProperties.Listener properties = this.properties.getListener();
197197
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
198198
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
199+
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
199200
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
200201
}
201202

202-
<T> void customizeConcurrentPulsarListenerContainerFactory(
203-
ConcurrentPulsarListenerContainerFactory<T> listenerContainerFactory) {
204-
PulsarProperties.Listener properties = this.properties.getListener();
205-
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
206-
map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency);
207-
}
208-
209203
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
210204
PulsarProperties.Reader properties = this.properties.getReader();
211205
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
4343
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
44-
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4544
import org.springframework.pulsar.core.PulsarProducerFactory;
4645
import org.springframework.pulsar.core.PulsarTemplate;
4746
import org.springframework.pulsar.listener.PulsarContainerProperties;
@@ -263,27 +262,18 @@ void customizeContainerProperties() {
263262
PulsarProperties properties = new PulsarProperties();
264263
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
265264
properties.getListener().setSchemaType(SchemaType.AVRO);
265+
properties.getListener().setConcurrency(10);
266266
properties.getListener().setObservationEnabled(true);
267267
properties.getTransaction().setEnabled(true);
268268
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
269269
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
270270
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
271271
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
272+
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
272273
assertThat(containerProperties.isObservationEnabled()).isTrue();
273274
assertThat(containerProperties.transactions().isEnabled()).isTrue();
274275
}
275276

276-
@Test
277-
void customizeConcurrentPulsarListenerContainerFactory() {
278-
PulsarProperties properties = new PulsarProperties();
279-
properties.getListener().setConcurrency(10);
280-
ConcurrentPulsarListenerContainerFactory<?> listenerContainerFactory = mock(
281-
ConcurrentPulsarListenerContainerFactory.class);
282-
new PulsarPropertiesMapper(properties)
283-
.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
284-
then(listenerContainerFactory).should().setConcurrency(10);
285-
}
286-
287277
@Test
288278
@SuppressWarnings("unchecked")
289279
void customizeReaderBuilder() {

0 commit comments

Comments
 (0)