2121using System . Net . Sockets ;
2222using System . Diagnostics ;
2323using System . Reflection ;
24- using NLog ;
2524using MsgPack ;
2625using MsgPack . Serialization ;
27- using NLog . Common ;
28-
26+
2927namespace NLog . Targets
3028{
31- internal class OrdinaryDictionarySerializer : MessagePackSerializer < IDictionary < string , object > >
32- {
33- internal OrdinaryDictionarySerializer ( SerializationContext ownerContext ) : base ( ownerContext )
34- {
35- }
36-
29+ internal class OrdinaryDictionarySerializer : MessagePackSerializer < IDictionary < string , object > >
30+ {
31+ private readonly SerializationContext embeddedContext ;
32+
33+ internal OrdinaryDictionarySerializer ( SerializationContext ownerContext , SerializationContext embeddedContext ) : base ( ownerContext )
34+ {
35+ this . embeddedContext = embeddedContext ?? ownerContext ;
36+ }
37+
3738 protected override void PackToCore ( Packer packer , IDictionary < string , object > objectTree )
3839 {
3940 packer . PackMapHeader ( objectTree ) ;
4041 foreach ( KeyValuePair < string , object > pair in objectTree )
4142 {
4243 packer . PackString ( pair . Key ) ;
43- var serializationContext = new SerializationContext ( packer . CompatibilityOptions ) ;
44- serializationContext . Serializers . Register ( this ) ;
45- packer . Pack ( pair . Value , serializationContext ) ;
44+ if ( pair . Value == null )
45+ {
46+ packer . PackNull ( ) ;
47+ }
48+ else
49+ {
50+ packer . Pack ( pair . Value , this . embeddedContext ) ;
51+ }
4652 }
4753 }
4854
@@ -60,7 +66,11 @@ protected void UnpackTo(Unpacker unpacker, IDictionary<string, object> dict, lon
6066 {
6167 throw new InvalidMessagePackStreamException ( "unexpected EOF" ) ;
6268 }
63- if ( unpacker . IsMapHeader )
69+ if ( unpacker . LastReadData . IsNil )
70+ {
71+ dict . Add ( key , null ) ;
72+ }
73+ else if ( unpacker . IsMapHeader )
6474 {
6575 long innerMapLength = value . AsInt64 ( ) ;
6676 var innerDict = new Dictionary < string , object > ( ) ;
@@ -123,40 +133,50 @@ public void UnpackTo(Unpacker unpacker, IDictionary<string, object> collection)
123133
124134 protected override IDictionary < string , object > UnpackFromCore ( Unpacker unpacker )
125135 {
136+ if ( ! unpacker . IsMapHeader )
137+ {
138+ throw new InvalidMessagePackStreamException ( "map header expected" ) ;
139+ }
140+
126141 var retval = new Dictionary < string , object > ( ) ;
127142 UnpackTo ( unpacker , retval ) ;
128143 return retval ;
129144 }
130145
131146 public void UnpackTo ( Unpacker unpacker , object collection )
132147 {
133- var _collection = collection as IDictionary < string , object > ;
134- if ( _collection == null )
148+ var dictionary = collection as IDictionary < string , object > ;
149+ if ( dictionary == null )
135150 throw new NotSupportedException ( ) ;
136- UnpackTo ( unpacker , _collection ) ;
151+ UnpackTo ( unpacker , dictionary ) ;
137152 }
138153 }
139154
140155 internal class FluentdEmitter
141156 {
142157 private static DateTime unixEpoch = new DateTime ( 1970 , 1 , 1 , 0 , 0 , 0 , DateTimeKind . Utc ) ;
143- private Packer packer ;
144- private SerializationContext serializationContext ;
158+ private readonly Packer packer ;
159+ private readonly SerializationContext serializationContext ;
160+ private readonly Stream destination ;
145161
146162 public void Emit ( DateTime timestamp , string tag , IDictionary < string , object > data )
147163 {
148164 long unixTimestamp = timestamp . ToUniversalTime ( ) . Subtract ( unixEpoch ) . Ticks / 10000000 ;
149- packer . PackArrayHeader ( 3 ) ;
150- packer . PackString ( tag , Encoding . UTF8 ) ;
151- packer . Pack ( ( ulong ) unixTimestamp ) ;
152- packer . Pack ( data , serializationContext ) ;
165+ this . packer . PackArrayHeader ( 3 ) ;
166+ this . packer . PackString ( tag , Encoding . UTF8 ) ;
167+ this . packer . Pack ( ( ulong ) unixTimestamp ) ;
168+ this . packer . Pack ( data , serializationContext ) ;
169+ this . destination . Flush ( ) ; // Change to packer.Flush() when packer is upgraded
153170 }
154171
155172 public FluentdEmitter ( Stream stream )
156173 {
174+ this . destination = stream ;
175+ this . packer = Packer . Create ( destination ) ;
176+ var embeddedContext = new SerializationContext ( this . packer . CompatibilityOptions ) ;
177+ embeddedContext . Serializers . Register ( new OrdinaryDictionarySerializer ( embeddedContext , null ) ) ;
157178 this . serializationContext = new SerializationContext ( PackerCompatibilityOptions . PackBinaryAsRaw ) ;
158- this . serializationContext . Serializers . Register ( new OrdinaryDictionarySerializer ( this . serializationContext ) ) ;
159- this . packer = Packer . Create ( stream ) ;
179+ this . serializationContext . Serializers . Register ( new OrdinaryDictionarySerializer ( this . serializationContext , embeddedContext ) ) ;
160180 }
161181 }
162182
@@ -185,6 +205,8 @@ public class Fluentd : NLog.Targets.TargetWithLayout
185205
186206 public bool EmitStackTraceWhenAvailable { get ; set ; }
187207
208+ public bool IncludeAllProperties { get ; set ; }
209+
188210 private TcpClient client ;
189211
190212 private Stream stream ;
@@ -196,56 +218,55 @@ protected override void InitializeTarget()
196218 base . InitializeTarget ( ) ;
197219 }
198220
199- private void InitializeClient ( )
200- {
201- client = new TcpClient ( ) ;
202- client . NoDelay = this . NoDelay ;
203- client . ReceiveBufferSize = this . ReceiveBufferSize ;
204- client . SendBufferSize = this . SendBufferSize ;
205- client . SendTimeout = this . SendTimeout ;
206- client . ReceiveTimeout = this . ReceiveTimeout ;
207- client . LingerState = new LingerOption ( this . LingerEnabled , this . LingerTime ) ;
221+ private void InitializeClient ( )
222+ {
223+ this . client = new TcpClient ( ) ;
224+ this . client . NoDelay = this . NoDelay ;
225+ this . client . ReceiveBufferSize = this . ReceiveBufferSize ;
226+ this . client . SendBufferSize = this . SendBufferSize ;
227+ this . client . SendTimeout = this . SendTimeout ;
228+ this . client . ReceiveTimeout = this . ReceiveTimeout ;
229+ this . client . LingerState = new LingerOption ( this . LingerEnabled , this . LingerTime ) ;
208230 }
209231
210232 protected void EnsureConnected ( )
211233 {
212- try
234+ if ( this . client == null )
213235 {
214- if ( client == null )
215- {
216- InitializeClient ( ) ;
217- ConnectClient ( ) ;
218- }
219- else if ( ! client . Connected )
220- {
221- Cleanup ( ) ;
222- InitializeClient ( ) ;
223- ConnectClient ( ) ;
224- }
236+ InitializeClient ( ) ;
237+ ConnectClient ( ) ;
225238 }
226- catch ( Exception e )
239+ else if ( ! this . client . Connected )
227240 {
241+ Cleanup ( ) ;
242+ InitializeClient ( ) ;
243+ ConnectClient ( ) ;
228244 }
229245 }
230246
231- private void ConnectClient ( )
232- {
233- client . Connect ( this . Host , this . Port ) ;
234- this . stream = this . client . GetStream ( ) ;
235- this . emitter = new FluentdEmitter ( this . stream ) ;
247+ private void ConnectClient ( )
248+ {
249+ this . client . Connect ( this . Host , this . Port ) ;
250+ this . stream = this . client . GetStream ( ) ;
251+ this . emitter = new FluentdEmitter ( this . stream ) ;
236252 }
237253
238254 protected void Cleanup ( )
239255 {
240- if ( this . stream != null )
256+ try
241257 {
242- this . stream . Dispose ( ) ;
243- this . stream = null ;
258+ this . stream ? . Dispose ( ) ;
259+ this . client ? . Close ( ) ;
260+ }
261+ catch ( Exception ex )
262+ {
263+ NLog . Common . InternalLogger . Warn ( "Fluentd Close - " + ex . ToString ( ) ) ;
244264 }
245- if ( this . client != null )
265+ finally
246266 {
247- this . client . Close ( ) ;
267+ this . stream = null ;
248268 this . client = null ;
269+ this . emitter = null ;
249270 }
250271 }
251272
@@ -269,7 +290,7 @@ protected override void Write(LogEventInfo logEvent)
269290 { "logger_name" , logEvent . LoggerName } ,
270291 { "sequence_id" , logEvent . SequenceID } ,
271292 } ;
272- if ( EmitStackTraceWhenAvailable && logEvent . HasStackTrace )
293+ if ( this . EmitStackTraceWhenAvailable && logEvent . HasStackTrace )
273294 {
274295 var transcodedFrames = new List < Dictionary < string , object > > ( ) ;
275296 StackTrace stackTrace = logEvent . StackTrace ;
@@ -288,31 +309,63 @@ protected override void Write(LogEventInfo logEvent)
288309 }
289310 record . Add ( "stacktrace" , transcodedFrames ) ;
290311 }
291- EnsureConnected ( ) ;
292- if ( this . emitter != null )
312+ if ( this . IncludeAllProperties && logEvent . Properties . Count > 0 )
293313 {
294- try
295- {
296- this . emitter . Emit ( logEvent . TimeStamp , Tag , record ) ;
297- }
298- catch ( Exception e )
314+ foreach ( var property in logEvent . Properties )
299315 {
316+ var propertyKey = property . Key . ToString ( ) ;
317+ if ( string . IsNullOrEmpty ( propertyKey ) )
318+ continue ;
319+
320+ record [ propertyKey ] = SerializePropertyValue ( propertyKey , property . Value ) ;
300321 }
301322 }
323+
324+ try
325+ {
326+ EnsureConnected ( ) ;
327+ }
328+ catch ( Exception ex )
329+ {
330+ NLog . Common . InternalLogger . Warn ( "Fluentd Connect - " + ex . ToString ( ) ) ;
331+ throw ; // Notify NLog of failure
332+ }
333+
334+ try
335+ {
336+ this . emitter ? . Emit ( logEvent . TimeStamp , this . Tag , record ) ;
337+ }
338+ catch ( Exception ex )
339+ {
340+ NLog . Common . InternalLogger . Warn ( "Fluentd Emit - " + ex . ToString ( ) ) ;
341+ throw ; // Notify NLog of failure
342+ }
343+ }
344+
345+ private static object SerializePropertyValue ( string propertyKey , object propertyValue )
346+ {
347+ if ( propertyValue == null || Convert . GetTypeCode ( propertyValue ) != TypeCode . Object || propertyValue is decimal )
348+ {
349+ return propertyValue ; // immutable
350+ }
351+ else
352+ {
353+ return propertyValue . ToString ( ) ;
354+ }
302355 }
303356
304357 public Fluentd ( )
305358 {
306- Host = "127.0.0.1" ;
307- Port = 24224 ;
308- ReceiveBufferSize = 8192 ;
309- SendBufferSize = 8192 ;
310- ReceiveTimeout = 1000 ;
311- SendTimeout = 1000 ;
312- LingerEnabled = true ;
313- LingerTime = 1000 ;
314- EmitStackTraceWhenAvailable = false ;
315- Tag = Assembly . GetCallingAssembly ( ) . GetName ( ) . Name ;
359+ this . Host = "127.0.0.1" ;
360+ this . Port = 24224 ;
361+ this . ReceiveBufferSize = 8192 ;
362+ this . SendBufferSize = 8192 ;
363+ this . ReceiveTimeout = 1000 ;
364+ this . SendTimeout = 1000 ;
365+ this . LingerEnabled = true ;
366+ this . LingerTime = 1000 ;
367+ this . EmitStackTraceWhenAvailable = false ;
368+ this . Tag = Assembly . GetCallingAssembly ( ) . GetName ( ) . Name ;
316369 }
317370 }
318371}
0 commit comments