From 58ca1b918388172c03c82c0143bd614297bc888c Mon Sep 17 00:00:00 2001 From: Vedran Pavic Date: Wed, 28 Aug 2024 23:01:23 +0200 Subject: [PATCH] Add support for configuring Pulsar client IO and listener threads This commit adds configuration properties that allow users to configure number of IO threads and listener threads used by the Pulsar client. --- .../pulsar/PulsarProperties.java | 40 +++++++++++++++++++ .../pulsar/PulsarPropertiesMapper.java | 3 ++ .../pulsar/PulsarPropertiesMapperTests.java | 5 +++ .../pulsar/PulsarPropertiesTests.java | 11 +++++ 4 files changed, 59 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 961173243002..e7cbd0340e09 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -44,6 +44,7 @@ * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic * @since 3.2.0 */ @ConfigurationProperties("spring.pulsar") @@ -136,6 +137,11 @@ public static class Client { */ private final Authentication authentication = new Authentication(); + /** + * Thread related configuration. + */ + private final Threads threads = new Threads(); + /** * Failover settings. */ @@ -177,6 +183,10 @@ public Authentication getAuthentication() { return this.authentication; } + public Threads getThreads() { + return this.threads; + } + public Failover getFailover() { return this.failover; } @@ -959,6 +969,36 @@ public void setParam(Map param) { } + public static class Threads { + + /** + * Number of threads to be used for handling connections to brokers. + */ + private Integer io; + + /** + * Number of threads to be used for message listeners. + */ + private Integer listener; + + public Integer getIo() { + return this.io; + } + + public void setIo(Integer io) { + this.io = io; + } + + public Integer getListener() { + return this.listener; + } + + public void setListener(Integer listener) { + this.listener = listener; + } + + } + public static class Failover { /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index aa9f505b4cb2..9665c4cdb942 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -50,6 +50,7 @@ * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ final class PulsarPropertiesMapper { @@ -68,6 +69,8 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout)); map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout)); map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout)); + map.from(properties.getThreads()::getIo).to(clientBuilder::ioThreads); + map.from(properties.getThreads()::getListener).to(clientBuilder::listenerThreads); map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction); customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication); customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties, diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index f23584aab0ab..dbacef33f9c3 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -59,6 +59,7 @@ * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ class PulsarPropertiesMapperTests { @@ -69,6 +70,8 @@ void customizeClientBuilderWhenHasNoAuthentication() { properties.getClient().setConnectionTimeout(Duration.ofSeconds(1)); properties.getClient().setOperationTimeout(Duration.ofSeconds(2)); properties.getClient().setLookupTimeout(Duration.ofSeconds(3)); + properties.getClient().getThreads().setIo(3); + properties.getClient().getThreads().setListener(10); ClientBuilder builder = mock(ClientBuilder.class); new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, new PropertiesPulsarConnectionDetails(properties)); @@ -76,6 +79,8 @@ void customizeClientBuilderWhenHasNoAuthentication() { then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS); then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS); then(builder).should().lookupTimeout(3000, TimeUnit.MILLISECONDS); + then(builder).should().ioThreads(3); + then(builder).should().listenerThreads(10); } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 0c173b3567cb..6ef42ef83452 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -54,6 +54,7 @@ * @author Soby Chacko * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ class PulsarPropertiesTests { @@ -88,6 +89,16 @@ void bindAuthentication() { assertThat(properties.getAuthentication().getParam()).containsEntry("token", "1234"); } + @Test + void bindThread() { + Map map = new HashMap<>(); + map.put("spring.pulsar.client.threads.io", "3"); + map.put("spring.pulsar.client.threads.listener", "10"); + PulsarProperties.Client properties = bindProperties(map).getClient(); + assertThat(properties.getThreads().getIo()).isEqualTo(3); + assertThat(properties.getThreads().getListener()).isEqualTo(10); + } + @Test void bindFailover() { Map map = new HashMap<>();