Skip to content

Commit e87ae55

Browse files
committed
Add Pipeline to received context, this mean when you got a new request to fetch data, you can add the pipeline to handle what you want!
1 parent 169666f commit e87ae55

File tree

11 files changed

+125
-17
lines changed

11 files changed

+125
-17
lines changed

OfX.sln

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,6 @@ Global
3131
{36F9049C-6A77-4EF7-9CE0-86A22CA36DEA}.Release|Any CPU.ActiveCfg = Release|Any CPU
3232
{36F9049C-6A77-4EF7-9CE0-86A22CA36DEA}.Release|Any CPU.Build.0 = Release|Any CPU
3333
EndGlobalSection
34+
GlobalSection(NestedProjects) = preSolution
35+
EndGlobalSection
3436
EndGlobal

src/OfX.EntityFrameworkCore/EfQueryOfHandler.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
using OfX.Abstractions;
77
using OfX.Attributes;
88
using OfX.EntityFrameworkCore.Delegates;
9-
using OfX.Exceptions;
109
using OfX.Responses;
1110

1211
namespace OfX.EntityFrameworkCore;

src/OfX.Grpc/Servers/OfXGrpcServer.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace OfX.Grpc.Servers;
1313

1414
public sealed class OfXGrpcServer(IServiceProvider serviceProvider) : OfXTransportService.OfXTransportServiceBase
1515
{
16-
private const string GetDataAsync = nameof(GetDataAsync);
16+
private const string ExecuteAsync = nameof(ExecuteAsync);
1717

1818
private static readonly Lazy<ConcurrentDictionary<Type, MethodInfo>> MethodInfoStorage =
1919
new(() => new ConcurrentDictionary<Type, MethodInfo>());
@@ -30,12 +30,16 @@ public override async Task<OfXItemsGrpcResponse> GetItems(GetOfXGrpcQuery reques
3030
if (!OfXCached.QueryMapHandler.TryGetValue(attributeType, out var handlerType))
3131
throw new OfXGrpcExceptions.CannotFindHandlerForOfAttribute(attributeType);
3232

33-
var handler = serviceProvider.GetRequiredService(handlerType);
33+
var modelArg = handlerType.GetGenericArguments()[0];
3434

35-
var genericMethod = MethodInfoStorage.Value.GetOrAdd(attributeType, q => handler.GetType().GetMethods()
35+
var pipeline = serviceProvider
36+
.GetRequiredService(typeof(ReceivedPipelinesImpl<,>).MakeGenericType(modelArg, attributeType));
37+
38+
var pipelineMethod = MethodInfoStorage.Value.GetOrAdd(attributeType, q => pipeline.GetType().GetMethods()
3639
.FirstOrDefault(m =>
37-
m.Name == GetDataAsync && m.GetParameters() is { Length: 1 } parameters &&
40+
m.Name == ExecuteAsync && m.GetParameters() is { Length: 1 } parameters &&
3841
parameters[0].ParameterType == typeof(RequestContext<>).MakeGenericType(q)));
42+
3943
var requestContextType = typeof(RequestContextImpl<>).MakeGenericType(attributeType);
4044

4145
var queryType = typeof(RequestOf<>).MakeGenericType(attributeType);
@@ -47,9 +51,8 @@ public override async Task<OfXItemsGrpcResponse> GetItems(GetOfXGrpcQuery reques
4751
.CreateInstance(requestContextType, query, headers, context.CancellationToken);
4852
object[] arguments = [requestContext];
4953
// Invoke the method and get the result
50-
var requestTask = ((Task<ItemsResponse<OfXDataResponse>>)genericMethod
51-
.Invoke(handler, arguments))!;
52-
var response = await requestTask;
54+
var response = await ((Task<ItemsResponse<OfXDataResponse>>)pipelineMethod!
55+
.Invoke(pipeline, arguments))!;
5356
var res = new OfXItemsGrpcResponse();
5457
response.Items.ForEach(a => res.Items.Add(new ItemGrpc { Id = a.Id, Value = a.Value }));
5558
return res;

src/OfX.Tests/OfXCoreTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using OfX.Tests.Attributes;
88
using OfX.Tests.Contexts;
99
using OfX.Tests.Models;
10+
using OfX.Tests.Pipelines;
1011
using Xunit;
1112

1213
namespace OfX.Tests;
@@ -26,9 +27,8 @@ public OfXCoreTests()
2627
options.AddAttributesContainNamespaces(assembly);
2728
options.AddHandlersFromNamespaceContaining<ITestAssemblyMarker>();
2829
options.AddGrpcClients(c =>
29-
{
30-
c.AddGrpcHostWithOfXAttributes("localhost:5001", [typeof(UserOfAttribute)]);
31-
});
30+
c.AddGrpcHostWithOfXAttributes("localhost:5001", [typeof(UserOfAttribute)]));
31+
options.AddReceivedPipelines(c => c.OfType(typeof(TestReceivedPipelinesImpl<>)));
3232
})
3333
.AddOfXEFCore(options =>
3434
{
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using OfX.Abstractions;
2+
using OfX.Attributes;
3+
using OfX.Responses;
4+
5+
namespace OfX.Tests.Pipelines;
6+
7+
public sealed class TestReceivedPipelinesImpl<TAttribute> : IReceivedPipelineBehavior<TAttribute>
8+
where TAttribute : OfXAttribute
9+
{
10+
public async Task<ItemsResponse<OfXDataResponse>> HandleAsync(RequestContext<TAttribute> requestContext,
11+
Func<Task<ItemsResponse<OfXDataResponse>>> next)
12+
{
13+
var result = await next.Invoke();
14+
Console.WriteLine("Here is TestReceivedPipelinesImpl<>");
15+
return result;
16+
}
17+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using OfX.Attributes;
2+
using OfX.Responses;
3+
4+
namespace OfX.Abstractions;
5+
6+
public interface IReceivedPipelineBehavior<TTAttribute> where TTAttribute : OfXAttribute
7+
{
8+
Task<ItemsResponse<OfXDataResponse>> HandleAsync(RequestContext<TTAttribute> requestContext,
9+
Func<Task<ItemsResponse<OfXDataResponse>>> next);
10+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using OfX.Abstractions;
3+
using OfX.Exceptions;
4+
5+
namespace OfX.ApplicationModels;
6+
7+
public sealed class ReceivedPipeline(IServiceCollection serviceCollection)
8+
{
9+
private static readonly Type interfaceReceivedPipeline = typeof(IReceivedPipelineBehavior<>);
10+
11+
public ReceivedPipeline OfType<TReceivedPipeline>()
12+
{
13+
OfType(typeof(TReceivedPipeline));
14+
return this;
15+
}
16+
17+
// Hmmm, this one is temporary!. I think should test more case!
18+
public ReceivedPipeline OfType(Type pipelineType)
19+
{
20+
var signatureInterfaceType = pipelineType.GetInterfaces()
21+
.FirstOrDefault(a => a.IsGenericType && a.GetGenericTypeDefinition() == interfaceReceivedPipeline);
22+
if (signatureInterfaceType is null)
23+
throw new OfXException.PipelineIsNotReceivedPipelineBehavior(pipelineType);
24+
if (pipelineType.IsGenericType)
25+
{
26+
var isContainsGenericParameters = pipelineType.ContainsGenericParameters;
27+
if (isContainsGenericParameters)
28+
{
29+
serviceCollection.AddScoped(interfaceReceivedPipeline, pipelineType);
30+
return this;
31+
}
32+
}
33+
serviceCollection.AddScoped(signatureInterfaceType, pipelineType);
34+
return this;
35+
}
36+
}

src/OfX/Exceptions/OfXException.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@ public sealed class AttributesFromNamespaceShouldBeAdded() :
1010

1111
public sealed class CurrentIdTypeWasNotSupported() :
1212
Exception("Current Id type was not supported. Please create a join us to contribute more!");
13+
14+
public sealed class PipelineIsNotReceivedPipelineBehavior(Type type) :
15+
Exception($"The input pipeline: {type.Name} is not matched with ReceivedPipelineBehavior. Please check again!");
1316
}

src/OfX/Extensions/OfXExtensions.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ public static OfXRegister AddOfX(this IServiceCollection serviceCollection, Acti
4646
});
4747
}
4848

49-
50-
serviceCollection.AddScoped<IDataMappableService>(sp =>
51-
new DataMappableService(sp, newOfRegister.AttributesRegister));
52-
5349
var defaultImplementedInterface = typeof(DefaultMappableRequestHandler<>);
5450
newOfRegister.AttributesRegister.SelectMany(a => a.ExportedTypes)
5551
.Where(t => t is { IsClass: true, IsAbstract: false } && typeof(OfXAttribute).IsAssignableFrom(t))
@@ -63,12 +59,17 @@ public static OfXRegister AddOfX(this IServiceCollection serviceCollection, Acti
6359
// So we have to replace the default service if existed -> Good!
6460
serviceCollection.TryAddScoped(parentType, defaultImplementedService);
6561
});
62+
63+
serviceCollection.AddScoped<IDataMappableService>(sp =>
64+
new DataMappableService(sp, newOfRegister.AttributesRegister));
65+
66+
serviceCollection.AddScoped(typeof(ReceivedPipelinesImpl<,>));
67+
6668
return newOfRegister;
6769
}
6870

6971
public static void AddExtensionHandler(this IExtensionHandlersInstaller extensionHandlersInstaller,
70-
Type serviceType,
71-
Type implementationType, Type attributeType)
72+
Type serviceType, Type implementationType, Type attributeType)
7273
{
7374
if (!OfXCached.InternalQueryMapHandler.TryAdd(attributeType, serviceType))
7475
throw new OfXException.RequestMustNotBeAddMoreThanOneTimes();
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using OfX.ApplicationModels;
2+
using OfX.Registries;
3+
4+
namespace OfX.Extensions;
5+
6+
public static class PipelineExtensions
7+
{
8+
public static void AddReceivedPipelines(this OfXRegister ofXRegister, Action<ReceivedPipeline> options)
9+
{
10+
var receivedPipeline = new ReceivedPipeline(ofXRegister.ServiceCollection);
11+
options.Invoke(receivedPipeline);
12+
}
13+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using OfX.Abstractions;
2+
using OfX.Attributes;
3+
using OfX.Responses;
4+
5+
namespace OfX.Implementations;
6+
7+
public class ReceivedPipelinesImpl<TModel, TAttribute>(
8+
IEnumerable<IReceivedPipelineBehavior<TAttribute>> behaviors,
9+
IQueryOfHandler<TModel, TAttribute> handler)
10+
where TAttribute : OfXAttribute where TModel : class
11+
{
12+
public async Task<ItemsResponse<OfXDataResponse>> ExecuteAsync(RequestContext<TAttribute> request)
13+
{
14+
var next = new Func<Task<ItemsResponse<OfXDataResponse>>>(() => handler.GetDataAsync(request));
15+
16+
foreach (var behavior in behaviors.Reverse())
17+
{
18+
var current = next;
19+
next = () => behavior.HandleAsync(request, current);
20+
}
21+
22+
return await next();
23+
}
24+
}

0 commit comments

Comments
 (0)