diff --git a/.editorconfig b/.editorconfig
index eac0d4922..40d253ffb 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -8,6 +8,9 @@ trim_trailing_whitespace = true
insert_final_newline = true
end_of_line = lf
+[*.json]
+indent_size = 2
+
[src/NATS.Client.Core/NaCl/**.cs]
generated_code = true
diff --git a/NATS.Client.sln b/NATS.Client.sln
index b98aea563..5e14b18f2 100644
--- a/NATS.Client.sln
+++ b/NATS.Client.sln
@@ -6,6 +6,10 @@ MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sandbox", "sandbox", "{95A69671-16CA-4133-981C-CC381B7AAA30}"
+ ProjectSection(SolutionItems) = preProject
+ sandbox\shell.nix = sandbox\shell.nix
+ sandbox\Directory.Build.props = sandbox\Directory.Build.props
+ EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{C526E8AB-739A-48D7-8FC4-048978C9B650}"
EndProject
@@ -101,6 +105,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.DocsExamples", "te
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Hosting.Tests", "tests\NATS.Client.Hosting.Tests\NATS.Client.Hosting.Tests.csproj", "{766C2486-34C3-4DD1-B31C-540C17C044B0}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry", "src\NATS.Net.OpenTelemetry\NATS.Net.OpenTelemetry.csproj", "{EC6A3E38-3906-4720-921D-BA1E439B4DE7}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core", "sandbox\Example.Core\Example.Core.csproj", "{8BFBB88D-7596-44B9-BF67-5402A662F244}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -267,6 +275,14 @@ Global
{766C2486-34C3-4DD1-B31C-540C17C044B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{766C2486-34C3-4DD1-B31C-540C17C044B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{766C2486-34C3-4DD1-B31C-540C17C044B0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {EC6A3E38-3906-4720-921D-BA1E439B4DE7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8BFBB88D-7596-44B9-BF67-5402A662F244}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8BFBB88D-7596-44B9-BF67-5402A662F244}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8BFBB88D-7596-44B9-BF67-5402A662F244}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8BFBB88D-7596-44B9-BF67-5402A662F244}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -313,6 +329,8 @@ Global
{6A7B9B9F-BFA4-4A6D-9006-0AAF597FC6DD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{389C05EB-A0B3-4097-8C1F-4D55818438CC} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{766C2486-34C3-4DD1-B31C-540C17C044B0} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
+ {EC6A3E38-3906-4720-921D-BA1E439B4DE7} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
+ {8BFBB88D-7596-44B9-BF67-5402A662F244} = {95A69671-16CA-4133-981C-CC381B7AAA30}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
diff --git a/sandbox/BlazorWasm/Client/Properties/launchSettings.json b/sandbox/BlazorWasm/Client/Properties/launchSettings.json
index 5173d8631..09b977cf2 100644
--- a/sandbox/BlazorWasm/Client/Properties/launchSettings.json
+++ b/sandbox/BlazorWasm/Client/Properties/launchSettings.json
@@ -1,4 +1,5 @@
{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"BlazorWasm": {
"commandName": "Project",
@@ -7,7 +8,8 @@
"inspectUri": "{wsProtocol}://{url.hostname}:{url.port}/_framework/debug/ws-proxy?browser={browserInspectUri}",
"applicationUrl": "http://localhost:5000",
"environmentVariables": {
- "ASPNETCORE_ENVIRONMENT": "Development"
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "BlazorWasm"
}
}
}
diff --git a/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj b/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj
index 46ebcd35b..f9ad84c00 100644
--- a/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj
+++ b/sandbox/BlazorWasm/Server/BlazorWasm.Server.csproj
@@ -10,9 +10,12 @@
+
+
+
diff --git a/sandbox/BlazorWasm/Server/Program.cs b/sandbox/BlazorWasm/Server/Program.cs
index 99572340d..82821a89d 100644
--- a/sandbox/BlazorWasm/Server/Program.cs
+++ b/sandbox/BlazorWasm/Server/Program.cs
@@ -1,10 +1,19 @@
using BlazorWasm.Server.NatsServices;
-using NATS.Client.Core;
+using Example.Core;
using NATS.Client.Hosting;
using NATS.Client.Serializers.Json;
+using OpenTelemetry.Trace;
+
+TracingSetup.SetSandboxEnv();
var builder = WebApplication.CreateBuilder(args);
+builder.Services.AddOpenTelemetry()
+ .WithTracing(o => o
+ .AddAspNetCoreInstrumentation()
+ .AddNatsInstrumentation()
+ .AddOtlpExporter());
+
// Add services to the container.
builder.Services.AddControllersWithViews();
builder.Services.AddRazorPages();
diff --git a/sandbox/BlazorWasm/Server/Properties/launchSettings.json b/sandbox/BlazorWasm/Server/Properties/launchSettings.json
index c15d742e7..6fc5313e6 100644
--- a/sandbox/BlazorWasm/Server/Properties/launchSettings.json
+++ b/sandbox/BlazorWasm/Server/Properties/launchSettings.json
@@ -1,14 +1,16 @@
{
- "profiles": {
- "BlazorWasm.Server": {
- "commandName": "Project",
- "dotnetRunMessages": true,
- "launchBrowser": true,
- "inspectUri": "{wsProtocol}://{url.hostname}:{url.port}/_framework/debug/ws-proxy?browser={browserInspectUri}",
- "applicationUrl": "http://localhost:5000",
- "environmentVariables": {
- "ASPNETCORE_ENVIRONMENT": "Development"
- }
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "BlazorWasm.Server": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "launchBrowser": true,
+ "inspectUri": "{wsProtocol}://{url.hostname}:{url.port}/_framework/debug/ws-proxy?browser={browserInspectUri}",
+ "applicationUrl": "http://localhost:5000",
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "BlazorWasm.Server"
}
}
}
+}
diff --git a/sandbox/ConsoleApp/Properties/launchSettings.json b/sandbox/ConsoleApp/Properties/launchSettings.json
new file mode 100644
index 000000000..788798196
--- /dev/null
+++ b/sandbox/ConsoleApp/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "ConsoleApp": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "ConsoleApp"
+ }
+ }
+ }
+}
diff --git a/sandbox/Directory.Build.props b/sandbox/Directory.Build.props
new file mode 100644
index 000000000..1845877f9
--- /dev/null
+++ b/sandbox/Directory.Build.props
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj b/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj
index 8e64548a3..5721cc952 100644
--- a/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj
+++ b/sandbox/Example.Core.PublishHeaders/Example.Core.PublishHeaders.csproj
@@ -11,6 +11,7 @@
+
diff --git a/sandbox/Example.Core.PublishHeaders/Program.cs b/sandbox/Example.Core.PublishHeaders/Program.cs
index 9b42b2faf..eb7aa4f58 100644
--- a/sandbox/Example.Core.PublishHeaders/Program.cs
+++ b/sandbox/Example.Core.PublishHeaders/Program.cs
@@ -1,8 +1,12 @@
// > nats sub bar.*
+
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Serializers.Json;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var subject = "bar.xyz";
var options = NatsOpts.Default with
{
diff --git a/sandbox/Example.Core.PublishHeaders/Properties/launchSettings.json b/sandbox/Example.Core.PublishHeaders/Properties/launchSettings.json
new file mode 100644
index 000000000..fa436250a
--- /dev/null
+++ b/sandbox/Example.Core.PublishHeaders/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Core.PublishHeaders": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Core.PublishHeaders"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj b/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj
index 0da21d55d..69905a6f9 100644
--- a/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj
+++ b/sandbox/Example.Core.PublishModel/Example.Core.PublishModel.csproj
@@ -11,6 +11,9 @@
+
+
+
diff --git a/sandbox/Example.Core.PublishModel/Program.cs b/sandbox/Example.Core.PublishModel/Program.cs
index d9ca83cbf..c780ccc5a 100644
--- a/sandbox/Example.Core.PublishModel/Program.cs
+++ b/sandbox/Example.Core.PublishModel/Program.cs
@@ -1,8 +1,12 @@
// > nats sub bar.*
+
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Serializers.Json;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var subject = "bar.xyz";
var options = NatsOpts.Default with
{
@@ -13,6 +17,7 @@
Print("[CON] Connecting...\n");
await using var connection = new NatsConnection(options);
+await connection.ConnectAsync();
for (var i = 0; i < 10; i++)
{
diff --git a/sandbox/Example.Core.PublishModel/Properties/launchSettings.json b/sandbox/Example.Core.PublishModel/Properties/launchSettings.json
new file mode 100644
index 000000000..c8dff80e6
--- /dev/null
+++ b/sandbox/Example.Core.PublishModel/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Core.PublishModel": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Core.PublishModel"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj b/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj
index 0da21d55d..beda6c3bb 100644
--- a/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj
+++ b/sandbox/Example.Core.SubscribeHeaders/Example.Core.SubscribeHeaders.csproj
@@ -11,6 +11,7 @@
+
diff --git a/sandbox/Example.Core.SubscribeHeaders/Program.cs b/sandbox/Example.Core.SubscribeHeaders/Program.cs
index 3994f1844..46d549aa8 100644
--- a/sandbox/Example.Core.SubscribeHeaders/Program.cs
+++ b/sandbox/Example.Core.SubscribeHeaders/Program.cs
@@ -1,9 +1,12 @@
// > nats pub bar.xyz --count=10 "my_message_{{ Count }}" -H X-Foo:Baz
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Serializers.Json;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var subject = "bar.*";
var options = NatsOpts.Default with
{
diff --git a/sandbox/Example.Core.SubscribeHeaders/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeHeaders/Properties/launchSettings.json
new file mode 100644
index 000000000..f2387fdc5
--- /dev/null
+++ b/sandbox/Example.Core.SubscribeHeaders/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Core.SubscribeHeaders": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Core.SubscribeHeaders"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj b/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj
index 0da21d55d..beda6c3bb 100644
--- a/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj
+++ b/sandbox/Example.Core.SubscribeModel/Example.Core.SubscribeModel.csproj
@@ -11,6 +11,7 @@
+
diff --git a/sandbox/Example.Core.SubscribeModel/Program.cs b/sandbox/Example.Core.SubscribeModel/Program.cs
index cdc4e5a87..b528ee168 100644
--- a/sandbox/Example.Core.SubscribeModel/Program.cs
+++ b/sandbox/Example.Core.SubscribeModel/Program.cs
@@ -1,7 +1,10 @@
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Serializers.Json;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var subject = "bar.*";
var options = NatsOpts.Default with
{
@@ -17,6 +20,7 @@
await foreach (var msg in connection.SubscribeAsync(subject))
{
+ using var activity = msg.StartChildActivity();
Print($"[RCV] {msg.Subject}: {msg.Data}\n");
}
diff --git a/sandbox/Example.Core.SubscribeModel/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeModel/Properties/launchSettings.json
new file mode 100644
index 000000000..ecb6ffe9d
--- /dev/null
+++ b/sandbox/Example.Core.SubscribeModel/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Core.SubscribeModel": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Core.SubscribeModel"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj b/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj
index f0214d692..61bbab9b8 100644
--- a/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj
+++ b/sandbox/Example.Core.SubscribeQueueGroup/Example.Core.SubscribeQueueGroup.csproj
@@ -10,6 +10,7 @@
+
diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs
index 4dbbe7792..8a7226086 100644
--- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs
+++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs
@@ -1,7 +1,11 @@
// > nats pub foo.xyz --count=10 "my_message_{{ Count }}"
+
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var subject = "foo.*";
var options = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) };
diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeQueueGroup/Properties/launchSettings.json
new file mode 100644
index 000000000..49da51589
--- /dev/null
+++ b/sandbox/Example.Core.SubscribeQueueGroup/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Core.SubscribeQueueGroup": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Core.SubscribeQueueGroup"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj b/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj
index b709bad71..01c4aeb48 100644
--- a/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj
+++ b/sandbox/Example.Core.SubscribeRaw/Example.Core.SubscribeRaw.csproj
@@ -11,6 +11,7 @@
+
diff --git a/sandbox/Example.Core.SubscribeRaw/Program.cs b/sandbox/Example.Core.SubscribeRaw/Program.cs
index b80832d96..0e55816fc 100644
--- a/sandbox/Example.Core.SubscribeRaw/Program.cs
+++ b/sandbox/Example.Core.SubscribeRaw/Program.cs
@@ -1,7 +1,10 @@
using System.Text;
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var subject = "foo.*";
var options = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) };
diff --git a/sandbox/Example.Core.SubscribeRaw/Properties/launchSettings.json b/sandbox/Example.Core.SubscribeRaw/Properties/launchSettings.json
new file mode 100644
index 000000000..ef4c62ca6
--- /dev/null
+++ b/sandbox/Example.Core.SubscribeRaw/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Core.SubscribeRaw": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Core.SubscribeRaw"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Core/Example.Core.csproj b/sandbox/Example.Core/Example.Core.csproj
new file mode 100644
index 000000000..483f59a68
--- /dev/null
+++ b/sandbox/Example.Core/Example.Core.csproj
@@ -0,0 +1,13 @@
+
+
+
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/sandbox/Example.Core/TracingSetup.cs b/sandbox/Example.Core/TracingSetup.cs
new file mode 100644
index 000000000..34a12c054
--- /dev/null
+++ b/sandbox/Example.Core/TracingSetup.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using OpenTelemetry.Resources;
+using OpenTelemetry.Trace;
+
+namespace Example.Core;
+
+public static class TracingSetup
+{
+ public static void SetSandboxEnv()
+ {
+ var instanceId = Guid.NewGuid().ToString();
+ var assemblyName = Assembly.GetEntryAssembly()!.GetName().Name;
+ Environment.SetEnvironmentVariable("OTEL_SERVICE_NAME", assemblyName);
+ Environment.SetEnvironmentVariable("OTEL_RESOURCE_ATTRIBUTES", $"service.instance.id={instanceId}");
+ Environment.SetEnvironmentVariable("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:16023"); // set to an OTLP endpoint
+ }
+
+ public static TracerProvider RunSandboxTracing(bool console = false, bool internalTraces = false)
+ {
+ SetSandboxEnv();
+ return new TracerProviderBuilderBase()
+ .ConfigureResource(o => o.AddTelemetrySdk())
+ .AddNatsInstrumentation(includeInternal: internalTraces)
+ .MaybeAddInternalSource(internalTraces)
+ .AddOtlpExporter()
+ .MaybeAddConsoleExporter(console)
+ .Build()
+ ?? throw new Exception("Tracer provider build returned null.");
+ }
+
+ private static TracerProviderBuilder MaybeAddConsoleExporter(this TracerProviderBuilder builder, bool console)
+ => console ? builder.AddConsoleExporter() : builder;
+
+ private static TracerProviderBuilder MaybeAddInternalSource(this TracerProviderBuilder builder, bool internalTraces)
+ => internalTraces ? builder.AddSource("NATS.Client.Internal") : builder;
+}
diff --git a/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj b/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj
index 5bba973ed..fb2051cc5 100644
--- a/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj
+++ b/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj
@@ -10,6 +10,7 @@
+
diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs
index 9daf531db..b08ab17a4 100644
--- a/sandbox/Example.JetStream.PullConsumer/Program.cs
+++ b/sandbox/Example.JetStream.PullConsumer/Program.cs
@@ -1,10 +1,13 @@
using System.Diagnostics;
using System.Text;
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
+using var tracer = TracingSetup.RunSandboxTracing(internalTraces: false);
+
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
@@ -82,6 +85,7 @@ void Report(int i, Stopwatch sw, string data)
// NoWaitFetch is a specialized operation not available on the public interface.
await foreach (var msg in ((NatsJSConsumer)consumer).FetchNoWaitAsync>(opts: fetchNoWaitOpts, cancellationToken: cts.Token))
{
+ using var activity = msg.StartChildActivity();
fetchMsgCount++;
using (msg.Data)
{
@@ -120,6 +124,7 @@ void Report(int i, Stopwatch sw, string data)
await consumer.RefreshAsync(cts.Token);
await foreach (var msg in consumer.FetchAsync>(opts: fetchOpts, cancellationToken: cts.Token))
{
+ using var activity = msg.StartChildActivity();
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
@@ -151,6 +156,7 @@ void Report(int i, Stopwatch sw, string data)
var next = await consumer.NextAsync>(opts: nextOpts, cancellationToken: cts.Token);
if (next is { } msg)
{
+ using var activity = msg.StartChildActivity();
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
@@ -183,6 +189,7 @@ void Report(int i, Stopwatch sw, string data)
var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
await foreach (var msg in consumer.ConsumeAsync>(opts: consumeOpts, cancellationToken: consumeStop.Token))
{
+ using var activity = msg.StartChildActivity();
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
diff --git a/sandbox/Example.JetStream.PullConsumer/Properties/launchSettings.json b/sandbox/Example.JetStream.PullConsumer/Properties/launchSettings.json
new file mode 100644
index 000000000..d6babddc7
--- /dev/null
+++ b/sandbox/Example.JetStream.PullConsumer/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.JetStream.PullConsumer": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.JetStream.PullConsumer"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj b/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj
index afe6c77db..6c5ef1e15 100644
--- a/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj
+++ b/sandbox/Example.KeyValueStore.Watcher/Example.KeyValueStore.Watcher.csproj
@@ -10,6 +10,7 @@
+
diff --git a/sandbox/Example.KeyValueStore.Watcher/Program.cs b/sandbox/Example.KeyValueStore.Watcher/Program.cs
index a301a0f4e..abf857756 100644
--- a/sandbox/Example.KeyValueStore.Watcher/Program.cs
+++ b/sandbox/Example.KeyValueStore.Watcher/Program.cs
@@ -1,8 +1,11 @@
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
+using var tracer = TracingSetup.RunSandboxTracing();
+
// var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
// var nats = new NatsConnection(options);
var nats = new NatsConnection();
diff --git a/sandbox/Example.KeyValueStore.Watcher/Properties/launchSettings.json b/sandbox/Example.KeyValueStore.Watcher/Properties/launchSettings.json
new file mode 100644
index 000000000..551ffcb1a
--- /dev/null
+++ b/sandbox/Example.KeyValueStore.Watcher/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.KeyValueStore.Watcher": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.KeyValueStore.Watcher"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.NativeAot/Example.NativeAot.csproj b/sandbox/Example.NativeAot/Example.NativeAot.csproj
index e57f5b242..2ae76343c 100644
--- a/sandbox/Example.NativeAot/Example.NativeAot.csproj
+++ b/sandbox/Example.NativeAot/Example.NativeAot.csproj
@@ -12,6 +12,7 @@
+
diff --git a/sandbox/Example.NativeAot/Program.cs b/sandbox/Example.NativeAot/Program.cs
index 6eb00b803..70b82c65f 100644
--- a/sandbox/Example.NativeAot/Program.cs
+++ b/sandbox/Example.NativeAot/Program.cs
@@ -1,11 +1,14 @@
using System.Buffers;
using System.Text;
using System.Text.Json.Serialization;
+using Example.Core;
using Google.Protobuf;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
+using var tracer = TracingSetup.RunSandboxTracing();
+
// string
{
// Same as not specifying a serializer.
diff --git a/sandbox/Example.NativeAot/Properties/launchSettings.json b/sandbox/Example.NativeAot/Properties/launchSettings.json
new file mode 100644
index 000000000..5f1c57dc7
--- /dev/null
+++ b/sandbox/Example.NativeAot/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.NativeAot": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.NativeAot"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.ObjectStore/Example.ObjectStore.csproj b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj
index 5212087a8..3f1e3db58 100644
--- a/sandbox/Example.ObjectStore/Example.ObjectStore.csproj
+++ b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj
@@ -10,6 +10,7 @@
+
diff --git a/sandbox/Example.ObjectStore/Program.cs b/sandbox/Example.ObjectStore/Program.cs
index b9424654f..2322a7fe2 100644
--- a/sandbox/Example.ObjectStore/Program.cs
+++ b/sandbox/Example.ObjectStore/Program.cs
@@ -1,9 +1,12 @@
using System.Security.Cryptography;
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var opts = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) };
var nats = new NatsConnection(opts);
diff --git a/sandbox/Example.ObjectStore/Properties/launchSettings.json b/sandbox/Example.ObjectStore/Properties/launchSettings.json
new file mode 100644
index 000000000..83bd3f302
--- /dev/null
+++ b/sandbox/Example.ObjectStore/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.ObjectStore": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.ObjectStore"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.ProtoBufMessages/Properties/launchSettings.json b/sandbox/Example.ProtoBufMessages/Properties/launchSettings.json
new file mode 100644
index 000000000..878c6f766
--- /dev/null
+++ b/sandbox/Example.ProtoBufMessages/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.ProtoBufMessages": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.ProtoBufMessages"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.Services/Example.Services.csproj b/sandbox/Example.Services/Example.Services.csproj
index 628a2a3d5..b5ef5302f 100644
--- a/sandbox/Example.Services/Example.Services.csproj
+++ b/sandbox/Example.Services/Example.Services.csproj
@@ -10,6 +10,7 @@
+
diff --git a/sandbox/Example.Services/Program.cs b/sandbox/Example.Services/Program.cs
index 694ca2ff7..30f1fc627 100644
--- a/sandbox/Example.Services/Program.cs
+++ b/sandbox/Example.Services/Program.cs
@@ -1,8 +1,11 @@
using System.Text;
+using Example.Core;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Services;
+using var tracer = TracingSetup.RunSandboxTracing();
+
var opts = NatsOpts.Default with { LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()) };
var nats = new NatsConnection(opts);
diff --git a/sandbox/Example.Services/Properties/launchSettings.json b/sandbox/Example.Services/Properties/launchSettings.json
new file mode 100644
index 000000000..3b00a7080
--- /dev/null
+++ b/sandbox/Example.Services/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.Services": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.Services"
+ }
+ }
+ }
+}
diff --git a/sandbox/Example.TlsFirst/Example.TlsFirst.csproj b/sandbox/Example.TlsFirst/Example.TlsFirst.csproj
index e4a0b5983..95052e21a 100644
--- a/sandbox/Example.TlsFirst/Example.TlsFirst.csproj
+++ b/sandbox/Example.TlsFirst/Example.TlsFirst.csproj
@@ -10,6 +10,15 @@
+
+
+
+
+
+ Never
+ Never
+ true
+
diff --git a/sandbox/Example.TlsFirst/Program.cs b/sandbox/Example.TlsFirst/Program.cs
index 8a4305799..07fb694d5 100644
--- a/sandbox/Example.TlsFirst/Program.cs
+++ b/sandbox/Example.TlsFirst/Program.cs
@@ -1,5 +1,8 @@
+using Example.Core;
using NATS.Client.Core;
+using var tracer = TracingSetup.RunSandboxTracing();
+
// await using var nats = new NatsConnection();
await using var nats = new NatsConnection(NatsOpts.Default with { TlsOpts = new NatsTlsOpts { Mode = TlsMode.Implicit, InsecureSkipVerify = true, } });
await nats.ConnectAsync();
diff --git a/sandbox/Example.TlsFirst/Properties/launchSettings.json b/sandbox/Example.TlsFirst/Properties/launchSettings.json
new file mode 100644
index 000000000..b9e5ce940
--- /dev/null
+++ b/sandbox/Example.TlsFirst/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.TlsFirst": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.TlsFirst"
+ }
+ }
+ }
+}
diff --git a/sandbox/MicroBenchmark/Properties/launchSettings.json b/sandbox/MicroBenchmark/Properties/launchSettings.json
new file mode 100644
index 000000000..1a2bf50c4
--- /dev/null
+++ b/sandbox/MicroBenchmark/Properties/launchSettings.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "Example.MicroBenchmark": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "Example.MicroBenchmark"
+ }
+ }
+ }
+}
diff --git a/sandbox/MinimumWebApp/MinimumWebApp.csproj b/sandbox/MinimumWebApp/MinimumWebApp.csproj
index c3ae6efd3..fdb57f2b8 100644
--- a/sandbox/MinimumWebApp/MinimumWebApp.csproj
+++ b/sandbox/MinimumWebApp/MinimumWebApp.csproj
@@ -11,6 +11,12 @@
+
+
+
+
+
+
diff --git a/sandbox/MinimumWebApp/Program.cs b/sandbox/MinimumWebApp/Program.cs
index 355baf998..4c8ee44d5 100644
--- a/sandbox/MinimumWebApp/Program.cs
+++ b/sandbox/MinimumWebApp/Program.cs
@@ -1,8 +1,18 @@
+using Example.Core;
using NATS.Client.Core;
using NATS.Client.Hosting;
+using OpenTelemetry.Trace;
+
+TracingSetup.SetSandboxEnv();
var builder = WebApplication.CreateBuilder(args);
+builder.Services.AddOpenTelemetry()
+ .WithTracing(o => o
+ .AddAspNetCoreInstrumentation()
+ .AddNatsInstrumentation()
+ .AddOtlpExporter());
+
// Register NatsConnectionPool, NatsConnection, INatsCommand to ServiceCollection
builder.Services.AddNats();
diff --git a/sandbox/MinimumWebApp/Properties/launchSettings.json b/sandbox/MinimumWebApp/Properties/launchSettings.json
index be6911fdc..6fd292654 100644
--- a/sandbox/MinimumWebApp/Properties/launchSettings.json
+++ b/sandbox/MinimumWebApp/Properties/launchSettings.json
@@ -1,12 +1,5 @@
{
- "iisSettings": {
- "windowsAuthentication": false,
- "anonymousAuthentication": true,
- "iisExpress": {
- "applicationUrl": "http://localhost:2196",
- "sslPort": 0
- }
- },
+ "$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"MinimumWebApp": {
"commandName": "Project",
@@ -14,14 +7,8 @@
"launchBrowser": true,
"applicationUrl": "http://localhost:5005",
"environmentVariables": {
- "ASPNETCORE_ENVIRONMENT": "Development"
- }
- },
- "IIS Express": {
- "commandName": "IISExpress",
- "launchBrowser": true,
- "environmentVariables": {
- "ASPNETCORE_ENVIRONMENT": "Development"
+ "ASPNETCORE_ENVIRONMENT": "Development",
+ "OTEL_SERVICE_NAME": "MinimumWebApp"
}
}
}
diff --git a/sandbox/shell.nix b/sandbox/shell.nix
new file mode 100644
index 000000000..6e8f67950
--- /dev/null
+++ b/sandbox/shell.nix
@@ -0,0 +1,29 @@
+{ pkgs ? import {} }:
+pkgs.mkShell {
+ name = "Nats Sandbox Shell";
+ buildInputs = with pkgs; [
+ natscli
+ nats-server
+ nats-top
+ parallel
+ man
+ ];
+
+ shellHook = ''
+ echo "Nats Sandbox shell started."
+ export RESPONSE='{ "Body": "Message {{Count}} @ {{Time}}" }'
+ alias server="nats-server --net localhost --js"
+ alias respond='nats reply bar.> "$RESPONSE"'
+ alias create-stream='nats stream create s1 --subjects="bar.>" --storage=memory --replicas=1 --defaults'
+
+ alias aspire="dotnet run --project ./Example.Aspire.AppHost/Example.Aspire.AppHost.csproj"
+ alias pub="dotnet run --project ./Example.Core.PublishModel/Example.Core.PublishModel.csproj"
+ alias consumer="dotnet run --project ./Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj -- "
+
+ echo "server - run nats server with jetstream"
+ echo "create-stream - run stream named s1 on subject bar.>"
+ echo "respond - run a responder on subject bar.>"
+ echo "pub - run Example.Core.PublishModel"
+ echo "consumer - run Example.JetStream.PullConsumer"
+ '';
+}
diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs
index f7931d43e..82afa7581 100644
--- a/src/NATS.Client.Core/INatsConnection.cs
+++ b/src/NATS.Client.Core/INatsConnection.cs
@@ -1,5 +1,3 @@
-using System.Diagnostics.CodeAnalysis;
-
namespace NATS.Client.Core;
public interface INatsConnection : IAsyncDisposable
diff --git a/src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs b/src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs
new file mode 100644
index 000000000..bd295ddd9
--- /dev/null
+++ b/src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs
@@ -0,0 +1,40 @@
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
+
+namespace NATS.Client.Core.Internal;
+
+internal sealed class ActivityEndingMsgReader : ChannelReader>
+{
+ private readonly ChannelReader> _inner;
+
+ public ActivityEndingMsgReader(ChannelReader> inner) => _inner = inner;
+
+ public override bool CanCount => _inner.CanCount;
+
+ public override bool CanPeek => _inner.CanPeek;
+
+ public override int Count => _inner.Count;
+
+ public override Task Completion => _inner.Completion;
+
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public override bool TryRead(out NatsMsg item)
+ {
+ if (!_inner.TryRead(out item))
+ return false;
+
+ item.Activity?.Dispose();
+ return true;
+ }
+
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public override ValueTask WaitToReadAsync(CancellationToken cancellationToken = default) => _inner.WaitToReadAsync(cancellationToken);
+
+ public override ValueTask> ReadAsync(CancellationToken cancellationToken = default) => _inner.ReadAsync(cancellationToken);
+
+ public override bool TryPeek(out NatsMsg item) => _inner.TryPeek(out item);
+
+ public override IAsyncEnumerable> ReadAllAsync(CancellationToken cancellationToken = default) => _inner.ReadAllAsync(cancellationToken);
+}
diff --git a/src/NATS.Client.Core/Internal/InboxSub.cs b/src/NATS.Client.Core/Internal/InboxSub.cs
index 25cb73efb..0b496ee68 100644
--- a/src/NATS.Client.Core/Internal/InboxSub.cs
+++ b/src/NATS.Client.Core/Internal/InboxSub.cs
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Collections.Concurrent;
+using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
@@ -28,7 +29,7 @@ public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnly
// Not used. Dummy implementation to keep base happy.
protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer)
- => ValueTask.CompletedTask;
+ => throw new InvalidOperationException("InboxSub.ReceiveInternalAsync should not be called.");
protected override void TryComplete()
{
diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs
index 9f5abedf0..6e85e76b0 100644
--- a/src/NATS.Client.Core/NatsConnection.Publish.cs
+++ b/src/NATS.Client.Core/NatsConnection.Publish.cs
@@ -1,33 +1,76 @@
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
+
namespace NATS.Client.Core;
public partial class NatsConnection
{
///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
- {
- headers?.SetReadOnly();
- return ConnectionState != NatsConnectionState.Open
- ? ConnectAndPublishAsync(subject, default, headers, replyTo, NatsRawSerializer.Default, cancellationToken)
- : CommandWriter.PublishAsync(subject, default, headers, replyTo, NatsRawSerializer.Default, cancellationToken);
- }
+ => PublishAsync(Telemetry.NatsActivities, subject, default, headers, replyTo, NatsRawSerializer.Default, cancellationToken);
///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
+ => PublishAsync(Telemetry.NatsActivities, msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, serializer, cancellationToken);
+
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
+ => PublishAsync(Telemetry.NatsActivities, subject, data, headers, replyTo, serializer ?? Opts.SerializerRegistry.GetSerializer(), cancellationToken);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal ValueTask PublishNoneAsync(ActivitySource activitySource, string subject, NatsHeaders? headers = default, string? replyTo = default, CancellationToken cancellationToken = default)
+ => PublishAsync(activitySource, subject, data: default, headers, replyTo, NatsRawSerializer.Default, cancellationToken);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal ValueTask PublishAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default)
+ {
+ if (ConnectionState != NatsConnectionState.Open)
+ return ConnectAndPublishAsync(activitySource, subject, data, headers, replyTo, serializer ?? Opts.SerializerRegistry.GetSerializer(), cancellationToken);
+
+ return InnerPublishAsync(activitySource, subject, data, headers, replyTo, serializer ?? Opts.SerializerRegistry.GetSerializer(), cancellationToken);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private ValueTask InnerPublishAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken)
+ => activitySource.HasListeners()
+ ? PublishTracedAsync(activitySource, subject, data, headers, replyTo, serializer, cancellationToken)
+ : SendPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken);
+
+ [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1204:Static elements should appear before instance elements", Justification = "Method is private")]
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private ValueTask SendPublishAsync(string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken = default)
{
- serializer ??= Opts.SerializerRegistry.GetSerializer();
+ Telemetry.AddTraceContextHeaders(Activity.Current, ref headers);
headers?.SetReadOnly();
- return ConnectionState != NatsConnectionState.Open
- ? ConnectAndPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken)
- : CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken);
+
+ return CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken);
}
- ///
- public ValueTask PublishAsync(in NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) =>
- PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, serializer, opts, cancellationToken);
+ [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1204:Static elements should appear before instance elements", Justification = "Method is private")]
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private async ValueTask PublishTracedAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken = default)
+ {
+ using var activity = Telemetry.StartSendActivity(activitySource, Telemetry.Constants.PublishActivityName, this, subject, replyTo);
+
+ try
+ {
+ await SendPublishAsync(subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ Telemetry.SetException(activity, ex);
+ throw;
+ }
+ }
- private async ValueTask ConnectAndPublishAsync(string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken)
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private async ValueTask ConnectAndPublishAsync(ActivitySource activitySource, string subject, T? data, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken)
{
await ConnectAsync().AsTask().WaitAsync(cancellationToken).ConfigureAwait(false);
- await CommandWriter.PublishAsync(subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false);
+ await InnerPublishAsync(activitySource, subject, data, headers, replyTo, serializer, cancellationToken).ConfigureAwait(false);
}
}
diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs
index f5bfa0838..9db6fff37 100644
--- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs
+++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs
@@ -7,23 +7,28 @@ namespace NATS.Client.Core;
public partial class NatsConnection
{
- private static readonly NatsSubOpts ReplyOptsDefault = new NatsSubOpts
- {
- MaxMsgs = 1,
- ThrowIfNoResponders = true,
- };
+ private static readonly NatsSubOpts ReplyOptsDefault = new NatsSubOpts { MaxMsgs = 1, ThrowIfNoResponders = true, };
- private static readonly NatsSubOpts ReplyManyOptsDefault = new NatsSubOpts
- {
- StopOnEmptyMsg = true,
- ThrowIfNoResponders = true,
- };
+ private static readonly NatsSubOpts ReplyManyOptsDefault = new NatsSubOpts { StopOnEmptyMsg = true, ThrowIfNoResponders = true, };
///
public string NewInbox() => NewInbox(InboxPrefix);
///
- public async ValueTask> RequestAsync(
+ public ValueTask> RequestAsync(
+ string subject,
+ TRequest? data,
+ NatsHeaders? headers = default,
+ INatsSerialize? requestSerializer = default,
+ INatsDeserialize? replySerializer = default,
+ NatsPubOpts? requestOpts = default,
+ NatsSubOpts? replyOpts = default,
+ CancellationToken cancellationToken = default)
+ => RequestAsync(
+ Telemetry.NatsActivities, subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken);
+
+ internal async ValueTask> RequestAsync(
+ ActivitySource activitySource,
string subject,
TRequest? data,
NatsHeaders? headers = default,
@@ -34,7 +39,7 @@ public async ValueTask> RequestAsync(
CancellationToken cancellationToken = default)
{
replyOpts = SetReplyOptsDefaults(replyOpts);
- await using var sub = await RequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
+ await using var sub = await RequestSubAsync(activitySource, subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);
if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
@@ -49,6 +54,7 @@ public async ValueTask> RequestAsync(
}
///
+ [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1202:Elements should be ordered by access", Justification = "Internal is wrapped by public")]
public async IAsyncEnumerable> RequestManyAsync(
string subject,
TRequest? data,
@@ -60,7 +66,7 @@ public async IAsyncEnumerable> RequestManyAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
+ await using var sub = await RequestSubAsync(Telemetry.NatsActivities, subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);
while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
@@ -73,6 +79,7 @@ public async IAsyncEnumerable> RequestManyAsync prefix)
{
Span buffer = stackalloc char[64];
@@ -80,7 +87,7 @@ internal static string NewInbox(ReadOnlySpan prefix)
var totalLength = (uint)prefix.Length + (uint)NuidWriter.NuidLength + separatorLength;
if (totalLength <= buffer.Length)
{
- buffer = buffer.Slice(0, (int)totalLength);
+ buffer = buffer[..(int)totalLength];
}
else
{
@@ -92,7 +99,7 @@ internal static string NewInbox(ReadOnlySpan prefix)
{
prefix.CopyTo(buffer);
buffer[prefix.Length] = '.';
- var remaining = buffer.Slice((int)totalPrefixLength);
+ var remaining = buffer[(int)totalPrefixLength..];
var didWrite = NuidWriter.TryWriteNuid(remaining);
Debug.Assert(didWrite, "didWrite");
return new string(buffer);
diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs
index a3f94c24f..95c2b3dd9 100644
--- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs
+++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs
@@ -1,8 +1,11 @@
+using System.Diagnostics;
+
namespace NATS.Client.Core;
public partial class NatsConnection
{
internal async ValueTask> RequestSubAsync(
+ ActivitySource activitySource,
string subject,
TRequest? data,
NatsHeaders? headers = default,
@@ -15,11 +18,11 @@ internal async ValueTask> RequestSubAsync(
var replyTo = NewInbox();
replySerializer ??= Opts.SerializerRegistry.GetDeserializer();
- var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
+ var sub = new NatsSub(activitySource, this, SubscriptionManager.InboxSubBuilder, replyTo, queueGroup: default, replyOpts, replySerializer);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
requestSerializer ??= Opts.SerializerRegistry.GetSerializer();
- await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false);
+ await PublishAsync(activitySource, subject, data, headers, replyTo, requestSerializer, cancellationToken).ConfigureAwait(false);
return sub;
}
diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs
index 649f7d1fd..5a11744c9 100644
--- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs
+++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs
@@ -16,7 +16,7 @@ public async IAsyncEnumerable> SubscribeAsync(string subject, stri
{
serializer ??= Opts.SerializerRegistry.GetDeserializer();
- await using var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
+ await using var sub = new NatsSub(Telemetry.NatsActivities, this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
using var anchor = RegisterSubAnchor(sub);
await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
@@ -36,7 +36,7 @@ public async IAsyncEnumerable> SubscribeAsync(string subject, stri
public async ValueTask> SubscribeCoreAsync(string subject, string? queueGroup = default, INatsDeserialize? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
serializer ??= Opts.SerializerRegistry.GetDeserializer();
- var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
+ var sub = new NatsSub(Telemetry.NatsActivities, this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken).ConfigureAwait(false);
return sub;
}
diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs
index 4ccc8b7d1..3247f09ae 100644
--- a/src/NATS.Client.Core/NatsConnection.cs
+++ b/src/NATS.Client.Core/NatsConnection.cs
@@ -132,6 +132,8 @@ public NatsConnectionState ConnectionState
internal ObjectPool ObjectPool => _pool;
+ internal bool SocketIsWebSocket => _socket is WebSocketConnection;
+
// only used for internal testing
internal ISocketConnection? TestSocket => _socket;
diff --git a/src/NATS.Client.Core/NatsHeaders.cs b/src/NATS.Client.Core/NatsHeaders.cs
index e63ff0de3..9f3dc2179 100644
--- a/src/NATS.Client.Core/NatsHeaders.cs
+++ b/src/NATS.Client.Core/NatsHeaders.cs
@@ -1,4 +1,5 @@
using System.Collections;
+using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Primitives;
@@ -72,6 +73,8 @@ public enum Messages
public Messages Message { get; internal set; } = Messages.Text;
+ internal Activity? Activity { get; set; }
+
///
/// Initializes a new instance of .
///
@@ -126,6 +129,7 @@ public StringValues this[string key]
{
return value;
}
+
return StringValues.Empty;
}
set
@@ -134,6 +138,7 @@ public StringValues this[string key]
{
throw new ArgumentNullException(nameof(key));
}
+
ThrowIfReadOnly();
if (value.Count == 0)
@@ -150,7 +155,7 @@ public StringValues this[string key]
StringValues IDictionary.this[string key]
{
- get { return this[key]; }
+ get => this[key];
set
{
ThrowIfReadOnly();
@@ -181,6 +186,7 @@ public ICollection Keys
{
return EmptyKeys;
}
+
return Store.Keys;
}
}
@@ -196,6 +202,7 @@ public ICollection Values
{
return EmptyValues;
}
+
return Store.Values;
}
}
@@ -210,6 +217,7 @@ public void Add(KeyValuePair item)
{
throw new ArgumentException("The key is null");
}
+
ThrowIfReadOnly();
EnsureStore(1);
Store.Add(item.Key, item.Value);
@@ -226,6 +234,7 @@ public void Add(string key, StringValues value)
{
throw new ArgumentNullException(nameof(key));
}
+
ThrowIfReadOnly();
EnsureStore(1);
Store.Add(key, value);
@@ -253,6 +262,7 @@ public bool Contains(KeyValuePair item)
{
return false;
}
+
return true;
}
@@ -267,6 +277,7 @@ public bool ContainsKey(string key)
{
return false;
}
+
return Store.ContainsKey(key);
}
@@ -306,6 +317,7 @@ public bool Remove(KeyValuePair item)
{
return Store.Remove(item.Key);
}
+
return false;
}
@@ -321,6 +333,7 @@ public bool Remove(string key)
{
return false;
}
+
return Store.Remove(key);
}
@@ -337,6 +350,7 @@ public bool TryGetValue(string key, out StringValues value)
value = default(StringValues);
return false;
}
+
return Store.TryGetValue(key, out value);
}
@@ -351,6 +365,7 @@ public Enumerator GetEnumerator()
// Non-boxed Enumerator
return default;
}
+
return new Enumerator(Store.GetEnumerator());
}
@@ -365,6 +380,7 @@ IEnumerator> IEnumerable Current
{
return _dictionaryEnumerator.Current;
}
+
return default(KeyValuePair);
}
}
diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs
index 53bc01750..3573fa20a 100644
--- a/src/NATS.Client.Core/NatsMsg.cs
+++ b/src/NATS.Client.Core/NatsMsg.cs
@@ -1,4 +1,4 @@
-using System.Buffers;
+using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
namespace NATS.Client.Core;
@@ -120,42 +120,11 @@ public readonly record struct NatsMsg(
T? Data,
INatsConnection? Connection) : INatsMsg
{
- internal static NatsMsg Build(
- string subject,
- string? replyTo,
- in ReadOnlySequence? headersBuffer,
- in ReadOnlySequence payloadBuffer,
- INatsConnection? connection,
- NatsHeaderParser headerParser,
- INatsDeserialize serializer)
- {
- // Consider an empty payload as null or default value for value types. This way we are able to
- // receive sentinels as nulls or default values. This might cause an issue with where we are not
- // able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero).
- var data = payloadBuffer.Length > 0
- ? serializer.Deserialize(payloadBuffer)
- : default;
-
- NatsHeaders? headers = null;
-
- if (headersBuffer != null)
- {
- headers = new NatsHeaders();
- if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers))
- {
- throw new NatsException("Error parsing headers");
- }
-
- headers.SetReadOnly();
- }
-
- var size = subject.Length
- + (replyTo?.Length ?? 0)
- + (headersBuffer?.Length ?? 0)
- + payloadBuffer.Length;
-
- return new NatsMsg(subject, replyTo, (int)size, headers, data, connection);
- }
+ ///
+ /// Activity used to trace the receiving of the this message. It can be used to create child activities under this context.
+ ///
+ ///
+ public Activity? Activity => Headers?.Activity;
///
/// Reply with an empty message.
@@ -168,7 +137,10 @@ internal static NatsMsg Build(
public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
CheckReplyPreconditions();
- return Connection.PublishAsync(ReplyTo, headers, replyTo, opts, cancellationToken);
+ var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities;
+
+ // TODO: un-hack
+ return ((NatsConnection)Connection).PublishNoneAsync(activitySource, subject: ReplyTo, headers, replyTo, cancellationToken);
}
///
@@ -196,7 +168,10 @@ public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = de
public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
CheckReplyPreconditions();
- return Connection.PublishAsync(ReplyTo, data, headers, replyTo, serializer, opts, cancellationToken);
+ var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities;
+
+ // TODO: un-hack
+ return ((NatsConnection)Connection).PublishAsync(activitySource, subject: ReplyTo, data, headers, replyTo, serializer, cancellationToken);
}
///
@@ -214,7 +189,10 @@ public ValueTask ReplyAsync(TReply data, NatsHeaders? headers = default,
public ValueTask ReplyAsync(NatsMsg msg, INatsSerialize? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
CheckReplyPreconditions();
- return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken);
+ var activitySource = Activity?.Source ?? Telemetry.NatsInternalActivities;
+
+ // TODO: un-hack
+ return ((NatsConnection)Connection).PublishAsync(activitySource, subject: ReplyTo, msg.Data, msg.Headers, msg.ReplyTo, serializer, cancellationToken);
}
[MemberNotNull(nameof(Connection))]
diff --git a/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs
new file mode 100644
index 000000000..aeebc28e2
--- /dev/null
+++ b/src/NATS.Client.Core/NatsMsgTelemetryExtensions.cs
@@ -0,0 +1,36 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+
+namespace NATS.Client.Core;
+
+public static class NatsMsgTelemetryExtensions
+{
+ ///
+ /// Get the activity context associated with a NatsMsg.
+ ///
+ ///
+ /// Returns the from the 'receive' on .
+ /// The value will be default if no activity is present.
+ ///
+ public static ActivityContext GetActivityContext(this in NatsMsg msg)
+ => msg.Activity?.Context ?? default;
+
+ /// Start a child activity under the NatsMsg associated activity.
+ /// Nats message
+ /// Name of new activity
+ /// Optional tags to add to the activity
+ /// Returns child or null if no listeners.
+ ///
+ /// Consider setting Activity. to the returned value so that
+ /// the context flows to any child activities created.
+ ///
+ public static Activity? StartChildActivity(
+ this in NatsMsg msg,
+ [CallerMemberName] string name = "",
+ IEnumerable>? tags = null)
+ => Telemetry.NatsActivities.StartActivity(
+ name,
+ kind: ActivityKind.Internal,
+ parentContext: GetActivityContext(in msg),
+ tags: tags);
+}
diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs
index 853408e83..b6ae22e6a 100644
--- a/src/NATS.Client.Core/NatsSub.cs
+++ b/src/NATS.Client.Core/NatsSub.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading.Channels;
using NATS.Client.Core.Internal;
@@ -7,9 +8,11 @@ namespace NATS.Client.Core;
public sealed class NatsSub : NatsSubBase, INatsSub
{
+ private readonly ActivitySource _activitySource;
private readonly Channel> _msgs;
internal NatsSub(
+ ActivitySource activitySource,
NatsConnection connection,
ISubscriptionManager manager,
string subject,
@@ -19,27 +22,32 @@ internal NatsSub(
CancellationToken cancellationToken = default)
: base(connection, manager, subject, queueGroup, opts, cancellationToken)
{
+ _activitySource = activitySource;
_msgs = Channel.CreateBounded>(
connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts),
msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg));
+ Msgs = new ActivityEndingMsgReader(_msgs.Reader);
+
Serializer = serializer;
}
- public ChannelReader> Msgs => _msgs.Reader;
+ public ChannelReader> Msgs { get; }
private INatsDeserialize Serializer { get; }
protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer)
{
- var natsMsg = NatsMsg.Build(
- subject,
- replyTo,
+ var natsMsg = ParseMsg(
+ activitySource: _activitySource,
+ activityName: Telemetry.Constants.ReceiveActivityName,
+ subject: subject,
+ replyTo: replyTo,
headersBuffer,
- payloadBuffer,
+ in payloadBuffer,
Connection,
Connection.HeaderParser,
- Serializer);
+ serializer: Serializer);
await _msgs.Writer.WriteAsync(natsMsg).ConfigureAwait(false);
diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs
index e1b8a610b..3502deaf6 100644
--- a/src/NATS.Client.Core/NatsSubBase.cs
+++ b/src/NATS.Client.Core/NatsSubBase.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
@@ -346,4 +347,62 @@ protected void EndSubscription(NatsSubEndReason reason)
#pragma warning restore VSTHRD110
#pragma warning restore CA2012
}
+
+ protected NatsMsg ParseMsg(
+ ActivitySource activitySource,
+ string activityName,
+ string subject,
+ string? replyTo,
+ ReadOnlySequence? headersBuffer,
+ in ReadOnlySequence payloadBuffer,
+ INatsConnection? connection,
+ NatsHeaderParser headerParser,
+ INatsDeserialize serializer)
+ {
+ NatsHeaders? headers;
+ if (headersBuffer != null)
+ {
+ headers = new NatsHeaders();
+ if (!headerParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers))
+ throw new NatsException("Error parsing headers");
+ }
+ else
+ {
+ headers = null;
+ }
+
+ var size = subject.Length
+ + (replyTo?.Length ?? 0)
+ + (headersBuffer?.Length ?? 0)
+ + payloadBuffer.Length;
+
+ var activity = Telemetry.StartReceiveActivity(
+ activitySource,
+ Connection,
+ name: activityName,
+ subscriptionSubject: Subject,
+ queueGroup: QueueGroup,
+ subject: subject,
+ replyTo: replyTo,
+ bodySize: payloadBuffer.Length,
+ size: size,
+ headers: headers);
+
+ if (activity is not null)
+ {
+ headers ??= new NatsHeaders();
+ headers.Activity = activity;
+ }
+
+ headers?.SetReadOnly();
+
+ // Consider an empty payload as null or default value for value types. This way we are able to
+ // receive sentinels as nulls or default values. This might cause an issue with where we are not
+ // able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero).
+ var data = payloadBuffer.Length > 0
+ ? serializer.Deserialize(payloadBuffer)
+ : default;
+
+ return new NatsMsg(subject, replyTo, (int)size, headers, data, connection);
+ }
}
diff --git a/src/NATS.Client.Core/Telemetry.cs b/src/NATS.Client.Core/Telemetry.cs
new file mode 100644
index 000000000..494c8aa31
--- /dev/null
+++ b/src/NATS.Client.Core/Telemetry.cs
@@ -0,0 +1,296 @@
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+
+namespace NATS.Client.Core;
+
+// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging/
+// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
+internal static class Telemetry
+{
+ public const string NatsActivitySource = "NATS.Client";
+ public const string NatsInternalActivitySource = "NATS.Client.Internal";
+ private static readonly object BoxedTrue = true;
+
+ internal static ActivitySource NatsActivities { get; } = new(name: NatsActivitySource);
+
+ internal static ActivitySource NatsInternalActivities { get; } = new(name: NatsInternalActivitySource);
+
+ // ReSharper disable once ClassNeverInstantiated.Global
+ internal class Constants
+ {
+ public const string True = "true";
+ public const string False = "false";
+ public const string PublishActivityName = "publish";
+ public const string ReceiveActivityName = "receive";
+
+ public const string SystemKey = "messaging.system";
+ public const string SystemVal = "nats";
+ public const string ClientId = "messaging.client_id";
+ public const string OpKey = "messaging.operation";
+ public const string OpPub = "publish";
+ public const string OpRec = "receive";
+ public const string OpProcess = "process";
+ public const string MsgBodySize = "messaging.message.body.size";
+ public const string MsgTotalSize = "messaging.message.envelope.size";
+
+ // destination
+ public const string DestTemplate = "messaging.destination.template";
+ public const string DestName = "messaging.destination.name";
+ public const string DestIsTemporary = "messaging.destination.temporary";
+ public const string DestIsAnon = "messaging.destination.anonymous";
+ public const string DestPubName = "messaging.destination_publish.name";
+ public const string DestPubIsAnon = "messaging.destination_publish.anonymous";
+
+ public const string QueueGroup = "messaging.nats.consumer.group";
+ public const string ReplyTo = "messaging.nats.message.reply_to";
+ public const string Subject = "messaging.nats.message.subject";
+
+ public const string ServerAddress = "server.address";
+ public const string ServerPort = "server.port";
+ public const string NetworkProtoName = "network.protocol.name";
+ public const string NetworkProtoVersion = "network.protocol.version";
+ public const string NetworkPeerAddress = "network.peer.address";
+ public const string NetworkPeerPort = "network.peer.port";
+ public const string NetworkLocalAddress = "network.local.address";
+ public const string NetworkLocalPort = "network.local.port";
+ public const string NetworkTransport = "network.transport";
+
+ // rpc ???
+ public const string RpcSystemKey = "rpc.system";
+ public const string RpcSystemVal = "nats";
+
+ public const string JSAck = "messaging.nats.js.acked";
+ }
+
+ [SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1201:Elements should appear in the correct order", Justification = "Class is all constant.")]
+ internal static Activity? StartSendActivity(ActivitySource source, string name, INatsConnection? connection, string subject, string? replyTo)
+ {
+ if (!source.HasListeners())
+ return null;
+
+ KeyValuePair[] tags;
+ if (connection is NatsConnection { ServerInfo: not null } conn)
+ {
+ var len = 12;
+ if (replyTo is not null)
+ len++;
+
+ var serverPort = conn.ServerInfo.Port.ToString();
+ tags = new KeyValuePair[len];
+ tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal);
+ tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpPub);
+ tags[2] = new KeyValuePair(Constants.DestName, subject);
+
+ tags[3] = new KeyValuePair(Constants.ClientId, conn.ServerInfo.ClientId.ToString());
+ tags[4] = new KeyValuePair(Constants.ServerAddress, conn.ServerInfo.Host);
+ tags[5] = new KeyValuePair(Constants.ServerPort, serverPort);
+ tags[6] = new KeyValuePair(Constants.NetworkProtoName, "nats");
+ tags[7] = new KeyValuePair(Constants.NetworkProtoVersion, conn.ServerInfo.ProtocolVersion.ToString());
+ tags[8] = new KeyValuePair(Constants.NetworkPeerAddress, conn.ServerInfo.Host);
+ tags[9] = new KeyValuePair(Constants.NetworkPeerPort, serverPort);
+ tags[10] = new KeyValuePair(Constants.NetworkLocalAddress, conn.ServerInfo.ClientIp);
+ tags[11] = new KeyValuePair(Constants.NetworkTransport, conn.SocketIsWebSocket ? "websocket" : "tcp");
+
+ if (replyTo is not null)
+ tags[12] = new KeyValuePair(Constants.ReplyTo, replyTo);
+ }
+ else
+ {
+ var len = 3;
+ if (replyTo is not null)
+ len++;
+
+ tags = new KeyValuePair[len];
+ tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal);
+ tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpPub);
+ tags[2] = new KeyValuePair(Constants.DestName, subject);
+
+ if (replyTo is not null)
+ tags[3] = new KeyValuePair(Constants.ReplyTo, replyTo);
+ }
+
+ return source.StartActivity(
+ name,
+ kind: ActivityKind.Producer,
+ parentContext: default, // propagate from current activity
+ tags: tags);
+ }
+
+ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers)
+ {
+ if (activity is null)
+ return;
+
+ headers ??= new NatsHeaders();
+ DistributedContextPropagator.Current.Inject(
+ activity: activity,
+ carrier: headers,
+ setter: static (carrier, fieldName, fieldValue) =>
+ {
+ if (carrier is not NatsHeaders headers)
+ {
+ Debug.Assert(false, "This should never be hit.");
+ return;
+ }
+
+ headers[fieldName] = fieldValue;
+ });
+ }
+
+ internal static Activity? StartReceiveActivity(
+ ActivitySource source,
+ INatsConnection? connection,
+ string name,
+ string subscriptionSubject,
+ string? queueGroup,
+ string subject,
+ string? replyTo,
+ long bodySize,
+ long size,
+ NatsHeaders? headers)
+ {
+ if (!source.HasListeners())
+ return null;
+
+ KeyValuePair[] tags;
+ if (connection is NatsConnection { ServerInfo: not null } conn)
+ {
+ var serverPort = conn.ServerInfo.Port.ToString();
+
+ var len = 19;
+ if (replyTo is not null)
+ len++;
+
+ tags = new KeyValuePair[len];
+ tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal);
+ tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpRec);
+ tags[2] = new KeyValuePair(Constants.DestTemplate, subscriptionSubject);
+ tags[3] = new KeyValuePair(Constants.QueueGroup, queueGroup);
+ tags[4] = new KeyValuePair(Constants.Subject, subject);
+ tags[5] = new KeyValuePair(Constants.DestName, subject);
+ tags[6] = new KeyValuePair(Constants.DestPubName, subject);
+ tags[7] = new KeyValuePair(Constants.MsgBodySize, bodySize.ToString());
+ tags[8] = new KeyValuePair(Constants.MsgTotalSize, size.ToString());
+
+ tags[9] = new KeyValuePair(Constants.ClientId, conn.ServerInfo.ClientId.ToString());
+ tags[10] = new KeyValuePair(Constants.ServerAddress, conn.ServerInfo.Host);
+ tags[11] = new KeyValuePair(Constants.ServerPort, serverPort);
+ tags[12] = new KeyValuePair(Constants.NetworkProtoName, "nats");
+ tags[13] = new KeyValuePair(Constants.NetworkProtoVersion, conn.ServerInfo.ProtocolVersion.ToString());
+ tags[14] = new KeyValuePair(Constants.NetworkPeerAddress, conn.ServerInfo.Host);
+ tags[15] = new KeyValuePair(Constants.NetworkPeerPort, serverPort);
+ tags[16] = new KeyValuePair(Constants.NetworkLocalAddress, conn.ServerInfo.ClientIp);
+ tags[17] = new KeyValuePair(Constants.NetworkTransport, conn.SocketIsWebSocket ? "websocket" : "tcp");
+ tags[18] = new KeyValuePair(
+ Constants.DestIsTemporary,
+ subscriptionSubject.StartsWith(conn.InboxPrefix, StringComparison.Ordinal) ? Constants.True : Constants.False);
+
+ if (replyTo is not null)
+ tags[19] = new KeyValuePair(Constants.ReplyTo, replyTo);
+ }
+ else
+ {
+ tags = new KeyValuePair[10];
+ tags[0] = new KeyValuePair(Constants.SystemKey, Constants.SystemVal);
+ tags[1] = new KeyValuePair(Constants.OpKey, Constants.OpRec);
+ tags[2] = new KeyValuePair(Constants.DestTemplate, subscriptionSubject);
+ tags[3] = new KeyValuePair(Constants.QueueGroup, queueGroup);
+ tags[4] = new KeyValuePair(Constants.Subject, subject);
+ tags[5] = new KeyValuePair(Constants.DestName, subject);
+ tags[6] = new KeyValuePair(Constants.DestPubName, subject);
+ tags[7] = new KeyValuePair(Constants.MsgBodySize, bodySize.ToString());
+ tags[8] = new KeyValuePair(Constants.MsgTotalSize, size.ToString());
+
+ if (replyTo is not null)
+ tags[9] = new KeyValuePair(Constants.ReplyTo, replyTo);
+ }
+
+ if (headers is null || !TryParseTraceContext(headers, out var context))
+ context = default;
+
+ return source.StartActivity(
+ name,
+ kind: ActivityKind.Consumer,
+ parentContext: context,
+ tags: tags);
+ }
+
+ internal static void SetException(Activity? activity, Exception exception)
+ {
+ if (activity is null)
+ return;
+
+ // see: https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/
+ var message = GetMessage(exception);
+ var eventTags = new ActivityTagsCollection
+ {
+ ["exception.escaped"] = BoxedTrue,
+ ["exception.type"] = exception.GetType().FullName,
+ ["exception.message"] = message,
+ ["exception.stacktrace"] = GetStackTrace(exception),
+ };
+
+ var activityEvent = new ActivityEvent("exception", DateTimeOffset.UtcNow, eventTags);
+
+ activity.AddEvent(activityEvent);
+ activity.SetStatus(ActivityStatusCode.Error, message);
+ return;
+
+ static string GetMessage(Exception exception)
+ {
+ try
+ {
+ return exception.Message;
+ }
+ catch
+ {
+ return $"An exception of type {exception.GetType()} was thrown but the Message property threw an exception.";
+ }
+ }
+
+ static string GetStackTrace(Exception? exception)
+ {
+ var stackTrace = exception?.StackTrace;
+ return string.IsNullOrWhiteSpace(stackTrace) ? string.Empty : stackTrace;
+ }
+ }
+
+ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContext context)
+ {
+ DistributedContextPropagator.Current.ExtractTraceIdAndState(
+ carrier: headers,
+ getter: static (object? carrier, string fieldName, out string? fieldValue, out IEnumerable? fieldValues) =>
+ {
+ if (carrier is not NatsHeaders headers)
+ {
+ Debug.Assert(false, "This should never be hit.");
+ fieldValue = null;
+ fieldValues = null;
+ return;
+ }
+
+ if (headers.TryGetValue(fieldName, out var values))
+ {
+ if (values.Count == 1)
+ {
+ fieldValue = values[0];
+ fieldValues = null;
+ }
+ else
+ {
+ fieldValue = null;
+ fieldValues = values;
+ }
+ }
+ else
+ {
+ fieldValue = null;
+ fieldValues = null;
+ }
+ },
+ out var traceParent,
+ out var traceState);
+
+ return ActivityContext.TryParse(traceParent, traceState, out context);
+ }
+}
diff --git a/src/NATS.Client.JetStream/Internal/ActivityEndingJSMsgReader.cs b/src/NATS.Client.JetStream/Internal/ActivityEndingJSMsgReader.cs
new file mode 100644
index 000000000..f347da660
--- /dev/null
+++ b/src/NATS.Client.JetStream/Internal/ActivityEndingJSMsgReader.cs
@@ -0,0 +1,26 @@
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
+
+namespace NATS.Client.JetStream.Internal;
+
+internal sealed class ActivityEndingJSMsgReader : ChannelReader>
+{
+ private readonly ChannelReader> _inner;
+
+ public ActivityEndingJSMsgReader(ChannelReader> inner) => _inner = inner;
+
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public override bool TryRead(out NatsJSMsg item)
+ {
+ if (!_inner.TryRead(out item))
+ return false;
+
+ item.Activity?.Dispose();
+ return true;
+ }
+
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public override ValueTask WaitToReadAsync(CancellationToken cancellationToken = default) => _inner.WaitToReadAsync(cancellationToken);
+}
diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
index d94c9b88b..2a7c2208b 100644
--- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
+++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
@@ -133,7 +134,7 @@ public NatsJSConsume(
// sufficiently large value to avoid blocking socket reads in the
// NATS connection).
_userMsgs = Channel.CreateBounded>(1000);
- Msgs = _userMsgs.Reader;
+ Msgs = new ActivityEndingJSMsgReader(_userMsgs.Reader);
// Capacity as 1 is enough here since it's used for signaling only.
_pullRequests = Channel.CreateBounded(1);
@@ -155,6 +156,7 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request,
}
return Connection.PublishAsync(
+ Telemetry.NatsInternalActivities,
subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}",
data: request,
replyTo: Subject,
@@ -323,14 +325,16 @@ protected override async ValueTask ReceiveInternalAsync(
else
{
var msg = new NatsJSMsg(
- NatsMsg.Build(
- subject,
- replyTo,
+ ParseMsg(
+ Telemetry.NatsActivities,
+ activityName: "js_receive",
+ subject: subject,
+ replyTo: replyTo,
headersBuffer,
- payloadBuffer,
+ in payloadBuffer,
Connection,
Connection.HeaderParser,
- _serializer),
+ serializer: _serializer),
_context);
lock (_pendingGate)
@@ -357,7 +361,7 @@ protected override async ValueTask ReceiveInternalAsync(
// We can't pass cancellation token here because we need to hand
// the message to the user to be processed. Writer will be completed
// when the user calls Stop() or when the subscription is closed.
- await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false);
+ await _userMsgs.Writer.WriteAsync(msg, CancellationToken.None).ConfigureAwait(false);
}
}
@@ -437,7 +441,7 @@ private async Task PullLoop()
await foreach (var pr in _pullRequests.Reader.ReadAllAsync().ConfigureAwait(false))
{
var origin = $"pull-loop({pr.Origin})";
- await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false);
+ await CallMsgNextAsync(origin, pr.Request, CancellationToken.None).ConfigureAwait(false);
if (_debug)
{
_logger.LogDebug(NatsJSLogEvents.PullRequest, "Pull request issued for {Origin} {Batch}, {MaxBytes}", origin, pr.Request.Batch, pr.Request.MaxBytes);
diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
index 751d2f923..f73d8b6f7 100644
--- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
+++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
@@ -73,7 +74,7 @@ public NatsJSFetch(
// sufficiently large value to avoid blocking socket reads in the
// NATS connection).
_userMsgs = Channel.CreateBounded>(1000);
- Msgs = _userMsgs.Reader;
+ Msgs = new ActivityEndingJSMsgReader(_userMsgs.Reader);
if (_debug)
{
@@ -128,6 +129,7 @@ public NatsJSFetch(
public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) =>
Connection.PublishAsync(
+ Telemetry.NatsInternalActivities,
subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}",
data: request,
replyTo: Subject,
@@ -224,14 +226,16 @@ protected override async ValueTask ReceiveInternalAsync(
else
{
var msg = new NatsJSMsg(
- NatsMsg.Build(
- subject,
- replyTo,
+ ParseMsg(
+ Telemetry.NatsActivities,
+ activityName: "js_receive",
+ subject: subject,
+ replyTo: replyTo,
headersBuffer,
- payloadBuffer,
+ in payloadBuffer,
Connection,
Connection.HeaderParser,
- _serializer),
+ serializer: _serializer),
_context);
_pendingMsgs--;
diff --git a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs
index c7eaf1e7a..9784a1ba0 100644
--- a/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs
+++ b/src/NATS.Client.JetStream/Internal/NatsJSNotificationChannel.cs
@@ -25,7 +25,7 @@ public NatsJSNotificationChannel(
SingleWriter = false,
FullMode = BoundedChannelFullMode.DropOldest,
});
- _loop = Task.Run(NotificationLoop);
+ _loop = Task.Run(NotificationLoop, CancellationToken.None);
}
public void Notify(INatsJSNotification notification) => _channel.Writer.TryWrite(notification);
diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs
index e1a698320..d30e64a70 100644
--- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs
+++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
@@ -97,12 +98,12 @@ public NatsJSOrderedConsume(
_userMsgs = Channel.CreateBounded>(
Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts),
msg => Connection.OnMessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg));
- Msgs = _userMsgs.Reader;
+ Msgs = new ActivityEndingJSMsgReader(_userMsgs.Reader);
// Pull request channel is set as unbounded because we don't want to drop
// them and minimize potential lock contention.
_pullRequests = Channel.CreateUnbounded();
- _pullTask = Task.Run(PullLoop);
+ _pullTask = Task.Run(PullLoop, CancellationToken.None);
ResetPending();
@@ -122,6 +123,7 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request,
}
return Connection.PublishAsync(
+ Telemetry.NatsInternalActivities,
subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}",
data: request,
replyTo: Subject,
@@ -269,14 +271,16 @@ protected override async ValueTask ReceiveInternalAsync(
else
{
var msg = new NatsJSMsg(
- NatsMsg.Build(
- subject,
- replyTo,
+ ParseMsg(
+ Telemetry.NatsActivities,
+ activityName: "js_receive",
+ subject: subject,
+ replyTo: replyTo,
headersBuffer,
- payloadBuffer,
+ in payloadBuffer,
Connection,
Connection.HeaderParser,
- _serializer),
+ serializer: _serializer),
_context);
lock (_pendingGate)
diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
index 04c762ffc..40e9234e9 100644
--- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
+++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Diagnostics;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
@@ -113,21 +114,18 @@ public NatsJSOrderedPushConsumer(
// where it's not enough due to performance for example.
_commandChannel = Channel.CreateBounded>(1);
_msgChannel = Channel.CreateBounded>(1);
+ Msgs = new ActivityEndingJSMsgReader(_msgChannel.Reader);
// A single request to create the consumer is enough because we don't want to create a new consumer
// back to back in case the consumer is being recreated due to a timeout and a mismatch in consumer
// sequence for example; creating the consumer once would solve both the issues.
- _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1)
- {
- AllowSynchronousContinuations = false,
- FullMode = BoundedChannelFullMode.DropOldest,
- });
+ _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { AllowSynchronousContinuations = false, FullMode = BoundedChannelFullMode.DropOldest, });
_consumerCreateTask = Task.Run(ConsumerCreateLoop);
_commandTask = Task.Run(CommandLoop);
}
- public ChannelReader> Msgs => _msgChannel.Reader;
+ public ChannelReader> Msgs { get; }
public bool IsDone => Volatile.Read(ref _done) > 0;
@@ -148,7 +146,7 @@ public async ValueTask DisposeAsync()
await _consumerCreateTask;
await _commandTask;
- await _context.DeleteConsumerAsync(_stream, Consumer, _cancellationToken);
+ await _context.DeleteConsumerAsync(Telemetry.NatsInternalActivities, _stream, Consumer, _cancellationToken);
}
internal void Init()
@@ -198,7 +196,7 @@ private async Task CommandLoop()
{
if (headers.TryGetValue("Nats-Consumer-Stalled", out var flowControlReplyTo))
{
- await _nats.PublishAsync(flowControlReplyTo, cancellationToken: _cancellationToken);
+ await _nats.PublishNoneAsync(Telemetry.NatsInternalActivities, subject: flowControlReplyTo, cancellationToken: _cancellationToken);
}
if (headers is { Code: 100, MessageText: "FlowControl Request" })
@@ -442,7 +440,19 @@ protected override async ValueTask ReceiveInternalAsync(
ReadOnlySequence? headersBuffer,
ReadOnlySequence payloadBuffer)
{
- var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context);
+ var msg = new NatsJSMsg(
+ ParseMsg(
+ activitySource: Telemetry.NatsInternalActivities,
+ activityName: "js_receive",
+ subject: subject,
+ replyTo: replyTo,
+ headersBuffer,
+ in payloadBuffer,
+ Connection,
+ Connection.HeaderParser,
+ serializer: _serializer),
+ _context);
+
await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false);
}
diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs
index 8f1d0f224..788d1abaf 100644
--- a/src/NATS.Client.JetStream/NatsJSConsumer.cs
+++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs
@@ -1,3 +1,4 @@
+using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using NATS.Client.Core;
@@ -282,6 +283,7 @@ public async IAsyncEnumerable> FetchNoWaitAsync(
/// Server responded with an error.
public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) =>
Info = await _context.JSRequestResponseAsync