Skip to content

Commit 69eddfa

Browse files
committed
Send fractions of a second to Fluentd
Previously the timestamp did have a a precision of one second. The fraction got lost. The timestamp is now of the EventTime time according to specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format EventTime has nanosecond precision, but .NET only supports 100 nanoseconds (ticks). Fixes #12 Signed-off-by: Edwin Engelen <[email protected]>
1 parent d0e932a commit 69eddfa

File tree

2 files changed

+54
-7
lines changed

2 files changed

+54
-7
lines changed

src/NLog.Targets.Fluentd/Fluentd.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
using System;
1818
using System.Collections.Generic;
19+
using System.Diagnostics;
1920
using System.IO;
20-
using System.Text;
2121
using System.Net.Sockets;
22-
using System.Diagnostics;
2322
using System.Reflection;
23+
using System.Text;
2424
using MsgPack;
2525
using MsgPack.Serialization;
2626

@@ -154,26 +154,26 @@ public void UnpackTo(Unpacker unpacker, object collection)
154154

155155
internal class FluentdEmitter
156156
{
157-
private static DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
158157
private readonly Packer packer;
159158
private readonly SerializationContext serializationContext;
160159
private readonly Stream destination;
161160

162161
public void Emit(DateTime timestamp, string tag, IDictionary<string, object> data)
163162
{
164-
long unixTimestamp = timestamp.ToUniversalTime().Subtract(unixEpoch).Ticks / 10000000;
165163
this.packer.PackArrayHeader(3);
166164
this.packer.PackString(tag, Encoding.UTF8);
167-
this.packer.Pack((ulong)unixTimestamp);
165+
this.packer.PackEventTime(timestamp);
168166
this.packer.Pack(data, serializationContext);
169167
this.destination.Flush(); // Change to packer.Flush() when packer is upgraded
170168
}
171169

172170
public FluentdEmitter(Stream stream)
173171
{
174172
this.destination = stream;
175-
this.packer = Packer.Create(destination);
176-
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);
173+
174+
// PackerCompatibilityOptions.ProhibitExtendedTypeObjects must be turned off in order to use the PackExtendedTypeValue method
175+
this.packer = Packer.Create(destination, Packer.DefaultCompatibilityOptions & ~PackerCompatibilityOptions.ProhibitExtendedTypeObjects);
176+
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);
177177
embeddedContext.Serializers.Register(new OrdinaryDictionarySerializer(embeddedContext, null));
178178
this.serializationContext = new SerializationContext(PackerCompatibilityOptions.PackBinaryAsRaw);
179179
this.serializationContext.Serializers.Register(new OrdinaryDictionarySerializer(this.serializationContext, embeddedContext));
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System;
2+
3+
namespace NLog.Targets
4+
{
5+
using MsgPack;
6+
7+
internal static class PackerExtensions
8+
{
9+
private const int nanoSecondsPerSecond = 1 * 1000 * 1000 * 1000;
10+
private const double ticksToNanoSecondsFactor = (double)nanoSecondsPerSecond / TimeSpan.TicksPerSecond;
11+
private static readonly long unixEpochTicks = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks;
12+
13+
/// <summary>
14+
/// Write according to Fluend EventTime specification.
15+
/// </summary>
16+
/// <remarks>
17+
/// Specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format"
18+
/// </remarks>
19+
public static Packer PackEventTime(this Packer that, DateTime value)
20+
{
21+
DateTimeToEpoch(value, out var secondsFromEpoch, out var nanoSeconds);
22+
23+
that.PackExtendedTypeValue(
24+
0x0,
25+
new[]
26+
{
27+
(byte) ((ulong) (secondsFromEpoch >> 24) & (ulong) byte.MaxValue),
28+
(byte) ((ulong) (secondsFromEpoch >> 16) & (ulong) byte.MaxValue),
29+
(byte) ((ulong) (secondsFromEpoch >> 8) & (ulong) byte.MaxValue),
30+
(byte) ((ulong) secondsFromEpoch & (ulong) byte.MaxValue),
31+
(byte) ((ulong) (nanoSeconds >> 24) & (ulong) byte.MaxValue),
32+
(byte) ((ulong) (nanoSeconds >> 16) & (ulong) byte.MaxValue),
33+
(byte) ((ulong) (nanoSeconds >> 8) & (ulong) byte.MaxValue),
34+
(byte) ((ulong) nanoSeconds & (ulong) byte.MaxValue),
35+
});
36+
37+
return that;
38+
}
39+
40+
private static void DateTimeToEpoch(DateTime value, out uint secondsFromEpoch, out uint nanoSeconds)
41+
{
42+
var fromEpochTicks = value.ToUniversalTime().Ticks - unixEpochTicks;
43+
secondsFromEpoch = (uint)(fromEpochTicks / TimeSpan.TicksPerSecond);
44+
nanoSeconds = (uint)((fromEpochTicks - secondsFromEpoch * TimeSpan.TicksPerSecond) * ticksToNanoSecondsFactor);
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)