@@ -197,13 +197,8 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
197
197
}
198
198
199
199
/// <inheritdoc />
200
- public async Task < PublishResult > PublishAsync ( MQTT5PublishMessage message )
200
+ public async Task < PublishResult > PublishAsync ( MQTT5PublishMessage message , CancellationToken cancellationToken = default )
201
201
{
202
- if ( ! this . IsConnected ( ) )
203
- {
204
- throw new HiveMQttClientException ( "PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync." ) ;
205
- }
206
-
207
202
message . Validate ( ) ;
208
203
209
204
var packetIdentifier = this . GeneratePacketIdentifier ( ) ;
@@ -227,17 +222,12 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
227
222
{
228
223
// Wait on the QoS 1 handshake
229
224
pubAckPacket = await publishPacket . OnPublishQoS1CompleteTCS . Task
230
- . WaitAsync ( TimeSpan . FromMilliseconds ( this . Options . ResponseTimeoutInMs ) )
225
+ . WaitAsync ( cancellationToken )
231
226
. ConfigureAwait ( false ) ;
232
227
}
233
- catch ( TimeoutException )
228
+ catch ( OperationCanceledException )
234
229
{
235
- Logger . Error ( "PublishAsync: QoS 1 timeout. No PUBACK response received in time." ) ;
236
- var disconnectOptions = new DisconnectOptions
237
- {
238
- ReasonCode = DisconnectReasonCode . UnspecifiedError ,
239
- } ;
240
- await this . DisconnectAsync ( disconnectOptions ) . ConfigureAwait ( false ) ;
230
+ Logger . Debug ( "PublishAsync: Operation cancelled by user." ) ;
241
231
throw ;
242
232
}
243
233
@@ -255,18 +245,12 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
255
245
{
256
246
// Wait on the QoS 2 handshake
257
247
packetList = await publishPacket . OnPublishQoS2CompleteTCS . Task
258
- . WaitAsync ( TimeSpan . FromMilliseconds ( this . Options . ResponseTimeoutInMs ) )
248
+ . WaitAsync ( cancellationToken )
259
249
. ConfigureAwait ( false ) ;
260
250
}
261
- catch ( TimeoutException )
251
+ catch ( OperationCanceledException )
262
252
{
263
- Logger . Error ( "PublishAsync: QoS 2 timeout. No response received in time." ) ;
264
-
265
- var disconnectOptions = new DisconnectOptions
266
- {
267
- ReasonCode = DisconnectReasonCode . UnspecifiedError ,
268
- } ;
269
- await this . DisconnectAsync ( disconnectOptions ) . ConfigureAwait ( false ) ;
253
+ Logger . Debug ( "PublishAsync: Operation cancelled by user." ) ;
270
254
throw ;
271
255
}
272
256
@@ -278,11 +262,6 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
278
262
}
279
263
}
280
264
281
- if ( publishResult is null )
282
- {
283
- throw new HiveMQttClientException ( "PublishAsync: QoS 2 complete but no PubRec packet received." ) ;
284
- }
285
-
286
265
return publishResult ;
287
266
}
288
267
@@ -330,11 +309,6 @@ public async Task<SubscribeResult> SubscribeAsync(string topic, QualityOfService
330
309
/// <inheritdoc />
331
310
public async Task < SubscribeResult > SubscribeAsync ( SubscribeOptions options )
332
311
{
333
- if ( ! this . IsConnected ( ) )
334
- {
335
- throw new HiveMQttClientException ( "SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync." ) ;
336
- }
337
-
338
312
// Fire the corresponding event
339
313
this . BeforeSubscribeEventLauncher ( options ) ;
340
314
@@ -445,11 +419,6 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(List<Subscription> subscri
445
419
446
420
public async Task < UnsubscribeResult > UnsubscribeAsync ( UnsubscribeOptions unsubOptions )
447
421
{
448
- if ( ! this . IsConnected ( ) )
449
- {
450
- throw new HiveMQttClientException ( "UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync." ) ;
451
- }
452
-
453
422
// Fire the corresponding event
454
423
this . BeforeUnsubscribeEventLauncher ( unsubOptions . Subscriptions ) ;
455
424
@@ -513,6 +482,12 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
513
482
/// <param name="clean">Indicates whether the disconnect was intended or not.</param>
514
483
private async Task < bool > HandleDisconnectionAsync ( bool clean = true )
515
484
{
485
+ if ( this . ConnectState == ConnectState . Disconnected )
486
+ {
487
+ Logger . Trace ( "HandleDisconnection: Already disconnected." ) ;
488
+ return false ;
489
+ }
490
+
516
491
Logger . Debug ( $ "HandleDisconnection: Handling disconnection. clean={ clean } .") ;
517
492
518
493
// Cancel all background tasks and close the socket
@@ -537,6 +512,9 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)
537
512
this . OutgoingPublishQueue . Clear ( ) ;
538
513
}
539
514
515
+ // Delay for 1 seconds before launching the AfterDisconnect event
516
+ await Task . Delay ( 1000 ) . ConfigureAwait ( false ) ;
517
+
540
518
// Fire the corresponding after event
541
519
this . AfterDisconnectEventLauncher ( clean ) ;
542
520
return true ;
0 commit comments