Skip to content

Commit

Permalink
Subtasks failure is not always processed in parent tasks (#8)
Browse files Browse the repository at this point in the history
If a subtasks failure is handled by FailedOperationHandler the parent is not notified. 
OperationSaga now makes sure that the parent of a failed task is always notified.
  • Loading branch information
fw2568 authored Feb 12, 2024
1 parent 92dd03c commit 310ac89
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,24 @@ await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusE

MarkAsComplete();
}
else
{
// capture failed operations and send status events to initiating task
if (message.OperationFailed)
{
var initiatingTask = await _workflow.Tasks
.GetByIdAsync(message.TaskId)
.ConfigureAwait(false);

if (initiatingTask != null && Data.Tasks.TryGetValue(initiatingTask.Id, out var taskCommandTypeName))
{
message.InitiatingTaskId = initiatingTask.InitiatingTaskId;
message.TaskId = initiatingTask.Id;
await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message);
}
}
}

}

}
Expand Down
5 changes: 5 additions & 0 deletions test/Rebus.Operations.Tests/StepTwoCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Dbosoft.Rebus.Operations.Tests;
public class StepTwoCommandHandler : IHandleMessages<OperationTask<StepTwoCommand>>
{
private readonly ITaskMessaging _messaging;
public bool Throws { get; set; }

public StepTwoCommandHandler(ITaskMessaging messaging)
{
Expand All @@ -18,6 +19,10 @@ public StepTwoCommandHandler(ITaskMessaging messaging)
public Task Handle(OperationTask<StepTwoCommand> message)
{
Called = true;

if (Throws)
throw new Exception("Failed");

return _messaging.CompleteTask(message);
}
}
7 changes: 3 additions & 4 deletions test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ namespace Dbosoft.Rebus.Operations.Tests;
public class TestCommandHandlerWithError : IHandleMessages<OperationTask<TestCommand>>
{
private readonly ITaskMessaging _messaging;
private readonly bool _throws;
public bool Throws { get; set; }

public TestCommandHandlerWithError(bool throws, ITaskMessaging messaging)
public TestCommandHandlerWithError(ITaskMessaging messaging)
{
_throws = throws;
_messaging = messaging;
}

public async Task Handle(OperationTask<TestCommand> message)
{
if (_throws)
if (Throws)
throw new InvalidOperationException();

await _messaging.FailTask(message, "error");
Expand Down
69 changes: 67 additions & 2 deletions test/Rebus.Operations.Tests/WorkflowTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Dbosoft.Rebus.Operations.Events;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging.Abstractions;
using Rebus.Retry.Simple;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -92,6 +93,62 @@ public async Task MultiStep_Operation_is_processed(bool sendMode, string eventDe
}
}

[Theory]
[InlineData(false, "")]
[InlineData(true, "")]
[InlineData(false, "main")]
[InlineData(true, "main")]
public async Task MultiStep_Operation_Exception_is_reported(bool sendMode, string eventDestination)
{
StepOneCommandHandler? stepOneHandler;
StepTwoCommandHandler? stepTwoHandler;
using var setup = await SetupRebus(sendMode, eventDestination, configureActivator: (activator, wf, tasks, bus) =>
{
activator.Register(() => new IncomingTaskMessageHandler<MultiStepCommand>(bus,
NullLogger<IncomingTaskMessageHandler<MultiStepCommand>>.Instance, new DefaultMessageEnricher()));
activator.Register(() => new IncomingTaskMessageHandler<StepOneCommand>(bus,
NullLogger<IncomingTaskMessageHandler<StepOneCommand>>.Instance, new DefaultMessageEnricher()));
activator.Register(() => new IncomingTaskMessageHandler<StepTwoCommand>(bus,
NullLogger<IncomingTaskMessageHandler<StepTwoCommand>>.Instance, new DefaultMessageEnricher()));
activator.Register(() => new EmptyOperationStatusEventHandler());
activator.Register(() => new MultiStepSaga(wf));
activator.Register(() => new FailedOperationHandler<OperationTask<StepTwoCommand>>(wf.WorkflowOptions,
NullLogger< FailedOperationHandler<OperationTask<StepTwoCommand>>>.Instance,
wf.Messaging));
stepOneHandler = new StepOneCommandHandler(tasks);
stepTwoHandler = new StepTwoCommandHandler(tasks){Throws = true};
activator.Register(() => stepOneHandler);
activator.Register(() => stepTwoHandler);
});

TestOperationManager.Reset();
TestTaskManager.Reset();
StepOneCommandHandler.Called = false;
StepTwoCommandHandler.Called = false;

await setup.OperationDispatcher.StartNew<MultiStepCommand>();

var timeout = new CancellationTokenSource(60000);
while (
!timeout.Token.IsCancellationRequested &&
(TestOperationManager.Operations.First().Value.Status == OperationStatus.Running ||
TestOperationManager.Operations.First().Value.Status == OperationStatus.Queued))
// ReSharper disable once MethodSupportsCancellation
{
await Task.Delay(1000);
}

Assert.True(StepOneCommandHandler.Called);
Assert.True(StepTwoCommandHandler.Called);
Assert.Single(TestOperationManager.Operations);
Assert.Equal(3, TestTaskManager.Tasks.Count);
Assert.Equal(OperationStatus.Failed, TestOperationManager.Operations.First().Value.Status);


}

[Theory]
[InlineData(false, "")]
[InlineData(true, "")]
Expand Down Expand Up @@ -132,7 +189,7 @@ public async Task SingleStep_Operation_failure_is_reported(bool throws)
{
activator.Register(() => new IncomingTaskMessageHandler<TestCommand>(bus,
NullLogger<IncomingTaskMessageHandler<TestCommand>>.Instance, new DefaultMessageEnricher()));
activator.Register(() => new TestCommandHandlerWithError(throws, tasks));
activator.Register(() => new TestCommandHandlerWithError(tasks){Throws = true});
activator.Register(() => new EmptyOperationStatusEventHandler());
activator.Register(() => new EmptyOperationTaskStatusEventHandler<TestCommand>());
activator.Register(() =>
Expand All @@ -145,7 +202,15 @@ public async Task SingleStep_Operation_failure_is_reported(bool throws)
TestTaskManager.Reset();

await setup.OperationDispatcher.StartNew<TestCommand>();
await Task.Delay(throws ? 2000: 1000);
var timeout = new CancellationTokenSource(10000);
while (
!timeout.Token.IsCancellationRequested &&
(TestOperationManager.Operations.First().Value.Status == OperationStatus.Running ||
TestOperationManager.Operations.First().Value.Status == OperationStatus.Queued))
// ReSharper disable once MethodSupportsCancellation
{
await Task.Delay(1000);
}
Assert.Equal(OperationStatus.Failed ,TestOperationManager.Operations.First().Value.Status);

}
Expand Down

0 comments on commit 310ac89

Please sign in to comment.