Skip to content

Commit aa40c0f

Browse files
vpavicphilwebb
authored andcommitted
Add support for configuring Pulsar client IO and listener threads
Add configuration properties that allow users to configure number of IO threads and listener threads used by the Pulsar client. See gh-42052
1 parent 1ba0113 commit aa40c0f

File tree

4 files changed

+59
-0
lines changed

4 files changed

+59
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @author Chris Bono
4545
* @author Phillip Webb
4646
* @author Swamy Mavuri
47+
* @author Vedran Pavic
4748
* @since 3.2.0
4849
*/
4950
@ConfigurationProperties("spring.pulsar")
@@ -136,6 +137,11 @@ public static class Client {
136137
*/
137138
private final Authentication authentication = new Authentication();
138139

140+
/**
141+
* Thread related configuration.
142+
*/
143+
private final Threads threads = new Threads();
144+
139145
/**
140146
* Failover settings.
141147
*/
@@ -177,6 +183,10 @@ public Authentication getAuthentication() {
177183
return this.authentication;
178184
}
179185

186+
public Threads getThreads() {
187+
return this.threads;
188+
}
189+
180190
public Failover getFailover() {
181191
return this.failover;
182192
}
@@ -959,6 +969,36 @@ public void setParam(Map<String, String> param) {
959969

960970
}
961971

972+
public static class Threads {
973+
974+
/**
975+
* Number of threads to be used for handling connections to brokers.
976+
*/
977+
private Integer io;
978+
979+
/**
980+
* Number of threads to be used for message listeners.
981+
*/
982+
private Integer listener;
983+
984+
public Integer getIo() {
985+
return this.io;
986+
}
987+
988+
public void setIo(Integer io) {
989+
this.io = io;
990+
}
991+
992+
public Integer getListener() {
993+
return this.listener;
994+
}
995+
996+
public void setListener(Integer listener) {
997+
this.listener = listener;
998+
}
999+
1000+
}
1001+
9621002
public static class Failover {
9631003

9641004
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Chris Bono
5151
* @author Phillip Webb
5252
* @author Swamy Mavuri
53+
* @author Vedran Pavic
5354
*/
5455
final class PulsarPropertiesMapper {
5556

@@ -68,6 +69,8 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
6869
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
6970
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
7071
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
72+
map.from(properties.getThreads()::getIo).to(clientBuilder::ioThreads);
73+
map.from(properties.getThreads()::getListener).to(clientBuilder::listenerThreads);
7174
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
7275
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
7376
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* @author Chris Bono
6060
* @author Phillip Webb
6161
* @author Swamy Mavuri
62+
* @author Vedran Pavic
6263
*/
6364
class PulsarPropertiesMapperTests {
6465

@@ -69,13 +70,17 @@ void customizeClientBuilderWhenHasNoAuthentication() {
6970
properties.getClient().setConnectionTimeout(Duration.ofSeconds(1));
7071
properties.getClient().setOperationTimeout(Duration.ofSeconds(2));
7172
properties.getClient().setLookupTimeout(Duration.ofSeconds(3));
73+
properties.getClient().getThreads().setIo(3);
74+
properties.getClient().getThreads().setListener(10);
7275
ClientBuilder builder = mock(ClientBuilder.class);
7376
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
7477
new PropertiesPulsarConnectionDetails(properties));
7578
then(builder).should().serviceUrl("https://example.com");
7679
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
7780
then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS);
7881
then(builder).should().lookupTimeout(3000, TimeUnit.MILLISECONDS);
82+
then(builder).should().ioThreads(3);
83+
then(builder).should().listenerThreads(10);
7984
}
8085

8186
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* @author Soby Chacko
5555
* @author Phillip Webb
5656
* @author Swamy Mavuri
57+
* @author Vedran Pavic
5758
*/
5859
class PulsarPropertiesTests {
5960

@@ -88,6 +89,16 @@ void bindAuthentication() {
8889
assertThat(properties.getAuthentication().getParam()).containsEntry("token", "1234");
8990
}
9091

92+
@Test
93+
void bindThread() {
94+
Map<String, String> map = new HashMap<>();
95+
map.put("spring.pulsar.client.threads.io", "3");
96+
map.put("spring.pulsar.client.threads.listener", "10");
97+
PulsarProperties.Client properties = bindProperties(map).getClient();
98+
assertThat(properties.getThreads().getIo()).isEqualTo(3);
99+
assertThat(properties.getThreads().getListener()).isEqualTo(10);
100+
}
101+
91102
@Test
92103
void bindFailover() {
93104
Map<String, String> map = new HashMap<>();

0 commit comments

Comments
 (0)