Skip to content

Commit

Permalink
Add caching to DefaultTypeResolver (issue #437)
Browse files Browse the repository at this point in the history
  • Loading branch information
adambajguz committed Oct 8, 2023
1 parent d662b9d commit d5ae9dd
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/KafkaFlow.Serializer/DefaultTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
namespace KafkaFlow
{
using System;
using System.Collections.Concurrent;

internal class DefaultTypeResolver : IMessageTypeResolver
{
private const string MessageType = "Message-Type";

private static readonly ConcurrentDictionary<string, Type> ConsumeTypeCache = new(StringComparer.Ordinal);
private static readonly ConcurrentDictionary<Type, string> ProduceTypeCache = new();

public Type OnConsume(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);

return typeName is null ?
null :
Type.GetType(typeName);
null :
ConsumeTypeCache.GetOrAdd(typeName, Type.GetType);
}

public void OnProduce(IMessageContext context)
Expand All @@ -24,9 +28,11 @@ public void OnProduce(IMessageContext context)

var messageType = context.Message.Value.GetType();

context.Headers.SetString(
MessageType,
$"{messageType.FullName}, {messageType.Assembly.GetName().Name}");
string messageTypeName = ProduceTypeCache.GetOrAdd(
messageType,
static messageType => $"{messageType.FullName}, {messageType.Assembly.GetName().Name}");

context.Headers.SetString(MessageType, messageTypeName);
}
}
}

0 comments on commit d5ae9dd

Please sign in to comment.