Skip to content

Commit c06027b

Browse files
committed
Merge pull request #42052 from vpavic
* pr/42052: Add support for configuring Pulsar client IO and listener threads Closes gh-42052
2 parents 1ba0113 + aa40c0f commit c06027b

File tree

4 files changed

+59
-0
lines changed

4 files changed

+59
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @author Chris Bono
4545
* @author Phillip Webb
4646
* @author Swamy Mavuri
47+
* @author Vedran Pavic
4748
* @since 3.2.0
4849
*/
4950
@ConfigurationProperties("spring.pulsar")
@@ -136,6 +137,11 @@ public static class Client {
136137
*/
137138
private final Authentication authentication = new Authentication();
138139

140+
/**
141+
* Thread related configuration.
142+
*/
143+
private final Threads threads = new Threads();
144+
139145
/**
140146
* Failover settings.
141147
*/
@@ -177,6 +183,10 @@ public Authentication getAuthentication() {
177183
return this.authentication;
178184
}
179185

186+
public Threads getThreads() {
187+
return this.threads;
188+
}
189+
180190
public Failover getFailover() {
181191
return this.failover;
182192
}
@@ -959,6 +969,36 @@ public void setParam(Map<String, String> param) {
959969

960970
}
961971

972+
public static class Threads {
973+
974+
/**
975+
* Number of threads to be used for handling connections to brokers.
976+
*/
977+
private Integer io;
978+
979+
/**
980+
* Number of threads to be used for message listeners.
981+
*/
982+
private Integer listener;
983+
984+
public Integer getIo() {
985+
return this.io;
986+
}
987+
988+
public void setIo(Integer io) {
989+
this.io = io;
990+
}
991+
992+
public Integer getListener() {
993+
return this.listener;
994+
}
995+
996+
public void setListener(Integer listener) {
997+
this.listener = listener;
998+
}
999+
1000+
}
1001+
9621002
public static class Failover {
9631003

9641004
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Chris Bono
5151
* @author Phillip Webb
5252
* @author Swamy Mavuri
53+
* @author Vedran Pavic
5354
*/
5455
final class PulsarPropertiesMapper {
5556

@@ -68,6 +69,8 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
6869
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
6970
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
7071
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
72+
map.from(properties.getThreads()::getIo).to(clientBuilder::ioThreads);
73+
map.from(properties.getThreads()::getListener).to(clientBuilder::listenerThreads);
7174
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
7275
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
7376
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* @author Chris Bono
6060
* @author Phillip Webb
6161
* @author Swamy Mavuri
62+
* @author Vedran Pavic
6263
*/
6364
class PulsarPropertiesMapperTests {
6465

@@ -69,13 +70,17 @@ void customizeClientBuilderWhenHasNoAuthentication() {
6970
properties.getClient().setConnectionTimeout(Duration.ofSeconds(1));
7071
properties.getClient().setOperationTimeout(Duration.ofSeconds(2));
7172
properties.getClient().setLookupTimeout(Duration.ofSeconds(3));
73+
properties.getClient().getThreads().setIo(3);
74+
properties.getClient().getThreads().setListener(10);
7275
ClientBuilder builder = mock(ClientBuilder.class);
7376
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
7477
new PropertiesPulsarConnectionDetails(properties));
7578
then(builder).should().serviceUrl("https://example.com");
7679
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
7780
then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS);
7881
then(builder).should().lookupTimeout(3000, TimeUnit.MILLISECONDS);
82+
then(builder).should().ioThreads(3);
83+
then(builder).should().listenerThreads(10);
7984
}
8085

8186
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* @author Soby Chacko
5555
* @author Phillip Webb
5656
* @author Swamy Mavuri
57+
* @author Vedran Pavic
5758
*/
5859
class PulsarPropertiesTests {
5960

@@ -88,6 +89,16 @@ void bindAuthentication() {
8889
assertThat(properties.getAuthentication().getParam()).containsEntry("token", "1234");
8990
}
9091

92+
@Test
93+
void bindThread() {
94+
Map<String, String> map = new HashMap<>();
95+
map.put("spring.pulsar.client.threads.io", "3");
96+
map.put("spring.pulsar.client.threads.listener", "10");
97+
PulsarProperties.Client properties = bindProperties(map).getClient();
98+
assertThat(properties.getThreads().getIo()).isEqualTo(3);
99+
assertThat(properties.getThreads().getListener()).isEqualTo(10);
100+
}
101+
91102
@Test
92103
void bindFailover() {
93104
Map<String, String> map = new HashMap<>();

0 commit comments

Comments
 (0)