diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs
index 31e6f977..bdcb8954 100644
--- a/src/DotNetCore.CAP/CAP.Builder.cs
+++ b/src/DotNetCore.CAP/CAP.Builder.cs
@@ -101,7 +101,7 @@ public CapBuilder(IServiceCollection services)
/// Type of filter
public CapBuilder AddSubscribeFilter() where T : class, ISubscribeFilter
{
- Services.TryAddScoped();
+ Services.AddScoped();
return this;
}
diff --git a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
index 81ec004f..313f1fc4 100644
--- a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
+++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading;
@@ -87,40 +88,45 @@ public async Task InvokeAsync(ConsumerContext context,
}
}
- var filter = provider.GetService();
+ var filters = provider.GetServices().ToList();
+ Stack executedFilters = new Stack();
+
object? resultObj = null;
try
{
- if (filter != null)
+ foreach (var filter in filters)
{
- var etContext = new ExecutingContext(context, executeParameters);
- await filter.OnSubscribeExecutingAsync(etContext).ConfigureAwait(false);
- executeParameters = etContext.Arguments;
+ var ctx = new ExecutingContext(context, executeParameters);
+ await filter.OnSubscribeExecutingAsync(ctx).ConfigureAwait(false);
+ executeParameters = ctx.Arguments;
+ executedFilters.Push(filter);
}
resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters).ConfigureAwait(false);
- if (filter != null)
+ while (executedFilters.Count > 0)
{
- var edContext = new ExecutedContext(context, resultObj);
- await filter.OnSubscribeExecutedAsync(edContext).ConfigureAwait(false);
- resultObj = edContext.Result;
+ var filter = executedFilters.Peek();
+ var ctx = new ExecutedContext(context, resultObj);
+ await filter.OnSubscribeExecutedAsync(ctx).ConfigureAwait(false);
+ resultObj = ctx.Result;
+ executedFilters.Pop();
}
}
- catch (Exception e)
+ catch (Exception ex)
{
- if (filter != null)
+ if (executedFilters.Count == 0)
+ throw;
+ while (executedFilters.Count > 0)
{
- var exContext = new ExceptionContext(context, e);
+ var exContext = new ExceptionContext(context, ex);
+ var filter = executedFilters.Pop();
await filter.OnSubscribeExceptionAsync(exContext).ConfigureAwait(false);
+
if (!exContext.ExceptionHandled) exContext.Exception.ReThrow();
if (exContext.Result != null) resultObj = exContext.Result;
}
- else
- {
- throw;
- }
}
var callbackName = message.GetCallbackName();