Skip to content

Commit

Permalink
message deferring / timestamping / real db testing (#11)
Browse files Browse the repository at this point in the history
* added test against real db connection
* added more complex tests
* added event timestamp
* added option to defer message completion
* async processing of incoming system message
* defer status change if not accepted
* defer progress messages
* improve logging
  • Loading branch information
fw2568 authored Apr 12, 2024
1 parent 0fddceb commit e566664
Show file tree
Hide file tree
Showing 61 changed files with 1,221 additions and 99 deletions.
6 changes: 6 additions & 0 deletions rebus-extensions.sln
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector", "src
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector.Tests", "test\Rebus.SimpleInjector.Tests\Rebus.SimpleInjector.Tests.csproj", "{539BED78-F196-4FA2-9E48-2185FE82444E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.OperationsDB.Tests", "test\Rebus.OperationsDB.Tests\Rebus.OperationsDB.Tests.csproj", "{73DED33B-5E78-4843-B3B6-2C9BB0766E42}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -81,6 +83,10 @@ Global
{539BED78-F196-4FA2-9E48-2185FE82444E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.Build.0 = Release|Any CPU
{73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Debug|Any CPU.Build.0 = Debug|Any CPU
{73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Release|Any CPU.ActiveCfg = Release|Any CPU
{73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public interface IOperationTask
public Guid InitiatingTaskId { get; }


OperationTaskStatus Status { get; }
OperationTaskStatus Status { get; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ public interface IOperationManager
{
ValueTask<IOperation?> GetByIdAsync(Guid operationId);
ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command,
DateTimeOffset timestamp,
object? additionalData, IDictionary<string,string>? additionalHeaders);

ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData,
ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus,
DateTimeOffset timestamp, object? additionalData,
IDictionary<string,string>? messageHeaders);
ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task,
object? data, IDictionary<string,string>? messageHeaders);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Events;

Expand All @@ -13,4 +14,8 @@ public interface IOperationMessaging

IOperationDispatcher OperationDispatcher { get; }
IOperationTaskDispatcher TaskDispatcher { get; }

Task SendDeferredMessage(object message, TimeSpan defer);
Task DeferredCurrentMessage(TimeSpan defer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ namespace Dbosoft.Rebus.Operations.Workflow;
public interface IOperationTaskManager
{
ValueTask<IOperationTask?> GetByIdAsync(Guid taskId);
ValueTask<IOperationTask> GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId);
ValueTask<bool> TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, object? additionalData);
ValueTask<IOperationTask> GetOrCreateAsync(IOperation operation, object command,
DateTimeOffset created,
Guid taskId, Guid parentTaskId);
ValueTask<bool> TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus,
DateTimeOffset timestamp,
object? additionalData);


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ public DefaultOperationDispatcher(
_operationManager = operationManager;
}

protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData,
protected override async ValueTask<(IOperation, object)> CreateOperation(object command,
DateTimeOffset created,
object? additionalData,
IDictionary<string,string>? additionalHeaders)
{
return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command,
additionalData,additionalHeaders), command);
created,
additionalData,additionalHeaders).ConfigureAwait(false), command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public DefaultOperationTaskDispatcher(
}

protected override async ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId,
object command, object? additionalData, IDictionary<string,string>? additionalHeaders)
object command, DateTimeOffset created, object? additionalData, IDictionary<string,string>? additionalHeaders)
{
var op = await _operationManager.GetByIdAsync(operationId);
var op = await _operationManager.GetByIdAsync(operationId).ConfigureAwait(false);
if (op == null)
{
throw new ArgumentException($"Operation {operationId} not found", nameof(operationId));
}

return (await _operationTaskManager.GetOrCreateAsync(op, command, Guid.NewGuid(), initiatingTaskId), command);
return (await _operationTaskManager.GetOrCreateAsync(op, command, created, Guid.NewGuid(), initiatingTaskId).ConfigureAwait(false), command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,38 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger<Ope
return StartOperation(commandType,additionalData, additionalHeaders);
}

protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData, IDictionary<string,string>? additionalHeaders);
protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, DateTimeOffset created, object? additionalData, IDictionary<string,string>? additionalHeaders);

protected async ValueTask<IOperation?> StartOperation(object command, object? additionalData, IDictionary<string,string>? additionalHeaders = null)
{

if (command == null)
throw new ArgumentNullException(nameof(command));

var(operation, taskCommand) = await CreateOperation(command, additionalData, additionalHeaders);

var created = DateTimeOffset.Now;
var (operation, taskCommand) =
await CreateOperation(command, created, additionalData, additionalHeaders).ConfigureAwait(false);

var commandJson = JsonSerializer.Serialize(taskCommand, _options.JsonSerializerOptions);

var taskMessage = new CreateNewOperationTaskCommand(
taskCommand.GetType().AssemblyQualifiedName,
commandJson,
operation.Id,
operation.Id,
Guid.NewGuid());
Guid.NewGuid(),
created);

var message = new CreateOperationCommand { TaskMessage = taskMessage };
await (string.IsNullOrWhiteSpace(_options.OperationsDestination)
? _bus.Send(message, additionalHeaders)
: _bus.Advanced.Routing.Send(_options.OperationsDestination, message, additionalHeaders));
? _bus.Send(message, additionalHeaders)
: _bus.Advanced.Routing.Send(_options.OperationsDestination, message, additionalHeaders))
.ConfigureAwait(false);

_logger.LogDebug("Send new command of type {commandType}. Id: {operationId}",
taskCommand.GetType().Name, operation.Id);

return operation;

}

}
Expand Down
6 changes: 5 additions & 1 deletion src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ namespace Dbosoft.Rebus.Operations
{
public class OperationTask<T> : IOperationTaskMessage where T : class, new()
{
public OperationTask(T command, Guid operationId, Guid initiatingTaskId, Guid taskId)

public OperationTask(T command, Guid operationId, Guid initiatingTaskId, Guid taskId,
DateTimeOffset created)
{
Command = command;
OperationId = operationId;
InitiatingTaskId = initiatingTaskId;
TaskId = taskId;
Created = created;
}

public T Command { get; }
public Guid OperationId { get; }
public Guid InitiatingTaskId { get; }
public Guid TaskId { get; }
public DateTimeOffset Created { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,28 @@ protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger
return StartTask(operationId, initiatingTaskId, commandType, additionalData, additionalHeaders);
}

protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData, IDictionary<string,string>? additionalHeaders);
protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, DateTimeOffset created, object? additionalData, IDictionary<string,string>? additionalHeaders);

protected async ValueTask<IOperationTask?> StartTask(Guid operationId, Guid initiatingTaskId,
object command, object? additionalData, IDictionary<string,string>? additionalHeaders = null)
{
if (command == null)
throw new ArgumentNullException(nameof(command));

var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData, additionalHeaders);
var created = DateTimeOffset.UtcNow;
var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, created, additionalData, additionalHeaders).ConfigureAwait(false);
var commandJson = JsonSerializer.Serialize(taskCommand, _options.JsonSerializerOptions);

var taskMessage = new CreateNewOperationTaskCommand(
taskCommand.GetType().AssemblyQualifiedName,
commandJson,
operationId,
initiatingTaskId,
task.Id);
task.Id, created);

await (string.IsNullOrWhiteSpace(_options.OperationsDestination)
? _bus.Send(taskMessage, additionalHeaders)
: _bus.Advanced.Routing.Send(_options.OperationsDestination, taskMessage, additionalHeaders)) ;
: _bus.Advanced.Routing.Send(_options.OperationsDestination, taskMessage, additionalHeaders)).ConfigureAwait(false) ;

_logger.LogDebug("Send new command of type {commandType}. Id: {operationId}, ParentTaskId: {parentTaskId}",
taskCommand.GetType().Name, operationId, initiatingTaskId);
Expand Down
12 changes: 6 additions & 6 deletions src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ public static async Task<IBus> SubscribeEvents(IBus bus, WorkflowOptions options

if (!string.IsNullOrWhiteSpace(options.EventDestination))
{
await bus.Advanced.Topics.Subscribe(options.EventDestination);
await bus.Advanced.Topics.Subscribe(options.EventDestination).ConfigureAwait(false);
return bus;
}

await bus.Subscribe<OperationStatusEvent>();
await bus.Subscribe<OperationTaskAcceptedEvent>();
await bus.Subscribe<OperationTaskProgressEvent>();
await bus.Subscribe<OperationTaskStatusEvent>();
await bus.Subscribe<OperationTimeoutEvent>();
await bus.Subscribe<OperationStatusEvent>().ConfigureAwait(false);
await bus.Subscribe<OperationTaskAcceptedEvent>().ConfigureAwait(false);
await bus.Subscribe<OperationTaskProgressEvent>().ConfigureAwait(false);
await bus.Subscribe<OperationTaskStatusEvent>().ConfigureAwait(false);
await bus.Subscribe<OperationTimeoutEvent>().ConfigureAwait(false);

return bus;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/Daemon/ConfigureAwaitAnalysisMode/@EntryValue">Library</s:String></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ await _operationMessaging.DispatchTaskStatusEventAsync(
OperationTaskStatusEvent.Failed(
failedMessage.Message.OperationId, failedMessage.Message.InitiatingTaskId,
failedTaskId, new ErrorData() { ErrorMessage = failedMessage.ErrorDescription },
_workflowOptions.JsonSerializerOptions));
_workflowOptions.JsonSerializerOptions)).ConfigureAwait(false);


}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Commands;
using Dbosoft.Rebus.Operations.Events;
using Microsoft.Extensions.Logging;
using Rebus.Bus;
using Rebus.Handlers;
using Rebus.Pipeline;
using Rebus.Transport;


namespace Dbosoft.Rebus.Operations.Workflow
Expand All @@ -22,27 +24,39 @@ public IncomingTaskMessageHandler(IBus bus, ILogger<IncomingTaskMessageHandler<T
_messageEnricher = messageEnricher;
}

private static async void Resubmit(
IBus bus,
OperationTaskSystemMessage<T> taskMessage, IDictionary<string, string>? headers)
{
using var scope = new RebusTransactionScope();
await bus.SendLocal(new OperationTask<T>(taskMessage.Message,
taskMessage.OperationId, taskMessage.InitiatingTaskId,
taskMessage.TaskId, taskMessage.Created)
, headers
).ConfigureAwait(false);

await scope.CompleteAsync().ConfigureAwait(false);
}

public async Task Handle(OperationTaskSystemMessage<T> taskMessage)
{
if(taskMessage.Message==null)
throw new InvalidOperationException($"Operation Workflow {taskMessage.OperationId}/{taskMessage.TaskId}: missing command message");

var headers = _messageEnricher.EnrichHeadersFromIncomingSystemMessage(taskMessage, MessageContext.Current.Headers);
await _bus.SendLocal(new OperationTask<T>(taskMessage.Message, taskMessage.OperationId, taskMessage.InitiatingTaskId, taskMessage.TaskId)
, headers
).ConfigureAwait(false);

_logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'");

var reply = new OperationTaskAcceptedEvent
{
OperationId = taskMessage.OperationId,
InitiatingTaskId = taskMessage.InitiatingTaskId,
TaskId = taskMessage.TaskId,
AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage)
AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage),
Created = taskMessage.Created
};

await _bus.Reply(reply).ConfigureAwait(false);
_logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'");

Resubmit(_bus, taskMessage, headers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ public abstract class OperationManagerBase : IOperationManager

public abstract ValueTask<IOperation?> GetByIdAsync(Guid operationId);

public abstract ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command,
public abstract ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command,
DateTimeOffset timestamp,
object? additionalData,IDictionary<string,string>? additionalHeaders);


public abstract ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation,
IOperationTask task,
object? data, IDictionary<string,string>? messageHeaders);

public abstract ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus,
public abstract ValueTask<bool> TryChangeStatusAsync(IOperation operation,
OperationStatus newStatus,
DateTimeOffset timestamp,
object? additionalData, IDictionary<string,string>? messageHeaders);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ public abstract class OperationTaskManagerBase : IOperationTaskManager

public abstract ValueTask<IOperationTask?> GetByIdAsync(Guid taskId);

public abstract ValueTask<IOperationTask> GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId);
public abstract ValueTask<IOperationTask> GetOrCreateAsync(IOperation operation, object command,
DateTimeOffset created, Guid taskId, Guid parentTaskId);

public abstract ValueTask<bool> TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus,
public abstract ValueTask<bool> TryChangeStatusAsync(IOperationTask task,
OperationTaskStatus newStatus,
DateTimeOffset timestamp,
object? additionalData);


Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Events;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Rebus.Handlers;
using Rebus.Logging;
using Rebus.Messages;
using Rebus.Pipeline;

namespace Dbosoft.Rebus.Operations.Workflow
Expand Down Expand Up @@ -35,12 +38,30 @@ public async Task Handle(OperationTaskProgressEvent message)

if (operation != null && task!=null)
{
if (task.Status == OperationTaskStatus.Queued)
{
var deferCount = 0;
if (MessageContext.Current.Headers.TryGetValue(Headers.DeferCount,
out var deferCountString))
{
deferCount = int.Parse(deferCountString);
}

if (deferCount < 5)
{
_logger.LogDebug("Operation Workflow {operationId}, Task {taskId}: Progress event received for queued task, deferred {deferCount} times, deferring for {deferTime} ms",
message.OperationId, message.TaskId, deferCount, 100 * (deferCount + 1));
await _workflow.Messaging.DeferredCurrentMessage(TimeSpan.FromMilliseconds(100 * (deferCount + 1))).ConfigureAwait(false);
return;
}
}

await _workflow.Operations.AddProgressAsync(
message.Id,
message.Timestamp,
operation,
task,
message.Data, MessageContext.Current.Headers);
message.Data, MessageContext.Current.Headers).ConfigureAwait(false);
}
else
{
Expand Down
Loading

0 comments on commit e566664

Please sign in to comment.