Skip to content

Commit 01e745e

Browse files
committed
Add support for configuring Pulsar listener container concurrency
This commit adds configuration property that allows users to configure Pulsar message listener container concurrency.
1 parent ad730a6 commit 01e745e

File tree

5 files changed

+44
-1
lines changed

5 files changed

+44
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
* @author Alexander Preuß
7070
* @author Phillip Webb
7171
* @author Jonas Geiregat
72+
* @author Vedran Pavic
7273
* @since 3.2.0
7374
*/
7475
@AutoConfiguration
@@ -187,7 +188,10 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
187188
}
188189
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
189190
this.propertiesMapper.customizeContainerProperties(containerProperties);
190-
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
191+
ConcurrentPulsarListenerContainerFactory<Object> listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
192+
pulsarConsumerFactory, containerProperties);
193+
this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
194+
return listenerContainerFactory;
191195
}
192196

193197
@Bean

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @author Chris Bono
4545
* @author Phillip Webb
4646
* @author Swamy Mavuri
47+
* @author Vedran Pavic
4748
* @since 3.2.0
4849
*/
4950
@ConfigurationProperties("spring.pulsar")
@@ -801,6 +802,11 @@ public static class Listener {
801802
*/
802803
private SchemaType schemaType;
803804

805+
/**
806+
* Number of threads used by listener container.
807+
*/
808+
private Integer concurrency;
809+
804810
/**
805811
* Whether to record observations for when the Observations API is available and
806812
* the client supports it.
@@ -815,6 +821,14 @@ public void setSchemaType(SchemaType schemaType) {
815821
this.schemaType = schemaType;
816822
}
817823

824+
public Integer getConcurrency() {
825+
return this.concurrency;
826+
}
827+
828+
public void setConcurrency(Integer concurrency) {
829+
this.concurrency = concurrency;
830+
}
831+
818832
public boolean isObservationEnabled() {
819833
return this.observationEnabled;
820834
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.springframework.boot.context.properties.PropertyMapper;
4141
import org.springframework.boot.json.JsonWriter;
42+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4243
import org.springframework.pulsar.core.PulsarTemplate;
4344
import org.springframework.pulsar.listener.PulsarContainerProperties;
4445
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@@ -50,6 +51,7 @@
5051
* @author Chris Bono
5152
* @author Phillip Webb
5253
* @author Swamy Mavuri
54+
* @author Vedran Pavic
5355
*/
5456
final class PulsarPropertiesMapper {
5557

@@ -195,6 +197,13 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
195197
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
196198
}
197199

200+
<T> void customizeConcurrentPulsarListenerContainerFactory(
201+
ConcurrentPulsarListenerContainerFactory<T> listenerContainerFactory) {
202+
PulsarProperties.Listener properties = this.properties.getListener();
203+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
204+
map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency);
205+
}
206+
198207
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
199208
PulsarProperties.Reader properties = this.properties.getReader();
200209
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
4343
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
44+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4445
import org.springframework.pulsar.core.PulsarProducerFactory;
4546
import org.springframework.pulsar.core.PulsarTemplate;
4647
import org.springframework.pulsar.listener.PulsarContainerProperties;
@@ -59,6 +60,7 @@
5960
* @author Chris Bono
6061
* @author Phillip Webb
6162
* @author Swamy Mavuri
63+
* @author Vedran Pavic
6264
*/
6365
class PulsarPropertiesMapperTests {
6466

@@ -267,6 +269,17 @@ void customizeContainerProperties() {
267269
assertThat(containerProperties.transactions().isEnabled()).isTrue();
268270
}
269271

272+
@Test
273+
void customizeConcurrentPulsarListenerContainerFactory() {
274+
PulsarProperties properties = new PulsarProperties();
275+
properties.getListener().setConcurrency(10);
276+
ConcurrentPulsarListenerContainerFactory<?> listenerContainerFactory = mock(
277+
ConcurrentPulsarListenerContainerFactory.class);
278+
new PulsarPropertiesMapper(properties)
279+
.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
280+
then(listenerContainerFactory).should().setConcurrency(10);
281+
}
282+
270283
@Test
271284
@SuppressWarnings("unchecked")
272285
void customizeReaderBuilder() {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* @author Soby Chacko
5555
* @author Phillip Webb
5656
* @author Swamy Mavuri
57+
* @author Vedran Pavic
5758
*/
5859
class PulsarPropertiesTests {
5960

@@ -382,9 +383,11 @@ class ListenerProperties {
382383
void bind() {
383384
Map<String, String> map = new HashMap<>();
384385
map.put("spring.pulsar.listener.schema-type", "avro");
386+
map.put("spring.pulsar.listener.concurrency", "10");
385387
map.put("spring.pulsar.listener.observation-enabled", "true");
386388
PulsarProperties.Listener properties = bindProperties(map).getListener();
387389
assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO);
390+
assertThat(properties.getConcurrency()).isEqualTo(10);
388391
assertThat(properties.isObservationEnabled()).isTrue();
389392
}
390393

0 commit comments

Comments
 (0)