diff --git a/.editorconfig b/.editorconfig index eac0d4922..40d253ffb 100644 --- a/.editorconfig +++ b/.editorconfig @@ -8,6 +8,9 @@ trim_trailing_whitespace = true insert_final_newline = true end_of_line = lf +[*.json] +indent_size = 2 + [src/NATS.Client.Core/NaCl/**.cs] generated_code = true diff --git a/NATS.Client.sln b/NATS.Client.sln index b98aea563..5e14b18f2 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -6,6 +6,10 @@ MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sandbox", "sandbox", "{95A69671-16CA-4133-981C-CC381B7AAA30}" + ProjectSection(SolutionItems) = preProject + sandbox\shell.nix = sandbox\shell.nix + sandbox\Directory.Build.props = sandbox\Directory.Build.props + EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{C526E8AB-739A-48D7-8FC4-048978C9B650}" EndProject @@ -101,6 +105,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.DocsExamples", "te EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Hosting.Tests", "tests\NATS.Client.Hosting.Tests\NATS.Client.Hosting.Tests.csproj", "{766C2486-34C3-4DD1-B31C-540C17C044B0}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry", "src\NATS.Net.OpenTelemetry\NATS.Net.OpenTelemetry.csproj", "{EC6A3E38-3906-4720-921D-BA1E439B4DE7}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core", "sandbox\Example.Core\Example.Core.csproj", "{8BFBB88D-7596-44B9-BF67-5402A662F244}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -267,6 +275,14 @@ Global {766C2486-34C3-4DD1-B31C-540C17C044B0}.Debug|Any CPU.Build.0 = Debug|Any CPU {766C2486-34C3-4DD1-B31C-540C17C044B0}.Release|Any CPU.ActiveCfg = Release|Any CPU {766C2486-34C3-4DD1-B31C-540C17C044B0}.Release|Any CPU.Build.0 = Release|Any CPU + {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Release|Any CPU.Build.0 = Release|Any CPU + {8BFBB88D-7596-44B9-BF67-5402A662F244}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8BFBB88D-7596-44B9-BF67-5402A662F244}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8BFBB88D-7596-44B9-BF67-5402A662F244}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8BFBB88D-7596-44B9-BF67-5402A662F244}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -313,6 +329,8 @@ Global {6A7B9B9F-BFA4-4A6D-9006-0AAF597FC6DD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {389C05EB-A0B3-4097-8C1F-4D55818438CC} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {766C2486-34C3-4DD1-B31C-540C17C044B0} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {EC6A3E38-3906-4720-921D-BA1E439B4DE7} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} + {8BFBB88D-7596-44B9-BF67-5402A662F244} = {95A69671-16CA-4133-981C-CC381B7AAA30} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/BlazorWasm/Client/Properties/launchSettings.json b/sandbox/BlazorWasm/Client/Properties/launchSettings.json index 5173d8631..09b977cf2 100644 --- a/sandbox/BlazorWasm/Client/Properties/launchSettings.json +++ b/sandbox/BlazorWasm/Client/Properties/launchSettings.json @@ -1,4 +1,5 @@ { + "$schema": "http://json.schemastore.org/launchsettings.json", "profiles": { "BlazorWasm": { "commandName": "Project", @@ -7,7 +8,8 @@ "inspectUri": "{wsProtocol}://{url.hostname}:{url.port}/_framework/debug/ws-proxy?browser={browserInspectUri}", "applicationUrl": "http://localhost:5000", "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "BlazorWasm" } } } diff --git a/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj b/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj index 46ebcd35b..f9ad84c00 100644 --- a/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj +++ b/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj @@ -10,9 +10,12 @@ + + + diff --git a/sandbox/BlazorWasm/Server/Program.cs b/sandbox/BlazorWasm/Server/Program.cs index 99572340d..82821a89d 100644 --- a/sandbox/BlazorWasm/Server/Program.cs +++ b/sandbox/BlazorWasm/Server/Program.cs @@ -1,10 +1,19 @@ using BlazorWasm.Server.NatsServices; -using NATS.Client.Core; +using Example.Core; using NATS.Client.Hosting; using NATS.Client.Serializers.Json; +using OpenTelemetry.Trace; + +TracingSetup.SetSandboxEnv(); var builder = WebApplication.CreateBuilder(args); +builder.Services.AddOpenTelemetry() + .WithTracing(o => o + .AddAspNetCoreInstrumentation() + .AddNatsInstrumentation() + .AddOtlpExporter()); + // Add services to the container. builder.Services.AddControllersWithViews(); builder.Services.AddRazorPages(); diff --git a/sandbox/BlazorWasm/Server/Properties/launchSettings.json b/sandbox/BlazorWasm/Server/Properties/launchSettings.json index c15d742e7..6fc5313e6 100644 --- a/sandbox/BlazorWasm/Server/Properties/launchSettings.json +++ b/sandbox/BlazorWasm/Server/Properties/launchSettings.json @@ -1,14 +1,16 @@ { - "profiles": { - "BlazorWasm.Server": { - "commandName": "Project", - "dotnetRunMessages": true, - "launchBrowser": true, - "inspectUri": "{wsProtocol}://{url.hostname}:{url.port}/_framework/debug/ws-proxy?browser={browserInspectUri}", - "applicationUrl": "http://localhost:5000", - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "BlazorWasm.Server": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "inspectUri": "{wsProtocol}://{url.hostname}:{url.port}/_framework/debug/ws-proxy?browser={browserInspectUri}", + "applicationUrl": "http://localhost:5000", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "BlazorWasm.Server" } } } +} diff --git a/sandbox/ConsoleApp/Properties/launchSettings.json b/sandbox/ConsoleApp/Properties/launchSettings.json new file mode 100644 index 000000000..788798196 --- /dev/null +++ b/sandbox/ConsoleApp/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "ConsoleApp": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "ConsoleApp" + } + } + } +} diff --git a/sandbox/Directory.Build.props b/sandbox/Directory.Build.props new file mode 100644 index 000000000..1845877f9 --- /dev/null +++ b/sandbox/Directory.Build.props @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj b/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj index 8e64548a3..5721cc952 100644 --- a/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj +++ b/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj @@ -11,6 +11,7 @@ + diff --git a/sandbox/Example.Core.PublishHeaders/Program.cs b/sandbox/Example.Core.PublishHeaders/Program.cs index 9b42b2faf..eb7aa4f58 100644 --- a/sandbox/Example.Core.PublishHeaders/Program.cs +++ b/sandbox/Example.Core.PublishHeaders/Program.cs @@ -1,8 +1,12 @@ // > nats sub bar.* + +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Serializers.Json; +using var tracer = TracingSetup.RunSandboxTracing(); + var subject = "bar.xyz"; var options = NatsOpts.Default with { diff --git a/sandbox/Example.Core.PublishHeaders/Properties/launchSettings.json b/sandbox/Example.Core.PublishHeaders/Properties/launchSettings.json new file mode 100644 index 000000000..fa436250a --- /dev/null +++ b/sandbox/Example.Core.PublishHeaders/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Core.PublishHeaders": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Core.PublishHeaders" + } + } + } +} diff --git a/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj b/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj index 0da21d55d..69905a6f9 100644 --- a/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj +++ b/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj @@ -11,6 +11,9 @@ + + + diff --git a/sandbox/Example.Core.PublishModel/Program.cs b/sandbox/Example.Core.PublishModel/Program.cs index d9ca83cbf..c780ccc5a 100644 --- a/sandbox/Example.Core.PublishModel/Program.cs +++ b/sandbox/Example.Core.PublishModel/Program.cs @@ -1,8 +1,12 @@ // > nats sub bar.* + +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Serializers.Json; +using var tracer = TracingSetup.RunSandboxTracing(); + var subject = "bar.xyz"; var options = NatsOpts.Default with { @@ -13,6 +17,7 @@ Print("[CON] Connecting...\n"); await using var connection = new NatsConnection(options); +await connection.ConnectAsync(); for (var i = 0; i < 10; i++) { diff --git a/sandbox/Example.Core.PublishModel/Properties/launchSettings.json b/sandbox/Example.Core.PublishModel/Properties/launchSettings.json new file mode 100644 index 000000000..c8dff80e6 --- /dev/null +++ b/sandbox/Example.Core.PublishModel/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Core.PublishModel": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Core.PublishModel" + } + } + } +} diff --git a/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj b/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj index 0da21d55d..beda6c3bb 100644 --- a/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj +++ b/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj @@ -11,6 +11,7 @@ + diff --git a/sandbox/Example.Core.SubscribeHeaders/Program.cs b/sandbox/Example.Core.SubscribeHeaders/Program.cs index 3994f1844..46d549aa8 100644 --- a/sandbox/Example.Core.SubscribeHeaders/Program.cs +++ b/sandbox/Example.Core.SubscribeHeaders/Program.cs @@ -1,9 +1,12 @@ // > nats pub bar.xyz --count=10 "my_message_{{ Count }}" -H X-Foo:Baz +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Serializers.Json; +using var tracer = TracingSetup.RunSandboxTracing(); + var subject = "bar.*"; var options = NatsOpts.Default with { diff --git a/sandbox/Example.Core.SubscribeHeaders/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeHeaders/Properties/launchSettings.json new file mode 100644 index 000000000..f2387fdc5 --- /dev/null +++ b/sandbox/Example.Core.SubscribeHeaders/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Core.SubscribeHeaders": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Core.SubscribeHeaders" + } + } + } +} diff --git a/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj b/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj index 0da21d55d..beda6c3bb 100644 --- a/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj +++ b/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj @@ -11,6 +11,7 @@ + diff --git a/sandbox/Example.Core.SubscribeModel/Program.cs b/sandbox/Example.Core.SubscribeModel/Program.cs index cdc4e5a87..b528ee168 100644 --- a/sandbox/Example.Core.SubscribeModel/Program.cs +++ b/sandbox/Example.Core.SubscribeModel/Program.cs @@ -1,7 +1,10 @@ +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Serializers.Json; +using var tracer = TracingSetup.RunSandboxTracing(); + var subject = "bar.*"; var options = NatsOpts.Default with { @@ -17,6 +20,7 @@ await foreach (var msg in connection.SubscribeAsync(subject)) { + using var activity = msg.StartChildActivity(); Print($"[RCV] {msg.Subject}: {msg.Data}\n"); } diff --git a/sandbox/Example.Core.SubscribeModel/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeModel/Properties/launchSettings.json new file mode 100644 index 000000000..ecb6ffe9d --- /dev/null +++ b/sandbox/Example.Core.SubscribeModel/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Core.SubscribeModel": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Core.SubscribeModel" + } + } + } +} diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj b/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj index f0214d692..61bbab9b8 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj +++ b/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj @@ -10,6 +10,7 @@ + diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs index 4dbbe7792..8a7226086 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs +++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs @@ -1,7 +1,11 @@ // > nats pub foo.xyz --count=10 "my_message_{{ Count }}" + +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; +using var tracer = TracingSetup.RunSandboxTracing(); + var subject = "foo.*"; var options = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) }; diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeQueueGroup/Properties/launchSettings.json new file mode 100644 index 000000000..49da51589 --- /dev/null +++ b/sandbox/Example.Core.SubscribeQueueGroup/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Core.SubscribeQueueGroup": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Core.SubscribeQueueGroup" + } + } + } +} diff --git a/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj b/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj index b709bad71..01c4aeb48 100644 --- a/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj +++ b/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj @@ -11,6 +11,7 @@ + diff --git a/sandbox/Example.Core.SubscribeRaw/Program.cs b/sandbox/Example.Core.SubscribeRaw/Program.cs index b80832d96..0e55816fc 100644 --- a/sandbox/Example.Core.SubscribeRaw/Program.cs +++ b/sandbox/Example.Core.SubscribeRaw/Program.cs @@ -1,7 +1,10 @@ using System.Text; +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; +using var tracer = TracingSetup.RunSandboxTracing(); + var subject = "foo.*"; var options = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) }; diff --git a/sandbox/Example.Core.SubscribeRaw/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeRaw/Properties/launchSettings.json new file mode 100644 index 000000000..ef4c62ca6 --- /dev/null +++ b/sandbox/Example.Core.SubscribeRaw/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Core.SubscribeRaw": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Core.SubscribeRaw" + } + } + } +} diff --git a/sandbox/Example.Core/Example.Core.csproj b/sandbox/Example.Core/Example.Core.csproj new file mode 100644 index 000000000..483f59a68 --- /dev/null +++ b/sandbox/Example.Core/Example.Core.csproj @@ -0,0 +1,13 @@ + + + + net6.0 + enable + enable + + + + + + + diff --git a/sandbox/Example.Core/TracingSetup.cs b/sandbox/Example.Core/TracingSetup.cs new file mode 100644 index 000000000..34a12c054 --- /dev/null +++ b/sandbox/Example.Core/TracingSetup.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; + +namespace Example.Core; + +public static class TracingSetup +{ + public static void SetSandboxEnv() + { + var instanceId = Guid.NewGuid().ToString(); + var assemblyName = Assembly.GetEntryAssembly()!.GetName().Name; + Environment.SetEnvironmentVariable("OTEL_SERVICE_NAME", assemblyName); + Environment.SetEnvironmentVariable("OTEL_RESOURCE_ATTRIBUTES", $"service.instance.id={instanceId}"); + Environment.SetEnvironmentVariable("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:16023"); // set to an OTLP endpoint + } + + public static TracerProvider RunSandboxTracing(bool console = false, bool internalTraces = false) + { + SetSandboxEnv(); + return new TracerProviderBuilderBase() + .ConfigureResource(o => o.AddTelemetrySdk()) + .AddNatsInstrumentation(includeInternal: internalTraces) + .MaybeAddInternalSource(internalTraces) + .AddOtlpExporter() + .MaybeAddConsoleExporter(console) + .Build() + ?? throw new Exception("Tracer provider build returned null."); + } + + private static TracerProviderBuilder MaybeAddConsoleExporter(this TracerProviderBuilder builder, bool console) + => console ? builder.AddConsoleExporter() : builder; + + private static TracerProviderBuilder MaybeAddInternalSource(this TracerProviderBuilder builder, bool internalTraces) + => internalTraces ? builder.AddSource("NATS.Client.Internal") : builder; +} diff --git a/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj b/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj index 5bba973ed..fb2051cc5 100644 --- a/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj +++ b/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj @@ -10,6 +10,7 @@ + diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index 9daf531db..b08ab17a4 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -1,10 +1,13 @@ using System.Diagnostics; using System.Text; +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; +using var tracer = TracingSetup.RunSandboxTracing(internalTraces: false); + var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => @@ -82,6 +85,7 @@ void Report(int i, Stopwatch sw, string data) // NoWaitFetch is a specialized operation not available on the public interface. await foreach (var msg in ((NatsJSConsumer)consumer).FetchNoWaitAsync>(opts: fetchNoWaitOpts, cancellationToken: cts.Token)) { + using var activity = msg.StartChildActivity(); fetchMsgCount++; using (msg.Data) { @@ -120,6 +124,7 @@ void Report(int i, Stopwatch sw, string data) await consumer.RefreshAsync(cts.Token); await foreach (var msg in consumer.FetchAsync>(opts: fetchOpts, cancellationToken: cts.Token)) { + using var activity = msg.StartChildActivity(); using (msg.Data) { var message = Encoding.ASCII.GetString(msg.Data.Span); @@ -151,6 +156,7 @@ void Report(int i, Stopwatch sw, string data) var next = await consumer.NextAsync>(opts: nextOpts, cancellationToken: cts.Token); if (next is { } msg) { + using var activity = msg.StartChildActivity(); using (msg.Data) { var message = Encoding.ASCII.GetString(msg.Data.Span); @@ -183,6 +189,7 @@ void Report(int i, Stopwatch sw, string data) var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); await foreach (var msg in consumer.ConsumeAsync>(opts: consumeOpts, cancellationToken: consumeStop.Token)) { + using var activity = msg.StartChildActivity(); using (msg.Data) { var message = Encoding.ASCII.GetString(msg.Data.Span); diff --git a/sandbox/Example.JetStream.PullConsumer/Properties/launchSettings.json b/sandbox/Example.JetStream.PullConsumer/Properties/launchSettings.json new file mode 100644 index 000000000..d6babddc7 --- /dev/null +++ b/sandbox/Example.JetStream.PullConsumer/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.JetStream.PullConsumer": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.JetStream.PullConsumer" + } + } + } +} diff --git a/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj b/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj index afe6c77db..6c5ef1e15 100644 --- a/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj +++ b/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj @@ -10,6 +10,7 @@ + diff --git a/sandbox/Example.KeyValueStore.Watcher/Program.cs b/sandbox/Example.KeyValueStore.Watcher/Program.cs index a301a0f4e..abf857756 100644 --- a/sandbox/Example.KeyValueStore.Watcher/Program.cs +++ b/sandbox/Example.KeyValueStore.Watcher/Program.cs @@ -1,8 +1,11 @@ +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.KeyValueStore; +using var tracer = TracingSetup.RunSandboxTracing(); + // var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; // var nats = new NatsConnection(options); var nats = new NatsConnection(); diff --git a/sandbox/Example.KeyValueStore.Watcher/Properties/launchSettings.json b/sandbox/Example.KeyValueStore.Watcher/Properties/launchSettings.json new file mode 100644 index 000000000..551ffcb1a --- /dev/null +++ b/sandbox/Example.KeyValueStore.Watcher/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.KeyValueStore.Watcher": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.KeyValueStore.Watcher" + } + } + } +} diff --git a/sandbox/Example.NativeAot/Example.NativeAot.csproj b/sandbox/Example.NativeAot/Example.NativeAot.csproj index e57f5b242..2ae76343c 100644 --- a/sandbox/Example.NativeAot/Example.NativeAot.csproj +++ b/sandbox/Example.NativeAot/Example.NativeAot.csproj @@ -12,6 +12,7 @@ + diff --git a/sandbox/Example.NativeAot/Program.cs b/sandbox/Example.NativeAot/Program.cs index 6eb00b803..70b82c65f 100644 --- a/sandbox/Example.NativeAot/Program.cs +++ b/sandbox/Example.NativeAot/Program.cs @@ -1,11 +1,14 @@ using System.Buffers; using System.Text; using System.Text.Json.Serialization; +using Example.Core; using Google.Protobuf; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; +using var tracer = TracingSetup.RunSandboxTracing(); + // string { // Same as not specifying a serializer. diff --git a/sandbox/Example.NativeAot/Properties/launchSettings.json b/sandbox/Example.NativeAot/Properties/launchSettings.json new file mode 100644 index 000000000..5f1c57dc7 --- /dev/null +++ b/sandbox/Example.NativeAot/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.NativeAot": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.NativeAot" + } + } + } +} diff --git a/sandbox/Example.ObjectStore/Example.ObjectStore.csproj b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj index 5212087a8..3f1e3db58 100644 --- a/sandbox/Example.ObjectStore/Example.ObjectStore.csproj +++ b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj @@ -10,6 +10,7 @@ + diff --git a/sandbox/Example.ObjectStore/Program.cs b/sandbox/Example.ObjectStore/Program.cs index b9424654f..2322a7fe2 100644 --- a/sandbox/Example.ObjectStore/Program.cs +++ b/sandbox/Example.ObjectStore/Program.cs @@ -1,9 +1,12 @@ using System.Security.Cryptography; +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.ObjectStore; +using var tracer = TracingSetup.RunSandboxTracing(); + var opts = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) }; var nats = new NatsConnection(opts); diff --git a/sandbox/Example.ObjectStore/Properties/launchSettings.json b/sandbox/Example.ObjectStore/Properties/launchSettings.json new file mode 100644 index 000000000..83bd3f302 --- /dev/null +++ b/sandbox/Example.ObjectStore/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.ObjectStore": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.ObjectStore" + } + } + } +} diff --git a/sandbox/Example.ProtoBufMessages/Properties/launchSettings.json b/sandbox/Example.ProtoBufMessages/Properties/launchSettings.json new file mode 100644 index 000000000..878c6f766 --- /dev/null +++ b/sandbox/Example.ProtoBufMessages/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.ProtoBufMessages": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.ProtoBufMessages" + } + } + } +} diff --git a/sandbox/Example.Services/Example.Services.csproj b/sandbox/Example.Services/Example.Services.csproj index 628a2a3d5..b5ef5302f 100644 --- a/sandbox/Example.Services/Example.Services.csproj +++ b/sandbox/Example.Services/Example.Services.csproj @@ -10,6 +10,7 @@ + diff --git a/sandbox/Example.Services/Program.cs b/sandbox/Example.Services/Program.cs index 694ca2ff7..30f1fc627 100644 --- a/sandbox/Example.Services/Program.cs +++ b/sandbox/Example.Services/Program.cs @@ -1,8 +1,11 @@ using System.Text; +using Example.Core; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Services; +using var tracer = TracingSetup.RunSandboxTracing(); + var opts = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) }; var nats = new NatsConnection(opts); diff --git a/sandbox/Example.Services/Properties/launchSettings.json b/sandbox/Example.Services/Properties/launchSettings.json new file mode 100644 index 000000000..3b00a7080 --- /dev/null +++ b/sandbox/Example.Services/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.Services": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.Services" + } + } + } +} diff --git a/sandbox/Example.TlsFirst/Example.TlsFirst.csproj b/sandbox/Example.TlsFirst/Example.TlsFirst.csproj index e4a0b5983..95052e21a 100644 --- a/sandbox/Example.TlsFirst/Example.TlsFirst.csproj +++ b/sandbox/Example.TlsFirst/Example.TlsFirst.csproj @@ -10,6 +10,15 @@ + + + + + + Never + Never + true + diff --git a/sandbox/Example.TlsFirst/Program.cs b/sandbox/Example.TlsFirst/Program.cs index 8a4305799..07fb694d5 100644 --- a/sandbox/Example.TlsFirst/Program.cs +++ b/sandbox/Example.TlsFirst/Program.cs @@ -1,5 +1,8 @@ +using Example.Core; using NATS.Client.Core; +using var tracer = TracingSetup.RunSandboxTracing(); + // await using var nats = new NatsConnection(); await using var nats = new NatsConnection(NatsOpts.Default with { TlsOpts = new NatsTlsOpts { Mode = TlsMode.Implicit, InsecureSkipVerify = true, } }); await nats.ConnectAsync(); diff --git a/sandbox/Example.TlsFirst/Properties/launchSettings.json b/sandbox/Example.TlsFirst/Properties/launchSettings.json new file mode 100644 index 000000000..b9e5ce940 --- /dev/null +++ b/sandbox/Example.TlsFirst/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.TlsFirst": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.TlsFirst" + } + } + } +} diff --git a/sandbox/MicroBenchmark/Properties/launchSettings.json b/sandbox/MicroBenchmark/Properties/launchSettings.json new file mode 100644 index 000000000..1a2bf50c4 --- /dev/null +++ b/sandbox/MicroBenchmark/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "Example.MicroBenchmark": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "Example.MicroBenchmark" + } + } + } +} diff --git a/sandbox/MinimumWebApp/MinimumWebApp.csproj b/sandbox/MinimumWebApp/MinimumWebApp.csproj index c3ae6efd3..fdb57f2b8 100644 --- a/sandbox/MinimumWebApp/MinimumWebApp.csproj +++ b/sandbox/MinimumWebApp/MinimumWebApp.csproj @@ -11,6 +11,12 @@ + + + + + + diff --git a/sandbox/MinimumWebApp/Program.cs b/sandbox/MinimumWebApp/Program.cs index 355baf998..4c8ee44d5 100644 --- a/sandbox/MinimumWebApp/Program.cs +++ b/sandbox/MinimumWebApp/Program.cs @@ -1,8 +1,18 @@ +using Example.Core; using NATS.Client.Core; using NATS.Client.Hosting; +using OpenTelemetry.Trace; + +TracingSetup.SetSandboxEnv(); var builder = WebApplication.CreateBuilder(args); +builder.Services.AddOpenTelemetry() + .WithTracing(o => o + .AddAspNetCoreInstrumentation() + .AddNatsInstrumentation() + .AddOtlpExporter()); + // Register NatsConnectionPool, NatsConnection, INatsCommand to ServiceCollection builder.Services.AddNats(); diff --git a/sandbox/MinimumWebApp/Properties/launchSettings.json b/sandbox/MinimumWebApp/Properties/launchSettings.json index be6911fdc..6fd292654 100644 --- a/sandbox/MinimumWebApp/Properties/launchSettings.json +++ b/sandbox/MinimumWebApp/Properties/launchSettings.json @@ -1,12 +1,5 @@ { - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:2196", - "sslPort": 0 - } - }, + "$schema": "http://json.schemastore.org/launchsettings.json", "profiles": { "MinimumWebApp": { "commandName": "Project", @@ -14,14 +7,8 @@ "launchBrowser": true, "applicationUrl": "http://localhost:5005", "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - }, - "IIS Express": { - "commandName": "IISExpress", - "launchBrowser": true, - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" + "ASPNETCORE_ENVIRONMENT": "Development", + "OTEL_SERVICE_NAME": "MinimumWebApp" } } } diff --git a/sandbox/shell.nix b/sandbox/shell.nix new file mode 100644 index 000000000..6e8f67950 --- /dev/null +++ b/sandbox/shell.nix @@ -0,0 +1,29 @@ +{ pkgs ? import {} }: +pkgs.mkShell { + name = "Nats Sandbox Shell"; + buildInputs = with pkgs; [ + natscli + nats-server + nats-top + parallel + man + ]; + + shellHook = '' + echo "Nats Sandbox shell started." + export RESPONSE='{ "Body": "Message {{Count}} @ {{Time}}" }' + alias server="nats-server --net localhost --js" + alias respond='nats reply bar.> "$RESPONSE"' + alias create-stream='nats stream create s1 --subjects="bar.>" --storage=memory --replicas=1 --defaults' + + alias aspire="dotnet run --project ./Example.Aspire.AppHost/Example.Aspire.AppHost.csproj" + alias pub="dotnet run --project ./Example.Core.PublishModel/Example.Core.PublishModel.csproj" + alias consumer="dotnet run --project ./Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj -- " + + echo "server - run nats server with jetstream" + echo "create-stream - run stream named s1 on subject bar.>" + echo "respond - run a responder on subject bar.>" + echo "pub - run Example.Core.PublishModel" + echo "consumer - run Example.JetStream.PullConsumer" + ''; +} diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index f7931d43e..82afa7581 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -1,5 +1,3 @@ -using System.Diagnostics.CodeAnalysis; - namespace NATS.Client.Core; public interface INatsConnection : IAsyncDisposable diff --git a/src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs b/src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs new file mode 100644 index 000000000..bd295ddd9 --- /dev/null +++ b/src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs @@ -0,0 +1,40 @@ +using System.Runtime.CompilerServices; +using System.Threading.Channels; + +namespace NATS.Client.Core.Internal; + +internal sealed class ActivityEndingMsgReader : ChannelReader> +{ + private readonly ChannelReader> _inner; + + public ActivityEndingMsgReader(ChannelReader> inner) => _inner = inner; + + public override bool CanCount => _inner.CanCount; + + public override bool CanPeek => _inner.CanPeek; + + public override int Count => _inner.Count; + + public override Task Completion => _inner.Completion; + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public override bool TryRead(out NatsMsg item) + { + if (!_inner.TryRead(out item)) + return false; + + item.Activity?.Dispose(); + return true; + } + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public override ValueTask WaitToReadAsync(CancellationToken cancellationToken = default) => _inner.WaitToReadAsync(cancellationToken); + + public override ValueTask> ReadAsync(CancellationToken cancellationToken = default) => _inner.ReadAsync(cancellationToken); + + public override bool TryPeek(out NatsMsg item) => _inner.TryPeek(out item); + + public override IAsyncEnumerable> ReadAllAsync(CancellationToken cancellationToken = default) => _inner.ReadAllAsync(cancellationToken); +} diff --git a/src/NATS.Client.Core/Internal/InboxSub.cs b/src/NATS.Client.Core/Internal/InboxSub.cs index 25cb73efb..0b496ee68 100644 --- a/src/NATS.Client.Core/Internal/InboxSub.cs +++ b/src/NATS.Client.Core/Internal/InboxSub.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Collections.Concurrent; +using System.Diagnostics; using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; @@ -28,7 +29,7 @@ public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnly // Not used. Dummy implementation to keep base happy. protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) - => ValueTask.CompletedTask; + => throw new InvalidOperationException("InboxSub.ReceiveInternalAsync should not be called."); protected override void TryComplete() { diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 9f5abedf0..6e85e76b0 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -1,33 +1,76 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; + namespace NATS.Client.Core; public partial class NatsConnection { /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) - { - headers?.SetReadOnly(); - return ConnectionState != NatsConnectionState.Open - ? ConnectAndPublishAsync(subject, default, headers, replyTo, NatsRawSerializer.Default, cancellationToken) - : CommandWriter.PublishAsync(subject, default, headers, replyTo, NatsRawSerializer.Default, cancellationToken); - } + => PublishAsync(Telemetry.NatsActivities, subject, default, headers, replyTo, NatsRawSerializer.Default, cancellationToken); /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + => PublishAsync(Telemetry.NatsActivities, msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, serializer, cancellationToken); + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + => PublishAsync(Telemetry.NatsActivities, subject, data, headers, replyTo, serializer ?? Opts.SerializerRegistry.GetSerializer(), cancellationToken); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal ValueTask PublishNoneAsync(ActivitySource activitySource, string subject, NatsHeaders? headers = default, string? replyTo = default, CancellationToken cancellationToken = default) + => PublishAsync(activitySource, subject, data: default, headers, replyTo, NatsRawSerializer.Default, cancellationToken); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal ValueTask PublishAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) + { + if (ConnectionState != NatsConnectionState.Open) + return ConnectAndPublishAsync(activitySource, subject, data, headers, replyTo, serializer ?? Opts.SerializerRegistry.GetSerializer(), cancellationToken); + + return InnerPublishAsync(activitySource, subject, data, headers, replyTo, serializer ?? Opts.SerializerRegistry.GetSerializer(), cancellationToken); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask InnerPublishAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) + => activitySource.HasListeners() + ? PublishTracedAsync(activitySource, subject, data, headers, replyTo, serializer, cancellationToken) + : SendPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken); + + [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1204:Static elements should appear before instance elements", Justification = "Method is private")] + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private ValueTask SendPublishAsync(string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken = default) { - serializer ??= Opts.SerializerRegistry.GetSerializer(); + Telemetry.AddTraceContextHeaders(Activity.Current, ref headers); headers?.SetReadOnly(); - return ConnectionState != NatsConnectionState.Open - ? ConnectAndPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken) - : CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken); + + return CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken); } - /// - public ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) => - PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, serializer, opts, cancellationToken); + [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1204:Static elements should appear before instance elements", Justification = "Method is private")] + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private async ValueTask PublishTracedAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken = default) + { + using var activity = Telemetry.StartSendActivity(activitySource, Telemetry.Constants.PublishActivityName, this, subject, replyTo); + + try + { + await SendPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + Telemetry.SetException(activity, ex); + throw; + } + } - private async ValueTask ConnectAndPublishAsync(string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private async ValueTask ConnectAndPublishAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) { await ConnectAsync().AsTask().WaitAsync(cancellationToken).ConfigureAwait(false); - await CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false); + await InnerPublishAsync(activitySource, subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false); } } diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index f5bfa0838..9db6fff37 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -7,23 +7,28 @@ namespace NATS.Client.Core; public partial class NatsConnection { - private static readonly NatsSubOpts ReplyOptsDefault = new NatsSubOpts - { - MaxMsgs = 1, - ThrowIfNoResponders = true, - }; + private static readonly NatsSubOpts ReplyOptsDefault = new NatsSubOpts { MaxMsgs = 1, ThrowIfNoResponders = true, }; - private static readonly NatsSubOpts ReplyManyOptsDefault = new NatsSubOpts - { - StopOnEmptyMsg = true, - ThrowIfNoResponders = true, - }; + private static readonly NatsSubOpts ReplyManyOptsDefault = new NatsSubOpts { StopOnEmptyMsg = true, ThrowIfNoResponders = true, }; /// public string NewInbox() => NewInbox(InboxPrefix); /// - public async ValueTask> RequestAsync( + public ValueTask> RequestAsync( + string subject, + TRequest? data, + NatsHeaders? headers = default, + INatsSerialize? requestSerializer = default, + INatsDeserialize? replySerializer = default, + NatsPubOpts? requestOpts = default, + NatsSubOpts? replyOpts = default, + CancellationToken cancellationToken = default) + => RequestAsync( + Telemetry.NatsActivities, subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken); + + internal async ValueTask> RequestAsync( + ActivitySource activitySource, string subject, TRequest? data, NatsHeaders? headers = default, @@ -34,7 +39,7 @@ public async ValueTask> RequestAsync( CancellationToken cancellationToken = default) { replyOpts = SetReplyOptsDefaults(replyOpts); - await using var sub = await RequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) + await using var sub = await RequestSubAsync(activitySource, subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) .ConfigureAwait(false); if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) @@ -49,6 +54,7 @@ public async ValueTask> RequestAsync( } /// + [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1202:Elements should be ordered by access", Justification = "Internal is wrapped by public")] public async IAsyncEnumerable> RequestManyAsync( string subject, TRequest? data, @@ -60,7 +66,7 @@ public async IAsyncEnumerable> RequestManyAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) + await using var sub = await RequestSubAsync(Telemetry.NatsActivities, subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) .ConfigureAwait(false); while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) @@ -73,6 +79,7 @@ public async IAsyncEnumerable> RequestManyAsync prefix) { Span buffer = stackalloc char[64]; @@ -80,7 +87,7 @@ internal static string NewInbox(ReadOnlySpan prefix) var totalLength = (uint)prefix.Length + (uint)NuidWriter.NuidLength + separatorLength; if (totalLength <= buffer.Length) { - buffer = buffer.Slice(0, (int)totalLength); + buffer = buffer[..(int)totalLength]; } else { @@ -92,7 +99,7 @@ internal static string NewInbox(ReadOnlySpan prefix) { prefix.CopyTo(buffer); buffer[prefix.Length] = '.'; - var remaining = buffer.Slice((int)totalPrefixLength); + var remaining = buffer[(int)totalPrefixLength..]; var didWrite = NuidWriter.TryWriteNuid(remaining); Debug.Assert(didWrite, "didWrite"); return new string(buffer); diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index a3f94c24f..95c2b3dd9 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -1,8 +1,11 @@ +using System.Diagnostics; + namespace NATS.Client.Core; public partial class NatsConnection { internal async ValueTask> RequestSubAsync( + ActivitySource activitySource, string subject, TRequest? data, NatsHeaders? headers = default, @@ -15,11 +18,11 @@ internal async ValueTask> RequestSubAsync( var replyTo = NewInbox(); replySerializer ??= Opts.SerializerRegistry.GetDeserializer(); - var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer); + var sub = new NatsSub(activitySource, this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer); await SubAsync(sub, cancellationToken).ConfigureAwait(false); requestSerializer ??= Opts.SerializerRegistry.GetSerializer(); - await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); + await PublishAsync(activitySource, subject, data, headers, replyTo, requestSerializer, cancellationToken).ConfigureAwait(false); return sub; } diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index 649f7d1fd..5a11744c9 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -16,7 +16,7 @@ public async IAsyncEnumerable> SubscribeAsync(string subject, stri { serializer ??= Opts.SerializerRegistry.GetDeserializer(); - await using var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken); + await using var sub = new NatsSub(Telemetry.NatsActivities, this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken); using var anchor = RegisterSubAnchor(sub); await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false); @@ -36,7 +36,7 @@ public async IAsyncEnumerable> SubscribeAsync(string subject, stri public async ValueTask> SubscribeCoreAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { serializer ??= Opts.SerializerRegistry.GetDeserializer(); - var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken); + var sub = new NatsSub(Telemetry.NatsActivities, this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken); await SubAsync(sub, cancellationToken).ConfigureAwait(false); return sub; } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 4ccc8b7d1..3247f09ae 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -132,6 +132,8 @@ public NatsConnectionState ConnectionState internal ObjectPool ObjectPool => _pool; + internal bool SocketIsWebSocket => _socket is WebSocketConnection; + // only used for internal testing internal ISocketConnection? TestSocket => _socket; diff --git a/src/NATS.Client.Core/NatsHeaders.cs b/src/NATS.Client.Core/NatsHeaders.cs index e63ff0de3..9f3dc2179 100644 --- a/src/NATS.Client.Core/NatsHeaders.cs +++ b/src/NATS.Client.Core/NatsHeaders.cs @@ -1,4 +1,5 @@ using System.Collections; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.Primitives; @@ -72,6 +73,8 @@ public enum Messages public Messages Message { get; internal set; } = Messages.Text; + internal Activity? Activity { get; set; } + /// /// Initializes a new instance of . /// @@ -126,6 +129,7 @@ public StringValues this[string key] { return value; } + return StringValues.Empty; } set @@ -134,6 +138,7 @@ public StringValues this[string key] { throw new ArgumentNullException(nameof(key)); } + ThrowIfReadOnly(); if (value.Count == 0) @@ -150,7 +155,7 @@ public StringValues this[string key] StringValues IDictionary.this[string key] { - get { return this[key]; } + get => this[key]; set { ThrowIfReadOnly(); @@ -181,6 +186,7 @@ public ICollection Keys { return EmptyKeys; } + return Store.Keys; } } @@ -196,6 +202,7 @@ public ICollection Values { return EmptyValues; } + return Store.Values; } } @@ -210,6 +217,7 @@ public void Add(KeyValuePair item) { throw new ArgumentException("The key is null"); } + ThrowIfReadOnly(); EnsureStore(1); Store.Add(item.Key, item.Value); @@ -226,6 +234,7 @@ public void Add(string key, StringValues value) { throw new ArgumentNullException(nameof(key)); } + ThrowIfReadOnly(); EnsureStore(1); Store.Add(key, value); @@ -253,6 +262,7 @@ public bool Contains(KeyValuePair item) { return false; } + return true; } @@ -267,6 +277,7 @@ public bool ContainsKey(string key) { return false; } + return Store.ContainsKey(key); } @@ -306,6 +317,7 @@ public bool Remove(KeyValuePair item) { return Store.Remove(item.Key); } + return false; } @@ -321,6 +333,7 @@ public bool Remove(string key) { return false; } + return Store.Remove(key); } @@ -337,6 +350,7 @@ public bool TryGetValue(string key, out StringValues value) value = default(StringValues); return false; } + return Store.TryGetValue(key, out value); } @@ -351,6 +365,7 @@ public Enumerator GetEnumerator() // Non-boxed Enumerator return default; } + return new Enumerator(Store.GetEnumerator()); } @@ -365,6 +380,7 @@ IEnumerator> IEnumerable Current { return _dictionaryEnumerator.Current; } + return default(KeyValuePair); } } diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 53bc01750..3573fa20a 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; namespace NATS.Client.Core; @@ -120,42 +120,11 @@ public readonly record struct NatsMsg( T? Data, INatsConnection? Connection) : INatsMsg { - internal static NatsMsg Build( - string subject, - string? replyTo, - in ReadOnlySequence? headersBuffer, - in ReadOnlySequence payloadBuffer, - INatsConnection? connection, - NatsHeaderParser headerParser, - INatsDeserialize serializer) - { - // Consider an empty payload as null or default value for value types. This way we are able to - // receive sentinels as nulls or default values. This might cause an issue with where we are not - // able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero). - var data = payloadBuffer.Length > 0 - ? serializer.Deserialize(payloadBuffer) - : default; - - NatsHeaders? headers = null; - - if (headersBuffer != null) - { - headers = new NatsHeaders(); - if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) - { - throw new NatsException("Error parsing headers"); - } - - headers.SetReadOnly(); - } - - var size = subject.Length - + (replyTo?.Length ?? 0) - + (headersBuffer?.Length ?? 0) - + payloadBuffer.Length; - - return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); - } + /// + /// Activity used to trace the receiving of the this message. It can be used to create child activities under this context. + /// + /// + public Activity? Activity => Headers?.Activity; /// /// Reply with an empty message. @@ -168,7 +137,10 @@ internal static NatsMsg Build( public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo, headers, replyTo, opts, cancellationToken); + var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities; + + // TODO: un-hack + return ((NatsConnection)Connection).PublishNoneAsync(activitySource, subject: ReplyTo, headers, replyTo, cancellationToken); } /// @@ -196,7 +168,10 @@ public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = de public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(ReplyTo, data, headers, replyTo, serializer, opts, cancellationToken); + var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities; + + // TODO: un-hack + return ((NatsConnection)Connection).PublishAsync(activitySource, subject: ReplyTo, data, headers, replyTo, serializer, cancellationToken); } /// @@ -214,7 +189,10 @@ public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, public ValueTask ReplyAsync(NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { CheckReplyPreconditions(); - return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken); + var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities; + + // TODO: un-hack + return ((NatsConnection)Connection).PublishAsync(activitySource, subject: ReplyTo, msg.Data, msg.Headers, msg.ReplyTo, serializer, cancellationToken); } [MemberNotNull(nameof(Connection))] diff --git a/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs new file mode 100644 index 000000000..aeebc28e2 --- /dev/null +++ b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs @@ -0,0 +1,36 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; + +namespace NATS.Client.Core; + +public static class NatsMsgTelemetryExtensions +{ + /// + /// Get the activity context associated with a NatsMsg. + /// + /// + /// Returns the from the 'receive' on . + /// The value will be default if no activity is present. + /// + public static ActivityContext GetActivityContext(this in NatsMsg msg) + => msg.Activity?.Context ?? default; + + /// Start a child activity under the NatsMsg associated activity. + /// Nats message + /// Name of new activity + /// Optional tags to add to the activity + /// Returns child or null if no listeners. + /// + /// Consider setting Activity. to the returned value so that + /// the context flows to any child activities created. + /// + public static Activity? StartChildActivity( + this in NatsMsg msg, + [CallerMemberName] string name = "", + IEnumerable>? tags = null) + => Telemetry.NatsActivities.StartActivity( + name, + kind: ActivityKind.Internal, + parentContext: GetActivityContext(in msg), + tags: tags); +} diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index 853408e83..b6ae22e6a 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Runtime.ExceptionServices; using System.Threading.Channels; using NATS.Client.Core.Internal; @@ -7,9 +8,11 @@ namespace NATS.Client.Core; public sealed class NatsSub : NatsSubBase, INatsSub { + private readonly ActivitySource _activitySource; private readonly Channel> _msgs; internal NatsSub( + ActivitySource activitySource, NatsConnection connection, ISubscriptionManager manager, string subject, @@ -19,27 +22,32 @@ internal NatsSub( CancellationToken cancellationToken = default) : base(connection, manager, subject, queueGroup, opts, cancellationToken) { + _activitySource = activitySource; _msgs = Channel.CreateBounded>( connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts), msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg)); + Msgs = new ActivityEndingMsgReader(_msgs.Reader); + Serializer = serializer; } - public ChannelReader> Msgs => _msgs.Reader; + public ChannelReader> Msgs { get; } private INatsDeserialize Serializer { get; } protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - var natsMsg = NatsMsg.Build( - subject, - replyTo, + var natsMsg = ParseMsg( + activitySource: _activitySource, + activityName: Telemetry.Constants.ReceiveActivityName, + subject: subject, + replyTo: replyTo, headersBuffer, - payloadBuffer, + in payloadBuffer, Connection, Connection.HeaderParser, - Serializer); + serializer: Serializer); await _msgs.Writer.WriteAsync(natsMsg).ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index e1b8a610b..3502deaf6 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Runtime.ExceptionServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -346,4 +347,62 @@ protected void EndSubscription(NatsSubEndReason reason) #pragma warning restore VSTHRD110 #pragma warning restore CA2012 } + + protected NatsMsg ParseMsg( + ActivitySource activitySource, + string activityName, + string subject, + string? replyTo, + ReadOnlySequence? headersBuffer, + in ReadOnlySequence payloadBuffer, + INatsConnection? connection, + NatsHeaderParser headerParser, + INatsDeserialize serializer) + { + NatsHeaders? headers; + if (headersBuffer != null) + { + headers = new NatsHeaders(); + if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + throw new NatsException("Error parsing headers"); + } + else + { + headers = null; + } + + var size = subject.Length + + (replyTo?.Length ?? 0) + + (headersBuffer?.Length ?? 0) + + payloadBuffer.Length; + + var activity = Telemetry.StartReceiveActivity( + activitySource, + Connection, + name: activityName, + subscriptionSubject: Subject, + queueGroup: QueueGroup, + subject: subject, + replyTo: replyTo, + bodySize: payloadBuffer.Length, + size: size, + headers: headers); + + if (activity is not null) + { + headers ??= new NatsHeaders(); + headers.Activity = activity; + } + + headers?.SetReadOnly(); + + // Consider an empty payload as null or default value for value types. This way we are able to + // receive sentinels as nulls or default values. This might cause an issue with where we are not + // able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero). + var data = payloadBuffer.Length > 0 + ? serializer.Deserialize(payloadBuffer) + : default; + + return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); + } } diff --git a/src/NATS.Client.Core/Telemetry.cs b/src/NATS.Client.Core/Telemetry.cs new file mode 100644 index 000000000..494c8aa31 --- /dev/null +++ b/src/NATS.Client.Core/Telemetry.cs @@ -0,0 +1,296 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; + +namespace NATS.Client.Core; + +// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging/ +// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes +internal static class Telemetry +{ + public const string NatsActivitySource = "NATS.Client"; + public const string NatsInternalActivitySource = "NATS.Client.Internal"; + private static readonly object BoxedTrue = true; + + internal static ActivitySource NatsActivities { get; } = new(name: NatsActivitySource); + + internal static ActivitySource NatsInternalActivities { get; } = new(name: NatsInternalActivitySource); + + // ReSharper disable once ClassNeverInstantiated.Global + internal class Constants + { + public const string True = "true"; + public const string False = "false"; + public const string PublishActivityName = "publish"; + public const string ReceiveActivityName = "receive"; + + public const string SystemKey = "messaging.system"; + public const string SystemVal = "nats"; + public const string ClientId = "messaging.client_id"; + public const string OpKey = "messaging.operation"; + public const string OpPub = "publish"; + public const string OpRec = "receive"; + public const string OpProcess = "process"; + public const string MsgBodySize = "messaging.message.body.size"; + public const string MsgTotalSize = "messaging.message.envelope.size"; + + // destination + public const string DestTemplate = "messaging.destination.template"; + public const string DestName = "messaging.destination.name"; + public const string DestIsTemporary = "messaging.destination.temporary"; + public const string DestIsAnon = "messaging.destination.anonymous"; + public const string DestPubName = "messaging.destination_publish.name"; + public const string DestPubIsAnon = "messaging.destination_publish.anonymous"; + + public const string QueueGroup = "messaging.nats.consumer.group"; + public const string ReplyTo = "messaging.nats.message.reply_to"; + public const string Subject = "messaging.nats.message.subject"; + + public const string ServerAddress = "server.address"; + public const string ServerPort = "server.port"; + public const string NetworkProtoName = "network.protocol.name"; + public const string NetworkProtoVersion = "network.protocol.version"; + public const string NetworkPeerAddress = "network.peer.address"; + public const string NetworkPeerPort = "network.peer.port"; + public const string NetworkLocalAddress = "network.local.address"; + public const string NetworkLocalPort = "network.local.port"; + public const string NetworkTransport = "network.transport"; + + // rpc ??? + public const string RpcSystemKey = "rpc.system"; + public const string RpcSystemVal = "nats"; + + public const string JSAck = "messaging.nats.js.acked"; + } + + [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1201:Elements should appear in the correct order", Justification = "Class is all constant.")] + internal static Activity? StartSendActivity(ActivitySource source, string name, INatsConnection? connection, string subject, string? replyTo) + { + if (!source.HasListeners()) + return null; + + KeyValuePair[] tags; + if (connection is NatsConnection { ServerInfo: not null } conn) + { + var len = 12; + if (replyTo is not null) + len++; + + var serverPort = conn.ServerInfo.Port.ToString(); + tags = new KeyValuePair[len]; + tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal); + tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpPub); + tags[2] = new KeyValuePair(Constants.DestName, subject); + + tags[3] = new KeyValuePair(Constants.ClientId, conn.ServerInfo.ClientId.ToString()); + tags[4] = new KeyValuePair(Constants.ServerAddress, conn.ServerInfo.Host); + tags[5] = new KeyValuePair(Constants.ServerPort, serverPort); + tags[6] = new KeyValuePair(Constants.NetworkProtoName, "nats"); + tags[7] = new KeyValuePair(Constants.NetworkProtoVersion, conn.ServerInfo.ProtocolVersion.ToString()); + tags[8] = new KeyValuePair(Constants.NetworkPeerAddress, conn.ServerInfo.Host); + tags[9] = new KeyValuePair(Constants.NetworkPeerPort, serverPort); + tags[10] = new KeyValuePair(Constants.NetworkLocalAddress, conn.ServerInfo.ClientIp); + tags[11] = new KeyValuePair(Constants.NetworkTransport, conn.SocketIsWebSocket ? "websocket" : "tcp"); + + if (replyTo is not null) + tags[12] = new KeyValuePair(Constants.ReplyTo, replyTo); + } + else + { + var len = 3; + if (replyTo is not null) + len++; + + tags = new KeyValuePair[len]; + tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal); + tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpPub); + tags[2] = new KeyValuePair(Constants.DestName, subject); + + if (replyTo is not null) + tags[3] = new KeyValuePair(Constants.ReplyTo, replyTo); + } + + return source.StartActivity( + name, + kind: ActivityKind.Producer, + parentContext: default, // propagate from current activity + tags: tags); + } + + internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers) + { + if (activity is null) + return; + + headers ??= new NatsHeaders(); + DistributedContextPropagator.Current.Inject( + activity: activity, + carrier: headers, + setter: static (carrier, fieldName, fieldValue) => + { + if (carrier is not NatsHeaders headers) + { + Debug.Assert(false, "This should never be hit."); + return; + } + + headers[fieldName] = fieldValue; + }); + } + + internal static Activity? StartReceiveActivity( + ActivitySource source, + INatsConnection? connection, + string name, + string subscriptionSubject, + string? queueGroup, + string subject, + string? replyTo, + long bodySize, + long size, + NatsHeaders? headers) + { + if (!source.HasListeners()) + return null; + + KeyValuePair[] tags; + if (connection is NatsConnection { ServerInfo: not null } conn) + { + var serverPort = conn.ServerInfo.Port.ToString(); + + var len = 19; + if (replyTo is not null) + len++; + + tags = new KeyValuePair[len]; + tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal); + tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpRec); + tags[2] = new KeyValuePair(Constants.DestTemplate, subscriptionSubject); + tags[3] = new KeyValuePair(Constants.QueueGroup, queueGroup); + tags[4] = new KeyValuePair(Constants.Subject, subject); + tags[5] = new KeyValuePair(Constants.DestName, subject); + tags[6] = new KeyValuePair(Constants.DestPubName, subject); + tags[7] = new KeyValuePair(Constants.MsgBodySize, bodySize.ToString()); + tags[8] = new KeyValuePair(Constants.MsgTotalSize, size.ToString()); + + tags[9] = new KeyValuePair(Constants.ClientId, conn.ServerInfo.ClientId.ToString()); + tags[10] = new KeyValuePair(Constants.ServerAddress, conn.ServerInfo.Host); + tags[11] = new KeyValuePair(Constants.ServerPort, serverPort); + tags[12] = new KeyValuePair(Constants.NetworkProtoName, "nats"); + tags[13] = new KeyValuePair(Constants.NetworkProtoVersion, conn.ServerInfo.ProtocolVersion.ToString()); + tags[14] = new KeyValuePair(Constants.NetworkPeerAddress, conn.ServerInfo.Host); + tags[15] = new KeyValuePair(Constants.NetworkPeerPort, serverPort); + tags[16] = new KeyValuePair(Constants.NetworkLocalAddress, conn.ServerInfo.ClientIp); + tags[17] = new KeyValuePair(Constants.NetworkTransport, conn.SocketIsWebSocket ? "websocket" : "tcp"); + tags[18] = new KeyValuePair( + Constants.DestIsTemporary, + subscriptionSubject.StartsWith(conn.InboxPrefix, StringComparison.Ordinal) ? Constants.True : Constants.False); + + if (replyTo is not null) + tags[19] = new KeyValuePair(Constants.ReplyTo, replyTo); + } + else + { + tags = new KeyValuePair[10]; + tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal); + tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpRec); + tags[2] = new KeyValuePair(Constants.DestTemplate, subscriptionSubject); + tags[3] = new KeyValuePair(Constants.QueueGroup, queueGroup); + tags[4] = new KeyValuePair(Constants.Subject, subject); + tags[5] = new KeyValuePair(Constants.DestName, subject); + tags[6] = new KeyValuePair(Constants.DestPubName, subject); + tags[7] = new KeyValuePair(Constants.MsgBodySize, bodySize.ToString()); + tags[8] = new KeyValuePair(Constants.MsgTotalSize, size.ToString()); + + if (replyTo is not null) + tags[9] = new KeyValuePair(Constants.ReplyTo, replyTo); + } + + if (headers is null || !TryParseTraceContext(headers, out var context)) + context = default; + + return source.StartActivity( + name, + kind: ActivityKind.Consumer, + parentContext: context, + tags: tags); + } + + internal static void SetException(Activity? activity, Exception exception) + { + if (activity is null) + return; + + // see: https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/ + var message = GetMessage(exception); + var eventTags = new ActivityTagsCollection + { + ["exception.escaped"] = BoxedTrue, + ["exception.type"] = exception.GetType().FullName, + ["exception.message"] = message, + ["exception.stacktrace"] = GetStackTrace(exception), + }; + + var activityEvent = new ActivityEvent("exception", DateTimeOffset.UtcNow, eventTags); + + activity.AddEvent(activityEvent); + activity.SetStatus(ActivityStatusCode.Error, message); + return; + + static string GetMessage(Exception exception) + { + try + { + return exception.Message; + } + catch + { + return $"An exception of type {exception.GetType()} was thrown but the Message property threw an exception."; + } + } + + static string GetStackTrace(Exception? exception) + { + var stackTrace = exception?.StackTrace; + return string.IsNullOrWhiteSpace(stackTrace) ? string.Empty : stackTrace; + } + } + + private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContext context) + { + DistributedContextPropagator.Current.ExtractTraceIdAndState( + carrier: headers, + getter: static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) => + { + if (carrier is not NatsHeaders headers) + { + Debug.Assert(false, "This should never be hit."); + fieldValue = null; + fieldValues = null; + return; + } + + if (headers.TryGetValue(fieldName, out var values)) + { + if (values.Count == 1) + { + fieldValue = values[0]; + fieldValues = null; + } + else + { + fieldValue = null; + fieldValues = values; + } + } + else + { + fieldValue = null; + fieldValues = null; + } + }, + out var traceParent, + out var traceState); + + return ActivityContext.TryParse(traceParent, traceState, out context); + } +} diff --git a/src/NATS.Client.JetStream/Internal/ActivityEndingJSMsgReader.cs b/src/NATS.Client.JetStream/Internal/ActivityEndingJSMsgReader.cs new file mode 100644 index 000000000..f347da660 --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/ActivityEndingJSMsgReader.cs @@ -0,0 +1,26 @@ +using System.Runtime.CompilerServices; +using System.Threading.Channels; + +namespace NATS.Client.JetStream.Internal; + +internal sealed class ActivityEndingJSMsgReader : ChannelReader> +{ + private readonly ChannelReader> _inner; + + public ActivityEndingJSMsgReader(ChannelReader> inner) => _inner = inner; + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public override bool TryRead(out NatsJSMsg item) + { + if (!_inner.TryRead(out item)) + return false; + + item.Activity?.Dispose(); + return true; + } + + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public override ValueTask WaitToReadAsync(CancellationToken cancellationToken = default) => _inner.WaitToReadAsync(cancellationToken); +} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index d94c9b88b..2a7c2208b 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Text; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -133,7 +134,7 @@ public NatsJSConsume( // sufficiently large value to avoid blocking socket reads in the // NATS connection). _userMsgs = Channel.CreateBounded>(1000); - Msgs = _userMsgs.Reader; + Msgs = new ActivityEndingJSMsgReader(_userMsgs.Reader); // Capacity as 1 is enough here since it's used for signaling only. _pullRequests = Channel.CreateBounded(1); @@ -155,6 +156,7 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, } return Connection.PublishAsync( + Telemetry.NatsInternalActivities, subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", data: request, replyTo: Subject, @@ -323,14 +325,16 @@ protected override async ValueTask ReceiveInternalAsync( else { var msg = new NatsJSMsg( - NatsMsg.Build( - subject, - replyTo, + ParseMsg( + Telemetry.NatsActivities, + activityName: "js_receive", + subject: subject, + replyTo: replyTo, headersBuffer, - payloadBuffer, + in payloadBuffer, Connection, Connection.HeaderParser, - _serializer), + serializer: _serializer), _context); lock (_pendingGate) @@ -357,7 +361,7 @@ protected override async ValueTask ReceiveInternalAsync( // We can't pass cancellation token here because we need to hand // the message to the user to be processed. Writer will be completed // when the user calls Stop() or when the subscription is closed. - await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + await _userMsgs.Writer.WriteAsync(msg, CancellationToken.None).ConfigureAwait(false); } } @@ -437,7 +441,7 @@ private async Task PullLoop() await foreach (var pr in _pullRequests.Reader.ReadAllAsync().ConfigureAwait(false)) { var origin = $"pull-loop({pr.Origin})"; - await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false); + await CallMsgNextAsync(origin, pr.Request, CancellationToken.None).ConfigureAwait(false); if (_debug) { _logger.LogDebug(NatsJSLogEvents.PullRequest, "Pull request issued for {Origin} {Batch}, {MaxBytes}", origin, pr.Request.Batch, pr.Request.MaxBytes); diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index 751d2f923..f73d8b6f7 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Text; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -73,7 +74,7 @@ public NatsJSFetch( // sufficiently large value to avoid blocking socket reads in the // NATS connection). _userMsgs = Channel.CreateBounded>(1000); - Msgs = _userMsgs.Reader; + Msgs = new ActivityEndingJSMsgReader(_userMsgs.Reader); if (_debug) { @@ -128,6 +129,7 @@ public NatsJSFetch( public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) => Connection.PublishAsync( + Telemetry.NatsInternalActivities, subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", data: request, replyTo: Subject, @@ -224,14 +226,16 @@ protected override async ValueTask ReceiveInternalAsync( else { var msg = new NatsJSMsg( - NatsMsg.Build( - subject, - replyTo, + ParseMsg( + Telemetry.NatsActivities, + activityName: "js_receive", + subject: subject, + replyTo: replyTo, headersBuffer, - payloadBuffer, + in payloadBuffer, Connection, Connection.HeaderParser, - _serializer), + serializer: _serializer), _context); _pendingMsgs--; diff --git a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs index c7eaf1e7a..9784a1ba0 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs @@ -25,7 +25,7 @@ public NatsJSNotificationChannel( SingleWriter = false, FullMode = BoundedChannelFullMode.DropOldest, }); - _loop = Task.Run(NotificationLoop); + _loop = Task.Run(NotificationLoop, CancellationToken.None); } public void Notify(INatsJSNotification notification) => _channel.Writer.TryWrite(notification); diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs index e1a698320..d30e64a70 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Text; using System.Threading.Channels; using Microsoft.Extensions.Logging; @@ -97,12 +98,12 @@ public NatsJSOrderedConsume( _userMsgs = Channel.CreateBounded>( Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts), msg => Connection.OnMessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg)); - Msgs = _userMsgs.Reader; + Msgs = new ActivityEndingJSMsgReader(_userMsgs.Reader); // Pull request channel is set as unbounded because we don't want to drop // them and minimize potential lock contention. _pullRequests = Channel.CreateUnbounded(); - _pullTask = Task.Run(PullLoop); + _pullTask = Task.Run(PullLoop, CancellationToken.None); ResetPending(); @@ -122,6 +123,7 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, } return Connection.PublishAsync( + Telemetry.NatsInternalActivities, subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", data: request, replyTo: Subject, @@ -269,14 +271,16 @@ protected override async ValueTask ReceiveInternalAsync( else { var msg = new NatsJSMsg( - NatsMsg.Build( - subject, - replyTo, + ParseMsg( + Telemetry.NatsActivities, + activityName: "js_receive", + subject: subject, + replyTo: replyTo, headersBuffer, - payloadBuffer, + in payloadBuffer, Connection, Connection.HeaderParser, - _serializer), + serializer: _serializer), _context); lock (_pendingGate) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 04c762ffc..40e9234e9 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core; @@ -113,21 +114,18 @@ public NatsJSOrderedPushConsumer( // where it's not enough due to performance for example. _commandChannel = Channel.CreateBounded>(1); _msgChannel = Channel.CreateBounded>(1); + Msgs = new ActivityEndingJSMsgReader(_msgChannel.Reader); // A single request to create the consumer is enough because we don't want to create a new consumer // back to back in case the consumer is being recreated due to a timeout and a mismatch in consumer // sequence for example; creating the consumer once would solve both the issues. - _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1) - { - AllowSynchronousContinuations = false, - FullMode = BoundedChannelFullMode.DropOldest, - }); + _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { AllowSynchronousContinuations = false, FullMode = BoundedChannelFullMode.DropOldest, }); _consumerCreateTask = Task.Run(ConsumerCreateLoop); _commandTask = Task.Run(CommandLoop); } - public ChannelReader> Msgs => _msgChannel.Reader; + public ChannelReader> Msgs { get; } public bool IsDone => Volatile.Read(ref _done) > 0; @@ -148,7 +146,7 @@ public async ValueTask DisposeAsync() await _consumerCreateTask; await _commandTask; - await _context.DeleteConsumerAsync(_stream, Consumer, _cancellationToken); + await _context.DeleteConsumerAsync(Telemetry.NatsInternalActivities, _stream, Consumer, _cancellationToken); } internal void Init() @@ -198,7 +196,7 @@ private async Task CommandLoop() { if (headers.TryGetValue("Nats-Consumer-Stalled", out var flowControlReplyTo)) { - await _nats.PublishAsync(flowControlReplyTo, cancellationToken: _cancellationToken); + await _nats.PublishNoneAsync(Telemetry.NatsInternalActivities, subject: flowControlReplyTo, cancellationToken: _cancellationToken); } if (headers is { Code: 100, MessageText: "FlowControl Request" }) @@ -442,7 +440,19 @@ protected override async ValueTask ReceiveInternalAsync( ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + var msg = new NatsJSMsg( + ParseMsg( + activitySource: Telemetry.NatsInternalActivities, + activityName: "js_receive", + subject: subject, + replyTo: replyTo, + headersBuffer, + in payloadBuffer, + Connection, + Connection.HeaderParser, + serializer: _serializer), + _context); + await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 8f1d0f224..788d1abaf 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading.Channels; using NATS.Client.Core; @@ -282,6 +283,7 @@ public async IAsyncEnumerable> FetchNoWaitAsync( /// Server responded with an error. public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => Info = await _context.JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}", request: null, cancellationToken).ConfigureAwait(false); diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index e361cd5c8..55c1f2e68 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -1,6 +1,8 @@ +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using NATS.Client.Core; using NATS.Client.Core.Internal; -using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -27,38 +29,11 @@ public ValueTask CreateOrderedConsumerAsync( } /// > - public async ValueTask CreateOrUpdateConsumerAsync( + public ValueTask CreateOrUpdateConsumerAsync( string stream, ConsumerConfig config, CancellationToken cancellationToken = default) - { - ThrowIfInvalidStreamName(stream); - - // TODO: Adjust API subject according to server version and filter subject - var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}"; - - if (!string.IsNullOrWhiteSpace(config.Name)) - { - subject += $".{config.Name}"; - config.Name = default!; - } - - if (!string.IsNullOrWhiteSpace(config.FilterSubject)) - { - subject += $".{config.FilterSubject}"; - } - - var response = await JSRequestResponseAsync( - subject: subject, - new ConsumerCreateRequest - { - StreamName = stream, - Config = config, - }, - cancellationToken); - - return new NatsJSConsumer(this, response); - } + => CreateOrUpdateConsumerAsync(Telemetry.NatsActivities, stream, config, cancellationToken); /// /// Gets consumer information from the server and creates a NATS JetStream consumer . @@ -75,6 +50,7 @@ public async ValueTask GetConsumerAsync(string stream, string c { ThrowIfInvalidStreamName(stream); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.CONSUMER.INFO.{stream}.{consumer}", request: null, cancellationToken); @@ -91,6 +67,7 @@ public async IAsyncEnumerable ListConsumersAsync( while (!cancellationToken.IsCancellationRequested) { var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.CONSUMER.LIST.{stream}", new ConsumerListRequest { Offset = offset }, cancellationToken); @@ -117,6 +94,7 @@ public async IAsyncEnumerable ListConsumerNamesAsync( while (!cancellationToken.IsCancellationRequested) { var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.CONSUMER.NAMES.{stream}", new ConsumerNamesRequest { Offset = offset }, cancellationToken); @@ -142,16 +120,55 @@ public async IAsyncEnumerable ListConsumerNamesAsync( /// Server responded with an error. /// The name is invalid. /// The name is null. - public async ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) + public ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) + => DeleteConsumerAsync(Telemetry.NatsActivities, stream, consumer, cancellationToken); + + public async ValueTask DeleteConsumerAsync(ActivitySource activitySource, string stream, string consumer, CancellationToken cancellationToken = default) { ThrowIfInvalidStreamName(stream); var response = await JSRequestResponseAsync( + activitySource, subject: $"{Opts.Prefix}.CONSUMER.DELETE.{stream}.{consumer}", request: null, cancellationToken); return response.Success; } + internal async ValueTask CreateOrUpdateConsumerAsync( + ActivitySource activitySource, + string stream, + ConsumerConfig config, + CancellationToken cancellationToken = default) + { + ThrowIfInvalidStreamName(stream); + + // TODO: Adjust API subject according to server version and filter subject + var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}"; + + if (!string.IsNullOrWhiteSpace(config.Name)) + { + subject += $".{config.Name}"; + config.Name = default!; + } + + if (!string.IsNullOrWhiteSpace(config.FilterSubject)) + { + subject += $".{config.FilterSubject}"; + } + + var response = await JSRequestResponseAsync( + activitySource, + subject: subject, + new ConsumerCreateRequest + { + StreamName = stream, + Config = config, + }, + cancellationToken); + + return new NatsJSConsumer(this, response); + } + internal ValueTask CreateOrderedConsumerInternalAsync( string stream, NatsJSOrderedConsumerOpts opts, @@ -195,6 +212,7 @@ internal ValueTask CreateOrderedConsumerInternalAsync( var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}.{name}"; return JSRequestResponseAsync( + activitySource: Telemetry.NatsInternalActivities, subject: subject, request, cancellationToken); diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 53d61cbdd..99ffd92ab 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -1,4 +1,5 @@ using System.Runtime.CompilerServices; +using NATS.Client.Core; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -21,6 +22,7 @@ public async ValueTask CreateStreamAsync( { ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", config, cancellationToken); @@ -43,6 +45,7 @@ public async ValueTask DeleteStreamAsync( { ThrowIfInvalidStreamName(stream); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.DELETE.{stream}", request: null, cancellationToken); @@ -67,6 +70,7 @@ public async ValueTask PurgeStreamAsync( { ThrowIfInvalidStreamName(stream); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.PURGE.{stream}", request: request, cancellationToken); @@ -91,6 +95,7 @@ public async ValueTask DeleteMessageAsync( { ThrowIfInvalidStreamName(stream); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.MSG.DELETE.{stream}", request: request, cancellationToken); @@ -115,6 +120,7 @@ public async ValueTask GetStreamAsync( { ThrowIfInvalidStreamName(stream); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.INFO.{stream}", request: request, cancellationToken); @@ -137,6 +143,7 @@ public async ValueTask UpdateStreamAsync( { ThrowIfInvalidStreamName(request.Name, nameof(request.Name)); var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.UPDATE.{request.Name}", request: request, cancellationToken); @@ -159,6 +166,7 @@ public async IAsyncEnumerable ListStreamsAsync( while (!cancellationToken.IsCancellationRequested) { var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.LIST", request: new StreamListRequest { @@ -189,6 +197,7 @@ public async IAsyncEnumerable ListStreamNamesAsync(string? subject = def while (!cancellationToken.IsCancellationRequested) { var response = await JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{Opts.Prefix}.STREAM.NAMES", request: new StreamNamesRequest { diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 462dd0079..67076507d 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; @@ -41,6 +42,7 @@ public NatsJSContext(NatsConnection connection, NatsJSOpts opts) /// The account information based on the NATS connection credentials. public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => JSRequestResponseAsync( + activitySource: Telemetry.NatsActivities, subject: $"{Opts.Prefix}.INFO", request: null, cancellationToken); @@ -72,187 +74,222 @@ public ValueTask GetAccountInfoAsync(CancellationToken canc /// and the Nats-Msg-Id header value was set, msgId parameter value will be used. /// /// - public async ValueTask PublishAsync( + public ValueTask PublishAsync( string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) + => PublishAsync(Telemetry.NatsActivities, subject, data, serializer, opts, headers, cancellationToken); + + internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) { - if (opts != null) + ArgumentNullException.ThrowIfNull(name, paramName); + + if (name.Length == 0) { - if (opts.MsgId != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Msg-Id"] = opts.MsgId; - } + ThrowEmptyException(paramName); + } - if (opts.ExpectedLastMsgId != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Last-Msg-Id"] = opts.ExpectedLastMsgId; - } + var nameSpan = name.AsSpan(); + if (nameSpan.IndexOfAny(" .") >= 0) + { + ThrowInvalidStreamNameException(paramName); + } + } - if (opts.ExpectedStream != null) + internal async ValueTask PublishAsync( + ActivitySource activitySource, + string subject, + T? data, + INatsSerialize? serializer = default, + NatsJSPubOpts? opts = default, + NatsHeaders? headers = default, + CancellationToken cancellationToken = default) + { + using var activity = Telemetry.StartSendActivity(activitySource, name: "js_publish", Connection, subject, replyTo: null); + try + { + if (opts != null) { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Stream"] = opts.ExpectedStream; - } + if (opts.MsgId != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Msg-Id"] = opts.MsgId; + } - if (opts.ExpectedLastSequence != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Last-Sequence"] = opts.ExpectedLastSequence.ToString(); - } + if (opts.ExpectedLastMsgId != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Msg-Id"] = opts.ExpectedLastMsgId; + } - if (opts.ExpectedLastSubjectSequence != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString(); - } - } + if (opts.ExpectedStream != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Stream"] = opts.ExpectedStream; + } - opts ??= NatsJSPubOpts.Default; - var retryMax = opts.RetryAttempts; - var retryWait = opts.RetryWaitBetweenAttempts; + if (opts.ExpectedLastSequence != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Sequence"] = opts.ExpectedLastSequence.ToString(); + } - for (var i = 0; i < retryMax; i++) - { - await using var sub = await Connection.RequestSubAsync( - subject: subject, - data: data, - headers: headers, - requestSerializer: serializer, - replySerializer: NatsJSJsonSerializer.Default, - requestOpts: opts, - replyOpts: new NatsSubOpts - { - // It's important to set the timeout here so that the subscription can be - // stopped if the server doesn't respond or more likely case is that if there - // is a reconnect to the cluster between the request and waiting for a response, - // without the timeout the publish call will hang forever since the server - // which received the request won't be there to respond anymore. - Timeout = Connection.Opts.RequestTimeout, - - // If JetStream is disabled, a no responders error will be returned - // No responders error might also happen when reconnecting to cluster - ThrowIfNoResponders = true, - }, - cancellationToken) - .ConfigureAwait(false); + if (opts.ExpectedLastSubjectSequence != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString(); + } + } + + opts ??= NatsJSPubOpts.Default; + var retryMax = opts.RetryAttempts; + var retryWait = opts.RetryWaitBetweenAttempts; - try + for (var i = 0; i < retryMax; i++) { - while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + await using var sub = await Connection.RequestSubAsync( + activitySource: Telemetry.NatsInternalActivities, + subject: subject, + data: data, + headers: headers, + requestSerializer: serializer, + replySerializer: NatsJSJsonSerializer.Default, + requestOpts: opts, + replyOpts: new NatsSubOpts + { + // It's important to set the timeout here so that the subscription can be + // stopped if the server doesn't respond or more likely case is that if there + // is a reconnect to the cluster between the request and waiting for a response, + // without the timeout the publish call will hang forever since the server + // which received the request won't be there to respond anymore. + Timeout = Connection.Opts.RequestTimeout, + + // If JetStream is disabled, a no responders error will be returned + // No responders error might also happen when reconnecting to cluster + ThrowIfNoResponders = true, + }, + cancellationToken) + .ConfigureAwait(false); + + try { - while (sub.Msgs.TryRead(out var msg)) + while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { - if (msg.Data == null) + while (sub.Msgs.TryRead(out var msg)) { - throw new NatsJSException("No response data received"); - } + if (msg.Data == null) + { + throw new NatsJSException("No response data received"); + } + + activity?.AddTag(Telemetry.Constants.JSAck, msg.Data.Error is null); - return msg.Data; + return msg.Data; + } } } - } - catch (NatsNoRespondersException) - { - } + catch (NatsNoRespondersException) + { + } - if (i < retryMax) - { - _logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax); - await Task.Delay(retryWait, cancellationToken); + if (i < retryMax) + { + _logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax); + await Task.Delay(retryWait, cancellationToken); + } } - } - - // We throw a specific exception here for convenience so that the caller doesn't - // have to check for the exception message etc. - throw new NatsJSPublishNoResponseException(); - } - internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) - { - ArgumentNullException.ThrowIfNull(name, paramName); - - if (name.Length == 0) - { - ThrowEmptyException(paramName); + // We throw a specific exception here for convenience so that the caller doesn't + // have to check for the exception message etc. + throw new NatsJSPublishNoResponseException(); } - - var nameSpan = name.AsSpan(); - if (nameSpan.IndexOfAny(" .") >= 0) + catch (Exception ex) { - ThrowInvalidStreamNameException(paramName); + Telemetry.SetException(activity, ex); + throw; } } internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); internal async ValueTask JSRequestResponseAsync( + ActivitySource activitySource, string subject, TRequest? request, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class { - var response = await JSRequestAsync(subject, request, cancellationToken); + var response = await JSRequestAsync(activitySource, subject, request, cancellationToken); response.EnsureSuccess(); return response.Response!; } internal async ValueTask> JSRequestAsync( + ActivitySource activitySource, string subject, TRequest? request, CancellationToken cancellationToken = default) where TRequest : class where TResponse : class { + using var activity = Telemetry.StartSendActivity(activitySource, name: "js_publish", Connection, subject, replyTo: null); + if (request != null) { // TODO: Can't validate using JSON serializer context at the moment. // Validator.ValidateObject(request, new ValidationContext(request)); } - await using var sub = await Connection.RequestSubAsync( - subject: subject, - data: request, - headers: default, - replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, - requestSerializer: NatsJSJsonSerializer.Default, - replySerializer: NatsJSErrorAwareJsonSerializer.Default, - cancellationToken: cancellationToken) - .ConfigureAwait(false); - - if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + try { - if (sub.Msgs.TryRead(out var msg)) + await using var sub = await Connection.RequestSubAsync( + activitySource: Telemetry.NatsInternalActivities, + subject: subject, + data: request, + headers: default, + replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, + requestSerializer: NatsJSJsonSerializer.Default, + replySerializer: NatsJSErrorAwareJsonSerializer.Default, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { - if (msg.Data == null) + if (sub.Msgs.TryRead(out var msg)) { - throw new NatsJSException("No response data received"); - } + if (msg.Data == null) + { + throw new NatsJSException("No response data received"); + } - return new NatsJSResponse(msg.Data, default); + return new NatsJSResponse(msg.Data, default); + } } - } - if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb) - { - if (sb.Exception is NatsSubException { Exception.SourceException: NatsJSApiErrorException jsError }) + if (sub is NatsSubBase { EndReason: NatsSubEndReason.Exception, Exception: not null } sb) { - // Clear exception here so that subscription disposal won't throw it. - sb.ClearException(); + if (sb.Exception is NatsSubException { Exception.SourceException: NatsJSApiErrorException jsError }) + { + // Clear exception here so that subscription disposal won't throw it. + sb.ClearException(); - return new NatsJSResponse(default, jsError.Error); + return new NatsJSResponse(default, jsError.Error); + } + + throw sb.Exception; } - throw sb.Exception; + throw new NatsJSApiNoResponseException(); + } + catch (Exception ex) + { + Telemetry.SetException(activity, ex); + throw; } - - throw new NatsJSApiNoResponseException(); } [DoesNotReturn] diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index b2f849041..d647e605c 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Text; using NATS.Client.Core; @@ -145,6 +146,12 @@ public NatsJSMsg(NatsMsg msg, NatsJSContext context) _replyToDateTimeAndSeq = new Lazy(() => ReplyToDateTimeAndSeq.Parse(msg.ReplyTo)); } + /// + /// Activity used to trace the receiving of the this message. It can be used to create child activities under this context. + /// + /// + public Activity? Activity => _msg.Activity; + /// /// Subject of the user message. /// @@ -260,13 +267,16 @@ public ValueTask NakAsync(AckOpts? opts = default, TimeSpan delay = default, Can private async ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts? opts = default, CancellationToken cancellationToken = default) { CheckPreconditions(); + var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities; if (_msg == default) throw new NatsJSException("No user message, can't acknowledge"); if (opts?.DoubleAck ?? _context.Opts.DoubleAck) { - await Connection.RequestAsync, object?>( + // TODO: un-hack + await ((NatsConnection)Connection).RequestAsync, object?>( + activitySource, subject: ReplyTo, data: payload, requestSerializer: NatsRawSerializer>.Default, @@ -275,7 +285,14 @@ private async ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts? op } else { - await _msg.ReplyAsync( + var sub = ReplyTo; + if (string.IsNullOrWhiteSpace(sub)) + throw new NatsException("unable to send reply; ReplyTo is empty"); + + // TODO: un-hack + await ((NatsConnection)Connection).PublishAsync( + activitySource, + subject: sub, data: payload, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); diff --git a/src/NATS.Client.JetStream/NatsJSMsgTelemetryExtensions.cs b/src/NATS.Client.JetStream/NatsJSMsgTelemetryExtensions.cs new file mode 100644 index 000000000..8b0f90397 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSMsgTelemetryExtensions.cs @@ -0,0 +1,37 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using NATS.Client.Core; + +namespace NATS.Client.JetStream; + +public static class NatsJSMsgTelemetryExtensions +{ + /// + /// Get the activity context associated with a NatsMsg. + /// + /// + /// Returns the from the 'receive' on . + /// The value will be default if no activity is present. + /// + public static ActivityContext GetActivityContext(this in NatsJSMsg msg) + => msg.Activity?.Context ?? default; + + /// Start a child activity under the NatsMsg associated activity. + /// Nats message + /// Name of new activity + /// Optional tags to add to the activity + /// Returns child or null if no listeners. + /// + /// Consider setting Activity. to the returned value so that + /// the context flows to any child activities created. + /// + public static Activity? StartChildActivity( + this in NatsJSMsg msg, + [CallerMemberName] string name = "", + IEnumerable>? tags = null) + => Telemetry.NatsActivities.StartActivity( + name, + kind: ActivityKind.Internal, + parentContext: GetActivityContext(in msg), + tags: tags); +} diff --git a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs index 3531de7b2..2c9925d10 100644 --- a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs @@ -283,7 +283,7 @@ private async Task RecreateConsumer(string consumer, ulong seq, try { - await _context.DeleteConsumerAsync(_stream, consumer, cancellationToken); + await _context.DeleteConsumerAsync(Telemetry.NatsInternalActivities, _stream, consumer, cancellationToken); break; } catch (NatsJSApiNoResponseException) @@ -332,7 +332,7 @@ private async ValueTask TryDeleteConsumer(string consumerName, Cancellatio { try { - return await _context.DeleteConsumerAsync(_stream, consumerName, cancellationToken); + return await _context.DeleteConsumerAsync(Telemetry.NatsInternalActivities, _stream, consumerName, cancellationToken); } catch { diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 0e1e6ad13..48ee1501e 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -163,6 +163,7 @@ public ValueTask DeleteConsumerAsync(string consumer, CancellationToken ca /// Server responded with an error. public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => Info = await _context.JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_name}", request: null, cancellationToken).ConfigureAwait(false); @@ -170,6 +171,7 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul public ValueTask> GetDirectAsync(StreamMsgGetRequest request, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { return _context.Connection.RequestAsync( + Telemetry.NatsActivities, subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}", data: request, requestSerializer: NatsJSJsonSerializer.Default, @@ -179,6 +181,7 @@ public ValueTask> GetDirectAsync(StreamMsgGetRequest request, INat public ValueTask GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) => _context.JSRequestResponseAsync( + Telemetry.NatsActivities, subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}", request: request, cancellationToken); diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index fb0d2c7eb..63ad02c88 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Diagnostics; using System.Threading.Channels; using NATS.Client.Core; using NATS.Client.JetStream; @@ -10,7 +11,6 @@ internal class NatsKVWatchSub : NatsSubBase private readonly NatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly NatsConnection _nats; - private readonly NatsHeaderParser _headerParser; private readonly INatsDeserialize _serializer; private readonly ChannelWriter> _commands; @@ -31,7 +31,6 @@ public NatsKVWatchSub( _cancellationToken = cancellationToken; _serializer = serializer; _nats = context.Connection; - _headerParser = _nats.HeaderParser; _commands = commandChannel.Writer; _nats.ConnectionOpened += OnConnectionOpened; } @@ -54,7 +53,19 @@ protected override async ValueTask ReceiveInternalAsync( ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { - var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + var msg = new NatsJSMsg( + ParseMsg( + activitySource: Telemetry.NatsInternalActivities, + activityName: "kv_cmd_receive", + subject: subject, + replyTo: replyTo, + headersBuffer, + in payloadBuffer, + Connection, + Connection.HeaderParser, + serializer: _serializer), + _context); + await _commands.WriteAsync(new NatsKVWatchCommandMsg { Command = NatsKVWatchCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); } diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index db2010783..cbf19c1fc 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -57,7 +57,7 @@ internal NatsKVStore(string bucket, NatsJSContext context, INatsJSStream stream) /// public async ValueTask PutAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) { - var ack = await _context.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken); + var ack = await _context.PublishAsync(Telemetry.NatsActivities, $"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken); ack.EnsureSuccess(); return ack.Seq; } @@ -94,7 +94,7 @@ public async ValueTask UpdateAsync(string key, T value, ulong revision try { - var ack = await _context.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); + var ack = await _context.PublishAsync(Telemetry.NatsActivities, $"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); ack.EnsureSuccess(); return ack.Seq; @@ -135,7 +135,7 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, try { - var ack = await _context.PublishAsync(subject, null, headers: headers, cancellationToken: cancellationToken); + var ack = await _context.PublishAsync(Telemetry.NatsActivities, subject, null, headers: headers, cancellationToken: cancellationToken); ack.EnsureSuccess(); } catch (NatsJSApiException e) diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 6059d7885..22672ad67 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -254,7 +254,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var buffer = memoryOwner.Slice(0, currentChunkSize); // Chunks - var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); + var ack = await _context.PublishAsync(Telemetry.NatsInternalActivities, GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); ack.EnsureSuccess(); if (eof) @@ -281,6 +281,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre try { await _context.JSRequestResponseAsync( + activitySource: Telemetry.NatsInternalActivities, subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", request: new StreamPurgeRequest { @@ -456,6 +457,7 @@ public async ValueTask AddBucketLinkAsync(string link, INatsObjS public async ValueTask SealAsync(CancellationToken cancellationToken = default) { var info = await _context.JSRequestResponseAsync( + activitySource: Telemetry.NatsActivities, subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", request: null, cancellationToken).ConfigureAwait(false); @@ -464,6 +466,7 @@ public async ValueTask SealAsync(CancellationToken cancellationToken = default) config.Sealed = true; var response = await _context.JSRequestResponseAsync( + activitySource: Telemetry.NatsActivities, subject: $"{_context.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", request: config, cancellationToken); @@ -645,7 +648,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) { - var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + var ack = await _context.PublishAsync(Telemetry.NatsInternalActivities, GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); ack.EnsureSuccess(); } diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index b13a722ff..eb9fbe79c 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -190,7 +190,17 @@ protected override ValueTask ReceiveInternalAsync( Exception? exception; try { - msg = NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _nats.HeaderParser, _serializer); + msg = ParseMsg( + activitySource: Telemetry.NatsActivities, + activityName: "svc_receive", + subject: subject, + replyTo: replyTo, + headersBuffer, + in payloadBuffer, + Connection, + Connection.HeaderParser, + serializer: _serializer); + exception = null; } catch (Exception e) diff --git a/src/NATS.Client.Services/NatsSvcMsg.cs b/src/NATS.Client.Services/NatsSvcMsg.cs index d3646b821..f5fe966d4 100644 --- a/src/NATS.Client.Services/NatsSvcMsg.cs +++ b/src/NATS.Client.Services/NatsSvcMsg.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using NATS.Client.Core; namespace NATS.Client.Services; @@ -24,6 +25,12 @@ public NatsSvcMsg(NatsMsg msg, NatsSvcEndpointBase? endPoint, Exception? exce _endPoint = endPoint; } + /// + /// Activity used to trace the receiving of the this message. It can be used to create child activities under this context. + /// + /// + public Activity? Activity => _msg.Activity; + /// /// Optional exception if there were any errors. /// diff --git a/src/NATS.Client.Services/NatsSvcMsgTelemetryExtensions.cs b/src/NATS.Client.Services/NatsSvcMsgTelemetryExtensions.cs new file mode 100644 index 000000000..7b8eca18c --- /dev/null +++ b/src/NATS.Client.Services/NatsSvcMsgTelemetryExtensions.cs @@ -0,0 +1,37 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using NATS.Client.Core; + +namespace NATS.Client.Services; + +public static class NatsSvcMsgTelemetryExtensions +{ + /// + /// Get the activity context associated with a NatsMsg. + /// + /// + /// Returns the from the 'receive' on . + /// The value will be default if no activity is present. + /// + public static ActivityContext GetActivityContext(this in NatsSvcMsg msg) + => msg.Activity?.Context ?? default; + + /// Start a child activity under the NatsMsg associated activity. + /// Nats message + /// Name of new activity + /// Optional tags to add to the activity + /// Returns child or null if no listeners. + /// + /// Consider setting Activity. to the returned value so that + /// the context flows to any child activities created. + /// + public static Activity? StartChildActivity( + this in NatsSvcMsg msg, + [CallerMemberName] string name = "", + IEnumerable>? tags = null) + => Telemetry.NatsActivities.StartActivity( + name, + kind: ActivityKind.Internal, + parentContext: GetActivityContext(in msg), + tags: tags); +} diff --git a/src/NATS.Net.OpenTelemetry/NATS.Net.OpenTelemetry.csproj b/src/NATS.Net.OpenTelemetry/NATS.Net.OpenTelemetry.csproj new file mode 100644 index 000000000..89f1cf38b --- /dev/null +++ b/src/NATS.Net.OpenTelemetry/NATS.Net.OpenTelemetry.csproj @@ -0,0 +1,27 @@ + + + + net6.0;net8.0 + enable + enable + true + true + + + pubsub;messaging;opentelemetry + NATS client OpenTelemetry extensions. + + + false + + + + + + + + + + + + diff --git a/src/NATS.Net.OpenTelemetry/OpenTelemetryTracingExtensions.cs b/src/NATS.Net.OpenTelemetry/OpenTelemetryTracingExtensions.cs new file mode 100644 index 000000000..3e64c7c0c --- /dev/null +++ b/src/NATS.Net.OpenTelemetry/OpenTelemetryTracingExtensions.cs @@ -0,0 +1,23 @@ +using NATS.Client.Core; + +// ReSharper disable once CheckNamespace - Place in OTEL namespace for discoverability +namespace OpenTelemetry.Trace; + +public static class OpenTelemetryTracingExtensions +{ + /// + /// Enables trace collection on activity sources from the NATS.Client nuget package. + /// + /// being configured. + /// Include traces from internal messaging used for control flow and lower level usage during high level abstractions (e.g. JetStream, KvStore, etc) + /// The same instance of to chain the calls. + public static TracerProviderBuilder AddNatsInstrumentation(this TracerProviderBuilder builder, bool includeInternal = false) + { + builder.AddSource("NATS.Client"); + + if (includeInternal) + builder.AddSource("NATS.Client.Internal"); + + return builder; + } +} diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index 9bcc86b3c..1d628d701 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -150,7 +150,7 @@ public async Task Request_reply_many_test_overall_timeout() var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(4) }; await using var rep = - await nats.RequestSubAsync("foo", 4, replyOpts: opts, cancellationToken: cts.Token); + await nats.RequestSubAsync(Telemetry.NatsActivities, "foo", 4, replyOpts: opts, cancellationToken: cts.Token); await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) { Assert.Equal(results[count++], msg.Data); @@ -184,7 +184,7 @@ public async Task Request_reply_many_test_idle_timeout() var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(3) }; await using var rep = - await nats.RequestSubAsync("foo", 3, replyOpts: opts, cancellationToken: cts.Token); + await nats.RequestSubAsync(Telemetry.NatsActivities, "foo", 3, replyOpts: opts, cancellationToken: cts.Token); await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) { Assert.Equal(results[count++], msg.Data); @@ -214,7 +214,7 @@ public async Task Request_reply_many_test_start_up_timeout() var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var opts = new NatsSubOpts { StartUpTimeout = TimeSpan.FromSeconds(1) }; await using var rep = - await nats.RequestSubAsync("foo", 2, replyOpts: opts, cancellationToken: cts.Token); + await nats.RequestSubAsync(Telemetry.NatsActivities, "foo", 2, replyOpts: opts, cancellationToken: cts.Token); await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) { count++; @@ -247,7 +247,7 @@ public async Task Request_reply_many_test_max_count() var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var opts = new NatsSubOpts { MaxMsgs = 2 }; await using var rep = - await nats.RequestSubAsync("foo", 1, replyOpts: opts, cancellationToken: cts.Token); + await nats.RequestSubAsync(Telemetry.NatsActivities, "foo", 1, replyOpts: opts, cancellationToken: cts.Token); await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) { Assert.Equal(results[count++], msg.Data);