diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 3972eec5029f..a2dc55c92c6f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -822,6 +822,11 @@ public static class Listener { */ private boolean observationEnabled; + /** + * Startup settings. + */ + private final Startup startup = new Startup(); + public SchemaType getSchemaType() { return this.schemaType; } @@ -846,6 +851,10 @@ public void setObservationEnabled(boolean observationEnabled) { this.observationEnabled = observationEnabled; } + public Startup getStartup() { + return this.startup; + } + } public static class Reader { @@ -876,6 +885,11 @@ public static class Reader { */ private boolean readCompacted; + /** + * Startup settings. + */ + private final Startup startup = new Startup(); + public String getName() { return this.name; } @@ -916,6 +930,53 @@ public void setReadCompacted(boolean readCompacted) { this.readCompacted = readCompacted; } + public Startup getStartup() { + return this.startup; + } + + } + + public static class Startup { + + /** + * The max time to wait for the container to start. + */ + private Duration timeout; + + /** + * The action to take when the container fails to start. + */ + private FailurePolicy onFailure; + + public Duration getTimeout() { + return this.timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + public FailurePolicy getOnFailure() { + return this.onFailure; + } + + public void setOnFailure(FailurePolicy onFailure) { + this.onFailure = onFailure; + } + + } + + public enum FailurePolicy { + + /** Stop the container and throw exception. */ + STOP, + + /** Stop the container but do not throw exception. */ + CONTINUE, + + /** Retry startup asynchronously. */ + RETRY + } public static class Template { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index cfe34eb6f8ba..034d5968b954 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -37,8 +37,10 @@ import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl; +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.json.JsonWriter; +import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; @@ -198,6 +200,17 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie map.from(properties::getSchemaType).to(containerProperties::setSchemaType); map.from(properties::getConcurrency).to(containerProperties::setConcurrency); map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled); + customizeListenerStartupProperties(containerProperties); + } + + private void customizeListenerStartupProperties(PulsarContainerProperties containerProperties) { + PulsarProperties.Startup properties = this.properties.getListener().getStartup(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::getTimeout).to(containerProperties::setConsumerStartTimeout); + map.from(properties::getOnFailure) + .as(FailurePolicy::name) + .as(StartupFailurePolicy::valueOf) + .to(containerProperties::setStartupFailurePolicy); } void customizeReaderBuilder(ReaderBuilder readerBuilder) { @@ -214,6 +227,17 @@ void customizeReaderContainerProperties(PulsarReaderContainerProperties readerCo PulsarProperties.Reader properties = this.properties.getReader(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(properties::getTopics).to(readerContainerProperties::setTopics); + customizeReaderStartupProperties(readerContainerProperties); + } + + private void customizeReaderStartupProperties(PulsarReaderContainerProperties readerContainerProperties) { + PulsarProperties.Startup properties = this.properties.getReader().getStartup(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::getTimeout).to(readerContainerProperties::setReaderStartTimeout); + map.from(properties::getOnFailure) + .as(FailurePolicy::name) + .as(StartupFailurePolicy::valueOf) + .to(readerContainerProperties::setStartupFailurePolicy); } private Consumer timeoutProperty(BiConsumer setter) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java index f936a6c8afcd..8d4d6c30f997 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapper.java @@ -22,7 +22,9 @@ import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; /** @@ -96,6 +98,16 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(properties::getSchemaType).to(containerProperties::setSchemaType); map.from(properties::getConcurrency).to(containerProperties::setConcurrency); + customizeListenerStartupProperties(containerProperties); + } + + private void customizeListenerStartupProperties(ReactivePulsarContainerProperties containerProperties) { + PulsarProperties.Startup properties = this.properties.getListener().getStartup(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::getOnFailure) + .as(FailurePolicy::name) + .as(StartupFailurePolicy::valueOf) + .to(containerProperties::setStartupFailurePolicy); } void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder builder) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index 5e8ca006fcd2..8835bf3fecff 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -41,9 +41,12 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster; +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; +import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.reader.PulsarReaderContainerProperties; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -265,6 +268,8 @@ void customizeContainerProperties() { properties.getListener().setSchemaType(SchemaType.AVRO); properties.getListener().setConcurrency(10); properties.getListener().setObservationEnabled(true); + properties.getListener().getStartup().setOnFailure(FailurePolicy.RETRY); + properties.getListener().getStartup().setTimeout(Duration.ofSeconds(25)); properties.getTransaction().setEnabled(true); PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern"); new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties); @@ -274,6 +279,8 @@ void customizeContainerProperties() { assertThat(containerProperties.getConcurrency()).isEqualTo(10); assertThat(containerProperties.isObservationEnabled()).isTrue(); assertThat(containerProperties.transactions().isEnabled()).isTrue(); + assertThat(containerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.RETRY); + assertThat(containerProperties.getConsumerStartTimeout()).isEqualTo(Duration.ofSeconds(25)); } @Test @@ -295,4 +302,19 @@ void customizeReaderBuilder() { then(builder).should().readCompacted(true); } + @Test + @SuppressWarnings("unchecked") + void customizeReaderContainerProperties() { + PulsarProperties properties = new PulsarProperties(); + List topics = List.of("mytopic"); + properties.getReader().setTopics(topics); + properties.getReader().getStartup().setOnFailure(FailurePolicy.CONTINUE); + properties.getReader().getStartup().setTimeout(Duration.ofSeconds(25)); + PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); + new PulsarPropertiesMapper(properties).customizeReaderContainerProperties(readerContainerProperties); + assertThat(readerContainerProperties.getTopics()).isEqualTo(topics); + assertThat(readerContainerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.CONTINUE); + assertThat(readerContainerProperties.getReaderStartTimeout()).isEqualTo(Duration.ofSeconds(25)); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 72fbdd0e73f4..45ed96ff288d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -38,6 +38,7 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Defaults.TypeMapping; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster; +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; import org.springframework.boot.context.properties.bind.BindException; import org.springframework.boot.context.properties.bind.Binder; import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; @@ -395,10 +396,14 @@ void bind() { map.put("spring.pulsar.listener.schema-type", "avro"); map.put("spring.pulsar.listener.concurrency", "10"); map.put("spring.pulsar.listener.observation-enabled", "true"); + map.put("spring.pulsar.listener.startup.on-failure", "retry"); + map.put("spring.pulsar.listener.startup.timeout", "2m"); PulsarProperties.Listener properties = bindProperties(map).getListener(); assertThat(properties.getSchemaType()).isEqualTo(SchemaType.AVRO); assertThat(properties.getConcurrency()).isEqualTo(10); assertThat(properties.isObservationEnabled()).isTrue(); + assertThat(properties.getStartup().getOnFailure()).isEqualTo(FailurePolicy.RETRY); + assertThat(properties.getStartup().getTimeout()).isEqualTo(Duration.ofMinutes(2)); } } @@ -414,12 +419,16 @@ void bind() { map.put("spring.pulsar.reader.subscription-name", "my-subscription"); map.put("spring.pulsar.reader.subscription-role-prefix", "sub-role"); map.put("spring.pulsar.reader.read-compacted", "true"); + map.put("spring.pulsar.reader.startup.on-failure", "continue"); + map.put("spring.pulsar.reader.startup.timeout", "23s"); PulsarProperties.Reader properties = bindProperties(map).getReader(); assertThat(properties.getName()).isEqualTo("my-reader"); assertThat(properties.getTopics()).containsExactly("my-topic"); assertThat(properties.getSubscriptionName()).isEqualTo("my-subscription"); assertThat(properties.getSubscriptionRolePrefix()).isEqualTo("sub-role"); assertThat(properties.isReadCompacted()).isTrue(); + assertThat(properties.getStartup().getOnFailure()).isEqualTo(FailurePolicy.CONTINUE); + assertThat(properties.getStartup().getTimeout()).isEqualTo(Duration.ofSeconds(23)); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java index 1c45f1aa9c09..c109f8bda313 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactivePropertiesMapperTests.java @@ -37,6 +37,8 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer.Subscription; +import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; +import org.springframework.pulsar.config.StartupFailurePolicy; import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; import static org.assertj.core.api.Assertions.assertThat; @@ -123,12 +125,14 @@ void customizeContainerProperties() { properties.getConsumer().getSubscription().setName("my-subscription"); properties.getListener().setSchemaType(SchemaType.AVRO); properties.getListener().setConcurrency(10); + properties.getListener().getStartup().setOnFailure(FailurePolicy.CONTINUE); ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); new PulsarReactivePropertiesMapper(properties).customizeContainerProperties(containerProperties); assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription"); assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO); assertThat(containerProperties.getConcurrency()).isEqualTo(10); + assertThat(containerProperties.getStartupFailurePolicy()).isEqualTo(StartupFailurePolicy.CONTINUE); } @Test