Skip to content

Commit

Permalink
feat: add event notification feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar committed Sep 29, 2023
1 parent 0970b4c commit bf2da76
Show file tree
Hide file tree
Showing 18 changed files with 206 additions and 186 deletions.
6 changes: 2 additions & 4 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Configuration;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
Expand All @@ -23,11 +21,11 @@ public interface IWorker
/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
IEvent WorkerStopped { get; }
}
}
19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs

This file was deleted.

19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs

This file was deleted.

15 changes: 15 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace KafkaFlow

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IEvent' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IEvent.Subscribe(Func<Task>)' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IEvent<TArg>' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IEvent<TArg>.Subscribe(Func<TArg, Task>)' [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
{
using System;
using System.Threading.Tasks;

public interface IEvent

Check warning on line 6 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IEvent'
{
void Subscribe(Func<Task> handle);

Check warning on line 8 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IEvent.Subscribe(Func<Task>)'
}

public interface IEvent<TArg>

Check warning on line 11 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IEvent<TArg>'
{
void Subscribe(Func<TArg, Task> handle);

Check warning on line 13 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IEvent<TArg>.Subscribe(Func<TArg, Task>)'
}
}
17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs

This file was deleted.

53 changes: 0 additions & 53 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs

This file was deleted.

17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/VoidObject.cs

This file was deleted.

12 changes: 12 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"KafkaFlow.Admin.Dashboard": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:53859;http://localhost:53860"
}
}
}
10 changes: 2 additions & 8 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
IDisposable
internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

Expand All @@ -37,7 +33,7 @@ public BatchConsumeMiddleware(
this.batch = new(batchSize);
this.consumerConfiguration = middlewareContext.Consumer.Configuration;

middlewareContext.Worker.WorkerStopped.Subscribe(this);
middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync());
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand Down Expand Up @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
}

public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();

public void Dispose()
{
this.dispatchTask?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Setup()

workerMock
.SetupGet(x => x.WorkerStopped)
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
.Returns(new Event(this.logHandlerMock.Object));

consumerConfigurationMock
.SetupGet(x => x.AutoMessageCompletion)
Expand Down
87 changes: 87 additions & 0 deletions src/KafkaFlow.UnitTests/EventTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
namespace KafkaFlow.UnitTests
{
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class EventTests
{
[TestMethod]
public async Task FireAsync_EventSubscribed_CallDelegateWithSuccess()
{
// Arrange
var numberOfCalls = 0;
var log = new Mock<ILogHandler>();

var event1 = new Event(log.Object);

event1.Subscribe(() =>
{
numberOfCalls++;
return Task.CompletedTask;
});

// Act
await event1.FireAsync();

// Assert
Assert.AreEqual(1, numberOfCalls);
}

[TestMethod]
public async Task FireAsync_EventWithMultipleObservers_CallAllDelegatesWithSuccess()
{
// Arrange
var numberOfCalls = 0;
var log = new Mock<ILogHandler>();

var event1 = new Event(log.Object);

event1.Subscribe(() =>
{
numberOfCalls++;
return Task.CompletedTask;
});

event1.Subscribe(() =>
{
numberOfCalls++;
return Task.CompletedTask;
});

// Act
await event1.FireAsync();

// Assert
Assert.AreEqual(2, numberOfCalls);
}

[TestMethod]
public async Task FireAsync_EventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors()
{
// Arrange
var numberOfCalls = 0;
var log = new Mock<ILogHandler>();

var event1 = new Event(log.Object);

event1.Subscribe(() =>
{
throw new System.Exception();
});

event1.Subscribe(() =>
{
numberOfCalls++;
return Task.CompletedTask;
});

// Act
await event1.FireAsync();

// Assert
Assert.AreEqual(1, numberOfCalls);
}
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow/ConsumerManagerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public IConsumerManager Create(IConsumerConfiguration configuration, IDependency
configuration,
logHandler);

consumerWorkerPool.WorkerPoolStopped.Subscribe(middlewareExecutor);
consumerWorkerPool.WorkerPoolStopped.Subscribe(() => middlewareExecutor.OnNotification());

var feeder = new WorkerPoolFeeder(
consumer,
Expand Down
13 changes: 6 additions & 7 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ namespace KafkaFlow.Consumers
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using KafkaFlow.Observer;

internal class ConsumerWorker : IConsumerWorker
{
Expand All @@ -15,8 +14,8 @@ internal class ConsumerWorker : IConsumerWorker

private readonly Channel<IMessageContext> messagesBuffer;

private readonly WorkerStoppingSubject workerStoppingSubject;
private readonly WorkerStoppedSubject workerStoppedSubject;
private readonly Event workerStoppingSubject;
private readonly Event workerStoppedSubject;

private CancellationTokenSource stopCancellationTokenSource;
private Task backgroundTask;
Expand Down Expand Up @@ -51,9 +50,9 @@ public ConsumerWorker(

public IDependencyResolver WorkerDependencyResolver => this.workerDependencyResolverScope.Resolver;

public ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping => this.workerStoppingSubject;
public IEvent WorkerStopping => this.workerStoppingSubject;

public ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped => this.workerStoppedSubject;
public IEvent WorkerStopped => this.workerStoppedSubject;

public ValueTask EnqueueAsync(
IMessageContext context,
Expand Down Expand Up @@ -98,7 +97,7 @@ public Task StartAsync()

public async Task StopAsync()
{
await this.workerStoppingSubject.NotifyAsync(VoidObject.Value);
await this.workerStoppingSubject.FireAsync();

this.messagesBuffer.Writer.TryComplete();

Expand All @@ -109,7 +108,7 @@ public async Task StopAsync()

await this.backgroundTask.ConfigureAwait(false);

await this.workerStoppedSubject.NotifyAsync(VoidObject.Value);
await this.workerStoppedSubject.FireAsync();
}

public void Dispose()
Expand Down
Loading

0 comments on commit bf2da76

Please sign in to comment.