Skip to content

Commit f6fff53

Browse files
committed
feat(network): implement buffered stream for networked playback
Improves reliability of network audio playback by introducing a buffering layer. This prevents network stutters from directly impacting the audio thread, avoiding crashes or failures. - Adds `BufferedNetworkStream`, a thread-safe circular buffer that downloads and caches audio data in a background task. - Updates `DirectStreamProvider` to use `BufferedNetworkStream` for large or chunked network sources. - Optimizes `MiniAudioEngine` device enumeration to reduce memory allocations by reusing the device info array on refresh. - Adds `ReadIntoArray` extension to efficiently marshal native data into pre-allocated managed arrays, reducing GC pressure.
1 parent f57d5f1 commit f6fff53

File tree

4 files changed

+226
-20
lines changed

4 files changed

+226
-20
lines changed

Src/Backends/MiniAudio/MiniAudioEngine.cs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ protected override void CleanupBackend()
5757
Native.Free(_context);
5858
}
5959

60-
6160
/// <inheritdoc />
6261
public override AudioPlaybackDevice InitializePlaybackDevice(DeviceInfo? deviceInfo, AudioFormat format, DeviceConfig? config = null)
6362
{
@@ -225,27 +224,41 @@ public override void UpdateDevicesInfo()
225224
if (result != Result.Success)
226225
throw new InvalidOperationException($"Unable to get devices. MiniAudio result: {result}");
227226

228-
var playbackCount = (uint)playbackDeviceCountNint;
229-
var captureCount = (uint)captureDeviceCountNint;
227+
var playbackCountUint = (uint)playbackDeviceCountNint;
228+
var captureCountUint = (uint)captureDeviceCountNint;
229+
var playbackCount = (int)playbackCountUint;
230+
var captureCount = (int)captureCountUint;
230231

231232
try
232233
{
233234
// Marshal playback devices
234235
if (playbackCount > 0 && pPlaybackDevices != nint.Zero)
235-
PlaybackDevices = pPlaybackDevices.ReadArray<DeviceInfo>((int)playbackCount);
236+
{
237+
if (PlaybackDevices.Length != playbackCount) PlaybackDevices = new DeviceInfo[playbackCount];
238+
239+
// Read the native device information directly into our managed array.
240+
pPlaybackDevices.ReadIntoArray(PlaybackDevices, playbackCount);
241+
}
236242
else
237-
PlaybackDevices = [];
243+
{
244+
if (PlaybackDevices.Length != 0) PlaybackDevices = [];
245+
}
238246

239247
// Marshal capture devices
240248
if (captureCount > 0 && pCaptureDevices != nint.Zero)
241-
CaptureDevices = pCaptureDevices.ReadArray<DeviceInfo>((int)captureCount);
249+
{
250+
if (CaptureDevices.Length != captureCount) CaptureDevices = new DeviceInfo[captureCount];
251+
pCaptureDevices.ReadIntoArray(CaptureDevices, captureCount);
252+
}
242253
else
243-
CaptureDevices = [];
254+
{
255+
if (CaptureDevices.Length != 0) CaptureDevices = [];
256+
}
244257
}
245258
finally
246259
{
247-
if (pPlaybackDevices != nint.Zero) Native.FreeDeviceInfos(pPlaybackDevices, playbackCount);
248-
if (pCaptureDevices != nint.Zero) Native.FreeDeviceInfos(pCaptureDevices, captureCount);
260+
if (pPlaybackDevices != nint.Zero) Native.FreeDeviceInfos(pPlaybackDevices, playbackCountUint);
261+
if (pCaptureDevices != nint.Zero) Native.FreeDeviceInfos(pCaptureDevices, captureCountUint);
249262
}
250263
}
251264
}

Src/Providers/NetworkDataProvider.cs

Lines changed: 171 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using SoundFlow.Interfaces;
88
using SoundFlow.Structs;
99

10-
1110
namespace SoundFlow.Providers;
1211

1312
/// <summary>
@@ -239,6 +238,7 @@ public virtual void Dispose()
239238

240239
/// <summary>
241240
/// Handles direct audio streams (e.g., MP3, WAV, OGG files).
241+
/// Uses a background buffering strategy for large files to prevent network issues from crashing the audio thread.
242242
/// </summary>
243243
internal sealed class DirectStreamProvider(AudioEngine engine, AudioFormat format, string url, HttpClient client)
244244
: NetworkDataProviderBase(engine, format, url, client)
@@ -270,8 +270,10 @@ public override async Task InitializeAsync()
270270
}
271271
else
272272
{
273-
// Otherwise, use the live network stream. Seeking will not be supported in this case.
274-
_stream = await response.Content.ReadAsStreamAsync();
273+
// For large or chunked streams, use a buffered stream.
274+
var bufferedStream = new BufferedNetworkStream();
275+
bufferedStream.StartProducerTask(response); // Starts the background download.
276+
_stream = bufferedStream;
275277
}
276278

277279
_decoder = Engine.CreateDecoder(_stream, Format);
@@ -606,4 +608,170 @@ public override void Dispose()
606608
_audioBuffer.Clear();
607609
}
608610
}
611+
}
612+
613+
/// <summary>
614+
/// A thread-safe, in-memory stream that acts as a circular buffer between a producer (network download)
615+
/// and a consumer (audio decoder). It blocks reads when empty and writes when full.
616+
/// </summary>
617+
internal sealed class BufferedNetworkStream(int bufferSize = 1 * 1024 * 1024) : Stream
618+
{
619+
private enum DownloadState { Buffering, Completed, Failed }
620+
621+
private readonly byte[] _buffer = new byte[bufferSize];
622+
private int _writePosition;
623+
private int _readPosition;
624+
private int _bytesAvailable;
625+
626+
private readonly object _lock = new();
627+
private volatile DownloadState _state = DownloadState.Buffering;
628+
private CancellationTokenSource? _cts;
629+
private Task? _producerTask;
630+
private bool _isDisposed;
631+
632+
/// <summary>
633+
/// Starts the background producer task that reads from the network response and fills the buffer.
634+
/// This method takes ownership of the HttpResponseMessage.
635+
/// </summary>
636+
/// <param name="sourceResponse">The HTTP response message containing the content stream.</param>
637+
public void StartProducerTask(HttpResponseMessage sourceResponse)
638+
{
639+
_cts = new CancellationTokenSource();
640+
_producerTask = Task.Run(async () =>
641+
{
642+
var tempBuffer = ArrayPool<byte>.Shared.Rent(16384); // 16KB read buffer
643+
try
644+
{
645+
await using var sourceStream = await sourceResponse.Content.ReadAsStreamAsync(_cts.Token);
646+
while (!_cts.IsCancellationRequested)
647+
{
648+
var bytesRead = await sourceStream.ReadAsync(tempBuffer, _cts.Token);
649+
if (bytesRead == 0) break; // End of network stream
650+
651+
Write(tempBuffer, 0, bytesRead);
652+
}
653+
654+
if (!_cts.IsCancellationRequested) SignalCompletion();
655+
}
656+
catch (Exception ex) when (ex is not OperationCanceledException)
657+
{
658+
// Network error occurred.
659+
SignalFailure();
660+
}
661+
finally
662+
{
663+
ArrayPool<byte>.Shared.Return(tempBuffer);
664+
sourceResponse.Dispose();
665+
}
666+
});
667+
}
668+
669+
public override int Read(byte[] buffer, int offset, int count)
670+
{
671+
lock (_lock)
672+
{
673+
while (_bytesAvailable == 0 && _state == DownloadState.Buffering && !_isDisposed)
674+
{
675+
Monitor.Wait(_lock);
676+
}
677+
678+
if (_bytesAvailable == 0) return 0; // End of stream or failure
679+
680+
var bytesToRead = Math.Min(count, _bytesAvailable);
681+
682+
// Read from circular buffer
683+
var firstChunkSize = Math.Min(bytesToRead, _buffer.Length - _readPosition);
684+
Buffer.BlockCopy(_buffer, _readPosition, buffer, offset, firstChunkSize);
685+
_readPosition = (_readPosition + firstChunkSize) % _buffer.Length;
686+
687+
if (firstChunkSize < bytesToRead)
688+
{
689+
var secondChunkSize = bytesToRead - firstChunkSize;
690+
Buffer.BlockCopy(_buffer, _readPosition, buffer, offset + firstChunkSize, secondChunkSize);
691+
_readPosition = (_readPosition + secondChunkSize) % _buffer.Length;
692+
}
693+
694+
_bytesAvailable -= bytesToRead;
695+
Monitor.PulseAll(_lock); // Signal producer that space is available
696+
return bytesToRead;
697+
}
698+
}
699+
700+
public override void Write(byte[] buffer, int offset, int count)
701+
{
702+
if (count == 0) return;
703+
704+
lock (_lock)
705+
{
706+
while (_buffer.Length - _bytesAvailable < count && !_isDisposed)
707+
{
708+
Monitor.Wait(_lock);
709+
}
710+
711+
if (_isDisposed) return;
712+
713+
// Write to circular buffer
714+
var firstChunkSize = Math.Min(count, _buffer.Length - _writePosition);
715+
Buffer.BlockCopy(buffer, offset, _buffer, _writePosition, firstChunkSize);
716+
_writePosition = (_writePosition + firstChunkSize) % _buffer.Length;
717+
718+
if (firstChunkSize < count)
719+
{
720+
var secondChunkSize = count - firstChunkSize;
721+
Buffer.BlockCopy(buffer, offset + firstChunkSize, _buffer, _writePosition, secondChunkSize);
722+
_writePosition = (_writePosition + secondChunkSize) % _buffer.Length;
723+
}
724+
725+
_bytesAvailable += count;
726+
Monitor.PulseAll(_lock); // Signal consumer that data is available
727+
}
728+
}
729+
730+
private void SignalCompletion()
731+
{
732+
lock (_lock)
733+
{
734+
_state = DownloadState.Completed;
735+
Monitor.PulseAll(_lock); // Wake any waiting readers to signal EOS
736+
}
737+
}
738+
739+
private void SignalFailure()
740+
{
741+
lock (_lock)
742+
{
743+
_state = DownloadState.Failed;
744+
Monitor.PulseAll(_lock); // Wake any waiting readers to signal EOS
745+
}
746+
}
747+
748+
protected override void Dispose(bool disposing)
749+
{
750+
if (_isDisposed) return;
751+
_isDisposed = true;
752+
753+
if (disposing)
754+
{
755+
lock (_lock)
756+
{
757+
_cts?.Cancel();
758+
Monitor.PulseAll(_lock); // Unblock any waiting threads
759+
}
760+
_producerTask?.Wait(TimeSpan.FromSeconds(5)); // Wait for producer to finish
761+
_cts?.Dispose();
762+
}
763+
764+
base.Dispose(disposing);
765+
}
766+
767+
#region Not Supported Stream Members
768+
public override bool CanRead => !_isDisposed;
769+
public override bool CanSeek => false;
770+
public override bool CanWrite => !_isDisposed;
771+
public override long Length => throw new NotSupportedException();
772+
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
773+
public override void Flush() { }
774+
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
775+
public override void SetLength(long value) => throw new NotSupportedException();
776+
#endregion
609777
}

Src/SoundFlow.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<PackageIcon>logo.png</PackageIcon>
1616
<PackageReadmeFile>README.md</PackageReadmeFile>
1717
<PackageTags>audio, sound, mp3, wav, playback, record, voice, volume, fft, simd, crossplatform, miniaudio, c#, .net, echo, noise</PackageTags>
18-
<Version>1.2.1-dev0</Version>
18+
<Version>1.2.1</Version>
1919
<PackageReleaseNotes>https://github.com/LSXPrime/SoundFlow/releases</PackageReleaseNotes>
2020
<GenerateDocumentationFile>true</GenerateDocumentationFile>
2121
<Company>LSXPrime</Company>

Src/Utils/Extensions.cs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Runtime.InteropServices;
22
using SoundFlow.Enums;
3+
using SoundFlow.Structs;
34

45
namespace SoundFlow.Utils;
56

@@ -40,21 +41,45 @@ public static unsafe Span<T> GetSpan<T>(nint ptr, int length) where T : unmanage
4041
}
4142

4243
/// <summary>
43-
/// Reads an array of structures from a native memory pointer.
44+
/// Reads an array of structures from a native memory pointer into a pre-allocated destination array.
4445
/// </summary>
4546
/// <typeparam name="T">The type of the structures to read. Must be a value type.</typeparam>
4647
/// <param name="pointer">The native pointer to the start of the array.</param>
48+
/// <param name="destination">The pre-allocated array to write the structures into.</param>
4749
/// <param name="count">The number of structures to read.</param>
48-
/// <returns>An array of structures of type <typeparamref name="T"/> read from the specified pointer.</returns>
49-
public static T[] ReadArray<T>(this nint pointer, int count) where T : struct
50+
/// <exception cref="ArgumentException">Thrown if the destination array is smaller than <paramref name="count"/>.</exception>
51+
public static void ReadIntoArray<T>(this nint pointer, T[] destination, int count) where T : struct
5052
{
51-
var array = new T[count];
53+
if (destination.Length < count)
54+
{
55+
throw new ArgumentException("Destination array is not large enough to hold the requested number of items.", nameof(destination));
56+
}
57+
58+
if (count == 0) return;
59+
60+
var elementSize = Marshal.SizeOf<T>();
5261
for (var i = 0; i < count; i++)
5362
{
54-
var currentPtr = (nint)((long)pointer + i * Marshal.SizeOf<T>());
55-
array[i] = Marshal.PtrToStructure<T>(currentPtr);
63+
var currentPtr = (nint)(pointer + (long)i * elementSize);
64+
destination[i] = Marshal.PtrToStructure<T>(currentPtr);
5665
}
66+
}
67+
5768

69+
/// <summary>
70+
/// Reads an array of structures from a native memory pointer, allocating a new managed array.
71+
/// </summary>
72+
/// <typeparam name="T">The type of the structures to read. Must be a value type.</typeparam>
73+
/// <param name="pointer">The native pointer to the start of the array.</param>
74+
/// <param name="count">The number of structures to read.</param>
75+
/// <returns>A new array of structures of type <typeparamref name="T"/> read from the specified pointer.</returns>
76+
public static T[] ReadArray<T>(this nint pointer, int count) where T : struct
77+
{
78+
if (count == 0)
79+
return [];
80+
81+
var array = new T[count];
82+
ReadIntoArray(pointer, array, count);
5883
return array;
5984
}
60-
}
85+
}

0 commit comments

Comments
 (0)