Open
Description
Describe the bug
My application is deployed on Azure (App Service). This application regularly sends messages to a ServiceBus queue. Most of the time it works fine, but sometimes during an hour or so, most of the sending would fail with one of the following error (see stack traces).
When it fails, the client does not retry to send the message.
Exception or Stack Trace
com.azure.messaging.servicebus.ServiceBusException: Cannot send a message when request response channel is disposed. ConnectionId:MF_bcef72_1737541515971
at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:934)
at reactor.core.publisher.Mono.lambda$onErrorMap$29(Mono.java:3862)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:229)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:279)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:327)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:212)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476)
at reactor.core.publisher.SinkManyEmitterProcessor$EmitterInner.drainParent(SinkManyEmitterProcessor.java:620)
at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:874)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1743)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:196)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:220)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.MonoWhen$WhenInner.onError(MonoWhen.java:423)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2236)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:121)
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:67)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:146)
at reactor.core.publisher.Mono.block(Mono.java:1807)
at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessage(ServiceBusSenderClient.java:266)
or
java.lang.IllegalStateException: Timeout on blocking read for 245600000000 NANOSECONDS
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:129)
at reactor.core.publisher.Mono.block(Mono.java:1807)
at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessage(ServiceBusSenderClient.java:266)
Code Snippet
public AbstractBusEventProducer(ServiceBusConfiguration.ServiceBusProperties properties, TokenCredential tokenCredential) {
this.queueName = writerProperties.queueName();
this.client = new ServiceBusClientBuilder()
.fullyQualifiedNamespace(properties.fullyQualifiedNamespace())
.credential(tokenCredential)
.sender()
.queueName(writerProperties.queueName())
.buildClient();
}
// This is the method we use to send messages
public void sendMessage(T message) {
try {
client.sendMessage(new ServiceBusMessage(mapper.writeValueAsString(message)));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Expected behavior
I would expect the message to be sent, or at least retried in case of failure.
Setup (please complete the following information):
- Library/Libraries:
com.azure:azure-messaging-servicebus:7.17.7
- Java version: 17
- App Server/Environment: Azure App Service
- Frameworks: Spring Boot
Metadata
Metadata
Assignees
Labels
This issue points to a problem in the data-plane of the library.Issues that are reported by GitHub users external to the Azure organization.Workflow: This issue needs attention from Azure service team or SDK teamThe issue doesn't require a change to the product in order to be resolved. Most issues start as that