Skip to content

Commit

Permalink
Send fractions of a second to Fluentd
Browse files Browse the repository at this point in the history
Previously the timestamp did have a a precision of one second. The fraction got lost.

The timestamp is now of the EventTime time according to specification:
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format

EventTime has nanosecond precision, but .NET only supports 100 nanoseconds (ticks).

Fixes fluent#12

Signed-off-by: Edwin Engelen <edwin@engelen.name>
  • Loading branch information
EdwinEngelen committed Mar 26, 2020
1 parent d0e932a commit 69eddfa
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/NLog.Targets.Fluentd/Fluentd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Net.Sockets;
using System.Diagnostics;
using System.Reflection;
using System.Text;
using MsgPack;
using MsgPack.Serialization;

Expand Down Expand Up @@ -154,26 +154,26 @@ public void UnpackTo(Unpacker unpacker, object collection)

internal class FluentdEmitter
{
private static DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private readonly Packer packer;
private readonly SerializationContext serializationContext;
private readonly Stream destination;

public void Emit(DateTime timestamp, string tag, IDictionary<string, object> data)
{
long unixTimestamp = timestamp.ToUniversalTime().Subtract(unixEpoch).Ticks / 10000000;
this.packer.PackArrayHeader(3);
this.packer.PackString(tag, Encoding.UTF8);
this.packer.Pack((ulong)unixTimestamp);
this.packer.PackEventTime(timestamp);
this.packer.Pack(data, serializationContext);
this.destination.Flush(); // Change to packer.Flush() when packer is upgraded
}

public FluentdEmitter(Stream stream)
{
this.destination = stream;
this.packer = Packer.Create(destination);
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);

// PackerCompatibilityOptions.ProhibitExtendedTypeObjects must be turned off in order to use the PackExtendedTypeValue method
this.packer = Packer.Create(destination, Packer.DefaultCompatibilityOptions & ~PackerCompatibilityOptions.ProhibitExtendedTypeObjects);
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);
embeddedContext.Serializers.Register(new OrdinaryDictionarySerializer(embeddedContext, null));
this.serializationContext = new SerializationContext(PackerCompatibilityOptions.PackBinaryAsRaw);
this.serializationContext.Serializers.Register(new OrdinaryDictionarySerializer(this.serializationContext, embeddedContext));
Expand Down
47 changes: 47 additions & 0 deletions src/NLog.Targets.Fluentd/PackerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;

namespace NLog.Targets
{
using MsgPack;

internal static class PackerExtensions
{
private const int nanoSecondsPerSecond = 1 * 1000 * 1000 * 1000;
private const double ticksToNanoSecondsFactor = (double)nanoSecondsPerSecond / TimeSpan.TicksPerSecond;
private static readonly long unixEpochTicks = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks;

/// <summary>
/// Write according to Fluend EventTime specification.
/// </summary>
/// <remarks>
/// Specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format"
/// </remarks>
public static Packer PackEventTime(this Packer that, DateTime value)
{
DateTimeToEpoch(value, out var secondsFromEpoch, out var nanoSeconds);

that.PackExtendedTypeValue(
0x0,
new[]
{
(byte) ((ulong) (secondsFromEpoch >> 24) & (ulong) byte.MaxValue),
(byte) ((ulong) (secondsFromEpoch >> 16) & (ulong) byte.MaxValue),
(byte) ((ulong) (secondsFromEpoch >> 8) & (ulong) byte.MaxValue),
(byte) ((ulong) secondsFromEpoch & (ulong) byte.MaxValue),
(byte) ((ulong) (nanoSeconds >> 24) & (ulong) byte.MaxValue),
(byte) ((ulong) (nanoSeconds >> 16) & (ulong) byte.MaxValue),
(byte) ((ulong) (nanoSeconds >> 8) & (ulong) byte.MaxValue),
(byte) ((ulong) nanoSeconds & (ulong) byte.MaxValue),
});

return that;
}

private static void DateTimeToEpoch(DateTime value, out uint secondsFromEpoch, out uint nanoSeconds)
{
var fromEpochTicks = value.ToUniversalTime().Ticks - unixEpochTicks;
secondsFromEpoch = (uint)(fromEpochTicks / TimeSpan.TicksPerSecond);
nanoSeconds = (uint)((fromEpochTicks - secondsFromEpoch * TimeSpan.TicksPerSecond) * ticksToNanoSecondsFactor);
}
}
}

0 comments on commit 69eddfa

Please sign in to comment.