Skip to content

Commit 4cc7e20

Browse files
authored
New BoundedDictionary (#173)
1 parent 930ccdc commit 4cc7e20

File tree

6 files changed

+550
-111
lines changed

6 files changed

+550
-111
lines changed

Source/HiveMQtt/Client/HiveMQClient.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ public HiveMQClient(HiveMQClientOptions? options = null)
5757

5858
this.Options = options;
5959
this.cancellationTokenSource = new CancellationTokenSource();
60-
this.ClientReceiveSemaphore = new SemaphoreSlim(this.Options.ClientReceiveMaximum);
60+
61+
// In-flight transaction queues
62+
this.IPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(this.Options.ClientReceiveMaximum);
6163

6264
// Set protocol default until ConnAck is received
63-
this.BrokerReceiveSemaphore = new SemaphoreSlim(65535);
65+
this.OPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(65535);
6466
}
6567

6668
/// <inheritdoc />
@@ -140,9 +142,15 @@ public async Task<ConnectResult> ConnectAsync()
140142
/// <inheritdoc />
141143
public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
142144
{
145+
if (this.ConnectState == ConnectState.Disconnecting)
146+
{
147+
// We're already disconnecting in another task.
148+
return true;
149+
}
150+
143151
if (this.ConnectState != ConnectState.Connected)
144152
{
145-
Logger.Warn("DisconnectAsync called but this client is not connected. State is ${this.ConnectState}.");
153+
Logger.Warn($"DisconnectAsync called but this client is not connected. State is {this.ConnectState}.");
146154
return false;
147155
}
148156

@@ -510,10 +518,7 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)
510518
// Cancel all background tasks and close the socket
511519
this.ConnectState = ConnectState.Disconnected;
512520

513-
// Don't use CancelAsync here to maintain backwards compatibility
514-
// with >=.net6.0. CancelAsync was introduced in .net8.0
515-
this.cancellationTokenSource.Cancel();
516-
this.CloseSocket();
521+
await this.CloseSocketAsync().ConfigureAwait(false);
517522

518523
if (clean)
519524
{

Source/HiveMQtt/Client/HiveMQClientSocket.cs

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -254,25 +254,17 @@ private async Task<bool> CreateTLSConnectionAsync(Stream stream)
254254
}
255255
}
256256

257-
internal bool CloseSocket(bool? shutdownPipeline = true)
257+
internal async Task<bool> CloseSocketAsync(bool? shutdownPipeline = true)
258258
{
259-
// Cancel the background traffic processing tasks
260-
this.cancellationTokenSource.Cancel();
261-
262-
// Reset the tasks
263-
this.ConnectionPublishWriterTask = null;
264-
this.ConnectionWriterTask = null;
265-
this.ConnectionReaderTask = null;
266-
this.ReceivedPacketsHandlerTask = null;
267-
this.ConnectionMonitorTask = null;
259+
await this.CancelBackgroundTasksAsync().ConfigureAwait(false);
268260

269261
if (shutdownPipeline == true)
270262
{
271263
if (this.Reader != null && this.Writer != null)
272264
{
273265
// Dispose of the PipeReader and PipeWriter
274-
this.Reader.Complete();
275-
this.Writer.Complete();
266+
await this.Reader.CompleteAsync().ConfigureAwait(false);
267+
await this.Writer.CompleteAsync().ConfigureAwait(false);
276268

277269
// Shutdown the pipeline
278270
this.Reader = null;
@@ -284,7 +276,7 @@ internal bool CloseSocket(bool? shutdownPipeline = true)
284276
{
285277
// Dispose of the Stream
286278
this.Stream.Close();
287-
this.Stream.Dispose();
279+
await this.Stream.DisposeAsync().ConfigureAwait(false);
288280
this.Stream = null;
289281
}
290282

@@ -300,4 +292,64 @@ internal bool CloseSocket(bool? shutdownPipeline = true)
300292

301293
return true;
302294
}
295+
296+
/// <summary>
297+
/// Cancel all background tasks.
298+
/// </summary>
299+
/// <returns>A task representing the asynchronous operation.</returns>
300+
internal async Task CancelBackgroundTasksAsync()
301+
{
302+
// Don't use CancelAsync here to maintain backwards compatibility
303+
// with >=.net6.0. CancelAsync was introduced in .net8.0
304+
this.cancellationTokenSource.Cancel();
305+
306+
// Delay for a short period to allow the tasks to cancel
307+
await Task.Delay(1000).ConfigureAwait(false);
308+
309+
// Reset the tasks
310+
if (this.ConnectionPublishWriterTask is not null && this.ConnectionPublishWriterTask.IsCompleted)
311+
{
312+
this.ConnectionPublishWriterTask = null;
313+
}
314+
else
315+
{
316+
Logger.Error("ConnectionPublishWriterTask did not complete");
317+
}
318+
319+
if (this.ConnectionWriterTask is not null && this.ConnectionWriterTask.IsCompleted)
320+
{
321+
this.ConnectionWriterTask = null;
322+
}
323+
else
324+
{
325+
Logger.Error("ConnectionWriterTask did not complete");
326+
}
327+
328+
if (this.ConnectionReaderTask is not null && this.ConnectionReaderTask.IsCompleted)
329+
{
330+
this.ConnectionReaderTask = null;
331+
}
332+
else
333+
{
334+
Logger.Error("ConnectionReaderTask did not complete");
335+
}
336+
337+
if (this.ReceivedPacketsHandlerTask is not null && this.ReceivedPacketsHandlerTask.IsCompleted)
338+
{
339+
this.ReceivedPacketsHandlerTask = null;
340+
}
341+
else
342+
{
343+
Logger.Error("ReceivedPacketsHandlerTask did not complete");
344+
}
345+
346+
if (this.ConnectionMonitorTask is not null && this.ConnectionMonitorTask.IsCompleted)
347+
{
348+
this.ConnectionMonitorTask = null;
349+
}
350+
else
351+
{
352+
Logger.Error("ConnectionMonitorTask did not complete");
353+
}
354+
}
303355
}

0 commit comments

Comments
 (0)