From d8a5f016eccbe363c03cffddb601a32baf05e68e Mon Sep 17 00:00:00 2001 From: Ailton Silva Date: Tue, 3 Oct 2023 08:57:36 -0300 Subject: [PATCH] feat: add event notification feature --- .../Consumers/IWorker.cs | 15 +- .../Consumers/WorkerStoppedSubject.cs | 19 -- .../Consumers/WorkerStoppingSubject.cs | 19 -- src/KafkaFlow.Abstractions/IEvent.cs | 32 +++ .../IEventSubscription.cs | 15 ++ .../Observer/ISubject.cs | 17 -- .../Observer/ISubjectObserver.cs | 18 -- .../Observer/Subject.cs | 53 ----- src/KafkaFlow.Abstractions/VoidObject.cs | 17 -- .../BatchConsumeMiddleware.cs | 10 +- .../BatchConsumeMiddlewareTests.cs | 2 +- src/KafkaFlow.UnitTests/EventTests.cs | 204 ++++++++++++++++++ src/KafkaFlow/ConsumerManagerFactory.cs | 2 +- src/KafkaFlow/Consumers/ConsumerWorker.cs | 13 +- src/KafkaFlow/Consumers/ConsumerWorkerPool.cs | 9 +- .../Consumers/WorkerPoolStoppedSubject.cs | 12 -- src/KafkaFlow/Event.cs | 62 ++++++ src/KafkaFlow/EventSubscription.cs | 21 ++ src/KafkaFlow/MiddlewareExecutor.cs | 8 +- 19 files changed, 357 insertions(+), 191 deletions(-) delete mode 100644 src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs delete mode 100644 src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs create mode 100644 src/KafkaFlow.Abstractions/IEvent.cs create mode 100644 src/KafkaFlow.Abstractions/IEventSubscription.cs delete mode 100644 src/KafkaFlow.Abstractions/Observer/ISubject.cs delete mode 100644 src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs delete mode 100644 src/KafkaFlow.Abstractions/Observer/Subject.cs delete mode 100644 src/KafkaFlow.Abstractions/VoidObject.cs create mode 100644 src/KafkaFlow.UnitTests/EventTests.cs delete mode 100644 src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs create mode 100644 src/KafkaFlow/Event.cs create mode 100644 src/KafkaFlow/EventSubscription.cs diff --git a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs index a90dc7209..2720d5187 100644 --- a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs +++ b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs @@ -1,7 +1,6 @@ namespace KafkaFlow { using System; - using KafkaFlow.Observer; /// /// Represents the interface of a internal worker @@ -14,19 +13,19 @@ public interface IWorker int Id { get; } /// - /// This handler is called immediately after a worker completes the consumption of a message + /// Gets the subject for worker stopping events where observers can subscribe to receive notifications. /// - /// to be executed - void OnTaskCompleted(Action handler); + IEvent WorkerStopping { get; } /// - /// Gets the subject for worker stopping events where observers can subscribe to receive notifications. + /// Gets the subject for worker stopped events where observers can subscribe to receive notifications. /// - ISubject WorkerStopping { get; } + IEvent WorkerStopped { get; } /// - /// Gets the subject for worker stopped events where observers can subscribe to receive notifications. + /// This handler is called immediately after a worker completes the consumption of a message /// - ISubject WorkerStopped { get; } + /// to be executed + void OnTaskCompleted(Action handler); } } diff --git a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs b/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs deleted file mode 100644 index 4d92d1908..000000000 --- a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace KafkaFlow -{ - using KafkaFlow.Observer; - - /// - /// Represents a subject specific to worker stopped events where observers can subscribe to receive notifications. - /// - public class WorkerStoppedSubject : Subject - { - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - public WorkerStoppedSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs b/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs deleted file mode 100644 index 816ce5f5f..000000000 --- a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace KafkaFlow -{ - using KafkaFlow.Observer; - - /// - /// Represents a subject specific to worker stopping events where observers can subscribe to receive notifications. - /// - public class WorkerStoppingSubject : Subject - { - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - public WorkerStoppingSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs new file mode 100644 index 000000000..892426235 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IEvent.cs @@ -0,0 +1,32 @@ +namespace KafkaFlow +{ + using System; + using System.Threading.Tasks; + + /// + /// Represents an Event to be subscribed. + /// + public interface IEvent + { + /// + /// Subscribes to the event. + /// + /// The handler to be called when the event is fired. + /// Event subscription reference + IEventSubscription Subscribe(Func handler); + } + + /// + /// Represents an Event to be subscribed. + /// + /// The argument expected by the event. + public interface IEvent + { + /// + /// Subscribes to the event. + /// + /// The handler to be called when the event is fired. + /// Event subscription reference + IEventSubscription Subscribe(Func handler); + } +} diff --git a/src/KafkaFlow.Abstractions/IEventSubscription.cs b/src/KafkaFlow.Abstractions/IEventSubscription.cs new file mode 100644 index 000000000..17e71b902 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IEventSubscription.cs @@ -0,0 +1,15 @@ +using System; + +namespace KafkaFlow +{ + /// + /// Represents an Event subscription. + /// + public interface IEventSubscription : IDisposable + { + /// + /// Cancels the subscription to the event. + /// + void Cancel(); + } +} diff --git a/src/KafkaFlow.Abstractions/Observer/ISubject.cs b/src/KafkaFlow.Abstractions/Observer/ISubject.cs deleted file mode 100644 index 669d24591..000000000 --- a/src/KafkaFlow.Abstractions/Observer/ISubject.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow.Observer -{ - /// - /// Represents a subject in the observer design pattern that can be observed by observers. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public interface ISubject - where TSubject : Subject - { - /// - /// Subscribes an observer to the subject. - /// - /// The observer to subscribe. - void Subscribe(ISubjectObserver observer); - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs b/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs deleted file mode 100644 index 223cd863f..000000000 --- a/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs +++ /dev/null @@ -1,18 +0,0 @@ -namespace KafkaFlow.Observer -{ - using System.Threading.Tasks; - - /// - /// Represents an observer in the observer design pattern that can receive notifications from a subject. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public interface ISubjectObserver - { - /// - /// Called when a notification is received from the subject. - /// - /// A task representing the asynchronous notification handling. - Task OnNotification(TSubject subject, TArg arg); - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/Subject.cs b/src/KafkaFlow.Abstractions/Observer/Subject.cs deleted file mode 100644 index 31fb1bfe9..000000000 --- a/src/KafkaFlow.Abstractions/Observer/Subject.cs +++ /dev/null @@ -1,53 +0,0 @@ -namespace KafkaFlow.Observer -{ - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - - /// - /// A generic implementation that should be extended to help the use of the notification system. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public abstract class Subject : ISubject - where TSubject : Subject - { - private readonly ILogHandler logHandler; - private readonly List> observers = new(); - - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - protected Subject(ILogHandler logHandler) - { - this.logHandler = logHandler; - } - - /// - /// Subscribes an observer to the subject, allowing it to receive notifications. - /// - /// The observer to subscribe. - public void Subscribe(ISubjectObserver observer) => this.observers.Add(observer); - - /// - /// Notifies all subscribed observers asynchronously. - /// - /// The parameter passed by the client. - /// A task representing the asynchronous notification operation. - public async Task NotifyAsync(TArg arg) - { - foreach (var observer in this.observers) - { - try - { - await observer.OnNotification((TSubject)this, arg); - } - catch (Exception e) - { - this.logHandler.Error("Error notifying observer", e, new { Subject = this.GetType().Name }); - } - } - } - } -} diff --git a/src/KafkaFlow.Abstractions/VoidObject.cs b/src/KafkaFlow.Abstractions/VoidObject.cs deleted file mode 100644 index d59912d4c..000000000 --- a/src/KafkaFlow.Abstractions/VoidObject.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow; - -/// -/// A type that represents an empty object that should be ignored -/// -public class VoidObject -{ - /// - /// Gets the unique instance value - /// - public static readonly VoidObject Value = new(); - - private VoidObject() - { - // Empty - } -} diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs index 70cc87530..a6f0621df 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs @@ -7,12 +7,8 @@ using System.Threading.Tasks; using KafkaFlow.Configuration; using KafkaFlow.Consumers; - using KafkaFlow.Observer; - internal class BatchConsumeMiddleware - : IMessageMiddleware, - ISubjectObserver, - IDisposable + internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable { private readonly SemaphoreSlim dispatchSemaphore = new(1, 1); @@ -37,7 +33,7 @@ public BatchConsumeMiddleware( this.batch = new(batchSize); this.consumerConfiguration = middlewareContext.Consumer.Configuration; - middlewareContext.Worker.WorkerStopped.Subscribe(this); + middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync()); } public async Task Invoke(IMessageContext context, MiddlewareDelegate next) @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) } } - public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync(); - public void Dispose() { this.dispatchTask?.Dispose(); diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index e55e85ae8..2269db974 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -51,7 +51,7 @@ public void Setup() workerMock .SetupGet(x => x.WorkerStopped) - .Returns(new WorkerStoppedSubject(this.logHandlerMock.Object)); + .Returns(new Event(this.logHandlerMock.Object)); consumerConfigurationMock .SetupGet(x => x.AutoMessageCompletion) diff --git a/src/KafkaFlow.UnitTests/EventTests.cs b/src/KafkaFlow.UnitTests/EventTests.cs new file mode 100644 index 000000000..53aabdee0 --- /dev/null +++ b/src/KafkaFlow.UnitTests/EventTests.cs @@ -0,0 +1,204 @@ +namespace KafkaFlow.UnitTests +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class EventTests + { + private readonly Event target; + private readonly Event typedTarget; + + public EventTests() + { + var log = new Mock(); + this.target = new Event(log.Object); + this.typedTarget = new Event(log.Object); + } + + [TestMethod] + public async Task FireAsync_EventSubscribed_CallDelegateWithSuccess() + { + // Arrange + var numberOfCalls = 0; + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.target.FireAsync(); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObservers_CallAllDelegatesWithSuccess() + { + // Arrange + var numberOfCalls = 0; + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.target.FireAsync(); + + // Assert + Assert.AreEqual(2, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors() + { + // Arrange + var numberOfCalls = 0; + + this.target.Subscribe(() => throw new NotImplementedException()); + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.target.FireAsync(); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventSubscribedWithArgument_CallDelegateWithSuccess() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArgument = string.Empty; + + this.typedTarget.Subscribe(arg => + { + receivedArgument = arg; + return Task.CompletedTask; + }); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(expectedArgument, receivedArgument); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObserversAndArgument_CallAllDelegatesWithSuccess() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + this.typedTarget.Subscribe(arg => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }); + + this.typedTarget.Subscribe(arg => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(2, receivedArguments.Count); + Assert.IsTrue(receivedArguments.All(x => x == expectedArgument)); + } + + [TestMethod] + public async Task FireAsync_TypedEventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors() + { + // Arrange + var numberOfCalls = 0; + + this.typedTarget.Subscribe(_ => throw new NotImplementedException()); + + this.typedTarget.Subscribe(_ => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.typedTarget.FireAsync(Guid.NewGuid().ToString()); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_DuplicatedEventHandler_CallHandlerOnce() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + Func handler = (arg) => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }; + + this.typedTarget.Subscribe(handler); + this.typedTarget.Subscribe(handler); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(1, receivedArguments.Count); + Assert.IsTrue(receivedArguments.All(x => x == expectedArgument)); + } + + [TestMethod] + public async Task FireAsync_UnsubscribeEventHandler_DoesNotCallHandler() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + Func handler = (arg) => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }; + + var subscription = this.typedTarget.Subscribe(handler); + + subscription.Cancel(); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(0, receivedArguments.Count); + } + } +} diff --git a/src/KafkaFlow/ConsumerManagerFactory.cs b/src/KafkaFlow/ConsumerManagerFactory.cs index a3c752bc2..33bce7adb 100644 --- a/src/KafkaFlow/ConsumerManagerFactory.cs +++ b/src/KafkaFlow/ConsumerManagerFactory.cs @@ -22,7 +22,7 @@ public IConsumerManager Create(IConsumerConfiguration configuration, IDependency configuration, logHandler); - consumerWorkerPool.WorkerPoolStopped.Subscribe(middlewareExecutor); + consumerWorkerPool.WorkerPoolStopped.Subscribe(() => middlewareExecutor.OnWorkerPoolStopped()); var feeder = new WorkerPoolFeeder( consumer, diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index 56e5978eb..81041dc69 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -4,7 +4,6 @@ namespace KafkaFlow.Consumers using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; - using KafkaFlow.Observer; internal class ConsumerWorker : IConsumerWorker { @@ -15,8 +14,8 @@ internal class ConsumerWorker : IConsumerWorker private readonly Channel messagesBuffer; - private readonly WorkerStoppingSubject workerStoppingSubject; - private readonly WorkerStoppedSubject workerStoppedSubject; + private readonly Event workerStoppingSubject; + private readonly Event workerStoppedSubject; private CancellationTokenSource stopCancellationTokenSource; private Task backgroundTask; @@ -51,9 +50,9 @@ public ConsumerWorker( public IDependencyResolver WorkerDependencyResolver => this.workerDependencyResolverScope.Resolver; - public ISubject WorkerStopping => this.workerStoppingSubject; + public IEvent WorkerStopping => this.workerStoppingSubject; - public ISubject WorkerStopped => this.workerStoppedSubject; + public IEvent WorkerStopped => this.workerStoppedSubject; public ValueTask EnqueueAsync( IMessageContext context, @@ -98,7 +97,7 @@ public Task StartAsync() public async Task StopAsync() { - await this.workerStoppingSubject.NotifyAsync(VoidObject.Value); + await this.workerStoppingSubject.FireAsync(); this.messagesBuffer.Writer.TryComplete(); @@ -109,7 +108,7 @@ public async Task StopAsync() await this.backgroundTask.ConfigureAwait(false); - await this.workerStoppedSubject.NotifyAsync(VoidObject.Value); + await this.workerStoppedSubject.FireAsync(); } public void Dispose() diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index f82f494b9..5774d2c9e 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -7,7 +7,6 @@ namespace KafkaFlow.Consumers using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Configuration; - using KafkaFlow.Observer; internal class ConsumerWorkerPool : IConsumerWorkerPool { @@ -18,7 +17,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool private readonly Factory distributionStrategyFactory; private readonly IOffsetCommitter offsetCommitter; - private readonly WorkerPoolStoppedSubject workerPoolStoppedSubject; + private readonly Event workerPoolStoppedSubject; private TaskCompletionSource startedTaskSource = new(); private List workers = new(); @@ -38,7 +37,7 @@ public ConsumerWorkerPool( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.distributionStrategyFactory = consumerConfiguration.DistributionStrategyFactory; - this.workerPoolStoppedSubject = new(logHandler); + this.workerPoolStoppedSubject = new Event(logHandler); this.offsetCommitter = consumer.Configuration.NoStoreOffsets ? new NullOffsetCommitter() : @@ -52,7 +51,7 @@ public ConsumerWorkerPool( public int CurrentWorkersCount { get; private set; } - public ISubject WorkerPoolStopped => this.workerPoolStoppedSubject; + public IEvent WorkerPoolStopped => this.workerPoolStoppedSubject; public async Task StartAsync(IReadOnlyCollection partitions, int workersCount) { @@ -121,7 +120,7 @@ public async Task StopAsync() this.offsetManager = null; - await this.workerPoolStoppedSubject.NotifyAsync(VoidObject.Value); + await this.workerPoolStoppedSubject.FireAsync(); await this.offsetCommitter.StopAsync(); } diff --git a/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs b/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs deleted file mode 100644 index 0a3a5124c..000000000 --- a/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace KafkaFlow.Consumers -{ - using KafkaFlow.Observer; - - internal class WorkerPoolStoppedSubject : Subject - { - public WorkerPoolStoppedSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs new file mode 100644 index 000000000..232101232 --- /dev/null +++ b/src/KafkaFlow/Event.cs @@ -0,0 +1,62 @@ +namespace KafkaFlow +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + + internal class Event : IEvent + { + private readonly ILogHandler logHandler; + + private readonly IList> handlers = new List>(); + + public Event(ILogHandler logHandler) + { + this.logHandler = logHandler; + } + + public IEventSubscription Subscribe(Func handler) + { + if (!this.handlers.Contains(handler)) + { + this.handlers.Add(handler); + } + + return new EventSubscription(() => this.handlers.Remove(handler)); + } + + internal async Task FireAsync(TArg arg) + { + foreach (var handler in this.handlers) + { + try + { + if (handler is null) + { + continue; + } + + await handler.Invoke(arg); + } + catch (Exception e) + { + this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name }); + } + } + } + } + + internal class Event : IEvent + { + private readonly Event evt; + + public Event(ILogHandler logHandler) + { + this.evt = new Event(logHandler); + } + + public IEventSubscription Subscribe(Func handler) => this.evt.Subscribe(_ => handler.Invoke()); + + internal Task FireAsync() => this.evt.FireAsync(null); + } +} diff --git a/src/KafkaFlow/EventSubscription.cs b/src/KafkaFlow/EventSubscription.cs new file mode 100644 index 000000000..f2f7d169e --- /dev/null +++ b/src/KafkaFlow/EventSubscription.cs @@ -0,0 +1,21 @@ +namespace KafkaFlow +{ + using System; + + internal class EventSubscription : IEventSubscription + { + private readonly Action cancelDelegate; + + public EventSubscription(Action cancelDelegate) + { + this.cancelDelegate = cancelDelegate; + } + + public void Cancel() + { + this.cancelDelegate.Invoke(); + } + + public void Dispose() => this.Cancel(); + } +} diff --git a/src/KafkaFlow/MiddlewareExecutor.cs b/src/KafkaFlow/MiddlewareExecutor.cs index b759d6c37..b2710ff9d 100644 --- a/src/KafkaFlow/MiddlewareExecutor.cs +++ b/src/KafkaFlow/MiddlewareExecutor.cs @@ -5,12 +5,8 @@ namespace KafkaFlow using System.Linq; using System.Threading.Tasks; using KafkaFlow.Configuration; - using KafkaFlow.Consumers; - using KafkaFlow.Observer; - internal class MiddlewareExecutor - : IMiddlewareExecutor, - ISubjectObserver + internal class MiddlewareExecutor : IMiddlewareExecutor { private readonly IReadOnlyList configurations; @@ -27,7 +23,7 @@ public Task Execute(IMessageContext context, Func nextOpe return this.ExecuteDefinition(0, context, nextOperation); } - public Task OnNotification(WorkerPoolStoppedSubject subject, VoidObject arg) + internal Task OnWorkerPoolStopped() { this.workersMiddlewares.Clear(); return Task.CompletedTask;