Skip to content

Commit

Permalink
feat: add event notification feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar committed Oct 3, 2023
1 parent 805f0f2 commit 446b720
Show file tree
Hide file tree
Showing 19 changed files with 359 additions and 198 deletions.
14 changes: 6 additions & 8 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
Expand All @@ -14,19 +13,18 @@ public interface IWorker
int Id { get; }

/// <summary>
/// This handler is called immediately after a worker completes the consumption of a message
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
/// <param name="handler"><see cref="Action"/> to be executed</param>
void OnTaskCompleted(Action handler);
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }
IEvent WorkerStopped { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
IEvent<IMessageContext> TaskCompleted { get; }
}
}
19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs

This file was deleted.

19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs

This file was deleted.

32 changes: 32 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<TArg>
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}
15 changes: 15 additions & 0 deletions src/KafkaFlow.Abstractions/IEventSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;

Check warning on line 1 in src/KafkaFlow.Abstractions/IEventSubscription.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration

Check warning on line 1 in src/KafkaFlow.Abstractions/IEventSubscription.cs

View workflow job for this annotation

GitHub Actions / build

Using directive should appear within a namespace declaration [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

namespace KafkaFlow
{
/// <summary>
/// Represents an Event subscription.
/// </summary>
public interface IEventSubscription
{
/// <summary>
/// Cancels the subscription to the event.
/// </summary>
void Cancel();
}
}
17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs

This file was deleted.

53 changes: 0 additions & 53 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs

This file was deleted.

17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/VoidObject.cs

This file was deleted.

10 changes: 2 additions & 8 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
IDisposable
internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

Expand All @@ -37,7 +33,7 @@ public BatchConsumeMiddleware(
this.batch = new(batchSize);
this.consumerConfiguration = middlewareContext.Consumer.Configuration;

middlewareContext.Worker.WorkerStopped.Subscribe(this);
middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync());
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand Down Expand Up @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
}

public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();

public void Dispose()
{
this.dispatchTask?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Setup()

workerMock
.SetupGet(x => x.WorkerStopped)
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
.Returns(new Event(this.logHandlerMock.Object));

consumerConfigurationMock
.SetupGet(x => x.AutoMessageCompletion)
Expand Down
Loading

0 comments on commit 446b720

Please sign in to comment.