Skip to content

Commit

Permalink
Merge pull request fluent#8 from snakefoot/Vs2017
Browse files Browse the repository at this point in the history
Upgraded to VS2017 and added NetStandard 2.0
  • Loading branch information
moriyoshi authored Jan 24, 2018
2 parents d8007c8 + 497a3be commit d0e932a
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 165 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ SendTimeout | Send timeout
LingerEnabled | Wait for all the data to be sent when closing the connection | false
LingerTime | Linger timeout | 2
EmitStackTraceWhenAvailable | Emit a stacktrace for every log entry when available | false
IncludeAllProperties | Include structured logging parameters for every log entry | false


License
Expand Down
201 changes: 127 additions & 74 deletions src/NLog.Targets.Fluentd/Fluentd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,34 @@
using System.Net.Sockets;
using System.Diagnostics;
using System.Reflection;
using NLog;
using MsgPack;
using MsgPack.Serialization;
using NLog.Common;


namespace NLog.Targets
{
internal class OrdinaryDictionarySerializer: MessagePackSerializer<IDictionary<string, object>>
{
internal OrdinaryDictionarySerializer(SerializationContext ownerContext) : base(ownerContext)
{
}

internal class OrdinaryDictionarySerializer : MessagePackSerializer<IDictionary<string, object>>
{
private readonly SerializationContext embeddedContext;

internal OrdinaryDictionarySerializer(SerializationContext ownerContext, SerializationContext embeddedContext) : base(ownerContext)
{
this.embeddedContext = embeddedContext ?? ownerContext;
}

protected override void PackToCore(Packer packer, IDictionary<string, object> objectTree)
{
packer.PackMapHeader(objectTree);
foreach (KeyValuePair<string, object> pair in objectTree)
{
packer.PackString(pair.Key);
var serializationContext = new SerializationContext(packer.CompatibilityOptions);
serializationContext.Serializers.Register(this);
packer.Pack(pair.Value, serializationContext);
if (pair.Value == null)
{
packer.PackNull();
}
else
{
packer.Pack(pair.Value, this.embeddedContext);
}
}
}

Expand All @@ -60,7 +66,11 @@ protected void UnpackTo(Unpacker unpacker, IDictionary<string, object> dict, lon
{
throw new InvalidMessagePackStreamException("unexpected EOF");
}
if (unpacker.IsMapHeader)
if (unpacker.LastReadData.IsNil)
{
dict.Add(key, null);
}
else if (unpacker.IsMapHeader)
{
long innerMapLength = value.AsInt64();
var innerDict = new Dictionary<string, object>();
Expand Down Expand Up @@ -123,40 +133,50 @@ public void UnpackTo(Unpacker unpacker, IDictionary<string, object> collection)

protected override IDictionary<string, object> UnpackFromCore(Unpacker unpacker)
{
if (!unpacker.IsMapHeader)
{
throw new InvalidMessagePackStreamException("map header expected");
}

var retval = new Dictionary<string, object>();
UnpackTo(unpacker, retval);
return retval;
}

public void UnpackTo(Unpacker unpacker, object collection)
{
var _collection = collection as IDictionary<string, object>;
if (_collection == null)
var dictionary = collection as IDictionary<string, object>;
if (dictionary == null)
throw new NotSupportedException();
UnpackTo(unpacker, _collection);
UnpackTo(unpacker, dictionary);
}
}

internal class FluentdEmitter
{
private static DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private Packer packer;
private SerializationContext serializationContext;
private readonly Packer packer;
private readonly SerializationContext serializationContext;
private readonly Stream destination;

public void Emit(DateTime timestamp, string tag, IDictionary<string, object> data)
{
long unixTimestamp = timestamp.ToUniversalTime().Subtract(unixEpoch).Ticks / 10000000;
packer.PackArrayHeader(3);
packer.PackString(tag, Encoding.UTF8);
packer.Pack((ulong)unixTimestamp);
packer.Pack(data, serializationContext);
this.packer.PackArrayHeader(3);
this.packer.PackString(tag, Encoding.UTF8);
this.packer.Pack((ulong)unixTimestamp);
this.packer.Pack(data, serializationContext);
this.destination.Flush(); // Change to packer.Flush() when packer is upgraded
}

public FluentdEmitter(Stream stream)
{
this.destination = stream;
this.packer = Packer.Create(destination);
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);
embeddedContext.Serializers.Register(new OrdinaryDictionarySerializer(embeddedContext, null));
this.serializationContext = new SerializationContext(PackerCompatibilityOptions.PackBinaryAsRaw);
this.serializationContext.Serializers.Register(new OrdinaryDictionarySerializer(this.serializationContext));
this.packer = Packer.Create(stream);
this.serializationContext.Serializers.Register(new OrdinaryDictionarySerializer(this.serializationContext, embeddedContext));
}
}

Expand Down Expand Up @@ -185,6 +205,8 @@ public class Fluentd : NLog.Targets.TargetWithLayout

public bool EmitStackTraceWhenAvailable { get; set; }

public bool IncludeAllProperties { get; set; }

private TcpClient client;

private Stream stream;
Expand All @@ -196,56 +218,55 @@ protected override void InitializeTarget()
base.InitializeTarget();
}

private void InitializeClient()
{
client = new TcpClient();
client.NoDelay = this.NoDelay;
client.ReceiveBufferSize = this.ReceiveBufferSize;
client.SendBufferSize = this.SendBufferSize;
client.SendTimeout = this.SendTimeout;
client.ReceiveTimeout = this.ReceiveTimeout;
client.LingerState = new LingerOption(this.LingerEnabled, this.LingerTime);
private void InitializeClient()
{
this.client = new TcpClient();
this.client.NoDelay = this.NoDelay;
this.client.ReceiveBufferSize = this.ReceiveBufferSize;
this.client.SendBufferSize = this.SendBufferSize;
this.client.SendTimeout = this.SendTimeout;
this.client.ReceiveTimeout = this.ReceiveTimeout;
this.client.LingerState = new LingerOption(this.LingerEnabled, this.LingerTime);
}

protected void EnsureConnected()
{
try
if (this.client == null)
{
if(client == null)
{
InitializeClient();
ConnectClient();
}
else if (!client.Connected)
{
Cleanup();
InitializeClient();
ConnectClient();
}
InitializeClient();
ConnectClient();
}
catch (Exception e)
else if (!this.client.Connected)
{
Cleanup();
InitializeClient();
ConnectClient();
}
}

private void ConnectClient()
{
client.Connect(this.Host, this.Port);
this.stream = this.client.GetStream();
this.emitter = new FluentdEmitter(this.stream);
private void ConnectClient()
{
this.client.Connect(this.Host, this.Port);
this.stream = this.client.GetStream();
this.emitter = new FluentdEmitter(this.stream);
}

protected void Cleanup()
{
if (this.stream != null)
try
{
this.stream.Dispose();
this.stream = null;
this.stream?.Dispose();
this.client?.Close();
}
catch (Exception ex)
{
NLog.Common.InternalLogger.Warn("Fluentd Close - " + ex.ToString());
}
if (this.client != null)
finally
{
this.client.Close();
this.stream = null;
this.client = null;
this.emitter = null;
}
}

Expand All @@ -269,7 +290,7 @@ protected override void Write(LogEventInfo logEvent)
{ "logger_name", logEvent.LoggerName },
{ "sequence_id", logEvent.SequenceID },
};
if (EmitStackTraceWhenAvailable && logEvent.HasStackTrace)
if (this.EmitStackTraceWhenAvailable && logEvent.HasStackTrace)
{
var transcodedFrames = new List<Dictionary<string, object>>();
StackTrace stackTrace = logEvent.StackTrace;
Expand All @@ -288,31 +309,63 @@ protected override void Write(LogEventInfo logEvent)
}
record.Add("stacktrace", transcodedFrames);
}
EnsureConnected();
if (this.emitter != null)
if (this.IncludeAllProperties && logEvent.Properties.Count > 0)
{
try
{
this.emitter.Emit(logEvent.TimeStamp, Tag, record);
}
catch (Exception e)
foreach (var property in logEvent.Properties)
{
var propertyKey = property.Key.ToString();
if (string.IsNullOrEmpty(propertyKey))
continue;

record[propertyKey] = SerializePropertyValue(propertyKey, property.Value);
}
}

try
{
EnsureConnected();
}
catch (Exception ex)
{
NLog.Common.InternalLogger.Warn("Fluentd Connect - " + ex.ToString());
throw; // Notify NLog of failure
}

try
{
this.emitter?.Emit(logEvent.TimeStamp, this.Tag, record);
}
catch (Exception ex)
{
NLog.Common.InternalLogger.Warn("Fluentd Emit - " + ex.ToString());
throw; // Notify NLog of failure
}
}

private static object SerializePropertyValue(string propertyKey, object propertyValue)
{
if (propertyValue == null || Convert.GetTypeCode(propertyValue) != TypeCode.Object || propertyValue is decimal)
{
return propertyValue; // immutable
}
else
{
return propertyValue.ToString();
}
}

public Fluentd()
{
Host = "127.0.0.1";
Port = 24224;
ReceiveBufferSize = 8192;
SendBufferSize = 8192;
ReceiveTimeout = 1000;
SendTimeout = 1000;
LingerEnabled = true;
LingerTime = 1000;
EmitStackTraceWhenAvailable = false;
Tag = Assembly.GetCallingAssembly().GetName().Name;
this.Host = "127.0.0.1";
this.Port = 24224;
this.ReceiveBufferSize = 8192;
this.SendBufferSize = 8192;
this.ReceiveTimeout = 1000;
this.SendTimeout = 1000;
this.LingerEnabled = true;
this.LingerTime = 1000;
this.EmitStackTraceWhenAvailable = false;
this.Tag = Assembly.GetCallingAssembly().GetName().Name;
}
}
}
Loading

0 comments on commit d0e932a

Please sign in to comment.