Skip to content

Component Execution Status

Marty Mathis edited this page Aug 27, 2020 · 10 revisions

Pipeline Component Execution Status Monitoring

Pipeline Framework has some built-in hooks you can extend to provide status feedback on the execution of each component within your pipeline. By implementing IPipelineComponentExecutionStatusReceiver or IAsyncPipelineComponentExecutionStatusReceiver (depending on the type of pipeline you choose), you can receive status notifications for each executing component and handle those notifications as you see fit.

Registering Component Execution Status Receiver

There are currently two ways to accomplish this:

Using the builder:

// Builds an async pipeline for ExamplePipelinePayload, handing the status receiver
// instance to the builder at initialization and then registering three components.
// `resolver` and `settings` are not defined in this snippet; they are assumed to be
// created elsewhere by the caller.
// NOTE(review): the ExecutionStatusReceiver example shown on this page takes an ILogger
// constructor argument and implements IPipelineComponentExecutionStatusReceiver (the
// non-async interface) - confirm which receiver type this snippet is meant to construct.
var pipeline = PipelineBuilder<ExamplePipelinePayload>
    .InitializeAsyncPipeline(new ExecutionStatusReceiver())
    .WithComponent<FooComponent>()
    .WithComponent<DelayComponent>()
    .WithComponent<BarComponent>()
    .WithComponentResolver(resolver)
    .WithSettings(settings)
    .Build();

Using the Microsoft Dependency Injection extension package:

    // Service collection that the pipeline registrations are added to.
    var services = new ServiceCollection();

    // Registers the framework services and then an async pipeline for ExamplePipelinePayload.
    // The second type argument names the status receiver type; presumably the extension
    // package obtains the receiver instance from the container - confirm against the package.
    // `settings` is not defined in this snippet; it is assumed to be created by the caller.
    services
        .AddPipelineFramework()
        .AddAsyncPipeline<ExamplePipelinePayload, ExecutionStatusReceiver>(
            cfg => cfg
                .WithComponent<FooComponent>()
                .WithComponent<DelayComponent>()
                .WithComponent<BarComponent>(),
            settings);

Example usage with Serilog:

using PipelineFramework.Abstractions;
using Serilog;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;

namespace PipelineFramework.Core.Examples
{
    /// <summary>
    /// Example status receiver that writes a Serilog event when a pipeline component
    /// starts executing and another when it completes or fails.
    /// </summary>
    /// <remarks>
    /// When the payload is an <c>ExamplePipelinePayload</c>, its <c>FooKey</c> and
    /// <c>Result</c> values are attached to each event as context properties.
    /// </remarks>
    public class ExecutionStatusReceiver : IPipelineComponentExecutionStatusReceiver
    {
        private readonly ILogger _logger;

        /// <summary>
        /// Creates a receiver that writes its notifications to <paramref name="logger"/>.
        /// </summary>
        /// <param name="logger">Serilog logger that receives the status events.</param>
        /// <exception cref="System.ArgumentNullException">
        /// <paramref name="logger"/> is <c>null</c>.
        /// </exception>
        public ExecutionStatusReceiver(ILogger logger)
        {
            // Fail at construction instead of with a NullReferenceException on the
            // first notification, where the cause is much harder to trace.
            _logger = logger ?? throw new System.ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Logs an information event stating that a component is about to execute.
        /// </summary>
        /// <param name="executionInfo">Component name, timestamp and payload of the starting execution.</param>
        public void ReceiveExecutionStarting(PipelineComponentExecutionStartingInfo executionInfo)
        {
            var logger = EnrichLogger(_logger, executionInfo.Payload);
            logger.Information(
                "Component '{PipelineComponentName}' execution starting at {TimeStamp}",
                executionInfo.PipelineComponentName,
                executionInfo.TimeStamp.ToShortTimeString());
        }

        /// <summary>
        /// Logs the outcome of a component execution: an information event when no
        /// exception was recorded, otherwise an error event carrying the exception.
        /// </summary>
        /// <param name="executionInfo">Component name, timestamp, duration, payload and any exception.</param>
        public void ReceiveExecutionCompleted(PipelineComponentExecutionCompletedInfo executionInfo)
        {
            var logger = EnrichLogger(_logger, executionInfo.Payload);

            // Both outcomes report the same three values; only level and wording differ.
            var componentName = executionInfo.PipelineComponentName;
            var timeStamp = executionInfo.TimeStamp.ToShortTimeString();
            var totalMilliseconds = executionInfo.ExecutionTime.TotalMilliseconds;

            if (executionInfo.Exception == null)
            {
                logger.Information(
                    "Component '{PipelineComponentName}' execution completed at {TimeStamp}.  Duration: {TotalMilliseconds}ms",
                    componentName,
                    timeStamp,
                    totalMilliseconds);
                return;
            }

            logger.Error(
                executionInfo.Exception,
                "Component '{PipelineComponentName}' execution failed at {TimeStamp}.  Duration: {TotalMilliseconds}ms",
                componentName,
                timeStamp,
                totalMilliseconds);
        }

        /// <summary>
        /// Returns <paramref name="logger"/> with the payload's <c>FooKey</c> and
        /// <c>Result</c> added as context properties when the payload is an
        /// <c>ExamplePipelinePayload</c>; otherwise returns the logger unchanged.
        /// </summary>
        private static ILogger EnrichLogger(ILogger logger, object payload) =>
            payload is ExamplePipelinePayload p
                ? logger.ForContext("FooKey", p.FooKey).ForContext("Result", p.Result)
                : logger;
    }
}
Clone this wiki locally