Skip to content

Commit

Permalink
ready for 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Apr 26, 2021
1 parent 1b6eca2 commit ffdbd05
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 38 deletions.
190 changes: 159 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ The publisher/subscriber(internally we called MessageBroker) is managed by DI, i
`IPublisher<T>/ISubscriber<T>` is keyless(type only) however MessagePipe has similar interface `IPublisher<TKey, TMessage>/ISubscriber<TKey, TMessage>` that is keyed(topic) interface.

For example, our real usecase, // TODO sentence.
// TODO: Image
For example, our real usecase, There is an application that connects Unity and MagicOnion (a real-time communication framework like SignalR) and delivers it via a browser by Blazor. At that time, we needed something to connect Blazor's page (Browser lifecycle) and MagicOnion's Hub (Connection lifecycle) to transmit data. We also need to distribute the connections by their IDs.

`Browser <-> Blazor <- [MessagePipe] -> MagicOnion <-> Unity`

We solved this with the following code.

```csharp
// MagicOnion(similar as SignalR, realtime event framework for .NET and Unity)
Expand Down Expand Up @@ -162,80 +165,203 @@ public partial class BlazorPage : ComponentBase, IDisposable

> The main difference of Reactive Extensions' Subject is has no `OnCompleted`. OnCompleted may or may not be used, making it very difficult to determine the intent to the observer(subscriber). Also, we usually subscribe to multiple events from the same (different event type)publisher, and it is difficult to handle duplicate OnCompleted in that case. For this reason, MessagePipe only provides a simple Publish(OnNext). If you want to convey completion, please receive a separate event and perform dedicated processing there.
In addition to standard Pub/Sub, MessagePipe supports async handlers, mediator patterns with handlers that accept return values, and filters for pre-and-post execution customization.

Publish/Subscribe
---
Publish/Subscribe interface has keyed(topic) and keyless, sync and async interface.

```csharp
// keyless-sync
public interface IPublisher<TMessage>
{
void Publish(TMessage message);
}

public interface ISubscriber<TMessage>
{
public IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}

// keyless-async
public interface IAsyncPublisher<TMessage>
{
// async interface's publish is fire-and-forget
void Publish(TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}

TODO: mediator
https://docs.microsoft.com/ja-jp/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/microservice-application-layer-implementation-web-api#implement-the-command-and-command-handler-patterns




async, await all.


public interface IAsyncSubscriber<TMessage>
{
IDisposable Subscribe(IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}

// keyed-sync
public interface IPublisher<TKey, TMessage>
where TKey : notnull
{
void Publish(TKey key, TMessage message);
}

request/response.
public interface ISubscriber<TKey, TMessage>
where TKey : notnull
{
IDisposable Subscribe(TKey key, IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}

// keyed-async
public interface IAsyncPublisher<TKey, TMessage>
where TKey : notnull
{
void Publish(TKey key, TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken = default(CancellationToken));
ValueTask PublishAsync(TKey key, TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}

public interface IAsyncSubscriber<TKey, TMessage>
where TKey : notnull
{
IDisposable Subscribe(TKey key, IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}
```

filter
All are available in the form of `IPublisher/Subscribe<T>` in the DI. async handler can await all subscribers completed by `await PublishAsync`. Asynchronous methods can work sequentially or in parallel, depending on `AsyncPublishStrategy` (defaults is `Parallel`, can be changed by `MessagePipeOptions` or by specifying at publish time). If you don't need to wait, you can call `void Publish` to act as fire-and-forget.

The before and after of execution can be changed by passing a custom filter. See the [Filter](#filter) section for details.

If an error occurs, it will be propagated to the caller and subsequent subscribers will be stopped. This behavior can be changed by writing a filter to ignore errors.

Request/Response/All
---
Similar as [MediatR](https://github.com/jbogard/MediatR), implement support of mediator pattern.

```csharp
public interface IRequestHandler<in TRequest, out TResponse>
{
TResponse Invoke(TRequest request);
}

public interface IAsyncRequestHandler<in TRequest, TResponse>
{
ValueTask<TResponse> InvokeAsync(TRequest request, CancellationToken cancellationToken = default);
}
```

For example, declare handler for Ping type.

```csharp
public readonly struct Ping { }
public readonly struct Pong { }

public class PingPongHandler : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
{
Console.WriteLine("Ping called.");
return new Pong();
}
}
```

Publish/Subscribe
---
You can get handler like this.

```csharp
class FooController
{
IRequestHandler<Ping, Pong> requestHandler;

keyed(topic) and keyless interface.
// automatically instantiate PingPongHandler.
public FooController(IRequestHandler<Ping, Pong> requestHandler)
{
this.requestHandler = requestHandler;
}

public void Run()
{
var pong = this.requestHandler.Invoke(new Ping());
Console.WriteLine("PONG");
}
}
```

For more complex implementation patterns, [this Microsoft documentation](https://docs.microsoft.com/en-us/dotnet/architecture/microservices/microservice-ddd-cqrs-patterns/microservice-application-layer-implementation-web-api#implement-the-command-process-pipeline-with-a-mediator-pattern-mediatr) is applicable.

Declare many request handlers, you can use `IRequestAllHandler`, `IAsyncRequestAllHandler` instead of single handler.

```csharp
public interface IPublisher<TMessage>
public interface IRequestAllHandler<in TRequest, out TResponse>
{
void Publish(TMessage message);
TResponse[] InvokeAll(TRequest request);
IEnumerable<TResponse> InvokeAllLazy(TRequest request);
}

public interface ISubscriber<TMessage>
public interface IAsyncRequestAllHandler<in TRequest, TResponse>
{
public IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
ValueTask<TResponse[]> InvokeAllAsync(TRequest request, CancellationToken cancellationToken = default);
ValueTask<TResponse[]> InvokeAllAsync(TRequest request, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default);
IAsyncEnumerable<TResponse> InvokeAllLazyAsync(TRequest request, CancellationToken cancellationToken = default);
}
```

```csharp
public class PingPongHandler1 : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
{
Console.WriteLine("Ping1 called.");
return new Pong();
}
}

with key






public class PingPongHandler2 : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
{
Console.WriteLine("Ping1 called.");
return new Pong();
}
}

Request/Response/All
---
class BarController
{
IRequestAllHandler<Ping, Pong> requestAllHandler;

public FooController(IRequestAllHandler<Ping, Pong> requestAllHandler)
{
this.requestAllHandler = requestAllHandler;
}

public void Run()
{
var pongs = this.requestAllHandler.InvokeAll(new Ping());
Console.WriteLine("PONG COUNT:" + pongs.Length);
}
}
```

Subscribe Extensions
---
`ISubscriber`(`IAsyncSubscriber`) interface requires `IMessageHandler<T>` to handle message.

with predicate
AsObservable
```csharp
public interface ISubscriber<TMessage>
{
IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}
```

However, the extension method allows you to write `Action<T>` directly.

```csharp
public static IDisposable Subscribe<TMessage>(this ISubscriber<TMessage> subscriber, Action<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters)
public static IDisposable Subscribe<TMessage>(this ISubscriber<TMessage> subscriber, Action<TMessage> handler, Func<TMessage, bool> predicate, params MessageHandlerFilter<TMessage>[] filters)
public static IObservable<TMessage> AsObservable<TMessage>(this ISubscriber<TMessage> subscriber, params MessageHandlerFilter<TMessage>[] filters)
```

Also, the `Func<TMessage, bool>` overload can filter messages by predicate (internally implemented with PredicateFilter, where Order is int.MinValue and is always checked first).

`AsObservable` can convert message pipeline to `IObservable<T>`, it can handle by Reactive Extensions(in Unity, you can use `UniRx`).

Filter
---
Expand Down Expand Up @@ -283,6 +409,8 @@ Host.CreateDefaultBuilder()
});
```

use the filter by attribute, you can use these attributes: `[MessageHandlerFilter(type, order)]`, `[AsyncMessageHandlerFilter(type, order)]`, `[RequestHandlerFilter(type, order)]`, `[AsyncRequestHandlerFilter(type, order)]`.

These are idea showcase of filter.

```csharp
Expand Down
Binary file added opensource.snk
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
<TargetFramework>net5.0</TargetFramework>
<LangVersion>latest</LangVersion>
<Configurations>Debug;Release;WinBenchmark</Configurations>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>../../opensource.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
Expand Down
2 changes: 2 additions & 0 deletions sandbox/MessagePipe.Sandbox.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ public void CheckMoreAndMore()
}
}



public class PingHandler : IRequestHandler<Ping, Pong>
{
public Pong Invoke(Ping request)
Expand Down
15 changes: 15 additions & 0 deletions src/MessagePipe.Redis/MessagePipe.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<Configurations>Debug;Release;WinBenchmark</Configurations>

<!-- NuGet Packaging -->
<PackageVersion>$(Version)</PackageVersion>
<Company>Cysharp</Company>
<Authors>Cysharp</Authors>
<Copyright>© Cysharp, Inc.</Copyright>
<PackageTags>pubsub;eventaggregator</PackageTags>
<Description>Redis IDistributedPublisher/Subscriber provider for MessagePipe.</Description>
<PackageProjectUrl>https://github.com/Cysharp/MessagePipe</PackageProjectUrl>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>../../opensource.snk</AssemblyOriginatorKeyFile>
<PackageIcon>../MessagePipe/Icon.png</PackageIcon>
</PropertyGroup>

<ItemGroup>
Expand Down
3 changes: 2 additions & 1 deletion src/MessagePipe.Redis/ServiceCollectionRedisExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using MessagePipe;
using MessagePipe.Redis;
using Microsoft.Extensions.DependencyInjection;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Text;

namespace Microsoft.Extensions.DependencyInjection
namespace MessagePipe
{
public static class ServiceCollectionRedisExtensions
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using VContainer;
using Microsoft.Extensions.DependencyInjection;
using MessagePipe.VContainer;

namespace MessagePipe
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using Zenject;
using Microsoft.Extensions.DependencyInjection;
using MessagePipe.Zenject;

namespace MessagePipe
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
using MessagePipe;
using MessagePipe.Internal;
#if !UNITY_2018_3_OR_NEWER
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
#endif
using System;
using System.Collections.Generic;
using System.Linq;

namespace Microsoft.Extensions.DependencyInjection
namespace MessagePipe
{
public static class ServiceCollectionExtensions
{
Expand Down
Binary file added src/MessagePipe/Icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions src/MessagePipe/MessagePipe.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@
<Nullable>enable</Nullable>
<WarningsAsErrors>true</WarningsAsErrors>
<Configurations>Debug;Release;WinBenchmark</Configurations>

<!-- NuGet Packaging -->
<PackageVersion>$(Version)</PackageVersion>
<Company>Cysharp</Company>
<Authors>Cysharp</Authors>
<Copyright>© Cysharp, Inc.</Copyright>
<PackageTags>pubsub;eventaggregator</PackageTags>
<Description>High performance in-memory/distributed messaging pipeline for .NET and Unity.</Description>
<PackageProjectUrl>https://github.com/Cysharp/MessagePipe</PackageProjectUrl>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>../../opensource.snk</AssemblyOriginatorKeyFile>
<PackageIcon>Icon.png</PackageIcon>
</PropertyGroup>

<ItemGroup>
Expand Down
3 changes: 2 additions & 1 deletion src/MessagePipe/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
using MessagePipe;
using MessagePipe.Internal;
#if !UNITY_2018_3_OR_NEWER
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
#endif
using System;
using System.Collections.Generic;
using System.Linq;

namespace Microsoft.Extensions.DependencyInjection
namespace MessagePipe
{
public static class ServiceCollectionExtensions
{
Expand Down
4 changes: 2 additions & 2 deletions src/MessagePipe/_InternalVisibleTo.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("MessagePipe.Tests")]
[assembly: InternalsVisibleTo("MessagePipe.Sandbox.ConsoleApp")]
[assembly: InternalsVisibleTo("MessagePipe.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001000144ec28f1e9ef7b17dacc47425a7a153aea0a7baa590743a2d1a86f4b3e10a8a12712c6e647966bfd8bd6e830048b23bd42bbc56f179585c15b8c19cf86c0eed1b73c993dd7a93a30051dd50fdda0e4d6b65e6874e30f1c37cf8bcbc7fe02c7f2e6a0a3327c0ccc1631bf645f40732521fa0b41a30c178d08f7dd779d42a1ee")]
[assembly: InternalsVisibleTo("MessagePipe.Sandbox.ConsoleApp, PublicKey=00240000048000009400000006020000002400005253413100040000010001000144ec28f1e9ef7b17dacc47425a7a153aea0a7baa590743a2d1a86f4b3e10a8a12712c6e647966bfd8bd6e830048b23bd42bbc56f179585c15b8c19cf86c0eed1b73c993dd7a93a30051dd50fdda0e4d6b65e6874e30f1c37cf8bcbc7fe02c7f2e6a0a3327c0ccc1631bf645f40732521fa0b41a30c178d08f7dd779d42a1ee")]
2 changes: 2 additions & 0 deletions tests/MessagePipe.Tests/MessagePipe.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
<TargetFramework>net5.0</TargetFramework>
<IsPackable>false</IsPackable>
<Configurations>Debug;Release;WinBenchmark</Configurations>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>../../opensource.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
Expand Down

0 comments on commit ffdbd05

Please sign in to comment.