From 9ac6830a2955643c7ae6cd9cedc1fa98fce59ac6 Mon Sep 17 00:00:00 2001 From: Sascha Kiefer Date: Wed, 17 Apr 2024 11:39:29 +0200 Subject: [PATCH] feat: add support for csharp namespace in protobuf schemas --- .../ConfluentProtobufTypeNameResolver.cs | 12 +++- .../ConfluentProtobufTypeNameResolverTests.cs | 55 +++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs index 42237baf2..32b6b7e74 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs @@ -20,7 +20,13 @@ public async Task ResolveAsync(int id) var schemaString = (await _client.GetSchemaAsync(id, "serialized")).SchemaString; var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString)); - - return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}"; + var messageType = protoFields.MessageType.FirstOrDefault()?.Name; + var ns = protoFields.Options?.HasCsharpNamespace == true + ? protoFields.Options.CsharpNamespace + : protoFields.Package; + return BuildTypeName(messageType, ns); } -} + + private static string BuildTypeName(string messageType, string ns) + => string.IsNullOrEmpty(ns) ? messageType ?? string.Empty : $"{ns}.{messageType}"; +} \ No newline at end of file diff --git a/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs b/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs index 2541043b2..4b608a95d 100644 --- a/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs +++ b/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Confluent.SchemaRegistry; @@ -44,4 +45,58 @@ public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields() // Assert protoFields.Should().NotBeNull(); } + + [TestMethod] + public async Task ResolveAsync_SchemaWithPackageOnly_ReturnsTypeNameWithPackageNamespace() + { + // Arrange + // below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\npackage kafkaflow.test;\n\nmessage Person {\n string name = 1;\n}\n' + var schemaString = "CgdkZWZhdWx0Eg5rYWZrYWZsb3cudGVzdCIUCgZQZXJzb24SCgoEbmFtZRgBKAliBnByb3RvMw=="; + var schemaId = 420; + + _schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized")) + .ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List())); + + // Act + var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId); + + // Assert + actual.Should().Be("kafkaflow.test.Person"); + } + + [TestMethod] + public async Task ResolveAsync_SchemaWithCsharpNamespace_ReturnsTypeNameWithCsharpNamespace() + { + // Arrange + // below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\npackage kafkaflow.test;\n\nmessage Person {\n string name = 1;\n}\n' + var schemaString = "CgdkZWZhdWx0Eg5rYWZrYWZsb3cudGVzdCIUCgZQZXJzb24SCgoEbmFtZRgBKAlCEaoCDkthZmthRmxvdy5UZXN0YgZwcm90bzM="; + var schemaId = 420; + + _schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized")) + .ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List())); + + // Act + var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId); + + // Assert + Assert.AreEqual("KafkaFlow.Test.Person", actual); + } + + [TestMethod] + public async Task ResolveAsync_SchemaWithoutPackageOrNamespace_ReturnsTypeNameWithoutNamespace() + { + // Arrange + // below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\n\nmessage Person {\n string name = 1;\n}\n' + var schemaString = "CgdkZWZhdWx0IhQKBlBlcnNvbhIKCgRuYW1lGAEoCWIGcHJvdG8z"; + var schemaId = 420; + + _schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized")) + .ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List())); + + // Act + var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId); + + // Assert + Assert.AreEqual("Person", actual); + } }