diff --git a/PingPonger/Program.cs b/PingPonger/Program.cs index e276e2a..a060e58 100644 --- a/PingPonger/Program.cs +++ b/PingPonger/Program.cs @@ -68,7 +68,7 @@ public static int Main(string[] args) } if (options.TCP) { - logConfig.WriteTo.TCPSink(options.Url, options.Port, new LogstashJsonFormatter()); + logConfig.WriteTo.TCPSink(options.Url, options.Port, null, null, new LogstashJsonFormatter()); } } else if (options.IP.Length > 0) @@ -86,7 +86,7 @@ public static int Main(string[] args) } if (options.TCP) { - logConfig.WriteTo.TCPSink(ipAddress, options.Port, new LogstashJsonFormatter()); + logConfig.WriteTo.TCPSink(ipAddress, options.Port,null,null, new LogstashJsonFormatter()); } } else diff --git a/Serilog.Sinks.Network.Test/JsonFormatter.cs b/Serilog.Sinks.Network.Test/JsonFormatter.cs index 7a4487f..1024d2c 100644 --- a/Serilog.Sinks.Network.Test/JsonFormatter.cs +++ b/Serilog.Sinks.Network.Test/JsonFormatter.cs @@ -1,7 +1,8 @@ using System; using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; using FluentAssertions; -using Serilog.Core; using Serilog.Formatting; using Serilog.Sinks.Network.Formatters; using Xunit; @@ -10,30 +11,28 @@ namespace Serilog.Sinks.Network.Test { public class JsonFormatter { - private TCPServer _server; - private Logger _logger; - - - private void ConfigureTestLogger(ITextFormatter formatter = null) + private static LoggerAndSocket ConfigureTestLogger(ITextFormatter formatter = null) { - var port = new Random().Next(50003) + 10000; - _server = new TCPServer(IPAddress.Loopback, port); - _server.Start(); + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + socket.Listen(); - _logger = new LoggerConfiguration() - .WriteTo.TCPSink(IPAddress.Loopback, port, formatter) + var logger = new LoggerConfiguration() + .WriteTo.TCPSink(IPAddress.Loopback, ((IPEndPoint)socket.LocalEndPoint!).Port, null, null, formatter) .CreateLogger(); + + return new LoggerAndSocket { Logger = logger, Socket = socket }; } [Fact] - public void MustNotLogATrailingCommaWhenThereAreNoProperties() + public async Task MustNotLogATrailingCommaWhenThereAreNoProperties() { - ConfigureTestLogger(new LogstashJsonFormatter()); + using var fixture = ConfigureTestLogger(new LogstashJsonFormatter()); var arbitraryMessage = nameof(JsonFormatter) + "MustNotLogATrailingCommaWhenThereAreNoProperties" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); + fixture.Logger.Information(arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_server); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket); var loggedData = receivedData?.TrimEnd('\n'); var logMessageWithTrailingComma = $"\"message\":\"{arbitraryMessage}\",}}"; @@ -41,14 +40,14 @@ public void MustNotLogATrailingCommaWhenThereAreNoProperties() } [Fact] - public void CanStillLogMessagesWithExceptions() + public async Task CanStillLogMessagesWithExceptions() { - ConfigureTestLogger(new LogstashJsonFormatter()); + using var fixture = ConfigureTestLogger(new LogstashJsonFormatter()); var arbitraryMessage = nameof(JsonFormatter) + "CanStillLogMessagesWithExceptions" + Guid.NewGuid(); - _logger.Information(new Exception("exploding"), arbitraryMessage); + fixture.Logger.Information(new Exception("exploding"), arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_server); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket); receivedData.Should().Contain("\"exception\":\"System.Exception: exploding\"}"); } diff --git a/Serilog.Sinks.Network.Test/LoggerAndSocket.cs b/Serilog.Sinks.Network.Test/LoggerAndSocket.cs new file mode 100644 index 0000000..6e0ea48 --- /dev/null +++ b/Serilog.Sinks.Network.Test/LoggerAndSocket.cs @@ -0,0 +1,15 @@ +using System.Net.Sockets; + +namespace Serilog.Sinks.Network.Test +{ + public record LoggerAndSocket : System.IDisposable + { + public required ILogger Logger; + public required Socket Socket; + public void Dispose() + { + Socket.Dispose(); + } + } + +} \ No newline at end of file diff --git a/Serilog.Sinks.Network.Test/ServerPoller.cs b/Serilog.Sinks.Network.Test/ServerPoller.cs index e99076a..3a0a9ac 100644 --- a/Serilog.Sinks.Network.Test/ServerPoller.cs +++ b/Serilog.Sinks.Network.Test/ServerPoller.cs @@ -1,29 +1,46 @@ using System; -using System.Diagnostics; -using System.Linq; +using System.Collections.Generic; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace Serilog.Sinks.Network.Test { internal static class ServerPoller { - public static string PollForReceivedData(DataReceiver dataReceiver) + public static async Task PollForReceivedData(Socket socket, bool udp = false) { - var stopwatch = Stopwatch.StartNew(); - string receivedData = null; - while (string.IsNullOrEmpty(receivedData)) + var buffer = new byte[1000]; + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(30.0)); + var result = new List(); + + Socket clientSocket; + if (udp) + { + clientSocket = socket; + } + else { - receivedData = dataReceiver.ReceivedData.SingleOrDefault(); - if (stopwatch.Elapsed > TimeSpan.FromSeconds(5)) + clientSocket = await socket.AcceptAsync(cts.Token); + } + var isDone = false; + while (!isDone) + { + int readResult = await clientSocket.ReceiveAsync(buffer, SocketFlags.None, cts.Token); + for (var i = 0; i < readResult; i++) { - throw new NoDataReceivedWithinFiveSeconds(); + result.Add(buffer[i]); } - } - return receivedData; + if (readResult < buffer.Length) + { + isDone = true; + } + } + + return Encoding.ASCII.GetString(result.ToArray()); } } - - internal class NoDataReceivedWithinFiveSeconds : Exception - { - } } \ No newline at end of file diff --git a/Serilog.Sinks.Network.Test/WhenLoggingViaTCP.cs b/Serilog.Sinks.Network.Test/WhenLoggingViaTCP.cs deleted file mode 100644 index 5ef78ca..0000000 --- a/Serilog.Sinks.Network.Test/WhenLoggingViaTCP.cs +++ /dev/null @@ -1,78 +0,0 @@ -using System; -using System.Dynamic; -using System.Net; -using FluentAssertions; -using Newtonsoft.Json; -using Serilog.Formatting; -using Serilog.Formatting.Compact; -using Serilog.Sinks.Network.Formatters; -using Xunit; - -namespace Serilog.Sinks.Network.Test -{ - public class WhenLoggingViaTCP : IDisposable - { - private ILogger _logger; - private TCPServer _server; - - private void ConfigureTestLogger(ITextFormatter formatter = null) - { - var port = new Random().Next(50020) + 10000; - _server = new TCPServer(IPAddress.Loopback, port); - _server.Start(); - - _logger = new LoggerConfiguration() - .WriteTo.TCPSink(IPAddress.Loopback, port, formatter) - .CreateLogger(); - } - - [Fact] - public void CanLogHelloWorld_WithLogstashJsonFormatter() - { - ConfigureTestLogger(new LogstashJsonFormatter()); - var arbitraryMessage = nameof(WhenLoggingViaTCP) + "CanLogHelloWorld_WithLogstashJsonFormatter" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_server); - receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); - } - - [Fact] - public void CanLogHelloWorld_WithDefaultFormatter() - { - ConfigureTestLogger(); - var arbitraryMessage = nameof(WhenLoggingViaTCP) + "CanLogHelloWorld_WithDefaultFormatter" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); - - var receivedData = ServerPoller.PollForReceivedData(_server); - - receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); - } - - [Fact] - public void CanLogHelloWorld_WithRawFormatter() - { - ConfigureTestLogger(new CompactJsonFormatter()); - var arbitraryMessage = nameof(WhenLoggingViaTCP) + "CanLogHelloWorld_WithCompactJsonFormatter" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_server); - receivedData.Should().Contain($"\"{arbitraryMessage}\""); - } - - [Fact] - public void CanLogWithProperties() - { - ConfigureTestLogger(); - _logger.Information("TCP Hello {location}", "world"); - var receivedData = ServerPoller.PollForReceivedData(_server); - dynamic payload = JsonConvert.DeserializeObject(receivedData); - Assert.Equal("Information", payload.level); - Assert.Equal("TCP Hello \"world\"", payload.message); - Assert.Equal("world", payload.location); - } - - public void Dispose() - { - _server.Stop(); - } - } -} diff --git a/Serilog.Sinks.Network.Test/WhenLoggingViaTcp.cs b/Serilog.Sinks.Network.Test/WhenLoggingViaTcp.cs new file mode 100644 index 0000000..192e9fb --- /dev/null +++ b/Serilog.Sinks.Network.Test/WhenLoggingViaTcp.cs @@ -0,0 +1,79 @@ +using System; +using System.Dynamic; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using FluentAssertions; +using FluentAssertions.Execution; +using Newtonsoft.Json; +using Serilog.Formatting; +using Serilog.Formatting.Compact; +using Serilog.Sinks.Network.Formatters; +using Xunit; + +namespace Serilog.Sinks.Network.Test +{ + public class WhenLoggingViaTcp + { + private static LoggerAndSocket ConfigureTestLogger(ITextFormatter formatter = null) + { + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + socket.Listen(); + + var logger = new LoggerConfiguration() + .WriteTo.TCPSink(IPAddress.Loopback, ((IPEndPoint)socket.LocalEndPoint!).Port, null, null, formatter) + .CreateLogger(); + + return new LoggerAndSocket { Logger = logger, Socket = socket }; + } + + [Fact] + public async Task CanLogHelloWorld_WithLogstashJsonFormatter() + { + using var fixture = ConfigureTestLogger(new LogstashJsonFormatter()); + var arbitraryMessage = nameof(WhenLoggingViaTcp) + "CanLogHelloWorld_WithLogstashJsonFormatter" + Guid.NewGuid(); + fixture.Logger.Information(arbitraryMessage); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket); + receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); + } + + [Fact] + public async Task CanLogHelloWorld_WithDefaultFormatter() + { + using var fixture = ConfigureTestLogger(); + var arbitraryMessage = nameof(WhenLoggingViaTcp) + "CanLogHelloWorld_WithDefaultFormatter" + Guid.NewGuid(); + fixture.Logger.Information(arbitraryMessage); + + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket); + + receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); + } + + [Fact] + public async Task CanLogHelloWorld_WithRawFormatter() + { + using var fixture =ConfigureTestLogger(new CompactJsonFormatter()); + var arbitraryMessage = nameof(WhenLoggingViaTcp) + "CanLogHelloWorld_WithCompactJsonFormatter" + Guid.NewGuid(); + fixture.Logger.Information(arbitraryMessage); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket); + receivedData.Should().Contain($"\"{arbitraryMessage}\""); + } + + [Fact] + public async Task CanLogWithProperties() + { + using var fixture = ConfigureTestLogger(); + fixture.Logger.Information("TCP Hello {location}", "world"); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket); + dynamic payload = JsonConvert.DeserializeObject(receivedData); + if (payload == null) + { + throw new AssertionFailedException("expected payload not null"); + } + Assert.Equal("Information", payload.level); + Assert.Equal("TCP Hello \"world\"", payload.message); + Assert.Equal("world", payload.location); + } + } +} diff --git a/Serilog.Sinks.Network.Test/WhenLoggingViaUDP.cs b/Serilog.Sinks.Network.Test/WhenLoggingViaUDP.cs deleted file mode 100644 index 0921076..0000000 --- a/Serilog.Sinks.Network.Test/WhenLoggingViaUDP.cs +++ /dev/null @@ -1,84 +0,0 @@ -using System; -using System.Dynamic; -using System.Net; -using FluentAssertions; -using Newtonsoft.Json; -using Serilog.Formatting; -using Serilog.Formatting.Compact; -using Serilog.Sinks.Network.Formatters; -using Xunit; - -namespace Serilog.Sinks.Network.Test -{ - public class WhenLoggingViaUDP : IDisposable - { - private ILogger _logger; - private UDPListener _listener; - - private void ConfigureTestLogger(ITextFormatter formatter = null) - { - var port = new Random().Next(50123) + 10235; - _logger = new LoggerConfiguration() - .WriteTo.UDPSink(IPAddress.Loopback, port, formatter) - .CreateLogger(); - - _listener = new UDPListener(port); - _listener.Start(); - } - - [Fact] - public void CanLogHelloWorld_WithLogstashJsonFormatter() - { - ConfigureTestLogger(new LogstashJsonFormatter()); - var arbitraryMessage = nameof(WhenLoggingViaUDP) + "CanLogHelloWorld_WithLogstashJsonFormatter" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_listener); - receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); - } - - [Fact] - public void CanLogHelloWorld_WithDefaultFormatter() - { - ConfigureTestLogger(); - var arbitraryMessage = nameof(WhenLoggingViaUDP) + "CanLogHelloWorld_WithDefaultFormatter" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_listener); - receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); - } - - [Fact] - public void CanLogHelloWorld_WithRawFormatter() - { -#pragma warning disable 618 - // specifically testing the deprecated RawFormatter - ConfigureTestLogger(new CompactJsonFormatter()); -#pragma warning restore 618 - - var arbitraryMessage = nameof(WhenLoggingViaUDP) + "CanLogHelloWorld_WithCompactJsonFormatter" + Guid.NewGuid(); - _logger.Information(arbitraryMessage); - var receivedData = ServerPoller.PollForReceivedData(_listener); - - - receivedData.Should().Contain($"\"{arbitraryMessage}\""); - } - - [Fact] - public void CanLogWithProperties() - { - ConfigureTestLogger(); - - _logger.Information("UDP Hello {location}", "world"); - var receivedData = ServerPoller.PollForReceivedData(_listener); - var stringPayload = receivedData; - dynamic payload = JsonConvert.DeserializeObject(stringPayload); - Assert.Equal("Information", payload.level); - Assert.Equal("UDP Hello \"world\"", payload.message); - Assert.Equal("world", payload.location); - } - - public void Dispose() - { - _listener.Stop(); - } - } -} diff --git a/Serilog.Sinks.Network.Test/WhenLoggingViaUdp.cs b/Serilog.Sinks.Network.Test/WhenLoggingViaUdp.cs new file mode 100644 index 0000000..e220d81 --- /dev/null +++ b/Serilog.Sinks.Network.Test/WhenLoggingViaUdp.cs @@ -0,0 +1,74 @@ +using System; +using System.Dynamic; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using FluentAssertions; +using FluentAssertions.Execution; +using Newtonsoft.Json; +using Serilog.Formatting; +using Serilog.Formatting.Compact; +using Serilog.Sinks.Network.Formatters; +using Xunit; + +namespace Serilog.Sinks.Network.Test +{ + public class WhenLoggingViaUdp + { + private static LoggerAndSocket ConfigureTestLogger(ITextFormatter formatter = null) + { + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + socket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + + var logger = new LoggerConfiguration() + .WriteTo.UDPSink(IPAddress.Loopback, ((IPEndPoint)socket.LocalEndPoint!).Port, formatter) + .CreateLogger(); + + return new LoggerAndSocket { Logger = logger, Socket = socket }; + } + + [Fact] + public async Task CanLogHelloWorld_WithLogstashJsonFormatter() + { + using var fixture = ConfigureTestLogger(new LogstashJsonFormatter()); + var arbitraryMessage = nameof(WhenLoggingViaUdp) + "CanLogHelloWorld_WithLogstashJsonFormatter" + Guid.NewGuid(); + fixture.Logger.Information(arbitraryMessage); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket, udp: true); + receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); + } + + [Fact] + public async Task CanLogHelloWorld_WithDefaultFormatter() + { + using var fixture = ConfigureTestLogger(); + var arbitraryMessage = nameof(WhenLoggingViaUdp) + "CanLogHelloWorld_WithDefaultFormatter" + Guid.NewGuid(); + fixture.Logger.Information(arbitraryMessage); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket, udp: true); + receivedData.Should().Contain($"\"message\":\"{arbitraryMessage}\""); + } + + [Fact] + public async Task CanLogHelloWorld_WithCompactJsonFormatter() + { + using var fixture = ConfigureTestLogger(new CompactJsonFormatter()); + var arbitraryMessage = nameof(WhenLoggingViaUdp) + "CanLogHelloWorld_WithCompactJsonFormatter" + Guid.NewGuid(); + fixture.Logger.Information(arbitraryMessage); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket, udp: true); + receivedData.Should().Contain($"\"{arbitraryMessage}\""); + } + + [Fact] + public async Task CanLogWithProperties() + { + using var fixture = ConfigureTestLogger(); + + fixture.Logger.Information("UDP Hello {location}", "world"); + var receivedData = await ServerPoller.PollForReceivedData(fixture.Socket, udp: true); + var stringPayload = receivedData; + dynamic payload = JsonConvert.DeserializeObject(stringPayload) ?? throw new AssertionFailedException("expected deserialization to work"); + Assert.Equal("Information", payload.level); + Assert.Equal("UDP Hello \"world\"", payload.message); + Assert.Equal("world", payload.location); + } + } +} diff --git a/Serilog.Sinks.Network/NetworkLoggerConfigurationExtensions.cs b/Serilog.Sinks.Network/NetworkLoggerConfigurationExtensions.cs index 9498847..b554f54 100644 --- a/Serilog.Sinks.Network/NetworkLoggerConfigurationExtensions.cs +++ b/Serilog.Sinks.Network/NetworkLoggerConfigurationExtensions.cs @@ -20,7 +20,7 @@ public static LoggerConfiguration UDPSink( this LoggerSinkConfiguration loggerConfiguration, string uri, int port, - ITextFormatter textFormatter = null, + ITextFormatter? textFormatter = null, LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) { return UDPSink(loggerConfiguration, ResolveAddress(uri), port, textFormatter, restrictedToMinimumLevel); @@ -30,7 +30,7 @@ public static LoggerConfiguration UDPSink( this LoggerSinkConfiguration loggerConfiguration, IPAddress ipAddress, int port, - ITextFormatter textFormatter = null, + ITextFormatter? textFormatter = null, LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) { var sink = new UDPSink(ipAddress, port, textFormatter ?? new LogstashJsonFormatter()); @@ -41,37 +41,63 @@ public static LoggerConfiguration TCPSink( this LoggerSinkConfiguration loggerConfiguration, IPAddress ipAddress, int port, - ITextFormatter textFormatter = null, + int? writeTimeoutMs = null, + int? disposeTimeoutMs = null, + ITextFormatter? textFormatter = null, LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) { - var sink = new TCPSink(ipAddress, port, textFormatter ?? new LogstashJsonFormatter()); - return loggerConfiguration.Sink(sink, restrictedToMinimumLevel); + return TCPSink(loggerConfiguration, $"tcp://{ipAddress}:{port}", writeTimeoutMs, disposeTimeoutMs, textFormatter, restrictedToMinimumLevel); + } + + public static LoggerConfiguration TCPSink( + this LoggerSinkConfiguration loggerConfiguration, + IPAddress ipAddress, + int port, + LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) + { + return TCPSink(loggerConfiguration, ipAddress, port,null, null, null, restrictedToMinimumLevel); } public static LoggerConfiguration TCPSink( this LoggerSinkConfiguration loggerConfiguration, string host, int port, - ITextFormatter textFormatter = null, + int? writeTimeoutMs = null, + int? disposeTimeoutMs = null, + ITextFormatter? textFormatter = null, LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) { - return TCPSink(loggerConfiguration, $"{host}:{port}", textFormatter, restrictedToMinimumLevel); + return TCPSink(loggerConfiguration, $"{host}:{port}", writeTimeoutMs, disposeTimeoutMs, textFormatter, restrictedToMinimumLevel); } public static LoggerConfiguration TCPSink( this LoggerSinkConfiguration loggerConfiguration, string uri, - ITextFormatter textFormatter = null, + int? writeTimeoutMs = null, + int? disposeTimeoutMs = null, + ITextFormatter? textFormatter = null, LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) { - var sink = new TCPSink(BuildUri(uri), textFormatter ?? new LogstashJsonFormatter()); + var socketWriter = new TcpSocketWriter( + BuildUri(uri), + writeTimeoutMs: writeTimeoutMs, + disposeTimeoutMs: disposeTimeoutMs); + var sink = new TCPSink(socketWriter, textFormatter ?? new LogstashJsonFormatter()); return loggerConfiguration.Sink(sink, restrictedToMinimumLevel); } + + public static LoggerConfiguration TCPSink( + this LoggerSinkConfiguration loggerConfiguration, + string uri, + LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum) + { + return TCPSink(loggerConfiguration, uri, null, null, null, restrictedToMinimumLevel); + } private static IPAddress ResolveAddress(string uri) { // Check if it is IP address - IPAddress address; + IPAddress? address; if (IPAddress.TryParse(uri, out address)) return address; @@ -84,7 +110,7 @@ private static IPAddress ResolveAddress(string uri) return IPAddress.Loopback; } - private static IPAddress ResolveIP(string uri) + private static IPAddress? ResolveIP(string uri) { try { @@ -111,6 +137,7 @@ private static Uri BuildUri(string s) { throw new ArgumentNullException("Uri should be in the format tcp://server:port", ex); } + if (uri.Port == 0) throw new UriFormatException("Uri port cannot be 0"); if (!(uri.Scheme.ToLower() == "tcp" || uri.Scheme.ToLower() == "tls")) diff --git a/Serilog.Sinks.Network/Serilog.Sinks.Network.csproj b/Serilog.Sinks.Network/Serilog.Sinks.Network.csproj index b143585..7080a49 100644 --- a/Serilog.Sinks.Network/Serilog.Sinks.Network.csproj +++ b/Serilog.Sinks.Network/Serilog.Sinks.Network.csproj @@ -1,7 +1,9 @@ - netstandard2.0 + net8.0 + Serilog.Sinks.Network + enable diff --git a/Serilog.Sinks.Network/Sinks/TCP/TCPSink.cs b/Serilog.Sinks.Network/Sinks/TCP/TCPSink.cs index f6ff116..0f13999 100644 --- a/Serilog.Sinks.Network/Sinks/TCP/TCPSink.cs +++ b/Serilog.Sinks.Network/Sinks/TCP/TCPSink.cs @@ -13,15 +13,14 @@ public class TCPSink : ILogEventSink, IDisposable private readonly ITextFormatter _formatter; private readonly TcpSocketWriter _socketWriter; - public TCPSink(IPAddress ipAddress, int port, ITextFormatter formatter) + public TCPSink(Uri uri, ITextFormatter formatter) + : this(new TcpSocketWriter(uri), formatter) { - _socketWriter = new TcpSocketWriter(new Uri($"tcp://{ipAddress}:{port}")); - _formatter = formatter; } - public TCPSink(Uri uri, ITextFormatter formatter) + public TCPSink(TcpSocketWriter socketWriter, ITextFormatter formatter) { - _socketWriter = new TcpSocketWriter(uri); + _socketWriter = socketWriter; _formatter = formatter; } diff --git a/Serilog.Sinks.Network/Sinks/TCP/TCPSocketWriter.cs b/Serilog.Sinks.Network/Sinks/TCP/TCPSocketWriter.cs index 42ea400..96da430 100644 --- a/Serilog.Sinks.Network/Sinks/TCP/TCPSocketWriter.cs +++ b/Serilog.Sinks.Network/Sinks/TCP/TCPSocketWriter.cs @@ -45,11 +45,13 @@ namespace Serilog.Sinks.Network.Sinks.TCP public class TcpSocketWriter : IDisposable { private readonly FixedSizeQueue _eventQueue; - private readonly ExponentialBackoffTcpReconnectionPolicy _reconnectPolicy = new ExponentialBackoffTcpReconnectionPolicy(); - private readonly CancellationTokenSource _tokenSource; // Must be private or Dispose will not function properly. - private readonly TaskCompletionSource _disposed = new TaskCompletionSource(); + private readonly CancellationTokenSource _disposingCts; // Must be private or Dispose will not function properly. + private readonly CancellationTokenSource _forceQuitCts; + private readonly TaskCompletionSource _disposed = new(); + private readonly TimeSpan _writeTimeout; + private readonly TimeSpan _disposeTimeout; - private Stream _stream; + private Stream? _stream; /// /// Event that is invoked when reconnecting after a TCP session is dropped fails. @@ -98,116 +100,166 @@ public static void UnexpectedErrorLogger(Exception ex, Action /// Uri to open a TCP socket to. /// The maximum number of log entries to queue before starting to drop entries. - public TcpSocketWriter(Uri uri, int maxQueueSize = 5000) + /// The amount of time, in milliseconds, before timing out any write operation. Defaults to 30_000. + /// The amount of time, in milliseconds, before timing out .Dispose and giving up flushing messages. Defaults to 30_000. + public TcpSocketWriter( + Uri uri, + int? writeTimeoutMs = null, + int? disposeTimeoutMs = null, + int maxQueueSize = 5000) { _eventQueue = new FixedSizeQueue(maxQueueSize); - _tokenSource = new CancellationTokenSource(); + _disposingCts = new CancellationTokenSource(); + _forceQuitCts = new CancellationTokenSource(); + _writeTimeout = TimeSpan.FromMilliseconds(writeTimeoutMs ?? 30_000); + _disposeTimeout = TimeSpan.FromMilliseconds(disposeTimeoutMs ?? 30_000); - Func> tryOpenSocket = async h => + var tryOpenStream = () => + ExponentialBackoffTcpReconnectionPolicy.ConnectAsync(OpenSocket, uri, _disposingCts.Token); + + var threadReady = new TaskCompletionSource(); + + var queueListener = Task.Factory.StartNew(async () => { try { - TcpClient client = new TcpClient(); - await client.ConnectAsync(uri.Host, uri.Port); - Stream stream = client.GetStream(); - if (uri.Scheme.ToLower() != "tls") - return stream; - - var sslStream = new SslStream(client.GetStream(), false, null, null); - await sslStream.AuthenticateAsClientAsync(uri.Host); - return sslStream; + _stream = await tryOpenStream(); + threadReady.SetResult(true); // Signal the calling thread that we are ready. + await Run(tryOpenStream); } catch (Exception e) { LoggingFailureHandler(e); - throw; } - }; + finally + { + _stream?.Dispose(); + _disposed.SetResult(true); + } + }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); + threadReady.Task.Wait(TimeSpan.FromSeconds(5)); + } - var threadReady = new TaskCompletionSource(); + private async Task OpenSocket(Uri uri) + { + try + { + var client = new TcpClient(); + await client.ConnectAsync(uri.Host, uri.Port); + var stream = client.GetStream(); + stream.WriteTimeout = (int)_writeTimeout.TotalMilliseconds; + + if (uri.Scheme.ToLowerInvariant() != "tls") + return stream; + + var sslStream = new SslStream(client.GetStream(), false, null , null); + await sslStream.AuthenticateAsClientAsync(uri.Host); + return sslStream; + } + catch (Exception e) + { + LoggingFailureHandler(e); + throw; + } + } - Task queueListener = Task.Factory.StartNew(async () => + private async Task Run(Func> tryOpenStream) + { + string? entry = null; + while (_stream != null) // null indicates that the thread has been cancelled and cleaned up. { - try + // If we are in the middle of .Dispose... + if (_disposingCts.Token.IsCancellationRequested) { - bool sslEnabled = uri.Scheme.ToLower() == "tls"; - _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token); - threadReady.SetResult(true); // Signal the calling thread that we are ready. + // Post-condition: no further items will be added to the queue, so there will be a finite number of items to handle. + _eventQueue.CompleteAdding(); + await FlushQueue(entry); + break; + } - string entry = null; - while (_stream != null) // null indicates that the thread has been cancelled and cleaned up. + + if (entry == null) + { + entry = _eventQueue.Dequeue(_disposingCts.Token); + } + else + { + try { - if (_tokenSource.Token.IsCancellationRequested) - { - _eventQueue.CompleteAdding(); - // Post-condition: no further items will be added to the queue, so there will be a finite number of items to handle. - while (_eventQueue.Count > 0) - { - entry = _eventQueue.Dequeue(); - try - { - byte[] messsage = Encoding.UTF8.GetBytes(entry); - await _stream.WriteAsync(messsage, 0, messsage.Length); - await _stream.FlushAsync(); - } - catch (SocketException ex) - { - LoggingFailureHandler(ex); - } - } - break; - } - if (entry == null) - { - entry = _eventQueue.Dequeue(_tokenSource.Token); - } - else - { - try - { - byte[] messsage = Encoding.UTF8.GetBytes(entry); - await _stream.WriteAsync(messsage, 0, messsage.Length); - await _stream.FlushAsync(); - // No exception, it was sent - entry = null; - } - catch (IOException ex) - { - LoggingFailureHandler(ex); - _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token); - } - catch (SocketException ex) - { - LoggingFailureHandler(ex); - _stream = await _reconnectPolicy.ConnectAsync(tryOpenSocket, uri, _tokenSource.Token); - } - } + var message = Encoding.UTF8.GetBytes(entry); + await _stream.WriteAsync(message, _disposingCts.Token); + await _stream.FlushAsync(_disposingCts.Token); + // No exception, it was sent + entry = null; + } + catch (IOException ex) + { + LoggingFailureHandler(ex); + _stream = await tryOpenStream(); + } + catch (SocketException ex) + { + LoggingFailureHandler(ex); + _stream = await tryOpenStream(); } } - catch (Exception e) + } + } + + private async Task FlushQueue(string entry) + { + while (_eventQueue.Count > 0) + { + if(_forceQuitCts.Token.IsCancellationRequested) + break; + + if (entry == null) { - LoggingFailureHandler(e); + entry = _eventQueue.Dequeue(_disposingCts.Token); } - finally + else { - if (_stream != null) + if (_stream == null) { - _stream.Dispose(); + // Logic error: this should never be null here. + if (_forceQuitCts.Token.IsCancellationRequested) + break; + LoggingFailureHandler(new InvalidOperationException("_stream was null and should not be null")); + } + else + { + try + { + var message = Encoding.UTF8.GetBytes(entry); + await _stream.WriteAsync(message, _forceQuitCts.Token); + await _stream.FlushAsync(_forceQuitCts.Token); + // No exception, it was sent + entry = null; + } + catch (SocketException ex) + { + if (_forceQuitCts.Token.IsCancellationRequested) + break; + LoggingFailureHandler(ex); + } } - - _disposed.SetResult(true); } - }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); - threadReady.Task.Wait(TimeSpan.FromSeconds(5)); + } } - + public void Dispose() { // The following operations are idempotent. Issue a cancellation to tell the // writer thread to stop the queue from accepting entries and write what it has // before cleaning up, then wait until that cleanup is finished. - _tokenSource.Cancel(); - Task.Run(async () => await _disposed.Task).Wait(); + _disposingCts.Cancel(); + Task.WhenAny(new[] + { + Task.Run(async () => await _disposed.Task), + Task.Delay(_disposeTimeout), + }).Wait(); + + _forceQuitCts.Cancel(); } /// @@ -231,19 +283,23 @@ public void Enqueue(string entry) /// public class ExponentialBackoffTcpReconnectionPolicy { - private readonly int ceiling = 10 * 60; // 10 minutes in seconds + private const int Ceiling = 10 * 60; // 10 minutes in seconds - public async Task ConnectAsync(Func> connect, Uri host, CancellationToken cancellationToken) + public static async Task ConnectAsync(Func> connect, Uri host, + CancellationToken cancellationToken) { int delay = 1; // in seconds while (!cancellationToken.IsCancellationRequested) { try { - Log.Debug("Attempting to connect to TCP endpoint {host} after delay of {delay} seconds", host, delay); + Log.Debug("Attempting to connect to TCP endpoint {host} after delay of {delay} seconds", host, + delay); return await connect(host); } - catch (SocketException) { } + catch (SocketException) + { + } // If this is cancelled via the cancellationToken instead of // completing its delay, the next while-loop test will fail, @@ -251,7 +307,7 @@ public async Task ConnectAsync(Func> connect, Uri host // with no additional connection attempts. await Task.Delay(delay * 1000, cancellationToken); // The nth delay is min(10 minutes, 2^n - 1 seconds). - delay = Math.Min((delay + 1) * 2 - 1, ceiling); + delay = Math.Min((delay + 1) * 2 - 1, Ceiling); } // cancellationToken has been cancelled. @@ -267,7 +323,6 @@ public async Task ConnectAsync(Func> connect, Uri host internal class FixedSizeQueue { private int Size { get; } - private readonly IProgress _progress = new Progress(); private bool IsCompleted { get; set; } private readonly BlockingCollection _collection = new BlockingCollection(); @@ -293,7 +348,6 @@ public void Enqueue(T obj) { _collection.Take(); } - _progress.Report(true); } } diff --git a/Serilog.Sinks.Network/Sinks/UDP/UDPSink.cs b/Serilog.Sinks.Network/Sinks/UDP/UDPSink.cs index 219e853..2dba439 100644 --- a/Serilog.Sinks.Network/Sinks/UDP/UDPSink.cs +++ b/Serilog.Sinks.Network/Sinks/UDP/UDPSink.cs @@ -35,7 +35,6 @@ public void Emit(LogEvent logEvent) public void Dispose() { _socket?.Dispose(); - _socket = null; } } }