diff --git a/src/NLog.Targets.Fluentd/Fluentd.cs b/src/NLog.Targets.Fluentd/Fluentd.cs index bdb56fc..e4cd12c 100644 --- a/src/NLog.Targets.Fluentd/Fluentd.cs +++ b/src/NLog.Targets.Fluentd/Fluentd.cs @@ -16,11 +16,11 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; -using System.Text; using System.Net.Sockets; -using System.Diagnostics; using System.Reflection; +using System.Text; using MsgPack; using MsgPack.Serialization; @@ -154,17 +154,15 @@ public void UnpackTo(Unpacker unpacker, object collection) internal class FluentdEmitter { - private static DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); private readonly Packer packer; private readonly SerializationContext serializationContext; private readonly Stream destination; public void Emit(DateTime timestamp, string tag, IDictionary data) { - long unixTimestamp = timestamp.ToUniversalTime().Subtract(unixEpoch).Ticks / 10000000; this.packer.PackArrayHeader(3); this.packer.PackString(tag, Encoding.UTF8); - this.packer.Pack((ulong)unixTimestamp); + this.packer.PackEventTime(timestamp); this.packer.Pack(data, serializationContext); this.destination.Flush(); // Change to packer.Flush() when packer is upgraded } @@ -172,8 +170,10 @@ public void Emit(DateTime timestamp, string tag, IDictionary dat public FluentdEmitter(Stream stream) { this.destination = stream; - this.packer = Packer.Create(destination); - var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions); + + // PackerCompatibilityOptions.ProhibitExtendedTypeObjects must be turned off in order to use the PackExtendedTypeValue method + this.packer = Packer.Create(destination, Packer.DefaultCompatibilityOptions & ~PackerCompatibilityOptions.ProhibitExtendedTypeObjects); + var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions); embeddedContext.Serializers.Register(new OrdinaryDictionarySerializer(embeddedContext, null)); this.serializationContext = new SerializationContext(PackerCompatibilityOptions.PackBinaryAsRaw); this.serializationContext.Serializers.Register(new OrdinaryDictionarySerializer(this.serializationContext, embeddedContext)); diff --git a/src/NLog.Targets.Fluentd/PackerExtensions.cs b/src/NLog.Targets.Fluentd/PackerExtensions.cs new file mode 100644 index 0000000..0b774f8 --- /dev/null +++ b/src/NLog.Targets.Fluentd/PackerExtensions.cs @@ -0,0 +1,47 @@ +using System; + +namespace NLog.Targets +{ + using MsgPack; + + internal static class PackerExtensions + { + private const int nanoSecondsPerSecond = 1 * 1000 * 1000 * 1000; + private const double ticksToNanoSecondsFactor = (double)nanoSecondsPerSecond / TimeSpan.TicksPerSecond; + private static readonly long unixEpochTicks = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks; + + /// + /// Write according to Fluend EventTime specification. + /// + /// + /// Specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format" + /// + public static Packer PackEventTime(this Packer that, DateTime value) + { + DateTimeToEpoch(value, out var secondsFromEpoch, out var nanoSeconds); + + that.PackExtendedTypeValue( + 0x0, + new[] + { + (byte) ((ulong) (secondsFromEpoch >> 24) & (ulong) byte.MaxValue), + (byte) ((ulong) (secondsFromEpoch >> 16) & (ulong) byte.MaxValue), + (byte) ((ulong) (secondsFromEpoch >> 8) & (ulong) byte.MaxValue), + (byte) ((ulong) secondsFromEpoch & (ulong) byte.MaxValue), + (byte) ((ulong) (nanoSeconds >> 24) & (ulong) byte.MaxValue), + (byte) ((ulong) (nanoSeconds >> 16) & (ulong) byte.MaxValue), + (byte) ((ulong) (nanoSeconds >> 8) & (ulong) byte.MaxValue), + (byte) ((ulong) nanoSeconds & (ulong) byte.MaxValue), + }); + + return that; + } + + private static void DateTimeToEpoch(DateTime value, out uint secondsFromEpoch, out uint nanoSeconds) + { + var fromEpochTicks = value.ToUniversalTime().Ticks - unixEpochTicks; + secondsFromEpoch = (uint)(fromEpochTicks / TimeSpan.TicksPerSecond); + nanoSeconds = (uint)((fromEpochTicks - secondsFromEpoch * TimeSpan.TicksPerSecond) * ticksToNanoSecondsFactor); + } + } +}