Skip to content

Commit 3570c69

Browse files
committed
Merge pull request #42062 from vpavic
* pr/42062: Add support for configuring Pulsar listener container concurrency Closes gh-42062
2 parents 0cb311b + ddf7af7 commit 3570c69

File tree

7 files changed

+47
-3
lines changed

7 files changed

+47
-3
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
* @author Alexander Preuß
7070
* @author Phillip Webb
7171
* @author Jonas Geiregat
72+
* @author Vedran Pavic
7273
* @since 3.2.0
7374
*/
7475
@AutoConfiguration
@@ -187,7 +188,10 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
187188
}
188189
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
189190
this.propertiesMapper.customizeContainerProperties(containerProperties);
190-
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
191+
ConcurrentPulsarListenerContainerFactory<Object> listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
192+
pulsarConsumerFactory, containerProperties);
193+
this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
194+
return listenerContainerFactory;
191195
}
192196

193197
@Bean

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,11 @@ public static class Listener {
811811
*/
812812
private SchemaType schemaType;
813813

814+
/**
815+
* Number of threads used by listener container.
816+
*/
817+
private Integer concurrency;
818+
814819
/**
815820
* Whether to record observations for when the Observations API is available and
816821
* the client supports it.
@@ -825,6 +830,14 @@ public void setSchemaType(SchemaType schemaType) {
825830
this.schemaType = schemaType;
826831
}
827832

833+
public Integer getConcurrency() {
834+
return this.concurrency;
835+
}
836+
837+
public void setConcurrency(Integer concurrency) {
838+
this.concurrency = concurrency;
839+
}
840+
828841
public boolean isObservationEnabled() {
829842
return this.observationEnabled;
830843
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.springframework.boot.context.properties.PropertyMapper;
4141
import org.springframework.boot.json.JsonWriter;
42+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4243
import org.springframework.pulsar.core.PulsarTemplate;
4344
import org.springframework.pulsar.listener.PulsarContainerProperties;
4445
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@@ -198,6 +199,13 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
198199
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
199200
}
200201

202+
<T> void customizeConcurrentPulsarListenerContainerFactory(
203+
ConcurrentPulsarListenerContainerFactory<T> listenerContainerFactory) {
204+
PulsarProperties.Listener properties = this.properties.getListener();
205+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
206+
map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency);
207+
}
208+
201209
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
202210
PulsarProperties.Reader properties = this.properties.getReader();
203211
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
*
3232
* @author Chris Bono
3333
* @author Phillip Webb
34+
* @author Vedran Pavic
3435
*/
3536
final class PulsarReactivePropertiesMapper {
3637

@@ -93,6 +94,7 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP
9394
PulsarProperties.Listener properties = this.properties.getListener();
9495
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
9596
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
97+
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
9698
}
9799

98100
void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder<?> builder) {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
4343
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
44+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4445
import org.springframework.pulsar.core.PulsarProducerFactory;
4546
import org.springframework.pulsar.core.PulsarTemplate;
4647
import org.springframework.pulsar.listener.PulsarContainerProperties;
@@ -272,6 +273,17 @@ void customizeContainerProperties() {
272273
assertThat(containerProperties.transactions().isEnabled()).isTrue();
273274
}
274275

276+
@Test
277+
void customizeConcurrentPulsarListenerContainerFactory() {
278+
PulsarProperties properties = new PulsarProperties();
279+
properties.getListener().setConcurrency(10);
280+
ConcurrentPulsarListenerContainerFactory<?> listenerContainerFactory = mock(
281+
ConcurrentPulsarListenerContainerFactory.class);
282+
new PulsarPropertiesMapper(properties)
283+
.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
284+
then(listenerContainerFactory).should().setConcurrency(10);
285+
}
286+
275287
@Test
276288
@SuppressWarnings("unchecked")
277289
void customizeReaderBuilder() {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,11 @@ class ListenerProperties {
393393
void bind() {
394394
Map<String, String> map = new HashMap<>();
395395
map.put("spring.pulsar.listener.schema-type", "avro");
396+
map.put("spring.pulsar.listener.concurrency", "10");
396397
map.put("spring.pulsar.listener.observation-enabled", "true");
397398
PulsarProperties.Listener properties = bindProperties(map).getListener();
398399
assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO);
400+
assertThat(properties.getConcurrency()).isEqualTo(10);
399401
assertThat(properties.isObservationEnabled()).isTrue();
400402
}
401403

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
*
4949
* @author Chris Bono
5050
* @author Phillip Webb
51+
* @author Vedran Pavic
5152
*/
5253
class PulsarReactivePropertiesMapperTests {
5354

@@ -120,10 +121,12 @@ void customizeContainerProperties() {
120121
PulsarProperties properties = new PulsarProperties();
121122
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
122123
properties.getListener().setSchemaType(SchemaType.AVRO);
124+
properties.getListener().setConcurrency(10);
123125
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
124126
new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties);
125127
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
126128
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
129+
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
127130
}
128131

129132
@Test

0 commit comments

Comments
 (0)