Skip to content

Commit 35016e6

Browse files
committed
Add support for configuring Pulsar client IO and listener threads
This commit adds configuration properties that allow users to configure number of IO threads and listener threads used by the Pulsar client.
1 parent 019dd67 commit 35016e6

File tree

4 files changed

+40
-0
lines changed

4 files changed

+40
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @author Chris Bono
4545
* @author Phillip Webb
4646
* @author Swamy Mavuri
47+
* @author Vedran Pavic
4748
* @since 3.2.0
4849
*/
4950
@ConfigurationProperties("spring.pulsar")
@@ -131,6 +132,16 @@ public static class Client {
131132
*/
132133
private Duration connectionTimeout = Duration.ofSeconds(10);
133134

135+
/**
136+
* Number of threads to be used for handling connections to brokers.
137+
*/
138+
private Integer ioThreads;
139+
140+
/**
141+
* Number of threads to be used for message listeners.
142+
*/
143+
private Integer listenerThreads;
144+
134145
/**
135146
* Authentication settings.
136147
*/
@@ -173,6 +184,22 @@ public void setConnectionTimeout(Duration connectionTimeout) {
173184
this.connectionTimeout = connectionTimeout;
174185
}
175186

187+
public Integer getIoThreads() {
188+
return this.ioThreads;
189+
}
190+
191+
public void setIoThreads(Integer ioThreads) {
192+
this.ioThreads = ioThreads;
193+
}
194+
195+
public Integer getListenerThreads() {
196+
return this.listenerThreads;
197+
}
198+
199+
public void setListenerThreads(Integer listenerThreads) {
200+
this.listenerThreads = listenerThreads;
201+
}
202+
176203
public Authentication getAuthentication() {
177204
return this.authentication;
178205
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Chris Bono
5151
* @author Phillip Webb
5252
* @author Swamy Mavuri
53+
* @author Vedran Pavic
5354
*/
5455
final class PulsarPropertiesMapper {
5556

@@ -68,6 +69,8 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
6869
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
6970
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
7071
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
72+
map.from(properties::getIoThreads).to(clientBuilder::ioThreads);
73+
map.from(properties::getListenerThreads).to(clientBuilder::listenerThreads);
7174
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
7275
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
7376
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* @author Chris Bono
6060
* @author Phillip Webb
6161
* @author Swamy Mavuri
62+
* @author Vedran Pavic
6263
*/
6364
class PulsarPropertiesMapperTests {
6465

@@ -69,13 +70,17 @@ void customizeClientBuilderWhenHasNoAuthentication() {
6970
properties.getClient().setConnectionTimeout(Duration.ofSeconds(1));
7071
properties.getClient().setOperationTimeout(Duration.ofSeconds(2));
7172
properties.getClient().setLookupTimeout(Duration.ofSeconds(3));
73+
properties.getClient().setIoThreads(3);
74+
properties.getClient().setListenerThreads(10);
7275
ClientBuilder builder = mock(ClientBuilder.class);
7376
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
7477
new PropertiesPulsarConnectionDetails(properties));
7578
then(builder).should().serviceUrl("https://example.com");
7679
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
7780
then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS);
7881
then(builder).should().lookupTimeout(3000, TimeUnit.MILLISECONDS);
82+
then(builder).should().ioThreads(3);
83+
then(builder).should().listenerThreads(10);
7984
}
8085

8186
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* @author Soby Chacko
5555
* @author Phillip Webb
5656
* @author Swamy Mavuri
57+
* @author Vedran Pavic
5758
*/
5859
class PulsarPropertiesTests {
5960

@@ -71,11 +72,15 @@ void bind() {
7172
map.put("spring.pulsar.client.operation-timeout", "1s");
7273
map.put("spring.pulsar.client.lookup-timeout", "2s");
7374
map.put("spring.pulsar.client.connection-timeout", "12s");
75+
map.put("spring.pulsar.client.io-threads", "3");
76+
map.put("spring.pulsar.client.listener-threads", "10");
7477
PulsarProperties.Client properties = bindProperties(map).getClient();
7578
assertThat(properties.getServiceUrl()).isEqualTo("my-service-url");
7679
assertThat(properties.getOperationTimeout()).isEqualTo(Duration.ofMillis(1000));
7780
assertThat(properties.getLookupTimeout()).isEqualTo(Duration.ofMillis(2000));
7881
assertThat(properties.getConnectionTimeout()).isEqualTo(Duration.ofMillis(12000));
82+
assertThat(properties.getIoThreads()).isEqualTo(3);
83+
assertThat(properties.getListenerThreads()).isEqualTo(10);
7984
}
8085

8186
@Test

0 commit comments

Comments
 (0)