Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send fractions of a second to Fluentd #13

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/NLog.Targets.Fluentd/Fluentd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Net.Sockets;
using System.Diagnostics;
using System.Reflection;
using System.Text;
using MsgPack;
using MsgPack.Serialization;

Expand Down Expand Up @@ -154,26 +154,26 @@ public void UnpackTo(Unpacker unpacker, object collection)

internal class FluentdEmitter
{
private static DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private readonly Packer packer;
private readonly SerializationContext serializationContext;
private readonly Stream destination;

public void Emit(DateTime timestamp, string tag, IDictionary<string, object> data)
{
long unixTimestamp = timestamp.ToUniversalTime().Subtract(unixEpoch).Ticks / 10000000;
this.packer.PackArrayHeader(3);
this.packer.PackString(tag, Encoding.UTF8);
this.packer.Pack((ulong)unixTimestamp);
this.packer.PackEventTime(timestamp);
this.packer.Pack(data, serializationContext);
this.destination.Flush(); // Change to packer.Flush() when packer is upgraded
}

public FluentdEmitter(Stream stream)
{
this.destination = stream;
this.packer = Packer.Create(destination);
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);

// PackerCompatibilityOptions.ProhibitExtendedTypeObjects must be turned off in order to use the PackExtendedTypeValue method
this.packer = Packer.Create(destination, Packer.DefaultCompatibilityOptions & ~PackerCompatibilityOptions.ProhibitExtendedTypeObjects);
var embeddedContext = new SerializationContext(this.packer.CompatibilityOptions);
embeddedContext.Serializers.Register(new OrdinaryDictionarySerializer(embeddedContext, null));
this.serializationContext = new SerializationContext(PackerCompatibilityOptions.PackBinaryAsRaw);
this.serializationContext.Serializers.Register(new OrdinaryDictionarySerializer(this.serializationContext, embeddedContext));
Expand Down
47 changes: 47 additions & 0 deletions src/NLog.Targets.Fluentd/PackerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;

namespace NLog.Targets
{
using MsgPack;

internal static class PackerExtensions
{
private const int nanoSecondsPerSecond = 1 * 1000 * 1000 * 1000;
private const double ticksToNanoSecondsFactor = (double)nanoSecondsPerSecond / TimeSpan.TicksPerSecond;
private static readonly long unixEpochTicks = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks;

/// <summary>
/// Write according to Fluend EventTime specification.
/// </summary>
/// <remarks>
/// Specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format"
/// </remarks>
public static Packer PackEventTime(this Packer that, DateTime value)
{
DateTimeToEpoch(value, out var secondsFromEpoch, out var nanoSeconds);

that.PackExtendedTypeValue(
0x0,
new[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be nice if it worked without additional byte[]-array allocation.

{
(byte) ((ulong) (secondsFromEpoch >> 24) & (ulong) byte.MaxValue),
(byte) ((ulong) (secondsFromEpoch >> 16) & (ulong) byte.MaxValue),
(byte) ((ulong) (secondsFromEpoch >> 8) & (ulong) byte.MaxValue),
(byte) ((ulong) secondsFromEpoch & (ulong) byte.MaxValue),
(byte) ((ulong) (nanoSeconds >> 24) & (ulong) byte.MaxValue),
(byte) ((ulong) (nanoSeconds >> 16) & (ulong) byte.MaxValue),
(byte) ((ulong) (nanoSeconds >> 8) & (ulong) byte.MaxValue),
(byte) ((ulong) nanoSeconds & (ulong) byte.MaxValue),
});

return that;
}

private static void DateTimeToEpoch(DateTime value, out uint secondsFromEpoch, out uint nanoSeconds)
{
var fromEpochTicks = value.ToUniversalTime().Ticks - unixEpochTicks;
secondsFromEpoch = (uint)(fromEpochTicks / TimeSpan.TicksPerSecond);
nanoSeconds = (uint)((fromEpochTicks - secondsFromEpoch * TimeSpan.TicksPerSecond) * ticksToNanoSecondsFactor);
}
}
}