Skip to content

Commit e9c57eb

Browse files
authored
Fix Connection Check in Publish Writer Task (#165)
1 parent 635ef37 commit e9c57eb

File tree

10 files changed

+160
-23
lines changed

10 files changed

+160
-23
lines changed

Source/HiveMQtt/Client/HiveMQClientEvents.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet)
254254
{
255255
if (t.IsFaulted)
256256
{
257-
Logger.Error("per-subscription MessageReceivedEventLauncher exception: " + t.Exception.Message);
257+
Logger.Error($"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): " + t.Exception.Message);
258258
}
259259
},
260260
TaskScheduler.Default);

Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,10 @@ private Task<bool> ConnectionPublishWriterAsync(CancellationToken cancellationTo
118118
break;
119119
}
120120

121-
while (this.ConnectState == ConnectState.Disconnected)
121+
while (this.ConnectState != ConnectState.Connected)
122122
{
123123
Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect...");
124-
await Task.Delay(2000).ConfigureAwait(false);
124+
await Task.Delay(1000).ConfigureAwait(false);
125125
continue;
126126
}
127127

@@ -188,6 +188,8 @@ private Task<bool> ConnectionWriterAsync(CancellationToken cancellationToken) =>
188188
break;
189189
}
190190

191+
// We allow this task to run in Connecting, Connected, and Disconnecting states
192+
// because it is the one that has to send the CONNECT and DISCONNECT packets.
191193
while (this.ConnectState == ConnectState.Disconnected)
192194
{
193195
Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect...");

Tests/.editorconfig

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,11 @@ dotnet_diagnostic.CA1062.severity = none
3939
# https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1707
4040
dotnet_diagnostic.CA1707.severity = none
4141

42-
dotnet_diagnostic.CS1591.severity = none
42+
dotnet_diagnostic.CS1591.severity = none
43+
44+
# VSTHRD101: Avoid unsupported async delegates
45+
dotnet_diagnostic.VSTHRD101.severity = silent
46+
47+
48+
# VSTHRD101: Avoid unsupported async delegates
49+
dotnet_diagnostic.VSTHRD101.severity = suggestion

Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ public async Task ClientStateAsync()
109109
// Publish QoS 0 (At most once delivery)
110110
_ = await client.PublishAsync("tests/ClientTest", new string("♚ ♛ ♜ ♝ ♞ ♟ ♔ ♕ ♖ ♗ ♘ ♙")).ConfigureAwait(false);
111111

112+
client.OnMessageReceived += (sender, args) => { };
113+
112114
var subResult = await client.SubscribeAsync(
113115
"tests/ClientTest",
114116
MQTT5.Types.QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);

Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public async Task Last_Will_With_Properties_Async()
2424
// Set the event handler for the message received event
2525
listenerClient.OnMessageReceived += (sender, args) =>
2626
{
27-
messagesReceived++;
27+
Interlocked.Increment(ref messagesReceived);
2828
Assert.Equal(QualityOfService.AtLeastOnceDelivery, args.PublishMessage.QoS);
2929
Assert.Equal("last/will2", args.PublishMessage.Topic);
3030
Assert.Equal("last will message", args.PublishMessage.PayloadAsString);

Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public async Task Last_Will_With_Properties_Async()
3939
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
4040
Assert.True(listenerClient.IsConnected());
4141

42-
var messagesReceived = 0;
4342
var taskLWTReceived = new TaskCompletionSource<bool>();
4443
#pragma warning disable SA1010 // Opening square brackets should be spaced correctly
4544
byte[] correlationDataBytes = [1, 2, 3, 4, 5];
@@ -48,7 +47,6 @@ public async Task Last_Will_With_Properties_Async()
4847
// Set the event handler for the message received event
4948
listenerClient.OnMessageReceived += (sender, args) =>
5049
{
51-
messagesReceived++;
5250
Assert.Equal(QualityOfService.AtLeastOnceDelivery, args.PublishMessage.QoS);
5351
Assert.Equal("last/will7", args.PublishMessage.Topic);
5452
Assert.Equal("last will message", args.PublishMessage.PayloadAsString);
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
namespace HiveMQtt.Test.HiveMQClient;
2+
3+
using System.Text;
4+
using HiveMQtt.Client;
5+
using HiveMQtt.MQTT5.Types;
6+
using Xunit;
7+
8+
public class QueuedPublishesTest
9+
{
10+
[Fact]
11+
public async Task Queued_Messages_Chain_Async()
12+
{
13+
14+
var batchSize = 1000;
15+
16+
var tasks = new[]
17+
{
18+
Task.Run(this.RelayClientAsync),
19+
Task.Run(this.ReceiverClientAsync),
20+
Task.Run(this.PublisherClientAsync),
21+
};
22+
23+
var results = await Task.WhenAll(tasks).ConfigureAwait(false);
24+
Assert.Equal(batchSize, results[0]);
25+
Assert.Equal(batchSize, results[1]);
26+
Assert.Equal(batchSize, results[2]);
27+
}
28+
29+
private async Task<int> PublisherClientAsync()
30+
{
31+
var batchSize = 1000;
32+
var firstTopic = "hmq-tests-qmc/q1";
33+
34+
///////////////////////////////////////////////////////////////
35+
// Publish 1000 messages with an incrementing payload
36+
///////////////////////////////////////////////////////////////
37+
var publisherOptions = new HiveMQClientOptionsBuilder()
38+
.WithClientId("hmq-tests-qmc/q1-publisher")
39+
.WithCleanStart(false)
40+
.WithSessionExpiryInterval(40000)
41+
.Build();
42+
var publishClient = new HiveMQClient(publisherOptions);
43+
await publishClient.ConnectAsync().ConfigureAwait(false);
44+
45+
// Wait for 1 second to allow other tasks to subscribe
46+
await Task.Delay(1000).ConfigureAwait(false);
47+
48+
for (var i = 0; i < batchSize; i++)
49+
{
50+
// Make a JSON string payload with the current number
51+
var payload = Encoding.UTF8.GetBytes($"{{\"number\":{i}}}");
52+
53+
// Publish the message to the topic "hmq-tests/q1" with exactly once delivery
54+
await publishClient.PublishAsync(firstTopic, payload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
55+
}
56+
57+
return batchSize;
58+
}
59+
60+
private async Task<int> RelayClientAsync()
61+
{
62+
var firstTopic = "hmq-tests-qmc/q1";
63+
var secondTopic = "hmq-tests-qmc/q2";
64+
65+
////////////////////////////////////////////////////////////////////////////
66+
// Subscribe to the first topic and relay the messages to a second topic
67+
////////////////////////////////////////////////////////////////////////////
68+
var subscriberOptions = new HiveMQClientOptionsBuilder()
69+
.WithClientId("hmq-tests-qmc/q1-q2-relay")
70+
.WithCleanStart(false)
71+
.WithSessionExpiryInterval(40000)
72+
.Build();
73+
var subscribeClient = new HiveMQClient(subscriberOptions);
74+
75+
var relayCount = 0;
76+
subscribeClient.OnMessageReceived += async (sender, args) =>
77+
{
78+
// Republish the Message to the second topic
79+
var payload = args.PublishMessage.Payload;
80+
var publishResult = await subscribeClient.PublishAsync(secondTopic, payload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
81+
Assert.NotNull(publishResult.QoS2ReasonCode);
82+
83+
// Atomically increment the relayCount
84+
Interlocked.Increment(ref relayCount);
85+
};
86+
87+
await subscribeClient.ConnectAsync().ConfigureAwait(false);
88+
await subscribeClient.SubscribeAsync(firstTopic, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
89+
90+
// Wait until all messages are relayed
91+
await Task.Delay(5000).ConfigureAwait(false);
92+
return relayCount;
93+
}
94+
95+
private async Task<int> ReceiverClientAsync()
96+
{
97+
var secondTopic = "hmq-tests-qmc/q2";
98+
99+
////////////////////////////////////////////////////////////////////////////
100+
// Subscribe to the second topic and count the received messages
101+
////////////////////////////////////////////////////////////////////////////
102+
var receiverOptions = new HiveMQClientOptionsBuilder()
103+
.WithClientId("hmq-tests-qmc/q2-receiver")
104+
.WithCleanStart(false)
105+
.WithSessionExpiryInterval(40000)
106+
.Build();
107+
var receiverClient = new HiveMQClient(receiverOptions);
108+
109+
var receivedCount = 0;
110+
receiverClient.OnMessageReceived += (sender, args) => Interlocked.Increment(ref receivedCount);
111+
112+
await receiverClient.ConnectAsync().ConfigureAwait(false);
113+
await receiverClient.SubscribeAsync(secondTopic, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
114+
115+
// Wait for the receiver to receive all messages
116+
await Task.Delay(5000).ConfigureAwait(false);
117+
return receivedCount;
118+
}
119+
}

Tests/HiveMQtt.Test/HiveMQClient/PubSubTest.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,12 @@ public async Task QoS1PubSubAsync()
6969
// Set the event handler for the message received event
7070
client.OnMessageReceived += (sender, args) =>
7171
{
72-
messagesReceived++;
7372
Assert.Equal(QualityOfService.AtLeastOnceDelivery, args.PublishMessage.QoS);
7473
Assert.Equal(testTopic, args.PublishMessage.Topic);
7574
Assert.Equal(testPayload, args.PublishMessage.PayloadAsString);
7675

77-
if (messagesReceived >= 5)
76+
Interlocked.Increment(ref messagesReceived);
77+
if (messagesReceived == 10 && taskCompletionSource.Task.IsCompleted == false)
7878
{
7979
taskCompletionSource.SetResult(true);
8080
}
@@ -83,7 +83,7 @@ public async Task QoS1PubSubAsync()
8383
Client.Results.PublishResult result;
8484

8585
// Publish 10 messages
86-
for (var i = 0; i < 5; i++)
86+
for (var i = 0; i < 10; i++)
8787
{
8888
result = await client.PublishAsync(testTopic, testPayload, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);
8989
Assert.IsType<Client.Results.PublishResult>(result);
@@ -115,7 +115,7 @@ public async Task QoS2PubSubAsync()
115115
// Set the event handler for the message received event
116116
client.OnMessageReceived += (sender, args) =>
117117
{
118-
messagesReceived++;
118+
Interlocked.Increment(ref messagesReceived);
119119
Assert.Equal(QualityOfService.ExactlyOnceDelivery, args.PublishMessage.QoS);
120120
Assert.Equal(testTopic, args.PublishMessage.Topic);
121121
Assert.Equal(testPayload, args.PublishMessage.PayloadAsString);

Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public async Task ThreeNodeQoS0ChainedPublishesAsync()
204204
#pragma warning disable VSTHRD100 // Avoid async void methods
205205
async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
206206
{
207-
client2MessageCount++;
207+
Interlocked.Increment(ref client2MessageCount);
208208
if (sender is HiveMQClient client)
209209
{
210210
var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtMostOnceDelivery).ConfigureAwait(true);
@@ -223,7 +223,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even
223223
#pragma warning disable VSTHRD100 // Avoid async void methods
224224
async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
225225
{
226-
client3MessageCount++;
226+
Interlocked.Increment(ref client3MessageCount);
227227
Assert.NotNull(eventArgs.PublishMessage);
228228
Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString);
229229
}
@@ -295,7 +295,7 @@ public async Task ThreeNodeQoS1ChainedPublishesAsync()
295295
#pragma warning disable VSTHRD100 // Avoid async void methods
296296
async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
297297
{
298-
client2MessageCount++;
298+
Interlocked.Increment(ref client2MessageCount);
299299
if (sender is HiveMQClient client)
300300
{
301301
var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);
@@ -315,7 +315,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even
315315
// client 3 will receive the final message
316316
void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
317317
{
318-
client3MessageCount++;
318+
Interlocked.Increment(ref client3MessageCount);
319319
Assert.NotNull(eventArgs.PublishMessage);
320320
Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString);
321321
}
@@ -386,7 +386,7 @@ public async Task ThreeNodeQoS2ChainedPublishesAsync()
386386
#pragma warning disable VSTHRD100 // Avoid async void methods
387387
async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
388388
{
389-
client2MessageCount++;
389+
Interlocked.Increment(ref client2MessageCount);
390390
var client = sender as HiveMQClient;
391391
#pragma warning disable CS8602 // Dereference of a possibly null reference.
392392
var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(true);
@@ -405,7 +405,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even
405405
var client3MessageCount = 0;
406406
void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs)
407407
{
408-
client3MessageCount++;
408+
Interlocked.Increment(ref client3MessageCount);
409409
Assert.NotNull(eventArgs.PublishMessage);
410410
Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString);
411411
}

Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,26 +176,35 @@ public async Task PerSubHandlerWithSingleLevelWildcardAsync()
176176
var subscribeOptions = new SubscribeOptionsBuilder()
177177
.WithSubscription("tests/PerSubHandlerWithSingleLevelWildcard/+/msg", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) =>
178178
{
179-
messageCount++;
180179
var pattern = @"^tests/PerSubHandlerWithSingleLevelWildcard/[0-2]/msg$";
181180
var regex = new Regex(pattern);
182181
Assert.Matches(regex, args.PublishMessage.Topic);
183182

184183
Assert.Equal("test", args.PublishMessage.PayloadAsString);
185184

185+
Interlocked.Increment(ref messageCount);
186186
if (messageCount == 3)
187187
{
188188
if (args.PublishMessage.Topic == "tests/PerSubHandlerWithSingleLevelWildcard/0/msg")
189189
{
190-
tcs1.SetResult(true);
190+
if (!tcs1.Task.IsCompleted)
191+
{
192+
tcs1.SetResult(true);
193+
}
191194
}
192195
else if (args.PublishMessage.Topic == "tests/PerSubHandlerWithSingleLevelWildcard/1/msg")
193196
{
194-
tcs2.SetResult(true);
197+
if (!tcs2.Task.IsCompleted)
198+
{
199+
tcs2.SetResult(true);
200+
}
195201
}
196202
else if (args.PublishMessage.Topic == "tests/PerSubHandlerWithSingleLevelWildcard/2/msg")
197203
{
198-
tcs3.SetResult(true);
204+
if (!tcs3.Task.IsCompleted)
205+
{
206+
tcs3.SetResult(true);
207+
}
199208
}
200209
}
201210
})
@@ -245,14 +254,14 @@ public async Task PerSubHandlerWithMultiLevelWildcardAsync()
245254
MQTT5.Types.QualityOfService.AtLeastOnceDelivery,
246255
messageReceivedHandler: (sender, args) =>
247256
{
248-
messageCount++;
249257
var pattern = @"\Atests/PerSubHandlerWithMultiLevelWildcard/(/?|.+)\z";
250258
var regex = new Regex(pattern);
251259
Assert.Matches(regex, args.PublishMessage.Topic);
252260

253261
Assert.Equal("test", args.PublishMessage.PayloadAsString);
254262

255-
if (messageCount == 3)
263+
Interlocked.Increment(ref messageCount);
264+
if (messageCount == 3 && !tcs.Task.IsCompleted)
256265
{
257266
tcs.SetResult(true);
258267
}

0 commit comments

Comments
 (0)