[BUG] Message sending in ServiceBus queue occasionally fails with error "Cannot send a message when request response channel is disposed" or "Timeout on blocking read" #43877

Open
@s-vivien

Describe the bug
My application is deployed on Azure (App Service) and regularly sends messages to a ServiceBus queue. Most of the time this works fine, but occasionally, for an hour or so, most sends fail with one of the following errors (see stack traces).
When a send fails, the client does not retry it.

Exception or Stack Trace

com.azure.messaging.servicebus.ServiceBusException: Cannot send a message when request response channel is disposed. ConnectionId:MF_bcef72_1737541515971
	at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:934)
	at reactor.core.publisher.Mono.lambda$onErrorMap$29(Mono.java:3862)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:229)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:279)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:327)
	at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:212)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476)
	at reactor.core.publisher.SinkManyEmitterProcessor$EmitterInner.drainParent(SinkManyEmitterProcessor.java:620)
	at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:874)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:337)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1743)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:196)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:220)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.MonoWhen$WhenInner.onError(MonoWhen.java:423)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2236)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onError(TracingSubscriber.java:85)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
	at reactor.core.publisher.Operators.error(Operators.java:198)
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:121)
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:67)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:146)
		at reactor.core.publisher.Mono.block(Mono.java:1807)
		at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessage(ServiceBusSenderClient.java:266)

or

java.lang.IllegalStateException: Timeout on blocking read for 245600000000 NANOSECONDS
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:129)
	at reactor.core.publisher.Mono.block(Mono.java:1807)
	at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessage(ServiceBusSenderClient.java:266)
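
For readability, the nanosecond figure in the second exception can be converted to a wall-clock duration. The sketch below only does that arithmetic; the class and method names are illustrative, and it makes no claim about how the SDK arrives at this value.

```java
import java.time.Duration;

class BlockingTimeout {
    // Value copied from the IllegalStateException message in the report.
    static final long TIMEOUT_NANOS = 245_600_000_000L;

    // Converts the raw nanosecond count into a java.time.Duration.
    static Duration asDuration() {
        return Duration.ofNanos(TIMEOUT_NANOS);
    }

    public static void main(String[] args) {
        Duration d = asDuration();
        // Prints the ISO-8601 form and the value in seconds.
        System.out.println(d + " = " + d.toMillis() / 1000.0 + " s");
    }
}
```

So each blocked `sendMessage` call waited about 4 minutes 5.6 seconds (245.6 s) before giving up.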

Code Snippet

public AbstractBusEventProducer(ServiceBusConfiguration.ServiceBusProperties properties, TokenCredential tokenCredential) {
    // Queue name comes from the same properties object that is passed in.
    this.queueName = properties.queueName();
    // Synchronous sender client, built once and reused for every send.
    this.client = new ServiceBusClientBuilder()
            .fullyQualifiedNamespace(properties.fullyQualifiedNamespace())
            .credential(tokenCredential)
            .sender()
            .queueName(properties.queueName())
            .buildClient();
}

// This is the method we use to send messages
public void sendMessage(T message) {
    try {
        client.sendMessage(new ServiceBusMessage(mapper.writeValueAsString(message)));
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

Expected behavior
I would expect the message to be sent, or at least retried in case of failure.
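
Until the cause is understood, one possible application-side workaround is to wrap the send call in a small retry loop. The following is a minimal sketch under stated assumptions, not the SDK's own retry mechanism: `SendRetry`, `runWithRetry`, and the `isTransient` predicate are hypothetical names, and the backoff policy (delay doubled after each failed attempt) is an arbitrary choice.

```java
import java.time.Duration;
import java.util.function.Predicate;

class SendRetry {
    /**
     * Runs the action up to maxAttempts times. After a failure that the
     * isTransient predicate accepts, sleeps and doubles the delay before
     * the next attempt. The last failure is rethrown unchanged.
     */
    static void runWithRetry(Runnable action, int maxAttempts, Duration initialDelay,
                             Predicate<RuntimeException> isTransient) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Duration delay = initialDelay;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return; // success, stop retrying
            } catch (RuntimeException e) {
                // Give up when attempts are exhausted or the failure is not retryable.
                if (attempt >= maxAttempts || !isTransient.test(e)) {
                    throw e;
                }
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and surface the original failure.
                    Thread.currentThread().interrupt();
                    throw e;
                }
                delay = delay.multipliedBy(2);
            }
        }
    }
}
```

In the producer above, the call to `client.sendMessage(...)` could be passed as the `action`, with a predicate that accepts the two exception types shown in the stack traces. Note that a send which times out on the client may still have reached the queue, so retrying at this level can produce duplicate messages unless the receiver or the queue deduplicates them.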

Setup (please complete the following information):

  • Library/Libraries: com.azure:azure-messaging-servicebus:7.17.7
  • Java version: 17
  • App Server/Environment: Azure App Service
  • Frameworks: Spring Boot

Metadata

Labels

  • Client: This issue points to a problem in the data-plane of the library.
  • Service Bus
  • customer-reported: Issues that are reported by GitHub users external to the Azure organization.
  • needs-team-attention: Workflow: This issue needs attention from Azure service team or SDK team
  • question: The issue doesn't require a change to the product in order to be resolved. Most issues start as that
