Skip to content

Commit

Permalink
bug: inconsistent serialization (#6)
Browse files Browse the repository at this point in the history
SerializationOptions from v0.5 are not considered everywhere, causing messages to be cleared when send between components.
  • Loading branch information
fw2568 authored Jul 28, 2023
1 parent 9a80fe6 commit 480d2fa
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public interface IWorkflow
IOperationManager Operations { get; }
IOperationTaskManager Tasks { get; }
IOperationMessaging Messaging { get; }


WorkflowOptions WorkflowOptions{ get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Task FailTask(IOperationTaskMessage message, ErrorData error, IDictionary
return _bus.SendWorkflowEvent(_options,
OperationTaskStatusEvent.Failed(
message.OperationId, message.InitiatingTaskId,
message.TaskId, error),additionalHeaders );
message.TaskId, error,_options.JsonSerializerOptions),additionalHeaders );
}


Expand All @@ -44,7 +44,8 @@ public Task CompleteTask(IOperationTaskMessage message, object responseMessage,
{
return _bus.SendWorkflowEvent(_options,
OperationTaskStatusEvent.Completed(
message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage), additionalHeaders);
message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage,
_options.JsonSerializerOptions), additionalHeaders);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ namespace Dbosoft.Rebus.Operations.Workflow;

public class DefaultWorkflow : IWorkflow
{
public DefaultWorkflow(IOperationManager operation, IOperationTaskManager tasks, IOperationMessaging messaging)
public DefaultWorkflow(WorkflowOptions workflowOptions, IOperationManager operation, IOperationTaskManager tasks, IOperationMessaging messaging)
{
WorkflowOptions = workflowOptions;
Operations = operation;
Tasks = tasks;
Messaging = messaging;
}

public WorkflowOptions WorkflowOptions { get; }
public IOperationManager Operations { get; }
public IOperationTaskManager Tasks { get; }
public IOperationMessaging Messaging { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ public class FailedOperationHandler<T> : IHandleMessages<IFailed<T>> where T: IO
{
private readonly IOperationMessaging _operationMessaging;

private readonly WorkflowOptions _workflowOptions;
private readonly ILogger<FailedOperationHandler<T>> _logger;

public FailedOperationHandler(ILogger<FailedOperationHandler<T>> logger, IOperationMessaging operationMessaging)
public FailedOperationHandler(
WorkflowOptions workflowOptions,
ILogger<FailedOperationHandler<T>> logger, IOperationMessaging operationMessaging)
{
_workflowOptions = workflowOptions;
_logger = logger;
_operationMessaging = operationMessaging;
}
Expand All @@ -32,7 +36,8 @@ public async Task Handle(IFailed<T> failedMessage)
await _operationMessaging.DispatchTaskStatusEventAsync(
OperationTaskStatusEvent.Failed(
failedMessage.Message.OperationId, failedMessage.Message.InitiatingTaskId,
failedMessage.Message.TaskId, new ErrorData() { ErrorMessage = failedMessage.ErrorDescription }));
failedMessage.Message.TaskId, new ErrorData() { ErrorMessage = failedMessage.ErrorDescription },
_workflowOptions.JsonSerializerOptions));


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,23 @@ private Task InitiatingTaskFailed()
protected Task Fail(object? message = null)
{
return WorkflowEngine.Messaging.DispatchTaskStatusEventAsync(OperationTaskStatusEvent.Failed(
Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message));
Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message,
WorkflowEngine.WorkflowOptions.JsonSerializerOptions));
}


protected Task Complete(object? message = null)
{
return WorkflowEngine.Messaging.DispatchTaskStatusEventAsync(OperationTaskStatusEvent.Completed(
Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message));
Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message,
WorkflowEngine.WorkflowOptions.JsonSerializerOptions));
}

protected Task FailOrRun<T>(OperationTaskStatusEvent<T> message, Func<Task> completedFunc)
where T : class, new()
{
if (message.OperationFailed)
return Fail(message.GetMessage());
return Fail(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions));

return completedFunc();
}
Expand All @@ -82,8 +84,8 @@ protected Task FailOrRun<T, TOpMessage>(OperationTaskStatusEvent<T> message, Fun
where TOpMessage : class
{
return message.OperationFailed
? Fail(message.GetMessage())
: completedFunc(message.GetMessage() as TOpMessage
? Fail(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions))
: completedFunc(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions) as TOpMessage
?? throw new InvalidOperationException(
$"Message {typeof(T)} has not returned a result of type {typeof(TOpMessage)}."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public async Task Handle(CreateNewOperationTaskCommand message)

var command = JsonSerializer.Deserialize(message.CommandData,
Type.GetType(message.CommandType) ??
throw new InvalidOperationException($"Operation Workflow {message.OperationId}: unknown command type '{message.CommandType}'"));
throw new InvalidOperationException($"Operation Workflow {message.OperationId}: unknown command type '{message.CommandType}'"),
_workflow.WorkflowOptions.JsonSerializerOptions);

if(command == null)
throw new InvalidOperationException($"Operation Workflow {message.OperationId}: invalid command data in message '{message.CommandType}'");
Expand Down Expand Up @@ -178,7 +179,7 @@ public async Task Handle(OperationTaskStatusEvent message)
message.OperationFailed
? OperationTaskStatus.Failed
: OperationTaskStatus.Completed
, message.GetMessage()))
, message.GetMessage(_workflow.WorkflowOptions.JsonSerializerOptions)))

_log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}",
message.OperationId, message.TaskId, taskOldStatus, task.Status);
Expand All @@ -197,7 +198,7 @@ public async Task Handle(OperationTaskStatusEvent message)


if (await _workflow.Operations.TryChangeStatusAsync(op,
newStatus, message.GetMessage(), MessageContext.Current.Headers))
newStatus, message.GetMessage(_workflow.WorkflowOptions.JsonSerializerOptions), MessageContext.Current.Headers))
{
await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Text.Json;

namespace Dbosoft.Rebus.Operations.Events
{
Expand All @@ -22,9 +23,9 @@ public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingT
return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, true, null, null);
}

public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message)
public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message, JsonSerializerOptions serializerOptions)
{
var (data, typeName) = SerializeMessage(message);
var (data, typeName) = SerializeMessage(message, serializerOptions);
return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, true, typeName, data);
}

Expand All @@ -33,9 +34,9 @@ public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiati
return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, false, null, null);
}

public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message)
public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message, JsonSerializerOptions serializerOptions)
{
var (data, typeName) = SerializeMessage(message);
var (data, typeName) = SerializeMessage(message, serializerOptions);
return new OperationTaskStatusEvent(operationId, initiatingTaskId,taskId, false, typeName, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId,
public Guid TaskId { get; set; }
public Guid InitiatingTaskId { get; set; }

protected static (string? data, string? type) SerializeMessage(object? message)
protected static (string? data, string? type) SerializeMessage(object? message, JsonSerializerOptions serializerOptions)
{
if (message == null)
return (null, null);


return (JsonSerializer.Serialize(message), message.GetType().AssemblyQualifiedName);
return (JsonSerializer.Serialize(message, serializerOptions), message.GetType().AssemblyQualifiedName);
}

public object? GetMessage()
public object? GetMessage(JsonSerializerOptions serializerOptions)
{
if (MessageData == null || MessageType == null)
return null;
Expand All @@ -44,10 +44,10 @@ protected static (string? data, string? type) SerializeMessage(object? message)

return type == null
? null
: JsonSerializer.Deserialize(MessageData, type);
: JsonSerializer.Deserialize(MessageData, type, serializerOptions);
}

public T? GetErrorDetails<T>()
public T? GetErrorDetails<T>(JsonSerializerOptions serializerOptions)
{
if (MessageData == null || MessageType == null)
return default;
Expand All @@ -58,11 +58,11 @@ protected static (string? data, string? type) SerializeMessage(object? message)

if (type != typeof(ErrorData) && !type.IsSubclassOf(typeof(ErrorData))) return default;

var data = JsonSerializer.Deserialize(MessageData, type) as ErrorData;
var data = JsonSerializer.Deserialize(MessageData, type, serializerOptions) as ErrorData;

if ( data?.AdditionalData is JsonElement element)
{
return element.Deserialize<T>();
return element.Deserialize<T>(serializerOptions);
}

return default;
Expand Down
1 change: 1 addition & 0 deletions test/Rebus.Operations.Tests/RebusTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public async Task<TestRebusSetup> SetupRebus(

messageEnricher ??= new DefaultMessageEnricher();
var workflow = new DefaultWorkflow(
workflowOptions,
opManager, taskManager, new RebusOperationMessaging(busStarter.Bus,
opDispatcher, taskDispatcher,messageEnricher, workflowOptions ));

Expand Down
3 changes: 3 additions & 0 deletions test/Rebus.Operations.Tests/WorkflowTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public async Task SingleStep_Operation_failure_is_reported(bool throws)
activator.Register(() => new EmptyOperationTaskStatusEventHandler<TestCommand>());
activator.Register(() =>
new FailedOperationHandler<OperationTask<TestCommand>>(
wf.WorkflowOptions,
NullLogger<FailedOperationHandler<OperationTask<TestCommand>>>.Instance,
wf.Messaging));
});
Expand All @@ -162,6 +163,7 @@ public async Task Headers_are_passed_to_task()
activator.Register(() => new EmptyOperationTaskStatusEventHandler<TestCommand>());
activator.Register(() =>
new FailedOperationHandler<OperationTask<TestCommand>>(
wf.WorkflowOptions,
NullLogger<FailedOperationHandler<OperationTask<TestCommand>>>.Instance,
wf.Messaging));
}, messageEnricher);
Expand All @@ -173,6 +175,7 @@ await setup.OperationDispatcher.StartNew<TestCommand>(additionalHeaders:
await Task.Delay(1000);
Assert.True(ExposingHeadersCommandHandler.Called);
var headers = ExposingHeadersCommandHandler.Headers;
Assert.NotNull(headers);
Assert.Contains(headers, x => x.Key == "custom_header");

}
Expand Down

0 comments on commit 480d2fa

Please sign in to comment.