Skip to content

Commit 06f546f

Browse files
committed
Improved disposing and fixed tests
1 parent d0328fe commit 06f546f

File tree

6 files changed

+55
-49
lines changed

6 files changed

+55
-49
lines changed

projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22

33
using System;
44
using System.Buffers;
5+
using System.Diagnostics;
56
using System.IO.Pipelines;
67
using System.Threading.Tasks;
8+
using RabbitMQ.Client.Impl;
79

810
namespace RabbitMQ.Client
911
{
10-
internal class RentedOutgoingMemory : IDisposable
12+
internal sealed class RentedOutgoingMemory : IDisposable
1113
{
14+
private readonly TaskCompletionSource<bool>? _sendCompletionSource;
1215
private bool _disposedValue;
1316
private byte[]? _rentedArray;
14-
private TaskCompletionSource<bool>? _sendCompletionSource;
1517
private ReadOnlySequence<byte> _data;
1618

1719
public RentedOutgoingMemory(ReadOnlyMemory<byte> data, byte[]? rentedArray = null, bool waitSend = false)
@@ -50,30 +52,30 @@ internal ReadOnlySequence<byte> Data
5052
/// <summary>
5153
/// Mark the data as sent.
5254
/// </summary>
53-
public void DidSend()
55+
/// <returns><c>true</c> if the object can be disposed, <c>false</c> if the <see cref="SocketFrameHandler"/> is waiting for the data to be sent.</returns>
56+
public bool DidSend()
5457
{
5558
if (_sendCompletionSource is null)
5659
{
57-
Dispose();
58-
}
59-
else
60-
{
61-
_sendCompletionSource.SetResult(true);
60+
return true;
6261
}
62+
63+
_sendCompletionSource.SetResult(true);
64+
return false;
6365
}
6466

6567
/// <summary>
6668
/// Wait for the data to be sent.
6769
/// </summary>
68-
/// <returns>A <see cref="ValueTask"/> that completes when the data is sent.</returns>
69-
public ValueTask WaitForDataSendAsync()
70+
/// <returns><c>true</c> if the data was sent and the object can be disposed.</returns>
71+
public ValueTask<bool> WaitForDataSendAsync()
7072
{
71-
return _sendCompletionSource is null ? default : WaitForFinishCore();
73+
return _sendCompletionSource is null ? new ValueTask<bool>(false) : WaitForFinishCore();
7274

73-
async ValueTask WaitForFinishCore()
75+
async ValueTask<bool> WaitForFinishCore()
7476
{
7577
await _sendCompletionSource.Task.ConfigureAwait(false);
76-
Dispose();
78+
return true;
7779
}
7880
}
7981

@@ -92,6 +94,7 @@ private void Dispose(bool disposing)
9294
return;
9395
}
9496

97+
Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing.");
9598
_disposedValue = true;
9699

97100
if (disposing)
@@ -109,7 +112,6 @@ private void Dispose(bool disposing)
109112
public void Dispose()
110113
{
111114
Dispose(disposing: true);
112-
GC.SuppressFinalize(this);
113115
}
114116
}
115117
}

projects/RabbitMQ.Client/client/impl/SessionBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
180180
ThrowAlreadyClosedException();
181181
}
182182

183-
copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold;
183+
copyBody ??= body.Length <= Connection.CopyBodyToMemoryThreshold;
184184

185185
return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value));
186186
}

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,13 @@ public async ValueTask WriteAsync(RentedOutgoingMemory frames)
308308
await _channelWriter.WriteAsync(frames)
309309
.ConfigureAwait(false);
310310

311-
await frames.WaitForDataSendAsync()
311+
bool didSend = await frames.WaitForDataSendAsync()
312312
.ConfigureAwait(false);
313+
314+
if (didSend)
315+
{
316+
frames.Dispose();
317+
}
313318
}
314319
}
315320

@@ -346,7 +351,10 @@ await _pipeWriter.FlushAsync()
346351
}
347352
finally
348353
{
349-
frames.DidSend();
354+
if (frames.DidSend())
355+
{
356+
frames.Dispose();
357+
}
350358
}
351359
}
352360

projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public async Task TestNonCopyingBody(ushort size)
3636

3737
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
3838

39-
// It is expected that the rented bytes is smaller than the size of the body
40-
// since we're not copying the body. Only the frame headers are rented.
41-
Assert.True(rentedBytes < size);
39+
// It is expected that the rented bytes is larger than the size of the body
40+
// since the body is copied with the frame headers.
41+
Assert.True(rentedBytes >= size);
4242
}
4343

4444
[Theory]
@@ -59,8 +59,8 @@ public async Task TestCopyingBody(ushort size)
5959

6060
Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));
6161

62-
// It is expected that the rented bytes is larger than the size of the body
63-
// since the body is copied with the frame headers.
64-
Assert.True(rentedBytes >= size);
62+
// It is expected that the rented bytes is smaller than the size of the body
63+
// since we're not copying the body. Only the frame headers are rented.
64+
Assert.True(rentedBytes < size);
6565
}
6666
}

projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,37 +55,28 @@ public override async Task InitializeAsync()
5555
_conn.ConnectionShutdown += HandleConnectionShutdown;
5656
}
5757

58-
[Fact]
59-
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync()
58+
[Theory]
59+
[InlineData(false)]
60+
[InlineData(true)]
61+
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync(bool copyBody)
6062
{
61-
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty<byte>(), 30);
63+
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty<byte>(), copyBody, 30);
6264
}
6365

64-
[Fact]
65-
public Task TestConcurrentChannelOpenAndPublishingSize64Async()
66-
{
67-
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64);
68-
}
69-
70-
[Fact]
71-
public Task TestConcurrentChannelOpenAndPublishingSize256Async()
72-
{
73-
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256);
74-
}
75-
76-
[Fact]
77-
public Task TestConcurrentChannelOpenAndPublishingSize1024Async()
78-
{
79-
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024);
80-
}
81-
82-
private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30)
66+
[Theory]
67+
[InlineData(64, false)]
68+
[InlineData(64, true)]
69+
[InlineData(256, false)]
70+
[InlineData(256, true)]
71+
[InlineData(1024, false)]
72+
[InlineData(1024, true)]
73+
public Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, bool copyBody, int iterations = 30)
8374
{
8475
byte[] body = GetRandomBody(length);
85-
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations);
76+
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, copyBody, iterations);
8677
}
8778

88-
private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations)
79+
private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, bool copyBody, int iterations)
8980
{
9081
return TestConcurrentChannelOperationsAsync(async (conn) =>
9182
{
@@ -128,7 +119,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
128119
QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null);
129120
for (ushort j = 0; j < _messageCount; j++)
130121
{
131-
await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true);
122+
await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true, copyBody: copyBody);
132123
}
133124

134125
Assert.True(await tcs.Task);

projects/Test/Unit/TestRentedOutgoingMemory.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ public async Task TestNonBlocking()
1717
var waitTask = rentedMemory.WaitForDataSendAsync().AsTask();
1818
var timeoutTask = Task.Delay(100);
1919
var completedTask = await Task.WhenAny(timeoutTask, waitTask);
20+
bool didSend = rentedMemory.DidSend();
2021

2122
// Assert
2223
Assert.Equal(waitTask, completedTask);
24+
Assert.False(waitTask.Result);
25+
Assert.True(didSend);
2326
}
2427

2528
[Fact]
@@ -49,11 +52,13 @@ public async Task TestBlockingCompleted()
4952
var waitTask = rentedMemory.WaitForDataSendAsync().AsTask();
5053
var timeoutTask = Task.Delay(100);
5154

52-
rentedMemory.DidSend();
55+
bool didSend = rentedMemory.DidSend();
5356

5457
var completedTask = await Task.WhenAny(timeoutTask, waitTask);
5558

5659
// Assert
5760
Assert.Equal(waitTask, completedTask);
61+
Assert.True(waitTask.Result);
62+
Assert.False(didSend);
5863
}
5964
}

0 commit comments

Comments
 (0)