Skip to content

Commit

Permalink
refactor: upgrade async and remove sync events
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar authored and simaoribeiro committed Nov 23, 2023
1 parent a02dfe6 commit 6dc2706
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 84 deletions.
18 changes: 2 additions & 16 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,12 @@
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the async event.
/// </summary>
/// <param name="handler">The handler to be called when the async event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);

/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Action handler);
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
Expand All @@ -29,18 +22,11 @@ public interface IEvent
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<out TArg>
{
/// <summary>
/// Subscribes to the async event.
/// </summary>
/// <param name="handler">The handler to be called when the async event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);

/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Action<TArg> handler);
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}

Check warning on line 32 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Deploy to GitHub Pages

12 changes: 0 additions & 12 deletions src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

public static void OnConsumeStarted(IMessageContext context)
public static Task OnConsumeStarted(IMessageContext context)
{
try
{
Expand Down Expand Up @@ -50,17 +50,21 @@ public static void OnConsumeStarted(IMessageContext context)
{
// If there is any failure, do not propagate the context.
}

return Task.CompletedTask;
}

public static void OnConsumeCompleted(IMessageContext context)
public static Task OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
}

return Task.CompletedTask;
}

public static void OnConsumeError(IMessageContext context, Exception ex)
public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
Expand All @@ -69,6 +73,8 @@ public static void OnConsumeError(IMessageContext context, Exception ex)

activity?.Dispose();
}

return Task.CompletedTask;
}

private static IEnumerable<string> ExtractTraceContextIntoBasicProperties(IMessageContext context, string key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

public static void OnProducerStarted(IMessageContext context)
public static Task OnProducerStarted(IMessageContext context)
{
try
{
Expand Down Expand Up @@ -60,17 +60,21 @@ public static void OnProducerStarted(IMessageContext context)
{
// If there is any failure, do not propagate the context.
}

return Task.CompletedTask;
}

public static void OnProducerCompleted(IMessageContext context)
public static Task OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
}

return Task.CompletedTask;
}

public static void OnProducerError(IMessageContext context, Exception ex)
public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
Expand All @@ -79,6 +83,8 @@ public static void OnProducerError(IMessageContext context, Exception ex)

activity?.Dispose();
}

return Task.CompletedTask;
}

private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value)
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, Ca
message.TopicPartitionOffset,
() => scope.Dispose());

this.globalEvents.FireMessageConsumeStarted(new MessageEventContext(context));
await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context));

await this.middlewareExecutor
.Execute(scope.Resolver, context, _ => Task.CompletedTask)
Expand Down
71 changes: 25 additions & 46 deletions src/KafkaFlow/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ internal class Event<TArg> : IEvent<TArg>
{
private readonly ILogHandler logHandler;

private readonly List<Func<TArg, Task>> asyncHandlers = new();
private readonly List<Action<TArg>> syncHandlers = new();
private readonly List<Func<TArg, Task>> handlers = new();

public Event(ILogHandler logHandler)
{
Expand All @@ -19,61 +18,43 @@ public Event(ILogHandler logHandler)

public IEventSubscription Subscribe(Func<TArg, Task> handler)
{
return this.Subscribe(this.asyncHandlers, handler);
}

public IEventSubscription Subscribe(Action<TArg> handler)
{
return this.Subscribe(this.syncHandlers, handler);
}
if (handler is null)
{
throw new ArgumentNullException("Handler cannot be null");
}

internal Task FireAsync(TArg arg)
{
return this.InvokeAsyncHandlers(arg);
}
if (!this.handlers.Contains(handler))
{
this.handlers.Add(handler);
}

internal void Fire(TArg arg)
{
this.InvokeSyncHandlers(arg);
return new EventSubscription(() => this.handlers.Remove(handler));
}

private IEventSubscription Subscribe<T>(List<T> handlersList, T handler)
internal Task FireAsync(TArg arg)
{
if (!handlersList.Contains(handler))
{
handlersList.Add(handler);
}
var tasks = this.handlers
.Select(handler => this.ProcessHandler(handler, arg));

return new EventSubscription(() => handlersList.Remove(handler));
return Task.WhenAll(tasks);
}

private async Task InvokeAsyncHandlers(TArg arg)
private Task ProcessHandler(Func<TArg, Task> handler, TArg arg)
{
foreach (var handler in this.asyncHandlers.Where(h => h is not null))
try
{
try
{
await handler.Invoke(arg);
}
catch (Exception ex)
return handler.Invoke(arg).ContinueWith(t =>
{
this.LogHandlerOnError(ex);
}
if (t.IsFaulted)
{
this.LogHandlerOnError(t.Exception);
}
});
}
}

private void InvokeSyncHandlers(TArg arg)
{
foreach (var handler in this.syncHandlers.Where(h => h is not null))
catch (Exception ex)
{
try
{
handler.Invoke(arg);
}
catch (Exception ex)
{
this.LogHandlerOnError(ex);
}
this.LogHandlerOnError(ex);
return Task.CompletedTask;
}
}

Expand All @@ -94,8 +75,6 @@ public Event(ILogHandler logHandler)

public IEventSubscription Subscribe(Func<Task> handler) => this.evt.Subscribe(_ => handler.Invoke());

public IEventSubscription Subscribe(Action handler) => this.evt.Subscribe(_ => handler.Invoke());

internal Task FireAsync() => this.evt.FireAsync(null);
}
}
3 changes: 0 additions & 3 deletions src/KafkaFlow/GlobalEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public GlobalEvents(ILogHandler log)
public Task FireMessageConsumeStartedAsync(MessageEventContext context)
=> this.messageConsumeStarted.FireAsync(context);

public void FireMessageConsumeStarted(MessageEventContext context)
=> this.messageConsumeStarted.Fire(context);

public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context)
=> this.messageConsumeError.FireAsync(context);

Expand Down

0 comments on commit 6dc2706

Please sign in to comment.