Skip to content

Commit 7601951

Browse files
committed
Add support for configuring Pulsar listener container concurrency
This commit adds configuration property that allows users to configure Pulsar message listener container concurrency.
1 parent c06027b commit 7601951

File tree

7 files changed

+47
-3
lines changed

7 files changed

+47
-3
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
* @author Alexander Preuß
7070
* @author Phillip Webb
7171
* @author Jonas Geiregat
72+
* @author Vedran Pavic
7273
* @since 3.2.0
7374
*/
7475
@AutoConfiguration
@@ -187,7 +188,10 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
187188
}
188189
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
189190
this.propertiesMapper.customizeContainerProperties(containerProperties);
190-
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
191+
ConcurrentPulsarListenerContainerFactory<Object> listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>(
192+
pulsarConsumerFactory, containerProperties);
193+
this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
194+
return listenerContainerFactory;
191195
}
192196

193197
@Bean

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,11 @@ public static class Listener {
811811
*/
812812
private SchemaType schemaType;
813813

814+
/**
815+
* Number of threads used by listener container.
816+
*/
817+
private Integer concurrency;
818+
814819
/**
815820
* Whether to record observations for when the Observations API is available and
816821
* the client supports it.
@@ -825,6 +830,14 @@ public void setSchemaType(SchemaType schemaType) {
825830
this.schemaType = schemaType;
826831
}
827832

833+
public Integer getConcurrency() {
834+
return this.concurrency;
835+
}
836+
837+
public void setConcurrency(Integer concurrency) {
838+
this.concurrency = concurrency;
839+
}
840+
828841
public boolean isObservationEnabled() {
829842
return this.observationEnabled;
830843
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.springframework.boot.context.properties.PropertyMapper;
4141
import org.springframework.boot.json.JsonWriter;
42+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4243
import org.springframework.pulsar.core.PulsarTemplate;
4344
import org.springframework.pulsar.listener.PulsarContainerProperties;
4445
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@@ -198,6 +199,13 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
198199
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
199200
}
200201

202+
<T> void customizeConcurrentPulsarListenerContainerFactory(
203+
ConcurrentPulsarListenerContainerFactory<T> listenerContainerFactory) {
204+
PulsarProperties.Listener properties = this.properties.getListener();
205+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
206+
map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency);
207+
}
208+
201209
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
202210
PulsarProperties.Reader properties = this.properties.getReader();
203211
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
*
3232
* @author Chris Bono
3333
* @author Phillip Webb
34+
* @author Vedran Pavic
3435
*/
3536
final class PulsarReactivePropertiesMapper {
3637

@@ -93,6 +94,7 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP
9394
PulsarProperties.Listener properties = this.properties.getListener();
9495
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
9596
map.from(properties::getSchemaType).to(containerProperties::setSchemaType);
97+
map.from(properties::getConcurrency).to(containerProperties::setConcurrency);
9698
}
9799

98100
void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder<?> builder) {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
4343
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
44+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4445
import org.springframework.pulsar.core.PulsarProducerFactory;
4546
import org.springframework.pulsar.core.PulsarTemplate;
4647
import org.springframework.pulsar.listener.PulsarContainerProperties;
@@ -272,6 +273,17 @@ void customizeContainerProperties() {
272273
assertThat(containerProperties.transactions().isEnabled()).isTrue();
273274
}
274275

276+
@Test
277+
void customizeConcurrentPulsarListenerContainerFactory() {
278+
PulsarProperties properties = new PulsarProperties();
279+
properties.getListener().setConcurrency(10);
280+
ConcurrentPulsarListenerContainerFactory<?> listenerContainerFactory = mock(
281+
ConcurrentPulsarListenerContainerFactory.class);
282+
new PulsarPropertiesMapper(properties)
283+
.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory);
284+
then(listenerContainerFactory).should().setConcurrency(10);
285+
}
286+
275287
@Test
276288
@SuppressWarnings("unchecked")
277289
void customizeReaderBuilder() {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,11 @@ class ListenerProperties {
393393
void bind() {
394394
Map<String, String> map = new HashMap<>();
395395
map.put("spring.pulsar.listener.schema-type", "avro");
396+
map.put("spring.pulsar.listener.concurrency", "10");
396397
map.put("spring.pulsar.listener.observation-enabled", "true");
397398
PulsarProperties.Listener properties = bindProperties(map).getListener();
398399
assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO);
400+
assertThat(properties.getConcurrency()).isEqualTo(10);
399401
assertThat(properties.isObservationEnabled()).isTrue();
400402
}
401403

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
*
4949
* @author Chris Bono
5050
* @author Phillip Webb
51+
* @author Vedran Pavic
5152
*/
5253
class PulsarReactivePropertiesMapperTests {
5354

@@ -120,10 +121,12 @@ void customizeContainerProperties() {
120121
PulsarProperties properties = new PulsarProperties();
121122
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
122123
properties.getListener().setSchemaType(SchemaType.AVRO);
124+
properties.getListener().setConcurrency(10);
123125
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
124126
new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties);
125127
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
126128
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
129+
assertThat(containerProperties.getConcurrency()).isEqualTo(10);
127130
}
128131

129132
@Test

0 commit comments

Comments
 (0)