From 9cadaf20993f29b56e3afbe90846ccf4dca76ff3 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Wed, 27 Dec 2017 12:55:45 -0500 Subject: [PATCH 01/26] Update LibLog --- .../ColoredConsoleLogProvider.cs | 24 +- .../App_Packages/LibLog.4.2/LibLog.cs | 316 ++++++++++++------ 2 files changed, 242 insertions(+), 98 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs b/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs index 9a54521..2b49128 100644 --- a/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs +++ b/Source/Picton.Messaging.IntegrationTests/ColoredConsoleLogProvider.cs @@ -23,6 +23,12 @@ public ColoredConsoleLogProvider(LogLevel minLevel = LogLevel.Trace) _minLevel = minLevel; } + + /// + /// Gets the specified named logger. + /// + /// Name of the logger. + /// The logger reference. public Logger GetLogger(string name) { return (logLevel, messageFunc, exception, formatParameters) => @@ -36,8 +42,7 @@ public Logger GetLogger(string name) { if (logLevel >= _minLevel) { - ConsoleColor consoleColor; - if (Colors.TryGetValue(logLevel, out consoleColor)) + if (Colors.TryGetValue(logLevel, out ConsoleColor consoleColor)) { var originalForground = Console.ForegroundColor; try @@ -76,12 +81,25 @@ private static void WriteMessage( Console.WriteLine("{0} | {1} | {2} | {3}", DateTime.UtcNow, logLevel, name, message); } + + /// + /// Opens a nested diagnostics context. Not supported in EntLib logging. + /// + /// The message to add to the diagnostics context. + /// A disposable that when disposed removes the message from the context. public IDisposable OpenNestedContext(string message) { return NullDisposable.Instance; } - public IDisposable OpenMappedContext(string key, string value) + + /// + /// Opens a mapped diagnostics context. Not supported in EntLib logging. + /// + /// A key. + /// A value. + /// A disposable that when disposed removes the map from the context. + public IDisposable OpenMappedContext(string key, object value, bool destructure = false) { return NullDisposable.Instance; } diff --git a/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs b/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs index 66487b0..dab3e68 100644 --- a/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs +++ b/Source/Picton.Messaging/App_Packages/LibLog.4.2/LibLog.cs @@ -9,7 +9,7 @@ // // https://github.com/damianh/LibLog //=============================================================================== -// Copyright © 2011-2015 Damian Hickey. All rights reserved. +// Copyright © 2011-2017 Damian Hickey. All rights reserved. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -127,6 +127,9 @@ enum LogLevel } #if !LIBLOG_PROVIDERS_ONLY +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif #if LIBLOG_PUBLIC public #else @@ -173,7 +176,7 @@ public static bool IsWarnEnabled(this ILog logger) public static void Debug(this ILog logger, Func messageFunc) { GuardAgainstNullLogger(logger); - logger.Log(LogLevel.Debug, messageFunc); + logger.Log(LogLevel.Debug, WrapLogInternal(messageFunc)); } public static void Debug(this ILog logger, string message) @@ -221,7 +224,7 @@ public static void DebugException(this ILog logger, string message, Exception ex public static void Error(this ILog logger, Func messageFunc) { GuardAgainstNullLogger(logger); - logger.Log(LogLevel.Error, messageFunc); + logger.Log(LogLevel.Error, WrapLogInternal(messageFunc)); } public static void Error(this ILog logger, string message) @@ -260,7 +263,7 @@ public static void ErrorException(this ILog logger, string message, Exception ex public static void Fatal(this ILog logger, Func messageFunc) { - logger.Log(LogLevel.Fatal, messageFunc); + logger.Log(LogLevel.Fatal, WrapLogInternal(messageFunc)); } public static void Fatal(this ILog logger, string message) @@ -300,7 +303,7 @@ public static void FatalException(this ILog logger, string message, Exception ex public static void Info(this ILog logger, Func messageFunc) { GuardAgainstNullLogger(logger); - logger.Log(LogLevel.Info, messageFunc); + logger.Log(LogLevel.Info, WrapLogInternal(messageFunc)); } public static void Info(this ILog logger, string message) @@ -340,7 +343,7 @@ public static void InfoException(this ILog logger, string message, Exception exc public static void Trace(this ILog logger, Func messageFunc) { GuardAgainstNullLogger(logger); - logger.Log(LogLevel.Trace, messageFunc); + logger.Log(LogLevel.Trace, WrapLogInternal(messageFunc)); } public static void Trace(this ILog logger, string message) @@ -380,7 +383,7 @@ public static void TraceException(this ILog logger, string message, Exception ex public static void Warn(this ILog logger, Func messageFunc) { GuardAgainstNullLogger(logger); - logger.Log(LogLevel.Warn, messageFunc); + logger.Log(LogLevel.Warn, WrapLogInternal(messageFunc)); } public static void Warn(this ILog logger, string message) @@ -422,7 +425,7 @@ private static void GuardAgainstNullLogger(ILog logger) { if (logger == null) { - throw new ArgumentNullException(nameof(logger)); + throw new ArgumentNullException("logger"); } } @@ -441,6 +444,34 @@ private static T Return(this T value) { return value; } + + // Allow passing callsite-logger-type to LogProviderBase using messageFunc + internal static Func WrapLogSafeInternal(LoggerExecutionWrapper logger, Func messageFunc) + { + Func wrappedMessageFunc = () => + { + try + { + return messageFunc(); + } + catch (Exception ex) + { + logger.WrappedLogger(LogLevel.Error, () => LoggerExecutionWrapper.FailedToGenerateLogMessage, ex); + } + return null; + }; + return wrappedMessageFunc; + } + + // Allow passing callsite-logger-type to LogProviderBase using messageFunc + private static Func WrapLogInternal(Func messageFunc) + { + Func wrappedMessageFunc = () => + { + return messageFunc(); + }; + return wrappedMessageFunc; + } } #endif @@ -474,12 +505,15 @@ interface ILogProvider /// A key. /// A value. /// A disposable that when disposed removes the map from the context. - IDisposable OpenMappedContext(string key, string value); + IDisposable OpenMappedContext(string key, object value, bool destructure = false); } /// /// Provides a mechanism to create instances of objects. /// +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif #if LIBLOG_PROVIDERS_ONLY internal #else @@ -492,6 +526,7 @@ static class LogProvider "with a non-null value first."; private static dynamic s_currentLogProvider; private static Action s_onCurrentLogProviderSet; + private static Lazy s_resolvedLogProvider = new Lazy(() => ForceResolveLogProvider()); [SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline")] static LogProvider() @@ -641,13 +676,13 @@ static IDisposable OpenNestedContext(string message) #else internal #endif - static IDisposable OpenMappedContext(string key, string value) + static IDisposable OpenMappedContext(string key, object value, bool destructure = false) { ILogProvider logProvider = CurrentLogProvider ?? ResolveLogProvider(); return logProvider == null ? new DisposableAction(() => { }) - : logProvider.OpenMappedContext(key, value); + : logProvider.OpenMappedContext(key, value, destructure); } #endif @@ -677,7 +712,7 @@ static IDisposable OpenMappedContext(string key, string value) new Tuple(NLogLogProvider.IsLoggerAvailable, () => new NLogLogProvider()), new Tuple(Log4NetLogProvider.IsLoggerAvailable, () => new Log4NetLogProvider()), new Tuple(EntLibLogProvider.IsLoggerAvailable, () => new EntLibLogProvider()), - new Tuple(LoupeLogProvider.IsLoggerAvailable, () => new LoupeLogProvider()) + new Tuple(LoupeLogProvider.IsLoggerAvailable, () => new LoupeLogProvider()), }; #if !LIBLOG_PROVIDERS_ONLY @@ -690,9 +725,14 @@ private static void RaiseOnCurrentLogProviderSet() } #endif + internal static ILogProvider ResolveLogProvider() + { + return s_resolvedLogProvider.Value; + } + [SuppressMessage("Microsoft.Globalization", "CA1303:Do not pass literals as localized parameters", MessageId = "System.Console.WriteLine(System.String,System.Object,System.Object)")] [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - internal static ILogProvider ResolveLogProvider() + internal static ILogProvider ForceResolveLogProvider() { try { @@ -719,6 +759,9 @@ internal static ILogProvider ResolveLogProvider() } #if !LIBLOG_PROVIDERS_ONLY +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class NoOpLogger : ILog { internal static readonly NoOpLogger Instance = new NoOpLogger(); @@ -732,15 +775,22 @@ public bool Log(LogLevel logLevel, Func messageFunc, Exception exception } #if !LIBLOG_PROVIDERS_ONLY +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class LoggerExecutionWrapper : ILog { private readonly Logger _logger; + private readonly ICallSiteExtension _callsiteLogger; private readonly Func _getIsDisabled; internal const string FailedToGenerateLogMessage = "Failed to generate log message"; + Func _lastExtensionMethod; + internal LoggerExecutionWrapper(Logger logger, Func getIsDisabled = null) { _logger = logger; + _callsiteLogger = new CallSiteExtension(); _getIsDisabled = getIsDisabled ?? (() => false); } @@ -761,19 +811,58 @@ public bool Log(LogLevel logLevel, Func messageFunc, Exception exception return _logger(logLevel, null); } - Func wrappedMessageFunc = () => - { - try +#if !LIBLOG_PORTABLE + // Callsite HACK - Using the messageFunc to provide the callsite-logger-type + var lastExtensionMethod = _lastExtensionMethod; + if (lastExtensionMethod == null || !lastExtensionMethod.Equals(messageFunc)) + { + // Callsite HACK - Cache the last validated messageFunc as Equals is faster than type-check + lastExtensionMethod = null; + var methodType = messageFunc.Method.DeclaringType; + if (methodType == typeof(LogExtensions) || (methodType != null && methodType.DeclaringType == typeof(LogExtensions))) { - return messageFunc(); + lastExtensionMethod = messageFunc; } - catch (Exception ex) + } + + if (lastExtensionMethod != null) + { + // Callsite HACK - LogExtensions has called virtual ILog interface method to get here, callsite-stack is good + _lastExtensionMethod = lastExtensionMethod; + return _logger(logLevel, LogExtensions.WrapLogSafeInternal(this, messageFunc), exception, formatParameters); + } + else +#endif + { + Func wrappedMessageFunc = () => { - Log(LogLevel.Error, () => FailedToGenerateLogMessage, ex); - } - return null; - }; - return _logger(logLevel, wrappedMessageFunc, exception, formatParameters); + try + { + return messageFunc(); + } + catch (Exception ex) + { + _logger(LogLevel.Error, () => FailedToGenerateLogMessage, ex); + } + return null; + }; + + // Callsite HACK - Need to ensure proper callsite stack without inlining, so calling the logger within a virtual interface method + return _callsiteLogger.Log(_logger, logLevel, wrappedMessageFunc, exception, formatParameters); + } + } + + interface ICallSiteExtension + { + bool Log(Logger logger, LogLevel logLevel, Func messageFunc, Exception exception, object[] formatParameters); + } + + class CallSiteExtension : ICallSiteExtension + { + bool ICallSiteExtension.Log(Logger logger, LogLevel logLevel, Func messageFunc, Exception exception, object[] formatParameters) + { + return logger(logLevel, messageFunc, exception, formatParameters); + } } } #endif @@ -800,10 +889,13 @@ namespace Picton.Messaging.Logging.LogProviders #endif using System.Text.RegularExpressions; +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal abstract class LogProviderBase : ILogProvider { protected delegate IDisposable OpenNdc(string message); - protected delegate IDisposable OpenMdc(string key, string value); + protected delegate IDisposable OpenMdc(string key, object value, bool destructure); private readonly Lazy _lazyOpenNdcMethod; private readonly Lazy _lazyOpenMdcMethod; @@ -824,9 +916,9 @@ public IDisposable OpenNestedContext(string message) return _lazyOpenNdcMethod.Value(message); } - public IDisposable OpenMappedContext(string key, string value) + public IDisposable OpenMappedContext(string key, object value, bool destructure = false) { - return _lazyOpenMdcMethod.Value(key, value); + return _lazyOpenMdcMethod.Value(key, value, destructure); } protected virtual OpenNdc GetOpenNdcMethod() @@ -836,10 +928,13 @@ protected virtual OpenNdc GetOpenNdcMethod() protected virtual OpenMdc GetOpenMdcMethod() { - return (_, __) => NoopDisposableInstance; + return (_, __, ___) => NoopDisposableInstance; } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class NLogLogProvider : LogProviderBase { private readonly Func _getLoggerByNameDelegate; @@ -900,9 +995,9 @@ protected override OpenMdc GetOpenMdcMethod() .Lambda>(removeMethodCall, keyParam) .Compile(); - return (key, value) => + return (key, value, _) => { - set(key, value); + set(key, value.ToString()); return new DisposableAction(() => remove(key)); }; } @@ -921,6 +1016,9 @@ private static Func GetGetLoggerMethodCall() return Expression.Lambda>(methodCall, nameParam).Compile(); } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class NLogLogger { private readonly dynamic _logger; @@ -957,18 +1055,27 @@ static NLogLogger() { throw new InvalidOperationException("Type NLog.LogEventInfo was not found."); } - MethodInfo createLogEventInfoMethodInfo = logEventInfoType.GetMethodPortable("Create", - logEventLevelType, typeof(string), typeof(Exception), typeof(IFormatProvider), typeof(string), typeof(object[])); + + ConstructorInfo loggingEventConstructor = + logEventInfoType.GetConstructorPortable(logEventLevelType, typeof(string), typeof(IFormatProvider), typeof(string), typeof(object[]), typeof(Exception)); + ParameterExpression loggerNameParam = Expression.Parameter(typeof(string)); ParameterExpression levelParam = Expression.Parameter(typeof(object)); ParameterExpression messageParam = Expression.Parameter(typeof(string)); ParameterExpression exceptionParam = Expression.Parameter(typeof(Exception)); UnaryExpression levelCast = Expression.Convert(levelParam, logEventLevelType); - MethodCallExpression createLogEventInfoMethodCall = Expression.Call(null, - createLogEventInfoMethodInfo, - levelCast, loggerNameParam, exceptionParam, - Expression.Constant(null, typeof(IFormatProvider)), messageParam, Expression.Constant(null, typeof(object[]))); - _logEventInfoFact = Expression.Lambda>(createLogEventInfoMethodCall, + + NewExpression newLoggingEventExpression = + Expression.New(loggingEventConstructor, + levelCast, + loggerNameParam, + Expression.Constant(null, typeof(IFormatProvider)), + messageParam, + Expression.Constant(null, typeof(object[])), + exceptionParam + ); + + _logEventInfoFact = Expression.Lambda>(newLoggingEventExpression, loggerNameParam, levelParam, messageParam, exceptionParam).Compile(); } catch { } @@ -986,40 +1093,30 @@ public bool Log(LogLevel logLevel, Func messageFunc, Exception exception { return IsLogLevelEnable(logLevel); } + + var callsiteMessageFunc = messageFunc; messageFunc = LogMessageFormatter.SimulateStructuredLogging(messageFunc, formatParameters); if (_logEventInfoFact != null) { if (IsLogLevelEnable(logLevel)) { - var nlogLevel = this.TranslateLevel(logLevel); - Type s_callerStackBoundaryType; + Type callsiteLoggerType = typeof(NLogLogger); #if !LIBLOG_PORTABLE - StackTrace stack = new StackTrace(); - Type thisType = GetType(); - Type knownType0 = typeof(LoggerExecutionWrapper); - Type knownType1 = typeof(LogExtensions); - //Maybe inline, so we may can't found any LibLog classes in stack - s_callerStackBoundaryType = null; - for (var i = 0; i < stack.FrameCount; i++) + // Callsite HACK - Extract the callsite-logger-type from the messageFunc + var methodType = callsiteMessageFunc.Method.DeclaringType; + if (methodType == typeof(LogExtensions) || (methodType != null && methodType.DeclaringType == typeof(LogExtensions))) { - var declaringType = stack.GetFrame(i).GetMethod().DeclaringType; - if (!IsInTypeHierarchy(thisType, declaringType) && - !IsInTypeHierarchy(knownType0, declaringType) && - !IsInTypeHierarchy(knownType1, declaringType)) - { - if (i > 1) - s_callerStackBoundaryType = stack.GetFrame(i - 1).GetMethod().DeclaringType; - break; - } + callsiteLoggerType = typeof(LogExtensions); + } + else if (methodType == typeof(LoggerExecutionWrapper) || (methodType != null && methodType.DeclaringType == typeof(LoggerExecutionWrapper))) + { + callsiteLoggerType = typeof(LoggerExecutionWrapper); } -#else - s_callerStackBoundaryType = null; #endif - if (s_callerStackBoundaryType != null) - _logger.Log(s_callerStackBoundaryType, _logEventInfoFact(_logger.Name, nlogLevel, messageFunc(), exception)); - else - _logger.Log(_logEventInfoFact(_logger.Name, nlogLevel, messageFunc(), exception)); + var nlogLevel = this.TranslateLevel(logLevel); + var nlogEvent = _logEventInfoFact(_logger.Name, nlogLevel, messageFunc(), exception); + _logger.Log(callsiteLoggerType, nlogEvent); return true; } return false; @@ -1029,6 +1126,7 @@ public bool Log(LogLevel logLevel, Func messageFunc, Exception exception { return LogException(logLevel, messageFunc, exception); } + switch (logLevel) { case LogLevel.Debug: @@ -1077,19 +1175,6 @@ public bool Log(LogLevel logLevel, Func messageFunc, Exception exception return false; } - private static bool IsInTypeHierarchy(Type currentType, Type checkType) - { - while (currentType != null && currentType != typeof(object)) - { - if (currentType == checkType) - { - return true; - } - currentType = currentType.GetBaseTypePortable(); - } - return false; - } - [SuppressMessage("Microsoft.Maintainability", "CA1502:AvoidExcessiveComplexity")] private bool LogException(LogLevel logLevel, Func messageFunc, Exception exception) { @@ -1177,12 +1262,15 @@ private object TranslateLevel(LogLevel logLevel) case LogLevel.Fatal: return _levelFatal; default: - throw new ArgumentOutOfRangeException(nameof(logLevel), logLevel, null); + throw new ArgumentOutOfRangeException("logLevel", logLevel, null); } } } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class Log4NetLogProvider : LogProviderBase { private readonly Func _getLoggerByNameDelegate; @@ -1270,9 +1358,9 @@ protected override OpenMdc GetOpenMdcMethod() .Lambda>(removeMethodCall, keyParam) .Compile(); - return (key, value) => + return (key, value, _) => { - set(key, value); + set(key, value.ToString()); return new DisposableAction(() => remove(key)); }; } @@ -1291,27 +1379,27 @@ private static Func GetGetLoggerMethodCall() return Expression.Lambda>(methodCall, nameParam).Compile(); } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class Log4NetLogger { private readonly dynamic _logger; private static Type s_callerStackBoundaryType; private static readonly object CallerStackBoundaryTypeSync = new object(); - private readonly object _levelDebug; - private readonly object _levelInfo; - private readonly object _levelWarn; - private readonly object _levelError; - private readonly object _levelFatal; - private readonly Func _isEnabledForDelegate; - private readonly Action _logDelegate; - private readonly Func _createLoggingEvent; - private Action _loggingEventPropertySetter; + private static readonly object _levelDebug; + private static readonly object _levelInfo; + private static readonly object _levelWarn; + private static readonly object _levelError; + private static readonly object _levelFatal; + private static readonly Func _isEnabledForDelegate; + private static readonly Action _logDelegate; + private static readonly Func _createLoggingEvent; + private static readonly Action _loggingEventPropertySetter; - [SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "ILogger")] - internal Log4NetLogger(dynamic logger) + static Log4NetLogger() { - _logger = logger.Logger; - var logEventLevelType = Type.GetType("log4net.Core.Level, log4net"); if (logEventLevelType == null) { @@ -1346,6 +1434,12 @@ internal Log4NetLogger(dynamic logger) _loggingEventPropertySetter = GetLoggingEventPropertySetter(loggingEventType); } + [SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "ILogger")] + internal Log4NetLogger(dynamic logger) + { + _logger = logger.Logger; + } + private static Action GetLogDelegate(Type loggerType, Type loggingEventType, UnaryExpression instanceCast, ParameterExpression instanceParam) { @@ -1552,12 +1646,15 @@ private object TranslateLevel(LogLevel logLevel) case LogLevel.Fatal: return _levelFatal; default: - throw new ArgumentOutOfRangeException(nameof(logLevel), logLevel, null); + throw new ArgumentOutOfRangeException("logLevel", logLevel, null); } } } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class EntLibLogProvider : LogProviderBase { private const string TypeTemplate = "Microsoft.Practices.EnterpriseLibrary.Logging.{0}, Microsoft.Practices.EnterpriseLibrary.Logging"; @@ -1674,6 +1771,9 @@ private static MemberInitExpression GetWriteLogExpression(Expression message, return memberInit; } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class EntLibLogger { private readonly string _loggerName; @@ -1732,10 +1832,14 @@ private static int MapSeverity(LogLevel logLevel) } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class SerilogLogProvider : LogProviderBase { private readonly Func _getLoggerByNameDelegate; private static bool s_providerIsAvailableOverride = true; + private static Func _pushProperty; [SuppressMessage("Microsoft.Naming", "CA2204:Literals should be spelled correctly", MessageId = "Serilog")] public SerilogLogProvider() @@ -1745,6 +1849,7 @@ public SerilogLogProvider() throw new InvalidOperationException("Serilog.Log not found"); } _getLoggerByNameDelegate = GetForContextMethodCall(); + _pushProperty = GetPushProperty(); } public static bool ProviderIsAvailableOverride @@ -1765,15 +1870,15 @@ internal static bool IsLoggerAvailable() protected override OpenNdc GetOpenNdcMethod() { - return message => GetPushProperty()("NDC", message); + return message => _pushProperty("NDC", message, false); } protected override OpenMdc GetOpenMdcMethod() { - return (key, value) => GetPushProperty()(key, value); + return (key, value, destructure) => _pushProperty(key, value, destructure); } - private static Func GetPushProperty() + private static Func GetPushProperty() { Type ndcContextType = Type.GetType("Serilog.Context.LogContext, Serilog") ?? Type.GetType("Serilog.Context.LogContext, Serilog.FullNetFx"); @@ -1797,7 +1902,7 @@ private static Func GetPushProperty() destructureObjectParam) .Compile(); - return (key, value) => pushProperty(key, value, false); + return (key, value, destructure) => pushProperty(key, value, destructure); } private static Type GetLogManagerType() @@ -1827,6 +1932,9 @@ private static Func GetForContextMethodCall() return name => func("SourceContext", name, false); } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class SerilogLogger { private readonly object _logger; @@ -1977,6 +2085,9 @@ private static object TranslateLevel(LogLevel logLevel) } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class LoupeLogProvider : LogProviderBase { /// @@ -2051,6 +2162,9 @@ private static WriteDelegate GetLogWriteDelegate() return callDelegate; } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class LoupeLogger { private const string LogSystem = "LibLog"; @@ -2103,12 +2217,15 @@ private static int ToLogMessageSeverity(LogLevel logLevel) case LogLevel.Fatal: return TraceEventTypeValues.Critical; default: - throw new ArgumentOutOfRangeException(nameof(logLevel)); + throw new ArgumentOutOfRangeException("logLevel"); } } } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal static class TraceEventTypeValues { internal static readonly Type Type; @@ -2136,6 +2253,9 @@ static TraceEventTypeValues() } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal static class LogMessageFormatter { //private static readonly Regex Pattern = new Regex(@"\{@?\w{1,}\}"); @@ -2220,6 +2340,9 @@ public static string FormatStructuredMessage(string targetMessage, object[] form } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal static class TypeExtensions { internal static ConstructorInfo GetConstructorPortable(this Type type, params Type[] types) @@ -2309,6 +2432,9 @@ internal static Assembly GetAssemblyPortable(this Type type) } } +#if !LIBLOG_PORTABLE + [ExcludeFromCodeCoverage] +#endif internal class DisposableAction : IDisposable { private readonly Action _onDispose; From 5a491b357133d62b1348cf5181165359d5e9202b Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Wed, 27 Dec 2017 14:31:20 -0500 Subject: [PATCH 02/26] Edit XML comments that are no longer accurate --- Source/Picton.Messaging/AsyncMessagePump.cs | 2 -- Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 7712ac2..c8235b6 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -14,8 +14,6 @@ namespace Picton.Messaging /// /// High performance message processor (also known as a message "pump") for Azure storage queues. /// Designed to monitor an Azure storage queue and process the message as quickly and efficiently as possible. - /// When messages are present in the queue, this message pump will increase the number of tasks that can concurrently process messages. - /// Conversly, this message pump will reduce the number of tasks that can concurrently process messages when the queue is empty. /// public class AsyncMessagePump { diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 9898d3a..d67b59a 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -16,9 +16,8 @@ namespace Picton.Messaging { /// - /// High performance message processor (also known as a message "pump") for Azure storage queues. Designed to monitor an Azure storage queue and process the message as quickly and efficiently as possible. - /// When messages are present in the queue, this message pump will increase the number of tasks that can concurrently process messages. - /// Conversly, this message pump will reduce the number of tasks that can concurrently process messages when the queue is empty. + /// High performance message processor (also known as a message "pump") for Azure storage queues. + /// Designed to monitor an Azure storage queue and process the message as quickly and efficiently as possible. /// public class AsyncMessagePumpWithHandlers { From 4876a031a1238f3c31a27fdb3ade1d05b55ec466 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Wed, 27 Dec 2017 14:33:26 -0500 Subject: [PATCH 03/26] No longer need to delay the pump --- Source/Picton.Messaging/AsyncMessagePump.cs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index c8235b6..fe42b30 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -152,7 +152,6 @@ public void Stop() var runningTasks = new ConcurrentDictionary(); var semaphore = new SemaphoreSlim(_concurrentTasks, _concurrentTasks); var queuedMessages = new ConcurrentQueue(); - var fetchTaskStarted = false; // Define the task that fetches messages from the Azure queue RecurrentCancellableTask.StartNew( @@ -170,9 +169,6 @@ public void Stop() { queuedMessages.Enqueue(message); } - - // Indicate that we have fetched messages which will allow the message pump to start if it has not already started - fetchTaskStarted = true; } else { @@ -193,12 +189,6 @@ public void Stop() cancellationToken, TaskCreationOptions.LongRunning); - // Delay the pump until we have fetched messages - while (!fetchTaskStarted && !cancellationToken.IsCancellationRequested) - { - await Task.Delay(250); - } - // Define the task pump var pumpTask = Task.Run(async () => { From afe7c0793de15e6ead548883b6d0f0e970b405dc Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Wed, 27 Dec 2017 14:40:45 -0500 Subject: [PATCH 04/26] Remove dead code --- Source/Picton.Messaging/AsyncMessagePump.cs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index fe42b30..8adfbf8 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -157,7 +157,7 @@ public void Stop() RecurrentCancellableTask.StartNew( async () => { - // Fetched messages from the Azure queue when the concurrent queue falls below an "acceptable" count. + // Fetched messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. if (queuedMessages.Count <= _concurrentTasks / 2) { var messages = await _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).ConfigureAwait(false); @@ -256,16 +256,6 @@ public void Stop() await Task.WhenAll(runningTasks.Values).UntilCancelled().ConfigureAwait(false); } - private bool IsCancellationRequested(Exception e) - { - if (e == null) return false; - - if (e is OperationCanceledException oce) return true; - if (e is TaskCanceledException tce) return true; - - return IsCancellationRequested(e.InnerException); - } - #endregion } } From f21693a4ec22847ef6e1fe6625e61f608ae0e639 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Sun, 21 Jan 2018 18:49:44 -0500 Subject: [PATCH 05/26] Change the default delay when a queue is empty to 1.5 seconds This change is to account for the fact that there is already a 0.5 second delay between each attempt to fetch messages from the Azure queue. The goal is to make sure that the total delay is exactly 2 seconds (it was 2.5 seconds before this change). --- Source/Picton.Messaging/AsyncMessagePump.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 8adfbf8..f4cffed 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -99,7 +99,7 @@ public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int co _visibilityTimeout = visibilityTimeout; _maxDequeueCount = maxDequeueCount; - OnQueueEmpty = cancellationToken => Task.Delay(2000, cancellationToken).Wait(); + OnQueueEmpty = cancellationToken => Task.Delay(1500, cancellationToken).Wait(); OnError = (message, exception, isPoison) => _logger.ErrorException("An error occured when processing a message", exception); } From e91daea4b5cdffb6584194b4c4f1cf8b2f6e6b89 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Sun, 21 Jan 2018 18:50:50 -0500 Subject: [PATCH 06/26] Simplify netfull vs netstandard reflection code --- .../AsyncMessagePumpWithHandlers.cs | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index d67b59a..4cd5cf6 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -138,37 +138,20 @@ private static IDictionary GetMessageHandlers() { var assemblies = GetLocalAssemblies(); -#if NETFULL - var typesWithMessageHandlerInterfaces = assemblies - .SelectMany(x => x.GetTypes()) - .Where(t => !t.IsInterface) - .Select(type => new - { - Type = type, - MessageTypes = type - .GetInterfaces() - .Where(i => i.IsGenericType) - .Where(t => t.GetGenericTypeDefinition() == typeof(IMessageHandler<>)) - .SelectMany(i => i.GetGenericArguments()) - }) - .Where(t => t.MessageTypes != null && t.MessageTypes.Any()) - .ToArray(); -#else var typesWithMessageHandlerInterfaces = assemblies .SelectMany(x => x.GetTypes()) - .Where(t => !t.GetTypeInfo().IsInterface) + .Where(t => !GetTypeInfo(t).IsInterface) .Select(type => new { Type = type, MessageTypes = type .GetInterfaces() - .Where(i => i.GetTypeInfo().IsGenericType) - .Where(t => t.GetGenericTypeDefinition() == typeof(IMessageHandler<>)) + .Where(i => GetTypeInfo(i).IsGenericType) + .Where(i => i.GetGenericTypeDefinition() == typeof(IMessageHandler<>)) .SelectMany(i => i.GetGenericArguments()) }) .Where(t => t.MessageTypes != null && t.MessageTypes.Any()) .ToArray(); -#endif var oneTypePerMessageHandler = typesWithMessageHandlerInterfaces .SelectMany(t => t.MessageTypes, (t, messageType) => @@ -185,6 +168,11 @@ private static IDictionary GetMessageHandlers() } #if NETFULL + private static Type GetTypeInfo(Type type) + { + return type; + } + private static IEnumerable GetLocalAssemblies() { var callingAssembly = Assembly.GetCallingAssembly(); @@ -194,6 +182,11 @@ private static IEnumerable GetLocalAssemblies() .Where(x => !x.IsDynamic && new Uri(x.CodeBase).AbsolutePath.Contains(path)).ToList(); } #else + private static TypeInfo GetTypeInfo(Type type) + { + return type.GetTypeInfo(); + } + private static IEnumerable GetLocalAssemblies() { var dependencies = DependencyContext.Default.RuntimeLibraries; From 08792bb400faa7d5ee1201a0e422c7bcf2aa7095 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Sun, 21 Jan 2018 18:52:51 -0500 Subject: [PATCH 07/26] Simplify OnQueueEmpty in integration test Recent improvement to Picton ensure there is only one task to fetch messages from Azure queue. Therefore we no longer need to ensure that only one task is attempting to stop the message pump. --- .../Program.cs | 33 +++++-------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 2af2538..e6a9a31 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -71,9 +71,6 @@ public static void ProcessSimpleMessages(string queueName, IStorageAccount stora var logger = logProvider.GetLogger("ProcessSimpleMessages"); Stopwatch sw = null; - var lockObject = new Object(); - var stopping = false; - // Configure the message pump var messagePump = new AsyncMessagePump(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3) { @@ -86,29 +83,17 @@ public static void ProcessSimpleMessages(string queueName, IStorageAccount stora // Stop the message pump when the queue is empty. messagePump.OnQueueEmpty = cancellationToken => { - // Make sure we try to stop it only once (otherwise each concurrent task would try to stop it) - if (!stopping) - { - lock (lockObject) - { - if (sw.IsRunning) sw.Stop(); - if (!stopping) - { - // Indicate that the message pump is stopping - stopping = true; + if (sw.IsRunning) sw.Stop(); - // Log to console - logger(Logging.LogLevel.Debug, () => "Asking the 'simple' message pump to stop"); + // Log to console + logger(Logging.LogLevel.Debug, () => "Asking the 'simple' message pump to stop"); - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - logger(Logging.LogLevel.Debug, () => "The 'simple' message pump has been stopped"); - }).ConfigureAwait(false); - } - } - } + // Run the 'OnStop' on a different thread so we don't block it + Task.Run(() => + { + messagePump.Stop(); + logger(Logging.LogLevel.Debug, () => "The 'simple' message pump has been stopped"); + }).ConfigureAwait(false); }; // Start the message pump From 51a33909c16344e400916c5f5d1a7f25d35eeacf Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 22 Jan 2018 14:59:39 -0500 Subject: [PATCH 08/26] Ensure we only restore nuget packages once --- build.cake | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.cake b/build.cake index baa33c8..3973d1a 100644 --- a/build.cake +++ b/build.cake @@ -164,6 +164,7 @@ Task("Build") DotNetCoreBuild(sourceFolder + libraryName + ".sln", new DotNetCoreBuildSettings { Configuration = configuration, + NoRestore = true, ArgumentCustomization = args => args.Append("/p:SemVer=" + versionInfo.LegacySemVerPadded) }); }); @@ -233,7 +234,7 @@ Task("Create-NuGet-Package") IncludeSymbols = false, NoBuild = true, OutputDirectory = outputDir, - ArgumentCustomization = (args) => + ArgumentCustomization = (args) => { return args .Append("/p:Version={0}", versionInfo.LegacySemVerPadded) From 0bcd5079d256b9bbf7455136b2ffe236537ca292 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 22 Jan 2018 15:04:53 -0500 Subject: [PATCH 09/26] Simplify locking in unit tests --- .../AsyncMessagePumpTests.cs | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs index cacf5a9..8303450 100644 --- a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs +++ b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs @@ -127,7 +127,7 @@ public void No_message_processed_when_queue_is_empty() // Assert onMessageInvokeCount.ShouldBe(0); - onQueueEmptyInvokeCount.ShouldBeGreaterThan(0); + onQueueEmptyInvokeCount.ShouldBe(1); onErrorInvokeCount.ShouldBe(0); // You would expect the 'GetMessagesAsync' method to be invoked only once, but unfortunately we can't be sure. @@ -160,23 +160,21 @@ public void Message_processed() mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync((int messageCount, TimeSpan? visibilityTimeout, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) => { - if (cloudMessage == null) Enumerable.Empty(); - - lock (lockObject) + if (cloudMessage != null) { - if (cloudMessage != null) - { - // DequeueCount is a private property. Therefore we must use reflection to change its value - var dequeueCountProperty = cloudMessage.GetType().GetProperty("DequeueCount"); - dequeueCountProperty.SetValue(cloudMessage, cloudMessage.DequeueCount + 1); - - return new[] { cloudMessage }; - } - else + lock (lockObject) { - return Enumerable.Empty(); + if (cloudMessage != null) + { + // DequeueCount is a private property. Therefore we must use reflection to change its value + var dequeueCountProperty = cloudMessage.GetType().GetProperty("DequeueCount"); + dequeueCountProperty.SetValue(cloudMessage, cloudMessage.DequeueCount + 1); + + return new[] { cloudMessage }; + } } } + return Enumerable.Empty(); }); mockQueue.Setup(q => q.DeleteMessageAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns((string messageId, string popReceipt, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) => { @@ -219,9 +217,9 @@ public void Message_processed() // Assert onMessageInvokeCount.ShouldBe(1); - onQueueEmptyInvokeCount.ShouldBeGreaterThan(0); + onQueueEmptyInvokeCount.ShouldBe(1); onErrorInvokeCount.ShouldBe(0); - mockQueue.Verify(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + mockQueue.Verify(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeast(2)); mockQueue.Verify(q => q.DeleteMessageAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(1)); } @@ -247,23 +245,21 @@ public void Poison_message_is_rejected() mockQueue.Setup(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync((int messageCount, TimeSpan? visibilityTimeout, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) => { - if (cloudMessage == null) return Enumerable.Empty(); - - lock (lockObject) + if (cloudMessage != null) { - if (cloudMessage != null) - { - // DequeueCount is a private property. Therefore we must use reflection to change its value - var dequeueCountProperty = cloudMessage.GetType().GetProperty("DequeueCount"); - dequeueCountProperty.SetValue(cloudMessage, retries + 1); // intentionally set 'DequeueCount' to a value exceeding maxRetries to simulate a poison message - - return new[] { cloudMessage }; - } - else + lock (lockObject) { - return Enumerable.Empty(); + if (cloudMessage != null) + { + // DequeueCount is a private property. Therefore we must use reflection to change its value + var dequeueCountProperty = cloudMessage.GetType().GetProperty("DequeueCount"); + dequeueCountProperty.SetValue(cloudMessage, retries + 1); // intentionally set 'DequeueCount' to a value exceeding maxRetries to simulate a poison message + + return new[] { cloudMessage }; + } } } + return Enumerable.Empty(); }); mockQueue.Setup(q => q.DeleteMessageAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).Returns((string messageId, string popReceipt, QueueRequestOptions options, OperationContext operationContext, CancellationToken cancellationToken) => { @@ -312,6 +308,7 @@ public void Poison_message_is_rejected() onErrorInvokeCount.ShouldBe(1); isRejected.ShouldBeTrue(); mockQueue.Verify(q => q.GetMessagesAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeast(2)); + mockQueue.Verify(q => q.DeleteMessageAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(1)); } [Fact] From 92fbb11a7040243e493969ebf9ab274d82d15492 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 22 Jan 2018 15:06:03 -0500 Subject: [PATCH 10/26] Remove unnecessary locking in integration tests --- .../Program.cs | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index e6a9a31..c4b485c 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -120,7 +120,6 @@ public static void ProcessMessagesWithHandlers(string queueName, IStorageAccount { var logger = logProvider.GetLogger("ProcessMessagesWithHandlers"); - var lockObject = new Object(); var stopping = false; Stopwatch sw = null; @@ -132,25 +131,20 @@ public static void ProcessMessagesWithHandlers(string queueName, IStorageAccount // However, ensure that we try to stop it only once (otherwise each concurrent task would try to stop it) if (!stopping) { - lock (lockObject) + if (sw.IsRunning) sw.Stop(); + + // Indicate that the message pump is stopping + stopping = true; + + // Log to console + logger(Logging.LogLevel.Debug, () => "Asking the message pump with handlers to stop"); + + // Run the 'OnStop' on a different thread so we don't block it + Task.Run(() => { - if (sw.IsRunning) sw.Stop(); - if (!stopping) - { - // Indicate that the message pump is stopping - stopping = true; - - // Log to console - logger(Logging.LogLevel.Debug, () => "Asking the message pump with handlers to stop"); - - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - logger(Logging.LogLevel.Debug, () => "The message pump with handlers has been stopped"); - }).ConfigureAwait(false); - } - } + messagePump.Stop(); + logger(Logging.LogLevel.Debug, () => "The message pump with handlers has been stopped"); + }).ConfigureAwait(false); } }; From 789482e4d1839254364ed4964d431cda83ca1d00 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 22 Jan 2018 21:06:07 -0500 Subject: [PATCH 11/26] Upgrade CakeBuild to 0.25.0 --- build.cake | 2 +- tools/packages.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.cake b/build.cake index 3973d1a..187de06 100644 --- a/build.cake +++ b/build.cake @@ -5,7 +5,7 @@ #tool "nuget:?package=GitVersion.CommandLine&version=4.0.0-beta0012" #tool "nuget:?package=GitReleaseManager&version=0.6.0" #tool "nuget:?package=OpenCover&version=4.6.519" -#tool "nuget:?package=ReportGenerator&version=3.0.2" +#tool "nuget:?package=ReportGenerator&version=3.1.1" #tool "nuget:?package=coveralls.io&version=1.4.2" #tool "nuget:?package=xunit.runner.console&version=2.3.1" diff --git a/tools/packages.config b/tools/packages.config index 1e316f4..31f2b2a 100644 --- a/tools/packages.config +++ b/tools/packages.config @@ -1,4 +1,4 @@ - + From 56806bf8ec20628538f08f39d3ced4404418af89 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 23 Jan 2018 17:04:10 -0500 Subject: [PATCH 12/26] Fetch messages from Azure queue synchronously Resolves #13 --- Source/Picton.Messaging/AsyncMessagePump.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index f4cffed..7b0ca9b 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -155,12 +155,12 @@ public void Stop() // Define the task that fetches messages from the Azure queue RecurrentCancellableTask.StartNew( - async () => + () => { // Fetched messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. if (queuedMessages.Count <= _concurrentTasks / 2) { - var messages = await _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).ConfigureAwait(false); + var messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; if (messages.Any()) { _logger.Trace($"Fetched {messages.Count()} message(s) from the queue."); From cfac0e8302d4411faa7a95f3ae8968ba8a018d8f Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 23 Jan 2018 17:19:41 -0500 Subject: [PATCH 13/26] Handle TaskCanceledException when fetching messages from Azure queue Resolves #14 --- Source/Picton.Messaging/AsyncMessagePump.cs | 27 ++++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 7b0ca9b..023ab8f 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -5,6 +5,7 @@ using Picton.Messaging.Utils; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -157,10 +158,26 @@ public void Stop() RecurrentCancellableTask.StartNew( () => { - // Fetched messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. + // Fetch messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. if (queuedMessages.Count <= _concurrentTasks / 2) { - var messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; + IEnumerable messages = null; + try + { + messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; + } + catch (TaskCanceledException) + { + // The message pump is shutting down. + // This exception can be safely ignored. + } + catch (Exception e) + { + _logger.InfoException("An error occured while fetching messages from the Azure queue. The error was caught and ignored.", e.GetBaseException()); + } + + if (messages == null) return; + if (messages.Any()) { _logger.Trace($"Fetched {messages.Count()} message(s) from the queue."); @@ -217,7 +234,8 @@ public void Stop() OnMessage?.Invoke(message, cancellationToken); // Delete the processed message from the queue - await _queueManager.DeleteMessageAsync(message, null, null, cancellationToken).ConfigureAwait(false); + // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down + await _queueManager.DeleteMessageAsync(message, null, null, CancellationToken.None).ConfigureAwait(false); } catch (Exception ex) { @@ -225,7 +243,8 @@ public void Stop() OnError?.Invoke(message, ex, isPoison); if (isPoison) { - await _queueManager.DeleteMessageAsync(message, null, null, cancellationToken).ConfigureAwait(false); + // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down + await _queueManager.DeleteMessageAsync(message, null, null, CancellationToken.None).ConfigureAwait(false); } } From 534db3515532034505faaf1b93dcce6919a66ca1 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 23 Jan 2018 22:36:22 -0500 Subject: [PATCH 14/26] Ensure we stop attempting to fetch messages from Azure queue when the pump is shutting down #14 --- Source/Picton.Messaging/AsyncMessagePump.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 023ab8f..9abfdcd 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -159,7 +159,7 @@ public void Stop() () => { // Fetch messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. - if (queuedMessages.Count <= _concurrentTasks / 2) + if (!cancellationToken.IsCancellationRequested && queuedMessages.Count <= _concurrentTasks / 2) { IEnumerable messages = null; try From bad0705c074b2298a88cc85c8f67180a6225a73b Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 19:54:19 -0500 Subject: [PATCH 15/26] Fix typos --- .../Picton.Messaging.IntegrationTests.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj b/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj index 0e11557..3fc2613 100644 --- a/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj +++ b/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj @@ -3,8 +3,8 @@ Exe netcoreapp1.1 - Picton.Mesaging.IntegrationTests - Picton.Mesaging.IntegrationTests + Picton.Messaging.IntegrationTests + Picton.Messaging.IntegrationTests From f030843f8cbe98b16544818febd4df3cc78c51b1 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 19:59:45 -0500 Subject: [PATCH 16/26] Remove unnecessary locking and simplify tests --- .../Program.cs | 37 +++++-------------- .../AsyncMessagePumpTests.cs | 28 +++----------- 2 files changed, 15 insertions(+), 50 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index c4b485c..3269d28 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -83,17 +83,13 @@ public static void ProcessSimpleMessages(string queueName, IStorageAccount stora // Stop the message pump when the queue is empty. messagePump.OnQueueEmpty = cancellationToken => { + // Stop the timer if (sw.IsRunning) sw.Stop(); - // Log to console + // Stop the message pump logger(Logging.LogLevel.Debug, () => "Asking the 'simple' message pump to stop"); - - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - logger(Logging.LogLevel.Debug, () => "The 'simple' message pump has been stopped"); - }).ConfigureAwait(false); + messagePump.Stop(); + logger(Logging.LogLevel.Debug, () => "The 'simple' message pump has been stopped"); }; // Start the message pump @@ -120,32 +116,19 @@ public static void ProcessMessagesWithHandlers(string queueName, IStorageAccount { var logger = logProvider.GetLogger("ProcessMessagesWithHandlers"); - var stopping = false; Stopwatch sw = null; // Configure the message pump var messagePump = new AsyncMessagePumpWithHandlers(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3); messagePump.OnQueueEmpty = cancellationToken => { - // Stop the message pump when the queue is empty. - // However, ensure that we try to stop it only once (otherwise each concurrent task would try to stop it) - if (!stopping) - { - if (sw.IsRunning) sw.Stop(); - - // Indicate that the message pump is stopping - stopping = true; - - // Log to console - logger(Logging.LogLevel.Debug, () => "Asking the message pump with handlers to stop"); + // Stop the timer + if (sw.IsRunning) sw.Stop(); - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - logger(Logging.LogLevel.Debug, () => "The message pump with handlers has been stopped"); - }).ConfigureAwait(false); - } + // Stop the message pump + logger(Logging.LogLevel.Debug, () => "Asking the message pump with handlers to stop"); + messagePump.Stop(); + logger(Logging.LogLevel.Debug, () => "The message pump with handlers has been stopped"); }; // Start the message pump diff --git a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs index 8303450..0fbb897 100644 --- a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs +++ b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs @@ -110,12 +110,7 @@ public void No_message_processed_when_queue_is_empty() messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); - - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - }).ConfigureAwait(false); + messagePump.Stop(); }; messagePump.OnError = (message, exception, isPoison) => { @@ -193,12 +188,7 @@ public void Message_processed() messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); - - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - }).ConfigureAwait(false); + messagePump.Stop(); }; messagePump.OnError = (message, exception, isPoison) => { @@ -279,12 +269,7 @@ public void Poison_message_is_rejected() messagePump.OnQueueEmpty = cancellationToken => { Interlocked.Increment(ref onQueueEmptyInvokeCount); - - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - }).ConfigureAwait(false); + messagePump.Stop(); }; messagePump.OnError = (message, exception, isPoison) => { @@ -350,11 +335,8 @@ public void Exceptions_in_OnQueueEmpty_are_ignored() } } - // Run the 'OnStop' on a different thread so we don't block it - Task.Run(() => - { - messagePump.Stop(); - }).ConfigureAwait(false); + // Stop the message pump + messagePump.Stop(); }; messagePump.OnError = (message, exception, isPoison) => { From a69bd45fd655e7f6d9c47b4cfb28a20eac258b53 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 20:12:33 -0500 Subject: [PATCH 17/26] Upgrade nuget packages --- .../Picton.Messaging.UnitTests.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj b/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj index 2dc8dab..c66990e 100644 --- a/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj +++ b/Source/Picton.Messaging.UnitTests/Picton.Messaging.UnitTests.csproj @@ -8,8 +8,8 @@ - - + + From 6920d6fb5aa22d9f081cee7f2394be3127ce260f Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 20:12:59 -0500 Subject: [PATCH 18/26] Remove unnecessary nuget packages --- Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs | 6 +----- Source/Picton.Messaging/Picton.Messaging.csproj | 2 -- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 4cd5cf6..f53dd8c 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -5,12 +5,8 @@ using Picton.Messaging.Messages; using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Reflection; -#if NETSTANDARD -using System.Runtime.Loader; -#endif using System.Threading; namespace Picton.Messaging @@ -176,7 +172,7 @@ private static Type GetTypeInfo(Type type) private static IEnumerable GetLocalAssemblies() { var callingAssembly = Assembly.GetCallingAssembly(); - var path = new Uri(Path.GetDirectoryName(callingAssembly.Location)).AbsolutePath; + var path = new Uri(System.IO.Path.GetDirectoryName(callingAssembly.Location)).AbsolutePath; return AppDomain.CurrentDomain.GetAssemblies() .Where(x => !x.IsDynamic && new Uri(x.CodeBase).AbsolutePath.Contains(path)).ToList(); diff --git a/Source/Picton.Messaging/Picton.Messaging.csproj b/Source/Picton.Messaging/Picton.Messaging.csproj index 53cf58a..06204bb 100644 --- a/Source/Picton.Messaging/Picton.Messaging.csproj +++ b/Source/Picton.Messaging/Picton.Messaging.csproj @@ -28,9 +28,7 @@ - - All From ad2c1eb22d263ce405d0f2425030e478ed3f7de5 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 19:12:28 -0500 Subject: [PATCH 19/26] Add reference to App.Metrics nuget package and record when messages are fetched and processed --- Source/Picton.Messaging/AsyncMessagePump.cs | 130 ++++++++++-------- .../AsyncMessagePumpWithHandlers.cs | 13 +- Source/Picton.Messaging/Metrics.cs | 48 +++++++ .../Picton.Messaging/Picton.Messaging.csproj | 1 + 4 files changed, 130 insertions(+), 62 deletions(-) create mode 100644 Source/Picton.Messaging/Metrics.cs diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index 9abfdcd..b6b4c08 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -1,4 +1,5 @@ -using Microsoft.WindowsAzure.Storage; +using App.Metrics; +using Microsoft.WindowsAzure.Storage; using Picton.Interfaces; using Picton.Managers; using Picton.Messaging.Logging; @@ -26,6 +27,7 @@ public class AsyncMessagePump private readonly int _concurrentTasks; private readonly TimeSpan? _visibilityTimeout; private readonly int _maxDequeueCount; + private readonly IMetricsRoot _metrics; private CancellationTokenSource _cancellationTokenSource; private ManualResetEvent _safeToExitHandle; @@ -77,8 +79,9 @@ public class AsyncMessagePump /// The number of concurrent tasks. /// The visibility timeout. /// The maximum dequeue count. - public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3) - : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount) + /// + public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) + : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metricsBuilder) { } @@ -90,7 +93,8 @@ public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccoun /// The number of concurrent tasks. /// The queue visibility timeout /// The number of times to try processing a given message before giving up - public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3) + /// + public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) { if (concurrentTasks < 1) throw new ArgumentException("Number of concurrent tasks must be greather than zero", nameof(concurrentTasks)); if (maxDequeueCount < 1) throw new ArgumentException("Number of retries must be greather than zero", nameof(maxDequeueCount)); @@ -100,6 +104,8 @@ public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int co _visibilityTimeout = visibilityTimeout; _maxDequeueCount = maxDequeueCount; + _metrics = (metricsBuilder ?? new MetricsBuilder()).Build(); + OnQueueEmpty = cancellationToken => Task.Delay(1500, cancellationToken).Wait(); OnError = (message, exception, isPoison) => _logger.ErrorException("An error occured when processing a message", exception); } @@ -161,43 +167,48 @@ public void Stop() // Fetch messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. if (!cancellationToken.IsCancellationRequested && queuedMessages.Count <= _concurrentTasks / 2) { - IEnumerable messages = null; - try - { - messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; - } - catch (TaskCanceledException) + using (_metrics.Measure.Timer.Time(Metrics.MessageFetchingTimer)) { - // The message pump is shutting down. - // This exception can be safely ignored. - } - catch (Exception e) - { - _logger.InfoException("An error occured while fetching messages from the Azure queue. The error was caught and ignored.", e.GetBaseException()); - } - - if (messages == null) return; - - if (messages.Any()) - { - _logger.Trace($"Fetched {messages.Count()} message(s) from the queue."); - - foreach (var message in messages) + IEnumerable messages = null; + try { - queuedMessages.Enqueue(message); + messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; + _metrics.Measure.Counter.Increment(Metrics.MessagesFetchingCounter); } - } - else - { - _logger.Trace("The queue is empty, no messages fetched."); - try + catch (TaskCanceledException) { - // The queue is empty - OnQueueEmpty?.Invoke(cancellationToken); + // The message pump is shutting down. + // This exception can be safely ignored. } catch (Exception e) { - _logger.InfoException("An error occured when handling an empty queue. The error was caught and ignored.", e.GetBaseException()); + _logger.InfoException("An error occured while fetching messages from the Azure queue. The error was caught and ignored.", e.GetBaseException()); + } + + if (messages == null) return; + + if (messages.Any()) + { + _logger.Trace($"Fetched {messages.Count()} message(s) from the queue."); + + foreach (var message in messages) + { + queuedMessages.Enqueue(message); + } + } + else + { + _logger.Trace("The queue is empty, no messages fetched."); + try + { + // The queue is empty + OnQueueEmpty?.Invoke(cancellationToken); + _metrics.Measure.Counter.Increment(Metrics.QueueEmptyCounter); + } + catch (Exception e) + { + _logger.InfoException("An error occured when handling an empty queue. The error was caught and ignored.", e.GetBaseException()); + } } } } @@ -217,40 +228,45 @@ public void Stop() var runningTask = Task.Run( async () => { - if (cancellationToken.IsCancellationRequested) return false; + var messageProcessed = false; - queuedMessages.TryDequeue(out CloudMessage message); + if (cancellationToken.IsCancellationRequested) return messageProcessed; - if (message == null) - { - // False indicates that no message was processed - return false; - } - else + using (_metrics.Measure.Timer.Time(Metrics.MessageProcessingTimer)) { - try - { - // Process the message - OnMessage?.Invoke(message, cancellationToken); + queuedMessages.TryDequeue(out CloudMessage message); - // Delete the processed message from the queue - // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down - await _queueManager.DeleteMessageAsync(message, null, null, CancellationToken.None).ConfigureAwait(false); - } - catch (Exception ex) + if (message != null) { - var isPoison = message.DequeueCount > _maxDequeueCount; - OnError?.Invoke(message, ex, isPoison); - if (isPoison) + try { + // Process the message + OnMessage?.Invoke(message, cancellationToken); + + // Delete the processed message from the queue // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down await _queueManager.DeleteMessageAsync(message, null, null, CancellationToken.None).ConfigureAwait(false); } - } + catch (Exception ex) + { + var isPoison = message.DequeueCount > _maxDequeueCount; + OnError?.Invoke(message, ex, isPoison); + if (isPoison) + { + // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down + await _queueManager.DeleteMessageAsync(message, null, null, CancellationToken.None).ConfigureAwait(false); + } + } - // True indicates that a message was processed - return true; + messageProcessed = true; + } } + + // Increment the counter if we processed a message + if (messageProcessed) _metrics.Measure.Counter.Increment(Metrics.MessagesProcessedCounter); + + // Return a value indicating whether we processed a message or not + return messageProcessed; }, CancellationToken.None); diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index f53dd8c..bad2520 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyModel; +using App.Metrics; +using Microsoft.Extensions.DependencyModel; using Microsoft.WindowsAzure.Storage; using Picton.Interfaces; using Picton.Messaging.Logging; @@ -70,8 +71,9 @@ public Action OnQueueEmpty /// The number of concurrent tasks. /// The visibility timeout. /// The maximum dequeue count. - public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3) - : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount) + /// + public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) + : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metricsBuilder) { } @@ -83,9 +85,10 @@ public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudS /// The number of concurrent tasks. /// The queue visibility timeout /// The number of times to try processing a given message before giving up - public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3) + /// + public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) { - _messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount) + _messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount, metricsBuilder) { OnMessage = (message, cancellationToken) => { diff --git a/Source/Picton.Messaging/Metrics.cs b/Source/Picton.Messaging/Metrics.cs new file mode 100644 index 0000000..184b6b8 --- /dev/null +++ b/Source/Picton.Messaging/Metrics.cs @@ -0,0 +1,48 @@ +using App.Metrics; +using App.Metrics.Counter; +using App.Metrics.Timer; + +namespace Picton.Messaging +{ + internal static class Metrics + { + /// + /// Gets the counter indicating the number of messages processed by the message pump + /// + public static CounterOptions MessagesProcessedCounter => new CounterOptions + { + Name = "Messages processed", + MeasurementUnit = Unit.Items + }; + + /// + /// Gets the counter indicating the time it takes to process a message + /// + public static TimerOptions MessageProcessingTimer => new TimerOptions + { + Name = "Message processing timer" + }; + + /// + /// Gets the counter indicating the number of times messages have been fetched from the Azure queue + /// + public static CounterOptions FetchMessagesCounter => new CounterOptions + { + Name = "Fetch Messages", + MeasurementUnit = Unit.Items + }; + + public static TimerOptions MessageFetchingTimer => new TimerOptions + { + Name = "Message fetching timer" + }; + + /// + /// Gets the counter indicating the number of times we attempted to fetch messages from the Azure queue but the queue was empty + /// + public static CounterOptions QueueEmptyCounter => new CounterOptions + { + Name = "Queue Empty" + }; + } +} diff --git a/Source/Picton.Messaging/Picton.Messaging.csproj b/Source/Picton.Messaging/Picton.Messaging.csproj index 06204bb..6277776 100644 --- a/Source/Picton.Messaging/Picton.Messaging.csproj +++ b/Source/Picton.Messaging/Picton.Messaging.csproj @@ -28,6 +28,7 @@ + From fd286e40430cdfa799328955d5a40b5839c9a068 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 20:04:56 -0500 Subject: [PATCH 20/26] Fix incorrect metric name --- Source/Picton.Messaging/AsyncMessagePump.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index b6b4c08..d092355 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -173,7 +173,7 @@ public void Stop() try { messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; - _metrics.Measure.Counter.Increment(Metrics.MessagesFetchingCounter); + _metrics.Measure.Counter.Increment(Metrics.FetchMessagesCounter); } catch (TaskCanceledException) { From b7afb6deb36ff1c55c9623bfc4e8192419be2b99 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Mon, 5 Feb 2018 22:14:39 -0500 Subject: [PATCH 21/26] Change the type of the metrics dependency --- Source/Picton.Messaging/AsyncMessagePump.cs | 27 ++++++++++++++----- .../AsyncMessagePumpWithHandlers.cs | 12 ++++----- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index d092355..b441c07 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -27,7 +27,7 @@ public class AsyncMessagePump private readonly int _concurrentTasks; private readonly TimeSpan? _visibilityTimeout; private readonly int _maxDequeueCount; - private readonly IMetricsRoot _metrics; + private readonly IMetrics _metrics; private CancellationTokenSource _cancellationTokenSource; private ManualResetEvent _safeToExitHandle; @@ -79,9 +79,9 @@ public class AsyncMessagePump /// The number of concurrent tasks. /// The visibility timeout. /// The maximum dequeue count. - /// - public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) - : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metricsBuilder) + /// + public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) + : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) { } @@ -93,8 +93,8 @@ public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccoun /// The number of concurrent tasks. /// The queue visibility timeout /// The number of times to try processing a given message before giving up - /// - public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) + /// + public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) { if (concurrentTasks < 1) throw new ArgumentException("Number of concurrent tasks must be greather than zero", nameof(concurrentTasks)); if (maxDequeueCount < 1) throw new ArgumentException("Number of retries must be greather than zero", nameof(maxDequeueCount)); @@ -104,7 +104,20 @@ public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int co _visibilityTimeout = visibilityTimeout; _maxDequeueCount = maxDequeueCount; - _metrics = (metricsBuilder ?? new MetricsBuilder()).Build(); + if (metrics == null) + { + var noop = new MetricsBuilder(); + noop.Configuration.Configure(new MetricsOptions() + { + Enabled = false, + ReportingEnabled = false + }); + _metrics = noop.Build(); + } + else + { + _metrics = metrics; + } OnQueueEmpty = cancellationToken => Task.Delay(1500, cancellationToken).Wait(); OnError = (message, exception, isPoison) => _logger.ErrorException("An error occured when processing a message", exception); diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index bad2520..5a16fcc 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -71,9 +71,9 @@ public Action OnQueueEmpty /// The number of concurrent tasks. /// The visibility timeout. /// The maximum dequeue count. - /// - public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) - : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metricsBuilder) + /// + public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsRoot metrics = null) + : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) { } @@ -85,10 +85,10 @@ public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudS /// The number of concurrent tasks. /// The queue visibility timeout /// The number of times to try processing a given message before giving up - /// - public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsBuilder metricsBuilder = null) + /// + public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsRoot metrics = null) { - _messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount, metricsBuilder) + _messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) { OnMessage = (message, cancellationToken) => { From ba38d61984e63de587fb11020c4177fec4feb7f9 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 13 Feb 2018 16:12:06 -0500 Subject: [PATCH 22/26] Inject the IMetrics interface instead of IMetricsRoot --- Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 5a16fcc..1c2d558 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -72,7 +72,7 @@ public Action OnQueueEmpty /// The visibility timeout. /// The maximum dequeue count. /// - public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsRoot metrics = null) + public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) { } @@ -86,7 +86,7 @@ public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudS /// The queue visibility timeout /// The number of times to try processing a given message before giving up /// - public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetricsRoot metrics = null) + public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) { _messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) { From 590db57756516145609c9743be719835bcb30904 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 13 Feb 2018 16:49:36 -0500 Subject: [PATCH 23/26] Capture and report the number of messages pending in the queue --- Source/Picton.Messaging.sln | 2 +- Source/Picton.Messaging/AsyncMessagePump.cs | 29 +++++++++++++++-- Source/Picton.Messaging/Metrics.cs | 32 +++++++++++-------- .../Picton.Messaging/Picton.Messaging.csproj | 4 +-- 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/Source/Picton.Messaging.sln b/Source/Picton.Messaging.sln index 513bf68..704ef72 100644 --- a/Source/Picton.Messaging.sln +++ b/Source/Picton.Messaging.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.27130.2010 +VisualStudioVersion = 15.0.27130.2027 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{53D250B0-3F11-4CD4-AF30-5F636B405D87}" ProjectSection(SolutionItems) = preProject diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index b441c07..c7129e5 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -175,7 +175,7 @@ public void Stop() // Define the task that fetches messages from the Azure queue RecurrentCancellableTask.StartNew( - () => + async () => { // Fetch messages from the Azure queue when the number of items in the concurrent queue falls below an "acceptable" level. if (!cancellationToken.IsCancellationRequested && queuedMessages.Count <= _concurrentTasks / 2) @@ -185,8 +185,7 @@ public void Stop() IEnumerable messages = null; try { - messages = _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).Result; - _metrics.Measure.Counter.Increment(Metrics.FetchMessagesCounter); + messages = await _queueManager.GetMessagesAsync(_concurrentTasks, visibilityTimeout, null, null, cancellationToken).ConfigureAwait(false); } catch (TaskCanceledException) { @@ -230,6 +229,30 @@ public void Stop() cancellationToken, TaskCreationOptions.LongRunning); + // Define the task that checks how many messages are queued + RecurrentCancellableTask.StartNew( + async () => + { + try + { + var count = await _queueManager.GetApproximateMessageCountAsync(cancellationToken).ConfigureAwait(false); + count += queuedMessages.Count; + _metrics.Measure.Gauge.SetValue(Metrics.QueuedMessagesGauge, count); + } + catch (TaskCanceledException) + { + // The message pump is shutting down. + // This exception can be safely ignored. + } + catch (Exception e) + { + _logger.InfoException("An error occured while checking how many message are waiting in the queue. The error was caught and ignored.", e.GetBaseException()); + } + }, + TimeSpan.FromMilliseconds(5000), + cancellationToken, + TaskCreationOptions.LongRunning); + // Define the task pump var pumpTask = Task.Run(async () => { diff --git a/Source/Picton.Messaging/Metrics.cs b/Source/Picton.Messaging/Metrics.cs index 184b6b8..72d0ea0 100644 --- a/Source/Picton.Messaging/Metrics.cs +++ b/Source/Picton.Messaging/Metrics.cs @@ -1,5 +1,6 @@ using App.Metrics; using App.Metrics.Counter; +using App.Metrics.Gauge; using App.Metrics.Timer; namespace Picton.Messaging @@ -11,7 +12,8 @@ internal static class Metrics /// public static CounterOptions MessagesProcessedCounter => new CounterOptions { - Name = "Messages processed", + Context = "Picton", + Name = "MessagesProcessedCount", MeasurementUnit = Unit.Items }; @@ -20,21 +22,14 @@ internal static class Metrics /// public static TimerOptions MessageProcessingTimer => new TimerOptions { - Name = "Message processing timer" - }; - - /// - /// Gets the counter indicating the number of times messages have been fetched from the Azure queue - /// - public static CounterOptions FetchMessagesCounter => new CounterOptions - { - Name = "Fetch Messages", - MeasurementUnit = Unit.Items + Context = "Picton", + Name = "MessageProcessingTime" }; public static TimerOptions MessageFetchingTimer => new TimerOptions { - Name = "Message fetching timer" + Context = "Picton", + Name = "MessageFetchingTime" }; /// @@ -42,7 +37,18 @@ internal static class Metrics /// public static CounterOptions QueueEmptyCounter => new CounterOptions { - Name = "Queue Empty" + Context = "Picton", + Name = "QueueEmptyCount" + }; + + /// + /// Gets the guage indicating the number of messages waiting in the queue over time + /// + public static GaugeOptions QueuedMessagesGauge => new GaugeOptions + { + Context = "Picton", + Name = "QueuedMessages", + MeasurementUnit = Unit.Items }; } } diff --git a/Source/Picton.Messaging/Picton.Messaging.csproj b/Source/Picton.Messaging/Picton.Messaging.csproj index 6277776..86dcf0a 100644 --- a/Source/Picton.Messaging/Picton.Messaging.csproj +++ b/Source/Picton.Messaging/Picton.Messaging.csproj @@ -28,9 +28,9 @@ - + - + All From 648b5206c3c546efc79c4d0e5e75e845a420b582 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 13 Feb 2018 16:50:37 -0500 Subject: [PATCH 24/26] Report metrics to Datadog in integration testing --- .../Datadog/DatadogFormatter.cs | 41 ++++++ .../Datadog/DatadogFormatterOptions.cs | 14 ++ .../Datadog/MetricJson.cs | 13 ++ .../Datadog/MetricSnapshotDatadogWriter.cs | 139 ++++++++++++++++++ .../Datadog/SeriesJson.cs | 7 + .../Picton.Messaging.IntegrationTests.csproj | 4 + .../Program.cs | 51 +++++-- .../Utils/RecurrentCancellableTask.cs | 6 +- 8 files changed, 262 insertions(+), 13 deletions(-) create mode 100644 Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatter.cs create mode 100644 Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatterOptions.cs create mode 100644 Source/Picton.Messaging.IntegrationTests/Datadog/MetricJson.cs create mode 100644 Source/Picton.Messaging.IntegrationTests/Datadog/MetricSnapshotDatadogWriter.cs create mode 100644 Source/Picton.Messaging.IntegrationTests/Datadog/SeriesJson.cs diff --git a/Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatter.cs b/Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatter.cs new file mode 100644 index 0000000..205f301 --- /dev/null +++ b/Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatter.cs @@ -0,0 +1,41 @@ +using App.Metrics; +using App.Metrics.Formatters; +using App.Metrics.Serialization; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Picton.Messaging.IntegrationTests.Datadog +{ + /// + /// Formatter for encoding Metrics in Datadog JSON + /// + public class DatadogFormatter : IMetricsOutputFormatter + { + private readonly DatadogFormatterOptions _options; + + /// + /// Constructor + /// + public DatadogFormatter(DatadogFormatterOptions options) + { + _options = options; + } + + /// + public Task WriteAsync(Stream output, MetricsDataValueSource metricsData, CancellationToken cancellationToken = new CancellationToken()) + { + var serializer = new MetricSnapshotSerializer(); + using (var streamWriter = new StreamWriter(output)) + { + using (var writer = new MetricSnapshotDatadogWriter(streamWriter, _options)) + serializer.Serialize(writer, metricsData); + } + + return Task.CompletedTask; + } + + /// + public MetricsMediaTypeValue MediaType => new MetricsMediaTypeValue("application", "com.datadoghq.metrics", "v1", "json"); + } +} diff --git a/Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatterOptions.cs b/Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatterOptions.cs new file mode 100644 index 0000000..987bbe3 --- /dev/null +++ b/Source/Picton.Messaging.IntegrationTests/Datadog/DatadogFormatterOptions.cs @@ -0,0 +1,14 @@ + +namespace Picton.Messaging.IntegrationTests.Datadog +{ + /// + /// Options for data reported to Datadog + /// + public class DatadogFormatterOptions + { + /// + /// The Hostname that is reported. Usually Environment.MachineName + /// + public string Hostname { get; set; } + } +} diff --git a/Source/Picton.Messaging.IntegrationTests/Datadog/MetricJson.cs b/Source/Picton.Messaging.IntegrationTests/Datadog/MetricJson.cs new file mode 100644 index 0000000..85fdcbb --- /dev/null +++ b/Source/Picton.Messaging.IntegrationTests/Datadog/MetricJson.cs @@ -0,0 +1,13 @@ +namespace Picton.Messaging.IntegrationTests.Datadog +{ + /// + /// For serializing http://docs.datadoghq.com/api/?lang=console#metrics + /// + class MetricJson + { + public string Metric { get; set; } + public object[][] Points { get; set; } + public string Host { get; set; } + public string[] Tags { get; set; } + } +} diff --git a/Source/Picton.Messaging.IntegrationTests/Datadog/MetricSnapshotDatadogWriter.cs b/Source/Picton.Messaging.IntegrationTests/Datadog/MetricSnapshotDatadogWriter.cs new file mode 100644 index 0000000..998ca36 --- /dev/null +++ b/Source/Picton.Messaging.IntegrationTests/Datadog/MetricSnapshotDatadogWriter.cs @@ -0,0 +1,139 @@ +using App.Metrics; +using App.Metrics.Serialization; +using Newtonsoft.Json; +using Newtonsoft.Json.Serialization; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; + +namespace Picton.Messaging.IntegrationTests.Datadog +{ + class MetricSnapshotDatadogWriter : IMetricSnapshotWriter + { + private StreamWriter _streamWriter; + private readonly DatadogFormatterOptions _options; + private readonly List _metrics = new List(); + + static readonly JsonSerializerSettings JsonSettings = new JsonSerializerSettings + { + ContractResolver = new LowercaseContractResolver() + }; + + private class LowercaseContractResolver : DefaultContractResolver + { + protected override string ResolvePropertyName(string propertyName) + { + return propertyName.ToLower(); + } + } + + public MetricSnapshotDatadogWriter(StreamWriter streamWriter, DatadogFormatterOptions options) + { + _streamWriter = streamWriter; + _options = options; + } + + public void Dispose() + { + if (_streamWriter != null) + { + Flush(); + + _streamWriter?.Dispose(); + _streamWriter = null; + } + } + + /// + public void Write(string context, string name, object value, MetricTags tags, DateTime timestamp) + { + Write(context, name, new[] { "value" }, new[] { value }, tags, timestamp); + } + + /// + public void Write(string context, string name, IEnumerable columns, IEnumerable values, MetricTags tags, DateTime timestamp) + { + var posixTimestamp = (timestamp - new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero)).TotalSeconds; //TODO: Check this + + var dict = columns.Zip(values, (column, value) => new { column, value }).ToDictionary(p => p.column, p => p.value); + + switch (tags.Values[Array.IndexOf(tags.Keys, "mtype")]) + { + case "apdex": + Write(posixTimestamp, context, name, "count", dict["samples"]); + Write(posixTimestamp, context, name, "score", dict["score"]); + Write(posixTimestamp, context, name, "satisfied", dict["satisfied"]); + Write(posixTimestamp, context, name, "tolerating", dict["tolerating"]); + Write(posixTimestamp, context, name, "frustrating", dict["frustrating"]); + break; + case "gauge": + Write(posixTimestamp, context, name, dict["value"]); + break; + case "counter": + if (dict.ContainsKey("value")) + Write(posixTimestamp, context, name, dict["value"]); + break; + case "meter": + Write(posixTimestamp, context, name, "count", dict["count.meter"]); + Write(posixTimestamp, context, name, "15m", dict["rate15m"]); + Write(posixTimestamp, context, name, "5m", dict["rate5m"]); + Write(posixTimestamp, context, name, "1m", dict["rate1m"]); + Write(posixTimestamp, context, name, "avg", dict["rate.mean"]); + break; + case "timer": + Write(posixTimestamp, context, name, "1mrate", dict["rate1m"]); + Write(posixTimestamp, context, name, "5mrate", dict["rate5m"]); + Write(posixTimestamp, context, name, "15mrate", dict["rate15m"]); + WriteHistogram(posixTimestamp, context, name, dict); + break; + case "histogram": + WriteHistogram(posixTimestamp, context, name, dict); + break; + } + } + + private void WriteHistogram(double posixTimestamp, string context, string name, Dictionary dict) + { + Write(posixTimestamp, context, name, "count", dict["count.hist"]); + + Write(posixTimestamp, context, name, "max", dict["max"]); + Write(posixTimestamp, context, name, "avg", dict["mean"]); + Write(posixTimestamp, context, name, "median", dict["median"]); + Write(posixTimestamp, context, name, "min", dict["min"]); + Write(posixTimestamp, context, name, "stdDev", dict["stddev"]); + + Write(posixTimestamp, context, name, "75percentile", dict["p75"]); + Write(posixTimestamp, context, name, "95percentile", dict["p95"]); + Write(posixTimestamp, context, name, "98percentile", dict["p98"]); + Write(posixTimestamp, context, name, "99percentile", dict["p99"]); + Write(posixTimestamp, context, name, "999percentile", dict["p999"]); + } + + private void Write(double timestamp, string context, string name, string subname, object value) + { + Write(timestamp, context, name + "." + subname, value); + } + + private void Write(double timestamp, string context, string name, object value) + { + _metrics.Add(new MetricJson + { + Host = _options.Hostname, + Metric = context + "." + name, + //Tags = tags.Values, + Points = new[] + { + new[] { timestamp, value } + } + }); + } + + private void Flush() + { + _streamWriter.Write(JsonConvert.SerializeObject(new SeriesJson { Series = _metrics.ToArray() }, JsonSettings)); + } + + public GeneratedMetricNameMapping MetricNameMapping { get; } = new GeneratedMetricNameMapping(); + } +} diff --git a/Source/Picton.Messaging.IntegrationTests/Datadog/SeriesJson.cs b/Source/Picton.Messaging.IntegrationTests/Datadog/SeriesJson.cs new file mode 100644 index 0000000..b8c1f86 --- /dev/null +++ b/Source/Picton.Messaging.IntegrationTests/Datadog/SeriesJson.cs @@ -0,0 +1,7 @@ +namespace Picton.Messaging.IntegrationTests.Datadog +{ + class SeriesJson + { + public MetricJson[] Series { get; set; } + } +} diff --git a/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj b/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj index 3fc2613..ea95e11 100644 --- a/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj +++ b/Source/Picton.Messaging.IntegrationTests/Picton.Messaging.IntegrationTests.csproj @@ -7,6 +7,10 @@ Picton.Messaging.IntegrationTests + + + + diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 3269d28..9c52c24 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -1,7 +1,9 @@ -using Microsoft.WindowsAzure.Storage; +using App.Metrics; +using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Queue; using Picton.Interfaces; using Picton.Managers; +using Picton.Messaging.IntegrationTests.Datadog; using Picton.Messaging.Logging; using System; using System.Diagnostics; @@ -27,23 +29,42 @@ static void Main() // Ensure the Console is tall enough Console.WindowHeight = Math.Min(60, Console.LargestWindowHeight); + // Configure where metrics are published to + var datadogApiKey = Environment.GetEnvironmentVariable("DATADOG_APIKEY"); + var metrics = new MetricsBuilder() + .Report.OverHttp(o => + { + o.HttpSettings.RequestUri = new Uri($"https://app.datadoghq.com/api/v1/series?api_key={datadogApiKey}"); + o.MetricsOutputFormatter = new DatadogFormatter(new DatadogFormatterOptions { Hostname = Environment.MachineName }); + o.FlushInterval = TimeSpan.FromSeconds(2); + }) + .Build(); + // Setup the message queue in Azure storage emulator var storageAccount = StorageAccount.FromCloudStorageAccount(CloudStorageAccount.DevelopmentStorageAccount); var queueName = "myqueue"; logger(Logging.LogLevel.Info, () => "Begin integration tests..."); - var numberOfMessages = 50; + var numberOfMessages = 25; + + logger(Logging.LogLevel.Info, () => $"Adding {numberOfMessages} string messages to the queue..."); + AddStringMessagesToQueue(numberOfMessages, queueName, storageAccount, logProvider).Wait(); + logger(Logging.LogLevel.Info, () => "Processing the messages in the queue..."); + ProcessSimpleMessages(queueName, storageAccount, logProvider, metrics); logger(Logging.LogLevel.Info, () => $"Adding {numberOfMessages} simple messages to the queue..."); AddSimpleMessagesToQueue(numberOfMessages, queueName, storageAccount, logProvider).Wait(); logger(Logging.LogLevel.Info, () => "Processing the messages in the queue..."); - ProcessSimpleMessages(queueName, storageAccount, logProvider); + ProcessSimpleMessages(queueName, storageAccount, logProvider, metrics); logger(Logging.LogLevel.Info, () => $"Adding {numberOfMessages} messages with handlers to the queue..."); AddMessagesWithHandlerToQueue(numberOfMessages, queueName, storageAccount, logProvider).Wait(); logger(Logging.LogLevel.Info, () => "Processing the messages in the queue..."); - ProcessMessagesWithHandlers(queueName, storageAccount, logProvider); + ProcessMessagesWithHandlers(queueName, storageAccount, logProvider, metrics); + + // Send metrics to Datadog + Task.WhenAll(metrics.ReportRunner.RunAllAsync()).Wait(); // Flush the console key buffer while (Console.KeyAvailable) Console.ReadKey(true); @@ -53,9 +74,8 @@ static void Main() Console.ReadKey(); } - public static async Task AddSimpleMessagesToQueue(int numberOfMessages, string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + public static async Task AddStringMessagesToQueue(int numberOfMessages, string queueName, IStorageAccount storageAccount, ILogProvider logProvider) { - // Add messages to our testing queue var cloudQueueClient = storageAccount.CreateCloudQueueClient(); var cloudQueue = cloudQueueClient.GetQueueReference(queueName); await cloudQueue.CreateIfNotExistsAsync().ConfigureAwait(false); @@ -66,13 +86,24 @@ public static async Task AddSimpleMessagesToQueue(int numberOfMessages, string q } } - public static void ProcessSimpleMessages(string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + public static async Task AddSimpleMessagesToQueue(int numberOfMessages, string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + { + var queueManager = new QueueManager(queueName, storageAccount); + await queueManager.CreateIfNotExistsAsync().ConfigureAwait(false); + await queueManager.ClearAsync().ConfigureAwait(false); + for (var i = 0; i < numberOfMessages; i++) + { + await queueManager.AddMessageAsync($"Hello world {i}").ConfigureAwait(false); + } + } + + public static void ProcessSimpleMessages(string queueName, IStorageAccount storageAccount, ILogProvider logProvider, IMetrics metrics) { var logger = logProvider.GetLogger("ProcessSimpleMessages"); Stopwatch sw = null; // Configure the message pump - var messagePump = new AsyncMessagePump(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3) + var messagePump = new AsyncMessagePump(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3, metrics) { OnMessage = (message, cancellationToken) => { @@ -112,14 +143,14 @@ public static async Task AddMessagesWithHandlerToQueue(int numberOfMessages, str } } - public static void ProcessMessagesWithHandlers(string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + public static void ProcessMessagesWithHandlers(string queueName, IStorageAccount storageAccount, ILogProvider logProvider, IMetrics metrics) { var logger = logProvider.GetLogger("ProcessMessagesWithHandlers"); Stopwatch sw = null; // Configure the message pump - var messagePump = new AsyncMessagePumpWithHandlers(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3); + var messagePump = new AsyncMessagePumpWithHandlers(queueName, storageAccount, 10, TimeSpan.FromMinutes(1), 3, metrics); messagePump.OnQueueEmpty = cancellationToken => { // Stop the timer diff --git a/Source/Picton.Messaging/Utils/RecurrentCancellableTask.cs b/Source/Picton.Messaging/Utils/RecurrentCancellableTask.cs index 896affc..67af162 100644 --- a/Source/Picton.Messaging/Utils/RecurrentCancellableTask.cs +++ b/Source/Picton.Messaging/Utils/RecurrentCancellableTask.cs @@ -19,16 +19,16 @@ public static class RecurrentCancellableTask /// The poll interval. /// The token. /// The task creation options - public static void StartNew(Action action, TimeSpan pollInterval, CancellationToken token, TaskCreationOptions taskCreationOptions = TaskCreationOptions.None) + public static void StartNew(Func action, TimeSpan pollInterval, CancellationToken token, TaskCreationOptions taskCreationOptions = TaskCreationOptions.None) { Task.Factory.StartNew( - () => + async () => { do { try { - action(); + await action().ConfigureAwait(false); if (token.WaitHandle.WaitOne(pollInterval)) break; } catch From 66dcde015f4062f283d55244f1eef1a522772de1 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 13 Feb 2018 17:07:43 -0500 Subject: [PATCH 25/26] Send metrics to Datadog --- Source/Picton.Messaging.IntegrationTests/Program.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 9c52c24..1b8234e 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -1,4 +1,5 @@ using App.Metrics; +using App.Metrics.Scheduling; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Queue; using Picton.Interfaces; @@ -40,6 +41,15 @@ static void Main() }) .Build(); + // Send metrics to Datadog + var sendMetricsJob = new AppMetricsTaskScheduler( + TimeSpan.FromSeconds(2), + async () => + { + await Task.WhenAll(metrics.ReportRunner.RunAllAsync()); + }); + sendMetricsJob.Start(); + // Setup the message queue in Azure storage emulator var storageAccount = StorageAccount.FromCloudStorageAccount(CloudStorageAccount.DevelopmentStorageAccount); var queueName = "myqueue"; @@ -63,9 +73,6 @@ static void Main() logger(Logging.LogLevel.Info, () => "Processing the messages in the queue..."); ProcessMessagesWithHandlers(queueName, storageAccount, logProvider, metrics); - // Send metrics to Datadog - Task.WhenAll(metrics.ReportRunner.RunAllAsync()).Wait(); - // Flush the console key buffer while (Console.KeyAvailable) Console.ReadKey(true); From c164b6770189b0f46a6d85fc5c760b04327d91b7 Mon Sep 17 00:00:00 2001 From: Jeremie Desautels Date: Tue, 13 Feb 2018 18:18:57 -0500 Subject: [PATCH 26/26] Remove the IStorageAccount interface --- .../Program.cs | 12 ++--- .../AsyncMessagePumpTests.cs | 50 +++++++++++++------ Source/Picton.Messaging/AsyncMessagePump.cs | 20 ++------ .../AsyncMessagePumpWithHandlers.cs | 21 ++------ .../Picton.Messaging/Picton.Messaging.csproj | 2 +- 5 files changed, 47 insertions(+), 58 deletions(-) diff --git a/Source/Picton.Messaging.IntegrationTests/Program.cs b/Source/Picton.Messaging.IntegrationTests/Program.cs index 1b8234e..f732e25 100644 --- a/Source/Picton.Messaging.IntegrationTests/Program.cs +++ b/Source/Picton.Messaging.IntegrationTests/Program.cs @@ -51,7 +51,7 @@ static void Main() sendMetricsJob.Start(); // Setup the message queue in Azure storage emulator - var storageAccount = StorageAccount.FromCloudStorageAccount(CloudStorageAccount.DevelopmentStorageAccount); + var storageAccount = CloudStorageAccount.DevelopmentStorageAccount; var queueName = "myqueue"; logger(Logging.LogLevel.Info, () => "Begin integration tests..."); @@ -81,7 +81,7 @@ static void Main() Console.ReadKey(); } - public static async Task AddStringMessagesToQueue(int numberOfMessages, string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + public static async Task AddStringMessagesToQueue(int numberOfMessages, string queueName, CloudStorageAccount storageAccount, ILogProvider logProvider) { var cloudQueueClient = storageAccount.CreateCloudQueueClient(); var cloudQueue = cloudQueueClient.GetQueueReference(queueName); @@ -93,7 +93,7 @@ public static async Task AddStringMessagesToQueue(int numberOfMessages, string q } } - public static async Task AddSimpleMessagesToQueue(int numberOfMessages, string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + public static async Task AddSimpleMessagesToQueue(int numberOfMessages, string queueName, CloudStorageAccount storageAccount, ILogProvider logProvider) { var queueManager = new QueueManager(queueName, storageAccount); await queueManager.CreateIfNotExistsAsync().ConfigureAwait(false); @@ -104,7 +104,7 @@ public static async Task AddSimpleMessagesToQueue(int numberOfMessages, string q } } - public static void ProcessSimpleMessages(string queueName, IStorageAccount storageAccount, ILogProvider logProvider, IMetrics metrics) + public static void ProcessSimpleMessages(string queueName, CloudStorageAccount storageAccount, ILogProvider logProvider, IMetrics metrics) { var logger = logProvider.GetLogger("ProcessSimpleMessages"); Stopwatch sw = null; @@ -139,7 +139,7 @@ public static void ProcessSimpleMessages(string queueName, IStorageAccount stora logger(Logging.LogLevel.Info, () => $"\tDone in {sw.Elapsed.ToDurationString()}"); } - public static async Task AddMessagesWithHandlerToQueue(int numberOfMessages, string queueName, IStorageAccount storageAccount, ILogProvider logProvider) + public static async Task AddMessagesWithHandlerToQueue(int numberOfMessages, string queueName, CloudStorageAccount storageAccount, ILogProvider logProvider) { var queueManager = new QueueManager(queueName, storageAccount); await queueManager.CreateIfNotExistsAsync().ConfigureAwait(false); @@ -150,7 +150,7 @@ public static async Task AddMessagesWithHandlerToQueue(int numberOfMessages, str } } - public static void ProcessMessagesWithHandlers(string queueName, IStorageAccount storageAccount, ILogProvider logProvider, IMetrics metrics) + public static void ProcessMessagesWithHandlers(string queueName, CloudStorageAccount storageAccount, ILogProvider logProvider, IMetrics metrics) { var logger = logProvider.GetLogger("ProcessMessagesWithHandlers"); diff --git a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs index 0fbb897..e86ba15 100644 --- a/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs +++ b/Source/Picton.Messaging.UnitTests/AsyncMessagePumpTests.cs @@ -3,11 +3,11 @@ using Microsoft.WindowsAzure.Storage.Blob; using Microsoft.WindowsAzure.Storage.Queue; using Moq; -using Picton.Interfaces; using Shouldly; using System; using System.Linq; using System.Reflection; +using System.Text; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -24,7 +24,7 @@ public void Null_cloudQueue_throws() { Should.Throw(() => { - var messagePump = new AsyncMessagePump("myqueue", (IStorageAccount)null, 1, TimeSpan.FromMinutes(1), 3); + var messagePump = new AsyncMessagePump("myqueue", (CloudStorageAccount)null, 1, TimeSpan.FromMinutes(1), 3); }); } @@ -33,7 +33,7 @@ public void Number_of_concurrent_tasks_too_small_throws() { Should.Throw(() => { - var mockStorageAccount = new Mock(MockBehavior.Strict); + var mockStorageAccount = GetMockStorageAccount(null, null); var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 0, TimeSpan.FromMinutes(1), 3); }); } @@ -43,7 +43,7 @@ public void DequeueCount_too_small_throws() { Should.Throw(() => { - var mockStorageAccount = new Mock(MockBehavior.Strict); + var mockStorageAccount = GetMockStorageAccount(null, null); var messagePump = new AsyncMessagePump("myqueue", mockStorageAccount.Object, 1, TimeSpan.FromMinutes(1), 0); }); } @@ -389,8 +389,8 @@ private static Mock GetMockQueue(string queueName) private static Mock GetMockQueueClient(Mock mockQueue) { var mockQueueStorageUri = new Uri(QUEUE_STORAGE_URL); - var mockCredentials = new StorageCredentials("mySasToken"); - var mockQueueClient = new Mock(MockBehavior.Strict, mockQueueStorageUri, mockCredentials); + var storageCredentials = GetStorageCredentials(); + var mockQueueClient = new Mock(MockBehavior.Strict, mockQueueStorageUri, storageCredentials); mockQueueClient .Setup(c => c.GetQueueReference(mockQueue.Object.Name)) .Returns(mockQueue.Object) @@ -398,18 +398,36 @@ private static Mock GetMockQueueClient(Mock mockQu return mockQueueClient; } - private static Mock GetMockStorageAccount(Mock mockBlobClient, Mock mockQueueClient) + private static Mock GetMockStorageAccount(Mock mockBlobClient, Mock mockQueueClient) { - var storageAccount = new Mock(MockBehavior.Strict); - storageAccount - .Setup(s => s.CreateCloudBlobClient()) - .Returns(mockBlobClient.Object) - .Verifiable(); - storageAccount - .Setup(s => s.CreateCloudQueueClient()) - .Returns(mockQueueClient.Object) - .Verifiable(); + var storageCredentials = GetStorageCredentials(); + var storageAccount = new Mock(MockBehavior.Strict, storageCredentials, true); + + if (mockBlobClient != null) + { + storageAccount + .Setup(s => s.CreateCloudBlobClient()) + .Returns(mockBlobClient.Object) + .Verifiable(); + } + + if (mockQueueClient != null) + { + storageAccount + .Setup(s => s.CreateCloudQueueClient()) + .Returns(mockQueueClient.Object) + .Verifiable(); + } + return storageAccount; } + + private static StorageCredentials GetStorageCredentials() + { + var accountAccessKey = Convert.ToBase64String(Encoding.UTF8.GetBytes("this_is_a_bogus_account_access_key")); + var storageCredentials = new StorageCredentials("account_name", accountAccessKey); + return storageCredentials; + } + } } diff --git a/Source/Picton.Messaging/AsyncMessagePump.cs b/Source/Picton.Messaging/AsyncMessagePump.cs index c7129e5..71fb3b1 100644 --- a/Source/Picton.Messaging/AsyncMessagePump.cs +++ b/Source/Picton.Messaging/AsyncMessagePump.cs @@ -75,26 +75,12 @@ public class AsyncMessagePump /// Initializes a new instance of the class. /// /// Name of the queue. - /// The cloud storage account. + /// The cloud storage account. /// The number of concurrent tasks. /// The visibility timeout. /// The maximum dequeue count. - /// - public AsyncMessagePump(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) - : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// Name of the queue. - /// The storage account. - /// The number of concurrent tasks. - /// The queue visibility timeout - /// The number of times to try processing a given message before giving up - /// - public AsyncMessagePump(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) + /// The system where metrics are published + public AsyncMessagePump(string queueName, CloudStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) { if (concurrentTasks < 1) throw new ArgumentException("Number of concurrent tasks must be greather than zero", nameof(concurrentTasks)); if (maxDequeueCount < 1) throw new ArgumentException("Number of retries must be greather than zero", nameof(maxDequeueCount)); diff --git a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs index 1c2d558..b0aad3f 100644 --- a/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs +++ b/Source/Picton.Messaging/AsyncMessagePumpWithHandlers.cs @@ -1,7 +1,6 @@ using App.Metrics; using Microsoft.Extensions.DependencyModel; using Microsoft.WindowsAzure.Storage; -using Picton.Interfaces; using Picton.Messaging.Logging; using Picton.Messaging.Messages; using System; @@ -67,26 +66,12 @@ public Action OnQueueEmpty /// Initializes a new instance of the class. /// /// Name of the queue. - /// The cloud storage account. + /// The cloud storage account. /// The number of concurrent tasks. /// The visibility timeout. /// The maximum dequeue count. - /// - public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount cloudStorageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) - : this(queueName, StorageAccount.FromCloudStorageAccount(cloudStorageAccount), concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// Name of the queue. - /// The storage account - /// The number of concurrent tasks. - /// The queue visibility timeout - /// The number of times to try processing a given message before giving up - /// - public AsyncMessagePumpWithHandlers(string queueName, IStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) + /// The system where metrics are published + public AsyncMessagePumpWithHandlers(string queueName, CloudStorageAccount storageAccount, int concurrentTasks = 25, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, IMetrics metrics = null) { _messagePump = new AsyncMessagePump(queueName, storageAccount, concurrentTasks, visibilityTimeout, maxDequeueCount, metrics) { diff --git a/Source/Picton.Messaging/Picton.Messaging.csproj b/Source/Picton.Messaging/Picton.Messaging.csproj index 86dcf0a..c476958 100644 --- a/Source/Picton.Messaging/Picton.Messaging.csproj +++ b/Source/Picton.Messaging/Picton.Messaging.csproj @@ -30,7 +30,7 @@ - + All